Setting Uber’s Transactional Data Lake in Motion with Incremental ETL Using Apache Hudi
The Global Data Warehouse team at Uber democratizes data for all of Uber with a unified, petabyte-scale, centrally modeled data lake. The data lake consists of foundational fact, dimension, and aggregate tables developed using dimensional data modeling techniques that can be accessed by engineers and data scientists in a self-serve manner to power data engineering, data science, machine learning, and reporting across Uber. The ETL (extract, transform, load) pipelines that compute these tables are thus mission-critical to Uber’s apps and services, powering core platform features like rider safety, ETA predictions, fraud detection, and more. At Uber, data freshness is a key business requirement. Uber invests heavily in engineering efforts that process data as quickly as possible to keep it up to date with the happenings in the physical world.
In order to achieve such data freshness in our ETL pipelines, a key challenge is incrementally updating these modeled tables rather than recomputing all the data with each new ETL run. This is also necessary to operate these pipelines cost-effectively at Uber’s enormous scale. In fact, as early as 2016, Uber introduced a new “transactional data lake” paradigm with powerful incremental data processing capabilities through the Apache Hudi project to address these challenges. We later donated the project to the Apache Software Foundation. Apache Hudi is now a top-level Apache project used industry wide in a new emerging technology category called the lakehouse. During this time, we are excited to see that the industry has largely moved away from bulk data ingestion towards a more incremental ingestion model that Apache Hudi ushered in at Uber. In this blog, we share our work over the past year or so in extending this incremental data processing model to our complex ETL pipelines to unlock true end-to-end incremental data processing.
Background
Before we dive in, let’s spend time reviewing some background on incremental processing and how it relates to traditional batch processing and newer stream processing. Using stream processing, one can process data streams as fast as possible in real time and produce more data streams for downstream processing. Stream processing systems have built-in support for dealing with late-arriving/after-the-fact data, processing data using either event time or processing time semantics. Though event time and processing time should ideally be the same, a drift is very common due to system retries, missing or corrupt data, networking delays, business logic, etc. While batch data processing can process huge volumes, they handle late-arriving data poorly and can’t process data effectively as it arrives or reconciles with late data. In order to overcome this, batch processing pipelines are typically triggered after the data settles for a few hours or by effectively recomputing the entire table or time windows repeatedly multiple times a day. This means late-arriving/after-the-fact data would need to recompute the entire partition even if the data actually changing is minimal compared to the entire partition.
Figure 1
The idea behind incremental processing is quite simple. Incremental processing extends the semantics of processing streaming data to batch processing pipelines by processing only new data each run and then incrementally updating the new results. This unlocks great cost savings due to much shorter batch pipelines as well as data freshness speedups due to being able to run them much more frequently as well. Apache Hudi was designed from the ground up to deliver incremental data processing on the lake. Streaming systems handle late-arriving data using some kind of a versioned state store with point look up abilities to read and modify the data. Similarly, Apache Hudi supports point-in-time reads, powerful indexing capabilities, optimized Merge-On-Read storage format, and indexed metadata to handle fast, frequent mutations to the table. In traditional, massive data lakes, recomputing to handle late data would mean triggering the recalculation of all affected partitions (in case of a partitioned table) and cascading this process for all downstream tables. Apache Hudi supports powerful change data capture capabilities to enable incrementally chaining the data processing all the way from ingesting data to computing modeled tables and even the downstream data applications mentioned above.
For a long time, data in traditional data lakes were assumed to be immutable. Even as the lakehouse architecture challenges this by adding transactions/updates/deletes similar to data warehouses, we believe that database-like functionality similar to stream processing state stores is required to fully realize the benefits of this incremental data processing model.
Use Cases
Let’s now examine two example scenarios at Uber, where such incremental data processing can have a significant impact.
Driver and Courier Earnings
Let’s take an example of a very important use case where incremental reads and writes can bring significant performance improvements over traditional batch data processing. Assume that a dataset contains driver earnings for every driver on a daily basis. Sometimes, riders can choose to tip drivers hours after a trip is completed, which will then be a late-arriving update to the initial record, which contains the base earnings information. Figure 2 below details how this happens for a specific driver, who has earned $10 on a trip on Monday and got a $1 tip for Monday’s trip on Tuesday morning.
Figure 2
In a typical batch ETL world, we have no intelligence on how the input data has changed. We would assume an ‘N’ day lookback (based on operational heuristics) and reprocess all partitions, updating driver earnings wherever necessary to the target table. This has proved to be a very time- and resource-exhaustive process, as there could be a very small number of records to update in partitions which are months old. In the figure below, you can see all the date partitions that are touched in a single run, along with the number of events that need to be updated for that specific date.
Figure 3
As you can see from Figure 3 above, there is even a small number of updates on driver earnings dating back several months, which would be missed when we assume a N-day lookback window. In the incremental ETL approach, all of these updates can be consumed during each execution of the incremental pipeline and reflected in the target Apache Hudi table at a record level. With this process we are able to achieve 100% data accuracy and completeness in our modeled datasets without the need to reprocess months of partitions at regular intervals.
Another use case where incremental read and upserts win over the batch processing model in both performance and cost is when we have to cater to frequent updates and get the data in our modeled tables with short SLAs. Assume there are multiple tables which contain information regarding the restaurant menu updated at various granularities. For example, one table might contain the menu item information and another could store the menu level information. Every merchant can change this information as many times as needed in a day, and in our modeled tables we would want to show the latest state of these changes as soon as possible.
Figure 4
The batch approach of catering to this use case is to pull the whole day’s changes at once and then merge this against the latest state of each existing record to get the latest picture for all the entities and then use this information to populate all the other tables. As seen in the above graph (Figure 4) the percentage of entity updates coming in per day is a significant fraction of the total number of entities (408 million delta changes as compared to 11 billion total entities). The problem with the batch approach is that the computation is done after the data is deemed complete and increases the SLA for all the downstream tables. Since it takes around 14 hours for the pipeline to complete given the large amounts of data involved, a single failure would again increase the SLA by many hours. In an incremental ETL approach, the effect of failures on data freshness SLAs is largely reduced due to the ability to run much more frequent, shorter pipeline runs.
Incremental ETL Model
Our goal was to improve the latency and quality of our modeled datasets, using Apache Hudi’s incremental processing primitives query, and then update records without rewriting entire partitions. This also reduces a lot of operational overhead to scale otherwise very large batch pipelines. By doing so, we’re able to achieve 100% data completeness for all derived datasets using this architecture. Figure 5 below details how incremental processing is enabled from raw data to derived datasets downstream from them at Uber. In this section, we present a model for choosing how to compose these incremental data pipelines, depending on the nature of the ETL logic.
Figure 5
Read and Join Strategies
In our ETL pipelines we needed to handle various types of reads and joins that can be performed using Apache Hudi. This involves incremental reads on a single source, as well as incremental reads with joins on multiple raw data, derived, and lookup tables. Additionally, we also needed to handle snapshot reads for backfills on single or multiple tables.
The table below summarizes how we handle these scenarios.
Scenario | How is it Handled? |
Incremental read on a single source | Use Apache Hudi’s incremental reader and upsert to the target table |
Incremental read + join with multiple raw data tables | Use Apache Hudi’s incremental read on the main table and perform left outer join on other raw data tables with T-24 hr incremental pull data |
Incremental read + join with multiple derived and lookup tables | Use Apache Hudi’s incremental read on the main table and perform left outer join on other derived tables fetching only the affected partitions |
Backfills use case | Use snapshot read on single or multiple tables within etl_start_date and etl_end_date |
Writes Strategies
We will now cover the various ways to apply incremental updates to partitioned and non-partitioned tables in Apache Hudi, including using upserts and insert overwrites. Additionally, we will discuss the use of targeted merge and update statements for non-incremental columns in partitioned tables. We also explored strategies for avoiding data quality issues on non-incremental columns in non-partitioned tables.
Type of Table | How is it Handled? |
Partitioned | – Use upsert to apply only the incremental updates– Use insert overwrite to update all affected partitions when performing backfill operation – Use targeted merge/update statements for non-incremental columns using Apache Spark SQL |
Non-partitioned | – Use upsert to apply only the incremental updates – Use insert overwrite when joining incremental rows with full outer join on target table to update both the incremental and non-incremental columns (to avoid DQ issues on non-incremental columns) |
Backfill Strategies
Like stream processing pipelines, incremental data pipelines also need a way to backfill tables when business logic changes. Since Apache Hudi also supports batch write operations such as insert_overwrite, we handle such backfill scenarios seamlessly by a snapshot read on the source table, followed by a concurrent write on the same table/partition. A few key design choices and features in Apache Hudi are worth highlighting in this context. Apache Hudi supports record keys and pre-combine keys, which make the backfilling process painless by allowing the incremental writer and the backfill process to operate without unintended side effects like the latest write being overwritten by the backfill process. Apache Hudi also offers the ability to run the table services that optimize and manage the table concurrently without blocking the incremental/backfill writer, helping us achieve lower SLAs for the table. Apache Hudi also enables us to ensure that the backfill process does not update or affect the checkpoints for the incremental writer.
Implementation
In this section, we will explore the basic building blocks needed to build and manage incremental ETL pipelines using Apache Hudi, Apache Spark, and Uber’s workflow management system Piper (although it should also work on a system like Apache Airflow). At Uber, we built an Apache Spark ETL framework to be able to manage and operate ETL pipelines at scale, which is scheduled through Piper. The framework is built on top of Apache Hudi’s incremental data processing tool “DeltaStreamer,” which was originally contributed by Uber and is now in use at many other organizations for similar purposes. At Uber, we now used the tool to migrate away from our legacy Hive ETL framework. Our new Apache Spark ETL framework lets users author and manage ETL pipelines with simple steps. Users can customize the frequency of the jobs and resources consumed by a job run for optimal table freshness. Following are the minimally required user inputs that define each pipeline.
Table Definition
A DDL Definition file that contains the schema information for your intended dataset and declares the table’s format to be an Apache Hudi format. An example DDL file can be seen below in Figure 6:
Figure 6
This file will contain a list of configurations, which is expected by the Apache Spark DeltaStreamer application. Below in Figure 7, you can see a sample YAML config file used in DeltaStreamer jobs.
Figure 7
Let’s go over some important configurations:
- hoodie.datasource.recordkey.field This is the primary key in the target table. Deduplication is performed on the primary key, and if there are duplicate records, they are reduced to a single record based on the largest value for the column identified by hoodie.datasource.write.precombine.field. This is very useful even for append-only tables.
- hoodie.datasource.write.operation Upsert indicates that record level updates should be performed on the target table with the payload generated from the SQL transformation.
SQL-Based Transformation
We provide a file containing the SQL transformation, which holds the business logic that DeltaStreamer will execute using Apache Spark SQL. The final payload will then be used to perform record level updates to the target table. Below is an example of a SQL transformation in Figure 8.
Figure 8
<SRC> denotes the incremental source from which the incremental read operation is performed. Every time a new run begins, DeltaStreamer will look at the latest saved checkpoint in the target table’s Apache Hudi-commit metadata and resume from the corresponding checkpoint to read new data from the upstream incremental source table.
Note that DeltaStreamer also allows for incremental reads from a Kafka source instead of an Apache Hudi table.
Code Transformations
For more advanced users, alternative to or in addition to an SQL file, they can choose to supply a custom Scala/Java Apache Spark RDD-based transformer, which will be executed at run time by DeltaStreamer. To do so, you simply have to implement the transformer Interface located in the Apache Hudi Utilities Bundle. Below, in Figure 9, you can see a simple custom transformation class, which transforms incremental data from a dimensional driver table and joins a dimensional city table:
Figure 9
Similarly, users can also provide several custom transformations that will be chained and executed sequentially.
Impact
We were able to achieve the following benefits by rolling out this new incremental framework across Uber orgs such as CoreGDW, Rides, Driver, UberEats, Finance, and Earnings.
Performance and Cost Savings
From the table below, you can see the large performance gains we observed by converting our batch ETL pipelines to use incremental reads and upserts with Apache Hudi’s DeltaStreamer. With this approach, we are able to decrease the pipeline run time by 50% and also decrease the SLA by 60%.
Pipeline | vcore_seconds | memory_seconds | Cost | Run Time (mins) |
Batch ETL of Dimensional Driver Table | 3,129,130 | 23,815,200 | $11.39 | 220 |
Incremental ETL of Dimensional Driver Table | 1,280,928 | 6,427,500 | $2.44 | 39 |
Difference | 1,848,202 | 17,387,700 | $8.95 | 181 |
% Improvement | 59.06% | 73.01% | 78.57% | 82.27% |
Batch ETL of Driver Status Fact Table | 2,162,362 | 5,658,785 | $3.30 | 94 |
Incremental ETL of Driver Status Fact Table | 1,640,438 | 3,862,490 | $2.45 | 48 |
Difference | 521,924 | 1,796 | $0.85 | 46 |
% Improvement | 24.13% | 31.74% | 25.75% | 48.93% |
Strong Data Consistency Across Active-Active Data Centers
Uber has active-active architecture across multiple data centers. Achieving 100% strong data consistency across tables in different data centers is critical at Uber’s scale to run the workload without worrying about data inconsistencies
By moving to Apache Hudi (as opposed to plain parquet tables in Hive), we were able to build a strongly consistent replication across data lakes in multiple data centers. Towards this, we replicated a table after computing it once in the primary data center and then using a replicator service that uses Apache Hudi metadata to only move incrementally changed files across.
Improved Data Quality
Apache Hudi enables the write-audit-publish (WAP) pattern with which we can perform pre-load data quality checks before publishing data–using this pattern we could prevent bad data from entering production datasets. Apache Hudi offers pre-commit validators, so when configured you can run multiple SQL-based quality checks on the data before it gets published.
Improved Observability
Apache Hudi’s DeltaStreamer emits multiple key metrics that will provide detailed insights on what is happening in the ETL execution, such as number of commits in progress, commits consumed, total records inserted/updated/deleted, etc. The Grafana dashboard below shows a few examples of metrics that are captured within DeltaStreamer. These metrics are very helpful while setting up monitoring and alerting systems to know immediately when an ETL pipeline is lagging behind its upstream source.
Figure 10 below shows the commits in progress vs. commits processed, which would tell us that the lag between upstream and downstream pipeline and every commit produced in the source table that has been processed by the incremental ETL pipeline. When there are failures in the incremental ETL, an alert will instead be sent to our on-call.
Figure 10
Figure 11
Conclusion
With the help of Apache Hudi and incremental ETL we can read the updates incrementally and only run the computation logic on the delta changes and then upsert the records into our Apache Hudi tables. There are many advantages that the incremental data processing model brings to the data engineering community, including large resource savings, lower data freshness, and optimal data completeness, all while allowing downstream workloads to follow suit. In fact, this is one of the rare opportunities where we are able to reduce the cost of a system, while also enjoying the increased performance.
While there is probably little doubt that incremental processing should be the de-facto model on the data lakehouses, there is still a lot of work to be done to unlock that. For example, in our implementation we limited ourselves to single-stream, multi-table joins, while also relying on some level of understanding of the business domain in choosing the tables to be used as incremental scans or full snapshot scans. We look forward to working alongside the Apache Hudi and open source community to fully leverage existing SQL capabilities in our framework and also to bring general-purpose incremental SQL on engines like Apache Spark and Flink to life.
Apache®, Apache Hudi, Hudi, the triangle logo, Apache Spark, Spark, and the star logo are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.