在介绍 Airflow 之前,先引入两个概念:定时系统和工作流系统。一个定时系统,比如下面的 crontab,表示每天早上 6 点下载一个文件到本地:
0 6 * * * sh /var/download_file.sh
如果希望下载完成后分析该文件,大概只能再创建一个 6 点 30 的 crontab:
30 6 * * * sh /var/analyze_file.sh
这种通过时间先后顺序建立的依赖关系,是不稳定的。正常情况下 6 点开始下载,7 点分析,问题在于:如果下载时间过长,分析任务开始时文件还没有 ready;如果下载失败,分析任务由于不能获取下载任务的执行结果,依然会尝试启动,而实际上是没有必要的。
这种任务之间有上下游依赖关系的场景下,工作流系统的任务编排能力就显得非常有用了。
Airflow 是一个工作流系统,由 Airbnb 贡献给 Apache,本文将介绍 Airflow 的一些基本概念以及在即刻的使用经验。
关于 Airflow 的调度机制,可以总结为一句话:
如果一个 dag run 的状态为 running,调度器就会按照拓扑排序,开始执行状态为空的任务;如果某个任务失败,其下游所有任务都不会执行
Operator 是任务模板,用来定义一类任务,比如 Airflow 默认提供了 BashOperator、PythonOperator。Task 是任务,是 Operator 的参数化实例。
DAG(有向无环图)是 Airflow 的核心概念,主要由一个 cron 表达式和一个编排过的 Task 集合组成,集合中任务之间的依赖关系可以表示为一个有向无环图。
Dag Run 是 Dag 的一次执行,每到达 cron 表达式的一个执行点,会触发一次 Dag Run,以拓扑排序的顺序开始执行该 Dag 里的所有 Task。Task Instance 是 Task 的一次执行。
Airflow 使用 python 开发,可通过pip install apache-airflow
安装,下面看看如何用 Airflow 来解决最开始的文件下载分析问题。为了演示,我把分析任务拆分成两个,并且在分析完之后增加一个邮件通知,最后再删除该文件,代码如下:
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
args = {
'owner': 'Damien',
'start_date': airflow.utils.dates.days_ago(1),
}
dag = DAG(
dag_id='download_and_analyze_file',
default_args=args,
schedule_interval='0 6 * * *',
)
download_file = BashOperator(
task_id='download_file',
bash_command="sh /var/download.sh {{ ds }}",
dag=dag,
)
analyze_1 = BashOperator(
task_id='analyze_1',
bash_command="sh /var/analyze.sh {{ task_instance.xcom_pull(task_ids='download_file', key='return_value') }}",
dag=dag,
)
analyze_2 = BashOperator(
task_id='analyze_2',
bash_command="sh /var/analyze.sh {{ task_instance.xcom_pull(task_ids='download_file', key='return_value') }}",
dag=dag,
)
send_email = BashOperator(
task_id='send_email',
bash_command="sh /var/analyze.sh",
dag=dag,
)
delete_file = BashOperator(
task_id='delete_file',
bash_command="rm {{ task_instance.xcom_pull(task_ids='download_file', key='return_value') }}",
dag=dag,
)
download_file >> [analyze_1, analyze_2] >> send_email >> delete_file
将上面的代码文件拷贝到指定目录,Airflow 定期扫描该目录,并加载执行。在这个例子中,首先定义了一个 DAG 对象,定时为每天早上 6 点开始执行,并向其中添加了 5 个任务 download_file、analyze_1、analyze_2、send_email 、delete_file,通过位移操作符>>
来设置任务间的上下游关系。
Operator 的参数可以使用 jinja2 模板语法,在执行时动态渲染,比如bash_command="sh /var/download.sh {{ ds }}"
,其中ds
是 Airflow 内置的变量,表示当天日期
,格式为yyyy-MM-dd
,Airflow 也支持扩展更多的变量和函数。
Airflow 通过 XCom 在任务间交换消息,包括 xcom_push 和 xcom_pull,上例中的 analyze_1 和 analyze_2 便是通过 xcom_pull 来获取 download_file 任务返回的文件路径。
webserver 是 Airflow 的管理后台,定义完 DAG 后,可以在 webserver 上看到拓扑图:一段时间后,DAG 的执行结果如下:上图中纵向的每一列代表一次 dag run,每个小方形代表一个 task instance,绿色表示 dag run 或 task instance 的状态为执行成功,红色表示执行失败,橙色表示上游失败,空白表示任务尚未执行。在 Airflow webserver 上重要的任务操作有:
clear 是最常用的操作,当任务失败时,用 clear 可以清空指定任务及其下游所有任务的状态,从而达到重跑的目的。
run 用来执行单个任务,只在 CeleryExecutor 下可用。
在少数情况下,修复任务的操作发生在 Airflow 之外,即该任务已经正常输出结果,下游任务是可以正常执行的,此时应当将该任务标记为 success,并 clear 重跑下游任务。
实际上,考虑到权限管控,webserver 的操作应该对数据平台的用户透明,平台应基于 Airflow 提供的操作原语,封装更易用、受控的任务操作方式。
一般在数据平台中,有任务管理平台(Job-registry),有调度引擎(Airflow),有计算引擎(EMR),关系如下:上图中,spark 是 EMR 集群上的核心软件,我们真正要执行的任务,几乎都是 spark 任务。livy 是 EMR 提供的 spark 管控工具。airflow 通过 livy 的 rest api 来提交任务、查询任务状态。在我们 Airflow 的使用过程中,总结了以下经验:
在开始的示例代码中,简单声明了一个 dag 和 5 个 task,构建 DAG 比较轻松。而现实场景中的任务会更多、依赖关系更复杂,一般做法是先从任务管理平台拉取全部任务信息,然后动态地、递归地构建 DAGs。
首先定义 LivySparkAdder 和 LivySparkSensor 两种 operator,分别继承 Airflow 的 BaseOperator 类和 BaseSensor 类。对于每个 spark 任务,相应地生成 1 个 adder、1 个 sensor,其中 adder 负责向 livy 提交任务,sensor 负责轮询任务状态,sensor 依赖 adder。这种方式类似于异步 io,可以大幅度提高调度系统的吞吐量。
Dag 的 start_date 参数表示开始执行时间,当新建一个 dag 时,scheduler 会从这个时间开始,依次触发 dag run。所以这个参数一定不能写死,比如设成2019-08-01
,如果有用户在 2019 年 9 月 1 日新加了一个每天调度的 dag,那么 scheduler 会往前追加 30 次 dag run,进而造成一个任务瞬间被提交 30 次。标准做法应该是根据设置的 cron 表达式,即 DAG 的 schedule_interval 参数,动态确定 start_date,不同 dag 的 start_date 应该是不同的,这里常有两种语义可供选择:
executor 参数控制执行方式,常用的有 LocalExecutor 和 CeleryExecutor:当选择 LocalExecutor 时,worker 以多进程的方式执行任务,系统并发度受限于机器的 cpu;当选择 CeleryExecutor 时,scheduler 将任务发送至 redis,分布式的 celery worker 从 redis 中获取任务并执行,最好采用这种可水平扩展的方式。
Rest api 比 airflow cli 更好用,方便从其他系统操作任务,比如执行、重跑等,比如这个Airflow Rest Api[1],基于 airflow 的 plugin 机制实现。
我们做了 3 种监控:
当使用 CeleryExecutor 时,调度系统自身的并行度非常高,但下游的抗并发能力不一定能匹配,此时需要进行流控(flow control)。
在凌晨的日常任务中,有非常多的数据同步任务相互之间没有任何依赖,一到调度时间点,数百个任务同时开始提交,给下游的 livy server 造成很大压力。于是我们将所有 adder 绑定到同一个 slot 为 4 的 pool 上,限制 adder 的并行度。需要注意的是,在这种 adder - > sernsor 提交模型中,因为 adder 任务执行时间极短,限制 adder 并行度并不会影响系统的吞吐量。
跨 DAG 依赖是指 dag1 中的任务 a 依赖 dag2 中的 1 个任务 b,通过ExternalTaskSensor
实现,当然这个实现对用户是透明的。下面举两个例子:
另外,这种依赖是可以指定依赖偏移,说简单点就是,一个每天凌晨 1 点执行的任务,既支持依赖 0 点的小时级任务,也支持依赖 1 点的小时级任务。
如果说 Airflow 有什么细节令人费解,那一定是 execution_date,也就是任务参数中的{{ ds }}
。假设有个任务在凌晨 1 点执行,按对一般调度系统的理解,任务执行时的 execution_date 应该是当天凌晨 1 点,实际上却是前一天凌晨 1 点。在传统的定时 ETL 方案中,只有到达当前调度点,从上一个调度点到当前调度点的全量数据才能 ready,这批数据的归属,当然是上一个调度点。
下面说说 Airflow 的部署方案。
在数据平台发展初期,我们在 ec2 上用puckel/docker-airflow[3]部署了单机版的 airflow,将 airflow-scheduler、airflow-worker、airflow-webserver 集成在一个 container 中,使用 LocalExecutor。这种方式下不详细介绍,主要有几个痛点:
最终我们选择了分布式部署方案。
典型的 airflow on k8s 架构,如下图:系统中有多个 webserver、多个 scheduler(只有 1 个 scheduler active,剩下的 standby)、多个 worker(可手动扩容),三个组件共用同一份 config map 配置,共用同一个元数据存储,scheduler 和 worker 通过 redis 生产和消费任务。
相比单机版,分布式版 Airflow 主要改动包括:
FROM puckel/docker-airflow:1.8.2
COPY dags /usr/local/airflow/dags
COPY plugins /usr/local/airflow/plugins
COPY requirements.txt /requirements.txt
FROM airflow-base
CMD ["scheduler"]
FROM airflow-base
CMD ["worker"]
FROM airflow-base
CMD ["webserver"]
改用这种分布式方案后,webserver 和 worker 具备了水平扩展能力,scheduler 变得高可用,同时,自动部署和测试环境减少了手动操作失误的风险,提高了系统的稳定性。
此时 Airflow 算是一个合格的调度系统。
Airflow Rest Api: https://github.com/teamclairvoyant/airflow-rest-api-plugin
[2]Airflow Prometheus Exporter: https://github.com/epoch8/airflow-exporter
[3]puckel/docker-airflow: https://github.com/puckel/docker-airflow