import fauna from "../../faunadb.app.mjs";
import _ from "lodash";
import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform";
export default {
type: "source",
key: "faunadb-changes-to-collection",
name: "New or Removed Documents in a Collection",
description: "Emit new event each time you add or remove a document from a specific collection, with the details of the document.",
version: "0.0.8",
dedupe: "unique",
props: {
timer: {
type: "$.interface.timer",
default: {
intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
},
},
db: "$.service.db",
fauna,
collection: {
propDefinition: [
fauna,
"collections",
],
},
emitEventsInBatch: {
type: "boolean",
label: "Emit changes as a single event",
description:
"If `true`, all events are emitted as an array, within a single Pipedream event. Defaults to `false`, emitting each event in Fauna as its own event in Pipedream",
optional: true,
default: false,
},
},
async run() {
const ts = +new Date() * 1000;
const cursor = this.db.get("cursor") || ts;
const events = await this.fauna.getEventsInCollectionAfterTs(
this.collection,
cursor,
);
if (!events.length) {
console.log(`No new events in collection ${this.collection}`);
this.db.set("cursor", ts);
return;
}
console.log(`${events.length} new events in collection ${this.collection}`);
if (this.emitEventsInBatch) {
this.$emit({
events,
}, {
summary: `${events.length} new event${events.length > 1
? "s"
: ""}`,
id: cursor,
});
} else {
for (const event of events) {
this.$emit(event, {
summary: `${event.action.toUpperCase()} - ${event.document.id}`,
id: `${event.action}-${event.document.id}`,
});
}
}
const maxEventTs = _.maxBy(events, (event) => event.ts).ts + 1;
this.db.set("cursor", maxEventTs);
},
};