Spinner: The Mass Migration to Pinterest’s New Workflow Platform
Ace Haidrey | Software Engineer, Workflow; Ashim Shrestha | Site Reliability Engineer, Workflow; Dinghang Yu | Software Engineer, Workflow; Euccas Chen | Software Engineer, Workflow; Evan Li | Engineering Manager, Workflow; Hannah Chen | Product Manager, Workflow; Yulei Li | Software Engineer, Workflow
This article is a repost from the author’s original account here.
In our last blog post, we discussed how we decided to move from our legacy system, Pinball, to our new system, Spinner, which is built on top of the Apache Airflow project. As a reminder, Spinner is based on a custom branch forked from Airflow version 1.10-stable, with some features cherry-picked from the master branch.
In this post, we will explain how we approached and designed the migration, identified requirements, and coordinated with all our engineering teams to seamlessly migrate 3000+ workflows to Airflow. We will dive deep into the trade-offs we made, but before we do that, we want to share what we learned.
The keys to a successful migration process by our standards were:
- Understand and fill the gaps between Airflow and the in-house workflow system we previously had. We identified the feature gaps, the security differences, and the terminology that users were accustomed to.
- Provide migration tooling that offers a low-cost way to mass migrate many workflows simultaneously, along with a way to verify the results.
- Have clear and constant user communication and onboarding materials such as wikis, training sessions, active user support, and announcements.
- Have a stateless UI, enable scheduler partitioning, and enable the Kubernetes integration to provide a customized and scalable solution.
- Have clear CI/CD pipelines that keep the system consistent and reliable and make it efficient to maintain multiple infrastructure branches.
- Test hard — both unit tests and integration tests with staging environments to prevent breaking changes and take a cautious approach to deploying.
- Maintain health checks and comprehensive metrics with alerting to fine tune as loads increase.
The biggest pitfalls to watch out for include:
- Schedule misalignment before and after the migration. The schedule intervals in the previous system and in Spinner do not always line up, because the two systems define schedules differently (the old system was not strictly cron based). Verify the alignment to prevent missed runs and extra runs.
- Under-provisioned resources, such as the memory configured for each task, which can cause a task to fail before it starts up.
- Kubernetes pod warm-up cost, which we did not anticipate. The pod startup delay has a nontrivial cost that must be acceptable for your team's use case.
- Redundant sidecars in Kubernetes pods, which can add latency when there are network issues and can add scheduling latency to your workflows.
- The investment in user education and support, which can be high.
- The cost overhead of the hybrid solution: maintaining both the old DSL and the new Airflow DSL is nontrivial.
Now let’s explore what helped us surface these challenges.
Approaches & Requirements
Our platform team defined the requirements for the migration as:
- Minimal user code change required
- The uninterrupted execution of production workflows when being migrated
- Setting a date for and completing the deprecation of the legacy system
Given those, there were two main ways we could have conducted this migration:
1. Request workflow owners to rewrite their old workflows in Airflow DSL and provide them support in this transitioning process
2. Platform provides tooling to do the DSL translation directly
Method 1 would reduce tech debt for us and the users, and the platform would not have to maintain additional infrastructure. However, it poses major challenges because of all the customized user logic and dependencies built into the legacy Pinball jobs. Even without those challenges, we did not have customer buy-in for this proposal, as it would cost each team too many engineering hours. Lastly, it could postpone the deprecation of the legacy system, since we would depend on our customers to finish the work. Together, these made it infeasible.
Therefore, our approach turned out to be closer to method 2: we built a migration layer into the Airflow scheduler that translates a workflow definition from the legacy workflow system into an Airflow Directed Acyclic Graph (DAG) on the fly during dag file parsing. This means no user code changes and minimal user involvement, providing a transparent migration experience to the user. Every job in the legacy workflow is translated to a wrapper operator type implemented specifically to support the workflow migration use case. During its execution, the operator starts a new k8s pod that kicks off the actual logic of the legacy job using the legacy system’s image. This way, we can mimic the execution environment of the legacy system for the translated tasks.
Migration Layer
To reiterate, the goal of this project is to facilitate transparent workflow migration with minimal user effort. The diagram below shows the end-to-end experience of scheduling the migrated workflows, and we’ll dive deeper into the respective components.
Pinterest migration schedulers and components side by side to a native scheduler and its components.
The left-side component is the Pinterest migration scheduler. This was built on top of the native Airflow scheduler and takes advantage of the multi-partition scheduler we wrote previously.
PinterestDagBag
When the scheduler is started in the migration mode, it uses a custom DagBag class, named the PinterestDagBag, to parse dags from the migration metadata file, rather than from the python dag files. To make more sense of this we need to describe how the previous system, Pinball, worked.
Pinball has a concept of a Token: when a Pinball workflow is ready to run, the Pinball workflow parser translates the workflow definition into Tokens, which store all the required runtime information. The PinterestDagBag retrieves the workflow definition (a.k.a. Tokens) from the workflow parser of the legacy system, which is hosted in a container called Token Fetcher. It then translates the legacy workflow definition into native Airflow DAGs and tasks (i.e. Operators or Sensors) on the fly.
What allows for this abstraction and translation, which doesn’t require a dag file, is actually quite simple. A dag file is essentially just an identifier of one or more dags. For native airflow, the dag file happens to carry the workflow definition, but it’s completely possible to compose the dag file in a way where it doesn’t directly contain the workflow definition and instead points to the source where the definition is hosted. We write a “dag file” which denotes where the legacy workflow definition is hosted, and we make sure the custom PinterestDagBag module is able to parse dag objects out of it. A sample migration metadata file looks like the following:
{
  "cluster_name": "core001",
  "workflow_name": "test_workflow",
  "migration_date": "2020-01-01 00:00:00"
}
The metadata is generated when the migration of a workflow is initiated (we’ll describe this more later on) and is discoverable to the scheduler. Each migration metadata file denotes how to locate the definition of a legacy workflow, with the help of a Token Fetcher container, which will be discussed in the next section.
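As a rough illustration of how such a file could be read, the sketch below parses the three fields of the sample metadata into a small record. The MigrationMetadata class and the parse_migration_metadata function are hypothetical names used only for this example; they are not part of the PinterestDagBag described here.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class MigrationMetadata:
    """Hypothetical record pointing at a legacy workflow definition."""
    cluster_name: str
    workflow_name: str
    migration_date: datetime


def parse_migration_metadata(raw: str) -> MigrationMetadata:
    """Parse the JSON body of a migration metadata file."""
    data = json.loads(raw)
    return MigrationMetadata(
        cluster_name=data["cluster_name"],
        workflow_name=data["workflow_name"],
        # The sample file uses a "YYYY-MM-DD HH:MM:SS" timestamp.
        migration_date=datetime.strptime(
            data["migration_date"], "%Y-%m-%d %H:%M:%S"
        ),
    )


sample = """
{
  "cluster_name": "core001",
  "workflow_name": "test_workflow",
  "migration_date": "2020-01-01 00:00:00"
}
"""
metadata = parse_migration_metadata(sample)
print(metadata.cluster_name, metadata.workflow_name, metadata.migration_date)
```

The record carries only a pointer to the workflow (cluster and name), which matches the idea that the "dag file" identifies a dag without containing its definition.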
Token Fetcher
Once the metadata is discovered by the migration scheduler, the Token Fetcher container comes into play. The Token Fetcher container hosts the parser of the legacy system and runs alongside the migration scheduler. It exposes an API to retrieve the specification of a legacy workflow and to parse its jobs. Each job in the legacy workflow is parsed into a job Token data structure that contains the specification and, most importantly, the job execution command, such as the one below:
python data_job_runner.py --job_full_class=reporter.examples.ExampleSparkJob --executor=prod_011 --job_args="end_date=2021-12-30"
With this Token fetcher container, the PinterestDagBag module can invoke the corresponding APIs to retrieve the workflow specification and job Tokens based on a migration metadata file.
Diagram shows the interactions between components parsing a migrated file.
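The translation step can be sketched in plain Python without any Airflow dependency. The JobToken and TranslatedTask classes below are simplified, hypothetical stand-ins: the real Token structure is not shown in this post, and the parents field is an assumption about how job dependencies might be carried. The commands are placeholders.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class JobToken:
    """Hypothetical, simplified stand-in for a legacy job Token."""
    name: str
    command: str
    parents: List[str] = field(default_factory=list)  # assumed field


@dataclass
class TranslatedTask:
    """Stand-in for the wrapper operator instance created per job Token."""
    task_id: str
    command: str


def translate_workflow(
    tokens: List[JobToken],
) -> Tuple[Dict[str, TranslatedTask], List[Tuple[str, str]]]:
    """Turn job Tokens into tasks plus (upstream, downstream) edges."""
    tasks = {
        t.name: TranslatedTask(task_id=t.name, command=t.command)
        for t in tokens
    }
    edges = []
    for token in tokens:
        for parent in token.parents:
            if parent not in tasks:
                raise ValueError(f"unknown parent job: {parent}")
            edges.append((parent, token.name))
    return tasks, edges


tokens = [
    JobToken(name="extract", command="echo extract"),
    JobToken(name="report", command="echo report", parents=["extract"]),
]
tasks, edges = translate_workflow(tokens)
print(sorted(tasks), edges)
```

Each task keeps the execution command from its Token, which is the piece of information the wrapper operator needs at run time.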
PinboardOnK8sOperator
Before diving into this special Operator, we want to give a refresher on what Pinboard is. If you read our first blog post, you’ll remember that it is the monorepo at Pinterest that contains our Python code. In the previous system, all workflows and jobs were defined in this project.
Once we have the data from the Tokens, we wrap the legacy job Token abstraction in a customized operator, the PinboardOnK8sOperator. Each job Token is translated into an instance of this operator, which stores the execution command from the retrieved Token. During its execution, it launches a k8s pod carrying the pinboard build to execute the command, mimicking the job execution environment of the legacy system. This also prevents any interference with Airflow’s runtime environment.
The serialized dag feature from Airflow is used to serialize the translated dag and tasks, which helps reduce the dag parsing overhead. The PinterestDagBag only calls into the Token Fetcher to retrieve job Tokens and do the translation when the serialized dag of the workflow doesn’t exist. In addition, when a dagrun of the migrated dag is about to be scheduled, the DagFileProcessor calls into the Token Fetcher again to retrieve the latest job Tokens and refresh the serialized dag. This serialized dag is also used in UI rendering, so there is no need to launch the Token Fetcher container alongside the webserver. Finally, since the properties required to execute the PinboardOnK8sOperator are all serializable, we use the serialized dag feature in the execution of the migrated tasks as well.
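The caching behavior described above can be sketched as follows. This is a minimal in-memory illustration, not the actual implementation: SerializedDagCache and its method names are hypothetical, and a plain dict stands in for the serialized dag store.

```python
from typing import Callable, Dict, List


class SerializedDagCache:
    """Hypothetical in-memory stand-in for the serialized dag store."""

    def __init__(self, fetch_and_translate: Callable[[str], dict]) -> None:
        self._fetch_and_translate = fetch_and_translate
        self._store: Dict[str, dict] = {}

    def get_for_parsing(self, workflow_name: str) -> dict:
        """Regular parsing: call the fetcher only if nothing is cached."""
        if workflow_name not in self._store:
            self._store[workflow_name] = self._fetch_and_translate(workflow_name)
        return self._store[workflow_name]

    def refresh_for_new_dagrun(self, workflow_name: str) -> dict:
        """Scheduling a new dagrun: always re-fetch and overwrite."""
        self._store[workflow_name] = self._fetch_and_translate(workflow_name)
        return self._store[workflow_name]


calls: List[str] = []


def fake_fetch(workflow_name: str) -> dict:
    """Fake Token Fetcher call that records how often it is invoked."""
    calls.append(workflow_name)
    return {"dag_id": workflow_name, "version": len(calls)}


cache = SerializedDagCache(fake_fetch)
cache.get_for_parsing("test_workflow")         # first parse: fetches
cache.get_for_parsing("test_workflow")         # served from the cache
cache.refresh_for_new_dagrun("test_workflow")  # new dagrun: fetches again
print(len(calls))
```

Consumers that only read the cached entry, such as the UI in this design, never need the fetcher at all.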
Migration Tooling
To ease the process of workflow migration, we built a UI tool for users to migrate their existing workflows to Airflow. In just a few clicks, a workflow can be unscheduled from the old system and scheduled on the new Spinner cluster. Once a workflow is migrated, its migration metadata file will be uploaded to s3 and is discoverable by our migration scheduler. The tool also supports rollback of the migration back to the legacy system, high level migration reporting, and an admin role to help users manage their migration records.
We also expose the migration API to downstream services for other systems that leverage programmatically created workflows.
UI of the migration tool that prepares migrated workflows to run on Spinner clusters.
Migration tool UI to show different stages of migration.
This tool has reduced the burden on our users, since the active time to migrate a workflow is on the order of minutes, rather than the hours it would take if we required users to rewrite the code. The user goes to the UI, selects their workflow, schedules it to run in Spinner, verifies the outputs are valid (this is the one manual step), and lastly signs off by closing the migration record. This tooling was a major win for the platform and for the users: without it, we would not have completed the migration within our one year goal.
Dynamic Dag
One major feature missing from Airflow but required by our migration is called Dynamic Dag. It supports use cases where the dag layout is generated dynamically and may vary from one scheduler parsing pass to the next. For example, if the dag layout is generated based on the status of some external service or data, the layout loaded from the dag file may vary and depend on the time when the dag file is parsed. The expectation is that the dag’s layout is determined when a new DagRun is scheduled. The computed layout is saved, and the execution of this DagRun sticks to the saved dag layout. The worker can load the task from this saved layout and doesn’t need to parse the dag again, which could result in a different dag layout.
This feature is not supported by Airflow out of the box. With the native implementation, a worker that tries to execute a task from a dagrun may retrieve a dag layout from the dag file that differs from the layout at the time the dagrun was created. In that case, the worker cannot find the particular task in the dag.
We built this feature on top of Airflow, where we designed a new dag type called DynamicDAG. The DynamicDAG exposes a compute_layout method. Instead of defining the tasks at the top level in the dag file, the task instantiation logic is encapsulated inside the compute_layout method. This method is only invoked when a new DagRun is created to generate the dag layout at that given time. This layout snapshot will be saved and bound to this DagRun, so that when a task from a particular dagrun is needed, the system will be able to retrieve it from the saved dag layout instead of loading it from the dag file. The following code snippet shows how to compose a dynamic dag using the DynamicDAG interface.
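import random
from datetime import datetime

from airflow.operators.python_operator import PythonOperator

# DynamicDAG is the custom dag type described above, and python_callable is a
# user-defined function; neither definition is shown in this snippet.


def compute_layout(dynamic_dag: DynamicDAG, execution_date: datetime = None, dagrun_conf: dict = None) -> None:
    """Compute layout for dynamic DAG"""
    # Use a random int so the number of task pairs varies per DagRun
    rand_int = random.randint(1, 3)
    for i in range(rand_int):
        python_task_1 = dynamic_dag.add_task_into_dynamic_dag(
            operator_class=PythonOperator,
            task_id=f'python_task_{i}',
            python_callable=python_callable,
            op_kwargs={'task_id': f'python_task_{i}', 'execution_date': execution_date},
        )
        python_task_2 = dynamic_dag.add_task_into_dynamic_dag(
            operator_class=PythonOperator,
            task_id=f'python_task_{i}_v2',
            python_callable=python_callable,
            op_kwargs={'task_id': f'python_task_{i}', 'execution_date': execution_date},
        )
        python_task_1 >> python_task_2


# compute_layout is defined first so it can be passed to the constructor.
dag = DynamicDAG(
    dag_id="dynamic_test",
    compute_layout=compute_layout,
    skip_early_layout_compute=True,
    # … (remaining arguments elided in the original)
)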
Note that while we created this class to help the migration, it also applies to the native workflows, which can use this class for business logic.
We have modified the major components in Airflow, such as the scheduler, webserver, and executor handling, to support this dag versioning feature. The flowchart below shows the differences in the scheduler processing logic with and without the dag versioning feature. In the new design, there are two major changes for the scheduler module:
- The dag layout will be regenerated during the dagrun creation time and will be serialized, so a newly created dagrun will always be bound to a particular dag layout.
- Whenever a dag layout of a specific version is needed in the scheduler processing, such as to verify the dag integrity or to schedule task instances, the system will load and deserialize the dag layout. In this way, we can make sure the correct version of the dag is always used for scheduling and execution.
Diagram shows the current way a normal Dag runs vs a Dynamic Dag run.
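The two scheduler changes listed above amount to "compute and serialize at dagrun creation, deserialize on every later use". A minimal sketch of that idea, assuming a hypothetical LayoutStore class and using a list of task ids as a stand-in for a full dag layout:

```python
import json
import random
from typing import Callable, Dict, List


class LayoutStore:
    """Hypothetical store that binds one serialized layout to each dagrun."""

    def __init__(self, compute_layout: Callable[[str], List[str]]) -> None:
        self._compute_layout = compute_layout
        self._snapshots: Dict[str, str] = {}

    def create_dagrun(self, run_id: str) -> None:
        """Compute the layout once, at dagrun creation, and serialize it."""
        layout = self._compute_layout(run_id)
        self._snapshots[run_id] = json.dumps(layout)

    def load_layout(self, run_id: str) -> List[str]:
        """Deserialize the saved layout instead of recomputing it."""
        return json.loads(self._snapshots[run_id])


def random_layout(run_id: str) -> List[str]:
    """A layout whose size differs from call to call."""
    return [f"python_task_{i}" for i in range(random.randint(1, 3))]


store = LayoutStore(random_layout)
store.create_dagrun("run_1")
first = store.load_layout("run_1")
store.create_dagrun("run_2")  # may compute a different layout
print(store.load_layout("run_1") == first)
```

Because later lookups never call the layout function again, a worker always sees the tasks that existed when its dagrun was created.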
Kubernetes for Migration
Our infrastructure layer leverages our internal Kubernetes cluster in order to have “infinite” scalability, isolation from other tasks, easy maintenance and upgrades, and improved security.
Diagram shows how the Spinner and Pinterest Kubernetes Clusters interact via DB.
In the diagram above, we can see that the migrated task case went through two iterations. In the first, the Airflow worker pod started a separate migrated task pod to load the environment needed for the command to be invoked and run. This pod-over-pod scenario added an extra 2 to 4 minutes of startup time, which can be a heavy cost for user jobs. Later on, we introduced the enhanced migrated task case, which runs the migrated pod logic in the original worker pod and saves us the cost of starting up a second pod. The lifespan of the enhanced migrated task worker pod during execution can be seen below.
Migrated Task Worker Pod
Migrated task worker pod interaction to run logic and complete execution
The Airflow worker starts the Airflow task command. That task generates a migrated task command that is sent to the pinboard container, which is simply a container that can invoke the old DSL logic and return the output status. The Airflow worker container only monitors the liveness of the pinboard container and waits for its exit status in order to report the state update. When a user requests live task logs from the UI, a separate process uses the Kubernetes API to pull the logs from the host. The Airflow worker keeps polling for status in a loop until the task completes.
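The monitor-and-wait pattern can be illustrated with a local subprocess standing in for the pinboard container. This is only an analogy under that assumption: the real worker watches a sibling container in the same pod, and the run_and_monitor function below is a hypothetical name.

```python
import subprocess
import sys
import time
from typing import List


def run_and_monitor(command: List[str], poll_interval_seconds: float = 0.1) -> str:
    """Start a command, poll until it exits, and map the exit code to a state."""
    process = subprocess.Popen(command)
    while True:
        exit_code = process.poll()  # None while the process is still running
        if exit_code is not None:
            break
        time.sleep(poll_interval_seconds)
    return "success" if exit_code == 0 else "failed"


# Two stand-in "legacy jobs": one exits cleanly, one exits with an error code.
ok_state = run_and_monitor([sys.executable, "-c", "print('legacy job ran')"])
bad_state = run_and_monitor([sys.executable, "-c", "import sys; sys.exit(3)"])
print(ok_state, bad_state)
```

The monitoring side does no business logic of its own; it only translates the exit status into a task state.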
Pod Generation
Last but not least, we’d also like to mention the way the Worker pod was generated.
End to end flow for spinning up a pod to run Kubernetes execution
The graph above gives a more detailed view of the components that generate the spec yaml for the containers, along with their respective task operations. When a task is scheduled to run, the Kubernetes Executor generates the Airflow worker pod spec. Since it’s a migrated task, it also generates the pinboard container spec and consolidates it into the migrated Airflow worker pod spec. Eventually the spec is submitted to the Kubernetes cluster to start an Airflow worker pod with both the worker container and the pinboard container.
From the sequence chart, you might also notice some resource allocation steps. In a Kubernetes environment, we need to predefine the resource usage for pods. Therefore, we leverage historical data, along with managed data that can be updated as configs directly from the UI, to do smart resource allocation for each migrated task. We also created an internal process to track the resource usage of task pods, so we can better understand their behavior and maximize our savings.
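The consolidation step can be sketched as building one pod manifest with two containers. The top-level field names (apiVersion, kind, metadata, spec, containers, restartPolicy) are standard Kubernetes Pod fields; the function names, container names, image names, and the task command below are hypothetical placeholders.

```python
from typing import Dict, List


def build_worker_container(task_command: List[str]) -> Dict:
    """Container that runs the Airflow task command (placeholder image)."""
    return {
        "name": "airflow-worker",
        "image": "example/airflow-worker:latest",  # hypothetical image name
        "command": task_command,
    }


def build_pinboard_container(image: str) -> Dict:
    """Container that carries the legacy build and runs the legacy job."""
    return {"name": "pinboard", "image": image}


def build_migrated_pod_spec(
    pod_name: str, task_command: List[str], pinboard_image: str
) -> Dict:
    """Consolidate both containers into a single worker pod manifest."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": pod_name},
        "spec": {
            "restartPolicy": "Never",
            "containers": [
                build_worker_container(task_command),
                build_pinboard_container(pinboard_image),
            ],
        },
    }


pod = build_migrated_pod_spec(
    pod_name="migrated-task-example",
    task_command=["echo", "run-task"],           # placeholder command
    pinboard_image="example/pinboard:staging",   # hypothetical image name
)
print([c["name"] for c in pod["spec"]["containers"]])
```

Keeping both containers in one pod is what avoids the second pod startup described in the enhanced migrated task case.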
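One simple way to combine history with managed overrides is sketched below. The policy shown (historical peak plus 20% headroom, with a config override taking precedence and a fixed default when there is no history) is an assumption made for illustration; the post does not describe the actual allocation formula.

```python
import math
from typing import List, Optional

DEFAULT_MEMORY_MB = 1024  # assumed fallback when a task has no history
HEADROOM_PERCENT = 120    # assumed headroom: 20% above the historical peak


def recommend_memory_mb(
    history_mb: List[int], override_mb: Optional[int] = None
) -> int:
    """Pick a memory request for a task pod."""
    if override_mb is not None:
        # A value managed through configuration wins over history.
        return override_mb
    if not history_mb:
        return DEFAULT_MEMORY_MB
    return math.ceil(max(history_mb) * HEADROOM_PERCENT / 100)


from_history = recommend_memory_mb([800, 950, 1000])
from_default = recommend_memory_mb([])
from_override = recommend_memory_mb([800], override_mb=4096)
print(from_history, from_default, from_override)
```

Sizing from the observed peak rather than a fixed value is one way to reduce both failures at startup and over-allocation.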
Deploys
Decision tree showing how our system decides whether to kick off a new build
As mentioned in the earlier section, during the execution of migrated tasks, a separate k8s pod/container will be launched to run the actual business logic of the task using an image of the legacy system (i.e. the pinboard image). This approach is to make sure the execution behaviors of the migrated tasks stay unchanged after they are migrated. Thus, a dedicated CI/CD pipeline has been built to generate, validate, and publish this image.
The deploy lifecycle for the migrated pinboard image follows the steps:
- A Jenkins job is scheduled and builds the latest commit for the pinboard (Pinterest mono-repo) image. This is done on a scheduled cadence by our Teletraan management tool, or it is triggered manually.
- The artifact is published, and then another Jenkins job checks whether any changes have been published. If so, it runs our dag validation unit tests and publishes a staging image.
- The staging image published is then picked up by our validation workflow in the canary environment that is also regularly scheduled. If a new staging image has been published, it runs a set of triggering jobs that trigger other canary workflows to validate the staging image and polls for the status of those using ExternalTaskSensors. These canary workflows are to test common job types in the old system so we have coverage for the most used operators. The canary workflow suite also includes user-contributed workflows to protect their critical workflows from a bad image pushed that could break their pipelines. Once all the sensor tasks receive success states, then we publish a production image for production use by the webserver, scheduler, and worker. If there is a failure during the canary validation test, the workflow team gets auto notified and needs to manually inspect the issue to correct it and deploy.
This canary workflow kicks off many workflows, runs checks on them, and then kicks off a new build
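The gating rule in the last step of the deploy lifecycle (publish only when every canary succeeds, otherwise notify the team) can be sketched as a small decision function. The function names, state strings, and canary names are hypothetical; in the pipeline described above the states come from ExternalTaskSensors.

```python
from typing import Dict, List


def failed_canaries(states: Dict[str, str]) -> List[str]:
    """Names of canary workflows that did not finish in a success state."""
    return sorted(name for name, state in states.items() if state != "success")


def decide_release(states: Dict[str, str]) -> str:
    """Publish only if every canary succeeded; otherwise notify the team."""
    if not states:
        raise ValueError("no canary workflows reported a state")
    failures = failed_canaries(states)
    if failures:
        return "notify_workflow_team: " + ", ".join(failures)
    return "publish_production_image"


all_green = decide_release({"spark_canary": "success", "hive_canary": "success"})
one_red = decide_release({"spark_canary": "success", "hive_canary": "failed"})
print(all_green)
print(one_red)
```

Treating an empty result set as an error keeps a broken canary run from being mistaken for a clean one.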
This deploy pipeline also allows for hotfix releases, which release just a specific commit rather than a full deploy and so protect users after hours. The monorepo at times has complex imports, and a bad import can cause many unrelated tasks to fail when it sits along their dependency path. This canary validation pipeline allows us to catch potential issues before any changes hit the production environment.
Dag File Syncing
As mentioned in the Migration Tooling section, the Spinner auto migration tool generates a migration metadata file that gets published to s3. This file is the identifier the scheduler uses to find the workflow and job Tokens for the migrated workflow. A sync’er service runs alongside the Airflow webserver and scheduler and syncs the migration metadata files on the host with the dag files from s3. This again is based on scheduler tier and partition number. As mentioned in the first blog post, we have multiple schedulers running for both migration and native cases, but a scheduler can only process either native dags or migrated dags, not both. Any new migration metadata files are sync’d to the scheduler within eight seconds, where they are then processed by the PinterestDagBag module. The following are the existing migration scheduler partitions we have.
Image shows the different migration clusters available
Metrics
At the beginning of the workflow migration project, the majority of workflows running on our system were workflows migrated from the legacy system. To measure system health, our metrics need to weigh these workflows more heavily. As mentioned in the first blog post, our system-level SLO is an aggregated, weighted average across all the schedulers hosted by our system across multiple clusters. The migration schedulers therefore carry a higher weight, as they contain many more workflows and more higher-tier workflows. The SLO is measured by running scheduled dags at 15 minute intervals on each scheduler; each run emits a stat, which we collect. If any metric is missing a data point, the weighted average falls, and we check that the overall metric does not fall below 98% uptime. When a metric misses a data point, the workflow team is still notified, but users are not notified unless the rate falls below the threshold, as that signifies a bigger issue.
Diagram displays a graph for a metric tracking the SLO health of the system
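The weighted-average calculation can be sketched as follows. The scheduler names, weights, and data-point counts are made-up illustration values; only the 15 minute interval (96 points per day) and the 98% threshold come from the text above.

```python
from typing import List, Tuple

SLO_THRESHOLD = 0.98

# (scheduler name, weight, expected data points, received data points)
SchedulerStat = Tuple[str, float, int, int]


def weighted_uptime(stats: List[SchedulerStat]) -> float:
    """Weighted average of per-scheduler data-point completeness."""
    total_weight = sum(weight for _, weight, _, _ in stats)
    weighted_sum = sum(
        weight * received / expected
        for _, weight, expected, received in stats
    )
    return weighted_sum / total_weight


def meets_slo(uptime: float) -> bool:
    return uptime >= SLO_THRESHOLD


# One day at 15 minute intervals gives 96 expected points per scheduler.
stats = [
    ("migration_scheduler", 3.0, 96, 96),  # hypothetical weight of 3
    ("native_scheduler", 1.0, 96, 90),     # hypothetical weight of 1
]
uptime = weighted_uptime(stats)
print(uptime, meets_slo(uptime))
```

With these illustrative numbers, the missed points on the lower-weight scheduler pull the average down but not below the threshold; the same gap on the higher-weight scheduler would cost three times as much.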
Closing Remarks
We are sharing our findings for other Airflow enthusiasts who want to explore how to alter the major components to fit their business needs. We took the base Airflow system and added our custom modifications so that it works with our translation layer and orchestrates customer workflows.
We hope you found this post on how we supported the migration from the old system to Airflow at Pinterest helpful. We would be happy to respond to any questions or comments. Thank you!
Special thanks to Evan Li for managing and leading this project and to Ashim Shrestha, Dinghang Yu, Euccas Chen, Hannah Chen, and Yulei Li for their contributions to this project.