Resharding petabytes of data to improve performance for our largest customers

As our customers grow, so does their data volume. A customer that sends us 10 million events per day today might send us 100 million or 1 billion events per day a year from now. Mixpanel allocates a default number of resources per customer to ingest, transform, and query their data. Growing data volumes put pressure on these resources, leading to higher latencies, higher unit costs, and ultimately a poor customer experience. In this post, we’re going to talk about the work we did recently to seamlessly increase resource allocation for our largest customer, reducing query latencies by 65% and data transformation costs by 30%.

Background

Mixpanel’s database, Arb, splits up customer data into smaller buckets called shards. Data in each shard is stored separately, and processes like ingestion (one-time processing and row-based storage), compaction (conversion to a columnar format), and queries are performed independently on each shard. Arb is a multi-tenant database, and sharding allows us to provide a consistent experience to all customers regardless of their data volume. Customers are given 200 shards by default, but larger customers can be given 800 or even 3200 shards. Each customer has a sharding spec, a piece of metadata describing how many shards they are assigned and where the shards live in Arb.

Within a customer’s instance (called a project), data is sharded by user id, meaning that all the data for each customer’s end user will live in the same place, allowing for quick user-based analysis. For example, you can use the following formula to determine which shard each datum should live in.

shard_id = hash(user_id) % N

Where N is the number of shards given to the customer.
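As a rough illustration of the formula, here is a minimal Python sketch. The post does not say which hash function Arb uses, so the SHA-256 based hash below is an assumption chosen only because it is stable across processes (Python's built-in hash() is randomized per process for strings, so it would not work here). The event fields are made up for the example.

```python
import hashlib

def shard_id(user_id: str, num_shards: int) -> int:
    """Map a user id to a shard. SHA-256 is an assumed stand-in for Arb's hash."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    # Interpret the first 8 bytes as an unsigned integer, then reduce modulo N.
    return int.from_bytes(digest[:8], "big") % num_shards

# Hypothetical events: two for "alice", one for "bob".
events = [
    {"user_id": "alice", "event": "signup"},
    {"user_id": "bob", "event": "login"},
    {"user_id": "alice", "event": "purchase"},
]

# Every event for the same user gets the same shard id.
placements = [(e["user_id"], shard_id(e["user_id"], 200)) for e in events]
```

Both of alice's events land on the same shard, which is the property the rest of the design relies on.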

Within each shard, data is split into separate files based on event date. This allows for efficient time-based queries, as a query for data within a given time range only loads data for that time, instead of all data.

At query time, the query is fanned out to a pool of local query servers (LQS). Each LQS is responsible for a subset of a customer’s shards, so each subquery operates in parallel on a smaller dataset. The results are then aggregated and displayed to the user. A key invariant of this design is that all of a user’s data lives on the same shard. This ensures that time-based user queries like Flows and Funnels can be performed independently by a single LQS.
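The fan-out can be pictured with a small Python sketch. This is not Arb's query engine: the in-memory shards, the `converted_users` subquery, and the thread pool are all hypothetical stand-ins. It only shows why per-shard results can simply be summed when no user spans more than one shard.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical data: shard id -> time-ordered list of (user_id, event_name).
shards = {
    0: [("u1", "view"), ("u1", "buy"), ("u4", "view")],
    1: [("u2", "view"), ("u2", "view")],
    2: [("u3", "view"), ("u3", "buy")],
}

def converted_users(rows):
    """Per-shard subquery: count users who did 'view' and later 'buy'."""
    viewed, converted = set(), set()
    for user, event in rows:
        if event == "view":
            viewed.add(user)
        elif event == "buy" and user in viewed:
            converted.add(user)
    return len(converted)

def run_query(shards, workers=2):
    """Fan the subquery out over all shards, then aggregate the partial counts."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(converted_users, shards.values()))
    # Summing is only valid because each user's events live on exactly one shard.
    return sum(partials)

total = run_query(shards)
```

If a user's events were split across two shards, neither subquery would see the full view-then-buy sequence, and the sum would undercount.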

Scoping

Our engineering team discussed several options for handling changing customer data volumes.

Option 1: Reshard future data

We could say, “At some time t1 in the future, set N to a larger value”. This would ensure that new data coming in is distributed across more shards. However, this would make it very difficult to perform queries with a time range that includes t1, as user data after t1 would live on a different shard than user data before t1, meaning that each LQS cannot independently handle a user’s subquery. As a result, we required that any redistribution of data would redistribute all data, not just data going forward.
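To see how many users would change shards under Option 1, consider a small Python sketch. It uses plain integers as a stand-in for hash(user_id), which is an assumption made so the arithmetic is easy to follow.

```python
def moved_fraction(hashes, old_n, new_n):
    """Fraction of hash values whose shard id differs between old_n and new_n."""
    moved = sum(1 for h in hashes if h % old_n != h % new_n)
    return moved / len(hashes)

# Stand-in for hash(user_id): a uniform sweep over two full cycles of 800.
fraction = moved_fraction(range(1600), old_n=200, new_n=800)
```

With evenly distributed hashes, going from 200 to 800 shards moves three quarters of users to a different shard id, so most user histories would straddle t1.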

Option 2: Query downtime

We could set N to a larger value going forward, and simultaneously kick off a background task to distribute historical data across the new set of shards. However, queries would not have access to any historical data during this time period, rendering the product pretty much unusable until the background task is complete. For large customers, this can take days. We decided that any significant downtime should be avoided as it could erode customer trust in the product.

Option 3: No Downtime (Preferred)

We devised the following procedure for re-sharding:

  • Initialize a new sharding spec B for the customer with a larger N than existing sharding spec A.
  • Ingest two copies of all incoming data: sending them to both A and B.
  • Kick off a background task to load historical data from A into B.
  • Once complete, configure the query servers to read data from B instead of A.
  • After a few days, delete all the data stored in A.

This procedure has the advantage that there is no query downtime. However, it requires that Mixpanel ingest and store two copies of a customer’s data for the re-sharding period. We felt that this was a reasonable price to pay for high availability. In addition, it provides the nice benefit of having the data in a ‘staging area’ before it goes live, allowing us to validate it and ensure no data points were lost during the re-shard.

The Resharding Process

The first step is to create a new sharding spec with a larger number of shards.

$ carb reshard begin --project-id={project} --num-data-shards=800

This change is a no-op for the customer. It simply initializes new metadata for the customer and instructs Mixpanel’s ingestion pipeline to send each event it receives for the customer to one additional destination. We call this process teeing.

By default, ingestion tags events with generation 0 and sends them to the sharding spec that is marked live. Once we create the new sharding spec, ingestion tags events with generation 1 and sends them to both the old and the new specs. The generations help us segment the data: all events with generation 1 already exist in the new sharding spec, while events with generation 0 need to be backfilled.
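A minimal Python sketch of teeing with generation tags follows. The `ShardingSpec` and `Ingestion` classes are hypothetical and keep events in memory; they are not Mixpanel's ingestion pipeline, only an illustration of the tagging rule described above.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ShardingSpec:
    name: str
    num_shards: int
    events: list = field(default_factory=list)

@dataclass
class Ingestion:
    live: ShardingSpec
    pending: Optional[ShardingSpec] = None  # set while a re-shard is in progress

    def ingest(self, event: dict) -> None:
        # Generation 1 marks events that were teed to both specs.
        generation = 1 if self.pending is not None else 0
        tagged = {**event, "generation": generation}
        self.live.events.append(tagged)
        if self.pending is not None:
            self.pending.events.append(tagged)

a = ShardingSpec("A", 200)
pipeline = Ingestion(live=a)
pipeline.ingest({"user_id": "u1", "event": "signup"})    # before the re-shard begins

b = ShardingSpec("B", 800)
pipeline.pending = b                                      # teeing starts here
pipeline.ingest({"user_id": "u1", "event": "purchase"})

# Only generation 0 events in A still need to be copied into B.
to_backfill = [e for e in a.events if e["generation"] == 0]
```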

Next, we perform the backfill of historical data. A background job is kicked off to find all data files in A that have generation 0 events. These files are then sent to our compacter service in batches to be split up by the re-sharding factor. For instance, if we are re-sharding a customer from 200 to 800 shards, we’d typically end up splitting each data file into 4 smaller files. In order to decide which event should go into which file, we’d employ the same user-based sharding formula we use at ingestion time.

shard_id = hash(user_id) % N

This ensures that backfilled historical data ends up in the same shard as newly ingested data for the same user.
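The split into 4 files follows from modular arithmetic: when the new shard count is a multiple of the old one, (h % 800) % 200 equals h % 200, so every event in old shard s can only land in new shards s, s + 200, s + 400, or s + 600. The Python sketch below illustrates this. The `split_file` helper is hypothetical, and integer user ids with an identity hash are assumptions made so the result is easy to check by hand.

```python
from collections import defaultdict

def split_file(events, old_shard, old_n, new_n, hash_fn):
    """Split one old shard's file into one list of events per new shard."""
    out = defaultdict(list)
    for event in events:
        h = hash_fn(event["user_id"])
        # Sanity check: the event really belongs to the file being split.
        assert h % old_n == old_shard
        out[h % new_n].append(event)
    return dict(out)

# Hypothetical file for old shard 7 out of 200, with integer user ids.
old_file = [{"user_id": u} for u in (7, 207, 407, 607, 807, 1007)]

# Identity hash (an assumption) so that hash(user_id) == user_id.
new_files = split_file(old_file, old_shard=7, old_n=200, new_n=800, hash_fn=lambda u: u)
```

Here the six events end up in new shards 7, 207, 407, and 607, with users 7 and 807 sharing shard 7 and users 207 and 1007 sharing shard 207.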

Once each file has been split up by the compacter service, we enqueue control plane messages to Arb’s storage layer, instructing it to add the files to the customer’s manifest in B. Once all these messages are processed, we will end up with a complete copy of a customer’s data, sharded by user id and then time, in B.

In addition to splitting up files, compacters also label all historical events with generation 0.

This allows us to perform validation effectively. Comparing generation 0 data between A and B tells us if the historical backfill was successful. On the other hand, comparing generation 1 data tells us if teeing at the ingestion layer is working correctly. We are able to use our own query engine to perform this validation as it will most closely match customer observations. While all customer queries, at this point, still point at A, we can manually point queries at B to perform this validation.

$ carb reshard validate --project-id={project} --new-sharding-spec=B

This validation involves counting the total number of events per day and comparing them. If there were bugs at any point in the re-sharding process and the data in B is corrupt, we can abort the process here and delete all the corrupt data without any customer impact as we haven’t actually committed anything.
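The comparison can be sketched in Python as follows. In practice the counts come from Mixpanel's own query engine; the in-memory event lists, field names, and dates below are made up for illustration, and `validate` is a hypothetical helper rather than what `carb reshard validate` actually runs.

```python
from collections import Counter

def daily_counts(events, generation):
    """Count events per day for a single generation."""
    return Counter(e["date"] for e in events if e["generation"] == generation)

def validate(events_a, events_b):
    """Return {generation: {date: (count_a, count_b)}} for days that disagree."""
    mismatches = {}
    for generation in (0, 1):
        a = daily_counts(events_a, generation)
        b = daily_counts(events_b, generation)
        # Counter returns 0 for missing days, so days absent from one side show up too.
        diff = {d: (a[d], b[d]) for d in a.keys() | b.keys() if a[d] != b[d]}
        if diff:
            mismatches[generation] = diff
    return mismatches

events_a = [
    {"date": "2024-01-01", "generation": 0},
    {"date": "2024-01-01", "generation": 0},
    {"date": "2024-01-02", "generation": 1},
]
good_copy = list(events_a)
bad_copy = events_a[1:]          # simulates one event lost during the backfill

ok_result = validate(events_a, good_copy)
bad_result = validate(events_a, bad_copy)
```

A mismatch under generation 0 points at the backfill, while a mismatch under generation 1 points at teeing.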

If validation succeeds, all that is left to do is mark B as ‘live’, and delete sharding spec A.

$ carb reshard finalize --project-id={project} --new-sharding-spec=B

This will instruct our query engine to point all new queries to the new data, and also inform ingestion to stop teeing newly ingested data.

After a retention period of about 10 days, Arb’s collector process will eventually delete all the old data in A. The retention period ensures that we have an additional window where we could recover the old data in case the re-shard introduced some data loss or corruption that was missed during validation.

Re-sharding our largest customer

We store around 4.5 petabytes of data for our largest customer. We were facing numerous bottlenecks throughout the system and decided we needed to allocate 3200 shards for optimal performance. While doing this, we faced some challenges.

Teeing was untenable

At peak, our largest customer accounts for roughly 30% of our entire daily ingestion volume. In order to tee their data, Arb would have to ingest 30% more events than it usually does. While we can absorb such peaks over a short period of time as all incoming data is queued, we wouldn’t be able to handle the increased traffic over a period of weeks, which is how long it would take to re-shard all their historical data.

Instead, we decided to re-shard their historical data in chunks without teeing. We took advantage of the fact that they don’t import any historical data over 5 days old. This means that, once 5 days have passed, any day-partitioned data files we have created for them remain unchanged going forward.

As a result, we could re-shard their historical data up until 7 days ago (in grey) in chunks over a period of weeks without teeing their newly ingested data. Then, we started teeing, and while that was going on, we backfilled the last 7 days’ worth of data (in blue). That smaller dataset was re-sharded in a couple of hours, after which we quickly validated the re-sharded data and stopped teeing. This ensured that teeing put minimal pressure on our ingestion pipeline. We also determined that they had 8 hours of reduced ingestion volume every day, between 11am and 7pm PST, so we aimed to complete the teeing process within these 8 hours.
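The two-phase schedule can be sketched as a small planning function in Python. The `plan_backfill` helper, the 30-day chunk size, and the example dates are assumptions for illustration; the post only says that history was processed in chunks over a period of weeks, followed by the last 7 days.

```python
from datetime import date, timedelta

def plan_backfill(first_day, today, window_days=7, chunk_days=30):
    """Return (chunks, final_window) as inclusive (start, end) date pairs.

    chunks cover settled history and can be re-sharded without teeing;
    final_window is the recent range that is backfilled while teeing is on.
    """
    window_start = today - timedelta(days=window_days - 1)
    last_settled = window_start - timedelta(days=1)
    chunks = []
    start = first_day
    while start < window_start:
        end = min(start + timedelta(days=chunk_days - 1), last_settled)
        chunks.append((start, end))
        start = end + timedelta(days=1)
    return chunks, (window_start, today)

# Hypothetical dates for the example.
chunks, final_window = plan_backfill(date(2024, 1, 1), date(2024, 3, 10))
```

Only the final window needs teeing to be active, which keeps the period of doubled ingestion for this customer down to hours rather than weeks.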

Insufficient compacter resources

Compacter nodes do the major heavy lifting for a re-shard. They pull each Arb file down from Google Cloud Storage (GCS) onto their attached Persistent Disk (PD), iterate through every event in the file, hash the user id to determine which shard it should go to, and write out the smaller files to PDs. They then upload the re-sharded files to GCS. Since this is a very I/O-intensive process, each compacter runs with a 100GB PD attached (I/O throughput scales with the size of the attached PD).

In order to process 4.5 petabytes of data in a reasonable amount of time, we needed to horizontally scale up our pool of compacters to handle the workload. We ended up hitting our regional limit on PD capacity. This prevented us from scaling up compacters more and we had to request an additional petabyte of PD quota from Google.

Impact

Once the re-shard was complete, we saw a lot of benefits across the board.

Reduced latency across the stack

The workloads we run on customer data operate independently on each shard. If there is too much data per shard, these operations take a lot longer. We also can’t effectively employ parallelism to spread out the work over multiple machines.

Average query latencies for our largest customer dropped 65%, from 23s to 8s, after the re-shard.

Dotted line is before, solid line is after

p90 compaction times for the customer dropped 50%, from 6 minutes to 3 minutes. We now take less time to transform their data to a columnar format, leading to CPU savings at query time.

Reduced Cost

Compacter costs were down 30% compared to the prior quarter.

We were also spending half as much CPU on compaction for them.

Given that the overall dataset had not changed, how do we explain these reduced costs?

  • Reduced write amplification by 35%: whenever customers send us new data points, we have to merge that data with the existing files we already have for that shard and that day. If new data trickles in over a long period of time, we end up repeatedly compacting the same data into a columnar format, amplifying our writes and wasting CPU. With smaller shards, the files that we have to repeatedly compact are smaller, so we generate less waste.
  • More uniform compacter utilization: as alluded to in a previous blog post, compacters work best if they are given similarly sized jobs. If some machines receive a handful of oversized jobs, they may be pegged on CPU while other machines sit idle. By re-sharding our largest customer, we ensure their compaction requests are sized similarly to those of other customers, which leads to more uniform utilization of compacters across the board.

Reduced Operational Overhead

Arb is a multi-tenant database, and our largest customers are often outliers — noisy neighbors that starve out resources for other customers. Engineers across storage and query teams had to spend valuable time fixing bottlenecks for a handful of customers, instead of making platform-level improvements for all customers. Re-sharding has reduced some of that operational overhead.

Going Forward

In the past year, we have re-sharded over 25 large customers, and continue to do so as our customers grow. Going forward, we plan to automate the discovery of customers that require re-sharding. Eventually, we could even automate re-shards completely, requiring no engineering intervention to right-size customers as they grow or shrink in data volume!

If such problems interest you, come join us!
