import pipedream from "../../pipedream.app.mjs";
import sampleEmit from "./test-event.mjs";
import { uuid } from "uuidv4";
export default {
// Component metadata. Per the description below, this source exposes an HTTP
// API for scheduling messages that are emitted at a future time.
key: "pipedream-new-scheduled-tasks",
name: "New Scheduled Tasks",
type: "source",
description:
"Exposes an HTTP API for scheduling messages to be emitted at a future time",
version: "0.3.1",
// NOTE(review): presumably this dedupe strategy keys on the `id` passed in the
// `$emit` metadata (see `emitEvent`) — confirm against the platform docs.
dedupe: "unique",
// User-configurable props and platform services used by this source.
props: {
// App whose `subscribe`, `listEvents` and `deleteEvent` methods are called by
// the methods of this component.
pipedream,
// Optional shared secret. When set, `validateEventBody` rejects requests whose
// `x-pd-secret` header does not equal this value.
secret: {
type: "string",
secret: true,
label: "Secret",
optional: true,
description:
"**Optional but recommended**: if you enter a secret here, you must pass this value in the `x-pd-secret` HTTP header when making requests",
},
// HTTP interface that receives the scheduler requests. `customResponse` is
// enabled; this component answers requests itself through `httpRespond`.
http: {
label: "Endpoint",
description: "The endpoint where you'll send task scheduler requests",
type: "$.interface.http",
customResponse: true,
},
// Key-value store; used by `selfSubscribe` to persist the
// `isSubscribedToSelf` flag.
db: "$.service.db",
},
methods: {
selfChannel() {
return "self";
},
queuedEventsChannel() {
return "$in";
},
httpRespond({
status, body,
}) {
this.http.respond({
headers: {
"content-type": "application/json",
},
status,
body,
});
},
async selfSubscribe() {
const isSubscribedToSelf = this.db.get("isSubscribedToSelf");
if (!isSubscribedToSelf) {
const componentId = process.env.PD_COMPONENT;
const selfChannel = this.selfChannel();
console.log(`Subscribing to ${selfChannel} channel for event source`);
console.log(
await this.pipedream.subscribe(componentId, componentId, selfChannel),
);
this.db.set("isSubscribedToSelf", true);
}
},
validateEventBody(event, operation) {
const errors = [];
if (this.secret && event.headers["x-pd-secret"] !== this.secret) {
errors.push(
"Secret on incoming request doesn't match the configured secret",
);
}
if (operation === "schedule") {
const {
timestamp,
message,
} = event.body;
const epoch = Date.parse(timestamp);
if (!timestamp) {
errors.push(
"No timestamp included in payload. Please provide an ISO8601 timestamp in the 'timestamp' field",
);
}
if (timestamp && !epoch) {
errors.push("Timestamp isn't a valid ISO 8601 string");
}
if (!message) {
errors.push("No message passed in payload");
}
}
return errors;
},
scheduleTask(event) {
const errors = this.validateEventBody(event, "schedule");
let status, body;
if (errors.length) {
console.log(errors);
status = 400;
body = {
errors,
};
} else {
const id = this.emitScheduleEvent(event.body, event.body.timestamp);
status = 200;
body = {
msg: "Successfully scheduled task",
id,
};
}
this.httpRespond({
status,
body,
});
},
emitScheduleEvent(event, timestamp) {
const selfChannel = this.selfChannel();
const epoch = Date.parse(timestamp);
const $id = uuid();
console.log(`Scheduled event to emit on: ${new Date(epoch)}`);
this.$emit(
{
...event,
$channel: selfChannel,
$id,
},
{
name: selfChannel,
id: $id,
delivery_ts: epoch,
},
);
return $id;
},
async cancelTask(event) {
const errors = this.validateEventBody(event, "cancel");
let status, msg;
if (errors.length) {
console.log(errors);
status = 400;
msg = "Secret on incoming request doesn't match the configured secret";
} else {
try {
const id = event.body.id;
const isCanceled = await this.deleteEvent(event);
if (isCanceled) {
status = 200;
msg = `Cancelled scheduled task for event ${id}`;
} else {
status = 404;
msg = `No event with ${id} found`;
}
} catch (error) {
console.log(error);
status = 500;
msg = "Failed to schedule task. Please see the logs";
}
}
this.httpRespond({
status,
body: {
msg,
},
});
},
async deleteEvent(event) {
const componentId = process.env.PD_COMPONENT;
const inChannel = this.queuedEventsChannel();
const { id } = event.body;
const events = await this.pipedream.listEvents(
componentId,
inChannel,
);
console.log("Events: ", events);
const eventToCancel = events.data.find((e) => {
const { metadata } = e;
return metadata.id === id;
});
console.log("Event to cancel: ", eventToCancel);
if (!eventToCancel) {
console.log(`No event with ${id} found`);
return false;
}
await this.pipedream.deleteEvent(
componentId,
eventToCancel.id,
inChannel,
);
return true;
},
emitEvent(event, summary) {
const id = event.$id;
delete event.$channel;
delete event.$id;
this.$emit(event, {
summary: summary ?? JSON.stringify(event),
id,
ts: +new Date(),
});
},
},
async run(event) {
await this.selfSubscribe();
const { path } = event;
if (path === "/schedule") {
this.scheduleTask(event);
} else if (path === "/cancel") {
await this.cancelTask(event);
} else if (event.$channel === this.selfChannel()) {
this.emitEvent(event);
}
},
sampleEmit,
};