Riverbed Data Hydration — Part 1

A deep dive into the streaming aspect of the Lambda architecture framework that optimizes how data is consumed from system-of-record data stores and used to update secondary read-optimized stores at Airbnb.

Overview

In our previous blog post, we introduced the motivation and high-level architecture of Riverbed. As a recap, Riverbed is a part of Airbnb’s tech stack designed to streamline and optimize how data is consumed from system-of-record data stores and used to update secondary read-optimized stores. The framework is built around the concept of ‘materialized views’: denormalized representations of data that can be queried in a predictable, efficient manner. The primary goal of Riverbed is to improve scalability, enable more efficient data fetching patterns, and provide enhanced filtering and search capabilities for a better user experience. It achieves this by keeping the read-optimized store up to date with the system-of-record data stores, and by making it easier for developers to build and manage pipelines that stitch together data from various data sources.

In this blog post, we will delve deeper into the streaming aspect of the Lambda architecture framework. We’ll walk through its critical components step by step and explain how it constructs the materialized view from the Change Data Capture (CDC) events of various online data sources and writes it to sink stores. Specifically, we’ll take a closer look at the join transformation within the Notification Pipeline, illustrating how we designed a DAG-like data structure to join different data sources together in a memory-efficient manner.

To make the framework and its components easier to understand, let’s begin with a simplified example of a Riverbed pipeline definition:

{
  Review {
    id
    review
    User {
      id
      firstName
      lastName
    }
  }
}

Riverbed provides a declarative schema-based interface for customers to define Riverbed pipelines. From the sample definition above, a Riverbed pipeline is configured to integrate data sources from the Review and User entities, generating Riverbed sink documents with the review ID as the materialized view document ID.

Based on this definition, Riverbed generates two types of streaming pipelines:

  • Source Pipelines: Two pipelines consume CDC events from the Review and User tables respectively and publish Apache Kafka® events known as notification events, indicating which documents need to be refreshed.
  • Notification Pipeline: This pipeline consumes the notification events published by the source pipelines and constructs materialized view documents to be written into sink stores.

Now, let us delve deeper into these two types of pipelines.

Source Pipeline

Picture 1. High-level system diagram of Riverbed

Picture 1 shows the Source Pipeline as the first component in Riverbed. It is an auto-generated pipeline that listens to changes in system-of-record data sources. When changes occur, the Source Pipeline constructs NotificationEvents and emits them onto the Notification Kafka® topic to notify the Notification Pipeline of which documents should be refreshed. In the event-driven architecture of Riverbed, the Source Pipeline acts as the initial trigger for real-time updates in the read-optimized store. It not only ensures that mutations in the underlying data sources are captured and communicated to the Notification Pipeline for subsequent processing, but is also key to solving the concurrency and versioning issues in the framework.

While this blog post focuses on the Notification Pipeline, the next post in this series will explore the Source Pipeline in detail, especially its critical role in maintaining real-time data consistency and its interaction with Notification Pipelines.

Notification Pipeline

Picture 2. Notification Pipeline components

The Notification Pipeline is the core component of the Riverbed framework. It consumes Notification events, then queries dependent data sources and stitches together “documents” that are written into a read-optimized sink to support a materialized view. A notification event is processed by the following operations:

  • Ingestion: For every change to a data source that the Read-Optimized Store depends on, we must re-index all affected documents to keep the data fresh. In this step, the Notification Pipeline consumes Notification events from Kafka® and deserializes them into objects that contain only the document ID and primary source ID.
  • Join: Based on these deserialized objects, the Notification Pipeline queries various data stores to fetch all the source data necessary for building the materialized view.
  • Stitch: This step models the join results from various data sources into a comprehensive Java POJO called StitchModel, so that engineers can perform further customized data processing on it.
  • Operate: In this step, a chain of operators (filter, map, flatMap, etc.) containing product-specific business logic can be applied to the StitchModel to convert it into the final document structure that will be stored in the index.
  • Sink: As the last step, documents can be drained into various data sinks to refresh the materialized views.
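The five operations above can be pictured as a chain of small functions. The sketch below is illustrative Python, not Riverbed's actual (Java-based) code: the JSON field names `documentId` and `primarySourceId`, the in-memory `LISTINGS` table, and every function name are assumptions made for this example.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class NotificationEvent:
    """Hypothetical event shape: only the IDs, no source data."""
    document_id: str
    primary_source_id: str


# Hypothetical stand-in for a system-of-record table.
LISTINGS = {"42": {"id": "42", "title": "Cozy loft", "active": True}}


def ingest(raw: bytes) -> NotificationEvent:
    """Ingestion: deserialize a message payload into an event object."""
    payload = json.loads(raw)
    return NotificationEvent(payload["documentId"], payload["primarySourceId"])


def join(event: NotificationEvent) -> dict:
    """Join: fetch the source rows needed for the document (one table here)."""
    return {"Listing": LISTINGS.get(event.primary_source_id)}


def stitch(event: NotificationEvent, joined: dict) -> dict:
    """Stitch: combine event metadata and joined rows into one model."""
    return {"documentId": event.document_id, "listing": joined["Listing"]}


def operate(model: dict) -> list[dict]:
    """Operate: a filter followed by a map, producing zero or one document."""
    listing = model["listing"]
    if listing is None or not listing["active"]:  # filter step
        return []
    # map step: reshape the model into the final document structure
    return [{"id": model["documentId"], "title": listing["title"].upper()}]


def sink(documents: list[dict], store: dict) -> None:
    """Sink: write each document into an in-memory read-optimized store."""
    for doc in documents:
        store[doc["id"]] = doc


def process(raw: bytes, store: dict) -> None:
    """Run one notification event through all five operations."""
    event = ingest(raw)
    sink(operate(stitch(event, join(event))), store)


store: dict = {}
process(b'{"documentId": "42", "primarySourceId": "42"}', store)
print(store)
```

Returning a list from `operate` lets a filter drop a document (empty list) and would let a flatMap-style operator emit several documents from one model.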

Among these operations, Join, Stitch and Sink are the most important as well as the most complicated ones. In the following sections, we will dive deeper into their design.

Data Source Join

One of the most crucial and intricate operations in Riverbed’s Notification Pipeline is the Join operation. A Join operation starts from the primary source ID and then fetches data for all data sources associated with the materialized view based on their relationship.

JoinConditionsDag

In Riverbed, we use JoinConditionsDag, a Directed Acyclic Graph, to store the relationship metadata among data sources, where each node represents one unique data source and each edge represents the join condition between two data sources. In the Notification Pipelines, JoinConditionsDag’s root node is always a metadata node for the notification event which contains the document ID and the primary source ID. The join condition connecting to the notification event node reflects the join condition to query the primary source. Below is a sample JoinConditionsDag defining the join relationship between the primary source Listing and some of its related data sources:

Picture 3: JoinConditionsDag Sample

Because notification events only indicate which document needs to be refreshed and do not contain any source data, the Notification Pipeline joins data sources starting from the primary source ID provided by the notification event. Guided by the JoinConditionsDag, when the Notification Pipeline processes a notification event containing the primarySourceId, it queries the Listing table to fetch Listing data where the id matches primarySourceId. Then, using this Listing data, it queries the ListingDescription and Room tables to retrieve listing descriptions and rooms data, respectively, where the listingId equals the id of the Listing. Similarly, RoomAmenity data is fetched where roomId matches the id of the Room data.
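To make the Listing example concrete, here is a minimal Python sketch of how such a graph of join conditions could be represented and walked. The class names mirror the text, but the fields, the `add` and `query_order` methods, and the adjacency-list layout are assumptions for illustration; the real JoinConditionsDag may look quite different.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class JoinCondition:
    """Edge: child.<child_field> must equal parent.<parent_field>."""
    parent: str
    parent_field: str
    child: str
    child_field: str


@dataclass
class JoinConditionsDag:
    """Nodes are data source names; edges are join conditions."""
    root: str
    edges: dict[str, list[JoinCondition]] = field(default_factory=dict)

    def add(self, parent: str, parent_field: str,
            child: str, child_field: str) -> None:
        cond = JoinCondition(parent, parent_field, child, child_field)
        self.edges.setdefault(parent, []).append(cond)

    def query_order(self):
        """Depth-first walk from the root, yielding each edge before its subtree.

        Note: a source reachable through two parents would be yielded twice;
        this sketch only handles the tree-shaped sample from the text.
        """
        stack = list(reversed(self.edges.get(self.root, [])))
        while stack:
            cond = stack.pop()
            yield cond
            stack.extend(reversed(self.edges.get(cond.child, [])))


# The sample relationships described in the text.
dag = JoinConditionsDag(root="NotificationEvent")
dag.add("NotificationEvent", "primarySourceId", "Listing", "id")
dag.add("Listing", "id", "ListingDescription", "listingId")
dag.add("Listing", "id", "Room", "listingId")
dag.add("Room", "id", "RoomAmenity", "roomId")

for c in dag.query_order():
    print(f"fetch {c.child} where {c.child_field} = {c.parent}.{c.parent_field}")
```

The walk visits Listing first, then ListingDescription, then Room and finally RoomAmenity, so every source is queried only after the parent record it joins against is available.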

JoinResultsDag

Now, we have the JoinConditionsDag guiding the Notification Pipeline to fetch all data sources. However, the question arises: how can we efficiently store the query results? One straightforward option is to flatten all the joined results into a table-like structure. Yet, this approach can consume a significant amount of memory, especially when performing joins with high cardinality. To optimize memory usage, we designed another DAG-like data structure named JoinResultsDag.

Picture 4: JoinResultsDag Structure

There are two major components in a JoinResultsDag. Cell is the atomic container for a data record: each data source table record is stored in its own Cell. Each Cell maintains its own successor relationships by mapping successor data source aliases to CellGroups. CellGroup is the container that stores the joined records from one data source.
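A minimal Python sketch of these two containers might look like the following. The attribute names (`record`, `successors`, `source_alias`, `cells`) and the helper `count_cells` are assumptions for illustration, not Riverbed's actual definitions.

```python
from dataclasses import dataclass, field


@dataclass
class Cell:
    """Atomic container for one record, plus links to its joined successors."""
    record: dict
    # Maps a successor data source alias to the group of records joined from it.
    successors: dict[str, "CellGroup"] = field(default_factory=dict)


@dataclass
class CellGroup:
    """All records joined from one data source for a given parent Cell."""
    source_alias: str
    cells: list[Cell] = field(default_factory=list)


# One Listing with two Rooms; only the first Room has an amenity.
listing = Cell({"id": 1, "title": "Cozy loft"})
rooms = CellGroup("Room", [Cell({"id": 10, "listingId": 1}),
                           Cell({"id": 11, "listingId": 1})])
listing.successors["Room"] = rooms
rooms.cells[0].successors["RoomAmenity"] = CellGroup(
    "RoomAmenity", [Cell({"id": 100, "roomId": 10, "name": "desk"})])


def count_cells(cell: Cell) -> int:
    """Count this cell and every cell reachable through its successors."""
    return 1 + sum(count_cells(c)
                   for group in cell.successors.values()
                   for c in group.cells)


print(count_cells(listing))  # 4
```

Each record is stored exactly once; the relationship to its parent is carried by the links between cells rather than by repeating the parent's columns.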

As mentioned above, the biggest advantage of a DAG-based data structure over a traditional flat join table is that it can efficiently store a large amount of join result data, especially when there is a 1:M or M:N join relationship between data sources. For example, we have one pipeline that creates materialized views for Airbnb Listings with information about all of their rooms, each of which can have many room amenities. In a traditional flat join table, every row would repeat the Listing and Room columns for each RoomAmenity record.

Obviously, storing joined results using a flat table structure demands extensive resources for both storage and processing. In contrast, JoinResultsDag effectively mitigates data duplication by allowing multiple successor nodes to refer back to the same ancestor nodes.
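A rough back-of-the-envelope comparison shows the scale of the difference. The Python sketch below counts stored record copies for a single Listing; the figures of 5 rooms and 20 amenities per room are hypothetical and chosen only for illustration, and both function names are made up for this example.

```python
def flat_join_record_copies(rooms: int, amenities_per_room: int) -> int:
    """Flat table: one row per (Room, RoomAmenity) pair, and each row
    carries a copy of the Listing, Room and RoomAmenity records."""
    rows = rooms * amenities_per_room
    return rows * 3


def dag_cells(rooms: int, amenities_per_room: int) -> int:
    """DAG: one Listing cell, one cell per Room, one cell per RoomAmenity."""
    return 1 + rooms + rooms * amenities_per_room


print(flat_join_record_copies(5, 20), dag_cells(5, 20))  # 300 106
```

The gap widens with every additional 1:M source joined below the Listing, because the flat table multiplies row counts while the DAG only adds cells.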

Picture 5: JoinResultsDag Example

Now with JoinConditionsDag representing the relationship among all data sources and JoinResultsDag storing all the results, joins can be performed in Riverbed roughly as follows:

Starting from the NotificationEvent, Riverbed first initializes a JoinResultsDag with the deserialized notification event as the root. Then, guided by the JoinConditionsDag and following a depth-first traversal, it visits the data store of each source, queries data based on the join conditions defined on the JoinConditionsDag edges, encapsulates each result row inside a Cell, and continues fetching the data of its dependencies until all data sources have been visited.
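The traversal described above can be sketched in a few lines of Python. Everything here is an assumption made for illustration: the in-memory `TABLES` stand in for online data stores, `CONDITIONS` is a simplified adjacency-list form of the join conditions, and cells are plain dictionaries. A production system would presumably batch lookups rather than issue one query per parent record.

```python
# Hypothetical in-memory stand-ins for the online data stores.
TABLES = {
    "Listing": [{"id": 1, "title": "Cozy loft"}],
    "ListingDescription": [{"id": 7, "listingId": 1, "text": "Sunny"}],
    "Room": [{"id": 10, "listingId": 1}, {"id": 11, "listingId": 1}],
    "RoomAmenity": [{"id": 100, "roomId": 10, "name": "desk"},
                    {"id": 101, "roomId": 11, "name": "crib"},
                    {"id": 102, "roomId": 99, "name": "unrelated"}],
}

# Join conditions as adjacency lists:
# parent source -> [(parent_field, child source, child_field)].
CONDITIONS = {
    "NotificationEvent": [("primarySourceId", "Listing", "id")],
    "Listing": [("id", "ListingDescription", "listingId"),
                ("id", "Room", "listingId")],
    "Room": [("id", "RoomAmenity", "roomId")],
}


def query(table: str, field: str, value) -> list[dict]:
    """Stand-in for a data store lookup: rows where row[field] == value."""
    return [row for row in TABLES[table] if row[field] == value]


def new_cell(record: dict) -> dict:
    """A cell holds one record and its successor groups, keyed by alias."""
    return {"record": record, "successors": {}}


def join(source: str, cell: dict) -> dict:
    """Depth-first: fetch each child source of `cell`, then recurse into it."""
    for parent_field, child, child_field in CONDITIONS.get(source, []):
        rows = query(child, child_field, cell["record"][parent_field])
        group = [new_cell(row) for row in rows]
        cell["successors"][child] = group
        for child_cell in group:
            join(child, child_cell)
    return cell


# The deserialized notification event becomes the root of the results.
root = join("NotificationEvent",
            new_cell({"documentId": 1, "primarySourceId": 1}))
```

After the call, `root` holds the Listing under its `"Listing"` successor group, and each Room cell holds only the amenities whose `roomId` matches that room.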

Stitching of Data

With the joined results now stored in JoinResultsDag, an additional operation is necessary to transform these varied data pieces into a more usable and functional model. This enables engineers to apply their custom operators, mapping the data onto their specifically designed Sink Document. We refer to this process as the Stitch Operation, resulting in what is known as the StitchModel.

The StitchModel, a Java POJO derived from the custom pipeline definition, serves as the intermediate data model that not only contains the actual data but also contains useful metadata about the event such as document ID, version, mutation source, etc.

Once the StitchModel metadata is generated, the JoinResultsDag makes the Stitch operation straightforward: it maps the JoinResultsDag into a JSON model with the same structure and then converts the JSON model into the custom-defined Java POJO using the Gson library.
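The two-step mapping (results DAG to JSON, JSON to a typed model) can be illustrated with a small Python analogue based on the Review and User pipeline definition from earlier. This is only a sketch: Riverbed itself uses Java POJOs and Gson, and the dataclass names, the dictionary-based cell layout, and the helpers `cell_to_json` and `to_model` are all assumptions made here.

```python
import json
from dataclasses import dataclass


@dataclass
class UserModel:
    id: int
    firstName: str
    lastName: str


@dataclass
class ReviewModel:
    id: int
    review: str
    User: list[UserModel]


@dataclass
class StitchModel:
    # Event metadata travels alongside the joined data.
    documentId: str
    version: int
    Review: list[ReviewModel]


def cell_to_json(cell: dict) -> dict:
    """Mirror the DAG shape: record fields plus one list per successor alias."""
    node = dict(cell["record"])
    for alias, group in cell["successors"].items():
        node[alias] = [cell_to_json(child) for child in group]
    return node


def to_model(doc: dict) -> StitchModel:
    """Convert the JSON-shaped dict into typed model objects."""
    reviews = [
        ReviewModel(r["id"], r["review"],
                    [UserModel(u["id"], u["firstName"], u["lastName"])
                     for u in r["User"]])
        for r in doc["Review"]
    ]
    return StitchModel(doc["documentId"], doc["version"], reviews)


# A tiny hand-built join result: event metadata -> Review -> User.
user = {"record": {"id": 9, "firstName": "Ada", "lastName": "Lovelace"},
        "successors": {}}
review = {"record": {"id": 5, "review": "Great stay"},
          "successors": {"User": [user]}}
root = {"record": {"documentId": "5", "version": 3},
        "successors": {"Review": [review]}}

# Round-trip through a JSON string to mirror the intermediate JSON model.
json_text = json.dumps(cell_to_json(root))
model = to_model(json.loads(json_text))
print(model.Review[0].User[0].firstName)
```

Because the JSON model has the same nesting as the results DAG, the conversion needs no knowledge of how the join was executed.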

Sink data

The final stage in Riverbed’s Notification Pipeline is to write documents into data sinks. In Riverbed, sinks define where the processed data, now in the form of documents, will be ingested after the preceding operations are completed. Riverbed allows for multiple sinks, including Apache Hive™ and Kafka®, so the same data can be ingested into multiple storage locations if required. This flexibility is a key advantage of the Notification Pipeline, enabling it to cater to a wide variety of use cases.

Riverbed writes documents into data sinks via their write APIs. For the best performance, it encapsulates a collection of documents into the API request and then makes use of the batched write API of each data sink to update multiple documents efficiently.
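As a rough illustration of batched writes to multiple sinks, consider the Python sketch below. The `InMemorySink` class, its `write_batch` method, the `drain` helper, and the batch size of 100 are all hypothetical; real sinks expose their own batched write APIs with their own limits.

```python
from itertools import islice
from typing import Iterable, Iterator


def batches(documents: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Yield consecutive lists of at most `size` documents."""
    it = iter(documents)
    while batch := list(islice(it, size)):
        yield batch


class InMemorySink:
    """Hypothetical sink exposing a batched write API."""

    def __init__(self) -> None:
        self.store: dict = {}
        self.requests = 0

    def write_batch(self, docs: list[dict]) -> None:
        # One request updates many documents at once.
        self.requests += 1
        for doc in docs:
            self.store[doc["id"]] = doc


def drain(documents: Iterable[dict], sinks: list[InMemorySink],
          batch_size: int = 100) -> None:
    """Send every batch to every configured sink."""
    for batch in batches(documents, batch_size):
        for s in sinks:
            s.write_batch(batch)


docs = [{"id": i, "title": f"doc-{i}"} for i in range(250)]
primary, secondary = InMemorySink(), InMemorySink()
drain(docs, [primary, secondary])
print(primary.requests, len(primary.store))  # 3 250
```

With 250 documents and a batch size of 100, each sink receives three requests instead of 250 individual writes.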

Summary

In conclusion, we’ve walked through the critical steps of Riverbed’s streaming system within the Lambda architecture framework, focusing on the construction of materialized views from CDC events. Our look at the join transformation within the Notification Pipeline showcased a DAG-like structure for efficient and memory-conscious data joining. This discussion has shed light on the architectural approach to constructing materialized views in streaming and introduced data structure designs for optimizing streaming data joins. Looking ahead, we will delve deeper into the Source Pipeline of the streaming system and explore the batch system of Riverbed.

If this kind of work sounds appealing to you, check out our open roles — we’re hiring!
