Automate anything in minutes with Snowflake + Pipedream

Pipedream lets you connect Snowflake to a forest of apps: it supports 1,000+ integrations, Snowflake triggers and actions, and code-level control for your automations. Learn how to connect Snowflake, ChatGPT, and Slack in 5 minutes.

This is Part 1 of our Pipedream + Snowflake series. This week, we'll show you how to use Pipedream to observe data in Snowflake and trigger automations from those events.

You'll build a workflow that runs whenever a Snowflake task fails, consults ChatGPT on the error, and posts a corrected query in Slack, all in less than 5 minutes.

In Part 2, we'll cover how we use Snowflake at Pipedream, and share a few more workflows we've built to automate our own internal processes and save hours of our team's time. You'll see how to automate virtually anything, and we hope you'll be hooked by the creative process of developing workflows.

What is Pipedream?

Example workflow: send HTTP requests to this endpoint, send that request data to Snowflake and 1,000+ other apps

Pipedream is an integration platform for developers.

Pipedream provides a hosted platform for connecting apps and developing event-driven automations. You can connect to 1,000+ fully integrated applications, using pre-built components to quickly send messages to Slack, add a new row to Google Sheets, send data to Snowflake, and more. You can also run any Node.js, Python, Go, or Bash code when you need custom logic. Pipedream has a generous free tier to help you get started.

Watch the demo to see how quickly you can build complex workflows:

Read more in our docs, and join our community if you have questions.

Pipedream + Snowflake

Below, you'll build a workflow that:

  1. Runs any time a Snowflake task fails.
  2. Sends the SQL and the error to OpenAI's Chat API (the same API that powers ChatGPT) and asks it to correct the SQL.
  3. Sends information on the failed task, along with ChatGPT's response, to Slack.

ChatGPT is right: it was a typo

If you want to run this workflow yourself, sign up for Pipedream and open this workflow template. Just connect your Snowflake, OpenAI, and Slack accounts, set the tasks you want to watch for, and choose your Slack channel. That's it.

We'll show you how to build this from scratch. You can have the whole thing working in 5 minutes.

First, create a new workflow at https://pipedream.com/new. Workflows start with a trigger step: the event that runs the workflow. We want to run the workflow when a Snowflake task fails. Search for Snowflake and select the Failed Task in Schema trigger:

Search for Snowflake, select Failed Task in Schema source

You'll be asked to connect your Snowflake account. When you do, you'll find instructions for creating a Snowflake user with the appropriate permissions. See our Snowflake guide for more info. To monitor failed tasks, this user will need SELECT privileges on TABLE(INFORMATION_SCHEMA.TASK_HISTORY()).

Once you connect your account, select the Database and Schema you want to watch for failed tasks. Any task configured in this schema will trigger this workflow if it fails.

You can select any databases and schemas your Snowflake user has access to

Click the Create Source button to create your trigger. This trigger runs as a separate resource called an event source. You can modify the source's configuration, see the list of failed tasks, and even edit the code at https://pipedream.com/sources.

As soon as your Failed Task source is created, Pipedream will look for any failed tasks in the last 24 hours and emit them as "test" events. If you see a test event, select one. Pipedream will use this data as you add and test steps in the rest of your workflow.

If you don't see any failed tasks, create a test task in Snowflake, run it, then click the Try Now button. Pipedream should pick up the new failed task.

-- Create a test task that runs SQL with a typo
CREATE TASK test_task
  SCHEDULE = 'USING CRON * * * * * America/Los_Angeles'
  AS
  SELECT CURRENT_TIMESTAM();

-- Execute now
EXECUTE TASK test_task;

-- Drop the test task when you're done testing
DROP TASK test_task;

-- Pipedream runs a query like this to look for new failed events
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE state = 'FAILED';

SELECT CURRENT_TIMESTAM() will fail

When you select an event, you'll see the task name, the SQL the task ran, the error message, and more. Read more about events here.
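
As a rough sketch of how a polling source could turn the rows from that query into events like these, the Python below keeps only failed runs, skips any it has already emitted, and maps the rest to the fields used later in this post (taskName, queryText, errorMessage). The column names follow Snowflake's TASK_HISTORY output. The `new_failed_events` helper, the dedupe-by-QUERY_ID strategy, and the sample rows are assumptions for illustration, not Pipedream's actual source code.

```python
from typing import Iterable


def new_failed_events(rows: Iterable[dict], seen_ids: set) -> list:
    """Return one event per failed task run that has not been emitted before.

    `rows` mimics records from INFORMATION_SCHEMA.TASK_HISTORY().
    `seen_ids` is mutated so that repeated polls do not emit duplicates.
    """
    events = []
    for row in rows:
        if row["STATE"] != "FAILED":
            continue  # ignore runs that did not fail
        if row["QUERY_ID"] in seen_ids:
            continue  # already emitted on an earlier poll
        seen_ids.add(row["QUERY_ID"])
        events.append({
            "taskName": row["NAME"],
            "queryText": row["QUERY_TEXT"],
            "errorMessage": row["ERROR_MESSAGE"],
        })
    return events


# Sample rows, made up for illustration
rows = [
    {"QUERY_ID": "q1", "NAME": "TEST_TASK", "STATE": "FAILED",
     "QUERY_TEXT": "SELECT CURRENT_TIMESTAM();",
     "ERROR_MESSAGE": "Unknown function CURRENT_TIMESTAM"},
    {"QUERY_ID": "q2", "NAME": "OTHER_TASK", "STATE": "SUCCEEDED",
     "QUERY_TEXT": "SELECT 1;", "ERROR_MESSAGE": None},
]
seen = set()
first_poll = new_failed_events(rows, seen)   # one event for TEST_TASK
second_poll = new_failed_events(rows, seen)  # nothing new on the next poll
```

Tracking seen IDs is what would let a source poll the same history window repeatedly without triggering your workflow twice for one failure.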

An example event in Pipedream for a failed task

Click the + button below the trigger step. This adds another step to your workflow, where you can connect to 1,000+ other integrations. These steps will run when the workflow is triggered on failed tasks.

Search for OpenAI and select the Chat action. Pipedream just released an integration with OpenAI's Chat, Audio, and Embeddings APIs. We're going to use the Chat action to tell us more about the error and correct the SQL query.

Connect your OpenAI account, and enter the following text in the User Message field.

Summarize why this Error happened in Snowflake, correct the Query. Return the error reason and the corrected SQL, formatting all code for Slack (i.e. surrounded by backticks, or three backticks for larger code blocks). Pay attention to misspellings and other common issues.

Error: {{steps.trigger.event.errorMessage}}
Query: {{steps.trigger.event.queryText}}

Notice that we're referencing the failed task data at {{steps.trigger.event.errorMessage}} and {{steps.trigger.event.queryText}}. These references will be replaced with the data from your test event when we make a request to the Chat API.
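
To picture what that substitution does, here is a simplified Python model of it: each {{a.b.c}} reference is looked up as a dotted path in a nested dictionary and replaced with the value found there. This is only a sketch of the idea. The `render` helper and the sample event values are hypothetical, and Pipedream's own expression handling is more capable than this.

```python
import re

# Matches references such as {{steps.trigger.event.errorMessage}}
_REF = re.compile(r"\{\{\s*([^{}\s]+)\s*\}\}")


def render(template: str, context: dict) -> str:
    """Replace each {{a.b.c}} reference with the value found at that path."""
    def lookup(match):
        value = context
        for key in match.group(1).split("."):
            value = value[key]  # raises KeyError if the path does not exist
        return str(value)
    return _REF.sub(lookup, template)


# Sample step data, made up for illustration
context = {"steps": {"trigger": {"event": {
    "errorMessage": "Unknown function CURRENT_TIMESTAM",
    "queryText": "SELECT CURRENT_TIMESTAM();",
}}}}

prompt = render(
    "Error: {{steps.trigger.event.errorMessage}}\n"
    "Query: {{steps.trigger.event.queryText}}",
    context,
)
print(prompt)
```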

Under Optional Fields, select System Instructions. Provide additional instructions to make sure the API provides Snowflake-specific answers:

You're an agent that helps diagnose errors with queries run on the Snowflake database

The action should look like this when configured:

OpenAI Chat action configuration

Click the Test button at the bottom of this step. This will make a request to the Chat API, returning the response.
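
If you're curious what a request like this looks like, the sketch below builds a body in the shape OpenAI's Chat Completions endpoint accepts: a model name plus a list of messages, with the System Instructions as the `system` message and the User Message as the `user` message. The `build_chat_payload` helper and the default model are assumptions for illustration. The action may send additional fields, and no network call is made here.

```python
import json


def build_chat_payload(system_instructions, user_message, model="gpt-3.5-turbo"):
    """Build a Chat Completions style request body (model name is an assumption)."""
    return {
        "model": model,
        "messages": [
            {"role": "system", "content": system_instructions},
            {"role": "user", "content": user_message},
        ],
    }


payload = build_chat_payload(
    "You're an agent that helps diagnose errors with queries run on the Snowflake database",
    "Error: Unknown function CURRENT_TIMESTAM\nQuery: SELECT CURRENT_TIMESTAM();",
)
print(json.dumps(payload, indent=2))
```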

Add another step to your workflow, search for Slack, and choose one of the Send Message actions:

Connect your Slack account, select your channel, and add the following Text to include the original query, the corrected query, and information on the error:

Failed task `{{steps.trigger.event.taskName}}`

Query: `{{steps.trigger.event.queryText}}`

Error:

```
{{steps.trigger.event.errorMessage}}
```

{{steps.chat.$return_value.generated_message.content}}
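
If you'd rather assemble this message with custom logic, a Python code step could build the same text. The sketch below assumes the `handler(pd)` entry point and `pd.steps` lookup used by Pipedream's Python steps, and the step names `trigger` and `chat` from this workflow. The `fake_pd` object is only a local stand-in so the example runs outside Pipedream.

```python
from types import SimpleNamespace


def handler(pd: "pipedream"):
    """Build the Slack message text from the data exported by earlier steps."""
    event = pd.steps["trigger"]["event"]
    reply = pd.steps["chat"]["$return_value"]["generated_message"]["content"]
    fence = "`" * 3  # three backticks open and close a Slack code block
    text = (
        f"Failed task `{event['taskName']}`\n\n"
        f"Query: `{event['queryText']}`\n\n"
        f"Error:\n\n{fence}\n{event['errorMessage']}\n{fence}\n\n"
        f"{reply}"
    )
    return {"text": text}


# Local stand-in for the object Pipedream passes in (illustrative data only)
fake_pd = SimpleNamespace(steps={
    "trigger": {"event": {
        "taskName": "TEST_TASK",
        "queryText": "SELECT CURRENT_TIMESTAM();",
        "errorMessage": "Unknown function CURRENT_TIMESTAM",
    }},
    "chat": {"$return_value": {"generated_message": {
        "content": "The function name is misspelled. Use CURRENT_TIMESTAMP().",
    }}},
})
result = handler(fake_pd)
print(result["text"])
```

A later step could then reference the returned text in place of the template above.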

If you test the step, you should see a message like this in Slack:

Final Slack message

Once you're done, click Deploy in the top-right corner of the page. All the steps you just configured will now run on new failed tasks.

What else can I do with Pipedream + Snowflake?

Pipedream supports a handful of other Snowflake sources:

  • New Row: Run a workflow each time a new row is added to a table or view. For example, if you add new customers to a users table in Snowflake, you can trigger a workflow for each new row, sending a welcome email or kicking off any automation.
  • New Query Results: Run any SQL query on a schedule. Every record returned from the query will trigger a Pipedream workflow. If you want to watch any data in Snowflake and run some automation on its results, this is a powerful source.
  • New, Updated, or Deleted Warehouse: Run a workflow when a warehouse is added, updated, or deleted. This is helpful for monitoring changes to warehouse sizes or other details.

You can also insert one or multiple rows into Snowflake from any step in your workflow. You can send data from AWS, Slack, Stripe, and any app integrated in Pipedream directly to Snowflake. You can also accept incoming emails, webhooks, and any event in any system that's exposed via API.
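
For a sense of what a multi-row insert involves if you write it yourself in a code step, the sketch below turns a list of records into a parameterized INSERT statement plus its bind values. The `build_insert` helper, the `failed_tasks` table, and the `%s` placeholder style are assumptions for illustration, and this is not how Pipedream's pre-built action is implemented. A pair like this could be handed to a DB-API style `cursor.executemany(sql, params)` call on a driver configured for that placeholder style.

```python
import re

# Only allow plain unquoted identifiers for table and column names
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*$")


def build_insert(table, rows):
    """Return (sql, params) for a parameterized multi-row INSERT.

    Values are passed as bind parameters and never interpolated into the SQL.
    """
    if not rows:
        raise ValueError("rows must not be empty")
    columns = list(rows[0])
    for name in [table, *columns]:
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    params = [tuple(row[col] for col in columns) for row in rows]
    return sql, params


# Hypothetical table and data
sql, params = build_insert("failed_tasks", [
    {"task_name": "TEST_TASK", "error_message": "Unknown function CURRENT_TIMESTAM"},
    {"task_name": "OTHER_TASK", "error_message": "Timeout"},
])
print(sql)
```

Keeping values out of the SQL string matters here because workflow data often comes from webhooks, emails, or other untrusted input.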

Add a new row with data from the trigger or any other step of a workflow

We'd love to see what you build. If you have any questions, or want to share example workflows, reach out in the Pipedream community.