美团批流⼀体增量数仓建设的探索与实践

如果无法正常显示,请先停止浏览器的去广告插件。
分享至:
1. 美团批流一体增量数仓 n lo a S h c 建设的探索与实践 e T T M 美团数据科学与平台部
2. n lo a S 美团批流一体增量数仓建设的探索与实践 h c e T T M 数据科学与平台部 - 数据平台中心 - 汤楚熙
3. 个人简介 a S 数据系统开发专家 h c e T T M n lo 负责美团数据平台Flink引擎、平台工具等相关研发工作,对实 时、离线数仓建设有较丰富经验,曾参与并搭建了美团基于 FlinkSQL的数仓开发平台。目前主要工作精力集中在基于Flink 的增量数仓的建设上,并持续推进流批一体在公司的落地,提 升数据应用效能,助力业务发展。
4. 目录 contents h c e T T M a S n lo
5. 增量数仓发展历程 Table 数仓模型抽象统一 2019~2020 提出实时数仓模型抽 象,实时数仓模型标 准落地。 SQL 编程接口统一 h c e T T M Data Lake n lo a S 存储引擎统一 流批一体 计算语义统一 执行层统一 2020~2021 2021~2022 2022~至今 暨SparkSQL成为离 线数仓主流编程接口 之后,FlinkSQL也顺 利在公司实时数仓场 景中大范围落地。 数据湖成为新一代湖 仓一体数据架构的主 要组件。 基于Flink+数据湖构 建流批一体增量数据 仓库。
6. 典型数仓应用场景 M端 B端 面向业务管理者、物流一 线作业管理人员、商分、 业务产品、运营、⻛控、 业务系统RD、算法策略 RD等⻆色,提供实时数 据用于监控与分析。大部 分应用场景主要是靠实时 数据的时效性来优化业务 流程,起到降低业务⻛ 险、运营成本、提升时效 的作用。 C端 面向商家(供应商、商户 ⻔店、仓库管理人员), 提供实时数据用于监控与 分析。起到降低业务⻛ 险、运营成本、提升时效 的作用。 h c e T T M n lo a S 面向美团APP专区推荐、 猜喜、FEED、美团优选 专区、站外投放、周末大 促、促销选品等场景下提 供实时的数据服务,为提 升每日拉新、商品曝光、 用户访问下单转化率等业 务目标提供数据支撑。 D端 面向骑手端,数据即时性 要求高、算法决策重等业 务特点,在内部策略、⻛ 控、运营和IoT等业务方 向上都需要有大量的实时 数据作为支撑。 对比项 M端 B端 C端 D端 数据一致性要求 强 强 一般 强 时效性 分钟级~天级 秒级~天级 秒级-天级 秒级 数据量/天 百GB级别 百GB级别 百TB级别 百GB级别 QPS/RPS 10≤ 100≤ 高峰≈100000 高峰≈10000
7. 经典数仓架构问题回顾 Lambda架构 实时数仓 1)基于Log来计算,数据 时效性好。 2)正确性保证困难,数据 源延迟、乱序难以根除。 3)数据回溯效率差,需要 回放全量日志。 4)中间结果难以查询,问 h c e T T M n lo a S 离线数仓 1)基于快照来计算,大部分 数仓表都是某个时间点的数据 快照,数据时效性差。 2)时间跨度⻓,部分指标计 算依赖年级别数据。 3)基于批模式计算,有状态 计算都需要先排序。 4)数据冗余度高。 题排查困难。 两套代码,迭代效率低、维护成本高、数据质量差。
8. 离线数仓如何保证数据一致性 美团大部分场景数据来自于MySQL、Blade(NewSQL),这类数据源的特 点是UPDATE事件占比高,意味数仓链路上有大量的Merge操作。 UPDATE INSERT + UPDATE + DELETE ≈ 60% n lo 离线数仓为保证数据的一致性,依赖全局排序再合并,但牺牲了时效性、更细时间粒度快照的还原能力。 Binlog EventType PK Status INSERT A 1 INSERT B 1 UPDATE A INSERT UPDATE h c e T T M Ts a S Row_Number() OVER(Partition by PK order by Event_Time DESC) RN ODS EventType PK Status Ts RN INSERT C 1 1002 1 1001 UPDATE B 1,3 1002 1 1, 2 1002 UPDATE A 1,2 1002 1 C 1 1002 INSERT B 1 1001 1 B 1, 3 1002 INSERT A 1 1000 1 1000 WHERE RN = 1
9. 离线数仓架构的矛盾 问题:为了保证一致性,牺牲了时效性、模型灵活性 1)结果计算需要Scan全表(分区)、排序,耗时⻓。 2)无法查看更细粒度的快照数据。 快照的默认时间语义是:T日0点,下游所有表都要继承这个语义。 ODS DW 业务系统 h c e T T M ChangeLog a S 事务事实快照表 Spark/MR 累计事实快照表 增量表(分区表) 在指定时间完成Merge Snapshot-1(T-1, T) n lo 全量表 在指定时间完成Merge 关联、上卷或下钻 周期事实快照表 快照表(分区表) 调度开始时间:T+1 0:00 Snapshot-1’(T-1, T) Snapshot-1’’(T-1, T) 调度开始时间:T+1 1:00 调度开始时间:T+1 ?:00
10. 实时数仓ExactlyOnce保证成本高 ① 为了保证结果数据的一 致性,链路上Flink任务中 需要进行幂等处理,有额 外的计算资源开销。 ODS 幂等处理 X ③ ⻓窗口多流关联和事件 跨期更新依赖Flink状态成 本高,状态过期时间设置 不容易精确把控。 DWD h c e T T M a S n lo DWS Task Fail ② 已追加到下游Kafka的 数据无法回滚, 对未做幂等处理的下游消 费者会直接造成影响。 ④ 上游Kafka扩容,为了 保顺消费,需要增加额外 排序算子,也存在额外的 开销。 APP
11. 实时数据回溯低效 Flink+Kafka的架构也算作一种广义上的增量计算模式,但最大的问题是查询场 景无法直接快照得出结果,每次需要重新执行Merge操作,效率差。 业务模型调整统计口径后,进行数据回溯,需要回放完整ChangeLog日志,效率低。 T0时刻 n lo select count(distinct key) From DWD where ts >= ‘T0’ DWD h c e T T M a, a, b, b, b, c, 1 2 1 2 3 1 d, e, 1 1 a S a, b, c, d, e, 2 3 1 1 1 DWS 5 4 3 2 1 T1时刻,调整口径, 重启任务 从需求上讲,业务只要基于T1时刻的Snapshot + T1时刻后的增量ChangeLog即可快速完成数据回溯。
12. 问题与需求总结 一致性 核心矛盾:保证数据一致性的前提下,如何权 衡 “端到端时延” 和 “计算资源” 。 对比项 子项 离线数仓 数据一致性 一致性保证成本 低 时效性 扩展性 易用性 依赖能力 h c e T T M 时延 n lo a S 资源 实时数仓 增量数仓? 高 低 好(秒级) 好(秒级~分钟级) 就绪时间 差(小时级延迟) 容量 好 好 好 回溯成本 高(批量重刷) 高(流式回刷、double资源、断流) 低(upsert、从源头直接更新) 问题排查难度 易(中间结果可查) 难(中间结果不可查) 易(中间结果可查) 1)对⻬离线数仓的数据一致性要求:所以需要依赖ChangeLog、事务、数据错算可端到端回滚。 2)低延时保证:Upsert+主键满足低延时的Merge、流式增量数据更新能力。 3)高效的数据组织模型:提供同时支持流批读写,Snapshot两种格式的、可控的保存时⻓的能力;支持多版本快照;中间 结果可查询;
13. 目录 contents h c e T T M a S n lo
14. 增量数仓的架构设想 增量数仓的核心设计,是将离线数仓经典模式下的天级别快照,拆分成分钟、秒级别的小快照,生产基于流计算模 型,历史数据回溯可结合cost来推断执行模式,存储可自定义ChangeLog保留时⻓、Snapshot间隔可配置,并且能 够具有点查接口,实现真正意义的exactly-once低延迟数仓生产模型。 h c e T T M a S n lo
15. 兼容离线+实时数仓模型 数仓中的数据有两种形态:Log、Snapshot 分别对应 Stream、Table。 完整的Log能保证还原出任意时刻的Snapshot, 完整的Stream能还原成任意时刻的Table的Snapshot。 Table Stream h c e T T M Snapshot a S Log n lo Table Snapshot Trade-off:需要还原成什么时间点的Snapshot vs 需要保留多久的Log。
16. 计算引擎选型 我们需要计算引擎支持流、批两种执行模式,并且拥有SQL编程接口,处理ChangeLog,高可扩展性等能力。 结论: 选择Flink作为 生产引擎。 h c e T T M ✅ ✅ ✅ ✅ ✅ ✅ ✅ ✅ ✅ ❌ n lo a S ✅ ❌ ✅ ✅ ✅ ✅ ✅ ✅ ✅ ✅ ✅ ✅
17. 存储引擎选型 我们需要一套同时满足流读、流写、批读、批写,支持高并发低延迟的点查能力(实时维表),并且可生成ChangeLog的存储引擎。 对比项 读写接口 因为需要ChangeLog、点读 能力,所以Hudi不完全满足 需求。 子项 Apache Iceberg Apache Hive Apache Kafka ✅ ✅ ✅ ✅ ❌ 批读 ✅ ✅ ✅ ✅ ❌ 流写 ✅ ✅ ✅ ❌ ✅ 流读 ✅ ✅ ✅ 点读 ❌ ❌ ❌ Upsert/Delete ❌ ✅ ✅ ❌ ❌ ❌ ✅ ✅ ✅ Write Serialization(多个 Snapshot Isolation(多个 Snapshot Isolation(多个 Writer必须严格串行, Writer写的数据无交集可 Writer写的数据无交集可以并 Reader和Writer可以并行) 以并发执行,Reader和 发执行,Reader和Writer可以 Writer可以并行) 并行) ACID 隔离级别 因为需要依赖主键,所以 Iceberg不符合预期。 Delta Lake 批写 ChangeLog 事务 Apache Hudi 主键 h c e T T M n lo a S ❌ ✅ ❌ ❌ ❌ ❌ ❌ ❌ ✅ ✅ 无 无 ❌ ✅ ❌ ❌ ✅ ❌ ✅ ✅ ✅ ❌ ❌ ✅ ❌ ❌ ✅ ❌ 分区内置索引 规划未开发 ✅ - ❌ Copy on Write ✅ ✅ ✅ ❌ ❌ Merge on Read ❌ ✅ - ❌ ❌ 自动合并小文件 ❌ ✅ ❌ ❌ ❌ 新增列 ✅ (仅支持Spark) - ✅ 删除列 ✅ (仅支持Spark) - ✅ 缺省列处理 ✅ (仅支持Spark) - ✅ 社区现状(数据截 开源时间 2018/11/06 2019/01/17 2019/04/12 至2021-01) Github Star 1k 1.6k 3k 联合主键 Time Travel 查询优化 Schema变更支持 逻辑下推 Contributors 119 128 77 结论: 基于Hudi进行 改造。
18. 数据存储引擎POC方案 - Beluga • 客户端(BelugaClient):面向应 用程序,提供读写接口、事务 接口。 • KV层(Hbase):基于HBase改 造。支持按主键插入、更新和 删除数据;负责生成 ChangeLog(before/after)数 据。 • Storage层(Hudi):基于Hudi改 造。集成成熟读写接口和设 计,支持增量读写和批量读 写。 h c e T T M a S n lo
19. 文件组织形式设计 Hbase的HRegion与Hudi的FileGroup天然适配,很好的整合了异构存储的数据分区模型。 单张Beluga表包含多个Region,每个 Region会映射一个固定的FileGroup, FileGroup之间的RecordKey相互独 立。 Beluga Region Region Region Region-1 Region-2 Region-N StoreFile StoreFile StoreFile n lo Hbase HBase为Hudi增加 了一套外部索引, 可以使数据更新更 加高效。 h c e T T M a S FileGroup-1 LogFlie Hudi 图中的Region并不是指Hbase的 HRegion,这里更多所指的是数 据的逻辑文件分组关系。 … LogFlie FileGroup-2 LogFlie … LogFlie … FileGroup-N LogFlie … + + + BaseFlie BaseFlie BaseFlie LogFlie
20. 文件组织形式设计详解 Region-1 Region-2 Region-N StoreFile StoreFile StoreFile n lo HFile保留全量数据快照,服务点查需 求和ChangeLog生产。 h c e T T M FileGroup-1 LogFile保留增量ChangeLog,服务 流读、流写,数据保留时⻓可配置 LogFlie … + LogFlie a S FileGroup-2 LogFlie HBase … + LogFlie … FileGroup-N LogFlie … LogFlie Hudi + BaseFile全量快照,服务批读、批 写,支持MVCC,数据版本保留时间 可配置。 Hudi BaseFlie BaseFlie BaseFlie
21. 记录格式改造 问题:HUDI原生的ChangeLog格式数据,是Flink回撤流转换成Avro格式数据后再直接落盘,这种实现会有不必要性能开销。 改进后:将HUDI原生ChangeLog(Before/After)格式数据,合并成一条HoodieRecord,-U(Before消息)通过查HBase来获取。 • • • 降低反/序列化开销:Source减少一次反序列化的开销、Sink端减少一次序列化的开销。 n lo 降低I/O的开销:将-U记录的一次写/读操作,转换成一次+U记录的只读操作。 降低存储成本:理论上落盘数据量相对减少,序列化后的LogFile数据排列更紧凑。 Before After +I +I -U +U -D -U/+U h c e T T M a S -D 增加isBAFormat属性 区分ChangeLog(Before/After)表
22. 保证端到端ExactlyOnce(为什么基于Beluga生产ChangeLog) 设计考量2:如果将按Hudi原生的形式直接 下发回撤流,只能保证最终一致性,不能保 证强一致,因为无法保证一次UPDATE事件 的-U/+U消息会在一次事务中同时提交。 FlinkJob 为保证数据一致性(正确的Before值),使用Flink状态保留 Source -U/+U LeftSideState h c e T T M Join RightSideState Source n lo 全部状态快照,资源开销大(尤其是空间局部性强的Key)。 -U/+U +U -U/+U -U/+U Aggr a S -U/+U Sink FlinkJob +U AggrState +U -U/+U 设计考量1:使用Beluga,给业务 Beluga Beluga 提供了一种选择,可以自行决定是 否通过外存来管理状态,还可以将 状态的变更日志跨作业进行传递。 Beluga -U/+U
23. 保证端到端ExactlyOnce(ChangeLog生产流程) Beluga ② 查询并拼接BeforeRow KV-Store(HBase) n lo PK, Status, Ts, Version { A, ① 客户端执行Put(List<Record>) EventType, PK, Status, Ts {INSERT, {UPDATE, {DELETE, D, D, B, 1, 1003} 2, 1004} 1, 1005} 1, 1001, 1} 1, 1002, 1} { C, 1, 1004, 1} { D 1, 1005, 2} h c e T T M ④ commit/rollback() a S { B, {INSERT, {UPDATE, {DELETE, 2)UPDATE:加锁,Get(PK),查出Before值,再 Put数据到KV,施放锁,再异步追加数据到 LogStore。 ③ 异步追加到LogStore EventType, PK, Status, Ts LogFile (Hudi) 1)INSERT:加锁,Put数据到KV,施放锁,再异步 追加数据到LogStore。 D, D, B, ⑥ MOR读 null/1 1003} 1/2, 1003} 1/null, 1003} ⑤ Async Compaction BaseFile(Hudi) 3)DELETE:加锁,Get(PK),查出Before值,删除 记录,施放锁,再异步追加数据到LogStore。
24. 保证端到端ExactlyOnce(事务模型) 整体思路是借助Flink的Checkpoint机制,实现两阶段提交模型,大致流程如下: 1)Prepare阶段:各SinkTask(participant)所在TM节点,收到CheckpointBarriar消息后,会通知JM(Coordinator)可以提交数据了。 2)Commit阶段:JM收到全部TM数据flush完成的消息,开始doCommit,并在Commit成功后,申请新的CommitId,开启下一轮事务。 h c e T T M a S n lo
25. 保证端到端ExactlyOnce(数据写流程) h c e T T M a S n lo
26. 保证端到端ExactlyOnce(数据提交流程) h c e T T M a S n lo
27. 保证端到端ExactlyOnce(数据回滚流程) h c e T T M a S n lo
28. ExactlyOnce机制落地 - DWD生产 DWD ODS Snapshot’’’ PK Binlog EventType, PK, Status, Ts {INSERT, D, {UPDATE, D, {DELETE, B, A Status B 1 1/2, 1004} C 1 1, 1005} Ts ChangeLog 1000 2 1, 1003} Snapshot’’ 1001 1 2 … … {INSERT, D, 1003 1004 {UPDATE, D, {DELETE, B, a S 1, 1003} h c e T T M 1002 D EventType, PK, Status, Ts 1/2, 1004} 1, … n lo 1005} PK Status Ts A 2 1000 B 1 1001 ChangeLog C 1 1002 D 1 2 1003 1004 … … … EventType, PK, Status, Ts {INSERT, {UPDATE, 局部Timeline Txn-1 Txn-2 Txn-1 Txn-3 全局Timeline T1 T2 T3 Txn-2 D, D, 1, 1003} 1/2, 1004}
29. ExactlyOnce机制落地 - DWS生产 DWS DWD Snapshot’’’ PK A Status Ts ChangeLog 1000 2 B 1 1001 C 1 1002 D 1 2 1003 1004 … … … EventType, PK, Status, Ts {INSERT, D, select status, count(PK) cnt, current_ts() ts from t1 group by status 1, 1003} h c e T T M {UPDATE, D, {DELETE, B, 1/2, 1004} 1, 1005} Snapshot’’’ a S n lo status cnt ts 1 2 3 2 1 1000 1007 1008 1008 1 1001 2 1004 … … 2 … ChangeLog EventType, Status, cnt, Ts {UPDATE, 1, 2/3, 1003} {UPDATE, 1, 3/2, 1004} {UPDATE, 2, 1/2, 1004} {UPDATE, 1, 2/1, 1004} 局部Timeline Txn-1 Txn-2 Txn-1 Txn-3 全局Timeline T1 T2 T3 Txn-2
30. 目录 contents h c e T T M a S n lo
31. 离线特征增量化生产 建设背景 21年以前,外卖推搜算法团队所使用的特征、样本数据就绪时间经常无法保证,导致模型经常无法在T+1前完成训练和上线,影响线上转化效果。而且业务通过进一步 优化离线Spark任务的性能,已经无法显著的提升数据就绪时间。因此,业务希望借助增量数仓,提升特征数据就绪时间,进而来提升算法的转化效果。 初步效果 1)离线特征所依赖的流量数 据,就绪时间能稳定在早高 峰前完成。 2)算法团队的模型训练可在 T+1完成,模型rpm预计可提 升2%。 h c e T T M a S n lo
32. 批流一体增量数仓 建设背景 作为优选基础数据团队,是整个优选最上游的数据供给方,数据模型下游依赖方较多,新增、修改数仓模型需要修改离线、实时两套代码,开发速度跟不上业务需求 的增⻓速度,希望通过增量生产模式来统一两套模型,目前业务预计切换到增量生产架构,会提升至少30%的开发效率。 一套口径,两套代码 h c e T T M n lo a S 两套代码、两套数据验证流程、两套监控,给用户解释不一致问题的时间成本高。
33. 批流一体增量数仓 解决方案:增量数仓。 一套代码,开发工作量减半。一套任务,运维工作量减半。一套口径,数据解释成本减半。 权衡点1:业务高时效性的要求 场景并不多,更多是因为底层 技术架构的原因,不得已才做 到了高时效性,伴随而来的是 数据开发和治理的成本问题。 h c e T T M 权衡点2:因为Hive不支持 Upsert业务不得已才将数据生 产模式转换为全量快照的计 算,但是换来的是数据新鲜性 的下降和固定使用天级别的快 照带来的需求响应灵活性下 降。 a S n lo
34. 目录 contents h c e T T M a S n lo
35. 未来规划 Beluga 1)Partial-Update Flink a S 1)秒级事务提交能力 h c e T T M n lo 2)Beluga支持点查询能力 2)离线表平滑迁移工具 3)Standalone Meta-Server 3)Multi-Sink性能问题优化 4)高效的并发控制 平台化 1)统一ODS入仓 2)批流一体数仓开发平台
36. a S n lo Q&A h c e T T M
37. h c e T T M 更多技术干货 欢迎关注“美团技术团队” n lo 招聘:XXX岗位 邮箱:XXX@meituan.com a S

Accueil - Wiki
Copyright © 2011-2024 iteam. Current version is 2.137.1. UTC+08:00, 2024-11-16 04:37
浙ICP备14020137号-1 $Carte des visiteurs$