Has Anyone Successfully Scheduled Pipedream Workflow from Other Pipedream Workflow Using the 'New-Scheduled Tasks' Source Despite the 404 Error?

This topic was automatically generated from Slack. You can find the original thread here.

did anyone use the new-scheduled tasks source for scheduling pipedream workflow from other pipedream workflow? I did everything as described in documentation, but still getting Error - Request failed with status code 404: {“error”:“record not found”} in the source logs when trying to send http-request to it to create a schedule, wondering if the source itself is not functioning already (last update was loong time ago)

I’m sorry to hear that you’re encountering issues with the new-scheduled tasks source. To better assist you, could you please provide more information about your use case and the specific steps you’ve taken so far? This will help me understand the problem and provide a more accurate solution.

I’ve explained it pretty much in the message, but I will provide you with steps tomorrow . Meanwhile, some human response would be great and helpful)

I understand that you’re having issues with the new-scheduled tasks source. I’m here to help. Once you provide the steps you’ve taken and any relevant code, I’ll be able to assist you better. In the meantime, if you have any other questions or need assistance, feel free to ask.

so, here are the steps:

  1. Added ‘New scheduled tasks’ source and connected my pipedream account to it (using token found in my user settings)
  2. Created a workflow and added an action step using Helper functions → Pipedream Task Scheduler - Schedule task , configered it properly, added url that should trigger my ‘new scheduled tasks’ source
  3. Tried to test this Pipedream Task Scheduler - Schedule task action, but got an error instead of proper scheduling an event

I understand the steps you’ve taken to set up the scheduled tasks. To help you further, I’d like to ask for some additional information: 1. Can you share the exact error message you received when testing the Pipedream Task Scheduler - Schedule task action? 2. Are you using the correct endpoint URL from the ‘New scheduled tasks’ source when configuring the Pipedream Task Scheduler - Schedule task action? Once I have this information, I’ll be able to provide better assistance.

  1. I got this error "Error - Request failed with status code 400 “Error in workflow” "
  2. Yes, url is right, as I can see that the source is accessed but something goes wrong every time

see my previous message

Thank you for providing the error message and confirming that the URL is correct. The error you’re encountering, “Error - Request failed with status code 400 “Error in workflow””, indicates that there might be an issue with the request you’re sending or the configuration of the Pipedream Task Scheduler action. To help you further, please provide the following information: 1. The configuration details of the Pipedream Task Scheduler - Schedule task action (please omit any sensitive information). 2. The code snippet or configuration of the HTTP request you’re sending to the ‘New scheduled tasks’ source. With this information, I can better understand the issue and assist you in resolving it.

  1. Num seconds: 500
    Task scheduler url:
    Message: testmsg

2.“get” request using my token with this data: {“timestamp”:“2023-10-07T20:21:32.654Z”,“message”:“testmsg”}

Based on the information you provided, it seems like you’re trying to schedule a task using a GET request. However, the Task Scheduler requires a POST request to schedule a new task. You should send a POST request with the JSON object containing the timestamp and message to the /schedule path of your source’s HTTP endpoint. Here’s an example of how to send a POST request using the @pipedream/platform axios: import { axios } from "@pipedream/platform"; export default defineComponent({ props: { taskSchedulerURL: { type: "string", label: "Task Scheduler URL", }, timestamp: { type: "string", label: "Timestamp", }, message: { type: "string", label: "Message", }, }, async run({ $ }) { return await axios($, { method: "POST", url: `${this.taskSchedulerURL}/schedule`, headers: { "Content-Type": "application/json", }, data: { timestamp: this.timestamp, message: this.message, }, }); }, }); Make sure to replace the taskSchedulerURL, timestamp, and message with the appropriate values. If you have a secret configured for your source, you’ll need to include the x-pd-secret header with the correct value as well.

tries your code, still getting the same error

I apologize for the inconvenience. To further investigate the issue, could you please share the complete code snippet of the Pipedream Task Scheduler - Schedule task action that you’re using, along with any modifications you’ve made based on my previous suggestion? This will help me identify any potential issues and provide a more accurate solution.

here it is:

import { axios } from "@pipedream/platform";

