Pipedream is a platform for creating event-driven sources and consuming those sources via workflows. As you can imagine, we see a lot of different kinds of workloads. We'll take a look at how we initially built our request handling, and how we had to evolve it for this increasingly heterogeneous mix of workloads.
Workloads: tall and wide sources, light and heavy workflows
Let's take a look at two common types of workloads we see. Scheduled sources (like a Twitter source) emit many events all at once, then pause for a long time without emitting anything at all. Let's call these tall sources. Wide sources, in contrast, are things like busy HTTP sources, where events arrive in a steady (but possibly uneven) flow.
When these sources are consumed by workflows, the workflow code can be either light, like sending a message to Slack, or heavy, like enriching data for ETL before inserting it into a data lake. Light workflows finish in less than a second and have relatively predictable durations; heavy workflows can take minutes and vary widely. Of course, you can have anything in between.
For the purposes of this post, let's say that the tall source is consumed by a light workflow and the wide source is consumed by a heavy workflow.
Kinesis-based job processing
In our initial request-handling architecture, our sources create a job for each emit and insert it into a random Kinesis shard. Job processors then use the Kinesis Client Library (KCL) to pick up the job and run the workflow. This scales incredibly well: add more HTTP servers, Kinesis shards, and job processors as request volume grows. However, we hit two big problems as we grew the platform.
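To make "random Kinesis shard" concrete: Kinesis hashes each record's partition key with MD5 into a 128-bit integer, and the shard whose hash key range contains that integer receives the record. The sketch below assumes a stream whose shards split the hash key space into equal ranges and uses a fresh UUID as the partition key for every job; `shard_for_partition_key` and `random_partition_key` are hypothetical helpers for illustration, not part of any AWS SDK.

```python
import hashlib
import uuid
from collections import Counter

HASH_KEY_SPACE = 2 ** 128  # Kinesis maps partition keys to 128-bit integers


def shard_for_partition_key(partition_key: str, num_shards: int) -> int:
    """Index of the shard owning this key, assuming equal hash key ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")  # 0 <= hash_key < 2**128
    return hash_key * num_shards // HASH_KEY_SPACE


def random_partition_key() -> str:
    """A fresh random key per job, so jobs spread uniformly over the shards."""
    return str(uuid.uuid4())


# A burst of 1,000 emits from one tall source is spread across every shard.
per_shard = Counter(
    shard_for_partition_key(random_partition_key(), 4) for _ in range(1000)
)
print(per_shard)
```

The same property that keeps every shard evenly loaded is what scatters one source's burst across all of them.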
Problem 1 - Non-linear delivery
You may not have noticed, but I slipped a very important detail into the summary: 'random Kinesis shard.' The random shard lets us distribute requests evenly over all the shards and job processors, so every shard carries a similar load. For our tall sources, however, it means that a burst of emits gets spread over all the shards and processed in parallel. That is great if you want to get everything done as soon as possible, but sometimes you need to process events one at a time, for example when talking to a downstream service where racing changes could overwrite each other. In the diagram you can see JT2 and JT3 running at the same time.
Wide sources have a similar problem: some later emits can actually get processed before earlier ones, particularly on heavy workflows (JW4 actually starts before JW3). You may want to force in-order delivery.
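Both effects show up in a toy model. The sketch below deliberately simplifies things: each shard runs its jobs one at a time in arrival order, and the arrival times, durations, and shard assignments are made up for illustration rather than taken from our traffic.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Job:
    name: str
    arrival: float   # seconds since the first emit
    duration: float  # how long the workflow runs, in seconds
    shard: int       # shard the job was randomly routed to


def simulate_start_times(jobs: list[Job]) -> dict[str, float]:
    """Simplified model: each shard runs its jobs one at a time, in arrival order."""
    shard_free_at: dict[int, float] = {}
    starts: dict[str, float] = {}
    for job in sorted(jobs, key=lambda j: j.arrival):
        start = max(job.arrival, shard_free_at.get(job.shard, 0.0))
        starts[job.name] = start
        shard_free_at[job.shard] = start + job.duration
    return starts


# Tall source + light workflow: a burst of four emits lands on four shards,
# so JT1..JT4 all start at t=0 and run concurrently.
tall = [Job(f"JT{i}", 0.0, 0.5, shard=i - 1) for i in range(1, 5)]

# Wide source + heavy workflow: JW3 queues behind the long-running JW1 on
# shard 0, while JW4 only waits for the shorter JW2 on shard 1.
wide = [
    Job("JW1", 0.0, 120.0, shard=0),
    Job("JW2", 1.0, 30.0, shard=1),
    Job("JW3", 2.0, 60.0, shard=0),
    Job("JW4", 3.0, 60.0, shard=1),
]
starts = simulate_start_times(wide)
assert starts["JW4"] < starts["JW3"]  # JW4 starts at t=31, JW3 at t=120
```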
Problem 2 - Kinesis shard handoffs
This problem is low-level and specific to KCL, so feel free to skip to the next section and take my word for it that KCL has some serious problems when you need to restart shard processors. KCL is fine in the steady state, but it struggles when a job processor gives up its lease and another one has to pick it up. The timing is tunable, but with the default settings it takes at best about 20 seconds after a processor gives up its lease for another job processor to notice and pick it up. Tune that downward and you end up increasing your DynamoDB costs, since KCL tracks its leases in a DynamoDB table. Worse, the way Kinesis handles checkpointing leads to a bigger problem.
KCL only allows one checkpoint per shard (this stems from the fact that the underlying Kinesis data stream really is a stream and does not allow easy random access). This is fine when you are dealing with short-lived, homogeneous tasks, but since we allow tasks to run for up to 5 minutes, a shard is often in a situation like the one below, where a heavy workflow is mixed with a light workflow:
All the green jobs have finished, but the long Job B is still running. Now imagine you get a shutdown signal. You have three unsavory choices:
- Checkpoint Job A and release the lease. This allows the new job processor to start immediately (modulo the timing constraint above); however, it will now need to re-run Job B through Job C, which means duplicate events.
- Checkpoint Job C and release the lease. The handoff is immediate again, but Job B is aborted, and won't be re-run.
- Wait until Job B is done, then checkpoint. Everything runs exactly once to completion, but the hidden cost is that the shard goes unserviced for up to 5 minutes: the old processor has stopped taking new work, but it can't hand off to the new processor yet. Meanwhile, jobs keep arriving in this shard, so anyone unlucky enough to be routed to it will see up to a 5-minute delay before their job starts!
We are a CI/CD shop and deploy many times a day, so these handoff issues are a real pain point.
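The trade-off between these three options can be written down as a small model. Below, a shard's jobs are a list of finished/unfinished flags in stream order, and a single checkpoint at index i tells the next processor to resume at i + 1. The function names are hypothetical and this is not KCL's API; it is only the arithmetic behind the choices.

```python
from typing import Dict, List, Optional


def highest_safe_checkpoint(done: List[bool]) -> Optional[int]:
    """Furthest index we can checkpoint without skipping an unfinished job."""
    last = None
    for i, finished in enumerate(done):
        if not finished:
            break
        last = i
    return last


def handoff_outcome(done: List[bool], checkpoint: Optional[int]) -> Dict[str, List[int]]:
    """Consequences of checkpointing at `checkpoint` and releasing the lease now."""
    resume_at = 0 if checkpoint is None else checkpoint + 1
    return {
        # Already-finished jobs after the checkpoint get replayed by the new processor.
        "duplicated": [i for i in range(resume_at, len(done)) if done[i]],
        # Unfinished jobs at or before the checkpoint are aborted and never re-run.
        "lost": [i for i in range(resume_at) if not done[i]],
    }


# Jobs A, B, C in shard order: A and C have finished, the long Job B has not.
done = [True, False, True]
print(highest_safe_checkpoint(done))  # 0 (Job A)
print(handoff_outcome(done, 0))       # {'duplicated': [2], 'lost': []}  (first option)
print(handoff_outcome(done, 2))       # {'duplicated': [], 'lost': [1]}  (second option)
```

The third option corresponds to waiting until `highest_safe_checkpoint(done)` reaches 2, that is, until Job B finishes, with the shard unserviced in the meantime.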
Redis Stream per workflow
To solve both problems, we decided to implement a queue per workflow. Kinesis is not meant to scale this way (you are limited in the number of data streams you can create, and streams don't scale down cost-wise to the low-throughput workflows that many of our users run), so we had to build something of our own. We already used a lot of Redis, so it was natural to use Redis Streams as our new message bus.
When a request comes in, it is written to its own Redis stream based on its source or workflow. We then ping a broker to make sure that the queue is managed by one (and only one) process; we call this creating a lease for a tenant.
Every tenant looks for new leases when it has cycles to spare. Once it holds a lease, it has exclusive access to the Redis stream that contains the jobs, and it uses that exclusive access to feed jobs to the job processors.
An idle job processor picks up that job, reads the event from the Redis stream, runs the workflow, and signals back to the tenant when the job is complete.
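Why a stream per workflow gives ordering: Redis assigns each stream entry an ID of the form `<milliseconds>-<sequence>`, and IDs only ever increase within a stream, so each workflow gets its own totally ordered queue. The sketch below models that in memory. The `stream:<workflow_id>` key naming and the `PerWorkflowStreams` class are hypothetical; in production each `xadd` here would be a Redis `XADD <key> * ...` call.

```python
from typing import Dict, List, Tuple

Entry = Tuple[str, Dict[str, str]]


class PerWorkflowStreams:
    """In-memory stand-in for one Redis stream per workflow."""

    def __init__(self) -> None:
        self._entries: Dict[str, List[Entry]] = {}
        self._last_id: Dict[str, Tuple[int, int]] = {}

    @staticmethod
    def key_for(workflow_id: str) -> str:
        return f"stream:{workflow_id}"  # hypothetical key naming

    def xadd(self, workflow_id: str, fields: Dict[str, str], now_ms: int) -> str:
        """Append an entry, generating its ID the way XADD with '*' does."""
        key = self.key_for(workflow_id)
        last_ms, last_seq = self._last_id.get(key, (0, -1))
        if now_ms > last_ms:
            new_id = (now_ms, 0)
        else:
            # Same millisecond (or the clock went backwards): bump the sequence.
            new_id = (last_ms, last_seq + 1)
        self._last_id[key] = new_id
        entry_id = f"{new_id[0]}-{new_id[1]}"
        self._entries.setdefault(key, []).append((entry_id, fields))
        return entry_id

    def entries(self, workflow_id: str) -> List[Entry]:
        return list(self._entries.get(self.key_for(workflow_id), []))


streams = PerWorkflowStreams()
print(streams.xadd("wf_a", {"event": "1"}, now_ms=1000))  # 1000-0
print(streams.xadd("wf_a", {"event": "2"}, now_ms=1000))  # 1000-1
print(streams.xadd("wf_b", {"event": "x"}, now_ms=999))   # 999-0 (independent stream)
```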
Solution #1: Linear Delivery
To enforce linear delivery, the tenant feeds a workflow's jobs to job processors one at a time: it waits for the completion signal from the current job before sending the next one in the queue.
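Here is a minimal sketch of that gate, assuming a tenant that tracks at most one in-flight job per workflow. `SerialTenant`, `next_job`, and `on_complete` are hypothetical names; in the real system the job is dispatched to a job processor and the completion arrives as a signal from it, whereas here both are plain method calls.

```python
from collections import deque
from typing import Deque, Dict, Optional


class SerialTenant:
    """Feeds each workflow's jobs to job processors strictly one at a time."""

    def __init__(self) -> None:
        self.queues: Dict[str, Deque[str]] = {}
        self.in_flight: Dict[str, str] = {}  # workflow_id -> job_id being processed

    def enqueue(self, workflow_id: str, job_id: str) -> None:
        self.queues.setdefault(workflow_id, deque()).append(job_id)

    def next_job(self, workflow_id: str) -> Optional[str]:
        """Job to hand to an idle processor, or None if we must wait."""
        if workflow_id in self.in_flight:
            return None  # the previous job has not signalled completion yet
        queue = self.queues.get(workflow_id)
        if not queue:
            return None
        job_id = queue.popleft()
        self.in_flight[workflow_id] = job_id
        return job_id

    def on_complete(self, workflow_id: str, job_id: str) -> None:
        """The completion signal from the job processor unblocks the next job."""
        if self.in_flight.get(workflow_id) == job_id:
            del self.in_flight[workflow_id]


tenant = SerialTenant()
for job in ("JT1", "JT2", "JT3"):
    tenant.enqueue("tall_source", job)

assert tenant.next_job("tall_source") == "JT1"
assert tenant.next_job("tall_source") is None  # JT2 waits for JT1
tenant.on_complete("tall_source", "JT1")
assert tenant.next_job("tall_source") == "JT2"
```

Because the gate is per workflow, other workflows keep flowing to idle processors while one workflow waits.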
Solution #2: Smooth Handoffs
The architecture has more parts, so that must mean a more complex handoff story, right? Wrong. Let's break it down by subsystem.
Okay, I was exaggerating a bit when I said we ping a broker. All the ping does is check membership in a Redis set to see whether a lease is allocated; if it isn't, it creates a lease, which is (surprise!) an entry in a Redis Stream. That logic can be encapsulated in a Lua script on Redis itself, so there are no actual broker servers, nor any state to hand off.
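The broker logic is a check-and-create that must be atomic, which is why a Lua script fits: Redis runs a script without interleaving other commands. The Python model below mimics that atomicity with a lock. The class, method, and attribute names are hypothetical; the comments mark the Redis commands (`SISMEMBER`, `SADD`, `XADD`) that each step stands in for, not necessarily the exact commands in our script.

```python
import threading
from typing import List, Set


class LeaseBroker:
    """Models the broker's Lua script: check for a lease, create it if missing."""

    def __init__(self) -> None:
        # Redis runs a Lua script without interleaving other commands;
        # a lock gives the same all-or-nothing behaviour in this model.
        self._lock = threading.Lock()
        self.leased: Set[str] = set()      # stand-in for the Redis set of allocated leases
        self.lease_stream: List[str] = []  # stand-in for the stream tenants read new leases from

    def ensure_lease(self, workflow_id: str) -> bool:
        """True if this call created the lease, False if it already existed."""
        with self._lock:
            if workflow_id in self.leased:         # SISMEMBER: the common, 1-op path
                return False
            self.leased.add(workflow_id)           # SADD
            self.lease_stream.append(workflow_id)  # XADD a lease entry for a tenant
            return True


# Fifty concurrent requests for the same workflow create exactly one lease.
broker = LeaseBroker()
results: List[bool] = []
threads = [
    threading.Thread(target=lambda: results.append(broker.ensure_lease("wf_a")))
    for _ in range(50)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results.count(True), broker.lease_stream)  # 1 ['wf_a']
```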
We take great pains to make sure that any state tenants use to run the queues is also persisted in Redis (Redis Streams provides a lot of this out of the box). During an orderly shutdown, a tenant vacates its lease, which lets another tenant pick it up immediately with all its associated state. Even in failure cases, a tenant's leases get evicted and another tenant can take them over with their current state.
Job processors are agnostic to the jobs they consume: they just take the next available job and run it. Since Redis Streams allow every job to be XACK(nowledged) separately, job processors don't suffer from being able to checkpoint only one point of the stream. When a job processor needs to be restarted, it stops accepting jobs and finishes the ones it is working on. Doing so does not block any other progress, since new jobs are serviced by other available job processors.
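One way to get both the orderly and the failure-case behaviour is a lease with an expiry that the owning tenant keeps renewing: vacating releases it at once, and a crashed tenant simply stops renewing. The sketch below is a hypothetical model of that idea; the TTL-based eviction, the `LeaseTable` class, and the field names are assumptions rather than our actual schema. The point it illustrates is that queue state lives next to the lease in shared storage, so whoever takes over the lease also gets the state.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Lease:
    owner: Optional[str]  # tenant holding the lease, or None if vacated
    expires_at: float     # owner must renew before this time
    state: Dict[str, str] = field(default_factory=dict)  # queue state, e.g. last acked ID


class LeaseTable:
    """Stand-in for lease records and queue state kept in Redis (hypothetical schema)."""

    def __init__(self, ttl: float = 10.0) -> None:
        self.ttl = ttl  # assumed lease lifetime without renewal, in seconds
        self.leases: Dict[str, Lease] = {}

    def acquire(self, workflow_id: str, tenant: str, now: float) -> Optional[Lease]:
        lease = self.leases.setdefault(workflow_id, Lease(owner=None, expires_at=0.0))
        if lease.owner not in (None, tenant) and now < lease.expires_at:
            return None  # another tenant holds a live lease
        lease.owner, lease.expires_at = tenant, now + self.ttl
        return lease  # comes with whatever state the previous owner persisted

    def renew(self, workflow_id: str, tenant: str, now: float) -> bool:
        lease = self.leases.get(workflow_id)
        if lease is None or lease.owner != tenant or now >= lease.expires_at:
            return False
        lease.expires_at = now + self.ttl
        return True

    def vacate(self, workflow_id: str, tenant: str) -> None:
        """Orderly shutdown: give the lease up immediately but keep its state."""
        lease = self.leases.get(workflow_id)
        if lease is not None and lease.owner == tenant:
            lease.owner, lease.expires_at = None, 0.0


table = LeaseTable(ttl=10.0)
lease = table.acquire("wf_a", "tenant-1", now=0.0)
lease.state["last_acked"] = "1000-3"

# Orderly shutdown: tenant-2 takes over at once and sees the saved state.
table.vacate("wf_a", "tenant-1")
print(table.acquire("wf_a", "tenant-2", now=1.0).state)  # {'last_acked': '1000-3'}

# Failure: tenant-2 stops renewing; tenant-3 gets the lease once it expires.
print(table.acquire("wf_a", "tenant-3", now=5.0))  # None
print(table.acquire("wf_a", "tenant-3", now=11.0).owner)  # tenant-3
```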
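The difference from a single shard checkpoint is that Redis tracks delivered-but-unacknowledged entries individually, in a consumer group's pending entries list, and `XACK` removes one entry at a time. The in-memory model below uses hypothetical names as stand-ins for `XREADGROUP` and `XACK`; it shows that a slow job keeps only its own entry pending and never holds back the jobs behind it.

```python
from typing import Dict, List, Optional


class ConsumerGroupModel:
    """Toy model of a Redis consumer group: each entry delivered once, acked individually."""

    def __init__(self, entry_ids: List[str]) -> None:
        self.undelivered: List[str] = list(entry_ids)
        self.pending: Dict[str, str] = {}  # entry_id -> consumer, like the pending entries list

    def read(self, consumer: str) -> Optional[str]:
        """Like XREADGROUP with the '>' ID: hand the next new entry to one consumer."""
        if not self.undelivered:
            return None
        entry_id = self.undelivered.pop(0)
        self.pending[entry_id] = consumer
        return entry_id

    def ack(self, entry_id: str) -> None:
        """Like XACK: acknowledge exactly this entry, in any order."""
        self.pending.pop(entry_id, None)


group = ConsumerGroupModel(["1-0", "2-0", "3-0"])
heavy = group.read("processor-1")    # '1-0', a long-running job
light_1 = group.read("processor-2")  # '2-0'
group.ack(light_1)
light_2 = group.read("processor-2")  # '3-0'
group.ack(light_2)

# Only the slow job is still pending; the jobs after it are done for good.
# If processor-1 drains for a restart, it only has to finish '1-0'.
print(group.pending)  # {'1-0': 'processor-1'}
```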
As mentioned earlier, Kinesis scales incredibly well. How does this system fare? Again, let's break it down by subsystem.
Remember, brokers are actually just Lua scripts running on Redis. It's 1 op when the lease is already assigned, and 5 ops when the lease needs to be created (including the operations the tenant performs when it acquires the lease). However, since request volume dwarfs the number of active sources, it's effectively one op per request, and simple client-side caches can optimize this further. If necessary we could shard Redis by source, but since Redis can scale to 500k ops/sec per shard, one shard goes a long way. The memory requirements are also tiny, since we are not holding request data here, just a little bit of metadata.
Tenants are also very memory-efficient: they hold only queue state, not event data, since that's all stored in Redis. To scale the Redis Streams themselves, we use Redis Cluster, sharding the streams by workflow.
This design lets us do all sorts of interesting things beyond serial delivery that would have been impossible with Kinesis. The first two we've added are rate limiting and scheduled delivery. In both cases we again store the relevant state (which boils down to the next time we can run a job) in Redis, and set timers in the tenant for when to send the message to a job processor.
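To see why lease creation washes out: the average is 1 op per request plus 4 extra ops for each request that has to create a lease. The helper below just does that arithmetic. The traffic numbers in the example are made up for illustration, and the 500k ops/sec figure is the per-shard ceiling quoted above, not a measurement.

```python
def ops_per_request(requests: int, leases_created: int) -> float:
    """Average broker ops per request: 1 op normally, 5 when a lease is created."""
    return (requests * 1 + leases_created * 4) / requests


def requests_per_second_per_shard(avg_ops: float, shard_ops_per_sec: float = 500_000) -> float:
    """Request throughput one Redis shard can absorb at a given op cost."""
    return shard_ops_per_sec / avg_ops


# Hypothetical traffic: 1M requests that create 10k leases between them.
avg = ops_per_request(requests=1_000_000, leases_created=10_000)
print(avg)  # 1.04
print(round(requests_per_second_per_shard(avg)))  # 480769
```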
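For reference, Redis Cluster places a key in one of 16384 hash slots using CRC16 (the XMODEM variant) of the key, and if the key contains a non-empty `{...}` hash tag, only the tag is hashed. The sketch below implements that rule. The key names are hypothetical; the usage shows how a hash tag on the workflow ID would keep a workflow's stream and any related keys in the same slot, which matters if one Lua script needs to touch both, since Redis Cluster requires a script's keys to share a slot.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16-CCITT (XMODEM variant): poly 0x1021, initial value 0."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Redis Cluster hash slot for a key, honouring {hash tags}."""
    raw = key.encode("utf-8")
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        if end > start + 1:  # non-empty tag: hash only what is inside the braces
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


# Hypothetical key names: both keys hash only "wf_a", so they share a slot.
print(key_slot("stream:{wf_a}") == key_slot("queue-state:{wf_a}"))  # True
```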
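Both features come down to computing a per-workflow "next run" timestamp, persisting it, and arming a timer until then. The sketch below is hypothetical: it assumes a simple fixed-interval rate limit (at most one job every `min_interval` seconds) plus an optional per-job scheduled time, and it keeps `next_run_at` in a Python dict where, in our design, that value would live in Redis.

```python
from typing import Dict, Optional


class DeliveryScheduler:
    """Computes how long a tenant should wait before dispatching a workflow's next job."""

    def __init__(self) -> None:
        self.next_run_at: Dict[str, float] = {}  # persisted per workflow in the real system

    def delay_before_dispatch(self, workflow_id: str, now: float,
                              scheduled_for: Optional[float] = None) -> float:
        """Seconds the tenant should wait (0 means dispatch now)."""
        earliest = self.next_run_at.get(workflow_id, now)
        if scheduled_for is not None:
            earliest = max(earliest, scheduled_for)
        return max(0.0, earliest - now)

    def record_dispatch(self, workflow_id: str, dispatched_at: float,
                        min_interval: float) -> None:
        """After dispatching, the rate limit pushes the next allowed run out."""
        self.next_run_at[workflow_id] = dispatched_at + min_interval


sched = DeliveryScheduler()
assert sched.delay_before_dispatch("wf_a", now=100.0) == 0.0
sched.record_dispatch("wf_a", dispatched_at=100.0, min_interval=2.0)  # one job per 2 s
assert sched.delay_before_dispatch("wf_a", now=100.5) == 1.5
assert sched.delay_before_dispatch("wf_a", now=100.5, scheduled_for=110.0) == 9.5
```

The tenant would arm a timer for the returned delay and hand the job to a job processor when it fires; because `next_run_at` is persisted, a tenant that takes over the lease inherits the same schedule.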