Spinner: Pinterest’s Workflow Platform

Ace Haidrey | Software Engineer, Workflow; Ashim Shrestha | Site Reliability Engineer, Workflow; Dinghang Yu | Software Engineer, Workflow; Euccas Chen | Software Engineer, Workflow; Evan Li | Engineering Manager, Workflow; Hannah Chen | Product Manager, Workflow; Yulei Li | Software Engineer, Workflow

This article is a repost from the author’s original account here.

Four large circles in a horizontal line. The first circle has text that states 10+ clusters, the second states 4K+ total flows, the third states 10K+ daily flow execution, the final circle states 38K+ Daily Job Execution

Workflow Scale at Pinterest Before Migration to Airflow

Since its inception, Pinterest’s philosophy has always been centered around data. As a data driven company, that means all data ingested is stored for further use. This looks like 600 terabytes of new data every day, encompassing over 500 petabytes of total data. At this scale, big data tooling plays a critical role in enabling our company to gather meaningful insights. This is where the workflow team comes in. We help facilitate over 4000 workflows, which produce 10,000 daily flow executions and 38,000 daily job executions on average.

Background

Back in 2013, Pinterest built an in-house scheduler framework named Pinball. This solution suited the company’s needs at that time, but it was not able to scale up with increasing requirements to serve other products and services both internally and externally. The following limitations became increasingly apparent:

  • Performance:
    – the schedule/job start delay time (the time between when a job is scheduled to begin and when it actually begins) was higher than desired.

  • Scalability:
    – the components of the system are stateful, complicating horizontal scaling
    – the central metastore service became a single point of failure.

  • Maintenance:
    – infrastructure upgrades required draining worker processes, setting up a fallback host to bring up to swap and other complications.
    – having multiple clusters due to load partitioning added the additional overhead of upgrading and monitoring multiple (10) clusters.
    – allotted worker slot scale up was involved and it added load onto stateful main node which caused latency or bottleneck issues

  • Isolation:
    – code isolation for the users is problematic as Pinterest has a mono-repo for all python code.
    – a bad import can break many workflows due to dependency entanglements.

  • Features:
    – Access Control Lists (ACL) and Audit logging missing
    – execution stats virtualization missing
    – other major features were lacking in our system such as webserver authentication, quick navigation, and more

  • Documentation:
    – since the project was in-house, documentation was lacking and examples were even more lacking beyond searching code for similar workflows.
    – having a community in which to share solutions, continually improving the product, etc. goes a long way to give users more support.

  • Testing:
    – users were not easily able to bring up their own dev cluster to do end to end testing.

With the above pain points, it became clear that fundamental changes were needed, which created an opportunity to explore what solutions already existed before we set out to solve the problems of the current system with another in-house solution.

Why Airflow?

In 2019, we did some initial analysis on Spotify’s Luigi, LinkedIn’s Azkaban, and a few alternative options, but we ultimately moved forward with Apache’s Airflow for the following reasons:

  • Goal Alignment: the features our users have asked for are either already built in Airflow or can be added onto the current pillars.
  • Industry: it is widely adopted by many organizations, has an active community of committers, fosters lots of discussion, and has good documentation for our users to look into.
  • DSL: it is python based, which aligns with our previous system, so our users face a smaller gap when moving over.
  • Code: Airflow is written in modules, making it easier to have separate components to connect custom pieces.
  • Scalability: it contains stateless components where restarts can recover, the UI pulls from the central DB, and allows us to plug in major pieces for our kubernetes infrastructure and partitionable scheduler.
  • Reputation: overall the community seems happy with Airflow’s offering

Disclaimer: If you are not sure what Apache Airflow is, it’s an open source framework to help schedule, author, and maintain workflows. You can read all about it here.

Benchmarking

In order to decide if Airflow could meet our needs, we did a series of performance, load, and stress tests with varying parameters such as: number of dags, number of tasks per dag, number of task runs and task instances running simultaneously, different concurrency settings, and the number of records in the db. The goal of these tests was to measure performance mimicking some of our production loads, identify to which extent scale is needed, estimate the cluster scope needed, and identify bottlenecks and limitations that we will need to address in order to commit to this framework.

Before running the testing scenarios, we needed to make modifications to the source code to set up a valid proof of concept (POC) cluster. First, we modified the version of Airflow we were on; we branched off of v1.10-stable and pulled in cherry picks from the master branch to support the SerializedDAG model. Also, due to a different setup of the kubernetes (k8s) environment here at Pinterest, we created a modified k8s executor to submit tasks to our internal k8s cluster. Additionally, we built a modified scheduler to be partitioned, and we added additional stats to record accurate measurements to our opentsdb stat collection. The test shown below is an example of one of the tests we conducted to help us define the performance.

All setups were run on AWS nodes and included two UI nodes, one scheduler node (c5.2xlarge), one mysql host (1 primary/1 secondary, c5d.4xlarge), customized k8s executor with worker pods with 300 Mhz CPU, 300 MB requested memory.


After conducting the tests we determined the following:

  • the UI node performs well and latency is tied to the number of db records on different views
  • mysql was not the bottleneck, although we had previously assumed it would be because it is a single central host that the scheduler, webserver and workers all connect to
  • scheduler performance decreases after maintaining 1000 dags
  • the bottleneck detected during the performance test was on the kubernetes submission and result parsing when a single scheduler is under extreme load
  • the custom kubernetes executor has a task queue built up when the system is under heavy load, meaning delays
  • To meet our goal for the scheduling delay time, we must keep the number of dags under 1000 to ensure acceptable performance for a single scheduler. Therefore a multi-scheduler is necessary.
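The last finding implies a simple lower bound on the number of schedulers. A back-of-the-envelope sketch, assuming the 1000-dag limit applies uniformly to every scheduler (the function name is ours, for illustration only):

```python
# Per-scheduler limit taken from the benchmark finding above.
MAX_DAGS_PER_SCHEDULER = 1000


def min_schedulers(total_dags: int, limit: int = MAX_DAGS_PER_SCHEDULER) -> int:
    """Smallest number of schedulers that keeps each one at or under the limit."""
    if total_dags < 0 or limit <= 0:
        raise ValueError("total_dags must be >= 0 and limit must be > 0")
    # Integer ceiling division; at least one scheduler is always needed.
    return max(1, (total_dags + limit - 1) // limit)


print(min_schedulers(3000))  # 3
print(min_schedulers(1001))  # 2
```

This is only a floor. Separate partitions per tier or per cluster type would raise the real count.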

The result of the performance test gave us a very clear picture of how the system would perform under different settings, and where effort is needed to remove the bottlenecks.

Comparing this to the previous system, Pinball, we found:

  • under similar load as all of our Pinball clusters combined, the Airflow POC has a smaller scheduling delay (50s vs 180s)
  • with a total load combined into one cluster, the system holds 3000 DAGs with 25 tasks each at about a 10 minute delay, 5x better than Pinball
  • the UI page load time is under 1s at the 95th percentile with a simulated load of 250 concurrent page accesses, and the performance is minimally impacted by the number of dags because the view is paginated; with the Pinball clusters, latency grew with the number of dags

Based on the performance testing, then, we confirmed we can host a single cluster for our entire load if we beef up the kubernetes executor to boost performance and stability and bring it to production level scalability, while also scaling the multi-scheduler solution to shard/partition the load and maintain the performance.

From a user perspective, we can customize the settings and leverage the out of the box Airflow features to address the feature asks, and we will provide a single cluster for the majority of the load (we have to have separate clusters for pii and sox workflows due to security concerns) so that the workflow data can be searched within the same cluster.

Pinterest Workflow System

Spinner architecture overview — schematic to illustrate all the different components and their interconnections.

Spinner architecture overview

The above schematic shows the end to end workflow system. Each component will be explained more deeply in the relevant sections. The external clients shown in the schematic (such as EasyFlow, Galaxy, Flohub, and Monarch) interact with Spinner and need not be mentioned in more detail.

Serialized DAG

The serialized dag representation is important for two major reasons:

  1. Performance: with thousands of dags on the same cluster, a cached db representation of the workflow yielded better performance than having the dag file processed for each call
  2. Migration: we need to migrate thousands of workflows from our legacy Pinball system to Airflow. With the differing DSL, we will need to process the workflow representation into the database to empower all the UI features such as code view, render view, etc. because these views otherwise expect a dag file, which we won’t have. The migrated workflows will be discussed in more detail later.

Webserver

This stateless webserver will be the entry point for the users to view their workflow states and history. We enabled dag level access control (DLAC) on the cluster, and enforce that each dag must pass at least one role (exemplified below), so that only users who have permissions to the workflow can access the workflow. By default, we have setup a role, spinner-users, that will be set to all workflows. At minimum, this default role will have read access to each dag; this provides platform maintainers access to read logs, see history, etc. Users can then create additional roles, which are provisioned by the platform team, to assign to their specific workflows, and prevent other users from being able to view or take actions on them. At this time, it is a manual process to provision roles, that we hope to automate down the line — creating roles and assigning users to those roles.

dag = DAG(
    dag_id='example_dag',
    access_control={SPINNER_USERS: {'can_dag_read', 'can_dag_edit'}},
    ...
)
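The default-role rule described above can be sketched in a few lines. This is a minimal illustration, not the Spinner implementation: the helper names are hypothetical, and it assumes the role mapping has the same shape as the access_control argument (role name to a set of permission names).

```python
SPINNER_USERS = "spinner-users"  # default role name mentioned in the post
READ = "can_dag_read"


def with_default_role(access_control=None):
    """Return a copy of access_control where the default role can at least read."""
    merged = {role: set(perms) for role, perms in (access_control or {}).items()}
    merged.setdefault(SPINNER_USERS, set()).add(READ)
    return merged


def has_at_least_one_role(access_control) -> bool:
    """True when at least one role carries at least one permission."""
    return bool(access_control) and any(access_control.values())


roles = with_default_role({"team-a": {"can_dag_read", "can_dag_edit"}})
print(sorted(roles))  # ['spinner-users', 'team-a']
```

Copying the input keeps the caller's dictionary untouched, which matters if the same mapping is shared across several dag definitions.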

These permissions are synced from the dag files to the DB when the dag is parsed by the scheduler, i.e. when the DagModel is being overwritten. The user also can force the refresh by refreshing the dag in the UI. And of course, if the webserver process is restarted, it will reconstruct the webapp, syncing all workflow permissions at startup. We added syncing to all these times rather than just when a DagModel is updated.

Along with this, we also use the out of the box event logging to add to the audit logs any time a user takes an action on the workflow. This has a full trace history of anything done to the workflow.

Lastly, for scalability, we set up multiple webserver nodes to host the UI service, with each webserver host having 4 threads to handle requests. When deployments happen, we do rolling deployments so that there is no downtime for the users.

Multi-Partition Scheduler

Our goal is to have one Spinner cluster with one interface to see all the dags, in order to avoid the maintenance, overhead and confusion of multiple clusters we had in our old workflow system. For this purpose, it is not feasible to have a single scheduler to manage all the dags. Even with vertical scaling, we would hit limitations, so increasing the dag parsing parallelism is not a long term solution. We need to establish multiple schedulers where each scheduler looks at a different dag partition. We considered AIP-15, but the timeframe for its implementation didn’t align with our needs. Therefore we built an in-house solution which supports our workflow tiering requirements.

Scheduler partitioning flow comparing old system vs our redesign.

To make this solution possible, we modified the scheduler implementation, as the current scheduler is stateful and doesn’t allow running more instances simultaneously. At the same time, we needed to prevent schedulers from interfering with each other by attempting to schedule the same dags/tasks. Our rule based partitioner is based on the tier and partition number, which are retrieved from the dag’s file location (the path convention is covered in the tiering section that follows). The partitioner tags a given dag to a specific partition and assigns each partition to one scheduler to manage. In this way, each scheduler will only parse, dispatch, and maintain status for the dags it owns, and no two scheduler nodes will clash. With this multi-scheduler setup, we also fulfill the platform requirement to support higher priority workflows with even tighter SLAs on scheduler delay time, since higher-priority workflows will be put in a dedicated scheduler partition with a tighter limit on load.

We configure the dag folder path for each scheduler host to point to a specific partition. The UI will still render all of the dags as that process uses the entire dag paths, despite the partitioning, so that users will still experience a single entrance. The UI will still show the partition tag for the task instances and ensure that whatever action is taken on the task, the right scheduler will be able to pick up the work for execution.
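The ownership rule (each partition belongs to exactly one scheduler, and a dag belongs to whichever partition folder it lives in) might be sketched as follows. The host names and helper functions are hypothetical; only the folder names follow the convention used in this post.

```python
from pathlib import PurePosixPath

# Hypothetical assignment: each scheduler host points at one partition folder.
SCHEDULER_PARTITIONS = {
    "scheduler-a": "spinner_tier_1_0",
    "scheduler-b": "spinner_tier_2_0",
    "scheduler-c": "spinner_tier_3_0",
}


def check_exclusive(assignments):
    """Invert scheduler -> partition, failing if two schedulers share a partition."""
    owners = {}
    for scheduler, partition in assignments.items():
        if partition in owners:
            raise ValueError(
                f"{partition} is assigned to both {owners[partition]} and {scheduler}"
            )
        owners[partition] = scheduler
    return owners


def owning_scheduler(dag_file, dags_root, owners):
    """Find the scheduler responsible for a dag file from its top-level folder."""
    relative = PurePosixPath(dag_file).relative_to(dags_root)
    return owners[relative.parts[0]]  # KeyError if no scheduler owns the folder


owners = check_exclusive(SCHEDULER_PARTITIONS)
print(owning_scheduler("dags/spinner_tier_1_0/team1/my_dag.py", "dags", owners))
# scheduler-a
```

Validating exclusivity up front turns a misconfiguration into a startup error rather than two schedulers silently competing for the same dags.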

Spinner Workflows & Pinterest Tiering

Before we dive into the repository structure, we need to discuss the Pinterest-specific tiering structure. Workflows and systems are labeled based on criticality, with Tier1 as the highest and Tier3 as the lowest. In our data organization, we give higher priority and more resources to tier1 workflows, which will be explained with the dag paths in the repository structure. We have modified the DAG definition to require a tier field to be passed in, such as the following:

dag = DAG(
    dag_id='example_dag',
    tier=Tier.TIER1,
    ...
)

We have created a new single repository for our users to add their workflows to, in order to control a few things:

  • we need a way to sync the workflows with the scheduler, the webserver, and the kubernetes pods, so having a single place to sync from makes building this process simpler and more controlled
  • we need to establish tier based paths so that users for tier1 workflows put their workflows in the tier1 base path, for tier2 a tier2 path, and so on and so forth
  • we need to set rules for code reviews as maintainers of the platform
  • we need all of the code to go through a quick unit test suite on any commit to ensure code safety, code quality, and code control

The “dags” folder contains the user authored dags. The sub folders under the dag folder are maintained by the workflow team following the naming convention:

{cluster_type}_tier_{tier_index}_{partition_index}

An example of the structure can be seen below.

  • The cluster type will be spinner, pii, test or sox.
  • The tier index will be as described above: 1, 2 or 3.
  • The partition_index begins at 0 and increases as needed. This value denotes when the number of dags within a certain tier of a certain cluster_type exceeds a threshold we define, then a new partition will be created and new dags need to be checked into the new partition path.

dags/
    spinner_tier_1_0/
        team1/
            my_dag.py
    spinner_tier_2_0/
    spinner_tier_3_0/
    pii_tier_1_0/

This setup helps empower our multi partition scheduler component. The common modules across these dag files will be placed in the plugins directory, to be shared across all clusters/tiers/partitions.
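Because the folder name encodes the cluster type, tier, and partition index, a partitioner can recover all three with a single pattern. A minimal sketch, assuming the naming convention above is applied strictly (the Partition type and parse_partition function are illustrative names, not part of Spinner):

```python
import re
from typing import NamedTuple


class Partition(NamedTuple):
    cluster_type: str
    tier: int
    index: int


# {cluster_type}_tier_{tier_index}_{partition_index}
_PATTERN = re.compile(r"^(spinner|pii|test|sox)_tier_([123])_(\d+)$")


def parse_partition(folder: str) -> Partition:
    """Split a partition folder name into its three components."""
    match = _PATTERN.match(folder)
    if match is None:
        raise ValueError(f"not a valid partition folder: {folder!r}")
    return Partition(match.group(1), int(match.group(2)), int(match.group(3)))


print(parse_partition("spinner_tier_1_0"))
# Partition(cluster_type='spinner', tier=1, index=0)
```

Rejecting unknown cluster types and tiers at parse time means a misnamed folder fails loudly instead of being scheduled under the wrong tier.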

Continuous Integration & Continuous Deployment

A large part of any workflow platform is handling deployments of both user and system level code, and at Pinterest we use Teletraan as our deployment system.

The setup for Spinner involves three different code repositories:

  1. Spinner Repo: this is the airflow source code forked from v1.10-stable with a subset of cherry-picks and many internal additions.
  2. Spinner Workflows: mentioned above, but this is the repository for native Airflow code.
  3. Pinboard: this is the existing mono-repository at Pinterest that hosts all python code across any system and platform. Lots of custom functions, and implementation are written here as well as all legacy workflows and jobs.

Schematic shows the different ci/cd pipelines.

As illustrated, there are separate repositories for infrastructure code and user code. The image we build contains the build of the infrastructure code (pinairflow) and the build of the Pinterest mono-repo code (pinboard). A separate process then pulls in the code for the dags (spinner workflows) and makes sure the latest code is synced and executed for new runs.

The figure is a detailed overview of each component and its deploy cycle, but there are a few things to note about the infrastructure side (UI/Scheduler/K8s worker) CI/CD:

  • infrastructure code changes trigger a Jenkins job to make a new build and push the new image to the registry
  • a separate Jenkins job notifies Teletraan to refresh the image and containers on the nodes
  • kubernetes pods will use the same image as the scheduler processes that launch them to execute new tasks, so that both processes are using the same build
  • users who are doing local testing will be able to load the latest image to test out their dag

A few things to note about the Dag deployment CI/CD:

  • a user code change triggers a Jenkins job to sync dags to s3 on all hosts
  • a daemon runs on webserver and scheduler which syncs the file changes from s3 in very short intervals (30 seconds)
  • the worker pods also sync the latest dags from s3 when they are launched to execute dag’s jobs
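One way to picture the sync daemon is as a loop that compares a remote listing with the local one and applies only the differences. The sketch below is an assumption about how such a daemon could be structured, not a description of Pinterest's: the listing and apply callables stand in for real s3 and filesystem operations, and only the 30 second interval comes from the post.

```python
import time


def plan_sync(remote, local):
    """Compare {path: version} listings and return (to_fetch, to_delete)."""
    to_fetch = sorted(p for p, version in remote.items() if local.get(p) != version)
    to_delete = sorted(p for p in local if p not in remote)
    return to_fetch, to_delete


def run_sync_loop(list_remote, list_local, apply,
                  interval_seconds=30, sleep=time.sleep, max_rounds=None):
    """Poll forever (or max_rounds times), applying changes when any exist."""
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        to_fetch, to_delete = plan_sync(list_remote(), list_local())
        if to_fetch or to_delete:
            apply(to_fetch, to_delete)
        sleep(interval_seconds)
        rounds += 1


print(plan_sync({"a.py": "v2", "b.py": "v1"}, {"a.py": "v1", "old.py": "v1"}))
# (['a.py', 'b.py'], ['old.py'])
```

Injecting sleep and max_rounds keeps the loop testable without waiting on a real clock.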

Pinterest Kubernetes Executor

At Pinterest, we have an internal team dedicated to providing the Kubernetes service. Therefore, we had to rebuild the out of the box k8s executor to work with our setup. We rewrote all of the kubernetes API interactions, along with the Kubernetes Executor, Kubernetes Scheduler, and more, to handle this logic. The result retains the basic business logic of the open source executor while using a setup specific to our environment. We plug this executor into the backend.

This executor enables our clusters to have full runtime isolation and to scale up and down as we need depending on our load. Each task runs in its own pod, has a safe isolated setting, and all the pods share k8s nodes with other services within Pinterest as well. Pods get removed right after task execution to free up resource counts. This is a huge improvement over our previous setup, as we had worker hosts provisioned and a set number of worker slots, making scaling difficult.

The Pinterest kubernetes cluster and Spinner cluster cross communications.

The kubernetes executor initially creates a pinterest watcher to pull the pod status, which is something specific to Pinterest k8s setup. We use labels to isolate pods started by different schedulers. This is how we rebuilt this pulling mechanism.

Then, the combination of the dag_id, task_id, execution_date, and try_number for any task instance creates a unique pod name. The pod can be customized to set up additional environment variables, require more resources, install additional packages, and more to fulfill different use case scenarios, as shown below.

customized_task1 = PythonOperator(
    task_id='customized_task1',
    dag=dag,
    python_callable=print_context,
    executor_config={
        "KubernetesExecutor": {
            'request_memory': '350Mi',
            'request_cpu': '200m',
            'limit_memory': '1200Mi',
            'limit_cpu': '500m',
            'requirements': 'example_requirements.txt',
        }
    },
)
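The pod naming rule mentioned earlier (dag_id, task_id, execution_date, and try_number combine into a unique name) has to respect Kubernetes naming rules: lowercase alphanumerics and hyphens, at most 253 characters. The sketch below is one possible way to do that and is not the Spinner code; the hash suffix is our own addition, so that names stay distinct even after sanitising or truncation.

```python
import hashlib
import re
from datetime import datetime

MAX_POD_NAME_LENGTH = 253  # Kubernetes limit for DNS subdomain names


def make_pod_name(dag_id: str, task_id: str,
                  execution_date: datetime, try_number: int) -> str:
    """Build a Kubernetes-safe pod name that is unique per task try."""
    stamp = execution_date.strftime("%Y%m%dT%H%M%S")
    raw = f"{dag_id}-{task_id}-{stamp}-{try_number}"
    # Replace anything outside [a-z0-9-] and trim stray hyphens.
    safe = re.sub(r"[^a-z0-9-]+", "-", raw.lower()).strip("-") or "task"
    # Hash of the unsanitised identity guards against collisions
    # such as "a_b" and "a-b" mapping to the same sanitised text.
    digest = hashlib.sha1(raw.encode("utf-8")).hexdigest()[:8]
    head = safe[: MAX_POD_NAME_LENGTH - len(digest) - 1].rstrip("-")
    return f"{head}-{digest}"


name = make_pod_name("example_dag", "customized_task1", datetime(2020, 1, 1), 1)
print(name.startswith("example-dag-customized-task1-20200101t000000-1-"))  # True
```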

We also built a “service readiness check” script into the pod initiation logic, which is executed before the pod can begin running the task logic. This readiness check makes sure that s3 communication is set up, that mysql is accepting connections, that knox retrieval is possible, and more. This is necessary because the pod is provisioned on the fly each time, and those requirements need to be in place to properly run the task.
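A readiness gate of this kind can be expressed as a set of named checks that must all pass before the task starts. The following is a hedged sketch: the check names mirror the ones listed above, but the retry policy and the function signature are assumptions made for illustration.

```python
import time
from typing import Callable, Dict, List


def run_readiness_checks(checks: Dict[str, Callable[[], bool]],
                         attempts: int = 3,
                         delay_seconds: float = 1.0,
                         sleep: Callable[[float], None] = time.sleep) -> List[str]:
    """Run every check with retries; return the names that never passed."""
    failed = []
    for name, check in checks.items():
        for attempt in range(1, attempts + 1):
            try:
                if check():
                    break  # this dependency is ready
            except Exception:
                pass  # treat an exception like a failed attempt
            if attempt < attempts:
                sleep(delay_seconds)
        else:
            # The loop finished without a break: all attempts failed.
            failed.append(name)
    return failed


# Stand-in checks; real ones would probe s3, mysql, and knox.
not_ready = run_readiness_checks(
    {"s3": lambda: True, "mysql": lambda: True, "knox": lambda: False},
    sleep=lambda seconds: None,
)
print(not_ready)  # ['knox']
```

An empty result means the pod may proceed. Returning the failed names, rather than a single boolean, makes the readiness log more useful when a failure does occur.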

With the custom implementation, we also needed to help refine the log pulling mechanism for live tasks. Each task from the pod will push its logs to s3 after task completion, but during runtime, logs get synced to Pinterest Elastic Search (ES). When users go to the UI to pull the task logs, these logs will be pulled from the pod as it writes to a standard path, and if there is any issue then it will fall back to read from ES. This works for the pod readiness logs too in the event a failure occurs there. If there is no error, the readiness log is cleared from the task log display on the UI.
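The read path for logs amounts to a short fallback chain. A minimal sketch under stated assumptions: the three reader callables are hypothetical stand-ins for the pod, Elastic Search, and s3 clients, and reading from s3 once the task has finished is our inference from the statement that logs are pushed there on completion.

```python
from typing import Callable


def fetch_task_logs(task_running: bool,
                    read_from_pod: Callable[[], str],
                    read_from_es: Callable[[], str],
                    read_from_s3: Callable[[], str]) -> str:
    """Pick the log source based on task state, with ES as the live fallback."""
    if not task_running:
        # Assumption: finished tasks have already pushed their logs to s3.
        return read_from_s3()
    try:
        return read_from_pod()
    except Exception:
        # Any problem reaching the pod falls back to the synced copy in ES.
        return read_from_es()


def unreachable_pod() -> str:
    raise ConnectionError("pod not reachable")


print(fetch_task_logs(True, unreachable_pod, lambda: "from es", lambda: "from s3"))
# from es
```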

The Pinterest Kubernetes executor flow end to end.

The end to end life of the task, from creation to execution and completion and how the executor handles this, can be seen in the image above. The major components are the K8s Executor, the K8s Scheduler, and the K8s Job Watcher, each of which are highlighted in blue. Each stage brings up the next stage.

User Experience

Our users are at the center of every decision we make, and our goal is to provide them the most benefit for the least amount of work. This required us to provide more than 30 custom operators. As seen in the image below, we have a lot of custom logic at Pinterest that gets wrapped in an Operator or Sensor. Internally, we have a system called Job Submission Service (JSS) which routes traffic to an internal yarn cluster that executes the hadoop, spark, sparksql, hive, pyspark jobs, and more. Therefore, all of our submission operators needed to be rewritten to work in our setup. On top of that, there are common operations that get wrapped in logic, and we need to provide feature parity with our legacy workflow system to incentivize user adoption of Airflow.

Though the DSL between the old system and new is quite different, the goal is to reduce the burden on users as much as possible, so that they just need to input the parameters needed to execute the output desired.

One of the biggest hurdles we had to overcome was providing support for the pinboard code, the mono-repository of Pinterest. As stated before, users have much of their custom logic here, and did not want to have to copy the code to their new workflows that live in a different repository. Since our infrastructure does not enforce packaging submodules, this requires packaging the entire codebase and adding it to the python path, meaning we need to bake it into the image we build. We originally wanted to prevent imports to the old codebase for many reasons, but this seems to be a pain point for the users, so we went ahead and added it to the image to reduce their blockers for writing native workflows.

Custom operators built to support Pinterest workflows.

Testing

A prompt display once users bring up a custom testing container.

To test a workflow in our older workflow system was not so user-friendly. This is something we wanted to tackle with Spinner. We developed a script to launch a container on their dev hosts which runs the airflow scheduler and webserver processes and mimics the production environment. There are some subtle differences such as using local executor instead of the Kubernetes executor (some provisioning from dev hosts is required), but the experience is very comparable. It also sets up a local mysql host for them to connect to within their container, and it syncs files from their local workspace (we use Pycharm for development) to their remote dev hosts, to sync their changes and test with the latest logic. This allows the files to be discovered by the dev scheduler and the user will view their dags on the dev UI. This script is maintained by us within the same repository that users write their workflows so they are familiar with it. It has greatly improved their experience and their developer velocity with these changes.

Unit Testing

Flow displaying user code submission triggering unit tests and how code gets pushed to production hosts.

Pinterest uses Phabricator and Arcanist for its code reviews and commit pushes. When a user makes their change, before it hits production, it must be reviewed by their own team (and possibly the platform team) and then each diff review kicks off a jenkins job. This jenkins job kicks off the automated unit testing we wrote, which has more than 25 checks.

These checks include validating that certain properties exist on the dag, such as access role settings, tier, valid start date, notification settings, and more. They also flag settings we do not want set, such as the catchup property. They verify that certain properties are within an acceptable range, such as the start_date, the executor_config memory and cpu configurations, and concurrency settings. We do have limits on the system side too, but we want to enforce them on both fronts. There are also unit tests unique to Pinterest specific logic. For example, we prevent top level calls to Hive Metastore (HMS) in all dag files: we patch the common functions that make such calls and catch users who invoke them at the top level, as this affects the dag parsing time and poses issues for the HMS service.
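The property checks lend themselves to a function that collects every violation instead of stopping at the first. The sketch below runs against a simplified, hypothetical DagSpec record rather than a real Airflow DAG object, and the numeric bounds are invented placeholders; only the kinds of checks come from the description above.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Set

MAX_MEMORY_MI = 4096                   # hypothetical cap on limit_memory
EARLIEST_START = datetime(2015, 1, 1)  # hypothetical lower bound for start_date


@dataclass
class DagSpec:
    dag_id: str
    tier: Optional[int] = None
    start_date: Optional[datetime] = None
    catchup: bool = False
    access_control: Dict[str, Set[str]] = field(default_factory=dict)
    limit_memory_mi: int = 300


def validate(spec: DagSpec, now: datetime) -> List[str]:
    """Return a list of human-readable violations; empty means the dag passes."""
    errors = []
    if not spec.access_control:
        errors.append("access_control must name at least one role")
    if spec.tier not in (1, 2, 3):
        errors.append("tier must be 1, 2 or 3")
    if spec.start_date is None or not EARLIEST_START <= spec.start_date <= now:
        errors.append("start_date must be set and within the accepted range")
    if spec.catchup:
        errors.append("catchup must not be enabled")
    if not 0 < spec.limit_memory_mi <= MAX_MEMORY_MI:
        errors.append("limit_memory is outside the accepted range")
    return errors


spec = DagSpec("example_dag", tier=1, start_date=datetime(2020, 6, 1),
               access_control={"spinner-users": {"can_dag_read"}})
print(validate(spec, now=datetime(2021, 1, 1)))  # []
```

Reporting all violations at once saves the author a round trip through the review pipeline for each individual fix.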

These validation tests give us a higher degree of confidence when a user commits their changes, since we do not review them. While they do not catch everything, they do catch most issues, and they make us aware of common problems so that we can add more unit tests and become an even better guard. In addition, users can utilize this test pipeline to add unit tests for their own logic.
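The guard against top level HMS calls can be illustrated with the standard library's unittest.mock: patch the shared helper so it raises, then parse the dag file. In this sketch hms_helpers and get_latest_partition are hypothetical stand-ins for the common functions, and the dag file is represented as a source string to keep the example self-contained.

```python
import types
from unittest import mock

# Hypothetical shared helper module that would normally talk to HMS.
hms_helpers = types.SimpleNamespace(
    get_latest_partition=lambda table: "2020-01-01"
)


class TopLevelHmsCall(AssertionError):
    """Raised when a dag file reaches HMS while it is being parsed."""


def _forbidden(*args, **kwargs):
    raise TopLevelHmsCall("HMS helper called during dag parsing")


def parses_without_hms_calls(dag_source: str) -> bool:
    """Execute the dag file with the HMS helper patched to raise."""
    with mock.patch.object(hms_helpers, "get_latest_partition", _forbidden):
        try:
            exec(compile(dag_source, "<dag_file>", "exec"),
                 {"hms_helpers": hms_helpers})
        except TopLevelHmsCall:
            return False
    return True


BAD = "partition = hms_helpers.get_latest_partition('db.table')\n"
GOOD = (
    "def task_callable():\n"
    "    return hms_helpers.get_latest_partition('db.table')\n"
)
print(parses_without_hms_calls(BAD), parses_without_hms_calls(GOOD))  # False True
```

The second file passes because the call sits inside a function body, which only runs at task execution time and not while the scheduler parses the file.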

Monitoring

Stats dashboard to monitor health of production system components.

At Pinterest we extensively use an internal tool called Statsboard, which is a metric visualization and alerting framework. As seen above, we have ample stats for each component such as:

  • Scheduler
  • Webserver
  • Executor
  • API
  • Dag Sync’ing
  • Mysql
  • Features and more

The overall tab is an aggregated statistical view of all the components across all of the clusters. We can view stats per cluster, per component, or at a higher level. So in this way, we can control the granularity of the statistics we see if any system metric seems to be concerning. We have a weighted average for the different schedulers to signify the importance of their health checks (i.e. higher tier has more weight, and also whichever schedulers have higher loads). The health check for each scheduler is actually a workflow that runs on the scheduler which emits a stat if it is successful. This happens on 15 minute intervals, so any missing data point will alert our team if the success rate drops below a certain threshold. We also have latency checks for this stat in case we are seeing a backup.
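The weighted health view can be made concrete with a little arithmetic. The sketch below assumes one expected data point per 15 minute interval and uses made-up weights; the actual weighting scheme in Statsboard is not described in this post.

```python
from typing import Iterable, Tuple


def success_rate(successes: int, window_minutes: int,
                 interval_minutes: int = 15) -> float:
    """Fraction of expected health-check data points that actually arrived."""
    expected = window_minutes // interval_minutes
    if expected <= 0:
        raise ValueError("window must cover at least one interval")
    return min(successes, expected) / expected


def weighted_health(schedulers: Iterable[Tuple[float, float]]) -> float:
    """Weighted average over (success_rate, weight) pairs."""
    pairs = list(schedulers)
    total_weight = sum(weight for _, weight in pairs)
    if total_weight <= 0:
        raise ValueError("weights must sum to a positive number")
    return sum(rate * weight for rate, weight in pairs) / total_weight


# A tier1 scheduler (weight 3) with all points, a tier3 one (weight 1) with half.
tier1 = success_rate(4, window_minutes=60)  # 4 of 4 expected points
tier3 = success_rate(2, window_minutes=60)  # 2 of 4 expected points
print(weighted_health([(tier1, 3), (tier3, 1)]))  # 0.875
```

Because a missing data point lowers the success rate, a stalled scheduler shows up in the same metric as a failing one.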

Many of the stats collected were out of the box, but we added some additional stats into our own components to have tighter visibility on the system health, as we want to be an anticipatory service versus a reactive one.

Closing Remarks

In this post, we wanted to share with our peers what our pain points were, why we investigated different products, how we identified one that would work with our needs, the testing we did to be sure before we invested too heavily, and what modifications had to happen to enable this. We then touch on how we enable our users and improve their experience.

The next phase is to discuss in detail the legacy workflow migrations onto the new Spinner platform, where we took on over 3000 workflows to migrate and coordinate with respective team leads. We built automated tooling, custom APIs, and a translation layer to handle this.

We hope you found this post on how we modified and deployed Airflow at Pinterest helpful. We would be happy to respond to any questions or comments. Thank you!

Special thanks to Evan Li for managing and leading this project and to Ashim Shrestha, Dinghang Yu, Euccas Chen, Hannah Chen, and Yulei Li for their contributions to this project.

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog, and visit our Pinterest Labs site. To view and apply to open opportunities, visit our Careers page.
