Ensuring Data Consistency Across Replicas

Chris Bonadonna
Mixpanel Engineering
9 min read · Feb 28, 2023


At Mixpanel, data trust is of utmost importance. This means that once customers have sent data to our APIs and gotten an OK response, we should not lose that data or change it in unexpected ways. Because of this emphasis on data integrity, we’ve designed our systems to quickly identify and resolve any issues with data.

The high-level architecture

Mixpanel is powered by a distributed, multi-tenant database named Arb. Arb organizes customer data into logical units called projects. We keep two copies of our production data to ensure availability and the ability to roll out changes safely. The high-level setup looks something like this:

In this diagram, we have two zones, each of which has a service reading from Kafka. That service is responsible for reading the data, buffering it into a file, then uploading the file to GCS once the file is big enough or old enough. Under that paradigm, if a bug, OS upgrade issue, etc. is rolled out to the first zone, the second zone will have also written the data and can be used as the source of truth. In these rare instances, we can copy the data from the “good” zone to the “bad” zone after the root cause of the problem has been fixed or rolled back.

Detecting differences between zones

While having two copies of the data is key to our recovery strategy, we haven’t yet talked about how we detect differences between the zones. The first decision we needed to make was, “when should we compare data between the two zones?” Data becomes available once it is written to a file, so the comparison has to happen sometime after that write. Comparing after every event would be far too frequent (imagine reading a whole file and communicating information about it across zones for every event, when we occasionally reach 1 million events per second), but we would still like to detect differences quickly. To that end, we decided to compare when we cut over files (remember, cutover is based on file size and file age). The main challenge is making sure that the files are exactly the same in both zones when we cut them over.

Making files the same

To ensure that files are the same, we need both the contents of the files and the points at which they cut over to be the same across zones. For the cutover question, we considered two main strategies.

One option was to introduce coordination between the zones in some form; examples of solutions in this space include a leader/follower relationship between zones or a push-based API between the zones with rewind mechanisms.

The other option, which we decided to implement, was to make the cutover heuristics between the zones consistent. This section will focus on that set of solutions.

Ensuring data is the same

In our simplified architecture diagram above, you’ll have noticed that a given Kafka topic has multiple partitions. We need multiple partitions for every topic to ensure high availability in cases where some Kafka brokers are down. Data can be sent to any of these partitions, and we still need to write it to files. In the past, this was done by eagerly pulling from all partitions and writing to the same file. This makes the file contents non-deterministic, because the order in which messages from different partitions interleave is not guaranteed.

To resolve this, we started writing to separate files per partition in Kafka, relying on Kafka’s ordering guarantees to ensure our serial writes are the same across zones (remember that both zones read from the same Kafka instance).
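A minimal sketch of why per-partition files remove the non-determinism. The record shape `(partition, offset, payload)` and the function name are assumptions made for illustration, not the actual tailer code.

```python
from collections import defaultdict

def write_per_partition(arrivals):
    """Group (partition, offset, payload) records into one 'file' per partition.

    Arrival order across partitions may differ between zones, but Kafka
    preserves order within a partition, so each per-partition list is stable.
    """
    files = defaultdict(list)
    for partition, offset, payload in arrivals:
        files[partition].append((offset, payload))
    return dict(files)

# The same four records, interleaved differently in each zone.
zone1 = [(0, 0, "a"), (1, 0, "x"), (0, 1, "b"), (1, 1, "y")]
zone2 = [(1, 0, "x"), (1, 1, "y"), (0, 0, "a"), (0, 1, "b")]

# A single shared file records arrival order, which differs between zones.
shared_differs = [p for _, _, p in zone1] != [p for _, _, p in zone2]

# Per-partition files only depend on the order within each partition.
per_partition_match = write_per_partition(zone1) == write_per_partition(zone2)
```

Here `shared_differs` is true while `per_partition_match` is also true: the only ordering the files rely on is the one Kafka guarantees.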

One problem that readers may have noticed is that adopting a file per partition can drastically increase the number of files we store, which could have adverse effects elsewhere in our infrastructure. Files are never shared across projects, but a Kafka topic carries data from many projects, so we mitigate the problem by assigning each project a “preferred” partition. We only fail over to other partitions when the preferred partition is unavailable.

We can see that after changing which partitions we write to, almost all data for a given project (A or B) will go to the same partition. This means that we’ll almost never need to process data from the same project in multiple partitions, limiting the number of files we need to create.
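One way to picture the preferred-partition idea is sketched below. The text does not say how the preferred partition is chosen or how failover picks an alternative; the stable hash and the "next partition in ring order" fallback are both assumptions for illustration.

```python
import hashlib

def preferred_partition(project_id: str, num_partitions: int) -> int:
    """Map a project to a stable partition (hypothetical scheme).

    A cryptographic digest is used instead of the built-in hash(), which is
    randomized per process for strings and would not be stable across hosts.
    """
    digest = hashlib.sha256(project_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

def choose_partition(project_id: str, num_partitions: int, available: set) -> int:
    """Use the preferred partition if it is available, else the next one in ring order."""
    start = preferred_partition(project_id, num_partitions)
    for step in range(num_partitions):
        candidate = (start + step) % num_partitions
        if candidate in available:
            return candidate
    raise RuntimeError("no partitions available")
```

With all partitions healthy, every event for a project lands on the same partition, so only one open file per project is needed; extra files appear only during failover.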

Cutting over based on size

Choosing when to stop writing to a file and start writing to a new one because the file has grown too large is the simplest piece, given that it’s something we’ve always needed to do. To do this properly, we need to check the size of the file after each append operation and cut over if it’s too large.

There are two considerations that make this slightly more complicated:

File Format Changes

While the files are append-only and in one of our proprietary formats, we occasionally need to update our file formats to add features (for example, to support additional precision in timestamps) or make performance improvements. We still need to roll these changes out incrementally, which means the two zones can be writing different bytes in an “expected” manner; we’ll talk about how this affects comparison in a later section. The important invariant to maintain in this situation is that we write the same events to the file before cutting over. To accomplish this, we store both the actual size of the file and the size it would have had if every event had been written in the oldest format version we could have written, and we use the latter to decide when to cut over. Both zones therefore base size-based cutover on the same “size” of events and end up with the same events in the file, even if one version produces larger events.
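The bookkeeping can be sketched as follows. The class and function names, the byte sizes, and the threshold are all hypothetical; the only idea taken from the text is that cutover is decided on the oldest-version size rather than the actual size.

```python
from dataclasses import dataclass

@dataclass
class OpenFile:
    actual_size: int = 0          # bytes really written in this zone's format
    oldest_version_size: int = 0  # bytes the oldest format version would have written
    events: int = 0

def append(f: OpenFile, actual_len: int, oldest_len: int, max_size: int) -> bool:
    """Append one event; return True if the file should be cut over afterwards."""
    f.actual_size += actual_len
    f.oldest_version_size += oldest_len
    f.events += 1
    # The decision deliberately ignores actual_size.
    return f.oldest_version_size >= max_size

def events_per_file(event_sizes, max_size):
    """Return the number of events in each file that was cut over."""
    counts, f = [], OpenFile()
    for actual_len, oldest_len in event_sizes:
        if append(f, actual_len, oldest_len, max_size):
            counts.append(f.events)
            f = OpenFile()
    return counts

# Zone 1 still writes the old format (10 bytes per event); zone 2 writes a
# newer format (12 bytes per event) for the same ten events.
old_zone = [(10, 10)] * 10
new_zone = [(12, 10)] * 10
```

Both `events_per_file(old_zone, 30)` and `events_per_file(new_zone, 30)` cut over after every third event, even though the files in the second zone are physically larger.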

Restarts

In the past, we would always create a new file after restarting our tailing service. While this was simpler in some ways, it made consistent cutovers impossible (imagine restarting partway through a file in one zone but not the other; the restarted zone would end up with a smaller file than the other zone, throwing off our equality invariant). We closed this gap by resuming files on restart: we periodically persist all the information we need to cut over (Kafka offset, oldest version size, and timestamps), and on restart we truncate each file to the last persisted state and resume from that point.
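A rough sketch of checkpoint-and-truncate on a local file. The JSON checkpoint, the `file_size` field, and the file names are assumptions for illustration; the text only says that the Kafka offset, oldest version size, and timestamps are persisted periodically.

```python
import json
import os
import tempfile

def persist_checkpoint(checkpoint_path, state):
    """Atomically persist the cutover state (write a temp file, then rename)."""
    tmp_path = checkpoint_path + ".tmp"
    with open(tmp_path, "w") as fh:
        json.dump(state, fh)
        fh.flush()
        os.fsync(fh.fileno())
    os.replace(tmp_path, checkpoint_path)

def resume(data_path, checkpoint_path):
    """Truncate the data file to the last checkpoint and return the restored state."""
    with open(checkpoint_path) as fh:
        state = json.load(fh)
    os.truncate(data_path, state["file_size"])
    return state

def demo():
    with tempfile.TemporaryDirectory() as tmp:
        data_path = os.path.join(tmp, "events.dat")
        checkpoint_path = os.path.join(tmp, "events.checkpoint")
        with open(data_path, "wb") as fh:
            fh.write(b"event-0;event-1;")  # 16 bytes, Kafka offsets 0 and 1
        persist_checkpoint(checkpoint_path, {
            "file_size": 16, "kafka_offset": 1, "oldest_version_size": 16,
            "oldest_ts_ms": 1000, "newest_ts_ms": 2000,
        })
        # Simulate a crash after writing an event that was never checkpointed.
        with open(data_path, "ab") as fh:
            fh.write(b"event-2;")
        state = resume(data_path, checkpoint_path)
        # Next offset to re-read from Kafka, and the size of the truncated file.
        return state["kafka_offset"] + 1, os.path.getsize(data_path)
```

After the simulated crash, the file is back to its checkpointed 16 bytes and consumption resumes at offset 2, so the un-checkpointed event is re-read from Kafka rather than lost or duplicated.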

Cutting over based on age

Age-based guarantees required a little more thought since ingestion might occur at different times, clocks can skew, etc. Ultimately, cutting over based on some form of age requires a source of truth that is shared across the two zones. The Kafka message itself is the natural vehicle, since we’re already using it for cross-zone consistency of data. We considered both time-based and message-count-based heuristics before deciding to stay closer to our existing paradigm and cut over based on time.

We leverage the timestamp stored on the Kafka message, keeping track of the oldest and newest Kafka timestamp for all messages a file has data for. When we read a message from the partition whose Kafka timestamp puts an open file beyond our age threshold, we cut over that file, regardless of which file the message itself belongs to. This ensures consistency because all messages for the partition are read serially, and using the Kafka timestamp for both “when was this file opened/written to” and “what is the current time” ensures both zones see the same timestamps after the same number of events.
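A minimal sketch of age-based cutover driven purely by Kafka timestamps. The message shape, the names, and the choice to include the triggering message in the file being cut are assumptions for illustration.

```python
from dataclasses import dataclass

@dataclass
class FileAge:
    oldest_ts_ms: int
    newest_ts_ms: int

def process(messages, max_age_ms):
    """Replay one partition and return the (offset, project) cutover points.

    messages: iterable of (offset, project, kafka_ts_ms) in partition order.
    """
    open_files = {}
    cutovers = []
    for offset, project, ts in messages:
        f = open_files.get(project)
        if f is None:
            open_files[project] = FileAge(ts, ts)
        else:
            f.oldest_ts_ms = min(f.oldest_ts_ms, ts)
            f.newest_ts_ms = max(f.newest_ts_ms, ts)
        # "Now" is the timestamp of the message just read, never the wall
        # clock, so both zones make the same decision at the same offset.
        for name in sorted(open_files):
            if ts - open_files[name].oldest_ts_ms >= max_age_ms:
                cutovers.append((offset, name))
                del open_files[name]
    return cutovers

partition = [(0, "A", 1_000), (1, "B", 2_000), (2, "A", 61_000), (3, "B", 61_500)]
```

Because the function never consults the local clock, replaying `partition` in either zone, at any time, yields the same cutover points.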

In the event that there are a large number of open files for a given partition (which is the case in our system), it is prohibitively expensive to check every open file’s timestamps on every message. To avoid doing so, we perform age-based cutover checks only every X messages. This is implemented as a modulus check against the Kafka offset to avoid having to maintain a count of messages and to avoid problems around non-contiguous offset ranges that exist in other queue systems.
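The modulus check can be sketched in a few lines. The interval of 1000 is a hypothetical stand-in for the "X" above.

```python
CHECK_EVERY = 1000  # hypothetical value for "X"

def should_check_age(kafka_offset: int, check_every: int = CHECK_EVERY) -> bool:
    """Decide from the offset alone whether to run the age-based cutover check."""
    return kafka_offset % check_every == 0

# Zone 1 has been running since offset 0; zone 2 restarted and resumed at 2500.
zone1_checks = [o for o in range(0, 5001) if should_check_age(o)]
zone2_checks = [o for o in range(2500, 5001) if should_check_age(o)]
```

A local message counter would restart from zero in the restarted zone and drift out of step with the other zone. Keying the check on the offset means both zones check at offsets 3000, 4000, and 5000 no matter where each one started reading.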

Comparing the Files

Now that we understand how we can ensure file shapes and contents are consistent across zones, we can look at how to compare those files. Our high-level strategy is to write the Kafka offsets and checksums to a table in our preferred distributed database, Spanner, then compare them with the other zone’s results if that zone has made it to the same point. Because we perform these inserts and comparisons transactionally, we never miss a comparison when both zones write at the same time. The remaining sections will focus on the problems we had to solve in this approach.
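To illustrate why doing the insert and the comparison in one transaction means no comparison is missed, here is a sketch that uses an in-memory table, with a lock standing in for the transaction. It does not use the Spanner client API, and keying rows by `(partition, start_offset)` is a simplifying assumption.

```python
import threading

class ChecksumTable:
    """In-memory stand-in for the checksum table; the lock plays the role of a transaction."""

    def __init__(self):
        self._lock = threading.Lock()
        self._rows = {}  # (partition, start_offset) -> {zone: checksum}

    def insert_and_compare(self, zone, partition, start_offset, checksum):
        """Record this zone's checksum and compare it with the other zone's.

        Returns None if the other zone has not reported yet, otherwise
        True or False depending on whether the checksums match.
        """
        with self._lock:
            row = self._rows.setdefault((partition, start_offset), {})
            row[zone] = checksum
            others = [c for z, c in row.items() if z != zone]
            if not others:
                return None
            return all(c == checksum for c in others)

table = ChecksumTable()
first = table.insert_and_compare("zone1", 0, 100, "abc")
second = table.insert_and_compare("zone2", 0, 100, "abc")
```

Whichever zone writes second always sees the first zone's row inside the same critical section, so exactly one of the two writers performs the comparison. Here `first` is `None` and `second` is `True`.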

Backward compatibility

We touched on the concept of deliberate changes to the file format earlier, which presents some problems for comparing bytes (or more accurately an MD5 hash of the bytes) directly. Internally, we have a backward-compatible binary that will read data and stream the results in a consumable form. Our solution for the purposes of comparison was to run this binary on the “finished” file and take an MD5 of that result. The backward compatibility of those results ensures that the MD5s will be consistent across zones, even if the format of the files is different.
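A small sketch of hashing the decoded output instead of the raw bytes. The two file formats and their decoders are invented for illustration; the real formats are proprietary and the real decoder is the backward-compatible binary described above.

```python
import hashlib
import json

def decode_v1(raw: bytes):
    """Hypothetical old format: one JSON object per line."""
    return [json.loads(line) for line in raw.splitlines() if line]

def decode_v2(raw: bytes):
    """Hypothetical new format: a single JSON array with short keys."""
    return [{"event": e["e"], "time": e["t"]} for e in json.loads(raw)]

def canonical_md5(events) -> str:
    """MD5 over a canonical, format-independent rendering of the events."""
    h = hashlib.md5()
    for event in events:
        h.update(json.dumps(event, sort_keys=True, separators=(",", ":")).encode("utf-8"))
        h.update(b"\n")
    return h.hexdigest()

# The same two events, written by zones running different format versions.
v1_file = b'{"event":"signup","time":1}\n{"event":"login","time":2}\n'
v2_file = b'[{"e":"signup","t":1},{"e":"login","t":2}]'

raw_match = hashlib.md5(v1_file).hexdigest() == hashlib.md5(v2_file).hexdigest()
decoded_match = canonical_md5(decode_v1(v1_file)) == canonical_md5(decode_v2(v2_file))
```

The raw bytes hash differently (`raw_match` is false), while the decoded streams hash identically (`decoded_match` is true), which is the property the comparison relies on during a format rollout.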

Differing ingestion speeds

Given that the two zones are almost completely isolated from each other, the rate of ingestion can differ between the zones due to a variety of reasons (deploys, different shard layouts, etc.). In those cases, we need to accommodate either zone being ahead, while also ensuring we catch any mismatches. The sections below describe the main ways a mismatch can occur.

This diagram represents a scenario where we dropped an event in zone 2 and started the subsequent file slightly later in terms of Kafka offsets. This should generally lead to a differing MD5, although not always: for example, if all the events in the union of Kafka offsets for the two files were duplicates, they’d result in the same MD5. Either way, we still want an easy way to identify why the mismatch occurred. To account for this type of bug, when querying Spanner, we query for the first file in the other zone with a start offset greater than or equal to that of the file we just finished. If the start offsets don’t match, we fail the comparison and alert the team. This type of mismatch is important to catch, since inconsistent file boundaries make it impossible to compare the checksums, invalidating the entire exercise.

This diagram represents a scenario where we cut over a file at the wrong time, be it too early or too late. Similar to the above scenario, this will generally (but not always) lead to mismatched checksums and is mostly important to catch for debugging purposes and to ensure that we can always compare the checksums. To account for this failure mode, we compare the end offsets of the two files in addition to the start offsets, failing the comparison if they don’t match.

This diagram represents a scenario where an event in the file is missing or corrupted. This comparison is the end result of the architecture we built, and a mismatched MD5 between the two zones will fail the comparison and alert the team.
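The three checks above can be sketched as a single comparison function. The record type, the result strings, and the in-memory list standing in for the Spanner query are assumptions for illustration.

```python
import bisect
from dataclasses import dataclass

@dataclass(frozen=True)
class FileRecord:
    start_offset: int
    end_offset: int
    md5: str

def compare(finished: FileRecord, other_zone: list) -> str:
    """Compare a finished file with the other zone's records.

    other_zone must be sorted by start_offset. The lookup mirrors the query
    "first file with a start offset greater than or equal to ours".
    """
    starts = [r.start_offset for r in other_zone]
    i = bisect.bisect_left(starts, finished.start_offset)
    if i == len(other_zone):
        return "pending"  # the other zone has not reached this point yet
    candidate = other_zone[i]
    if candidate.start_offset != finished.start_offset:
        return "start_offset_mismatch"
    if candidate.end_offset != finished.end_offset:
        return "end_offset_mismatch"
    if candidate.md5 != finished.md5:
        return "md5_mismatch"
    return "match"

zone2 = [FileRecord(0, 99, "aa"), FileRecord(100, 199, "bb")]
```

Checking the offsets before the MD5 means a boundary problem is reported as a boundary problem, rather than surfacing only as an unexplained checksum difference.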

Conclusion

After we rolled out this change, we tested it by deliberately deploying the tailers with code that modified events in one zone and not the other. The monitoring we have in place quickly alerted us, and we were able to restore the correct copy of the data from the other zone.

If working on problems like this interests you, come join us!
