import { v4 as uuid } from "uuid";
import sampleEmit from "./test-event.mjs";
import googleCalendar from "../../google_calendar.app.mjs";
import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform";
export default {
key: "google_calendar-new-or-updated-event-instant",
type: "source",
name: "New Created or Updated Event (Instant)",
description: "Emit new event when a Google Calendar events is created or updated (does not emit cancelled events)",
version: "0.1.13",
dedupe: "unique",
// Component props: app connection, persistent state, user options and the
// two event interfaces (HTTP webhook and timer) that trigger `run`.
props: {
googleCalendar,
// Key/value store; the methods below read and write per-calendar state under
// `${calendarId}.nextSyncToken`, `.channelId`, `.resourceId` and `.expiration`.
db: "$.service.db",
// Calendars to watch. Every hook/method iterates this list.
calendarIds: {
propDefinition: [
googleCalendar,
"calendarId",
],
type: "string[]",
default: [
"primary",
],
label: "Calendars",
description: "Select one or more calendars to watch (defaults to the primary calendar)",
},
// When true, `isEventRelevant` only passes events that `_isNewEvent` accepts.
newOnly: {
label: "Emit only for new events",
type: "boolean",
description: "Emit new events only, and not updates to existing events (defaults to `false`)",
optional: true,
default: false,
},
// HTTP interface; `this.http.endpoint` is sent as the webhook address in
// `makeWatchRequest`.
http: "$.interface.http",
// Timer interface. `run` treats events carrying `interval_seconds` as timer
// ticks and uses them to decide whether to renew subscriptions
// (presumably those events originate from this prop; confirm against the platform docs).
timer: {
label: "Push notification renewal schedule",
description: "The Google Calendar API requires occasional renewal of push notification subscriptions. **This runs in the background, so you should not need to modify this schedule**.",
type: "$.interface.timer",
static: {
intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
},
},
},
hooks: {
async deploy() {
const events = [];
const params = {
maxResults: 25,
orderBy: "updated",
};
for (const calendarId of this.calendarIds) {
params.calendarId = calendarId;
const { items } = await this.googleCalendar.listEvents(params);
events.push(...items);
}
events.sort((a, b) => (Date.parse(a.updated) > Date.parse(b.updated))
? 1
: -1);
for (const event of events.slice(-25)) {
const meta = this.generateMeta(event);
this.$emit(event, meta);
}
},
// Activate hook: runs `makeWatchRequest()`, which calls `watchEvents` for each
// configured calendar and stores the returned channel details in `db`.
async activate() {
await this.makeWatchRequest();
},
// Deactivate hook: runs `stopWatchRequest()`, which calls `stopChannel` for each
// calendar that has a stored channel id and resource id.
async deactivate() {
await this.stopWatchRequest();
},
},
methods: {
setNextSyncToken(calendarId, nextSyncToken) {
this.db.set(`${calendarId}.nextSyncToken`, nextSyncToken);
},
getNextSyncToken(calendarId) {
return this.db.get(`${calendarId}.nextSyncToken`);
},
setChannelId(calendarId, channelId) {
this.db.set(`${calendarId}.channelId`, channelId);
},
getChannelId(calendarId) {
return this.db.get(`${calendarId}.channelId`);
},
setResourceId(calendarId, resourceId) {
this.db.set(`${calendarId}.resourceId`, resourceId);
},
getResourceId(calendarId) {
return this.db.get(`${calendarId}.resourceId`);
},
setExpiration(calendarId, expiration) {
this.db.set(`${calendarId}.expiration`, expiration);
},
getExpiration(calendarId) {
return this.db.get(`${calendarId}.expiration`);
},
_isNewEvent(event) {
const {
created,
updated,
} = event;
const createdTimestampMilliseconds = Date.parse(created);
const updatedTimestampMilliseconds = Date.parse(updated);
const diffMilliseconds = Math.abs(
updatedTimestampMilliseconds - createdTimestampMilliseconds,
);
const maxDiffMilliseconds = 2000;
return diffMilliseconds <= maxDiffMilliseconds;
},
isEventRelevant(event) {
return !this.newOnly || this._isNewEvent(event);
},
generateMeta(event) {
const {
id,
summary,
updated: tsString,
} = event;
const ts = Date.parse(tsString);
return {
id: `${id}-${ts}`,
summary,
ts,
};
},
async makeWatchRequest() {
for (const calendarId of this.calendarIds) {
const watchResp =
await this.googleCalendar.watchEvents({
calendarId,
requestBody: {
id: uuid(),
type: "web_hook",
address: this.http.endpoint,
},
});
const nextSyncToken = await this.googleCalendar.fullSync(calendarId);
this.setNextSyncToken(calendarId, nextSyncToken);
this.setChannelId(calendarId, watchResp.id);
this.setResourceId(calendarId, watchResp.resourceId);
this.setExpiration(calendarId, watchResp.expiration);
}
},
async stopWatchRequest() {
for (const calendarId of this.calendarIds) {
const id = this.getChannelId(calendarId);
const resourceId = this.getResourceId(calendarId);
if (id && resourceId) {
const { status } =
await this.googleCalendar.stopChannel({
returnOnlyData: false,
requestBody: {
id,
resourceId,
},
});
if (status === 204) {
console.log("webhook deactivated");
this.setNextSyncToken(calendarId, null);
this.setChannelId(calendarId, null);
this.setResourceId(calendarId, null);
this.setExpiration(calendarId, null);
} else {
console.log("There was a problem deactivating the webhook");
}
}
}
},
getSoonestExpirationDate() {
let min;
for (const calendarId of this.calendarIds) {
const expiration = parseInt(this.db.get(`${calendarId}.expiration`));
if (!min || expiration < min) {
min = expiration;
}
}
return new Date(min);
},
getChannelIds() {
const channelIds = [];
for (const calendarId of this.calendarIds) {
const channelId = this.db.get(`${calendarId}.channelId`);
channelIds.push(channelId);
}
return channelIds;
},
},
async run(event) {
if (event.interval_seconds) {
const now = new Date();
const intervalMs = event.interval_seconds * 1000;
const expireDate = this.getSoonestExpirationDate();
if (now.getTime() + intervalMs > expireDate.getTime()) {
await this.stopWatchRequest();
await this.makeWatchRequest();
}
} else {
const channelIds = this.getChannelIds();
const incomingChannelId = event?.headers?.["x-goog-channel-id"];
if (!channelIds.includes(incomingChannelId)) {
console.log(
`Unexpected channel ID ${incomingChannelId}. This likely means there are multiple, older subscriptions active.`,
);
return;
}
const state = event?.headers?.["x-goog-resource-state"];
switch (state) {
case "exists":
break;
case "not_exists":
console.log("Resource does not exist. Exiting.");
return;
case "sync":
console.log("New channel created");
return;
default:
console.log(`Unknown state: ${state}`);
return;
}
}
for (const calendarId of this.calendarIds) {
const syncToken = this.getNextSyncToken(calendarId);
let nextSyncToken = null;
let nextPageToken = null;
while (!nextSyncToken) {
const {
data: syncData = {},
status: syncStatus,
} = await this.googleCalendar.listEvents({
returnOnlyData: false,
calendarId,
syncToken,
pageToken: nextPageToken,
maxResults: 2500,
});
if (syncStatus === 410) {
console.log("Sync token invalid, resyncing");
nextSyncToken = await this.googleCalendar.fullSync(this.calendarId);
break;
}
nextPageToken = syncData.nextPageToken;
nextSyncToken = syncData.nextSyncToken;
const { items: events = [] } = syncData;
events
.filter(this.isEventRelevant, this)
.forEach((event) => {
const { status } = event;
if (status === "cancelled") {
console.log("Event cancelled. Exiting.");
return;
}
const meta = this.generateMeta(event);
this.$emit(event, meta);
});
}
this.setNextSyncToken(calendarId, nextSyncToken);
}
},
sampleEmit,
};