PayPal Introduces Dione, an Open-Source Spark Indexing Library

Enable faster interaction with Hadoop data

Ohad Raviv
The PayPal Technology Blog

By Ohad Raviv and Shay Elbaz

Photo by Maksym Kaharlytsky on Unsplash

PayPal’s products, both public and internal, rely heavily on data processing in a large variety of techniques and technologies. We, the engineering team in PayPal’s global data science group, are responsible for providing the underlying solutions for these data products. We would like to share an interesting use case we encountered and how we solved it.

Intro

Spark, Hive, and HDFS (Hadoop Distributed File System) form an ecosystem of online analytical processing (OLAP)-oriented technologies. They are designed to process huge amounts of data with full scans. From time to time, however, users want to use the same data for more ad-hoc tasks:

  • Multi-row load — explore small sets (typically 1%) of the data by specific IDs (not random).
  • Single-row fetch — for example, building a serving layer to fetch a specific row upon a REST-API request.

These kinds of tasks are traditionally solved using dedicated storage and technology stacks (HBase, Cassandra, etc.) which require data duplication and add significant operational costs.

In this post, we describe our journey to solve this challenge using only Spark and HDFS. We start by introducing an example use case, then generalize and define the requirements, review some candidate solutions, and finally dive into the solution we built.

Example Use Case

At PayPal, we have more than 30 million merchants on our platform. To help us detect potentially fraudulent sellers or violations of PayPal’s acceptable use policies, we periodically use automated web scanning tools to collect publicly available, non-sensitive information from selected customer websites. Given our broad merchant population, many of whom have extensive websites comprised of many pages, this effort produces large amounts of data.
Each day’s scan data, along with additional metadata, is saved into a Hive table which is partitioned by date.

In this use case, the data size is ~150TB of compressed files consisting of ~2B webpages, with an average webpage size of ~100KB.

Downstream applications access this data in the following ways:

  1. Full scan — batch analytics processes scan through all the data, for example to cluster similar merchants. Expected runtime: hours.
  2. Multi-row load — our data scientists need a specific subset of the merchant webpages to build data sets for model training. Expected runtime: minutes.
  3. Single-row fetch — some use cases need to fetch a specific webpage given its URL, for example a REST service for customer support teams. Expected runtime: seconds.

We have noticed that this pattern, in which data primarily used for batch analytics is also accessed for more ad-hoc tasks, appears in other domains across the organization as well: Risk (transaction data, login attempts), Compliance (credit reviews), Marketing (campaign management), etc.

Optional Solutions

Photo by qimono on pixabay

Hive Table

After understanding the required tasks at hand, we started by trying a very naïve solution — defining a Hive table on top of the scanned data and querying it using Spark.

This worked very well for the full scan batch processing task as this is what Spark excels at.

However, when users tried to join this table with a multi-row sample of URLs (typically 0.1–1% of the rows), it took much more than the expected few minutes. The main reasons are as follows:

  • Shuffle — the heaviest operation in distributed systems in terms of CPU and network IO. For a medium-size set of URLs, Spark uses a shuffle join (either hash-join or sort-merge join). This transfers the full webpages’ data over the network, even if most rows are filtered out by the join with the small sample.
  • De-serialization — even for a small URL sample, where Spark optimizes the query with a map-side join (broadcast join), it still has to de-serialize all rows of the full webpages’ data.
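
To get a feel for the volumes involved, here is a back-of-the-envelope calculation in Python. It takes the rounded figures quoted above (~2B webpages, ~100KB average page size, samples of 0.1–1% of the rows) at face value and ignores compression, so the results are order-of-magnitude estimates only.

```python
# Rounded figures quoted in this post; real values vary by day and compression.
PAGES = 2_000_000_000        # ~2B webpages
AVG_PAGE_BYTES = 100_000     # ~100KB average webpage size
TB = 10**12                  # decimal terabyte


def payload_tb(sample_permille: int) -> float:
    """Approximate payload size, in TB, for a sample given in tenths of a percent."""
    rows = PAGES * sample_permille // 1000
    return rows * AVG_PAGE_BYTES / TB


full_scan_tb = payload_tb(1000)   # every row, as touched by a shuffle join
one_percent_tb = payload_tb(10)   # a 1% sample
tenth_percent_tb = payload_tb(1)  # a 0.1% sample

print(full_scan_tb, one_percent_tb, tenth_percent_tb)
```

Under these assumptions, a join that shuffles or de-serializes every row handles roughly 200TB of payload, while the rows the user actually asked for amount to about 0.2–2TB.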

Single-row fetch of a specific web page upon a REST API request was even worse in terms of user experience. Users had to wait for a long time (a few minutes) until Spark scanned through all the webpages’ data.

Bucketing

Hive and Spark bucketing is a step in the right direction, at least for the multi-row load task. It is a mechanism for storing pre-shuffled data so that later join operations can avoid re-shuffling the same data. It requires a tight coupling between the metadata and the data itself: users cannot add data to the underlying folders directly, which departs from the unmanaged-data nature of Hive and Spark.

In the bucketing approach, we need to use Spark or Hive to save the data in buckets. In bucketing, all the data is saved in files, where each file contains only certain keys (by their hash values) and is sorted by the key. This way we can significantly enhance the multi-row join with the data.
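
The bucket layout can be sketched in a few lines of plain Python. This is not how Spark or Hive implement bucketing: `zlib.crc32` stands in for the engine's hash function, in-memory lists stand in for bucket files, and `NUM_BUCKETS`, `bucket_of`, `write_buckets`, and `lookup` are illustrative names.

```python
import bisect
import zlib

NUM_BUCKETS = 4  # hypothetical bucket count


def bucket_of(key: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Deterministic hash-modulo bucket assignment (CRC32 is a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def write_buckets(rows, num_buckets: int = NUM_BUCKETS):
    """Group (key, value) rows by bucket, then sort each bucket by key."""
    buckets = [[] for _ in range(num_buckets)]
    for key, value in rows:
        buckets[bucket_of(key, num_buckets)].append((key, value))
    for bucket in buckets:
        bucket.sort(key=lambda kv: kv[0])
    return buckets


def lookup(buckets, key: str):
    """Search only the one bucket that can hold `key`, using binary search."""
    bucket = buckets[bucket_of(key, len(buckets))]
    keys = [k for k, _ in bucket]
    i = bisect.bisect_left(keys, key)
    if i < len(keys) and keys[i] == key:
        return bucket[i][1]
    return None


rows = [(f"https://example.com/page/{i}", f"<html>{i}</html>") for i in range(20)]
buckets = write_buckets(rows)
found = lookup(buckets, "https://example.com/page/7")
print(found)
```

Because both sides of a join can be bucketed with the same hash and bucket count, matching keys always land in the same bucket number, which is what lets the engine skip the shuffle.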

However, this approach has many drawbacks:

  • We need to own or duplicate the data.
  • It supports only one key per table.
  • No off-the-shelf support for fetching a single row from the data in an SLA of seconds, as Spark has no random access API.
  • Spark and Hive bucketing are not compatible (SPARK-19256).
  • There is a Spark issue when reading from multiple bucket files (SPARK-24528).

Product Requirements

Photo by geralt on pixabay

We concluded that there is no off-the-shelf solution to enable all our required data access patterns. Based on a few more use cases we encountered from various groups across the organization, we decided to generalize our assumptions and requirements from the solution to capture as many use cases as possible:

Assumptions

  • Size — data is potentially huge: billions of rows, where each row may have many columns or columns that contain big payloads (like webpages, ~MBs).
  • Modification — data is append-only — for example, every day a new partition with new data is added.
  • Key — data has natural keys that users will want to query by (there could be more than one key, and a key might not be unique).
  • Ownership — data could be owned by another team (we cannot change the data, its format, its layout, etc.)

Requirements

  • Users need to interact with the data in the three ways described above (batch analytics, multi-row load, and single-row fetch), each within its defined SLA.
  • Avoid data duplication (to limit storage, computation, and operations costs).
  • Support multiple keys.
  • Preferably use the same technology stack.

Indexing Approach

Clarifying the assumptions and requirements led us to conclude that we need another approach. Indexing naturally came up. The idea is to have a thin layer that contains some metadata and pointers to the “real” data. This way we could potentially do most of the work on this index and avoid or defer operating on the data itself.

The indexing technique is very common in data systems and is widely used in almost all available databases, even in the big data landscape. For example, it is available in Teradata, Netezza, Google’s BigQuery and many more. It is natural to have this capability also in Spark’s ecosystem.

We looked for previous efforts in this direction and found Microsoft’s Hyperspace, a recent project that integrates an indexing subsystem into Spark. However, even though it has some things in common with what we needed, its current functionality and general direction are not in line with the access patterns described above.

We decided to continue with our efforts of building an indexing system integrated into Spark, Hive or HDFS.

Key-Value Stores

For solving the multi-row load and single-row fetch tasks, we need a fast way to do key-value fetches. It makes a lot of sense to use a dedicated key-value service for that. There are many options to choose from such as HBase, Cassandra, Aerospike, etc. However, this direction has many drawbacks:

  • It is overkill for our task — these technologies are designed to handle much more complex scenarios with much lower latency.
  • Key-value stores manage the data — so you need to “own” the data or duplicate it.
  • They are live services. They require dedicated resources and have an overhead of operations, configurations, monitoring, etc.
  • Such systems are also used by real-time, critical applications. Massive scans by batch applications will degrade the system’s overall performance, causing latency in the critical applications.

Dione

Dione moon and Saturn

Since the directions proposed above did not meet our requirements, we decided to create and open-source a new indexing library, Dione. The main idea is that the index is a “shadow” table of the original data. It contains only the key columns and pointers to the data, and is saved in a special format inspired by Avro and bucketing. Based on this index, the library provides APIs to join, query, and fetch back the original data within the required SLAs.

The index has the same number of rows as the original data but contains only key columns and a reference. It is saved in a special Avro B-Tree format.

The main advantages:

  • Relies only on Spark, Hive and HDFS. No external services.
  • Semi-managed — we don’t modify, duplicate or move the original data.
  • Supports multiple indices on the same data.
  • The index is exposed as a standard Hive table.
  • Our special Avro B-Tree format supports ad-hoc single-row fetch with an SLA of seconds.

Comparing the different proposed solutions on key metrics

Architecture

Dione solves two main challenges:

  • Given a pointer (a row in the index), how to fetch the data quickly?
  • How to store the index table to meet the required SLAs?

Thus, we have built two main components — Indexer and AvroBtreeFormat. The final index solution is composed of the inter-relations between these components, although in principle each of them can stand on its own.

Indexer

The Indexer’s main goal is to solve the multi-row load task. It has two main functionalities:

Creating the index — It scans once through the data and extracts the relevant metadata. The index is saved as a standard Hive table available for users to query. We currently support indexing data in Parquet, Avro, and SequenceFile formats, and we plan to support more formats in the future.

Indexer scans through the data and saves required metadata on each row to allow us to fetch back the data later

Using the index — Given the index metadata columns, we provide an API to fetch the original data quickly. Thus, users can load the index table as a standard Hive table, filter it with a standard Hive/Spark join with their data sample, and use the API to read the original data for the resulting subset. This way we avoid shuffling and de-serializing the entire data.
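
The idea can be sketched with plain Python and a local file. This is not Dione's API or on-disk format: newline-delimited JSON stands in for the Parquet, Avro, or SequenceFile data, a list of dicts stands in for the index Hive table, and `write_data`, `build_index`, and `load_by_index` are hypothetical helpers.

```python
import json
import os
import tempfile


def write_data(path, rows):
    """Stand-in for the original data: one JSON document per line."""
    with open(path, "wb") as f:
        for row in rows:
            f.write(json.dumps(row).encode("utf-8") + b"\n")


def build_index(path, key_field):
    """Scan the data once; keep only the key plus a pointer (file, offset, size)."""
    index = []
    offset = 0
    with open(path, "rb") as f:
        for line in f:
            key = json.loads(line)[key_field]
            index.append({"key": key, "file": path, "offset": offset, "size": len(line)})
            offset += len(line)
    return index


def load_by_index(index_rows):
    """Read back only the rows that the (already filtered) index rows point to."""
    loaded = []
    for ptr in index_rows:
        with open(ptr["file"], "rb") as f:
            f.seek(ptr["offset"])
            loaded.append(json.loads(f.read(ptr["size"])))
    return loaded


rows = [{"url": f"https://example.com/{i}", "content": "x" * 1000} for i in range(100)]
data_path = os.path.join(tempfile.mkdtemp(), "part-00000.jsonl")
write_data(data_path, rows)

index = build_index(data_path, "url")
sample = {"https://example.com/3", "https://example.com/42"}
# The "join" with the sample touches only the thin index rows, never the payloads.
matches = [r for r in index if r["key"] in sample]
loaded = load_by_index(matches)
print([r["url"] for r in loaded])
```

The filter step is where a real deployment would use a standard Hive/Spark join against the index table; only the final step touches the large payloads, and only for the matching rows.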

Avro B-Tree File Format

The Indexer solves the multi-row load task. We still need to solve the single-row fetch task, as using Spark to scan through the entire index table does not meet our required SLA. For that, we decided to leverage another degree of freedom we have: the index’s storage format. Inspired by Avro’s SortedKeyValueFile, bucketing, and traditional database indexing systems, we decided to create a “new” file format, Avro B-Tree.

From a technical viewpoint, it is just an Avro file, compatible with any Avro reader. However, we added another field to each row that holds a reference to another row in the same file (this reference is unrelated to the index’s pointer into the original data). In addition, we sorted the rows in each file in B-Tree order, so that a random search and fetch by a given key minimizes the number of hops while reading the file.

Leveraging Avro blocks as B-Tree Nodes. Each node is sorted internally, and each row can point to the beginning of another Node.
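
A much-simplified, in-memory sketch of this kind of layout is shown below. It is a static search tree written for illustration, not Dione's actual file format: a Python list of "blocks" stands in for the Avro blocks, keys are assumed to be unique, and `NODE_SIZE`, `build`, and `search` are hypothetical names. Each row carries a reference to a later block that holds the keys falling between it and the next row in its node, so a lookup only ever moves forward.

```python
import bisect

NODE_SIZE = 3  # rows per node; hypothetical, stands in for one Avro block


def build(sorted_rows, blocks):
    """Append a subtree for `sorted_rows` to `blocks` and return its block number.

    Rows are stored as (key, value, child), where `child` is the block number of
    the subtree holding the keys strictly between this key and the next key in
    the same node (or None). A node's slot is reserved before its children are
    built, so every child reference points to a later block.
    """
    me = len(blocks)
    blocks.append(None)  # reserve this node's position ahead of its children
    if len(sorted_rows) <= NODE_SIZE:
        blocks[me] = [(k, v, None) for k, v in sorted_rows]
        return me
    chunk = -(-len(sorted_rows) // NODE_SIZE)  # ceiling division
    node = []
    for start in range(0, len(sorted_rows), chunk):
        key, value = sorted_rows[start]
        rest = sorted_rows[start + 1:start + chunk]
        child = build(rest, blocks) if rest else None
        node.append((key, value, child))
    blocks[me] = node
    return me


def search(blocks, key):
    """Return (value or None, list of block numbers visited) for `key`."""
    block_no, visited = 0, []
    while block_no is not None:
        visited.append(block_no)
        node = blocks[block_no]
        keys = [k for k, _, _ in node]
        i = bisect.bisect_right(keys, key) - 1  # last row with key <= target
        if i < 0:
            return None, visited
        k, value, child = node[i]
        if k == key:
            return value, visited
        block_no = child  # always a later block: forward-only
    return None, visited


rows = [(f"key{i:03d}", i) for i in range(40)]
blocks = []
build(rows, blocks)
value, visited = search(blocks, "key027")
print(value, visited)
```

With a realistic node size, the number of blocks visited grows only logarithmically with the number of rows in the file, which is what keeps the number of hops small.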

To understand how we use this Avro B-Tree file format in our indexing system, let’s look at the index that was created by the Indexer. The Index data is saved as a shadow table of the original data table. This is the file structure:

  • Folders — each folder contains data from a certain date, for example (standard partitioning).
  • Files — each file contains rows with the same key hash-modulo (like bucketing).
  • File format — rows in a file are saved in a B-Tree structure.

When a user triggers a single-row fetch API request, we can go straight to the requested folder (by the given date request), then straight to the right file by the key’s hash, and finally traverse the file by the B-Tree order until we find the requested key. This process takes about one second and meets our required SLA.
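
The first two steps of that lookup amount to computing a path. The sketch below illustrates this under assumed conventions: the Hive-style `dt=` folder naming, the `part-NNNNN.avro` file naming, the `crawl_data_idx` table location, and the use of CRC32 as the hash are all illustrative and not taken from Dione.

```python
import zlib


def index_file_for(index_root: str, dt: str, key: str, num_files: int) -> str:
    """Resolve the single index file that can contain `key` for a given date."""
    # Step 1: the date selects the partition folder (Hive-style naming assumed).
    folder = f"{index_root}/dt={dt}"
    # Step 2: the key's hash modulo the file count selects the file (stand-in hash).
    file_no = zlib.crc32(key.encode("utf-8")) % num_files
    # Step 3 (not shown): traverse that file in B-Tree order until the key is found.
    return f"{folder}/part-{file_no:05d}.avro"


path = index_file_for("/data/crawl_data_idx", "2020-10-01", "https://example.com/", 50)
print(path)
```

No directory listing or table scan is needed: the request itself determines which one file to open.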

Advantages of the Avro B-Tree solution:

  • Compatible with all Avro readers.
  • Avro has good random-access support to read from a specific file offset.
  • B-Tree minimizes the number of hops when searching the index.
  • Enables forward-only seeks in the file until we find the desired key.
  • Avro saves the data in blocks, so we have set each block to be the size of our B-Tree’s node.

Spark API

Both the Indexer and Avro B-Tree File Format libraries are independent packages and rely only on HDFS. Users can save any table in the Avro B-Tree format so it will be accessible for both batch analytics with Spark and single-row fetches. In our full indexing solution, we use both packages. For a simplified Spark user experience, we added a high-level Spark API for creating and using an index. The API is available in Scala and Python.

Index owner code examples

Define an index for table `crawl_data` (run once):

Scan the data table and update the index (run on data table update):

Index client code examples

Multi-row load:

Single-row fetch:

Summary

To recap, we have built Dione, a Spark indexing library, to enable users to leverage their batch analytics data for more interactive tasks like multi-row load and single-row fetches with significantly improved SLAs. We have open-sourced this library to share this functionality with the community and get feedback.

We already have many open issues with interesting features and directions we can add to the core functionality, including join optimizations, better integration with Spark’s optimizer, “time-machine” loading capabilities, and more.

We invite you to try it out and would love to hear your thoughts. Feel free to open issues and contribute code.
