The Bidirectional Stream Processor: Why Pull Beats Push for Crash Recovery


At Epsio, we’re building a streaming SQL engine, focusing our engineering effort on ease of use and on removing the middleware usually associated with stream processing.

As part of that effort, we need to ensure that if and when our stream processor crashes, no data duplication occurs. Even in the event of a disk malfunction, a network issue, or an unexpected compute restart, there should be no duplicates in the output stream. Each input change should be received once, processed once, and its derived change should be emitted exactly once to the component downstream of the stream processor.

As you’ll see in this blog post, certain scenarios can make this especially challenging — particularly when it’s difficult to determine exactly what the stream processor managed to process or output before it crashed. Since our mechanism can’t depend on any local disk or state that might be lost during a crash, we had to get a bit creative in figuring out which messages had already been sent by the stream processor — and which hadn’t.

Problematic Scenario #1 — Connection to Sink Breaking

The first (and easier to handle) failure scenario is when the connection to the sink breaks during a batch write or delete operation. This can happen because of a network hiccup, a sink/database restart, and so on.

When this failure occurs (e.g., an insert operation times out, or a TCP connection breaks forcefully mid-insert), the stream processor is left in the dark: did the sink manage to apply any of the writes before the disconnect? If we retry the operation blindly, we risk duplicating data. But if we skip it, we could lose records altogether.

To handle this, we employ one of two strategies, depending on the capabilities of the target sink:

Utilizing transactions

Luckily, many of the data sinks we work with have transactions built-in. This makes life much easier because we can apply a batch of changes in bulk, and if our connection breaks in the middle — all is good because the transaction wasn’t committed yet! For example:

BEGIN;
INSERT <change1>
INSERT <change2>
INSERT <change3>
COMMIT;

If for some reason the server disconnects between the second and third insert, we know the previous inserts weren’t committed yet, so we can simply restart the entire write operation once we re-establish the connection, and then commit. Great, right? No need to worry about duplicate writes anymore?

Wrong!

In the world of data processing, we need to be prepared for failure at ANY POINT IN TIME. This includes the exact moment we run the COMMIT command. If the connection breaks while the COMMIT is in flight, how can we know whether the transaction was committed (and the reply confirming the COMMIT simply never reached us), or whether we need to re-apply the transaction?

You might think this scenario is fairly rare (how often does a database restart, and how unlucky must one be for the restart to land exactly on the commit command?), but this has actually happened many times for some of our users (who run many views and write into many data stores that frequently upgrade/restart due to scaling events).

Surprisingly, some of the most famous stream processors (Flink included!) market themselves as “exactly once” stream processors, but can’t handle this edge case:

“If a commit fails (for example, due to an intermittent network issue), the entire Flink application fails, restarts according to the user’s restart strategy, and there is another commit attempt. This process is critical because if the commit does not eventually succeed, data loss occurs.” — An Overview of End-to-End Exactly-Once Processing in Apache Flink

Epsio, on the other hand, overcomes this by utilizing a built-in feature many data stores have to check if a specific transaction ID was committed. This means that when the engine needs to decide whether to re-apply the changes to the sink, it can just check if the previous transaction ID was committed or not. For example, in PostgreSQL:

BEGIN;
SELECT txid_current(); -- Save this for later. If COMMIT fails, check txid_status(<saved id>) after reconnecting
INSERT <change1>
INSERT <change2>
INSERT <change3>
COMMIT;

If the COMMIT command fails, Epsio simply checks if the transaction ID was previously committed or not and decides if it needs to re-apply all the changes again.
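The recovery logic described above can be illustrated with a minimal sketch. This is not Epsio’s actual implementation: the `Sink` class below is an in-memory stand-in for a transactional database, `was_committed` plays the role of PostgreSQL’s `txid_status()`, and a UUID stands in for the real transaction ID returned by `txid_current()`.

```python
import uuid

class Sink:
    """Toy transactional sink; in practice this would be e.g. PostgreSQL."""
    def __init__(self):
        self.rows = []
        self.committed_txids = set()

    def apply_transaction(self, txid, changes, crash_before_ack=False):
        # Apply the batch atomically and record the txid as committed.
        self.rows.extend(changes)
        self.committed_txids.add(txid)
        if crash_before_ack:
            # The commit succeeded, but the confirmation never reached us.
            raise ConnectionError("connection lost before COMMIT reply arrived")

    def was_committed(self, txid):
        # Analogous to PostgreSQL's txid_status(<txid>).
        return txid in self.committed_txids

def write_with_recovery(sink, changes):
    txid = str(uuid.uuid4())  # stand-in for SELECT txid_current()
    try:
        sink.apply_transaction(txid, changes, crash_before_ack=True)
    except ConnectionError:
        # On reconnect, ask the sink whether our txid committed
        # instead of blindly retrying the whole batch.
        if not sink.was_committed(txid):
            sink.apply_transaction(txid, changes)

sink = Sink()
write_with_recovery(sink, ["change1", "change2"])
print(sink.rows)  # no duplicates despite the lost COMMIT acknowledgment
```

The key point is that the retry decision is driven by the sink itself, not by any local state the processor might have lost in the crash.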

Making writes idempotent

Sometimes, transactions aren’t available in the data sink — so Epsio has to “cheat” its way into only delivering changes once.

Instead of trying to guarantee that a change is written only once to the datastore, an alternative approach is to make the write process idempotent: if a particular change has already been written, any subsequent attempt to write it again will be safely ignored.

Epsio handles this by assigning a unique identifier (UUIDv7) to each change before it’s sent to the sink. It then relies on unique indexes (or similar constraints) in the target datastore to ensure that duplicate writes are ignored — typically by using an UPSERT or merge operation.

For example, if a batch of three changes is being written to the sink and the connection drops after the first change is written, retrying the batch will result in only the remaining two changes being applied. The first change, already present, will be ignored.
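The idempotent-write pattern can be sketched with SQLite as a stand-in for the real sink, `INSERT OR IGNORE` standing in for an UPSERT/merge, and plain strings in place of UUIDv7 identifiers. This is an illustration of the technique, not Epsio’s code:

```python
import sqlite3

# Toy sink: a unique constraint on the change ID makes retries idempotent.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sink (change_id TEXT PRIMARY KEY, value INTEGER)")

batch = [("id-1", 10), ("id-2", 20), ("id-3", 30)]

def write_batch(changes):
    # INSERT OR IGNORE plays the role of an UPSERT/merge:
    # a change whose ID already exists is silently skipped.
    conn.executemany("INSERT OR IGNORE INTO sink VALUES (?, ?)", changes)
    conn.commit()

write_batch(batch[:1])  # first attempt: connection "dropped" after one change
write_batch(batch)      # retry of the full batch: id-1 is ignored

rows = conn.execute(
    "SELECT change_id, value FROM sink ORDER BY change_id"
).fetchall()
print(rows)  # [('id-1', 10), ('id-2', 20), ('id-3', 30)]
```

Retrying the full batch after a partial write leaves exactly one copy of each change in the sink.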

Problematic Scenario #2 — Engine Crashes Mid-Processing

A more complex (and interesting!) scenario arises when the stream processor itself crashes or restarts mid-processing.

In this scenario, it’s unclear how the processor should proceed upon recovery. How does it know which changes were already written? How does it continue to ensure no duplicate changes are written to the sink, even in the event of state data loss?

Traditional stream processing systems — ahem, ahem, Flink — handle this by combining end-to-end transactions with periodic state checkpoints. Here’s how it works:

  1. For each input batch, start a transaction in the sink.

  2. Process, transform, and write the batch into the sink (using the transaction opened in the previous step).

  3. After fully processing the batch, back up the internal state (key-value store, input stream offsets, metadata, etc.).

  4. Commit the transaction after all steps are successful.

If a failure occurs mid-processing, a “replica node” (a node sitting in standby) can revert to the last successful checkpoint/backup. Since the transaction isn’t committed while the batch is still being processed, any partially outputted changes from this batch are discarded, and the replica node simply retries processing the entire batch.
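The four-step checkpoint loop can be condensed into a deliberately simplified sketch, with in-memory lists standing in for the sink transaction and the checkpoint store (none of this reflects Flink’s actual APIs):

```python
def process_batch(batch, state, checkpoint_store, sink_tx):
    """One iteration of the checkpoint-coupled loop: the sink transaction
    only 'commits' after the state backup succeeds, so a crash mid-batch
    discards partial output and the whole batch is retried."""
    for record in batch:                  # steps 1-2: process and write
        state["offset"] += 1              # inside the open transaction
        sink_tx.append(record * 2)
    checkpoint_store.append(dict(state))  # step 3: back up internal state
    return list(sink_tx)                  # step 4: commit the transaction

state = {"offset": 0}
checkpoints = []
committed = process_batch([1, 2, 3], state, checkpoints, [])
print(committed)        # [2, 4, 6]
print(state["offset"])  # 3
```

Because the commit waits on the checkpoint, output latency is bounded below by checkpoint frequency, which is exactly the downside discussed next.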

The huge (!) downside of this approach is that the frequency of your transactions is strictly limited by how fast you can perform checkpoints and back up all your internal state. Consider Confluent’s default checkpoint interval of 1 minute: this overhead means that achieving exactly-once guarantees in Confluent’s Flink implementation currently requires accepting at least 1 minute of latency in your stream processing pipeline.

“Exactly-Once: If you require exactly-once, the latency is roughly one minute and is dominated by the interval at which Kafka transactions are committed” — Delivery Guarantees and Latency in Confluent Cloud for Apache Flink

How does Epsio solve this?

Unlike traditional stream processors that function as a “one-way street,” continuously receiving data from sources and pushing it downstream, Epsio is bidirectional. This means it can not only push data to the sink but also pull data from it when needed.

This bidirectional approach is crucial because it enables failure recovery in ways that traditional stream processors can’t. While traditional processors simply push data to the sink, Epsio can pull data back from the sink during failure scenarios and compare the sink’s current state with its expected internal state, thereby avoiding re-emitting changes it has already sent.

For example, consider a highly available Epsio deployment with a “Main” instance and a “Replica”. This system receives a stream of changes and multiplies each input integer by 2. To prepare for failure scenarios, the replica continuously maintains an internal “expected” result state of the sink.

When a failure occurs, the replica will pull the current state from the sink, compare it with its expected state, and apply only the differences (i.e., new rows that need to be inserted or deleted).

After applying these changes, the replica can simply continue ingesting events from the input stream, without worrying about duplicate data delivery.
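The reconciliation step can be sketched as a diff between two key-value snapshots. The row keys, the dict representation, and the `reconcile` helper are all illustrative assumptions, not Epsio’s actual data model:

```python
def reconcile(expected, actual):
    """Compute the minimal changes that bring the sink ('actual') in line
    with the replica's expected state: rows to insert (or overwrite)
    and rows to delete."""
    to_insert = {k: v for k, v in expected.items() if actual.get(k) != v}
    to_delete = [k for k in actual if k not in expected]
    return to_insert, to_delete

# Expected state on the replica (each input integer multiplied by 2),
# vs. what actually landed in the sink before the crash.
expected = {"row-1": 2, "row-2": 4, "row-3": 6}
actual   = {"row-1": 2, "row-2": 4}  # row-3 never made it

to_insert, to_delete = reconcile(expected, actual)
print(to_insert)  # {'row-3': 6}
print(to_delete)  # []
```

Only the missing row is re-emitted; the rows already present in the sink are left untouched, which is what makes the recovery duplicate-free.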

Because this approach doesn’t require any external checkpoint/backup, and more importantly decouples sink writes from state backup frequency, it enables users to achieve strong delivery guarantees without sacrificing streaming latency.

Concluding Thoughts

Historically, stream processors have been built to operate in isolation — generic, self-contained components with minimal awareness of their upstream and downstream counterparts. While this approach has merit, it comes with significant trade-offs.

While there are still many things to talk about in the above “strategy” (how do you deal with a scenario where multiple instances crash, how do you handle a sink dataset that’s too large to efficiently maintain on the instance, etc.), I hope this article gave you a small glimpse into how stream processors could become significantly easier to use, and more efficient, if they were designed to be more “aware” of the surrounding ecosystem.