美团批流⼀体增量数仓建设的探索与实践
如果无法正常显示,请先停止浏览器的去广告插件。
1. 美团批流一体增量数仓
n
lo
a
S
h
c
建设的探索与实践
e
T
T
M
美团数据科学与平台部
2. n
lo
a
S
美团批流一体增量数仓建设的探索与实践
h
c
e
T
T
M
数据科学与平台部 - 数据平台中心 - 汤楚熙
3. 个人简介
a
S
数据系统开发专家
h
c
e
T
T
M
n
lo
负责美团数据平台Flink引擎、平台工具等相关研发工作,对实
时、离线数仓建设有较丰富经验,曾参与并搭建了美团基于
FlinkSQL的数仓开发平台。目前主要工作精力集中在基于Flink
的增量数仓的建设上,并持续推进流批一体在公司的落地,提
升数据应用效能,助力业务发展。
4. 目录
contents
h
c
e
T
T
M
a
S
n
lo
5. 增量数仓发展历程
Table
数仓模型抽象统一
2019~2020
提出实时数仓模型抽
象,实时数仓模型标
准落地。
SQL
编程接口统一
h
c
e
T
T
M
Data Lake
n
lo
a
S
存储引擎统一
流批一体
计算语义统一
执行层统一
2020~2021 2021~2022 2022~至今
暨SparkSQL成为离
线数仓主流编程接口
之后,FlinkSQL也顺
利在公司实时数仓场
景中大范围落地。 数据湖成为新一代湖
仓一体数据架构的主
要组件。 基于Flink+数据湖构
建流批一体增量数据
仓库。
6. 典型数仓应用场景
M端
B端
面向业务管理者、物流一
线作业管理人员、商分、
业务产品、运营、⻛控、
业务系统RD、算法策略
RD等⻆色,提供实时数
据用于监控与分析。大部
分应用场景主要是靠实时
数据的时效性来优化业务
流程,起到降低业务⻛
险、运营成本、提升时效
的作用。
C端
面向商家(供应商、商户
⻔店、仓库管理人员),
提供实时数据用于监控与
分析。起到降低业务⻛
险、运营成本、提升时效
的作用。
h
c
e
T
T
M
n
lo
a
S
面向美团APP专区推荐、
猜喜、FEED、美团优选
专区、站外投放、周末大
促、促销选品等场景下提
供实时的数据服务,为提
升每日拉新、商品曝光、
用户访问下单转化率等业
务目标提供数据支撑。
D端
面向骑手端,数据即时性
要求高、算法决策重等业
务特点,在内部策略、⻛
控、运营和IoT等业务方
向上都需要有大量的实时
数据作为支撑。
对比项 M端 B端 C端 D端
数据一致性要求 强 强 一般 强
时效性 分钟级~天级 秒级~天级 秒级-天级 秒级
数据量/天 百GB级别 百GB级别 百TB级别 百GB级别
QPS/RPS 10≤ 100≤ 高峰≈100000 高峰≈10000
7. 经典数仓架构问题回顾
Lambda架构
实时数仓
1)基于Log来计算,数据
时效性好。
2)正确性保证困难,数据
源延迟、乱序难以根除。
3)数据回溯效率差,需要
回放全量日志。
4)中间结果难以查询,问
h
c
e
T
T
M
n
lo
a
S
离线数仓
1)基于快照来计算,大部分
数仓表都是某个时间点的数据
快照,数据时效性差。
2)时间跨度⻓,部分指标计
算依赖年级别数据。
3)基于批模式计算,有状态
计算都需要先排序。
4)数据冗余度高。
题排查困难。
两套代码,迭代效率低、维护成本高、数据质量差。
8. 离线数仓如何保证数据一致性
美团大部分场景数据来自于MySQL、Blade(NewSQL),这类数据源的特
点是UPDATE事件占比高,意味数仓链路上有大量的Merge操作。
UPDATE
INSERT + UPDATE + DELETE
≈ 60%
n
lo
离线数仓为保证数据的一致性,依赖全局排序再合并,但牺牲了时效性、更细时间粒度快照的还原能力。
Binlog
EventType PK Status
INSERT A 1
INSERT B 1
UPDATE A INSERT UPDATE
h
c
e
T
T
M
Ts
a
S
Row_Number()
OVER(Partition by PK
order by Event_Time
DESC) RN
ODS
EventType PK Status Ts RN
INSERT C 1 1002 1
1001 UPDATE B 1,3 1002 1
1, 2 1002 UPDATE A 1,2 1002 1
C 1 1002 INSERT B 1 1001 1
B 1, 3 1002 INSERT A 1 1000 1
1000
WHERE RN = 1
9. 离线数仓架构的矛盾
问题:为了保证一致性,牺牲了时效性、模型灵活性
1)结果计算需要Scan全表(分区)、排序,耗时⻓。
2)无法查看更细粒度的快照数据。
快照的默认时间语义是:T日0点,下游所有表都要继承这个语义。
ODS
DW
业务系统
h
c
e
T
T
M
ChangeLog
a
S
事务事实快照表
Spark/MR
累计事实快照表
增量表(分区表)
在指定时间完成Merge
Snapshot-1(T-1, T)
n
lo
全量表
在指定时间完成Merge
关联、上卷或下钻
周期事实快照表
快照表(分区表)
调度开始时间:T+1 0:00
Snapshot-1’(T-1, T) Snapshot-1’’(T-1, T)
调度开始时间:T+1 1:00 调度开始时间:T+1 ?:00
10. 实时数仓ExactlyOnce保证成本高
① 为了保证结果数据的一
致性,链路上Flink任务中
需要进行幂等处理,有额
外的计算资源开销。
ODS
幂等处理
X
③ ⻓窗口多流关联和事件
跨期更新依赖Flink状态成
本高,状态过期时间设置
不容易精确把控。
DWD
h
c
e
T
T
M
a
S
n
lo
DWS
Task Fail
② 已追加到下游Kafka的
数据无法回滚,
对未做幂等处理的下游消
费者会直接造成影响。
④ 上游Kafka扩容,为了
保顺消费,需要增加额外
排序算子,也存在额外的
开销。
APP
11. 实时数据回溯低效
Flink+Kafka的架构也算作一种广义上的增量计算模式,但最大的问题是查询场
景无法直接快照得出结果,每次需要重新执行Merge操作,效率差。
业务模型调整统计口径后,进行数据回溯,需要回放完整ChangeLog日志,效率低。
T0时刻
n
lo
select count(distinct key)
From DWD
where ts >= ‘T0’
DWD
h
c
e
T
T
M
a, a, b, b, b, c,
1 2 1 2 3 1
d, e,
1 1
a
S
a, b, c, d, e,
2 3 1 1 1
DWS
5
4
3
2
1
T1时刻,调整口径,
重启任务
从需求上讲,业务只要基于T1时刻的Snapshot + T1时刻后的增量ChangeLog即可快速完成数据回溯。
12. 问题与需求总结
一致性
核心矛盾:保证数据一致性的前提下,如何权
衡 “端到端时延” 和 “计算资源” 。
对比项 子项 离线数仓
数据一致性 一致性保证成本 低
时效性
扩展性
易用性
依赖能力
h
c
e
T
T
M
时延
n
lo
a
S
资源
实时数仓 增量数仓?
高 低
好(秒级) 好(秒级~分钟级)
就绪时间 差(小时级延迟) 容量 好 好 好
回溯成本 高(批量重刷) 高(流式回刷、double资源、断流) 低(upsert、从源头直接更新)
问题排查难度 易(中间结果可查) 难(中间结果不可查) 易(中间结果可查)
1)对⻬离线数仓的数据一致性要求:所以需要依赖ChangeLog、事务、数据错算可端到端回滚。
2)低延时保证:Upsert+主键满足低延时的Merge、流式增量数据更新能力。
3)高效的数据组织模型:提供同时支持流批读写,Snapshot两种格式的、可控的保存时⻓的能力;支持多版本快照;中间
结果可查询;
13. 目录
contents
h
c
e
T
T
M
a
S
n
lo
14. 增量数仓的架构设想
增量数仓的核心设计,是将离线数仓经典模式下的天级别快照,拆分成分钟、秒级别的小快照,生产基于流计算模
型,历史数据回溯可结合cost来推断执行模式,存储可自定义ChangeLog保留时⻓、Snapshot间隔可配置,并且能
够具有点查接口,实现真正意义的exactly-once低延迟数仓生产模型。
h
c
e
T
T
M
a
S
n
lo
15. 兼容离线+实时数仓模型
数仓中的数据有两种形态:Log、Snapshot 分别对应 Stream、Table。
完整的Log能保证还原出任意时刻的Snapshot, 完整的Stream能还原成任意时刻的Table的Snapshot。
Table
Stream
h
c
e
T
T
M
Snapshot
a
S
Log
n
lo
Table
Snapshot
Trade-off:需要还原成什么时间点的Snapshot vs 需要保留多久的Log。
16. 计算引擎选型
我们需要计算引擎支持流、批两种执行模式,并且拥有SQL编程接口,处理ChangeLog,高可扩展性等能力。
结论:
选择Flink作为
生产引擎。
h
c
e
T
T
M
✅ ✅
✅ ✅
✅ ✅
✅ ✅
✅ ❌
n
lo
a
S
✅
❌
✅ ✅
✅ ✅
✅ ✅
✅
✅
✅
✅
17. 存储引擎选型
我们需要一套同时满足流读、流写、批读、批写,支持高并发低延迟的点查能力(实时维表),并且可生成ChangeLog的存储引擎。
对比项
读写接口
因为需要ChangeLog、点读
能力,所以Hudi不完全满足
需求。
子项
Apache Iceberg
Apache Hive
Apache Kafka
✅ ✅ ✅ ✅ ❌
批读 ✅ ✅ ✅ ✅ ❌
流写 ✅ ✅ ✅ ❌ ✅
流读 ✅ ✅ ✅ 点读 ❌ ❌ ❌ Upsert/Delete ❌ ✅ ✅ ❌ ❌ ❌ ✅ ✅ ✅ Write Serialization(多个 Snapshot Isolation(多个 Snapshot Isolation(多个 Writer必须严格串行, Writer写的数据无交集可 Writer写的数据无交集可以并 Reader和Writer可以并行) 以并发执行,Reader和 发执行,Reader和Writer可以 Writer可以并行) 并行)
ACID
隔离级别
因为需要依赖主键,所以
Iceberg不符合预期。
Delta Lake
批写
ChangeLog
事务
Apache Hudi
主键
h
c
e
T
T
M
n
lo
a
S
❌ ✅
❌ ❌
❌ ❌
❌ ❌
✅ ✅
无 无
❌ ✅ ❌
❌ ✅ ❌
✅ ✅ ✅ ❌ ❌
✅ ❌ ❌ ✅ ❌
分区内置索引 规划未开发 ✅ - ❌ Copy on Write ✅ ✅ ✅ ❌ ❌
Merge on Read ❌ ✅ - ❌ ❌
自动合并小文件 ❌ ✅ ❌ ❌ ❌
新增列 ✅ (仅支持Spark) - ✅ 删除列 ✅ (仅支持Spark) - ✅ 缺省列处理 ✅ (仅支持Spark) - ✅ 社区现状(数据截 开源时间 2018/11/06 2019/01/17 2019/04/12 至2021-01) Github Star 1k 1.6k 3k
联合主键
Time Travel
查询优化
Schema变更支持
逻辑下推
Contributors
119
128
77
结论:
基于Hudi进行
改造。
18. 数据存储引擎POC方案 - Beluga
• 客户端(BelugaClient):面向应
用程序,提供读写接口、事务
接口。
• KV层(Hbase):基于HBase改
造。支持按主键插入、更新和
删除数据;负责生成
ChangeLog(before/after)数
据。
• Storage层(Hudi):基于Hudi改
造。集成成熟读写接口和设
计,支持增量读写和批量读
写。
h
c
e
T
T
M
a
S
n
lo
19. 文件组织形式设计
Hbase的HRegion与Hudi的FileGroup天然适配,很好的整合了异构存储的数据分区模型。
单张Beluga表包含多个Region,每个
Region会映射一个固定的FileGroup,
FileGroup之间的RecordKey相互独
立。
Beluga
Region
Region
Region
Region-1 Region-2 Region-N
StoreFile StoreFile StoreFile
n
lo
Hbase
HBase为Hudi增加
了一套外部索引,
可以使数据更新更
加高效。
h
c
e
T
T
M
a
S
FileGroup-1
LogFlie
Hudi
图中的Region并不是指Hbase的
HRegion,这里更多所指的是数
据的逻辑文件分组关系。
…
LogFlie
FileGroup-2
LogFlie
…
LogFlie
…
FileGroup-N
LogFlie
…
+ + +
BaseFlie BaseFlie BaseFlie
LogFlie
20. 文件组织形式设计详解
Region-1 Region-2 Region-N
StoreFile StoreFile StoreFile
n
lo
HFile保留全量数据快照,服务点查需
求和ChangeLog生产。
h
c
e
T
T
M
FileGroup-1
LogFile保留增量ChangeLog,服务
流读、流写,数据保留时⻓可配置
LogFlie
…
+
LogFlie
a
S
FileGroup-2
LogFlie
HBase
…
+
LogFlie
…
FileGroup-N
LogFlie
…
LogFlie
Hudi
+
BaseFile全量快照,服务批读、批
写,支持MVCC,数据版本保留时间
可配置。
Hudi
BaseFlie
BaseFlie
BaseFlie
21. 记录格式改造
问题:HUDI原生的ChangeLog格式数据,是Flink回撤流转换成Avro格式数据后再直接落盘,这种实现会有不必要性能开销。
改进后:将HUDI原生ChangeLog(Before/After)格式数据,合并成一条HoodieRecord,-U(Before消息)通过查HBase来获取。
•
•
•
降低反/序列化开销:Source减少一次反序列化的开销、Sink端减少一次序列化的开销。
n
lo
降低I/O的开销:将-U记录的一次写/读操作,转换成一次+U记录的只读操作。
降低存储成本:理论上落盘数据量相对减少,序列化后的LogFile数据排列更紧凑。
Before After
+I +I
-U
+U
-D
-U/+U
h
c
e
T
T
M
a
S
-D
增加isBAFormat属性
区分ChangeLog(Before/After)表
22. 保证端到端ExactlyOnce(为什么基于Beluga生产ChangeLog)
设计考量2:如果将按Hudi原生的形式直接
下发回撤流,只能保证最终一致性,不能保
证强一致,因为无法保证一次UPDATE事件
的-U/+U消息会在一次事务中同时提交。
FlinkJob
为保证数据一致性(正确的Before值),使用Flink状态保留
Source
-U/+U
LeftSideState
h
c
e
T
T
M
Join
RightSideState
Source
n
lo
全部状态快照,资源开销大(尤其是空间局部性强的Key)。
-U/+U
+U
-U/+U
-U/+U
Aggr
a
S
-U/+U
Sink
FlinkJob
+U
AggrState
+U
-U/+U
设计考量1:使用Beluga,给业务
Beluga
Beluga
提供了一种选择,可以自行决定是
否通过外存来管理状态,还可以将
状态的变更日志跨作业进行传递。
Beluga
-U/+U
23. 保证端到端ExactlyOnce(ChangeLog生产流程)
Beluga
② 查询并拼接BeforeRow
KV-Store(HBase)
n
lo
PK, Status, Ts, Version
{ A,
① 客户端执行Put(List<Record>)
EventType, PK, Status, Ts
{INSERT,
{UPDATE,
{DELETE,
D,
D,
B,
1, 1003}
2, 1004}
1,
1005}
1, 1001, 1}
1, 1002, 1}
{ C, 1, 1004, 1}
{ D 1, 1005, 2}
h
c
e
T
T
M
④ commit/rollback()
a
S
{ B,
{INSERT,
{UPDATE,
{DELETE,
2)UPDATE:加锁,Get(PK),查出Before值,再
Put数据到KV,施放锁,再异步追加数据到
LogStore。
③ 异步追加到LogStore
EventType, PK, Status, Ts
LogFile
(Hudi)
1)INSERT:加锁,Put数据到KV,施放锁,再异步
追加数据到LogStore。
D,
D,
B,
⑥ MOR读
null/1 1003}
1/2,
1003}
1/null, 1003}
⑤ Async Compaction
BaseFile(Hudi)
3)DELETE:加锁,Get(PK),查出Before值,删除
记录,施放锁,再异步追加数据到LogStore。
24. 保证端到端ExactlyOnce(事务模型)
整体思路是借助Flink的Checkpoint机制,实现两阶段提交模型,大致流程如下:
1)Prepare阶段:各SinkTask(participant)所在TM节点,收到CheckpointBarriar消息后,会通知JM(Coordinator)可以提交数据了。
2)Commit阶段:JM收到全部TM数据flush完成的消息,开始doCommit,并在Commit成功后,申请新的CommitId,开启下一轮事务。
h
c
e
T
T
M
a
S
n
lo
25. 保证端到端ExactlyOnce(数据写流程)
h
c
e
T
T
M
a
S
n
lo
26. 保证端到端ExactlyOnce(数据提交流程)
h
c
e
T
T
M
a
S
n
lo
27. 保证端到端ExactlyOnce(数据回滚流程)
h
c
e
T
T
M
a
S
n
lo
28. ExactlyOnce机制落地 - DWD生产
DWD
ODS
Snapshot’’’
PK
Binlog
EventType, PK, Status, Ts
{INSERT,
D,
{UPDATE, D,
{DELETE, B,
A
Status
B 1
1/2, 1004} C 1
1,
1005}
Ts
ChangeLog
1000
2
1, 1003}
Snapshot’’
1001
1
2
… …
{INSERT,
D,
1003
1004
{UPDATE, D,
{DELETE, B,
a
S
1, 1003}
h
c
e
T
T
M
1002
D
EventType, PK, Status, Ts
1/2, 1004}
1,
…
n
lo
1005}
PK Status Ts
A 2 1000
B 1 1001
ChangeLog
C 1 1002
D 1
2 1003
1004
… … …
EventType, PK, Status, Ts
{INSERT,
{UPDATE,
局部Timeline
Txn-1
Txn-2
Txn-1
Txn-3
全局Timeline
T1
T2
T3
Txn-2
D,
D,
1, 1003}
1/2, 1004}
29. ExactlyOnce机制落地 - DWS生产
DWS
DWD
Snapshot’’’
PK
A
Status
Ts
ChangeLog
1000
2
B 1 1001
C 1 1002
D 1
2 1003
1004
… … …
EventType, PK, Status, Ts
{INSERT,
D,
select
status,
count(PK) cnt,
current_ts() ts
from t1
group by status
1, 1003}
h
c
e
T
T
M
{UPDATE, D,
{DELETE, B,
1/2, 1004}
1,
1005}
Snapshot’’’
a
S
n
lo
status cnt ts
1 2
3
2
1 1000
1007
1008
1008
1 1001
2 1004
… …
2
…
ChangeLog
EventType, Status, cnt, Ts
{UPDATE,
1, 2/3, 1003}
{UPDATE, 1, 3/2, 1004}
{UPDATE, 2, 1/2, 1004}
{UPDATE, 1, 2/1, 1004}
局部Timeline
Txn-1
Txn-2
Txn-1
Txn-3
全局Timeline
T1
T2
T3
Txn-2
30. 目录
contents
h
c
e
T
T
M
a
S
n
lo
31. 离线特征增量化生产
建设背景
21年以前,外卖推搜算法团队所使用的特征、样本数据就绪时间经常无法保证,导致模型经常无法在T+1前完成训练和上线,影响线上转化效果。而且业务通过进一步
优化离线Spark任务的性能,已经无法显著的提升数据就绪时间。因此,业务希望借助增量数仓,提升特征数据就绪时间,进而来提升算法的转化效果。
初步效果
1)离线特征所依赖的流量数
据,就绪时间能稳定在早高
峰前完成。
2)算法团队的模型训练可在
T+1完成,模型rpm预计可提
升2%。
h
c
e
T
T
M
a
S
n
lo
32. 批流一体增量数仓
建设背景
作为优选基础数据团队,是整个优选最上游的数据供给方,数据模型下游依赖方较多,新增、修改数仓模型需要修改离线、实时两套代码,开发速度跟不上业务需求
的增⻓速度,希望通过增量生产模式来统一两套模型,目前业务预计切换到增量生产架构,会提升至少30%的开发效率。
一套口径,两套代码
h
c
e
T
T
M
n
lo
a
S
两套代码、两套数据验证流程、两套监控,给用户解释不一致问题的时间成本高。
33. 批流一体增量数仓
解决方案:增量数仓。
一套代码,开发工作量减半。一套任务,运维工作量减半。一套口径,数据解释成本减半。
权衡点1:业务高时效性的要求
场景并不多,更多是因为底层
技术架构的原因,不得已才做
到了高时效性,伴随而来的是
数据开发和治理的成本问题。
h
c
e
T
T
M
权衡点2:因为Hive不支持
Upsert业务不得已才将数据生
产模式转换为全量快照的计
算,但是换来的是数据新鲜性
的下降和固定使用天级别的快
照带来的需求响应灵活性下
降。
a
S
n
lo
34. 目录
contents
h
c
e
T
T
M
a
S
n
lo
35. 未来规划
Beluga
1)Partial-Update
Flink
a
S
1)秒级事务提交能力
h
c
e
T
T
M
n
lo
2)Beluga支持点查询能力 2)离线表平滑迁移工具
3)Standalone Meta-Server 3)Multi-Sink性能问题优化
4)高效的并发控制
平台化
1)统一ODS入仓
2)批流一体数仓开发平台
36. a
S
n
lo
Q&A
h
c
e
T
T
M
37. h
c
e
T
T
M
更多技术干货
欢迎关注“美团技术团队”
n
lo
招聘:XXX岗位
邮箱:XXX@meituan.com
a
S