Enabling Infinite Retention for Upsert Tables in Apache Pinot

Apache Pinot™ was originally designed as an append-only OLAP (online analytical processing) database. It was later redesigned to support upserts (UPdates plus inSERTs), which let you update the record for an existing primary key or insert a record for a new one. Deletion is a natural extension of upserts: use cases that need indefinite retention must still be able to delete records according to business needs in order to keep memory and disk usage efficient.

This blog highlights recent feature developments in Apache Pinot that now support deletions at both memory and disk levels. It also shows how these developments have enabled Uber to sustainably support infinite retention for Pinot upsert use cases. 

Upsert is a Pinot feature commonly used for point updates, backfills, and data correction.

Figure 1 presents a high-level overview of upsert architecture, highlighting how upserts are highly memory-intensive.

Figure 1: High-level architecture of upsert in Pinot.

Upsert-Metadata is an in-memory hashmap that maps each Record-Primary-key to a Record-location. The Record-Primary-key is a unique identifier that is also used to partition the upstream Kafka topic, so all records for a given key arrive in the same partition; when an incoming key already exists in the Upsert-Metadata map, its entry is updated. The Record-location points to the segment that stores the latest record for a given Record-Primary-key. The entire Upsert-Metadata map is kept in memory for fast upsert operations, which is what makes upserts so memory-intensive. To illustrate: at Uber, a standard host with 376 GiB of memory and 1.1 TiB of disk runs at about 80% memory utilization and about 10% disk utilization for upsert use cases. By comparison, for non-upsert, append-only use cases the same hosts run at about 80% disk utilization, with memory and CPU utilization at around 30-40% (highly dependent on query shapes).

This blog discusses strategies to improve both memory and disk footprints for upsert use cases that require very high (indefinite) retention periods.

Figure 2: Example explaining the low-level architecture of upsert.

Consider Figure 2, which shows upsert behavior at the partition level within a specific server instance. Here, there’s a sealed segment S1 and a consuming segment S2. The Upsert-Metadata map stores a mapping of primary keys to record locations, represented as (segment-name, DocId). The DocId can be understood as the row number within a segment where the record is located. Figure 2 shows, via a dotted line, that when the same primary key appears in the consuming segment, the Upsert-Metadata map updates the record location to the new consuming segment, overwriting the reference to the old record.

Additionally, each segment has an in-memory bitmap, validDocIds, that tracks which of its records are still valid (the latest for their primary key) and is updated with each ingested record. During query processing for upsert tables, this bitmap is applied as an implicit filter.
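The bookkeeping described above can be sketched in a few lines of Python. This is a simplified illustration, not Pinot's implementation: the class and method names are hypothetical, plain sets stand in for the real bitmaps, and records are assumed to arrive in order.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RecordLocation:
    segment: str  # segment that holds the latest record for the key
    doc_id: int   # row number of that record inside the segment


class PartitionUpsertMetadata:
    """Hypothetical stand-in for the per-partition Upsert-Metadata map."""

    def __init__(self):
        self.locations = {}      # primary key -> RecordLocation
        self.valid_doc_ids = {}  # segment name -> set of valid doc ids

    def add_record(self, primary_key, segment, doc_id):
        previous = self.locations.get(primary_key)
        if previous is not None:
            # The older record is no longer the latest one for this key.
            self.valid_doc_ids[previous.segment].discard(previous.doc_id)
        self.locations[primary_key] = RecordLocation(segment, doc_id)
        self.valid_doc_ids.setdefault(segment, set()).add(doc_id)


metadata = PartitionUpsertMetadata()
for doc_id, pk in enumerate(["pk1", "pk2", "pk3"]):
    metadata.add_record(pk, "S1", doc_id)  # sealed segment S1
metadata.add_record("pk2", "S2", 0)        # pk2 reappears in consuming segment S2
```

After the last call, the entry for pk2 points at S2 and its old row in S1 is no longer part of that segment's valid set, which is the overwrite shown by the dotted line in Figure 2.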

Deletes are another important use case for upserts. Once a Record-Primary-key is marked as deleted, it shouldn’t be returned by subsequent queries. Its records should also eventually be removed from memory and disk to save cost.

In Pinot, you can enable the point deletes feature by setting a table-level configuration.

Figure 3: Configuration to enable point-deletes.
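As a rough illustration of what such a table-level setting might look like, the fragment below builds the relevant part of a table config as a Python dictionary and prints it as JSON. The `deleteRecordColumn` key is the setting named in this post; the column name `is_deleted` is hypothetical, and the surrounding `upsertConfig` layout is an assumption to check against the Pinot documentation for your version.

```python
import json

# Hypothetical fragment of a Pinot table config enabling point deletes.
table_config_fragment = {
    "upsertConfig": {
        "mode": "FULL",
        # Column in the schema that flags a record as a delete
        # (the column name is an assumption made for this example).
        "deleteRecordColumn": "is_deleted",
    }
}

print(json.dumps(table_config_fragment, indent=2))
```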

Once the point deletes feature is enabled, Pinot starts tracking an additional bitmap called queryableDocIds. It operates similarly to validDocIds with one key difference: whenever the value in the deleteRecordColumn is true, the corresponding entry is removed from queryableDocIds but not from validDocIds. At query time, Pinot then uses queryableDocIds, rather than validDocIds, as the implicit filter. The two bitmaps are identical unless the table has received delete records.

Figure 4: Architecture of the point deletes feature.

In the example in Figure 4, the only difference from the normal ingestion flow when a delete record arrives is that its entry is removed from queryableDocIds. The rest of the flow remains the same. The effect is very similar to adding the filter deleteRecordColumn = false to every query on the table.

Pinot doesn’t immediately delete the key from the upsert-metadata map upon receiving a deletion message. This approach ensures consistency in deletion, particularly in cases where an out-of-order event arrives after the deletion event for that key.
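The following Python sketch illustrates both points: a delete record stays in validDocIds but is left out of queryableDocIds, and keeping the key in the map lets a late, out-of-order event be rejected. It is a simplified model under stated assumptions, not Pinot's code: the class is hypothetical, sets stand in for bitmaps, and ordering is decided by a numeric comparison value (for example, an event timestamp).

```python
class UpsertPartition:
    """Hypothetical model of one upsert partition with point deletes."""

    def __init__(self):
        self.latest = {}             # pk -> (segment, doc_id, comparison_value)
        self.valid_doc_ids = {}      # segment -> set of doc ids
        self.queryable_doc_ids = {}  # segment -> set of doc ids

    def ingest(self, pk, segment, doc_id, comparison_value, deleted=False):
        current = self.latest.get(pk)
        if current is not None:
            cur_segment, cur_doc_id, cur_value = current
            if comparison_value < cur_value:
                return False  # out-of-order event: older than what we hold
            self.valid_doc_ids[cur_segment].discard(cur_doc_id)
            self.queryable_doc_ids[cur_segment].discard(cur_doc_id)
        self.latest[pk] = (segment, doc_id, comparison_value)
        self.valid_doc_ids.setdefault(segment, set()).add(doc_id)
        queryable = self.queryable_doc_ids.setdefault(segment, set())
        if not deleted:
            queryable.add(doc_id)  # delete records are never queryable
        return True


partition = UpsertPartition()
partition.ingest("pk1", "S1", 0, comparison_value=100)
partition.ingest("pk1", "S2", 0, comparison_value=200, deleted=True)
late_event_applied = partition.ingest("pk1", "S2", 1, comparison_value=150)
```

Because the map still holds pk1 with comparison value 200, the late event with value 150 is ignored and the key stays deleted. Had the key been dropped immediately, the late event would have looked like a brand-new insert.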

Clearly, this makes the feature a soft delete: it saves nothing, because the metadata stays in the hashmap and the records stay on disk. We have a use case at Uber where delete messages arrive at around 5,000 per second, equivalent to 600 million keys being deleted daily. Based on the standard host configuration mentioned above, we can accommodate approximately 250-300 million keys per host. With a replication factor of 2, accommodating 600 million keys per day means adding 4 hosts daily. This becomes a critical issue if we want to support high or indefinite retention, because we’d be paying for 4 additional hosts every day just to hold the metadata of deleted records.
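The host estimate can be reproduced with a short calculation. The inputs are the figures quoted above; the function name is hypothetical.

```python
DELETED_KEYS_PER_DAY = 600_000_000
REPLICATION_FACTOR = 2


def hosts_needed_per_day(keys_per_host):
    """Hosts required each day just to hold that day's deleted keys."""
    return DELETED_KEYS_PER_DAY * REPLICATION_FACTOR / keys_per_host


# Capacity range quoted above: 250-300 million keys per host.
for keys_per_host in (250_000_000, 300_000_000):
    print(keys_per_host, hosts_needed_per_day(keys_per_host))
```

At 300 million keys per host this gives 4 hosts per day; at 250 million keys per host the figure is closer to 5.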

To address the problem mentioned above, we introduced a new feature in Pinot that removes metadata for deleted keys after a TTL window. This buffer TTL window ensures that any out-of-order events occurring within this period won’t reverse the deletion.

Figure 5: Configuration to enable retention of metadata of deleted keys.
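A hedged sketch of how the TTL might be expressed is shown below. `deletedKeysTTL` is the setting this post refers to; the one-day value and the column name are hypothetical. The sketch also assumes the TTL is given in the same unit as the table's comparison column, taken here to be epoch milliseconds, which should be verified against the Pinot documentation.

```python
import json

ONE_DAY_MS = 24 * 60 * 60 * 1000  # assumes a millisecond comparison column

# Hypothetical upsert config fragment with a TTL for deleted keys.
upsert_config_fragment = {
    "upsertConfig": {
        "mode": "FULL",
        "deleteRecordColumn": "is_deleted",  # hypothetical column name
        "deletedKeysTTL": ONE_DAY_MS,        # illustrative value only
    }
}

print(json.dumps(upsert_config_fragment, indent=2))
```

The TTL should be at least as long as the latest an out-of-order event can plausibly arrive, since events older than that window can no longer be matched against the deleted key.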

Figure 6: Low-level architecture of metadata retention of deleted keys.

The design is straightforward. After each segment commit cycle, Pinot iterates through the keys of a table partition in the upsert-metadata-manager. If a key points to a record that is in validDocIds but not in queryableDocIds (indicating the record is deleted), and the record is older than the deletedKeysTTL threshold, Pinot removes the metadata for that key from the map and clears the record’s entry in validDocIds. In Figure 6, where red indicates removal, Pinot removes the 4th row from validDocIds as well.

The next section of this blog covers why Pinot marks the validDocId as invalid.

Figure 7: Pseudo-code for deleted keys TTL flow.
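The flow can be approximated by the Python sketch below. It follows the description in this post rather than Pinot's source: the function and variable names are hypothetical, sets stand in for bitmaps, and the age of a delete is measured against the largest comparison value seen so far, which is an assumption of this sketch.

```python
def remove_expired_deleted_keys(latest, valid_doc_ids, queryable_doc_ids,
                                largest_seen_value, deleted_keys_ttl):
    """Drop metadata for keys whose delete record is older than the TTL.

    latest maps pk -> (segment, doc_id, comparison_value).
    Returns the list of primary keys that were removed.
    """
    threshold = largest_seen_value - deleted_keys_ttl
    removed = []
    for pk, (segment, doc_id, value) in list(latest.items()):
        is_deleted = (doc_id in valid_doc_ids[segment]
                      and doc_id not in queryable_doc_ids[segment])
        if is_deleted and value < threshold:
            del latest[pk]                          # free the in-memory metadata
            valid_doc_ids[segment].discard(doc_id)  # let compaction drop the row
            removed.append(pk)
    return removed


latest = {"pk1": ("S1", 0, 100), "pk3": ("S1", 3, 120), "pk4": ("S2", 0, 190)}
valid_doc_ids = {"S1": {0, 3}, "S2": {0}}
queryable_doc_ids = {"S1": {0}, "S2": set()}  # pk3 and pk4 are deleted

removed = remove_expired_deleted_keys(
    latest, valid_doc_ids, queryable_doc_ids,
    largest_seen_value=200, deleted_keys_ttl=50)
```

In this run only pk3 is removed: its delete record (value 120) is older than the threshold of 150, while pk4 was deleted too recently and pk1 is not deleted at all.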

Figure 8 illustrates the scale of deletion we achieved at Uber after enabling this feature on a table with a deletion rate of approximately 2,000 messages per second. The metric shows that up to 300 million keys were deleted from the metadata-manager map per day, considering a replication factor of 2.

Figure 8: Deleted keys TTL feature impact in production.

During server restarts, Pinot loads all the keys persisted on disk back into the in-memory hashmap before triggering deleted-keys retention on them again. This caused out-of-memory (OOM) issues, because the stale keys, numbering in the millions, had never been removed from disk. The fix is to gradually remove stale keys from disk, which both reclaims disk space and avoids loading those keys back into memory during restarts.

At Uber, we use the UpsertCompactionTask minion task to compact old segments and remove stale or deleted rows from disk. It relies on the validDocIds bitmap snapshot flow: after every segment commit cycle, we snapshot these bitmaps for all segments and persist them to disk. During each task run, we loop through the snapshots to find the segments that offer the greatest compaction benefit, that is, those with the highest value of invalidDocs = totalDocs − validDocs.
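The selection step might look roughly like the sketch below. It ranks segments by invalidDocs = totalDocs − validDocs and keeps the top candidates. The function name, the percentage threshold, and the per-run segment cap are hypothetical parameters chosen for illustration, not the task's actual settings.

```python
def select_segments_for_compaction(segment_stats, invalid_percent_threshold,
                                   max_segments):
    """Pick the segments with the most invalid docs.

    segment_stats maps segment name -> (total_docs, valid_docs), where
    valid_docs is the cardinality of the snapshotted validDocIds bitmap.
    """
    candidates = []
    for name, (total_docs, valid_docs) in segment_stats.items():
        if total_docs == 0:
            continue
        invalid_docs = total_docs - valid_docs
        invalid_percent = 100.0 * invalid_docs / total_docs
        if invalid_percent >= invalid_percent_threshold:
            candidates.append((invalid_docs, name))
    # Most invalid docs first; break ties by name for a stable order.
    candidates.sort(key=lambda c: (-c[0], c[1]))
    return [name for _, name in candidates[:max_segments]]


stats = {
    "S0": (1000, 900),   # 10% invalid: below the threshold
    "S1": (1000, 200),   # 800 invalid docs
    "S2": (500, 100),    # 400 invalid docs
    "S3": (1000, 1000),  # fully valid
}
selected = select_segments_for_compaction(
    stats, invalid_percent_threshold=30, max_segments=2)
```

With these inputs the task would compact S1 first and S2 second, while S0 and S3 are left alone because rewriting them would reclaim little or nothing.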

Continuing the previous examples, after an upsert compaction task runs on the segment, the segment on disk is updated as shown in Figure 9. The records for primary key=3 are removed from disk as well.

Figure 9: Low-level architecture of upsert compaction. 

As mentioned earlier, during the metadata retention workflow for deleted keys, Pinot marked the validDocId as invalid for deleted primary keys. This allowed them to be snapshotted in the next cycle, enabling the removal of the deleted record entry from the disk via upsert compaction.

Figure 10 shows the impact of enabling upsert compaction for one of our production tables. The table had grown to approximately 8 TiB, but after enabling compaction its size dropped to around 850 GiB, a space saving of about 90%. Additionally, the rate of data growth decreased significantly, from approximately 85 GiB/day to about 3 GiB/day.
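These percentages follow directly from the quoted sizes, as the short check below shows (taking 1 TiB = 1,024 GiB; the helper name is hypothetical).

```python
def percent_reduction(before, after):
    """Relative reduction from before to after, as a percentage."""
    return 100.0 * (1 - after / before)


size_saving = percent_reduction(before=8 * 1024, after=850)  # sizes in GiB
growth_saving = percent_reduction(before=85, after=3)        # GiB per day

print(round(size_saving, 1), round(growth_saving, 1))
```

The table size shrinks by roughly 89.6%, in line with the "about 90%" above, and the daily growth rate falls by roughly 96.5%.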

Figure 10: Impact of upsert compaction task at production scale.

One challenge with enabling compaction together with deletion is that the delete record for a key can be compacted away while an older, non-deleted record for the same key isn’t. During a server restart, when all segments are loaded, Pinot would then treat the older record as live and start returning it for that primary key, leaving the table in an inconsistent state.

For example, consider a primary key PK1 with records in segments S0 and S1. In S1, the record is marked as deleted. If S1 gets compacted but S0 doesn’t due to threshold reasons in the upsert compaction flow, during a server restart, the upsert-metadata-manager map would incorrectly point PK1 to S0, even though it should be considered deleted for the end user.

To resolve this, we proposed a design in Pinot that maintains a mapping of Primary Key → distinct-segment-count, that is, the number of segments that contain a record for a given primary key. Only if the count is <= 1 does Pinot remove the metadata for the deleted key and mark its entry in validDocIds as invalid. As a result, the delete record becomes eligible for compaction only after all other records for that key in other segments have been removed.
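The guard can be illustrated with the PK1 example. The sketch below is a simplified model with hypothetical function names: it counts, for each primary key, how many segments still physically contain a record for it, and allows a deleted key's metadata to be dropped only when that count is at most 1.

```python
from collections import defaultdict


def distinct_segment_counts(segments):
    """segments maps segment name -> primary keys physically present in it."""
    counts = defaultdict(int)
    for keys in segments.values():
        for pk in set(keys):  # count each segment once per key
            counts[pk] += 1
    return dict(counts)


def can_drop_deleted_key(pk, counts):
    """A deleted key may be dropped only if no other segment still holds it."""
    return counts.get(pk, 0) <= 1


# PK1 has an old record in S0 and its delete record in S1.
before = distinct_segment_counts({"S0": ["PK1", "PK2"], "S1": ["PK1"]})
# Later, S0 is compacted and the stale PK1 row is gone from disk.
after = distinct_segment_counts({"S0": ["PK2"], "S1": ["PK1"]})

blocked = can_drop_deleted_key("PK1", before)
allowed = can_drop_deleted_key("PK1", after)
```

While S0 still holds the old PK1 row, the delete record in S1 is kept, so a restart cannot resurrect PK1 from S0. Once S0 has been compacted, the delete record is the only remaining copy and can safely be removed.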

As of writing, we’ve enabled infinite retention on more than 20 upsert tables, with a total primary key count across all tables of approximately 6 billion (without replication) and a deletion rate of around 600 million keys per day.

Infinite retention on upsert tables will benefit the following types of long-running use cases at Uber and many more:

  • Uber for Business tracking, where organizations are active for years and receive regular updates to employee count, payment entities, etc.
  • Uber vouchers tracking, where vouchers are active for years and receive updates to voucher redemption count and voucher expiry.
  • Cadence workflow analytics, where each workflow can run for hours, months, or years, with deletion enabled on closed workflows.

One challenge we have is the creation of many small segments for a particular table over time. This can lead to longer loading times during server restarts and higher query latency, as more segments need to be processed by the same number of threads during query execution.

One possible solution is to extend upsert compaction to work across multiple segments, merging them into larger segments and thereby controlling segment count growth over time.

This powerful feature in Apache Pinot meets a crucial need for many Pinot users at Uber and beyond, enabling higher retention for upsert tables and supporting deletions directly within Pinot. This unlocks numerous new use cases for Pinot upserts at Uber and brings Pinot’s functionality closer to that of a row-level database.

Special thanks to the Apache Pinot™ community members who actively contributed to and reviewed the numerous changes required to enable this feature in Pinot. 

Special thanks to Navina Ramesh for designing point deletes, Robert Zych for designing upsert compaction, and Yupeng Fu for crafting the original upsert design document.

Apache Pinot™, Pinot, Apache, the Apache feather logo, and the Apache Pinot project logo are registered trademarks of The Apache Software Foundation. No endorsement by The Apache Software Foundation is implied by the use of these marks.
Cover Photo attribution: “Infinite Wine” by Adam Brill is licensed under CC BY-NC-SA 2.0. No modifications.
