Lessons Learned From Running Apache Airflow at Scale

By Megan Parker and Sam Wheating

Apache Airflow is an orchestration platform that enables development, scheduling and monitoring of workflows. At Shopify, we’ve been running Airflow in production for over two years for a variety of workflows, including data extractions, machine learning model training, Apache Iceberg table maintenance, and DBT-powered data modeling. At the time of writing, we are running Airflow 2.2 on Kubernetes, using the Celery executor and MySQL 8.

[Figure: system diagram of Shopify’s Airflow architecture]

Shopify’s usage of Airflow has scaled dramatically over the past two years. In our largest environment, we run over 10,000 DAGs representing a large variety of workloads. This environment averages over 400 tasks running at a given moment and over 150,000 runs executed per day. As adoption grows within Shopify, the load on our Airflow deployments will only increase. As a result of this rapid growth, we have encountered several challenges, including slow file access, insufficient control over DAG (directed acyclic graph) capabilities, irregular levels of traffic, and resource contention between workloads.

Below we’ll share some of the lessons we learned and solutions we built in order to run Airflow at scale.

1. File Access Can Be Slow When Using Cloud Storage

Fast file access is critical to the performance and integrity of an Airflow environment. A well-defined strategy for file access ensures that the scheduler can process DAG files quickly and keep your jobs up-to-date.

Airflow keeps its internal representation of its workflows up-to-date by repeatedly scanning and reparsing all the files in the configured DAG directory. These files must be scanned often in order to maintain consistency between the on-disk source of truth for each workload and its in-database representation. This means the contents of the DAG directory must be consistent across all schedulers and workers in a single environment (Airflow suggests a few ways of achieving this).

At Shopify, we use Google Cloud Storage (GCS) for the storage of DAGs. Our initial deployment of Airflow utilized GCSFuse to maintain a consistent set of files across all workers and schedulers in a single Airflow environment. However, at scale this proved to be a bottleneck on performance as every file read incurred a request to GCS. The volume of reads was especially high because every pod in the environment had to mount the bucket separately.

After some experimentation we found that we could vastly improve performance across our Airflow environments by running an NFS (network file system) server within the Kubernetes cluster. We then mounted this NFS server as a read-write-many volume into the worker and scheduler pods. We wrote a custom script which synchronizes the state of this volume with GCS, so that users only have to interact with GCS for uploading or managing DAGs. This script runs in a separate pod within the same cluster. This setup also allows us to conditionally sync only a subset of the DAGs from a given bucket, or even sync DAGs from multiple buckets into a single file system based on the environment’s configuration (more on this later).
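As a rough illustration of the decision at the heart of such a sync script (not the script described above), the sketch below works out which files to copy and which to delete for a one-way bucket-to-volume sync. The function name plan_sync, the path-to-checksum listings and the allowed_prefixes filter are all hypothetical, and the actual GCS and file system calls are left out.

```python
from typing import Dict, Iterable, List, Tuple


def plan_sync(
    remote: Dict[str, str],
    local: Dict[str, str],
    allowed_prefixes: Iterable[str] = ("",),
) -> Tuple[List[str], List[str]]:
    """Return (paths_to_copy, paths_to_delete) for a one-way bucket -> volume sync.

    `remote` and `local` map a relative file path to a content checksum.
    Only remote paths starting with one of `allowed_prefixes` are synced,
    which is one way to sync just a subset of a bucket.
    """
    prefixes = tuple(allowed_prefixes)
    wanted = {path: checksum for path, checksum in remote.items() if path.startswith(prefixes)}
    # Copy anything that is missing locally or whose content has changed.
    to_copy = sorted(path for path, checksum in wanted.items() if local.get(path) != checksum)
    # Delete anything on the volume that is no longer wanted.
    to_delete = sorted(path for path in local if path not in wanted)
    return to_copy, to_delete


# Hypothetical listings: the bucket holds two teams' DAGs, the volume is stale.
remote = {"team_a/etl.py": "v2", "team_a/new.py": "v1", "team_b/job.py": "v1"}
local = {"team_a/etl.py": "v1", "team_a/old.py": "v1"}
to_copy, to_delete = plan_sync(remote, local, allowed_prefixes=["team_a/"])
```

Keeping the planning step free of I/O makes it easy to test and to run as a dry run before touching the shared volume.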

Altogether this provides us with fast file access along with a stable, external source of truth, while maintaining our ability to quickly add or modify DAG files within Airflow. Additionally, we can use Google Cloud Platform’s IAM (identity and access management) capabilities to control which users are able to upload files to a given environment. For example, we allow users to upload DAGs directly to the staging environment but limit production environment uploads to our continuous deployment processes.

Another factor to consider when running Airflow at scale is your file processing performance. Airflow is highly configurable and offers several ways to tune the background file processing (such as the sort mode, the parallelism, and the timeout). This allows you to optimize your environments for interactive DAG development or scheduler performance depending on the requirements.
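Airflow reads any configuration option from an environment variable named AIRFLOW__{SECTION}__{KEY}. The sketch below builds such variables for the three knobs mentioned above. The option names (file_parsing_sort_mode and parsing_processes under scheduler, dag_file_processor_timeout under core) are our best understanding for Airflow 2.x and should be checked against the configuration reference for your version; the values are purely illustrative, not recommendations.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name Airflow reads for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Illustrative values only; tune them for your own environment.
FILE_PROCESSING_SETTINGS = {
    ("scheduler", "file_parsing_sort_mode"): "modified_time",  # parse recently changed files first
    ("scheduler", "parsing_processes"): "4",  # number of parallel parsing processes
    ("core", "dag_file_processor_timeout"): "120",  # seconds before parsing a file is abandoned
}

env = {
    airflow_env_var(section, key): value
    for (section, key), value in FILE_PROCESSING_SETTINGS.items()
}
```

Parsing recently modified files first tends to suit interactive DAG development, while the parallelism and timeout mostly trade scheduler resources against how quickly a large DAG directory is reparsed.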

2. Increasing Volumes Of Metadata Can Degrade Airflow Operations

In a normal-sized Airflow deployment, performance degradation due to metadata volume wouldn’t be an issue, at least within the first years of continuous operation.

However, at scale the metadata starts to accumulate pretty fast. After a while this can start to incur additional load on the database. This is noticeable in the loading times of the Web UI and even more so during Airflow upgrades, during which migrations can take hours.

After some trial and error, we settled on a metadata retention policy of 28 days, and implemented a simple DAG which uses ORM (object–relational mapping) queries within a PythonOperator to delete rows from any tables containing historical data (DagRuns, TaskInstances, Logs, TaskRetries, etc.). We settled on 28 days as this gives us sufficient history for managing incidents and tracking historical job performance, while keeping the volume of data in the database at a reasonable level.
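The retention logic itself is small. The sketch below is not the DAG described above: to stay self-contained it swaps Airflow’s ORM and metadata database for plain SQL against an in-memory SQLite database from the Python standard library, and the two tables are simplified stand-ins. Only the 28-day cutoff comes from the text.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(days=28)

# Simplified stand-ins for historical tables: (table name, timestamp column).
HISTORY_TABLES = [("dag_run", "execution_date"), ("log", "dttm")]


def delete_old_rows(conn: sqlite3.Connection, now: datetime) -> dict:
    """Delete rows older than the retention cutoff; return rows deleted per table."""
    cutoff = (now - RETENTION).isoformat()
    deleted = {}
    for table, column in HISTORY_TABLES:
        # Table and column names come from the fixed list above, never from user input.
        cursor = conn.execute(f"DELETE FROM {table} WHERE {column} < ?", (cutoff,))
        deleted[table] = cursor.rowcount
    conn.commit()
    return deleted


# Demo data: rows aged 1, 27, 29 and 90 days in each table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT)")
conn.execute("CREATE TABLE log (event TEXT, dttm TEXT)")
now = datetime(2022, 6, 1, tzinfo=timezone.utc)
for age_days in (1, 27, 29, 90):
    stamp = (now - timedelta(days=age_days)).isoformat()
    conn.execute("INSERT INTO dag_run VALUES (?, ?)", ("example", stamp))
    conn.execute("INSERT INTO log VALUES (?, ?)", ("run", stamp))

result = delete_old_rows(conn, now)
remaining = conn.execute("SELECT COUNT(*) FROM dag_run").fetchone()[0]
```

In a real deployment the same cutoff would be applied through the ORM models inside a PythonOperator, with care taken over the order in which related tables are cleaned.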

Unfortunately, this means that features of Airflow which rely on durable job history (for example, long-running backfills) aren’t supported in our environment. This wasn’t a problem for us, but it may cause issues depending on your retention period and usage of Airflow.

As an alternative approach to a custom DAG, Airflow has recently added support for a db clean command which can be used to remove old metadata. This command is available in Airflow version 2.3.

3. DAGs Can Be Hard To Associate With Users And Teams

When running Airflow in a multi-tenant setting (and especially at a large organization), it’s important to be able to trace a DAG back to an individual or team. Why? Because if a job is failing, throwing errors or interfering with other workloads, we as administrators need to be able to quickly reach out to the appropriate users.

If all of the DAGs were deployed directly from one repository, we could simply use git blame to track down the job owner. However, since we allow users to deploy workloads from their own projects (and even dynamically generate jobs at deploy-time), this becomes more difficult.

In order to easily trace the origin of DAGs, we introduced a registry of Airflow namespaces, which we refer to as an Airflow environment’s manifest file.

The manifest file is a YAML file where users must register a namespace for their DAGs. In this file they include information about the jobs’ owners and source GitHub repository (or even source GCS bucket), and define some basic restrictions for their DAGs. We maintain a separate manifest per environment and upload it to GCS alongside the DAGs.

4. DAG Authors Have A Lot Of Power

By allowing users to directly write and upload DAGs to a shared environment, we’ve granted them a lot of power. Since Airflow is a central component of our data platform, it ties into a lot of different systems and thus jobs have wide-ranging access. While we trust our users, we still want to maintain some level of control over what they can and cannot do within a given Airflow Environment. This is especially important at scale as it becomes unfeasible for the Airflow administrators to review all jobs before they make it to production.

In order to create some basic guardrails, we’ve implemented a DAG policy which reads configuration from the previously mentioned Airflow manifest, and rejects DAGs which don’t conform to their namespace’s constraints by raising an AirflowClusterPolicyViolation.

Based on the contents of the manifest file, this policy will apply a few basic restrictions to DAG files, such as:

  • A DAG ID must be prefixed with the name of an existing namespace, for ownership.
  • Tasks in a DAG must only be enqueued to the specified Celery queue (more on this later).
  • Tasks in a DAG can only be run in specified pools, to prevent one workload from taking over another’s capacity.
  • Any KubernetesPodOperators in this DAG must only launch pods in the specified namespaces, to prevent access to other namespaces’ secrets.
  • Tasks in a DAG can only launch pods into specified sets of external Kubernetes clusters.

This policy can be extended to enforce other rules (for example, only allowing a limited set of operators), or even mutate tasks to conform to a certain specification (for example, adding a namespace-specific execution timeout to all tasks in a DAG).

Here’s a simplified example demonstrating how to create a DAG policy which reads the previously shared manifest file, and implements the first three of the controls mentioned above:
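A minimal sketch of such a policy follows. The manifest layout (a namespaces mapping with owner, celery_queue and pools keys), the "namespace." prefix convention and the fake_dag helper are all assumptions made for illustration; in practice the manifest would be loaded from the YAML file rather than defined inline. Airflow applies a function named dag_policy when it is defined in airflow_local_settings.py. The import fallback exists only so the sketch runs without Airflow installed.

```python
from types import SimpleNamespace

try:
    from airflow.exceptions import AirflowClusterPolicyViolation
except ImportError:  # stand-in so the sketch runs without Airflow installed

    class AirflowClusterPolicyViolation(Exception):
        pass


# Hypothetical manifest contents; the real file is YAML with its own layout.
MANIFEST = {
    "namespaces": {
        "analytics": {
            "owner": "analytics-team@example.com",
            "celery_queue": "analytics",
            "pools": ["analytics_pool"],
        },
    }
}


def dag_policy(dag) -> None:
    """Reject DAGs which break the constraints of their namespace."""
    namespaces = MANIFEST["namespaces"]
    # 1. The DAG ID must be prefixed with a registered namespace.
    namespace = next((n for n in namespaces if dag.dag_id.startswith(f"{n}.")), None)
    if namespace is None:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} is not prefixed with a registered namespace"
        )
    rules = namespaces[namespace]
    for task in dag.tasks:
        # 2. Tasks may only be enqueued to the namespace's Celery queue.
        if task.queue != rules["celery_queue"]:
            raise AirflowClusterPolicyViolation(
                f"Task {task.task_id} uses queue {task.queue!r}"
            )
        # 3. Tasks may only run in the namespace's pools.
        if task.pool not in rules["pools"]:
            raise AirflowClusterPolicyViolation(
                f"Task {task.task_id} uses pool {task.pool!r}"
            )


def fake_dag(dag_id, queue, pool):
    """Stand-in for a single-task DAG, used only to exercise the policy."""
    task = SimpleNamespace(task_id="example_task", queue=queue, pool=pool)
    return SimpleNamespace(dag_id=dag_id, tasks=[task])


def is_allowed(dag) -> bool:
    try:
        dag_policy(dag)
    except AirflowClusterPolicyViolation:
        return False
    return True
```

With this manifest, is_allowed(fake_dag("analytics.daily_report", "analytics", "analytics_pool")) passes, while a DAG with an unregistered prefix, a different queue or a different pool is rejected.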

These validations provide us with sufficient traceability while also creating some basic controls which reduce DAGs’ ability to interfere with each other.

5. Ensuring A Consistent Distribution Of Load Is Difficult

It’s very tempting to use an absolute interval for your DAG’s schedule interval: simply set the DAG to run every timedelta(hours=1) and you can walk away, safely knowing that your DAG will run approximately every hour. However, this can lead to issues at scale.

When a user merges a large number of automatically-generated DAGs, or writes a Python file which generates many DAGs at parse-time, all the DAGRuns will be created at the same time. This creates a large surge of traffic which can overload the Airflow scheduler, as well as any external services or infrastructure which the job is utilizing (for example, a Trino cluster).

After a single schedule_interval has passed, all these jobs will run again at the same time, thus leading to another surge of traffic. Ultimately, this can lead to suboptimal resource utilization and increased execution times.

While crontab-based schedules won’t cause these kinds of surges, they come with their own issues. Humans are biased towards human-readable schedules, and thus tend to create jobs which run at the top of every hour, every night at midnight, etc. Sometimes there’s a valid application-specific reason for this (for example, every night at midnight we want to extract the previous day’s data), but often we have found users just want to run their job on a regular interval. Allowing users to directly specify their own crontabs can lead to bursts of traffic which can impact SLOs and put uneven load on external systems.

As a solution to both these issues, we use a deterministically randomized schedule interval for all automatically generated DAGs (which represent the vast majority of our workflows). This is typically based on a hash of a constant seed such as the dag_id.

The below snippet provides a simple example of a function which generates deterministic, random crontabs which yield constant schedule intervals. Unfortunately, this limits the range of possible intervals since not all intervals can be expressed as a single crontab. We have not found this limited choice of schedule intervals to be a problem, and in cases when we really need to run a job every five hours, we just accept that there will be a single four-hour interval each day.
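A minimal sketch of such a function, under a few assumptions: the name random_crontab and its signature are hypothetical, the seed is hashed with SHA-256 (Python’s built-in hash() is salted per process, so it would not be deterministic), and only intervals of whole minutes under an hour or whole hours up to a day are supported.

```python
import hashlib
from datetime import timedelta


def random_crontab(seed: str, interval: timedelta) -> str:
    """Return a crontab firing roughly every `interval`, offset by a hash of `seed`."""
    digest = int(hashlib.sha256(seed.encode("utf-8")).hexdigest(), 16)
    minutes = int(interval.total_seconds() // 60)
    if minutes < 1 or minutes > 24 * 60:
        raise ValueError("interval must be between one minute and one day")
    if minutes < 60:
        # Sub-hourly: pick a starting minute, then step through the hour.
        offset = digest % minutes
        return f"{offset}-59/{minutes} * * * *"
    if minutes % 60 != 0:
        raise ValueError("intervals of an hour or more must be a whole number of hours")
    hours = minutes // 60
    minute = digest % 60
    hour_offset = (digest // 60) % hours
    if hours == 24:
        # Daily: a single fixed time of day.
        return f"{minute} {hour_offset} * * *"
    # Multi-hour: pick a starting hour, then step through the day.
    return f"{minute} {hour_offset}-23/{hours} * * *"


# The same seed always yields the same schedule; different seeds spread out.
six_hourly = random_crontab("my_namespace.my_dag", timedelta(hours=6))
quarter_hourly = random_crontab("my_namespace.my_dag", timedelta(minutes=15))
```

Because the offset depends only on the seed, a DAG keeps the same schedule across deploys, while a large batch of generated DAGs lands on different minutes and hours instead of all firing together. Intervals which do not divide evenly into an hour or a day produce one shorter interval at the wrap-around.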

Thanks to our randomized schedule implementation, we were able to smooth the load out significantly. The below image shows the number of tasks completed every 10 minutes over a twelve hour period in our single largest Airflow environment.

[Figure: bar graph of tasks executed per 10-minute interval in our production Airflow environment]

6. There Are Many Points of Resource Contention

There are many possible points of resource contention within Airflow, and it’s really easy to end up chasing bottlenecks through a series of experimental configuration changes. Some of these resource conflicts can be handled within Airflow, while others may require some infrastructure changes. Here are a few of the ways we handle resource contention within Airflow at Shopify:

Pools

One way to reduce resource contention is to use Airflow pools. Pools are used to limit the concurrency of a given set of tasks. These can be really useful for reducing disruptions caused by bursts in traffic. While pools are a useful tool to enforce task isolation, they can be a challenge to manage because only administrators have access to edit them via the Web UI.

We wrote a custom DAG which synchronizes the pools in our environment with the state specified in a Kubernetes Configmap via some simple ORM queries. This lets us manage pools alongside the rest of our Airflow deployment configuration and allows users to update pools via a reviewed Pull Request without needing elevated access. 

Priority Weight

Priority_weight allows you to assign a higher priority to a given task. Tasks with a higher priority will float to the top of the pile to be scheduled first. Although not a direct solution to resource contention, priority_weight can be useful to ensure that latency-sensitive critical tasks are run prior to lower priority tasks. However, given that the priority_weight is an arbitrary scale, it can be hard to determine the actual priority of a task without comparing it to all other tasks. We use this to ensure that our basic Airflow monitoring DAG (which emits simple metrics and powers some alerts) always runs as promptly as possible.

It’s also worthwhile to note that by default, the effective priority_weight of a task used when making scheduling decisions is the sum of its own weight and that of all its downstream tasks. What this means is that upstream tasks in large DAGs are often favored over tasks in smaller DAGs. Therefore, using priority_weight requires some knowledge of the other DAGs running in the environment.
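To make the effect concrete, here is a small sketch which computes this effective weight for toy DAGs described as plain dictionaries. It does not call Airflow; the function name and the input format are hypothetical, and every task uses the default weight of 1.

```python
from typing import Dict, List, Set


def effective_priority_weights(
    downstream: Dict[str, List[str]], weights: Dict[str, int]
) -> Dict[str, int]:
    """Own weight plus the weight of every (transitive) downstream task."""

    def descendants(task_id: str) -> Set[str]:
        seen: Set[str] = set()
        stack = list(downstream.get(task_id, []))
        while stack:
            current = stack.pop()
            if current not in seen:
                seen.add(current)
                stack.extend(downstream.get(current, []))
        return seen

    return {
        task_id: weight + sum(weights[d] for d in descendants(task_id))
        for task_id, weight in weights.items()
    }


# A three-task chain and a single-task DAG, all with the default weight of 1.
large_dag = effective_priority_weights(
    {"extract": ["transform"], "transform": ["load"]},
    {"extract": 1, "transform": 1, "load": 1},
)
small_dag = effective_priority_weights({}, {"report": 1})
```

Here extract ends up with an effective weight of 3 while report stays at 1, so the first task of the larger DAG is scheduled ahead of the only task in the smaller one even though nobody assigned it a higher priority.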

Celery Queues and Isolated Workers

If you need your tasks to execute in separate environments (for example, dependencies on different Python libraries, higher resource allowances for intensive tasks, or differing levels of access), you can create additional queues which a subset of jobs submit tasks to. Separate sets of workers can then be configured to pull from separate queues. A task can be assigned to a separate queue using the queue argument in operators. To start a worker which runs tasks from a different queue, you can use the following command:

airflow celery worker --queues <list of queues>

This can help ensure that sensitive or high-priority workloads have sufficient resources, as they won’t be competing with other workloads for worker capacity.

Any combination of pools, priority weights and queues can be useful in reducing resource contention. While pools allow for limiting concurrency within a single workload, a priority_weight can be used to make individual tasks run at a lower latency than others. If you need even more flexibility, worker isolation provides fine-grained control over the environment in which your tasks are executed.

It’s important to remember that not all resources can be carefully allocated in Airflow—scheduler throughput, database capacity and Kubernetes IP space are all finite resources which can’t be restricted on a workload-by-workload basis without the creation of isolated environments.

Going Forward…

There are many considerations that go into running Airflow with such high throughput, and any combination of solutions can be useful. We’ve learned a ton and we hope you’ll remember these lessons and apply some of our solutions in your own Airflow infrastructure and tooling.

To sum up our key takeaways:

  • A combination of GCS and NFS allows for both performant and easy-to-use file management.
  • Metadata retention policies can reduce degradation of Airflow performance.
  • A centralized metadata repository can be used to track DAG origins and ownership.
  • DAG Policies are great for enforcing standards and limitations on jobs.
  • Standardized schedule generation can reduce or eliminate bursts in traffic.
  • Airflow provides multiple mechanisms for managing resource contention.

What’s next for us? We’re currently working on applying the principles of scaling Airflow in a single environment as we explore splitting our workloads across multiple environments. This will make our platform more resilient, allow us to fine-tune each individual Airflow instance based on its workloads’ specific requirements, and reduce the reach of any one Airflow deployment.

Got questions about implementing Airflow at scale? You can reach out to either of the authors on the Apache Airflow Slack community.

Megan has worked on the data platform team at Shopify for the past 9 months where she has been working on enhancing the user experience for Airflow and Trino. Megan is located in Toronto, Canada where she enjoys any outdoor activity, especially biking and hiking.

Sam is a Senior developer from Vancouver, BC who has been working on the Data Infrastructure and Engine Foundations teams at Shopify for the last 2.5 years. He is an internal advocate for open source software and a recurring contributor to the Apache Airflow project.

Interested in tackling challenging problems that make a difference? Visit our Data Science & Engineering career page to browse our open positions. You can also contribute to Apache Airflow to improve Airflow for everyone.
