概要: 介绍英语流利说的数据单元测试框架: data-ci
用 SQL 开发了一个 ETL(Extract-Transform-Load) 任务,如何写单元测试?
ETL 任务的产出一般就是一个表中的数据,因此在没有任何测试框架的情况下,基本上就是在开发过程中写一些 SQL 或者脚本,做一些逻辑的验证。但这些验证的规则没有沉淀下来,导致后续 ETL 逻辑改动时,每次都战战兢兢。
那么,有没有办法像其他编程语言的单元测试框架一样,将这些测试代码沉淀下来?
问题就是机会,我们先来总结一些以往验证数据的步骤。
按照以往经验,我们看一个 Table 中的数据是不是正确,基本上是三个步骤。以 active_login_users 表的数据为例,这张表中存储着每天活跃的注册用户。
所有的 active_login_users
表中的数据必须是注册过的用户,因此通过 LEFT OUTER JOIN db.users
表必须都能 JOIN 上。示例 SQL 如下:
-- wrong_user_count 必须是 0
SELECT count(*) AS wrong_user_count
FROM etl.active_login_users a
LEFT OUTER JOIN db.users b ON a.user_id = b.id
WHERE b.user_id IS NULL
执行 SQL 获取到统计数据后,就要看结果是不是符合预期。接着上面的例子,查询结果中的 wrong_user_count
必须等于0,不然就说明 active_login_users
表中有错误的 user_id,一定是哪里有问题。
发现数据错误后,需要定位数据问题。因此一般大家都是执行一些查询看错误数据的特征. 上面的例子,如果发现 active_login_users
表中有错误数据,那要看一下这些 user_id 都长什么样子,因此执行如下查询:
-- 看看这些 JOIN 不上的 user_id 长什么样子
-- 确定是原始日志采集错误, 还是中间的 ETL 过程 bug
SELECT * AS wrong_user_count
FROM etl.active_login_users a
LEFT OUTER JOIN db.users b ON a.user_id = b.id
WHERE b.user_id IS NULL
LIMIT 20
通常情况下,上述验证过程是"纯手动",有如下几个缺点:
代码没有统一管理, 只有在 ETL 开发完成测试的阶段才会仔细跑, 一旦完成测试上线, 这些逻辑就被尘封了, 没有沉淀下来
代码没有 code review, 验证代码是不是有 bug?
整个过程纯手动, 费事费力, 也就不会自动化, 但对于 ODS 层原始数据, 由于上游系统有可能变化, 数据质量的把控就非常重要, 需要每天定时执行, 确保数据质量.
既然流程这么相似,何不抽象一个 framework 出来,方便大家给数据写 "单元测试"?
问题就是机会,抽象一下,我们需要一个 framework:
以固定格式写测试代码
方便写 assert 函数,毕竟 assert 的逻辑有可能没有那么简单
方便自动化执行,为后续自动化铺路
那么我们开发了 data-ci:一个使用 SQL 给 Table 中的数据写测试的框架。data-ci 的测试代码长这个样子:
# -*- coding:utf-8 -*-
#
# @Time : 2017-05-22 11:58:34
# @Version : 1.0
# @Author : haitao.yao
#
description = u"测试 user_id 都能跟 db.users JOIN 上"
target_columns = ['user_id']
def prepare_data_sql(date_string):
return """
SELECT count(*) AS wrong_user_count
FROM etl.active_login_users a
LEFT OUTER JOIN db.users b ON a.user_id = b.id
WHERE a.data_date = ${date_string}
AND b.user_id IS NULL
"""
def assert_data_ok(date_string, result_data):
for line in result_data:
if line.wrong_user_count > 0:
raise Exception('active_login_users.user_id JOIN db.users 失败, 说明有 user_id 不合法 , line: %s' % line)
def assert_fail_callback_sql(date_string):
return """
SELECT * AS wrong_user_count
FROM etl.active_login_users a
LEFT OUTER JOIN db.users b ON a.user_id = b.id
WHERE a.data_date = ${date_string}
AND b.user_id IS NULL
LIMIT 20
"""
一个 test case 是一个 Python 文件,以 test_(daily|hourly)_
开头,其中 daily 或者 hourly 代表验证数据是基于每天|每小时的. test case 由5部分组成:
description
用来写一些 comments,简要说明这个 test case 是用来测试什么内容的。
target_columns
标识该 test case 主要是测试哪些 column。上述例子中,我们测试的是 user_id
这个 column
prepare_data_sql
需要返回一个查询数据的 SQL,也就是验证数据步骤中提到的第一步. 上述例子中,我们返回一个 LEFT OUTER JOIN
的 SQL,计算没有 JOIN 上的数据有多少条
assert_data_ok
方法就是用来写 assert 函数的地方,prepare_data_sql
的 SQL 的执行结果以result_data
传入,借助于 Python 语言,assert 函数可以任意发挥。
assert_fail_callback_sql
方法就是在上述 assert_data_ok
方法抛出异常的情况下调用,也就是验证数据步骤中的第三步。上述例子中,我们查询一下错误的 user_id
长什么样子,因此 SELECT 出20条来。
test case 成功的情况下,流程如下:
test case 失败的情况下,流程如下:
test case 的组织也很简单,规则如下: tests/{catalog}/{database}.db/{table_name}/
,这样可以一目了然的知道究竟是测试的哪个 table 的数据。
└── tests
└── hive
└── etl.db
└── active_login_users
├── test_daily_channel.py
├── test_daily_platform.py
├── test_daily_device.py
└── test_daily_user_id.py
data-ci 本身也很简单,主要包括三个模块:
Cli 模块,处理命令行参数解析
Parser 模块,负责解析 test case,将上述5个步骤的数据所需的数据
SQLExecutor 模块,作为 SQL 执行引擎的抽象,根据需求实现不同 database 的接口即可。比如可以实现 Presto 的,也可以实现 Hive 或者 Redshift 的
data-ci 提供的 cli 工具不仅可以执行 test case,也提供了一些辅助的工具:
generate
命令可以生成一个 test case 的 skeleton 代码,方便开发 test case
test-run
命令可以完整的执行数据测试的三个步骤,方便测试 assert 函数和 assertfailcallback_sql,适合在 CI(持续集成)中检查 test case
>$ data-ci --help
usage: data-ci [-h] {test-run,run,check,generate} ...
data-ci: A unittest framework on data, allow you to write unittest for tables
positional arguments:
{test-run,run,check,generate}
sub command help
check check the test scripts
run run the test jobs
generate generate code templates
test-run test run prepare_data_sql, assert_fail_sql and
assert_function_ok
optional arguments:
-h, --help show this help message and exit
有了 data-ci 这一套工具,我们就可以把所有的 test case 放到 gitlab 中,通过 Code Review 和 CI 来进一步提升测试的质量。
到这里就结束了吗?显然还没有。
写其他语言的代码时,我们都会通过 coverage 工具统计代码的测试覆盖率(例如 Java 语言中的 Jacoco 框架)。那么,
我们有没有办法计算量化我们数据的测试覆盖率?
试想,一个 Table 中有20个 column,其中的1个 column 被我们写了测试用例,是不是可以认为,数据的测试覆盖率是 5%?
因此,data-ci 执行完 test case 后,会生成一份 coverage report,统计涉及的每个表的覆盖率
在流利说,通过工具的深度整合,工程师不仅可以查询数据,还可以知道自己所查询的数据每天的测试覆盖率是多少,数据到底是不是靠谱;重要的数据一旦有问题,也会收到对应的 data-ci 的告警信息,告知究竟是哪些数据有问题,加快问题的定位。
修改 ETL 部分逻辑现在也是心不慌手不抖,直接跑一遍所有的 test case 验证一下本次逻辑是否会引入其他 bug 即可。
通过 data-ci,我们沉淀了数据的测试用例,每天执行并通过告警机制告知数据问题,进一步提升了数据质量。