Psyberg: Automated end to end catch up

By Abhinaya Shetty, Bharath Mummadisetty

This blog post will cover how Psyberg helps automate the end-to-end catchup of different pipelines, including dimension tables.

In the previous installments of this series, we introduced Psyberg and delved into its core operational modes: Stateless and Stateful Data Processing. Now, let’s explore the state of our pipelines after incorporating Psyberg.

Pipelines After Psyberg

Let’s explore how different modes of Psyberg could help with a multistep data pipeline. We’ll return to the sample customer lifecycle:

Processing Requirement:
Keep track of the end-of-hour state of accounts, e.g., Active/Upgraded/Downgraded/Canceled.

Solution:
One potential approach is as follows:

  1. Create two stateless fact tables:
    a. Signups
    b. Account Plans

  2. Create one stateful fact table:
    a. Cancels

  3. Create a stateful dimension that reads the above fact tables every hour and derives the latest account state.
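To make the layout concrete, here is a minimal sketch of how the four pipelines and their Psyberg modes could be declared. The table names, the `PipelineSpec` type, and the `run_order` helper are all hypothetical; only the fact/dimension split and the stateless/stateful modes come from the approach above.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class PipelineSpec:
    name: str                 # hypothetical table name
    kind: str                 # "fact" or "dimension"
    mode: str                 # Psyberg mode: "stateless" or "stateful"
    sources: Tuple[str, ...]  # upstream tables this pipeline reads


# Hypothetical names; the modes mirror the three steps listed above.
PIPELINES: List[PipelineSpec] = [
    PipelineSpec("signups_f", "fact", "stateless", ("signup_events",)),
    PipelineSpec("account_plans_f", "fact", "stateless", ("plan_events",)),
    PipelineSpec("cancels_f", "fact", "stateful", ("cancel_events",)),
    PipelineSpec(
        "account_state_d", "dimension", "stateful",
        ("signups_f", "account_plans_f", "cancels_f"),
    ),
]


def run_order(pipelines: List[PipelineSpec]) -> List[str]:
    """Order pipeline names so each one runs after the pipelines it reads from."""
    produced = {p.name for p in pipelines}
    done: List[str] = []
    pending = list(pipelines)
    while pending:
        # A pipeline is ready once every source is either external or already run.
        ready = [
            p for p in pending
            if all(s in done or s not in produced for s in p.sources)
        ]
        if not ready:
            raise ValueError("cyclic dependency between pipelines")
        done.extend(p.name for p in ready)
        pending = [p for p in pending if p not in ready]
    return done
```

Calling `run_order(PIPELINES)` places the three fact tables ahead of the dimension, which is the order a catchup has to follow so that the dimension sees the refreshed facts.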

Let’s look at how this can be integrated with Psyberg to auto-handle late-arriving data and corresponding end-to-end data catchup.

Navigating the Workflow: How Psyberg Handles Late-Arriving Data

We follow a generic workflow structure for both stateful and stateless processing with Psyberg; this helps maintain consistency and makes debugging and understanding these pipelines easier. The following is a concise overview of the various stages involved; for a more detailed exploration of the workflow specifics, please turn to the second installment of this series.

1. Psyberg Initialization

The workflow starts with the Psyberg initialization (init) step.

  • Input: List of source tables and required processing mode
  • Output: Psyberg identifies new events that have occurred since the last high watermark (HWM) and records them in the session metadata table.

The session metadata table can then be read to determine the pipeline input.
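The init step can be pictured with the sketch below. It is an illustration only, not Psyberg's actual interface: `SourceCommit`, `psyberg_init`, the integer timestamps, and the dictionary-based high watermark are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class SourceCommit:
    table: str
    partition_hour: str  # event-time partition the data belongs to
    committed_at: int    # when the data landed in the source (epoch seconds)


def psyberg_init(
    commits: List[SourceCommit],
    hwm: Dict[str, int],
    mode: str,
) -> Tuple[List[dict], Dict[str, int]]:
    """Return (session metadata rows, candidate high watermark per table).

    A commit counts as new when it landed after the table's last high
    watermark, regardless of how old its event-time partition is. That is
    what lets late-arriving data be picked up.
    """
    session_rows: List[dict] = []
    candidate_hwm = dict(hwm)
    for c in commits:
        if c.committed_at > hwm.get(c.table, 0):
            session_rows.append(
                {"table": c.table, "partition_hour": c.partition_hour, "mode": mode}
            )
            candidate_hwm[c.table] = max(candidate_hwm.get(c.table, 0), c.committed_at)
    return session_rows, candidate_hwm
```

In this sketch the candidate high watermark is returned rather than stored, so the caller can hold it until the rest of the workflow has succeeded.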

2. Write-Audit-Publish (WAP) Process

This is the general pattern we use in our ETL pipelines.

**a. Write**
Apply the ETL business logic to the input data identified in Step 1 and write to an unpublished Iceberg snapshot based on the Psyberg mode.

**b. Audit**
Run various quality checks on the staged data. Psyberg’s metadata session table is used to identify the partitions included in a batch run. Several audits, such as verifying source and target counts, are performed on this batch of data.

**c. Publish**
If the audits are successful, cherry-pick the staging snapshot to publish the data to production.
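The control flow of the three WAP stages can be sketched in plain Python. This is an in-memory stand-in, not the Iceberg API: `InMemoryTable`, `count_audit`, and `write_audit_publish` are hypothetical names, and the staged dictionary merely plays the role of an unpublished snapshot.

```python
from typing import Callable, Dict, List

Rows = List[dict]


class InMemoryTable:
    """Stand-in for a table that holds staged (unpublished) and published data."""

    def __init__(self) -> None:
        self.published: Rows = []
        self.staged: Dict[str, Rows] = {}

    def write_staged(self, snapshot_id: str, rows: Rows) -> None:
        self.staged[snapshot_id] = list(rows)

    def publish(self, snapshot_id: str) -> None:
        # Plays the role of cherry-picking the staged snapshot into production.
        self.published.extend(self.staged.pop(snapshot_id))


def count_audit(source_rows: Rows, staged_rows: Rows) -> bool:
    """Example audit: source and target row counts must match."""
    return len(source_rows) == len(staged_rows)


def write_audit_publish(
    table: InMemoryTable,
    snapshot_id: str,
    source_rows: Rows,
    transform: Callable[[Rows], Rows],
    audits: List[Callable[[Rows, Rows], bool]],
) -> bool:
    # Write: apply the business logic and stage the result.
    table.write_staged(snapshot_id, transform(source_rows))
    # Audit: every check must pass on the staged batch.
    staged = table.staged[snapshot_id]
    if not all(audit(source_rows, staged) for audit in audits):
        return False  # staged data stays unpublished for inspection
    # Publish: expose the staged batch to readers.
    table.publish(snapshot_id)
    return True
```

A failed audit leaves the production rows untouched, which is the point of staging the write before publishing it.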

3. Psyberg Commit

Now that the data pipeline has been executed successfully, the new high watermark identified in the initialization step is committed to Psyberg’s high watermark metadata table. This ensures that the next instance of the workflow will pick up newer updates.
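The ordering matters here: the high watermark moves only after the pipeline has succeeded. A minimal sketch of that rule, with a hypothetical `run_and_commit` helper and a plain dictionary standing in for the high watermark metadata table:

```python
from typing import Callable, Dict


def run_and_commit(
    hwm_table: Dict[str, int],
    source: str,
    candidate_hwm: int,
    run_pipeline: Callable[[], bool],
) -> bool:
    """Advance the high watermark for `source` only if the pipeline succeeds."""
    if not run_pipeline():
        # Leave the HWM untouched so the next run re-reads the same input.
        return False
    hwm_table[source] = candidate_hwm
    return True
```

Because a failed run leaves the stored value unchanged, the next workflow instance sees the same input again instead of silently skipping it.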

Callouts

  • Having the Psyberg step isolated from the core data pipeline allows us to maintain a consistent pattern that can be applied across stateless and stateful processing pipelines with varying requirements.
  • This also enables us to update the Psyberg layer without touching the workflows.
  • This is compatible with both Python and Scala Spark.
  • Workflow parameters and Psyberg metadata make it easy to debug a run and to see exactly what it loaded.

Let’s go back to our customer lifecycle example. Once we integrate all four components with Psyberg, here’s how we would set it up for automated catchup.
