点击关注公众号👆,探索更多Shopee技术实践
Apache Hudi 是业内基于 Lakehouse 解决方案中的典型组件,相比于传统基于 HDFS 和 Hive 的数据仓库架构,基于 Apache Hudi 的 Lakehouse 解决方案有众多优势,例如:低延迟的数据刷新,高度的数据新鲜度;小文件自动化管理;支持数据文件的多版本读写;与大数据生态内 Hive/Spark/Presto 等引擎的无缝衔接。基于这些特性,我们开始尝试对当前主要基于 Hive 的数仓架构进行升级改造。
本文将重点介绍 Shopee Marketplace 业务使用 Flink + Hudi 构建实时数据仓库的解决方案、实践案例以及下一步规划。
目录
1. 背景介绍
1.1 现状
1.2 痛点分析
1.3 典型实时计算应用与实时数仓的差异
2. 基于 Flink+Hudi 的实时数仓架构设计
2.1 DataFlow 简介
2.2 DataFlow 详情
2.3 DataFlow 示例
2.4 我们的思考
3. 实时数据仓库实施
3.1 Partial Update Hudi 表
3.2 Multi-version Hudi 表
3.3 实际效果
4. 总结与展望
在 Shopee 内部,Data Infra 团队已经支持了数据入湖到 Hudi 的过程,提供了大量具备较高实时性和稳定性的数据源。我们 Marketplace Data Engineering 团队也基于这些 Hudi 表构建了订单、商品和用户的小时数据 Pipeline,这些小时数据不仅在大促时期为业务人员提供数据支持,也被用于日常的风控业务中。
当前,我们采用了类似于 mini batch 的思想,每小时对最近 3 小时的数据进行计算和刷新,在保证数据及时更新的情况下,解决数据延迟、JOIN 时间不对齐等问题。但随着数据量的迅速增长,小时级数据 SLA 的保障难度和计算资源的消耗都在不断增加。我们开始探索提升数据产出及时性并减少资源消耗的解决方案。
经过对小时任务的架构进行梳理和分析之后,我们发现:
可通过避免不必要的重复计算,用增量数据 JOIN 替代大表全量 JOIN,以及采用及时性更好的数据源的方式来缓解或者解决当前的痛点。经过相关的技术调研之后,我们拟定了以使用 Flink + Hudi 为核心的技术方案,通过实时计算和数据湖存储构建实时数据仓库。
在介绍具体方案介绍,我们先对比典型实时计算应用和实时数仓,以了解和区分他们的不同点和侧重点。
其中,典型实时计算应用包括:
实时数仓可理解为将经典离线数仓实时化,给用户提供与离线数仓相似的数据使用体验,包括但不限于表的 schema,数据查询方式等。为了达到这一目标,实时数仓需满足以下要求:
和典型实时计算相比,实时数仓有如下的侧重点或者优势,例如:
但,实时数仓为此付出了降低时效性的代价。
基于以上分析,我们设计了满足实时数据仓库场景需求的如下 DataFlow,数据会从 Kafka①,经 Flink 计算后②,写入 Partial Updated Hudi 表(部分列更新)③,然后与离线 Hive 表或其他实时入湖的 Hudi 表④ 一起,经周期性的 Flink/Spark 批计算⑤ 后写入结果 Hudi 表⑥。
可以看到 Data Flow 中结合了流计算② 与批计算⑤,并有两个部分使用了 Hudi③⑥。实时计算用来加速数据处理,提升全局数据的及时性;批计算会计算复杂指标并更新当前最新的数据,用来确保数据的完整性和准确性;而 Partial Update Hudi 表③ 和 Multi-version Hudi 表⑥ 都能在 Spark 和 Presto 中便捷访问,保证了数据的易用性。
G
有 T1
和 T2
两个 topic,其中,T1
topic 有主键 pk
和字段 col-1
,T2
topic 有主键 pk
和字段 col-2
,那么 Kafka 组 G
逻辑上包括主键 pk
,字段 col-1
和字段 col-2
,即 T1(pk, col-1) + T2(pk, col-2) => G(pk, col-1, col-2)
project
,filter
,map
或自定义 Scalar Function
和 Table Function
。group by
、rank
等任何会使用 Flink State 的操作,因为当 Kafka Topic 组的主键的基数较大时(比如全量商品数),同时处理多个 topic(10+)的数据需要的计算资源极大,而且巨大的 State 会使得作业的稳定性难以保障。PartialUpdateAvroPayload
更新主键行对应的部分列。可以发现 Partial Update Hudi 表实际上完成了将整个 Kafka Topic 组的所有 topic 的数据按照相同的主键 JOIN 成一行完整记录的功能,即多流 JOIN。PartialUpdateAvroPayload
是 Shopee Data Infra 团队开发的 Payload,在社区 OverwriteNonDefaultsWithLatestAvroPayload
的基础上支持了 MOR 表的 Partial Update。下图是实时数据仓库中店铺维表的 DataFlow,仅示意。
分组 Kafka Topics
shop_id
为主键,包括三个 topic。user_id
为主键,包括两个 topic。通用流式 ETL
project
,选出部分字段。NULL
。Partial Update Hudi 表
其他 Hudi 表和 Hive 表
周期性批处理
ROW_NUMBER() OVER (PARTITION BY is_sbs ORDER BY item_count DESC) AS item_cnt_rank
。IF(is_sbs=1 and uea1 > x, 1, 0) AS is_uea1_sbs_shop
。Multi-version Hudi 表
UNION ALL
之后写入,非该 topic 生成的字段设置为 NULL
。如果用一个 Flink 作业消费一个 topic,当新增一个字段时,消费该 Topic 组的所有 Flink 作业都需要添加字段,然后重启,带来了不必要的额外维护成本。INSERT OVERWRITE
方式生成最新的文件 Snapshot 版本。后续如果批处理可以优化成增量处理 INSERT INTO
的方式时,会采用 MOR 表。project
、filter
、map
或自定义 Scalar Function
和 Table Function
,降低了第五步批计算的复杂度。ci
,Checkpoint 间隔时长为 di
,则 Partial Update Hudi 表的时延为 ci ~ ci + di + ci
,其中在 Checkpoint 过程中消费的数据,需要到下一次 Checkpoint 结束才可读,故最大时延为 ci + di + ci
。定义所有的 Kafka Topic 组中,时延最大的 Partial Update Hudi 表时延为 c ~ c + d + c
。b
,执行时间为 e
,则批处理的时延为 e ~ b + e
,同样在一次批处理过程中数据源更新的数据,需要到下一次执行结束才可读,故最大时延为 b + e
。c + e ~ (c + d + c) + (b + e)
。Partial Update Hudi 表,需要先通过批处理写入历史数据,然后再实时处理在 Kafka 中的增量数据。
批处理写入历史数据时使用 Bulk Insert 的方式写入,设置 hoodie.sql.bulk.insert.enable=true
开启 Bulk Insert,设置 hoodie.bulkinsert.shuffle.parallelism
可以控制写入的并行度,每个分区产生的文件数也和这个参数有关,当设置的过大时会产生小文件的问题。
在此基础上设置 Clustering 相关参数可以完成小文件的合并,设置 hoodie.clustering.inline=true
开启 Clustering,设置 hoodie.clustering.inline.max.commits=1
可以在 Bulk Insert 之后立即执行 Clustering 操作。hoodie.clustering.plan.strategy.max.bytes.per.group
,hoodie.clustering.plan.strategy.target.file.max.bytes
和 hoodie.clustering.plan.strategy.small.file.limit
用来控制 Clustering 输出的文件大小和数量。
完成以上的配置后就可以执行 INSERT INTO
脚本,在 INSERT INTO
中需要将 Kafka Topic 组对应的离线或者实时入湖的 Hudi 表,进行简单的计算处理后以 UNION ALL 的形式写入。
这里我们通过构造不同数据类型的 MAP,将全字段的 UNION ALL 转换成几个不同类型 MAP 的 UNION ALL 和从对应的 MAP 中取出字段的方式,这样能极大地提升代码的可读性和可维护性,尤其是当 Kafka Topic 组中的 topic 较多时。
Bulk Insert 和 Clustering 对应两次 commit,其中 Bulk Insert 对应下图中的第一部分,为 deltacommit;而 Clustering 对应的是一次 replacecommit。而且从 commit 的时间可以看出,是先进行了 Bulk Insert,再 Clustering。
Bulk Insert 和 Clustering 都只生成 parquet 文件,其中 Bulk Insert 的文件是第一个版本,文件时间为 11:31,会有大量小文件;而 Clustering 会生成小文件合并之后的文件作为第二个版本,文件时间为 11:32。Bulk Insert 产生的第一个版本的小文件会在之后实时作业中按照数据的保留策略清理。
在 Flink 作业中执行 Indexing 用于构建 Bootstrap 后的文件索引信息并存入 state 中。作业中设置 index.bootstrap.enabled = true
开启 indexing,write.index_bootstrap.tasks
用来指定 indexing 的并行度,write.bucket_assign.tasks
可以指定 bucket_assign 算子的并行度,待第一个 Checkpoint 完成后,可以使用 Savepoint 并退出作业,这就完成了 Flink 作业的 Indexing。
正式运行的 Flink Insert 作业中,需要去掉 index.bootstrap.enabled
参数(默认是 false),来关闭 Indexing,然后从之前 Indexing 最后的 Savepoint 启动即可正常写入数据。
Insert 中比较关键的参数有 compaction.tasks
表示执行 Compaction 的并行度,compaction.delta_commits
用来控制执行 Compaction 的周期,这也决定了 Read Optimized Query 和 RO 表的数据时延。
另外,hoodie.cleaner.commits.retained
,hoodie.keep.min.commits
和 hoodie.keep.max.commits
这三个和 Cleaner 相关的参数用来配置数据的版本淘汰策略,用户的查询时长如果超过 hoodie.keep.min.commits
的时长之后,可能会失败。
在之前已经介绍过周期性批作业的时延情况,应尽量减少每次批处理执行的时间,也需要尽量运用各种批处理的优化策略,这样才可能缩短调度周期,降低端到端的时延。
Multi-version Hudi 表是一个 COW 表,需设置恰当的 hoodie.cleaner.commits.retained
值,来确保支持的最长用户查询耗时,在该时长内的查询能确保数据文件未被清理,设置得太大会有较大的存储成本压力;设置得太小,可能会因为查询文件被清理,而导致用户的查询失败。
用户维表目前只需要第 1-3 步来生成 Partial Update Hudi 表,是一个纯粹的实时 Flink 作业,只有一个 Kafka Topic 组被 Flink 作业消费,主键为 user_id。Flink 作业的 Checkpoint 周期为 1 分钟,Checkpoint 的间隔为 1 分钟,Checkpoint 耗时约 5 秒。用户维表的端到端时延约为 2 分钟,设置 hoodie.cleaner.commits.retained=50
,支持用户查询的时长约 2*50=100 分钟。
相比于小时数据,在资源消耗上降低了约 40%,端到端时延从约 90 分钟降低到近 2 分钟。
简介:店铺维表需要包括流计算和批计算的所有步骤,一个 Flink 作业消费一个 Kafka Topic 组,主键为 shop_id,并将结果数据写入 Partial Update Hudi 表,Flink 作业的 Checkpoint 周期为 1 分钟,Checkpoint 的间隔为 1 分钟,Checkpoint 耗时约 10 秒,Partial Update Hudi 表的时延约 2 分钟;周期性批处理的调度周期为 15 分钟,每次执行时长约 10 分钟,Multi-version Hudi 表的端到端时延约 27 分钟,设置 hoodie.cleaner.commits.retained=5
,支持用户查询的时长约 (5+1)*15=90 分钟。
因为店铺维表之前没有小时数据,先将天任务的资源消耗换算到小时任务后对比发现,新方案在资源消耗上降低了 54%,端到端的时延从约 90 分钟降低到了 30 分钟。
本文介绍了基于 Flink + Hudi 的实时数据仓库解决方案,一方面通过实时计算来加速计算,另一方面通过与批处理技术的结合来确保数据的最终一致性。并通过提供分级的结果表来满足不同场景的及时性要求,实时计算产出的 Partial Update Hudi 表提供部分核心实时数据,批处理产出的 Multi-version Hudi 表提供完整且更准确的数据。总的来说,该方案在易用性、完整性、准确性和及时性这四个方面都符合预期,同时降低了计算资源的消耗。
后面我们会继续在如下几个方面进行尝试和探索:
本文作者
Wanglong,数据研发工程师,来自 Shopee Marketplace Data Engineering 团队。持续深耕大数据领域 7+ 年,专注于数据仓库建模、实时离线数仓架构、湖仓一体架构等技术。
技术编辑
Li,来自 Data Infra 团队,Shopee 技术委员会 Data 通道委员。
加入我们
Shopee Marketplace Data Engineering 团队,负责电商核心的基础数据和数据公共层的建设。致力于通过数据建模和大数据处理技术的应用,搭建稳定、易用且高效的企业级核心数据仓库。我们建设及管理的数据已经覆盖了交易、用户、商品、促销等电商核心领域,更多的业务域也在不断丰富和完善中。我们正在持续优化和迭代我们的数据体系,不断尝试和探索新的数据技术及解决方案,为数据人员和业务赋能的同时,持续地降本增效。
目前团队大量数据研发岗位持续招聘中,感兴趣的同学可将简历发送至 audrey.wenjie@shopee.com(亦可进行咨询,注明来自 Shopee 技术博客)。