Sourcerer: MySQL Ingestion at Scale
In the first blog, published earlier, we looked at Sourcerer, the data ingestion framework of the Myntra Data Platform (MDP), from a ten-thousand-foot view and traced how it evolved to support large-scale ingestion. In this second blog on Sourcerer, we dive deeper into transactional data ingestion from MySQL databases into MDP.
At Myntra, data related to Orders, Logistics, Warehouse Management, Coupons, Returns, Catalog and many more systems are stored in MySQL databases. Analytics on these datasets is critical and is used to generate several monthly financial reports, optimise delivery (using ML), enhance catalog and inventory management, etc. The first step towards enabling analytics for these functional areas is to make the transactional data available as-is in our Data lake to enable further transactional analytics or historical analysis through snapshots and aggregates. This is where Sourcerer comes in.
MySQL ingestion is one of the most prominent flows in Sourcerer, ingesting hundreds of tables and amounting to terabytes of data per day. We will walk through the architecture of MySQL ingestion first and then go through some of the challenges we faced at Myntra's scale.
Architecture
Sourcerer is responsible for maintaining an exact snapshot of the source in the Data Platform. This ingested raw dataset is then cleansed and filtered by different processing jobs and made available to business users in the Data Warehouse for near-real-time or operational/transactional analytics. The architectural diagram is as follows:
Sourcerer Architecture for MySQL Ingestion
MySQL Source
We have a master-slave architecture for MySQL databases at Myntra. For a database to be ingested, a new MySQL slave is created specifically for analytical use cases. The master DB is used for production read-writes, whereas the slave DB acts as a read-only source, thereby separating concerns.
Ingestion Pipeline
CDC data for each database is streamed from the binlog of its slave using the Debezium MySQL connector. Multiple Debezium connectors run in a Kafka Connect cluster, with one connector per source database.
Debezium pushes the data into the Central Message Queue (CMQ), Kafka, in Avro format. A new Kafka topic is created for each table. The offset, configuration and status information of all Debezium connectors is also stored in Kafka.
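As a hedged illustration (not our exact production configuration), registering one such connector against the Kafka Connect REST API could look like the sketch below. Hostnames, database names and credentials are placeholders, and exact property names vary with the Debezium version.

```python
import requests

connector = {
    "name": "sourcerer-orders-db",  # one Debezium connector per source database
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "orders-slave.example.internal",  # the read-only slave
        "database.port": "3306",
        "database.user": "sourcerer",
        "database.password": "<fetched-from-vault>",            # never hard-coded
        "database.server.id": "5401",
        "database.server.name": "orders",   # becomes the topic prefix: orders.<db>.<table>
        "database.include.list": "orders",
        # Binlog schema history, required by the MySQL connector (Debezium 1.x property names).
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-history.orders",
        # Avro serialisation, with schemas registered in the Schema Registry.
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
    },
}

resp = requests.post("http://kafka-connect:8083/connectors", json=connector)
resp.raise_for_status()
```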
While pushing data into Kafka, the schema of each table is also registered in the Schema Registry (SR). The Schema Registry acts as the data catalog for Sourcerer, since it contains the schemas of all tables ingested via Sourcerer.
From Kafka, we push the data into the Data Lake using our in-house Kafka sink connector. The connector reads the schema from SR and maintains the same schema for the dataset in the Data Lake. This location holds all the CDC records from MySQL.
A final Spark processing job de-duplicates this data based on the time of ingestion and loads it into the Data Warehouse; the resulting dataset is an exact replica of MySQL. This de-dup Spark job is scheduled daily or hourly on Airflow.
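A minimal PySpark sketch of the idea behind the de-dup step is shown below; the path, table name and the id primary-key column are assumptions, and the real job handles many more cases.

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("sourcerer-dedup").getOrCreate()

# Raw CDC records landed by the sink connector (illustrative path; needs spark-avro).
cdc = spark.read.format("avro").load("s3a://datalake/raw/orders/order_line/")

# Keep only the latest record per primary key, ordered by ingestion time.
latest_first = Window.partitionBy("id").orderBy(F.col("sourcerer_timestamp").desc())
snapshot = (
    cdc.withColumn("rn", F.row_number().over(latest_first))
       .filter(F.col("rn") == 1)
       .drop("rn")
)

# The resulting snapshot mirrors the MySQL table.
snapshot.write.mode("overwrite").saveAsTable("warehouse.order_line")
```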
For near-real-time updates, we have a streaming job that consumes data from Kafka and updates the Data Warehouse directly. This flow is reserved for the most critical use cases, since it is processing-heavy and therefore costly.
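For illustration only, the skeleton of such a streaming job might look like the following; the topic name and bootstrap servers are placeholders, and Avro deserialisation of the Debezium payload plus the warehouse upsert are elided.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sourcerer-streaming").getOrCreate()

# Debezium topics follow <server>.<database>.<table>; this one is illustrative.
stream = (
    spark.readStream.format("kafka")
         .option("kafka.bootstrap.servers", "kafka:9092")
         .option("subscribe", "orders.orders.order_line")
         .load()
)

def upsert_batch(batch_df, batch_id):
    # Decode the Avro payload and merge the changes into the warehouse table.
    # (Both steps are warehouse-specific and omitted here.)
    pass

query = stream.writeStream.foreachBatch(upsert_batch).start()
query.awaitTermination()
```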
Data Quality
Data quality is maintained using our Data Veracity Framework, Arjun. The framework lets the user compare data between source and target and generates a report based on the veracity definition that has been configured or provided. The generated veracity report is stored in tables so that it can be queried easily later, and it is also sent to the user via email.
Credential Store
Credentials are required to connect to the source MySQL, the Data Lake and the Data Warehouse. All of these are stored in a central vault, and the APIs exposed by the vault are used to fetch the required credentials.
Monitoring and Alerting
Monitoring is set up on Grafana, using Prometheus, for the Kafka and Kafka Connect clusters. Alerts are configured on important metrics such as consumer lag, under-replicated partitions, offset commit failure percentage and the size of data ingested.
We have a status tracker database that captures the status of all connectors. A job is scheduled to check the status of the connectors every 5 minutes, and alerts are sent if any connector is in the failed state. Automatic recovery is also supported for some classes of errors.
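The status check itself is straightforward against the Kafka Connect REST API; the sketch below is a simplified version, with the alerting hook left as a stub.

```python
import requests

CONNECT_URL = "http://kafka-connect:8083"

def alert(connector: str, status: dict):
    # Hypothetical hook: in production this would raise an alert and
    # record the state in the status tracker database.
    print(f"ALERT: {connector} unhealthy: {status}")

def check_connectors():
    for name in requests.get(f"{CONNECT_URL}/connectors").json():
        status = requests.get(f"{CONNECT_URL}/connectors/{name}/status").json()
        failed_tasks = [t for t in status["tasks"] if t["state"] == "FAILED"]
        if status["connector"]["state"] == "FAILED" or failed_tasks:
            alert(name, status)
            # Recover where possible by restarting the failed tasks.
            for task in failed_tasks:
                requests.post(f"{CONNECT_URL}/connectors/{name}/tasks/{task['id']}/restart")

if __name__ == "__main__":
    check_connectors()  # scheduled to run every 5 minutes
```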
The de-dup Spark job, orchestrated using Airflow, gives us monitoring and alerting to identify delays or failures in jobs.
Ingestion Pipeline Technology choices
We chose Kafka as the Central Message Queue for the following reasons:
- Kafka's partitioned log model allows data to be distributed across multiple servers, making it scalable.
- Kafka decouples data streams, enabling very low latency.
- Partitions are distributed and replicated across many servers, and all data is written to disk. This protects against server failure, making the data fault-tolerant and durable.
Ingestion Services
We evaluated several technologies for setting up our ingestion services, which perform two actions: reading from the MySQL source and writing to Kafka, and reading from Kafka and writing to the Data Lake. A brief tabular comparison is given below:
Evaluation of different technologies for Ingestion
We chose Kafka Connect owing to the wide variety of connectors available, the ease of extending the framework with new connectors, and its support for streaming data.
Within Kafka Connect, we chose the Debezium MySQL connector for reading from the source. For writing into the Data Lake, we built our own custom sink connector.
Challenges at Scale
Let’s look at some of the challenges we faced at Myntra and how we overcame them.
Quick Updates
While ingesting from the binlog of the MySQL slave, Debezium adds a timestamp field to every record, and we initially relied on this timestamp to pick the latest update. Fast writes to the database, however, could produce the same timestamp for two or more records, losing the chronology and leaving duplicates behind for the de-dup pipeline.
We solved this with a custom change: a new field, sourcerer_timestamp, which is an amalgamation of the timestamp and a counter. The counter increments whenever the timestamp repeats, so chronology is preserved. The field still behaves like a timestamp, because appending a six-digit counter to the millisecond timestamp effectively converts it to nanosecond resolution.
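Conceptually, the derivation looks like the sketch below; the actual change lives inside our connector code, so treat this only as an illustration of the idea.

```python
_last_ms, _counter = 0, 0

def sourcerer_timestamp(event_ts_ms: int) -> int:
    """Append a six-digit counter to the millisecond timestamp, yielding a
    nanosecond-scale value that stays unique and ordered under fast writes."""
    global _last_ms, _counter
    if event_ts_ms == _last_ms:
        _counter += 1                       # same millisecond: break the tie
    else:
        _last_ms, _counter = event_ts_ms, 0
    return event_ts_ms * 1_000_000 + _counter

# Two events in the same millisecond now get distinct, ordered values.
a = sourcerer_timestamp(1_650_000_000_123)
b = sourcerer_timestamp(1_650_000_000_123)
assert a < b
```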
Large number of connectors
Being an expanding platform, we have to run many Debezium connectors to ingest multiple source databases. To handle this large number of connectors, we run Kafka Connect workers in cluster (distributed) mode, which is both fault tolerant and horizontally scalable. In this mode, all workers are linked together using a group id. To scale the cluster horizontally, we just start another worker with the same group id on a new VM. When a request for a new connector is submitted, the cluster automatically spawns the connector on whichever VM it deems fit based on the VMs' capacity. If a VM goes down, the cluster moves its connectors to other VMs that are up and running, providing fault tolerance.
Kafka Connect Cluster setup
Before we adopted cluster mode, we ran a primitive setup: one Debezium connector per standalone worker process, with multiple such workers on a single VM. We realised that this was a single point of failure. If a VM went down, its workers and consequently its connectors went down with it, and ingestion for those databases stopped immediately. Scaling up for more ingestions was also getting difficult, as we had to separately maintain a list of which connector was running on which worker and VM. The setup was neither hassle-free to scale horizontally nor fault tolerant.
Schema Validation
When the schema evolves in the source database, the change is propagated to the Schema Registry. If the change is not backward compatible with the schema enforcement of the Data Warehouse/Lake, the Debezium connector fails, and manual intervention is required to correct the data.
In the majority of schema evolutions, the table is re-ingested. But handling multiple re-ingestions at scale, and that too manually, is very tedious, so we created an API to automate re-ingestion of tables. It includes the following steps (a sketch of the flow follows the list):
- Deleting existing data in Data Lake, warehouse and Kafka.
- Blacklisting the table from the existing connector.
- Getting the snapshot of the table from source.
- Identifying when the snapshot has completed and whitelisting the table back in the existing connector.
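Below is a hedged sketch of what such automation can do, expressed against the Kafka Connect REST API. The helper functions, connector names and the use of table include lists are illustrative rather than our exact internal API.

```python
import requests

CONNECT_URL = "http://kafka-connect:8083"

def drop_existing_data(table: str): ...    # hypothetical: clears Data Lake, warehouse and the Kafka topic
def wait_for_snapshot(connector: str): ...  # hypothetical: polls status/metrics until the snapshot completes

def reingest(connector: str, table: str):
    base = f"{CONNECT_URL}/connectors/{connector}"
    cfg = requests.get(f"{base}/config").json()

    # 1. Delete the existing data, then blacklist the table on the running connector.
    drop_existing_data(table)
    included = [t for t in cfg.get("table.include.list", "").split(",") if t and t != table]
    requests.put(f"{base}/config", json={**cfg, "table.include.list": ",".join(included)})

    # 2. Re-snapshot just this table with a short-lived connector.
    snap_name = f"{connector}-resnapshot"
    requests.post(f"{CONNECT_URL}/connectors", json={
        "name": snap_name,
        "config": {**cfg, "name": snap_name,
                   "table.include.list": table,
                   "snapshot.mode": "initial_only"},  # snapshot, then stop
    })

    # 3. Once the snapshot completes, remove the temporary connector and
    #    whitelist the table back on the original connector.
    wait_for_snapshot(snap_name)
    requests.delete(f"{CONNECT_URL}/connectors/{snap_name}")
    requests.put(f"{base}/config", json={**cfg, "table.include.list": ",".join(included + [table])})
```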
Schema Evolution
Another challenge we faced was schema evolution. Adding a new column to the transactional source (a backward compatible schema change) left the schemas out of sync between the source and the lake/warehouse, resulting in tickets to add and backfill the new columns. Each such table was backfilled manually to add the column, and as ingestions grew, so did these requests. Consequently, we decided to automate the process of schema evolution.
Schema Evolution
First, the schema gets updated in the source (1). When the updated records start flowing in, Debezium updates the schema in SR (2). The Data Lake schema evolves automatically because the sink connector uses the same schema to write into the Data Lake (3). We modified the processing job to handle backward compatible schema changes: it makes the necessary changes to the table in the Data Warehouse and then loads the data (4). This way, schema changes are propagated throughout the pipeline.
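As an illustration of step (4), the warehouse-side handling can be as simple as diffing the incoming schema against the existing table and adding the missing columns before the load; the table name and path below are assumptions, not our production values.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sourcerer-schema-sync").getOrCreate()

incoming = spark.read.format("avro").load("s3a://datalake/raw/orders/order_line/")
existing = spark.table("warehouse.order_line")

# Add any newly introduced (backward compatible) columns to the warehouse table.
for field in incoming.schema.fields:
    if field.name not in existing.columns:
        spark.sql(
            f"ALTER TABLE warehouse.order_line "
            f"ADD COLUMNS ({field.name} {field.dataType.simpleString()})"
        )
# ...the regular de-dup and load then proceeds with the evolved schema.
```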
Data Veracity and Recoverability
During ingestion of MySQL data, there can be several points of failure; for example, the MySQL slave going out of sync with the master, or binlog files getting corrupted. Our in-house Data Veracity Framework identifies such misses: it generates a report, and based on the inconsistencies, the team re-ingests and corrects the data.
Debezium supports recoverability through its snapshotting mechanism. We can take a snapshot of the complete data, or of just the affected data, using Debezium's snapshot modes. The recovered data from MySQL carries the latest sourcerer_timestamp, so it overwrites the incorrect data in the Data Warehouse during deduplication.
Data recovery properties of Debezium
The property snapshot.mode specifies the criteria for running a snapshot when the connector starts. Possible settings include initial, initial_only, when_needed, never, schema_only and schema_only_recovery.
If we wish to extract only the affected data from a table, rather than a complete snapshot, we can use the snapshot.select.statement.overrides property. It names the tables to override, and for each of them a companion property carries a SELECT statement that returns only the rows we want in the snapshot. When the connector performs a snapshot, it executes this SELECT statement to retrieve the data from that table.
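For example, a targeted recovery of one table's recent rows could add configuration along these lines; the database, table and predicate are placeholders.

```python
# Fragment to merge into the connector configuration for a targeted recovery snapshot.
recovery_config = {
    "snapshot.mode": "initial_only",  # take the snapshot, then stop
    # The overrides property names the tables; a per-table companion property
    # carries the SELECT to run during the snapshot.
    "snapshot.select.statement.overrides": "orders.order_line",
    "snapshot.select.statement.overrides.orders.order_line":
        "SELECT * FROM orders.order_line WHERE modified_on > '2022-01-01'",
}
```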
Validation/Testing before Production
In such a complex system, visibility into the data at different interim points is important, and a QA setup provides it. It also improves overall developer productivity by allowing integration and functional tests to run on a dockerized staging environment.
One medium-sized VM is sufficient to set up the complete end-to-end pipeline. The VM hosts a MySQL server with the test DB and tables, a Kafka server, the Schema Registry and a Kafka Connect process running the Debezium and sink connectors. The Kafka Connect process can be started with minimal resources (2 GB of heap memory) since both connectors are lightweight. The target location in the sink connector points to a test location in the Data Lake. We dockerized all the components to automate the setup.
DML commands are run manually in MySQL. To verify the Debezium and sink connector runs, we check the data in Kafka and the Data Lake respectively.
Monitoring Data Freshness
In a complex system like this, failures and SLA breaches are P0 issues that require the right kind of observability and monitoring. Along with service monitoring, stakeholders also need a view of data freshness. Data freshness gives us an idea of data completeness: it is the timestamp up to which we guarantee that the data has been taken care of. Downstream queries and jobs are run accordingly.
We use the last_modified_date column from the source to keep track of data freshness, and alerts are raised if the delay exceeds the threshold.
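A simplified version of the freshness check, assuming the warehouse table is queryable through a DB-API style cursor; the table name, UTC assumption and one-hour threshold are illustrative.

```python
from datetime import datetime, timedelta

FRESHNESS_THRESHOLD = timedelta(hours=1)

def check_freshness(cursor, table: str):
    # last_modified_date is replicated as-is from the MySQL source
    # (assumed here to be stored in UTC).
    cursor.execute(f"SELECT MAX(last_modified_date) FROM {table}")
    freshest = cursor.fetchone()[0]
    lag = datetime.utcnow() - freshest
    if lag > FRESHNESS_THRESHOLD:
        raise RuntimeError(f"{table} freshness SLA breached: lagging by {lag}")
```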
We are constantly working to improve the Sourcerer framework with other capabilities, such as streaming ingestion, unification of streaming and batch ingestion for operational efficiency, and self-service ingestion. You will hear about those in future blogs on Sourcerer.
I'm also excited to hear from the rest of the community in the comments section below about the challenges they've faced and how they've overcome them. Stay tuned for further updates…
Credits: Thanks to Amit Rana and Raghu Vengalil for their review and support.