Using Data Stores
You can store and retrieve data from Data Stores in Python without connecting to a 3rd party database.
Add a data store as a input to a Python step, then access it in your Python handler
with pd.inputs["data_store"]
.
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Store a value under a key
data_store["key"] = "Hello World"
# Retrieve the value and print it to the step's Logs
print(data_store["key"])
Adding a Data Store
Click Add Data Store near the top of a Python step:
This will add the selected data store to your Python code step.
Saving data
Data stores are key-value stores. Saving data within a data store is just like setting a property on a dictionary:
from datetime import datetime
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Store a timestamp
data_store["last_ran_at"] = datetime.now().isoformat()
Setting expiration (TTL) for records
You can set an expiration time for a record by passing a TTL (Time-To-Live) option as the third argument to the set
method. The TTL value is specified in seconds:
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Store a temporary value that will expire after 1 hour (3600 seconds)
data_store.set("temporaryToken", "abc123", ttl=3600)
# Store a value that will expire after 1 day
data_store.set("dailyMetric", 42, ttl=86400)
When the TTL period elapses, the record will be automatically deleted from the data store.
Updating TTL for existing records
You can update the TTL for an existing record using the set_ttl
method:
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Update an existing record to expire after 30 minutes
data_store.set_ttl("temporaryToken", ttl=1800)
# Remove expiration from a record
data_store.set_ttl("temporaryToken", ttl=None)
This is useful for extending the lifetime of temporary data or removing expiration from records that should now be permanent.
Retrieving keys
Fetch all the keys in a given data store using the keys
method:
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Retrieve all keys in the data store
keys = pd.inputs["data_store"].keys()
# Print a comma separated string of all keys
print(*keys, sep=",")
The datastore.keys()
method does not return a list, but instead it returns a Keys
iterable object. You cannot export a data_store
or data_store.keys()
from a Python code step at this time.
Instead, build a dictionary or list when using the data_store.keys()
method.
Checking for the existence of specific keys
If you need to check whether a specific key
exists in a data store, use if
and in
as a conditional:
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Search for a key in a conditional
if "last_ran_at" in data_store:
print(f"Last ran at {data_store['last_ran_at']}")
Retrieving data
Data stores are very performant at retrieving single records by keys. However you can also use key iteration to retrieve all records within a Data Store as well.
Data stores are intended to be a fast and convienent data storage option for quickly adding data storage capability to your workflows without adding another database dependency.
However, if you need more advanced querying capabilities for querying records with nested dictionaries or filtering based on a record value - consider using a full fledged database. Pipedream can integrate with MySQL, Postgres, DynamoDb, MongoDB and more.
Get a single record
You can retrieve single records from a data store by key:
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Retrieve the timestamp value by the key name
last_ran_at = data_store["last_ran_at"]
# Print the timestamp
print(f"Last ran at {last_ran_at}")
Alternatively, use the data_store.get()
method to retrieve a specific key’s contents:
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Retrieve the timestamp value by the key name
last_ran_at = data_store.get("last_ran_at")
# Print the timestamp
print(f"Last ran at {last_ran_at}")
What’s the difference between data_store["key"]
and data_store.get("key")
?
data_store["key"]
will throw aTypeError
if the key doesn’t exist in the data store.data_store.get("key")
will instead returnNone
if the key doesn’t exist in the data store.data_store.get("key", "default_value")
will return"default_value"
if the key doesn’t exist on the data store.
Retrieving all records
You can retrieve all records within a data store by using an async iterator:
def handler(pd: "pipedream"):
data_store = pd.inputs["data_store"]
records = {}
for k,v in data_store.items():
records[k] = v
return records
This code step example exports all records within the data store as a dictionary.
Deleting or updating values within a record
To delete or update the value of an individual record, assign key
a new value or ''
to remove the value but retain the key.
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Assign a new value to the key
data_store["myKey"] = "newValue"
# Remove the value but retain the key
data_store["myKey"] = ""
Working with nested dictionaries
You can store dictionaries within a record. This allows you to create complex records.
However, to update specific attributes within a nested dictionary, you’ll need to replace the record entirely.
For example, the code the below will not update the name
attribute on the stored dictionary stored under the key pokemon
:
def handler(pd: "pipedream"):
# The current dictionary looks like this:
# pokemon: {
# "name": "Charmander"
# "type": "fire"
# }
# You'll see "Charmander" in the logs
print(pd.inputs['data_store']['pokemon']['name'])
# attempting to overwrite the pokemon's name will not apply
pd.inputs['data_store']['pokemon']['name'] = 'Bulbasaur'
# Exports "Charmander"
return pd.inputs['data_store']['pokemon']['name']
Instead, overwrite the entire record to modify attributes:
def handler(pd: "pipedream"):
# retrieve the record item by it's key first
pokemon = pd.inputs['data_store']['pokemon']
# now update the record's attribute
pokemon['name'] = 'Bulbasaur'
# and out right replace the record with the new modified dictionary
pd.inputs['data_store']['pokemon'] = pokemon
# Now we'll see "Bulbasaur" exported
return pd.inputs['data_store']['pokemon']['name']
Deleting specific records
To delete individual records in a data store, use the del
operation for a specific key
:
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Delete the last_ran_at timestamp key
del data_store["last_ran_at"]
Deleting all records from a specific data store
If you need to delete all records in a given data store, you can use the clear
method.
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# Delete the entire contents of the datas store
data_store.clear()
data_store.clear()
is an irreversible change, even when testing code in the workflow builder.
Viewing store data
You can view the contents of your data stores in your Pipedream dashboard.
From here you can also manually edit your data store’s data, rename stores, delete stores or create new stores.
Workflow counter example
You can use a data store as a counter. For example, this code counts the number of times the workflow runs:
def handler(pd: "pipedream"):
# Access the data store under the pd.inputs
data_store = pd.inputs["data_store"]
# if the counter doesn't exist yet, start it at one
if data_store.get("counter") == None:
data_store["counter"] = 1
# Otherwise, increment it by one
else:
count = data_store["counter"]
data_store["counter"] = count + 1
Dedupe data example
Data Stores are also useful for storing data from prior runs to prevent acting on duplicate data, or data that’s been seen before.
For example, this workflow’s trigger contains an email address from a potential new customer. But we want to track all emails collected so we don’t send a welcome email twice:
def handler(pd: "pipedream"):
# Access the data store
data_store = pd.inputs["data_store"]
# Reference the incoming email from the HTTP request
new_email = pd.steps["trigger"]["event"]["body"]["new_customer_email"]
# Retrieve the emails stored in our data store
emails = data_store.get('emails', [])
# If this email has been seen before, exit early
if new_email in emails:
print(f"Already seen {new_email}, exiting")
return False
# This email is new, append it to our list
else:
print(f"Adding new email to data store {new_email}")
emails.append(new_email)
data_store["emails"] = emails
return new_email
TTL use case: temporary caching and rate limiting
TTL functionality is particularly useful for implementing temporary caching and rate limiting. Here’s an example of a simple rate limiter that prevents a user from making more than 5 requests per hour:
def handler(pd: "pipedream"):
# Access the data store
data_store = pd.inputs["data_store"]
user_id = pd.steps["trigger"]["event"]["user_id"]
rate_key = f"ratelimit:{user_id}"
# Try to get current rate limit counter
requests_num = data_store.get("rate_key")
if not requests_num:
# First request from this user in the time window
data_store.set(rate_key, 1, ttl=3600) # Expire after 1 hour
return { "allowed": True, "remaining": 4 }
if requests_num >= 5:
# Rate limit exceeded
return { "allowed": False, "error": "Rate limit exceeded", "retry_after": "1 hour" }
# Increment the counter
data_store["rate_key"] = requests_num + 1
return { "allowed": True, "remaining": 4 - requests_num }
This pattern can be extended for various temporary caching scenarios like:
- Session tokens with automatic expiration
- Short-lived feature flags
- Temporary access grants
- Time-based promotional codes
Supported data types
Data stores can hold any JSON-serializable data within the storage limits. This includes data types including:
- Strings
- Dictionaries
- Lists
- Integers
- Floats
But you cannot serialize Modules, Functions, Classes, or other more complex objects.