Upgrading Data Warehouse Infrastructure at Airbnb
This post describes Airbnb’s experience upgrading its Data Warehouse infrastructure to Spark and Iceberg.
By: Ronnie Zhu, Edgar Rodriguez, Jason Xu, Gustavo Torres, Kerim Oktay, Xu Zhang
Introduction
In this blog, we will introduce our motivations for upgrading our Data Warehouse Infrastructure to Spark 3 and Iceberg. We will briefly describe the current state of Airbnb data warehouse infrastructure and the challenges. We will then share our learnings from upgrading one critical production workload: event data ingestion. Finally, we will share the results and the lessons learned.
Context
Airbnb’s Data Warehouse (DW) storage was previously migrated from legacy HDFS clusters to S3 to provide better stability and scalability. While our team has continued to improve the reliability and stability of the workloads that operate on data in S3, certain characteristics of these workloads and the infrastructure they depend on introduce scalability and productivity limitations that our users encounter on a regular basis.
Challenges
Hive Metastore
With an increasing number of partitions, the load on the Hive Metastore’s backend DBMS has become a bottleneck, as has the cost of partition operations (e.g., querying thousands of partitions for a month’s worth of data). As a workaround, we usually add a stage of daily aggregation and keep two tables for queries of different time granularities (e.g., hourly and daily). To save on storage, we limit intraday Hive tables to short retention (three days), and keep daily tables for longer retention (several years).
Hive/S3 Interactions
Hive was not originally designed for object storage. Instead, many assumptions were made around HDFS when implementing features such as renames and file listings. Migrating from HDFS to S3 therefore required certain guarantees to ensure that datasets were consistent on list-after-write operations. We customized the way Hive writes to S3, first writing to a temporary HDFS cluster and then moving the data to S3 via an optimized distcp process that writes to unique locations during the commit phase, storing file-listing information in a separate store for fast access. This process has performed well over the past two years, but it requires additional cluster resources to run.
Schema Evolution
At Airbnb, we use three compute engines to access data in our Data Warehouse: Spark, Trino and Hive. Since each compute engine handles schema changes differently, changes to table schemas have almost always resulted in data quality issues or required engineers to perform costly rewrites.
Partitioning
Hive tables are partitioned by fixed columns, and partition columns cannot be easily changed. In case one needs to repartition a dataset, one has to create a new table and reload the entire dataset.
New Data Stack
These challenges have motivated us to upgrade our Data Warehouse infrastructure to a new stack based on Iceberg and Spark 3, which addresses these problems and also provides usability improvements.
Iceberg
Apache Iceberg is a table format designed to address several of the shortcomings of traditional file system-based Data Warehousing storage formats such as Hive. Iceberg is designed to deliver high-performance reads for huge analytics tables, with features such as serializable isolation, snapshot-based time travel, and predictable schema evolution. Some important Iceberg features that help with the challenges mentioned earlier:
- Partition information is not stored in the Hive metastore, hence removing a large source of load to the metastore.
- Iceberg tables do not require S3 listings. This removes the list-after-write consistency requirement, which in turn can eliminate the need for the extra distcp job, and it avoids the latency of the list operation entirely.
- The Iceberg spec defines a consistent table schema, which guarantees consistent behavior across compute engines and avoids unexpected behavior when changing columns.
Spark 3
Apache Spark has become the de facto standard for big data processing in the past 10 years. Spark 3, a new major version released in 2020, comes with a long list of new functionality, bug fixes, and performance improvements. We focus on Adaptive Query Execution (AQE) here; you can find more info on the Databricks blog.
AQE is a query optimization technique that uses runtime statistics to optimize the Spark query execution plan. This solves one of the greatest struggles of Spark cost-based optimization: inaccurate statistics collected before a query starts often lead to suboptimal query plans. AQE figures out data characteristics and improves query plans as the query runs, increasing query performance.
Spark 3 is also a prerequisite for Iceberg adoption. Iceberg table write and read support using Spark SQL is only available on Spark 3.
The diagram below shows the change we made:
Figure 1. Evolution of data compute and storage tech stack
Production Case Study — Data Ingestion
At Airbnb, the Hive-based data ingestion framework processes >35 billion Kafka event messages and 1,000+ tables per day, and lands datasets ranging from kilobytes to terabytes into hourly and daily partitions. The volume and coverage of datasets of different sizes, together with the time-granularity requirements, make this framework a good candidate to benefit from our Spark+Iceberg tech stack.
Spark 3
The first step in migrating to the aforementioned Spark+Iceberg compute tech stack was to move our Hive queries to Spark. This introduced a new challenge: Spark tuning. Unlike Hive, which relies on data volume stats, Spark uses preset shuffle partition values to determine task split sizes. Thus, choosing the proper number of shuffle partitions became a big challenge in tuning the event data ingestion framework on Spark. Data volume of different events varies a lot, and the data size of one event also changes over time. Figure 2 shows the high variance of shuffle data size of Spark jobs processing a sampling of 100 different types of events.
Figure 2. High variance of raw data size of 100 randomly sampled events; each bar represents a single dataset
There isn’t a fixed number of shuffle partitions that would work well for all events in the ingestion framework; if we pick a fixed number for all ingestion jobs, it might be too big for some jobs but too small for others, and both would result in low performance. While we were exploring different solutions to tune shuffle partition parameters, we found that Adaptive Query Execution could be a perfect solution.
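To make the problem concrete, the short sketch below computes the average shuffle split size that a single fixed partition count yields for a very small and a very large dataset. The dataset sizes are hypothetical, chosen only for illustration; 200 is Spark's default value for spark.sql.shuffle.partitions.

```python
KB, MB, GB, TB = 1024, 1024**2, 1024**3, 1024**4

# Spark's default for spark.sql.shuffle.partitions.
FIXED_PARTITIONS = 200


def split_size_bytes(shuffle_bytes: int, num_partitions: int) -> float:
    """Average amount of shuffle data each task reads with a fixed partition count."""
    return shuffle_bytes / num_partitions


# Hypothetical dataset sizes, for illustration only.
small_split = split_size_bytes(1 * MB, FIXED_PARTITIONS)  # a few KB per task
large_split = split_size_bytes(1 * TB, FIXED_PARTITIONS)  # several GB per task

print(f"small dataset: {small_split / KB:.2f} KB per task")
print(f"large dataset: {large_split / GB:.2f} GB per task")
```

The same setting leaves the small job dominated by per-task overhead while tasks in the large job each have to handle gigabytes of shuffle data.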
How does AQE help?
In Spark 3.0, the AQE framework ships with several key features, including dynamically switching join strategies and dynamically optimizing skew joins. However, the most critical new feature for our use case is dynamically coalescing shuffle partitions, which ensures that each Spark task operates on roughly the same amount of data. It does this by combining adjacent small partitions into bigger partitions at runtime. Since shuffle data can dynamically grow or shrink between different stages of a job, AQE continually re-optimizes the size of each partition through coalescing throughout a job’s lifetime. This brought a great performance boost.
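The idea of combining adjacent small partitions can be sketched in a few lines of plain Python. This is a simplified greedy model written for illustration, not Spark's actual implementation; the function name and the example sizes are hypothetical.

```python
from typing import List

MB = 1024 * 1024


def coalesce_partitions(sizes: List[int], advisory_size: int) -> List[List[int]]:
    """Greedily merge adjacent shuffle partitions up to an advisory size.

    `sizes` holds the byte size of each initial shuffle partition, in order.
    Returns groups of partition indices; each group would be read by one task.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if current and current_bytes + size > advisory_size:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(index)
        current_bytes += size
    if current:
        groups.append(current)
    return groups


# Hypothetical partition sizes, for illustration only.
sizes = [10 * MB, 20 * MB, 30 * MB, 5 * MB, 100 * MB, 1 * MB, 2 * MB]
groups = coalesce_partitions(sizes, advisory_size=64 * MB)
print(groups)
```

Only neighbouring partitions are merged, so a partition that is already larger than the advisory size stays in a group of its own.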
AQE handles all cases in our data ingestion framework well, including edge cases of spiky events and new events. One note is that flattening of nested columns and compression of file storage format (in our case, Parquet GZIP) might generate fairly small output files for small task splits. To ensure output file sizes are large enough to be efficiently accessed, we can increase the AQE advisory shuffle partition size accordingly.
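For readers who want to try this, the sketch below collects the Spark 3 settings that control partition coalescing and renders them as spark-submit arguments. The configuration keys are standard Spark SQL properties; the values and the helper function are illustrative assumptions, not the settings used in the ingestion framework.

```python
from typing import Dict, List

# Standard Spark 3 configuration keys; the values are hypothetical placeholders.
AQE_CONF: Dict[str, str] = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Start with many small shuffle partitions so AQE has room to coalesce.
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum": "1000",
    # Raise this if flattening and compression produce output files that are too small.
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "256m",
}


def to_spark_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render a config mapping as `--conf key=value` arguments (hypothetical helper)."""
    args: List[str] = []
    for key in sorted(conf):
        args.extend(["--conf", f"{key}={conf[key]}"])
    return args


print(" ".join(to_spark_submit_args(AQE_CONF)))
```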
AQE Tuning Experience
Let’s walk through an example to get a better understanding of AQE and its tuning experience. Say we run the example query to load one dataset. The query has one Map stage to flatten events and another Reduce stage to handle deduplication. After adopting AQE and running the job in Spark, we can see two highlighted steps get added to the physical plan.
Figure 3. Change of physical plan of the example Spark job
Now let’s take a closer look at our tuning phase. As shown in Table 1, we went through several iterations of parameter settings. From our experience, if the actual number of shuffle partitions used is equal to the initial partition number we set, we should increase the initial partition number so that the initial tasks are split further and then coalesced. And if the average output file size is too small, we can increase the advisory partition size to generate larger shuffle partitions, and thus larger output files. Upon inspecting the shuffle data of each task, we could also decrease executor memory and the max number of executors.
We also experimented with the tuned job parameters on datasets of different sizes, as shown in Tables 2 and 3. The results show that, once tuned, AQE performs well on datasets ranging from zero bytes to terabytes in size, all while using a single set of job parameters.¹
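The two tuning rules described above can be written down as a small function. This is a sketch under stated assumptions: the class names, the doubling factor, and the minimum file size threshold are hypothetical and are not taken from Table 1.

```python
from dataclasses import dataclass

MB = 1024 * 1024


@dataclass
class AqeParams:
    initial_partitions: int  # initial shuffle partition number
    advisory_bytes: int      # advisory shuffle partition size


@dataclass
class JobStats:
    actual_partitions: int      # shuffle partitions actually used after coalescing
    avg_output_file_bytes: int  # average size of the files the job wrote


def next_params(params: AqeParams, stats: JobStats,
                min_file_bytes: int, growth: int = 2) -> AqeParams:
    """Apply one tuning iteration (hypothetical helper; `growth` is an assumption)."""
    initial = params.initial_partitions
    advisory = params.advisory_bytes
    # Rule 1: nothing was coalesced, so split the initial tasks further.
    if stats.actual_partitions >= initial:
        initial *= growth
    # Rule 2: output files are too small, so ask for larger shuffle partitions.
    if stats.avg_output_file_bytes < min_file_bytes:
        advisory *= growth
    return AqeParams(initial, advisory)


# Hypothetical first iteration: no coalescing happened and files are small.
tuned = next_params(AqeParams(200, 64 * MB), JobStats(200, 20 * MB),
                    min_file_bytes=100 * MB)
print(tuned)
```

Repeating the step until neither rule fires corresponds to the iterations walked through in the tuning table.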
Table 1. Tuning AQE using example medium-size dataset
Table 2. Job stats of example small-size dataset
Table 3. Job stats of example empty-size dataset
From our results, it’s clear that AQE can adjust the shuffle split size very close to our predefined value in the Reduce stage and thus generate outputs of the target file size we expect. Furthermore, since each shuffle split is close to the predefined value, we can also lower executor memory from default values to ensure efficient resource allocation. As an additional big advantage to the framework, we do not need to do any special handling to onboard new datasets.
Iceberg — Partition specs & Compaction
In our data ingestion framework, we found that we could take advantage of Iceberg’s flexibility to define multiple partition specs to consolidate ingested data over time. Each data file written in a partitioned Iceberg table belongs to exactly one partition, but we can control the granularity of the partition values over time. Ingested tables write new data with an hourly granularity (ds/hr), and a daily automated process compacts the files into a daily partition (ds) without losing the hourly granularity, which can later be applied to queries as a residual filter.
Our compaction process determines whether a data rewrite is required to reach an optimal file size; if not, it rewrites only the metadata to assign the already existing data files to the daily partition. This has simplified the process for ingesting event data and provides a consolidated view of the data to the user within the same table. As an added benefit, we’ve realized cost savings in the overall process with this approach.
As shown in the diagram below, in the consolidated Iceberg table we switch the partition spec from ds/hr to ds at the end of the day. In addition, user queries are now easier to write and can access fresher data with full history. Keeping only one copy of data also helps improve both compute and storage efficiencies and ensures data consistency.
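A minimal sketch of that decision is shown below. The post does not describe the actual criterion, so the rule used here (rewrite data only when at least two files fall well below the target size) and all names and thresholds are assumptions made for illustration.

```python
from typing import List

MB = 1024 * 1024


def needs_data_rewrite(file_sizes: List[int], target_bytes: int,
                       small_fraction: float = 0.5) -> bool:
    """Return True if merging small files is worthwhile (hypothetical rule).

    A file counts as small if it is below `small_fraction` of the target size;
    a rewrite only helps if there are at least two such files to merge.
    """
    small = [size for size in file_sizes if size < target_bytes * small_fraction]
    return len(small) >= 2


def plan_compaction(file_sizes: List[int], target_bytes: int) -> str:
    """Choose between copying data and reassigning existing files via metadata."""
    if needs_data_rewrite(file_sizes, target_bytes):
        return "rewrite_data"
    return "rewrite_metadata_only"


# Hypothetical days: 24 small hourly files versus two already large files.
print(plan_compaction([10 * MB] * 24, target_bytes=512 * MB))
print(plan_compaction([600 * MB, 550 * MB], target_bytes=512 * MB))
```

The metadata-only branch is where the savings come from, since no data files are read or copied.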
Figure 4. Change of table storage format for table consolidation
Table Consolidation Experience
Consolidating hourly and daily data into one Iceberg table requires changes in both the write and read paths. For the write path, to mitigate the aforementioned issues caused by small files, we force a compaction run during the partition spec switch. Tables 4 and 5 compare the statistics from our intelligent compaction jobs with the cost of a full rewrite of all the data files associated with the daily partition. For some large tables, we obtain resource savings of more than 90% by leveraging Iceberg’s ability to avoid data copying during compaction.
Table 4. Compaction job comparison of example small-size dataset
Table 5. Compaction job comparison of example large-size dataset
For the read path, since most data consumers use Airflow’s partition sensors, we updated the implementation of partition sensing. Specifically, we implemented a signal system to sense empty partitions in Iceberg tables, as opposed to the prior method of looking up each Hive partition as an actual row in Hive metastore.
Results
Compared with the prior Tez and Hive stack, we see more than a 50% saving in compute resources and a 40% reduction in job elapsed time in our data ingestion framework with Spark 3 and Iceberg. From a usability standpoint, we made it simpler and faster to consume stored data by leveraging Iceberg’s capabilities for native schema and partition evolution.
Conclusion
In this post, we shared the upgrades we applied to Airbnb’s data compute and storage tech stack. We hope that readers enjoyed learning how our event data ingestion framework benefits from Adaptive Query Execution and Iceberg and that they consider applying similar tech stack changes to their use cases involving datasets of varying size and time granularity.
If this type of work interests you, please check out our open roles here!
Acknowledgments
Special thanks to Bruce Jin, Guang Yang, Adam Kocoloski and Jingwei Lu for their continued guidance and support!
Also countless thanks to Mark Giangreco, Surashree Kulkarni and Shylaja Ramachandra for providing edits and great suggestions to the post!