import { defineSource } from "@pipedream/types";
import {
ConfigurationError,
DEFAULT_POLLING_SOURCE_TIMER_INTERVAL
} from "@pipedream/platform";
import { defineApp } from "@pipedream/types";
import axios from "axios";
import FeedParser from "feedparser";
import hash from "object-hash";
/**
 * RSS app definition: shared prop definitions plus helpers that fetch a feed,
 * parse it (XML via FeedParser, or JSON Feed), and compute per-item dedupe
 * keys, timestamps and sort order.
 */
var rss_app_default = defineApp({
  type: "app",
  app: "rss",
  propDefinitions: {
    url: {
      type: "string",
      label: "Feed URL",
      description: "Enter the URL for any public RSS feed"
    },
    urls: {
      type: "string[]",
      label: "Feed URLs",
      description: "Enter either one or multiple URLs from any public RSS feed"
    },
    timer: {
      type: "$.interface.timer",
      description: "How often you want to poll the feed for new items",
      default: {
        intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL
      }
    }
  },
  methods: {
    /**
     * Publication time of a feed item in epoch milliseconds.
     *
     * Checks `pubdate`, then `pubDate`, then `date_published`. Items with no
     * date, or with a date that cannot be parsed, get the current time.
     *
     * @param {object} [item={}] - a parsed feed item
     * @returns {number} epoch milliseconds (never NaN)
     */
    itemTs(item = {}) {
      const {
        pubdate,
        pubDate,
        date_published
      } = item;
      const itemPubDate = pubdate ?? pubDate ?? date_published;
      if (itemPubDate) {
        const ts = +new Date(itemPubDate);
        // An unparseable date yields NaN, which would poison sorting and the
        // "published after" comparison; treat it like a missing date instead.
        if (!Number.isNaN(ts)) {
          return ts;
        }
      }
      return Date.now();
    },
    /**
     * Dedupe key for a feed item: the first truthy value among `id`, `guid`,
     * `link` and `title` (values longer than 64 characters keep only their
     * last 64), or an object-hash of the whole item when none is set.
     *
     * @param {object} [item={}] - a parsed feed item
     * @returns {*} the dedupe key
     */
    itemKey(item = {}) {
      const {
        id,
        guid,
        link,
        title
      } = item;
      // `||` rather than `??`: an empty-string id/guid must fall through to the
      // next candidate instead of forcing a hash of the whole item.
      const itemId = id || guid || link || title;
      if (itemId) {
        return itemId.length > 64 ? itemId.slice(-64) : itemId;
      }
      return hash(item);
    },
    /**
     * GETs `url` with the body as a stream.
     *
     * @param {string} url - absolute feed URL
     * @returns {Promise<{data: *, contentType: (string|undefined)}>} the body
     *   stream and the response `content-type` header
     * @throws {ConfigurationError} for any HTTP status >= 400
     */
    async fetchFeed(url) {
      const res = await axios.request({
        url,
        method: "GET",
        headers: {
          "accept": "text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8, application/json, application/feed+json",
          "User-Agent": "@PipedreamHQ/pipedream v1.0"
        },
        // Accept every status so failures can be mapped to friendly errors below.
        validateStatus: () => true,
        responseType: "stream"
      });
      if (res.status >= 400) {
        // The error body is never read; destroy the stream so the underlying
        // connection is released instead of lingering.
        res.data?.destroy?.();
        if (res.status === 404) throw new ConfigurationError(`The URL ${url} does not exist. Please double-check the URL and try again.`);
        if (res.status === 429) throw new ConfigurationError(`${url} isn't returning a valid feed because requests have been rate-limited. Please reach out to the site hosting the RSS feed to confirm or increase their rate limit.`);
        if (res.status >= 500) throw new ConfigurationError(`${url} is returning a server error. Please try again later or reach out to the site hosting the RSS feed if you continue to see this error.`);
        throw new ConfigurationError(`Error fetching URL ${url}. Please load the URL directly in your browser and try again.`);
      }
      return {
        data: res.data,
        contentType: res.headers["content-type"]
      };
    },
    /**
     * Cleans up one FeedParser item in place and returns it.
     *
     * - Copies the text of `atom:title` (preferred) or `rss:title` into `title`.
     * - Drops an `rss:<key>` entry whenever a truthy `rss:<key>` exists for a
     *   visited `<key>`.
     * - Drops null/undefined values, empty arrays and empty objects.
     *
     * Date instances are kept. They have no own enumerable keys, so a plain
     * "no keys" test would delete them. FeedParser presumably supplies
     * `pubdate`/`date` as Dates; see its docs.
     *
     * @param {object} item - item read from FeedParser
     * @returns {object} the same item, mutated
     */
    normalizeFeedItem(item) {
      const isEmptyValue = (value) => {
        if (value == null) return true;
        if (value instanceof Date) return false;
        if (typeof value === "object") return Object.keys(value).length === 0;
        return false;
      };
      const atomTitle = item["atom:title"]?.["#"];
      const rssTitle = item["rss:title"]?.["#"];
      if (atomTitle) {
        item.title = atomTitle;
      } else if (rssTitle) {
        item.title = rssTitle;
      }
      for (const key in item) {
        if (item[`rss:${key}`]) {
          delete item[`rss:${key}`];
          continue;
        }
        if (isEmptyValue(item[key])) {
          delete item[key];
        }
      }
      return item;
    },
    /**
     * Parses an XML feed stream with FeedParser.
     *
     * @param {import("stream").Readable} stream - response body stream
     * @returns {Promise<object[]>} normalized items, in feed order
     * @throws whatever FeedParser or the source stream emits as an error
     */
    async parseFeed(stream) {
      const feedparser = new FeedParser({
        addmeta: true
      });
      const items = [];
      await new Promise((resolve, reject) => {
        const fail = (err) => {
          // Parsing cannot succeed any more; stop reading the body.
          stream.destroy?.();
          reject(err);
        };
        feedparser.on("error", fail);
        feedparser.on("end", resolve);
        feedparser.on("readable", () => {
          let item = feedparser.read();
          while (item) {
            items.push(this.normalizeFeedItem(item));
            item = feedparser.read();
          }
        });
        // `pipe` does not forward errors from the source (e.g. a dropped
        // connection), so listen for them or the promise would never settle.
        stream.on("error", fail);
        stream.pipe(feedparser);
      });
      return items;
    },
    /**
     * Whether a `fetchFeed` response should be parsed as JSON Feed, based on
     * its content type.
     *
     * @param {{contentType?: string}} response - result of `fetchFeed`
     * @returns {boolean}
     */
    isJSONFeed(response) {
      const acceptedJsonFeedMimes = [
        "application/feed+json",
        "application/json"
      ];
      // Content-Type usually carries parameters ("application/json; charset=utf-8"),
      // so compare only the media type itself.
      const mediaType = response?.contentType?.split(";")[0].trim().toLowerCase();
      return acceptedJsonFeedMimes.includes(mediaType);
    },
    /**
     * Reads a JSON Feed document from a stream.
     *
     * @param {import("stream").Readable} stream - response body stream
     * @returns {Promise<object[]>} the feed's `items`, or `[]` when it has no
     *   `items` array
     * @throws {SyntaxError} when the body is not valid JSON
     */
    async parseJSONFeed(stream) {
      const buffer = await new Promise((resolve, reject) => {
        const chunks = [];
        stream.on("data", (chunk) => chunks.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(chunks)));
        stream.on("error", reject);
      });
      // JSON.parse rejects a leading UTF-8 byte-order mark; strip it.
      const feed = JSON.parse(buffer.toString().replace(/^\uFEFF/, ""));
      return Array.isArray(feed?.items) ? feed.items : [];
    },
    /**
     * Normalizes `u` into a URL, fetches it, and parses the body as JSON Feed
     * or XML depending on the response content type.
     *
     * @param {string} u - feed URL as entered by the user
     * @returns {Promise<object[]>} parsed items
     */
    async fetchAndParseFeed(u) {
      const url = this.validateAndFixFeedURL(u);
      const response = await this.fetchFeed(url);
      return this.isJSONFeed(response)
        ? this.parseJSONFeed(response.data)
        : this.parseFeed(response.data);
    },
    /**
     * Trims the user-entered URL and prefixes `https://` unless it already
     * starts with an http(s):// or ftp(s):// scheme (case-insensitive).
     *
     * @param {string} u - user-entered URL
     * @returns {string} URL with a scheme
     * @throws {ConfigurationError} when no URL is provided
     */
    validateAndFixFeedURL(u) {
      const url = typeof u === "string" ? u.trim() : u;
      if (!url) throw new ConfigurationError("No URL provided. Please enter an RSS URL to fetch");
      return /^(?:https?|ftps?):\/\//i.test(url) ? url : `https://${url}`;
    },
    /**
     * Sorts `items` in place, oldest first by `itemTs`. Ties keep their input
     * order.
     *
     * @param {object[]} items
     * @returns {object[]} the same array, sorted
     */
    sortItems(items) {
      // Compute each timestamp once: `itemTs` falls back to Date.now() for
      // undated items, so calling it inside the comparator could give
      // inconsistent answers for the same item.
      const tsByItem = new Map(items.map((item) => [item, this.itemTs(item)]));
      return items.sort((itemA, itemB) => tsByItem.get(itemA) - tsByItem.get(itemB));
    },
    /**
     * Sorts `items` in place, newest first.
     *
     * @param {object[]} items
     * @returns {object[]} the same array, sorted
     */
    sortItemsForActions(items) {
      return this.sortItems(items).reverse();
    }
  }
});
/**
 * Pieces shared by the RSS sources in this file: the RSS app prop, a polling
 * timer prop, and the default event-metadata builder.
 */
