Diving Deeper into Psyberg: Stateless vs Stateful Data Processing

Let’s use the signup fact table as an example here. The workflow for this table runs hourly, and its main input source is an Iceberg table storing all raw signup events, partitioned by landing date, hour, and batch id.
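For reference, here is a minimal sketch of what such a source table could look like. Only the partition spec (landing date, hour, batch id) comes from the description above; the data columns, table name details, and the SparkSession named spark are assumptions for illustration:

# Hypothetical DDL for the raw signup events source; the partition
# columns mirror the text, the data columns are placeholders.
spark.sql("""
    CREATE TABLE IF NOT EXISTS raw_signups (
        signup_id    STRING,
        account_id   STRING,
        event_ts     TIMESTAMP,
        landing_date DATE,
        landing_hour INT,
        batch_id     STRING
    )
    USING iceberg
    PARTITIONED BY (landing_date, landing_hour, batch_id)
""")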

Here’s a YAML snippet outlining the configuration for this table during the Psyberg initialization step:

- job:
    id: psyberg_session_init
    type: Spark
    spark:
      app_args:
        - --process_name=signup_fact_load
        - --src_tables=raw_signups
        - --psyberg_session_id=20230914061001
        - --psyberg_hwm_table=high_water_mark_table
        - --psyberg_session_table=psyberg_session_metadata
        - --etl_pattern_id=1

Behind the scenes, Psyberg identifies that this pipeline is configured for a stateless pattern since etl_pattern_id=1.
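As a minimal sketch of that dispatch, assuming the init job simply reads the app_args shown above (the handler function names are hypothetical, not Psyberg's actual internals):

import argparse

STATELESS_PATTERN = 1
STATEFUL_PATTERN = 2

def main(argv=None):
    parser = argparse.ArgumentParser()
    # Argument names mirror the app_args in the YAML snippet above.
    parser.add_argument("--process_name")
    parser.add_argument("--src_tables")
    parser.add_argument("--psyberg_session_id")
    parser.add_argument("--psyberg_hwm_table")
    parser.add_argument("--psyberg_session_table")
    parser.add_argument("--etl_pattern_id", type=int)
    args = parser.parse_args(argv)

    if args.etl_pattern_id == STATELESS_PATTERN:
        init_stateless_session(args)  # hypothetical handler
    else:
        init_stateful_session(args)   # hypothetical handler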

Psyberg also uses the provided inputs to detect the Iceberg snapshots committed after the latest high watermark available in the watermark table. Using the summary column in the snapshot metadata [see the Iceberg Metadata section in post 1 for more details], we parse out the partition information for each Iceberg snapshot of the source table.
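A rough sketch of this step in PySpark: the high watermark table's columns are assumptions, while the snapshots metadata table and its summary map are standard Iceberg (per-partition keys in the summary require the write.summary.partition-limit table property on the source):

import json

# Latest high watermark recorded for this process (column names assumed).
latest_hwm = spark.sql("""
    SELECT max(high_watermark_ts) AS hwm
    FROM high_water_mark_table
    WHERE process_name = 'signup_fact_load'
""").first()["hwm"]

# Iceberg snapshots committed after the high watermark.
new_snapshots = spark.sql(f"""
    SELECT snapshot_id, committed_at, summary
    FROM raw_signups.snapshots
    WHERE committed_at > TIMESTAMP '{latest_hwm}'
    ORDER BY committed_at
""").collect()

# Partition summaries appear in the summary map under keys like:
#   partitions.landing_date=2023-09-14/landing_hour=6/batch_id=b-102
processing_uris = []
for snap in new_snapshots:
    for key in snap["summary"]:
        if key.startswith("partitions."):
            partition = dict(kv.split("=") for kv in key[len("partitions."):].split("/"))
            processing_uris.append(json.dumps(partition))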

Psyberg then retains these processing URIs (an array of JSON strings containing combinations of landing date, hour, and batch IDs) as determined by the snapshot changes. This information, along with other calculated metadata, is stored in the psyberg_session_f table. The stored data is then available to the subsequent LOAD.FACT_TABLE job in the workflow, as well as for analysis and debugging purposes.
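The exact schema of psyberg_session_f is internal to Psyberg, but a record illustrating the shape described above might look like this (all field names assumed; the values echo the example config):

# Illustrative session-metadata record for one init run.
session_row = {
    "psyberg_session_id": "20230914061001",
    "process_name": "signup_fact_load",
    "src_table": "raw_signups",
    "processing_uris": [
        '{"landing_date": "2023-09-14", "landing_hour": "5", "batch_id": "b-101"}',
        '{"landing_date": "2023-09-14", "landing_hour": "6", "batch_id": "b-102"}',
    ],
}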

Stateful Data Processing is used when the output depends on a sequence of events across one or more input streams.

Let’s consider the example of creating a cancel fact table, which takes the following as input:

  1. Raw cancellation events indicating when the customer account was canceled
  2. A fact table that stores incoming customer requests to cancel their subscription at the end of the billing period

These inputs help derive additional stateful analytical attributes, such as the type of churn, i.e. voluntary or involuntary.
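To see why both inputs are needed, one plausible derivation (not necessarily the author's exact logic, and with illustrative column names) is: a cancel event preceded by a matching cancel request is voluntary churn, while one without is involuntary, e.g. a payment failure:

# Derive churn type by joining cancel events to prior cancel requests.
cancel_facts = spark.sql("""
    SELECT
        c.account_id,
        c.cancel_ts,
        CASE WHEN r.account_id IS NOT NULL THEN 'voluntary'
             ELSE 'involuntary'
        END AS churn_type
    FROM raw_cancels c
    LEFT JOIN cancel_request_fact r
      ON  r.account_id = c.account_id
      AND r.request_ts <= c.cancel_ts
""")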

The initialization step for Stateful Data Processing differs slightly from the Stateless one: Psyberg offers additional configurations according to the pipeline’s needs. Here’s a YAML snippet outlining the configuration for the cancel fact table during the Psyberg initialization step:

- job:
    id: psyberg_session_init
    type: Spark
    spark:
      app_args:
        - --process_name=cancel_fact_load
        - --src_tables=raw_cancels|processing_ts,cancel_request_fact
        - --psyberg_session_id=20230914061501
        - --psyberg_hwm_table=high_water_mark_table
        - --psyberg_session_table=psyberg_session_metadata
        - --etl_pattern_id=2

Behind the scenes, Psyberg identifies that this pipeline is configured for a stateful pattern since etl_pattern_id is 2.

Notice the additional detail in the src_tables list corresponding to raw_cancels above. The processing_ts here represents the event processing timestamp, which is different from the regular Iceberg snapshot commit timestamp, i.e. event_landing_ts, as described in part 1 of this series.
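Judging by the snippet above, each src_tables entry is a table name optionally followed by | and the timestamp field to track for that table; here is a minimal parsing sketch under that assumption, with the default field name taken from the text:

def parse_src_tables(spec: str) -> dict:
    """Map each source table to the timestamp field Psyberg should track."""
    sources = {}
    for entry in spec.split(","):
        name, _, ts_field = entry.partition("|")
        # Fall back to the snapshot commit timestamp when no override is given.
        sources[name] = ts_field or "event_landing_ts"
    return sources

# {'raw_cancels': 'processing_ts', 'cancel_request_fact': 'event_landing_ts'}
print(parse_src_tables("raw_cancels|processing_ts,cancel_request_fact"))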

It is important to capture the range of a consolidated batch of events from all the sources, i.e. both raw_cancels and cancel_request_fact, while factoring in late-arriving events. Changes to the source table snapshots can be tracked using different timestamp fields, and knowing which field to use, i.e. event_landing_ts or something like processing_ts, helps avoid missing events.
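One way to read "consolidated batch" here, sketched under the assumption that Psyberg tracks a (min, max) range per source on its configured timestamp field, is the union of those ranges, so a late-arriving event in either table widens the window:

def consolidated_window(ranges):
    """ranges: list of (min_ts, max_ts) tuples, one per source table."""
    return (min(lo for lo, _ in ranges), max(hi for _, hi in ranges))

window = consolidated_window([
    ("2023-09-14 05:00:00", "2023-09-14 06:15:00"),  # raw_cancels, via processing_ts
    ("2023-09-14 05:30:00", "2023-09-14 06:10:00"),  # cancel_request_fact, via event_landing_ts
])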

Similar to the approach in stateless data processing, Psyberg uses the provided inputs to parse out the partition information for each Iceberg snapshot of the source tables.
