Mussel — Airbnb’s Key-Value Store for Derived Data

The Airbnb Tech Blog · Oct 10, 2022

How Airbnb built a persistent, highly available, low-latency key-value storage engine for accessing derived data produced from offline jobs and streaming events.

By: Chandramouli Rangarajan, Shouyan Guo, Yuxi Jin

Introduction

Within Airbnb, many online services need access to derived data, which is data computed with large-scale data processing engines like Spark or derived from streaming events in Kafka and stored offline. These services require a high-quality derived data storage system, with strong reliability, availability, scalability, and latency guarantees for serving online traffic. For example, the user profiler service stores and accesses real-time and historical user activities on Airbnb to deliver a more personalized experience.

In this post, we will talk about how we leveraged a number of open source technologies, including HRegion, Helix, Spark, Zookeeper, and Kafka, to build a scalable, low-latency key-value store for hundreds of Airbnb product and platform use cases.

Derived Data at Airbnb

Over the past few years, Airbnb has evolved and enhanced our support for serving derived data, moving from teams rolling out custom solutions to a multi-tenant storage platform called Mussel. This evolution can be summarized into three stages:

Stage 1 (01/2015): Unified read-only key-value store (HFileService)

Before 2015, there was no unified key-value store solution inside Airbnb that met four key requirements:

  1. Scale to petabytes of data
  2. Efficient bulk load (batch generation and uploading)
  3. Low latency reads (<50ms p99)
  4. Multi-tenant storage service that can be used by multiple customers

None of the existing off-the-shelf solutions met all of these requirements either: MySQL doesn’t support bulk loading, HBase’s massive bulk loading (distcp) is not optimal or reliable, RocksDB had no built-in horizontal sharding, and we didn’t have enough C++ expertise to build a bulk load pipeline for the RocksDB file format.

So we built HFileService, which internally used HFile (the building block of Hadoop HBase, which is based on Google’s SSTable):

Fig. 1: HFileService Architecture
  1. Servers were sharded and replicated to address scalability and reliability issues
  2. The number of shards was fixed (equal to the number of Hadoop reducers in the bulk load jobs), and the mapping of servers to shards was stored in Zookeeper. We configured the number of servers mapped to a specific shard by manually changing the mapping in Zookeeper
  3. A daily Hadoop job transformed offline data to HFile format and uploaded it to S3. Each server downloaded the data for its own partitions to local disk and removed old versions of the data
  4. Different data sources were partitioned by primary key. Clients determined the correct shard for a request by hashing the primary key modulo the total number of shards, then queried Zookeeper for the list of servers holding that shard and sent the request to one of them (see the sketch below)
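
To make the routing concrete, here is a minimal sketch of that client-side logic in Java. It is illustrative only: ZkMapping and fetchServersForShard are hypothetical stand-ins for reading the shard-to-server mapping stored in Zookeeper.

    // Illustrative sketch of HFileService client-side routing.
    // ZkMapping / fetchServersForShard are hypothetical stand-ins for the
    // shard -> servers mapping stored in Zookeeper.
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    public class ShardRouter {
        private final int totalShards;

        public ShardRouter(int totalShards) {
            this.totalShards = totalShards;
        }

        // Shard = hash(primary key) mod total number of shards.
        public int shardFor(String primaryKey) {
            byte[] key = primaryKey.getBytes(StandardCharsets.UTF_8);
            return Math.floorMod(Arrays.hashCode(key), totalShards);
        }

        // Pick one of the servers that hold the shard for this key.
        public String pickServer(String primaryKey, ZkMapping zk) {
            List<String> servers = zk.fetchServersForShard(shardFor(primaryKey));
            return servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
        }
    }

    interface ZkMapping {
        // In HFileService this lookup was backed by Zookeeper.
        List<String> fetchServersForShard(int shard);
    }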

Stage 2 (10/2015): Store both real-time and derived data (Nebula)

While we had built a multi-tenant key-value store that supported efficient bulk load and low-latency reads, it had its drawbacks. For example, it didn’t support low-latency point writes, and any update to the stored data had to go through the daily bulk load job. As Airbnb grew, there was an increased need for low-latency access to real-time data.

Therefore, Nebula was built to support both batch-update and real-time data in a single system. It internally used DynamoDB to store real-time data and S3/HFile to store batch-update data. Nebula introduced timestamp-based versioning as a version control mechanism. For read requests, data was read from both a list of dynamic tables and the static snapshot in HFileService, and the results merged based on timestamp.

To minimize online merge operations, Nebula also had scheduled spark jobs that ran daily and merged snapshots of DynamoDB data with the static snapshot of HFileService. Zookeeper was used to coordinate write availability of dynamic tables, snapshots being marked ready for read, and dropping of stale tables.
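
To make the version control mechanism concrete, here is a minimal sketch of timestamp-based merging at read time, assuming a simple Versioned record type: for each key, the value with the newest timestamp wins, whether it came from a dynamic DynamoDB table or from the static HFileService snapshot.

    // Illustrative timestamp-based conflict resolution: the newest version of a
    // key wins, regardless of whether it came from the real-time or batch store.
    import java.util.HashMap;
    import java.util.Map;

    public class TimestampMerge {
        public record Versioned(byte[] value, long timestampMillis) {}

        public static Map<String, Versioned> merge(Map<String, Versioned> realtime,
                                                   Map<String, Versioned> batchSnapshot) {
            Map<String, Versioned> merged = new HashMap<>(batchSnapshot);
            realtime.forEach((key, v) -> merged.merge(
                key, v, (a, b) -> a.timestampMillis() >= b.timestampMillis() ? a : b));
            return merged;
        }
    }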

Fig. 2: Nebula Architecture

Stage 3 (2018): Scalable and low latency key-value storage engine (Mussel)

In Stage 3, we built a system that supported both read and write on real-time and batch-update data with timestamp-based conflict resolution. However, there were opportunities for improvement:

  1. Scale-out challenge: As data grew, it was cumbersome to manually edit partition mappings in Zookeeper, or to horizontally scale the system by adding nodes to handle increasing traffic
  2. Read performance degradation under spiky write traffic
  3. High maintenance overhead: We needed to maintain HFileService and DynamoDB at the same time
  4. Inefficient merging process: The daily merge of the delta update from DynamoDB with the HFileService snapshot grew slower as our total data size increased. The daily update data in DynamoDB was just 1–2% of the baseline data in HFileService, yet we re-published the full snapshot (102% of the total data size) back to HFileService every day

To address these drawbacks, we designed a new key-value store system called Mussel.

  1. We introduced Helix to manage the partition mapping within the cluster
  2. We leveraged Kafka as a replication log to replicate writes to all of the replicas instead of writing directly to the Mussel store
  3. We used HRegion as the only storage engine in the Mussel storage nodes
  4. We built a Spark pipeline to load the data from the data warehouse into storage nodes directly

Let’s go into more detail in the following sections.

Fig. 3: Mussel Architecture

Manage partitions with Helix

In Mussel, in order to make the cluster more scalable, we increased the number of shards from 8 in HFileService to 1024. Data in Mussel is partitioned into these shards by the hash of the primary key, so we introduced Apache Helix to manage this large number of logical shards. Helix automatically manages the mapping of logical shards to physical storage nodes. Each Mussel storage node can hold multiple logical shards, and each logical shard is replicated across multiple Mussel storage nodes.
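
Below is a hedged sketch of how such a resource could be registered with Apache Helix using its admin API. The cluster and resource names, the OnlineOffline state model, and the replica count of 3 are illustrative assumptions, not Mussel’s actual configuration.

    // Sketch of registering a Mussel-like resource with Apache Helix.
    // Cluster/resource names, the OnlineOffline state model, and the replica
    // count are assumptions for illustration, not Mussel's actual configuration.
    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.InstanceConfig;

    public class HelixSetup {
        public static void main(String[] args) {
            HelixAdmin admin = new ZKHelixAdmin("zk-host:2181");
            admin.addCluster("mussel-cluster");

            // Register storage nodes as Helix instances.
            for (String node : new String[] {"node1_9090", "node2_9090", "node3_9090"}) {
                admin.addInstance("mussel-cluster", new InstanceConfig(node));
            }

            // 1024 logical shards; Helix computes and maintains the mapping of
            // shards to physical storage nodes, with each shard replicated.
            admin.addResource("mussel-cluster", "mussel-store", 1024, "OnlineOffline");
            admin.rebalance("mussel-cluster", "mussel-store", 3);
        }
    }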

Leaderless Replication with Kafka

Since Mussel is a read-heavy store, we adopted a leaderless architecture. Read requests can be served by any of the Mussel storage nodes that host the relevant logical shard, which increases read scalability. On the write path, we needed to consider the following:

  1. We want to smooth the write traffic to avoid impacting the read path
  2. Since there is no leader node for each shard, we need a way to make sure each Mussel storage node applies the write requests in the same order so the data is consistent across different nodes

To solve these problems, we introduced Kafka as a write-ahead log. Instead of writing directly to a Mussel storage node, write requests are first written to Kafka asynchronously. We have 1024 partitions for the Kafka topic, with each partition belonging to one logical shard in Mussel. Each Mussel storage node polls the events from Kafka and applies the changes to its local store. Because there is no leader node to impose ordering, we rely on Kafka’s per-partition ordering to guarantee that every replica applies writes in the same order, ensuring consistent updates. The drawback is that this can only provide eventual consistency. However, given the derived data use case, it is an acceptable tradeoff to compromise on consistency in the interest of availability and partition tolerance.
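
Here is a minimal sketch of that write path with the Kafka producer API. The topic name and the shard calculation are illustrative assumptions; the key point is that the record is sent to the Kafka partition matching the key’s logical shard, so every replica of that shard consumes the same ordered log.

    // Sketch of using Kafka as a write-ahead log (topic name is hypothetical).
    // Each write goes to the partition that corresponds to the key's logical
    // shard, so all replicas of that shard apply writes in the same order.
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class MusselWriter {
        private static final int NUM_SHARDS = 1024;
        private final KafkaProducer<byte[], byte[]> producer;

        public MusselWriter(String bootstrapServers) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.producer = new KafkaProducer<>(props);
        }

        public void write(String primaryKey, byte[] value) {
            int shard = Math.floorMod(primaryKey.hashCode(), NUM_SHARDS);
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                "mussel-writes",                              // hypothetical topic
                shard,                                        // partition == logical shard
                primaryKey.getBytes(StandardCharsets.UTF_8),
                value);
            producer.send(record);                            // asynchronous send
        }
    }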

Supporting read, write, and compaction in one storage engine

In order to reduce the hardware cost and operational load of managing DynamoDB, we decided to remove it and extend HFileService to be the only storage engine serving both real-time and offline data. To better support both read and write operations, we used HRegion instead of HFile. HRegion is a fully functional key-value store with a MemStore and BlockCache. Internally it uses a Log-Structured Merge (LSM) tree to store the data and supports both read and write operations.

An HRegion table contains column families, which are the logical and physical groupings of columns. Inside a column family are column qualifiers, which are the individual columns, and each column can hold multiple timestamped versions of a value. Columns only exist once they are inserted, which makes HRegion a sparse database. We mapped our client data to HRegion as follows: each client table becomes a column family, the primary key becomes the row key, the secondary key becomes the column qualifier, and each value is versioned by its timestamp.

With this mapping, for read queries, we’re able to support:

  1. Point queries by looking up the data with the primary key
  2. Prefix/range queries by scanning data on the secondary key
  3. Queries for the latest data or for data within a specific time range, as both real-time and offline data written to Mussel carry a timestamp (see the sketch below)
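
As an illustration, the three query shapes map onto the standard HBase client API roughly as follows. The table name, column family ("user_profile"), row key, and qualifier prefix are all hypothetical, and Mussel’s read path talks to its embedded HRegion rather than a remote HBase cluster.

    // Illustrative read queries against the HRegion data model via the standard
    // HBase client API. Table/column-family names and keys are hypothetical;
    // Mussel reads from its embedded HRegion rather than a remote HBase cluster.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MusselReads {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("mussel"))) {

                byte[] family = Bytes.toBytes("user_profile");   // one client table
                byte[] rowKey = Bytes.toBytes("user_123");       // primary key

                // 1. Point query: look up a row by primary key.
                Get point = new Get(rowKey);
                point.addFamily(family);
                Result r1 = table.get(point);

                // 2. Prefix query on the secondary key (the column qualifier).
                Get prefix = new Get(rowKey);
                prefix.addFamily(family);
                prefix.setFilter(new ColumnPrefixFilter(Bytes.toBytes("2022-")));
                Result r2 = table.get(prefix);

                // 3. Latest data or data within a time range, via cell versions.
                Get history = new Get(rowKey);
                history.addFamily(family);
                history.setTimeRange(0L, System.currentTimeMillis());
                history.readVersions(10);                        // HBase 2.x API
                Result r3 = table.get(history);

                System.out.println(r1.isEmpty() + " " + r2.isEmpty() + " " + r3.isEmpty());
            }
        }
    }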

Because we have over 4,000 client tables in Mussel, each user table is mapped to a column family in HRegion rather than to its own table, which reduces scalability challenges at the metadata management layer. Also, as HRegion groups data physically by column family, each column family is stored in separate files and can be read and written independently.

For write requests, each storage node consumes write requests from Kafka and calls the HRegion put API to write the data directly. Each table can also be configured with a custom maximum number of versions and a TTL (time-to-live).
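
Here is a hedged sketch of a storage node’s apply loop: it polls the Kafka partitions for the shards it hosts and applies each event with a put. The topic name, record encoding, and the use of the standard HBase Table API (as a stand-in for the embedded HRegion’s put) are illustrative assumptions.

    // Sketch of a storage node's apply loop: poll the Kafka partitions for the
    // shards this node hosts and apply each event with a put. Topic name and
    // record encoding are hypothetical; Table.put stands in for the embedded
    // HRegion put API that Mussel calls directly.
    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import java.util.stream.Collectors;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ApplyLoop {
        public static void run(Table table, byte[] family, List<Integer> hostedShards)
                throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka:9092");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Consume only the partitions (logical shards) this node hosts.
                consumer.assign(hostedShards.stream()
                    .map(shard -> new TopicPartition("mussel-writes", shard))
                    .collect(Collectors.toList()));

                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<byte[], byte[]> rec : records) {
                        Put put = new Put(rec.key());            // row key = primary key
                        put.addColumn(family,
                            Bytes.toBytes("default"),            // secondary key (qualifier)
                            rec.timestamp(),                     // event time as cell version
                            rec.value());
                        table.put(put);
                    }
                }
            }
        }
    }

Per-table max versions and TTL correspond naturally to HBase column family settings (maximum versions and time-to-live) on the column family that backs each client table.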

When we serve write requests with HRegion, another thing to consider is compaction. Compaction needs to run in order to clean up data that has been deleted or has exceeded its max version count or TTL. Also, when the MemStore in HRegion reaches a certain size, it is flushed to disk into a StoreFile. Compaction merges those files together in order to reduce disk seeks and improve read performance. On the other hand, while compaction is running, it causes higher CPU and memory usage and can block writes to prevent JVM (Java Virtual Machine) heap exhaustion, which impacts the read and write performance of the cluster.

Here we use Helix to mark the Mussel storage nodes for each logical shard as one of two types of resources: online nodes and batch nodes. For example, if we have 9 Mussel storage nodes for one logical shard, 6 of them are online nodes and 3 of them are batch nodes. The relationship between online and batch nodes is:

  1. They both serve write requests
  2. Only online nodes serve read requests, and we rate-limit compaction on online nodes to maintain good read performance
  3. Helix schedules a daily rotation between online nodes and batch nodes (sketched below). In the example above, it moves 3 online nodes to batch and 3 batch nodes to online, so those 3 new batch nodes can perform full-speed major compaction to clean up old data
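
The snippet below is only a toy illustration of the bookkeeping behind that rotation for a single shard with 9 replicas; in production the rotation is driven by Helix, not hand-rolled code.

    // Toy illustration of the daily online/batch rotation for one logical shard.
    // In production Helix drives this rotation; this is not Mussel's actual code.
    import java.util.ArrayList;
    import java.util.List;

    public class NodeRotation {
        // The first (size - batchCount) nodes are online, the rest are batch.
        // Rotating demotes the first batchCount online nodes to batch and
        // promotes yesterday's batch nodes to online.
        public static List<String> rotateDaily(List<String> nodes, int batchCount) {
            List<String> rotated = new ArrayList<>(nodes.subList(batchCount, nodes.size()));
            rotated.addAll(nodes.subList(0, batchCount));
            return rotated;
        }

        public static void main(String[] args) {
            List<String> shardReplicas = List.of(
                "n1", "n2", "n3", "n4", "n5", "n6",   // online: serve reads and writes
                "n7", "n8", "n9");                    // batch: writes + full-speed compaction
            // After rotation, n7-n9 become online and n1-n3 become batch nodes.
            System.out.println(rotateDaily(shardReplicas, 3));
        }
    }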

With this change, now we’re able to support both read and write with a single storage engine.

Supporting bulk load from data warehouse

We support two types of bulk load pipelines from the data warehouse to Mussel via Airflow jobs: merge type and replace type. Merge type means merging the data from the data warehouse with the data previously written to Mussel under older timestamps. Replace type means importing the data from the data warehouse and deleting all data with previous timestamps.

We utilize Spark to transform data from the data warehouse into HFile format and upload it to S3. Each Mussel storage node downloads the files and uses the HRegion bulkLoadHFiles API to load those HFiles into the corresponding column family.
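
Here is a hedged sketch of the node-side load step, using HBase 2.x’s BulkLoadHFiles tool as a stand-in for calling the embedded HRegion’s bulkLoadHFiles API directly; the local path and table name are illustrative.

    // Hedged sketch of the node-side bulk load: HFiles produced by the Spark job
    // and downloaded from S3 are loaded into the client table's column family.
    // BulkLoadHFiles (HBase 2.x) stands in for the embedded HRegion call; the
    // path and table name are illustrative.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

    public class MusselBulkLoad {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Expected layout: <dir>/<columnFamily>/<hfile>, as produced by the
            // Spark job and pulled down from S3 to the storage node.
            Path hfileDir = new Path("file:///mnt/mussel/bulkload");
            BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("mussel"), hfileDir);
        }
    }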

With this bulk load pipeline, we can load just the delta data into the cluster instead of a full data snapshot every day. Before the migration, the user profile service needed to load about 4 TB of data into the cluster daily. Afterwards, it only needs to load about 40–80 GB, drastically reducing the cost and improving the performance of the cluster.

Conclusion and Next Steps

In the last few years, Airbnb has come a long way in providing a high-quality derived data store for our engineers. The most recent key-value store, Mussel, is widely used within Airbnb and has become a foundational building block for any key-value based application with strong reliability, availability, scalability, and performance guarantees. Since its introduction, ~4,000 tables have been created in Mussel, storing ~130 TB of data in our production clusters, not counting replication. Mussel has been working reliably to serve a large volume of read, write, and bulk load requests. For example, mussel-general, our largest cluster, has achieved >99.9% availability, an average read QPS of >800k and write QPS of >35k, with an average P95 read latency of less than 8 ms.

Even though Mussel serves our current use cases well, there are still many opportunities for improvement. For example, we’re looking forward to providing read-after-write consistency to our customers. We also want to support auto-scaling and repartitioning based on traffic in the cluster. We’re looking forward to sharing more details about this soon.

Acknowledgments

Mussel is a collaborative effort of Airbnb’s storage team including: Calvin Zou, Dionitas Santos, Ruan Maia, Wonhee Cho, Xiaomou Wang, Yanhan Zhang.

Interested in working on the Airbnb Storage team? Check out this role: Staff Software Engineer, Distributed Storage
