大数据的迁移通常来说是个不小的工程,在移动 PB 级规模数据的同时,我们还需要保证对上层业务的透明。 流利说在 2020 10 月份完成了大数据平台到阿里云的搬迁,整个过程历时 21 天。本文将简要介绍此次迁移的工程架构,以及这个期间的一些问题和思考。
大数据不同于在线业务,后者更多的是依赖缓存或数据库系统,这些系统一般云厂商都有现成的产品来实现数据同步,比如云产品 DTS,其可以建立数据库跨云的主从同步。而大数据的同步往往元数据与实际数据是分开的,一般元数据是在 Hive Metastore,而实际数据是 s3 这类对象存储系统上的文件。因此对于大数据的同步,我们往往也需要一个类似 DTS 这样的产品,当表数据或表分区发生改变,元数据和实际文件的同步在一个事务中完成同步,但目前业界缺少这样的产品。
有同学可能会想分开同步,即建立 metastore DB 的 DTS 连接实现元数据的同步,然后同步对象系统上的文件,完成后再批量更改元数据中的路径,这种方式首先没有办法保证元数据与实际文件的一致,其次增量同步文件也会更加繁琐,因为 Hive/Spark 这类任务在完成之前,文件是写向临时目录的,当所有 task 完成之后才会移动到最终目录,所以基于事件的方式做同步,很可能在读取文件时,文件就已经被移走。因此稳妥的方式是基于表或分区的变更事件来同步数据,如 Hive 任务在完成之后,此时的数据已经写到目标表/分区中,然后再做同步操作。然而对于非 Hive/Spark 这类任务产生的数据,因为其没有对应的 hive schema,还是可以使用事件监听的方式,比如 flume sink 到 s3 上的文件,可以订阅 s3 bucket 上的 event notification,在指定路径下当有 s3:ObjectCreated:Put 事件时,触发文件的同步。
上述聊到需要一个类似 DTS 这样的系统(为了区分,下文称为 DSS),但实际迁移过程要复杂的多,至少以下几点需要考虑:
数据同步系统 DSS 的设计与实现
DSS 与当前计算框架或调度系统结合
如何灰度
如何发现不一致的数据并做修复
关于 DSS 的系统设计,其底层使用了 Juicedata 的开源文件同步工具 juicesync,不仅支持跨云同步,还有不错的易用性以及高效的同步性能。因为是在专线网络,Hive metastore 中 DB/Table/Partition 的同步,DSS 通过 Metastore 的 API 获取其实例对象,然后把对象的 Location 字段改掉,接着通过目标云的 Metastore API 写过去。
除了实现最基本的 meta 和文件同步外,DSS 还提供了以下特性:
根据表名,计算当日所变更的分区数目并做同步;如非分区表,则根据 schema 的情况做表数据的同步
修复(同步)指定表、表分区,以及分区表的所有分区信息
只接受请求,根据情况暂停或开启所有表的数据同步。(下文会说明该功能在灰度中的应用)
DSS 实现之后,利用 Airflow DAG 中的 callback,在任务完成计算后,触发 DSS 实现自动同步:
上图中,我们引入了一个叫名 Janus 的模块,它是负责计算任务血缘的一个子系统,该系统会根据指定的 Hive 或 Spark SQL 计算出整个任务的输出表;有了这些输出表之后,DSS 会请求 Hive Metastore 获取当前表在本次计算周期内的分区变更信息,从而实现对增量分区做数据同步。以下做具体介绍:
INSERT OVERWRITE TABLE dw.dw_zh_user_tag(dt=‘${dt}')
SELECT a.user_id
'c1' AS tag,
'c2' AS tag_comment
FROM dw.temp_dw_zh_user_tag_inc a
LEFT JOIN dw.temp_dw_zh_user_tag_inc_07 b
on a.user_id = b.user_id
WHERE ...
GROUP BY 1,2,3;
对于以上 SQL 脚本,Janus 血缘服务会输出
inserted_tables:[‘dw.dw_zh_user_tag’], input_tables:[‘dw.temp_dw_zh_user_tag_inc’, 'dw.temp_dw_zh_user_tag_inc_07’]
,因为只要同步输出表,因此这里只要关心 inserted_tables。有了这些表名,便可以使用 Metastore DB 获取 ETL 时间段内,这些表分区变更信息,以下为查询 Metastore DB 的 Query:
SELECT PART_NAME, LOCATION
FROM DBS a
JOIN TBLS b
ON a.`DB_ID` = b.DB_ID
JOIN PARTITIONS p
ON p.TBL_ID = b.TBL_ID
JOIN SDS s
ON b.SD_ID = s.SD_ID
JOIN (
SELECT PART_ID, PARAM_VALUE
FROM PARTITION_PARAMS where PARAM_KEY = 'transient_lastDdlTime' and PARAM_VALUE >= %d and PARAM_VALUE <= %d
) pp
ON p.`PART_ID` = pp.PART_ID
WHERE a.name = %r and b.`TBL_NAME` = %r order by PART_NAME desc;
ETL 的计算通常会是在凌晨一段时间,比如 00:00 ~ 05:00。判定两边集群的结果计算是否一致,一般会对比两边输出表的数据集,当时我们定下的原则是只要输入一致,输出便会一致,只要输出一致,便达到割接状态。要对比两边的输出表,那就需要启动阿里云的集群做灰度计算,而在 ETL 计算的这段时间内,s3 上的结果集是不会向 oss 同步的,这就是 DSS 为什么需要提供开启或暂停数据同步的功能,而在白天这一段时间内,数据开发的日常修改会近实时的同步到 oss,以及工程师们会确认并修复好数据一致性问题后,晚间再次打开 DSS 的同步,把凌晨新增的一批数据覆盖到 oss,我们把这个过程形象的比喻为蓄水以及开闸放水。
白天定位数据集的不一致问题,并做修复。完成之后,打开 DSS 的同步开关,从而让两边的数据保持一致的状态。
在两边集群都先后完成任务计算后,先以粗粒度完成表数量上的对比,数量不一致的表略过内容校验和的对比。整个过程涉及表或分区达数十万,并且需要第一时间出具数据不一致的报告,以供工程师定位不一致的原因。在经过几轮灰度后发现,对比的结果并不满足当时定下的原则,即输入表一致,但输出表的结果却不一致。通过分析,我们发现脚本中使用了一些影响幂等性的函数,如 row_number / collect_list / collect_set,以及对于 double 类型的列累加出现分片区别的问题。这些问题如存在上游表,那么其下游所有被依赖的表一致性都会出现问题。
我们说到影响数据一致性的函数,但其发现过程却是一点点刨出来的,正所谓该摔的坑,一个都逃不掉。在对比数据一致性问题上,利用血缘辅助定位源头表,并使用 crc32 来求和整个列,这个过程看上去系统就可以实现,但实际上更多的工作是在寻找 root cause 的过程。即发现不一致后,需要拆分脚本来对比每一小部分 query 的结果是否一致,又因为是跨云对比,通常大家在跑出结果后仍不能一步求出差异。因此,我们把方案转移到 presto 的 federation query,使用一段 SQL 就可以求出相同表两边数据的差异,从而加速定位数据不一致的原因。
SELECT plan_id,
trigger_cnt,
trigger_dcnt,
related_cnt,
related_dcnt,
success_cnt,
success_dcnt,
fail_cnt,
fail_dcnt,
ttl_cnt,
count(1)
FROM (SELECT *
FROM aliyun.dw_adl.public_msg_plan_report
WHERE dt = '${dt}'
UNION ALL
SELECT *
FROM aws.dw_adl.public_msg_plan_report
WHERE dt = '${dt}') t
GROUP BY plan_id,
trigger_cnt,
trigger_dcnt,
related_cnt,
related_dcnt,
success_cnt,
success_dcnt,
fail_cnt,
fail_dcnt,
ttl_cnt
HAVING count(1) = 1
如上述代码,我们在一段 SQL 中,快速对比了这两张相同的表在各自云上的数据差异。
本文记录了流利说大数据离线部分的迁移过程,通过 Hive metastore 的 hook 可以基于事件的方式获取表的变更记录,根据这些记录从而完成表分区的数据同步。而在迁移的过程中,我们发现数据的不一致问题才是整个迁移过程中最大的挑战。借助于 Presto 我们完成了几十万分区,PB 级数据的校验,以及使用联合查询快速计算出两边结果集的差异,高效的完成了数据一致性对比的工作。但在大数据领域目前还没有一款产品能有效帮助企业完成迁移,很多工具只解决了部分问题,但随着更多企业的上云,云厂商在这方面应该会整合一些方案形成产品。
作者简介
董亚军 技术部数据工程团队 Tech Lead
戳“阅读原文” get 流利说工作机会噢