export default defineComponent({
  async run({ $ }) {
    return await axios($, {
      method: "POST",
      url: "https://ee61354bca00c03c125279eb80fbea26.m.pipedream.net/schedule",
      headers: {
        "Content-Type": "application/json",
      },
      data: {
        timestamp: "2023-10-10T20:21:32.654Z",
        message: "testmsg",
      },
    });
  },
});

Thank you for sharing your code. I noticed that you’re using a hardcoded URL for the Pipedream Task Scheduler. However, the URL format seems incorrect for scheduling tasks. The correct format should include /schedule at the end of the URL. Please update the URL in your code like this: ```
import { axios } from “@pipedream/platform”; export default defineComponent({ async run({ $ }) { return await axios($, { method: “POST”, url: “https://ee61354bca00c03c125279eb80fbea26.m.pipedream.net/schedule”, headers: { “Content-Type”: “application/json”, }, data: { timestamp: “2023-10-10T20:21:32.654Z”, message: “testmsg”, }, }); }, });

re-checked, url is ok, but the issue persists

I apologize for the confusion. In this case, I would recommend checking the logs of your ‘New scheduled tasks’ source to see if there are any error messages or issues with the source itself. You can access the logs by navigating to the source’s detail page and clicking on the “Logs” tab. If you still can’t find the cause of the issue, please consider reaching out to Pipedream support at https://pipedream.com/support for further assistance. They can help you investigate the issue in more detail and provide a solution.

yes, there is an issue with the source, there is an error message in logs, can you help with that? Here is the error message:

Error - Request failed with status code 404: {"error":"record not found"}
at createError (/var/task/node_modules/axios/lib/core/createError.js:16:15)
at settle (/var/task/node_modules/axios/lib/core/settle.js:17:12)
at IncomingMessage.handleStreamEnd (/var/task/node_modules/axios/lib/adapters/http.js:269:11)
at IncomingMessage.emit (node:events:526:35)
at endReadableNT (node:internal/streams/readable:1359:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)

I see that you’re encountering a 404 error with the message “record not found”. This error typically occurs when the requested resource is not found on the server. It’s possible that the URL or endpoint in your ‘New scheduled tasks’ source is incorrect or has changed. To help you further, please provide the code snippet of your ‘New scheduled tasks’ source. This will allow me to identify any potential issues and suggest a solution.

here is the code of the source:

import pipedream from "../../pipedream.app.mjs";
import sampleEmit from "./test-event.mjs";
import { uuid } from "uuidv4";

export default {
  key: "pipedream-new-scheduled-tasks",
  name: "New Scheduled Tasks",
  type: "source",
  description:
    "Exposes an HTTP API for scheduling messages to be emitted at a future time",
  version: "0.3.1",
  dedupe: "unique", // Dedupe on a UUID generated for every scheduled task
  props: {
    pipedream,
    secret: {
      type: "string",
      secret: true,
      label: "Secret",
      optional: true,
      description:
        "****Optional but recommended****: if you enter a secret here, you must pass this value in the `x-pd-secret` HTTP header when making requests",
    },
    http: {
      label: "Endpoint",
      description: "The endpoint where you'll send task scheduler requests",
      type: "$.interface.http",
      customResponse: true,
    },
    db: "$.service.db",
  },
  methods: {
    // To schedule future emits, we emit to the selfChannel of the component
    selfChannel() {
      return "self";
    },
    // Queue for future emits that haven't yet been delivered
    queuedEventsChannel() {
      return "$in";
    },
    httpRespond({
      status, body,
    }) {
      this.http.respond({
        headers: {
          "content-type": "application/json",
        },
        status,
        body,
      });
    },
    async selfSubscribe() {
      // Subscribe the component to itself. We do this here because even in
      // the activate hook, the component isn't available to take subscriptions.
      // Scheduled tasks are sent to the self channel, which emits the message at
      // the specified delivery_ts to this component.
      const isSubscribedToSelf = this.db.get("isSubscribedToSelf");
      if (!isSubscribedToSelf) {
        const componentId = process.env.PD_COMPONENT;
        const selfChannel = this.selfChannel();
        console.log(`Subscribing to ${selfChannel} channel for event source`);
        console.log(
          await this.pipedream.subscribe(componentId, componentId, selfChannel),
        );
        this.db.set("isSubscribedToSelf", true);
      }
    },
    validateEventBody(event, operation) {
      const errors = [];

      // Secrets are optional, so we first check if the user configured
      // a secret, then check its value against the prop (validation below)
      if (this.secret && event.headers["x-pd-secret"] !== this.secret) {
        errors.push(
          "Secret on incoming request doesn't match the configured secret",
        );
      }

      if (operation === "schedule") {
        const {
          timestamp,
          message,
        } = event.body;
        // timestamp should be an ISO 8601 string. Parse and check for validity below.
        const epoch = Date.parse(timestamp);

        if (!timestamp) {
          errors.push(
            "No timestamp included in payload. Please provide an ISO8601 timestamp in the 'timestamp' field",
          );
        }
        if (timestamp && !epoch) {
          errors.push("Timestamp isn't a valid ISO 8601 string");
        }
        if (!message) {
          errors.push("No message passed in payload");
        }
      }

      return errors;
    },
    scheduleTask(event) {
      const errors = this.validateEventBody(event, "schedule");
      let status, body;

      if (errors.length) {
        console.log(errors);
        status = 400;
        body = {
          errors,
        };
      } else {
        const id = this.emitScheduleEvent(event.body, event.body.timestamp);
        status = 200;
        body = {
          msg: "Successfully scheduled task",
          id,
        };
      }

      this.httpRespond({
        status,
        body,
      });
    },
    emitScheduleEvent(event, timestamp) {
      const selfChannel = this.selfChannel();
      const epoch = Date.parse(timestamp);
      const $id = uuid();

      console.log(`Scheduled event to emit on: ${new Date(epoch)}`);

      this.$emit(
        {
          ...event,
          $channel: selfChannel,
          $id,
        },
        {
          name: selfChannel,
          id: $id,
          delivery_ts: epoch,
        },
      );

      return $id;
    },
    async cancelTask(event) {
      const errors = this.validateEventBody(event, "cancel");
      let status, msg;

      if (errors.length) {
        console.log(errors);
        status = 400;
        msg = "Secret on incoming request doesn't match the configured secret";
      } else {
        try {
          const id = event.body.id;
          const isCanceled = await this.deleteEvent(event);
          if (isCanceled) {
            status = 200;
            msg = `Cancelled scheduled task for event ${id}`;
          } else {
            status = 404;
            msg = `No event with ${id} found`;
          }
        } catch (error) {
          console.log(error);
          status = 500;
          msg = "Failed to schedule task. Please see the logs";
        }
      }

      this.httpRespond({
        status,
        body: {
          msg,
        },
      });
    },
    async deleteEvent(event) {
      const componentId = process.env.PD_COMPONENT;
      const inChannel = this.queuedEventsChannel();

      // The user must pass a scheduled event UUID they'd like to cancel
      // We lookup the event by ID and delete it
      const { id } = event.body;

      // List events in the $in channel - the queue of scheduled events, to be emitted in the future
      const events = await this.pipedream.listEvents(
        componentId,
        inChannel,
      );
      console.log("Events: ", events);

      // Find the event in the list by id
      const eventToCancel = events.data.find((e) => {
        const { metadata } = e;
        return metadata.id === id;
      });

      console.log("Event to cancel: ", eventToCancel);

      if (!eventToCancel) {
        console.log(`No event with ${id} found`);
        return false;
      }

      // Delete the event
      await this.pipedream.deleteEvent(
        componentId,
        eventToCancel.id,
        inChannel,
      );
      return true;
    },
    emitEvent(event, summary) {
      // Delete the channel name and id from the incoming event, which were used only as metadata
      const id = event.$id;
      delete event.$channel;
      delete event.$id;

      this.$emit(event, {
        summary: summary ?? JSON.stringify(event),
        id,
        ts: +new Date(),
      });
    },
  },
  async run(event) {
    await this.selfSubscribe();

    const { path } = event;
    if (path === "/schedule") {
      this.scheduleTask(event);
    } else if (path === "/cancel") {
      await this.cancelTask(event);
    } else if (event.$channel === this.selfChannel()) {
      this.emitEvent(event);
    }
  },
  sampleEmit,
};