Why is the `$.flow.rerun()` Function in the OpenAI SDK Causing My Node.js Test Step to Get Stuck Running without Completion?

This topic was automatically generated from Slack. You can find the original thread here.

I’m having a lot of trouble with $.flow.rerun() and would greatly appreciate some help. I feel like i’ve tried almost everything, even Pi couldn’t even help.

Every time I try to test a step which has the $.flow.rerun() getting triggered in it, it just gets stuck running and the step never completes testing. It’s like it breaks and I don’t even get a timeout error…

I’m using this feature to poll a thread “run” status with the openAI assistants API using node.js and the openAI SDK.

Is there something I could be missing? I’m happy to provide my current code. Thank you for your attention :pray:

for reference here is my code attempts. I don’t see anything wrong here…

import OpenAI from "openai";

export default defineComponent({
  async run({ steps, $ }) {
    const apiKey = process.env.OPENAI_API_KEY;
    const openai = new OpenAI( apiKey );

    const MAX_RETRIES = 5;
    const DELAY = 30000;

    const threadId = steps.retrieve_thread.$return_value.id;
    const assistantId = steps.retrieve_assistant.$return_value.id
    const runId = steps.create_run.$return_value.id;

    //run retrieve
    const run = await openai.beta.threads.runs.retrieve(threadId, runId);
    const runStatus = run.status;
    

    if(runStatus!= "complete"){
        console.log("run not complete, run status: ",runStatus);
        $.flow.rerun(DELAY,null,MAX_RETRIES);
    } else {
      const lastThreadMessage = await openai.beta.threads.messages.list(
        threadId,
        {
          order: "desc",
          limit: "1"
        }
      );
      
      return lastThreadMessage;
    }
  },
});

then i tried:

import OpenAI from "openai";

export default defineComponent({
  async run({ steps, $ }) {
    const apiKey = process.env.OPENAI_API_KEY;
    const openai = new OpenAI( apiKey );

    const MAX_RETRY = 10;
    const DELAY = 5000;

    const threadId = steps.retrieve_thread.$return_value.id;
    const assistantId = steps.retrieve_assistant.$return_value.id
    const runId = steps.create_run.$return_value.id;

    const flowRuns = $.context.run.runs;
    console.log(flowRuns);
    
    if(flowRuns === 1){
      // first run through
      console.log("first run...");
      $.flow.rerun(DELAY, null, MAX_RETRY);
      
    } else if (flowRuns <= MAX_RETRY) {
      //retrieve openAI run status
      console.log("flow run#: " ,flowRuns);
      const run = await openai.beta.threads.runs.retrieve(threadId, runId);
      const runStatus = run.status;
      console.log("run status: ", runStatus);

      if(runStatus === "complete"){  
        console.log("flow run#: " ,flowRuns);
        console.log("run status: ", runStatus);
        const lastThreadMessage = await openai.beta.threads.messages.list(threadId,{order: "desc", limit: "1"});
        
        return lastThreadMessage;
      } else {
        $.flow.rerun(DELAY, null, MAX_RETRY);
      }
    } else {
      // Max retries exceeded
      throw new Error("Max retries exceeded");
    }
  },
});

Hey , I believe you’re using $.flow.rerun incorrectly. The $.flow.rerun will pause your whole workflow. If you want to retry in your function, you’ll need to implement a function to retry in your code

I think he’s using it correctly. We’re doing the same thing in many of our workflows.

The issue is that $.flow.rerun() is not fully supported in the builder.

Typically, we’d have to deploy these workflows in order to test them.

However, support is improving! (see this thread from yesterday)

What I would suggest is to log the return of $.flow.rerun(), especially the resume_url. Then, just call that URL by hand in order to “wake up” the workflow.

You can wait 30 seconds or however long you want before calling it. That will give you a similar behavior to how it will work once deployed.

Oh I didn’t know that, thanks

So it seems like the resume_url functionality now works well in the builder, but not the pause/delay+timeout part. But I’m sure that will be supported soon as well. :slightly_smiling_face: :+1:

Although I did run into a weird issue today actually, because when the resume_url was called by a webhook/callback, it resumed the workflow three times. I know because I received three Slack notifications that the workflow had competed. :sweat_smile:

Edit: Just debugged this with :point_up_2:, and it only happens when the resume_url is called multiple times in quick succession.

Yep , as Marco pointed out, testing in the builder will make it get stuck! But it should work fine after deploying your workflow

Edit: Just debugged this with :point_up_2:, and it only happens when the resume_url is called multiple times in quick succession
Yup, and also in the builder, not on deployed workflows

More context here

: Also, it turns out the resume_url can’t easily be printed in the console, so it’s better to write it to the data store:

await this.ds.set(
  'test_rerun',
  $.flow.rerun(5000, null, 1)
);

Then you can grab the resume_url from there to manually resume your workflow. :+1:

@U05LWJMUX0A
Thank you guys for your input! I’m still relatively new to pipedream and javascript too actually. And thanks for the datastore tip! i’m not familiar with ds but i’ll read up :pray:

ds is just a data store prop! It could be called anything else.

Like this:

export default defineComponent({
  props: {
    ds/something_else: {
      type: "data_store"
    }
  },
  async run({ steps, $ }) {
    ...
  },
})