or this.key = 'value'
, pass input data to your code viaparams
, and maintain state across executions with$checkpoint.async
(event, steps) => {
By default, this workflow sends at most one email per day, per error type,
per workflow. The second time a specific error is raised from the same
workflow in the same 24-hour period, no email will be sent. You can modify
this behavior to apply any notification logic you'd like.
import dateFormat from 'dateformat';
import { differenceInHours } from 'date-fns';
const { code, msg, ts } = event.error
const { id, workflow_id, workflow_name } = event.original_context
// $checkpoint allows us to save any JSON-serializable data in a workflow,
// reading it on subsequent invocations. Here, we save the last time we
// saw a specific error on a workflow, exiting if we've seen the error recently.
if ($checkpoint && $checkpoint[workflow_id] && $checkpoint[workflow_id][code]) {
console.log(`We've seen error ${code} for workflow ${workflow_id} before. Checking the time of the last event.`)
// Check if we've seen this error within the past 24 hours
const lastErrorDate = new Date($checkpoint[workflow_id][code].lastSeen)
const diffInHours = differenceInHours(new Date(ts), lastErrorDate)
if (diffInHours <= 24) {
$end(`Error ${code} occurred ${diffInHours} hours ago. No notification to send.`)
// We haven't seen the error recently, so we can send our error notitication
const errorTime = dateFormat(event.original_context.ts, "h:MM:ss TT Z")
const errorDate = dateFormat(event.original_context.ts, "dddd, mmmm dS, yyyy")
const formattedErrMsg = msg ? `${code} — ${msg}` : code
this.subject = `[pipedream] Error in workflow ${workflow_name || workflow_id} at ${errorTime}`
this.text = `Your workflow at${workflow_id}?e=${id} generated the following error at ${errorTime} on ${errorDate}:
params => {
const options = {
subject: params.subject,
text: params.text,
if (params.html) {
options.html = params.html
or this.key = 'value'
, pass input data to your code viaparams
, and maintain state across executions with$checkpoint.async
(event, steps) => {
import { merge } from 'lodash-es';
const { code, ts } = event.error
const { workflow_id } = event.original_context
// The first time our workflow runs, initialize $checkpoint
if (!$checkpoint) {
console.log("First run of Global Error Workflow. Checkpointing initial state")
$checkpoint = {} // workflow error gets set on merge below
// Finally, we need to checkpoint the time we saw this error for
// this workflow, merging it with the existing checkpoint data.
merge($checkpoint, {
[workflow_id]: {
[code]: {
lastSeen: ts