From Big Data to Better Data: Ensuring Data Quality with Verity

Michael McPhillips
Lyft Engineering
Oct 3, 2023

High-quality data is necessary for the success of every data-driven company. It enables everything from reliable business logic to insightful decision-making and robust machine learning models. It is now the norm for tech companies to have a well-developed data platform that makes it easy for engineers to generate, transform, store, and analyze data at the petabyte scale. We have reached a point where the quantity of data is no longer a bottleneck; this scale, however, has often come at the cost of quality.

In this post we will define data quality at a high-level and explore our motivation to achieve better data quality. We will then introduce our in-house product, Verity, and showcase how it serves as a central platform for ensuring data quality in our Hive Data Warehouse. In future posts we will discuss how Verity addresses data quality elsewhere in our data platform.

What and Where is Data Quality?

Data quality is an amorphous term, with various definitions depending on the context. In Verity, we defined data quality as follows:

Verity’s Definition of Data Quality

The measure of how well data can be used as intended. The data is semantically correct, consistent, complete, unique, well-formed, and timely.

Five aspects of data quality — semantic correctness, consistency, completeness/uniqueness, well-formedness, timeliness
Five aspects of data quality with the definition in italics and an example in quotes.

Now that we have defined what data quality is, where should we measure it? Data quality should be assessed across various components, each of which provides its own strengths and evaluates different aspects of data quality. For example, we can almost instantly validate that each record is well-formed and complete during event generation. Streaming compute, however, empowers more complex window queries on semantic correctness. We can also evaluate the timeliness of partition and table landing times in data orchestration. Finally, as the subject of this blog post, we can assess data quality via batch compute analytics on our data warehouse, providing a comprehensive, albeit slower, evaluation than the previously mentioned methods.

Hive: Lyft’s Data Warehouse

Lyft’s largest source of consumable data is our Hive Data Warehouse. It empowers most of the analysis and experimentation done at Lyft. This is important as our business relies heavily on accurate machine learning modeling of marketplace trends — such as predicting the best price to charge for a ride or what a coupon’s utilization rate will be. As such, Hive was the first target of Verity’s data quality assessment. The Analytic Event Lifecycle below illustrates the path much of our data takes to reach Hive.

Analytic Event Lifecycle

Lyft reads and writes petabytes of data every day to Hive — much of it coming from analytic events. When a service at Lyft performs a unit of work, such as a customer interaction or a state change, we generate a record of what happened. We log these events asynchronously, on the order of millions per second. These flow through Kafka, our event streaming platform, before being processed by Flink, our streaming compute framework. Flink writes data into Hive for analytic usage. After events reach Hive, Airflow ETLs (Extract-Transform-Load) create derived datasets, analysis is performed, and data for model training is extracted.

Analytic Event Lifecycle at Lyft, from generation to persistence to orchestration
Simplified view of the Analytic Event Lifecycle in Lyft’s Data Platform
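
For concreteness, an individual analytic event might look roughly like the sketch below. Only session_id, final_state, and occurred_at_ms are taken from the checks discussed later in this post; every other field name is illustrative.

# Illustrative analytic event as a service might log it (field names beyond
# session_id, final_state, and occurred_at_ms are hypothetical).
rider_event = {
    "event_name": "ride_canceled",            # hypothetical event type
    "session_id": "8f2c6a1e-4b7d-4c3a-9e21-5d0f7b6a1c2e",
    "final_state": "canceled",
    "occurred_at_ms": 1696291200123,          # generation timestamp
    "properties": {"region": "SFO"},          # hypothetical payload
}

# Flink writes events like this into Hive, where a ds partition column and a
# last_updated_ms landing timestamp make them queryable by the checks below.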

Examples of Issues in Lyft’s Hive Event Data

To illustrate how the previous definition of data quality relates to Lyft’s Analytic Event Data in Hive, let us examine three real-world examples:

  1. Completeness — The core.rider_events derived dataset has some records where session_id is null, caused by a bug in a TrinoSQL transformation inside Airflow.
  2. Semantic Correctness — The core.rider_events derived dataset shows a drastic increase in today’s canceled-ride volume, caused by a bug in the origin web service creating the event.
  3. Timeliness — The raw_events.bike_sessions data should land in Hive within 5 minutes of being generated. A backup in Flink causes records to arrive 30 minutes late.

Consequences of Bad Hive Data

Poor data quality in Hive caused tainted experimentation metrics, inaccurate machine learning features, and flawed executive dashboards. These incidents were hard to troubleshoot because we had no unified approach to assessing data quality and no centralized repository for results. The resulting delays in detection increased the difficulty and cost of data backfills. The lack of centralization also made data discovery inefficient, leaving data scientists and data engineers unsure which data they could trust.

Verity

Our solution was to build Verity, our in-house data quality platform. Verity is currently the center of all things related to data quality at Lyft.

Verity for Hive Data: The User Story

Data producers and consumers can define their data quality checks and verify the data when it is produced or before it is consumed inside Airflow or Flyte. Science and product teams can also create checks and orchestrate them on a fixed schedule.

If a contract is breached, stakeholders can be alerted with a link to a result history view on our UI for expedited debugging. This UI also provides a full-text search interface to find all existing contracts by dataset name or owning team, providing clear observability on current coverage and past performance.

Flowchart showing Check Configuration, Orchestration, Alerting, and Debugging in cartoonish PNGs.
High-level User Story of a Verity customer

Verity Implementation

Verity Goals

  • Make adding checks, viewing their result histories, and receiving alerts as easy and transparent as possible.
  • Avoid tight coupling to any particular data orchestration engine, data store, or compute technology.
  • Be reliable, fault-tolerant, and highly scalable — in particular, handle extreme request-volume spikes from daily event-processing ETLs.

High-level Concepts

Check Definition — The user-written configuration file defining the data quality contract and who to notify if it is breached.

Check Result — The numeric measurement of data quality at a point in time, a boolean pass/fail value, and metadata about this run.

Check Development — The process of interactively creating and testing Check Definitions with our VerityCLI.

Check Orchestration — How a previously configured Check Definition is dispatched.

Check Execution — How the data quality check is performed within Verity’s web services after being initiated by Check Orchestration.

Verity UI — The Verity website where Check Results and Check Definitions can be viewed.

Check Definition

The Verity product begins with a user-written YAML definition of data quality called the Check Definition.

Query — This field generates a numeric measurement of data quality — such as a count or an average. It uses either raw SQL or our domain-specific language (DSL). Our DSL provides a fast, SQL-less short-hand for the most common queries.

Condition — This field describes how the query result is to be evaluated into a pass or a fail. It can be a fixed threshold or a statistical one.

Metadata — This includes a human-readable name, a universally unique identifier (UUID), ownership information, and tags (arbitrary semantic aggregations like ‘ML-feature’ or ‘business-reporting’).

Notifier — This lists the target(s) to notify upon failure — via PagerDuty, Slack, or email.

Three Example Check Definitions

The first check addresses the completeness issue from our first example — that our rider_events.session_id is never null.

core rider events session_id is not null: # check name
  metadata:
    id: 90bde4fa-148b-4f06-bd5f-f15b3d2ad759
    ownership_slack: "#dispatch-service-dev"
    tags: [rides, core-data, high-priority]
  query:
    type: dsl
    data_source_id: hive.core.rider_events
    filters:
      - session_id = null
  condition:
    type: fixed_threshold
    max: 0
  notifier_group:
    pagerduty_policy: dispatch-service
    email: dispatch-service@lyft.pagerduty.com

The next check addresses the issue from our second example. It will ensure the number of canceled rides for this day is not more than 3 standard deviations outside the 90-day historic mean. This check will be dispatched daily at 4 AM by our scheduler.

core rider events daily canceled volume is inside 3 SDs: # check name
  metadata:
    id: 3cb75736-1fbe-4f6d-bad5-67bf613f5d62
    ownership_slack: "#dispatch-service-dev"
  query:
    type: dsl
    data_source_id: hive.core.rider_events
    filters:
      - final_state = canceled
  condition:
    type: z_score
    min: -3
    max: 3
    history: 90 days
  schedule: ## field read by VerityScheduler
    type: cron
    expression: "0 0 4 1/1 * ? *" ## 4 AM daily
  notifier_group:
    slack: "#dispatch-service-alerts"
    email: dispatch-service@lyft.pagerduty.com

This last check addresses the timeliness issue from our third example. It will ensure that the raw_events.bike_sessions table has no entries where the last_updated_ms (Hive timestamp) is more than 5 minutes later than the occurred_at_ms (generation timestamp).

raw bike sessions data is not more than 5 minutes late: # check name
  metadata:
    id: 6eb84cc-efe3-466e-ab48-a7e1fec6ddq6
    ownership_slack: "#tbs-dev"
  query:
    type: dsl
    data_source_id: hive.raw_events.bike_sessions
    filters:
      - occurred_at_ms + 5 * 60 * 1000 < last_updated_ms
  condition:
    type: fixed_threshold
    max: 0
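
For intuition on the z_score condition in the second example, here is a minimal sketch (plain Python, not Verity's actual evaluation code) of how such a condition could be checked against a 90-day history:

import random
from statistics import mean, stdev

def passes_z_score(todays_value: float, history: list[float],
                   min_z: float = -3.0, max_z: float = 3.0) -> bool:
    """Pass if today's measurement lies within [min_z, max_z] standard
    deviations of the historical mean, mirroring the z_score condition above."""
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return todays_value == mu  # degenerate history: only an exact match passes
    return min_z <= (todays_value - mu) / sigma <= max_z

# Synthetic 90-day history of daily canceled-ride counts (illustrative numbers).
random.seed(7)
history = [random.gauss(10_000, 400) for _ in range(90)]

print(passes_z_score(10_300, history))  # a typical day -> True
print(passes_z_score(25_000, history))  # a drastic spike -> False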

Check Development

To develop these Check Definitions, we built the VerityCLI. It lets customers validate, backtest, and backfill their checks across date ranges interactively before committing them. The same validation runs on each pull request via GitHub Actions CI, ensuring no bad configurations are committed.

For example, our backtest command of the completeness check looks like this:

(veritydata venv)mmcphillips@ABCHELLOO veritydata % veritycli backtest \
--check_id 90bde4fa-148b-4f06-bd5f-f15b3d2ad759 --ds 2023-10-15

=!========================================================================
Beginning backtest for 1 date(s) and 1 check(s).
check_ids: ['90bde4fa-148b-4f06-bd5f-f15b3d2ad759']
ds_dates: ['2023-10-15T00:00:00+00:00']
=!========================================================================

SQL Query:
SELECT
COUNT(*) as result
FROM "hive"."core"."rider_events"
WHERE
ds = '2023-10-15'
AND session_id IS null

Result Set:
result
2.00

Maximum Value: 0.00
Check Result: FAILURE

=!========================================================================
overall command finished in : 2.176988840103149 seconds
aggregate results:
SUCCESS : 0
FAILURE : 1
INTERNAL_ERROR : 0
CLIENT_ERROR : 0
=!========================================================================
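
As a rough mental model of how a DSL query becomes the TrinoSQL above, the translation could look something like the following toy function. This is an illustration of the idea, not Verity's actual query compiler.

def dsl_to_trino_sql(data_source_id: str, filters: list[str], ds: str) -> str:
    """Toy rendering of a DSL check into a COUNT(*) TrinoSQL query,
    shaped like the generated SQL in the backtest output above."""
    catalog, schema, table = data_source_id.split(".")
    # The DSL's "= null" shorthand becomes SQL's "IS null"; ds scopes one partition.
    predicates = [f"ds = '{ds}'"] + [f.replace("= null", "IS null") for f in filters]
    return (
        "SELECT\n"
        "  COUNT(*) as result\n"
        f'FROM "{catalog}"."{schema}"."{table}"\n'
        "WHERE\n"
        "  " + "\n  AND ".join(predicates)
    )

print(dsl_to_trino_sql("hive.core.rider_events", ["session_id = null"], "2023-10-15"))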

Check Orchestration

Airflow and Flyte

Data engineers can dispatch these checks inside Flyte, Airflow, or other systems which create or consume Hive data. To do this, we created the VerityAirflowOperator and VerityFlyteOperator. These operators dispatch checks and poll for the results. However, the operators are merely delegators — they add their own typed exceptions and retry strategies and delegate the real business logic to our VeritySDK for better maintainability.
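
To make the delegation pattern concrete, a stripped-down version of such an operator could look like the sketch below. The VeritySDK client, its import path, and its method names are assumptions for illustration, not Verity's actual interface.

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator


class VerityAirflowOperator(BaseOperator):
    """Simplified sketch of a delegating operator: it only maps SDK results
    onto Airflow semantics and leaves the real business logic to the SDK."""

    template_fields = ("check_datetime",)  # allow "{{ ds }}"-style templating

    def __init__(self, check_id: str, check_datetime: str,
                 is_blocking: bool = True, **kwargs):
        super().__init__(**kwargs)
        self.check_id = check_id
        self.check_datetime = check_datetime
        self.is_blocking = is_blocking

    def execute(self, context):
        from verity_sdk import VerityClient  # hypothetical SDK import path

        # Dispatch the check and poll for its result; retries, timeouts, and
        # typed exceptions would be layered on here in the real operator.
        result = VerityClient().run_check(check_id=self.check_id,
                                          check_datetime=self.check_datetime)
        if self.is_blocking and not result.passed:
            raise AirflowException(f"Verity check {self.check_id} failed")
        return result.passed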

In Airflow, we instantiate the VerityAirflowOperator by citing the check_id previously created in the Check Definition. We then add it to the Airflow DAG (Directed Acyclic Graph) in the desired position:

with DAG(dag_id="rider_events_example") as dag:

    # ... create_staged_data, exchange_data, and other tasks defined here ...

    dq_check_blocking = VerityAirflowOperator(
        task_id="completeness_dq_blocking",
        check_id="90bde4fa-148b-4f06-bd5f-f15b3d2ad759",
        check_datetime="{{ ds }}",  # templated partition date
        is_blocking=True,
    )

    create_staged_data >> dq_check_blocking >> exchange_data

The VerityAirflowOperator can be used in a blocking fashion to halt a DAG upon a check failure, preventing bad data from ever reaching production. This utilizes the “Stage-Check-Exchange” pattern: we create data in a staged schema, verify the data with a blocking operator, then promote it to production if it passes quality checks.

Verity Scheduled Checks

Data analysts and data scientists can dispatch Verity Scheduled Checks, which are checks orchestrated at specified times using Verity’s job scheduler. This is useful because these users are often not familiar with ETL tooling. To orchestrate Verity Scheduled Checks, they simply write a cron expression into the Check Definition, as seen in Example 2 above.

Check Execution

Now that we have seen what a user must do to onboard to Verity, let us see what happens after they do. Our system design mainly follows an asynchronous compute engine pattern. We have four web services in the backend (shown in teal): the Scheduler, API Server, Executor, and Notifier. This loosely coupled, service-oriented approach allows us to evolve and scale each component independently while limiting the blast radius of failures.

component diagram of verity’s execution, orchestration, notifications, and UI
Component Diagram of Verity’s Backend Architecture, Orchestration, and UI

Verity Scheduler — This job scheduler performs our periodic tasks. It consumes configurations, like the Check Definitions, and sends them to the API Server to be persisted. It also orchestrates Verity Scheduled Checks.

  • Verity Scheduled Checks are isolated from any data orchestration engine, so they still run even if Airflow or Flyte are completely down. This remedies a common problem of checks not alerting because the Airflow Task never ran.

Verity API Server — This service handles all the external APIs regarding running checks as well as persisting and retrieving their results. The API Server does not execute any checks, but rather writes a message to our Check Queue, which utilizes SimpleQueueService (SQS).

  • Check Results are written and read only by check_id and time, so DynamoDB was a natural choice as a NoSQL key-value store. This gives us low latency and high scalability with schema flexibility and no database maintenance (a sketch of this access pattern follows below).
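
As a minimal illustration of that access pattern with boto3 (the table name and attribute names are assumptions, not Verity's actual schema):

from decimal import Decimal
from time import time

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
results = dynamodb.Table("verity_check_results")  # hypothetical table name

def save_check_result(check_id: str, passed: bool, value: float) -> None:
    """Persist one Check Result keyed by (check_id, run timestamp)."""
    results.put_item(Item={
        "check_id": check_id,                  # partition key
        "run_at_ms": int(time() * 1000),       # sort key
        "passed": passed,
        "value": Decimal(str(value)),          # DynamoDB numbers must be Decimal
    })

def result_history(check_id: str) -> list[dict]:
    """Fetch all results for one check, newest first, for the result history view."""
    response = results.query(
        KeyConditionExpression=Key("check_id").eq(check_id),
        ScanIndexForward=False,  # descending by sort key (run_at_ms)
    )
    return response["Items"]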

Verity Executor — This service picks up a message from our Check Queue, executes the TrinoSQL using our compute gateway, verifies the condition, then saves the results. If the check fails, we drop a message into our Notifier Queue (also SQS).

  • We use SQS to manage both Executor and Notifier tasks because it provides scalability and consumer-side fault tolerance. With SQS, Verity can withstand high peak volume and automatically retry system errors.
  • We deploy a Check Queue and Executor pair for each traffic grouping (e.g. Airflow-Blocking, VerityCLI). This yields infrastructure-level isolation and scalability; we can ensure our Airflow-Blocking checks run very fast while also ensuring that our VerityCLI checks do not interfere with them.
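
A minimal version of the Executor loop described above, using plain boto3 SQS calls (the queue URLs, the message shape, and run_check are placeholders rather than Verity's actual code), might look like:

import json

import boto3

sqs = boto3.client("sqs")
CHECK_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/check-queue"        # placeholder
NOTIFIER_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/notifier-queue"  # placeholder

def run_check(check: dict) -> bool:
    """Placeholder for executing the check's TrinoSQL via the compute gateway
    and evaluating its condition; always passes in this sketch."""
    return True

def poll_check_queue_once() -> None:
    """One Executor iteration: receive a check message, run it, enqueue a
    notification on failure, then delete the message. Messages that are not
    deleted reappear on the queue, which is what gives consumer-side retries."""
    response = sqs.receive_message(
        QueueUrl=CHECK_QUEUE_URL, MaxNumberOfMessages=1, WaitTimeSeconds=20
    )
    for message in response.get("Messages", []):
        check = json.loads(message["Body"])
        if not run_check(check):
            sqs.send_message(QueueUrl=NOTIFIER_QUEUE_URL, MessageBody=message["Body"])
        sqs.delete_message(
            QueueUrl=CHECK_QUEUE_URL, ReceiptHandle=message["ReceiptHandle"]
        )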

Verity Notifier — This consumes messages from the Notifier Queue and sends a notification to the indicated recipients.

  • Because notification dependencies can be flaky, independently retrying failed notifications avoids re-running costly SQL executions.
  • Client errors from bad SQL automatically page the check author, preventing broken checks from piling up.

Verity UI

The VerityUI provides a streamlined data discovery experience via the Verity Homepage. Our full-text search on the Check Definition Metadata lets users see all the checks currently being enforced and their Check Results, with useful aggregations such as owning team, table name, and tags.

Verity’s UI Homepage showing search bars on Check Metadata and the Check Definitions they return
Verity Homepage UI viewing a team’s Check Definitions.

An on-call engineer who is alerted is deep-linked to a result history view, where they can quickly gain context from all past results of that check.

Verity’s UI showing the Check Results View
Verity Result History UI viewing an example check’s results

Conclusion

With Verity, we were able to improve data quality in Lyft’s Hive Data Warehouse and provide a centralized platform for vetting Hive Data. Verity made it easy to onboard with our seamless Check Orchestration integrations and Check Development tooling. We enhanced our offering with our rich VerityUI and notifications. Verity’s standardized assessment of data quality increased observability, reliability, and operational efficiency in our Hive Data Warehouse.

Wins So Far:

We currently support over 70 teams at Lyft, covering 2,500 Hive tables with 140,000 data quality validations per week. We reduced the:

  1. number of production data incidents by verifying Hive data as it is created and before it is released (over 13,000 incidents prevented to date).
  2. duration of incidents that do happen. With quick alerting and a centralized UI showing historic Check Results, users can troubleshoot faster.
  3. time scientists and analysts spend in data discovery through providing a holistic view of all Check Definitions being enforced and their respective Check Results.

Related Work

This solution for ensuring data quality in Lyft’s Hive Data Warehouse was the first step in a larger effort to comprehensively evaluate data quality in Lyft’s data platform. Next we tackled:

Real-time Checks to evaluate records of event data as they are produced. This has an almost instant time-to-detect, improving Verity's initial offering.

Anomaly Detection with statistical data quality checks to express complex patterns in Hive data and to remove much of the human burden of Check Development. We expanded and scaled our offering by utilizing Apache Spark and Python data science libraries.

Acknowledgements

Thank you to the Verity Team: Yanhong Ju, Marcos Iglesias, Jason Carey, Valentine Lin, Liuyin Cheng, Bill Graham, Knowl Baek, Leo Luo, and Evan Brim.

If you’re interested in working on big data problems like these, then take a look at our careers page.