var common_default = {
  props: {
    rss: rss_app_default,
    timer: {
      propDefinition: [rss_app_default, "timer"]
    }
  },
  methods: {
    /**
     * Builds event metadata for a feed item.
     *
     * @param {object} item - a parsed feed item
     * @returns {{id: *, summary: *, ts: number}} the app's dedupe key for the
     *   item, its title, and the app's timestamp for it
     */
    generateMeta(item) {
      const { rss } = this;
      const id = rss.itemKey(item);
      const summary = item.title;
      const ts = rss.itemTs(item);
      return { id, summary, ts };
    }
  }
};
/**
 * "New Item in Feed" polling source. On each run it fetches the configured
 * feed and emits every item whose timestamp is at or after `publishedAfter`.
 * Events are deduplicated with `dedupe: "unique"` on the `id` from
 * `generateMeta`.
 */
var new_item_in_feed_default = defineSource({
  ...common_default,
  key: "rss-new-item-in-feed",
  name: "New Item in Feed",
  description: "Emit new items from an RSS feed",
  version: "1.2.10",
  type: "source",
  dedupe: "unique",
  props: {
    ...common_default.props,
    url: {
      propDefinition: [
        rss_app_default,
        "url"
      ]
    },
    publishedAfter: {
      type: "string",
      label: "Published After",
      description: "Emit items published after the specified date in ISO 8601 format, e.g. `2022-12-07T12:57:10+07:00`",
      // Evaluated once, when this module is loaded.
      default: new Date(Date.now() - 24 * 60 * 60 * 1e3).toISOString()
    }
  },
  hooks: {
    /**
     * Validates the configuration at deploy time. It rejects an unparseable
     * `publishedAfter`, which would otherwise suppress every event. It also
     * fetches the feed once so URL problems surface immediately.
     */
    async activate() {
      if (Number.isNaN(+new Date(this.publishedAfter))) {
        throw new ConfigurationError(`"Published After" must be a valid ISO 8601 date, e.g. 2022-12-07T12:57:10+07:00 (received: ${this.publishedAfter}).`);
      }
      await this.rss.fetchAndParseFeed(this.url);
    }
  },
  methods: {
    ...common_default.methods,
    /**
     * Event metadata for an item. Unlike the shared version, `ts` is the
     * emit time rather than the item's publication time.
     *
     * @param {object} item - a parsed feed item
     * @returns {{id: *, summary: *, ts: number}}
     */
    generateMeta: function(item) {
      return {
        id: this.rss.itemKey(item),
        summary: item.title,
        ts: Date.now()
      };
    }
  },
  /**
   * Fetches the feed and emits items published at or after `publishedAfter`,
   * iterating in reverse feed order. This assumes the feed lists newest items
   * first, so older items are emitted before newer ones.
   */
  async run() {
    // The cutoff is the same for every item; parse it once.
    const publishedAfter = +new Date(this.publishedAfter);
    const items = await this.rss.fetchAndParseFeed(this.url);
    for (const item of items.reverse()) {
      const ts = this.rss.itemTs(item);
      // An unparseable cutoff skips everything; activate() rejects it up front.
      if (Number.isNaN(publishedAfter) || publishedAfter > ts) {
        continue;
      }
      this.$emit(item, this.generateMeta(item));
    }
  }
});
export {
new_item_in_feed_default as default
};