Apache Celeborn在B站的最佳实践
如果无法正常显示,请先停止浏览器的去广告插件。
1. Apache Celeborn在B站的
最佳实践
蒋晓峰-哔哩哔哩-资深开发工程师
DataFunCon # 2024
2. Contents
目录
Apache Celeborn
背景概览
Apache Celeborn
生产实践
Apache Celeborn
原理剖析
Apache Celeborn
规划展望
3. 01 Apache Celeborn背景概览
4. 传统Shuffle痛点
传统Shuffle过程 传统Shuffle缺陷
• Mapper Task基于Partition ID对Shuffle数据本地排序,
写到本地磁盘,同时写一个索引文件记录文件里属于每一
个Partition的Offset和Length,生成数据文件和Index文
件 1. 依赖大容量本地盘/云盘存储Shuffle数据,数据驻留直至
消费完成,限制存算分离
• Reduce Task从每个Shuffle数据文件里读取Partition数据
2. Mapper排序占用较大内存,甚至触发外排,IO放大
3. Shuffle Read高网络连接,逻辑连接数是M*N
4. 磁盘随机读,存在大量的随机读盘
5. 数据单副本,容错性较低
6. 和NodeManager同一进程,影响YARN调度的稳定性
传统Shuffle问题
1. 不够高效
• 写放大
• 随机读
• 高网络连接
3. 不够弹性
• 依赖大容量本地盘
• 无法存算分离
2. 不够稳定
• Fetch Failure
• OOM
5. Celeborn简介概览
Celeborn:大数据引擎统一中间数据服务 https://github.com/apache/celeborn
• 阿里云捐赠给Apache基金会的通用Remote Shuffle Service • 1800+ commits
• 旨在解决大数据引擎处理中间数据遇到的性能、稳定性及弹性问题,提升
大数据计算引擎的性能、稳定性和弹性 • 89 Contributor&16 committer
• 引擎无关,支持Spark,Flink及MapReduce等多引擎
• Shuffle+Spill Data:解除对大容量本地盘的依赖
• 810+ stars&330+ forks
6. Celeborn核心优势
Externel Shuffle Service(ESS)
• ESS Shuffle服务内嵌在NodeManager中
• Shuffle Write阶段:Map Task产生的Shuffle数据
写入到本地存储介质
• Shuffle Read阶段:Reduce Task读取Shuffle数据
文件时从ESS服务获取
• 存在诸多弊端包括异构存储导致长尾任务现象严重、
磁盘和网络开销大及可靠性差等
Celeborn优势
• 使用Push-Style Shuffle代替Pull-Style,减少Mapper内存压力
• 支持IO聚合,Shuffle Read连接数从M*N降到N,同时更改随机
读为顺序读
• 支持两副本机制,降低Fetch Fail概率
• 支持计算与存储分离架构,部署Shuffle Service至特殊硬件环境,
与计算集群分离
• 解决Spark On Kubernetes对本地磁盘的依赖
7. Celeborn VS External Shuffle Service
Celeborn VS Externel Shuffle Service性能
• 测试1.1T/2.2T/3.3T的纯Shuffle场景,对比ESS, Celeborn
0.2.1, Celeborn 0.3.0
• Celeborn相比ESS有明显性能优势,随着Shuffle 规模变大优
势愈加明显
• 对比ESS,Celeborn优势主要在Shuffle Read阶段;对于
Shuffle Write阶段,Celeborn通过一层网络,ESS直接写本
地文件,最新版本没有明显性能降低
• 测试10T的TPCDS,对比ESS, Celeborn单副本和两副本
• 单副本20%提升,两副本15%提升
8. 02 Apache Celeborn原理剖析
9. Celeborn设计架构
服务端
• Master
• 管理集群状态
• 分配负载
• 基于Raft实现高可用
• Worker
• 接收&存储&服务Shuffle数据
• 多层存储
• Memory
• Local Disks
• DFS/Object Store
客户端
• Lifecycle Manager
• 管理当前作业的Shuffle Metadata
• Shuffle元数据转移,降低Master负载
• Shuffle Client
• 存在于Executor或者TaskManager上
• Push/Fetch Shuffle数据
10. Celeborn Shuffle流程
Shuffle核心流程
1. Mapper请求LifecycleManager注册Shuffle
2. LifecycleManager向Master请求Slot
3. Worker预留Slot并且创建相应的文件
4. Mapper从LifecycleManager获取Worker位置
5. Mapper将数据推送到指定的Worker
6. Worker合并并且复制数据到其对等节点
7. Worker定期将数据刷新到磁盘
8. Mapper任务完成触发MapperEnd事件
9. 当所有Mapper任务完成时,Worker提交文件
10. Reducer请求文件位置
11. Reducer读取Shuffle数据
11. 【性能】核心设计:Push/聚合/Split
Push Shuffle+Partition数据聚合
存算分离 写放大 磁盘随机小IO 网络高连接小
IO
Partition Split切分:支持Soft和Hard两种模式
避免磁盘写爆
12. 【性能】核心设计:异步化
异步Push 异步Flush
异步Commit 异步Fetch
13. 【性能】支持Spark AQE
Partition范围读
Partition合并
Join Strategy切换
Skew Join优化
Mapper范围读
1. Partition Split切分
2. Sort On Read
3. Range Read
14. 【性能】列式Shuffle
行列转换:写时做行专列,读时做列转行 行列转换:(Int, String, Decimal)
代码生成:消除解释执行开销 • 解释执行
• 代码生成
开启列式Shuffle,测试3T TPCDS
Shuffle Size缩减40%
行列转换开销低于5%
15. 【性能】对接Native引擎
• Celeborn+Gluten:社区合作&优势结合&优化正交
• Gluten Columnar Shuffle高性能的关键在于Hash-
based Shuffle、Native Partitioner、零拷贝等设计,
存在的限制包括:
1. 依赖大容量本地盘存储Shuffle数据,资源利用
率低
2. Shuffle Write内存紧张时Spill到磁盘,增加额
外磁盘IO
3. Shuffle Read有大量的网络连接和大量磁盘随
机读,导致较差的稳定性和性能
• Gluten+Celeborn Columnar Shuffle整体设计
1. Shuffle Writer复用Native Partitioner,拦截
本地IO并且改为推向Celeborn集群
2. Celeborn集群做数据重组包括聚合相同
Partition 数据和多备份
3. Shuffle Reader从特定Celeborn Worker上面
顺序读取数据并且反序列化为Column Batch
• Gluten+Celeborn Columnar Shuffle性能
HDD:提升8%~12%
SSD:持平
16. 【性能】多层存储
灵活适配硬件环境,并且尽可能让数据存放在更快的存储层
存储介质:内存/本地盘/DFS(HDFS)/Object Store(OSS)
灵活配置:可任意选择1~3层存储配置
快存储优先:小Shuffle缓存,让数据尽可能存在快存储
17. 【稳定性】容错
两副本
Revive机制
磁盘防爆
Revive:Shuffle Client在发生PushData失败时
触发 ,LifecycleManager重新选择一对Worker,
同时当前Partition发生一次切分
Worker自检
集群健康检测
RPC重试
Batch Revive:Shuffle Client缓存若干Revive
请求后一并发给 LifecycleManager
18. 【稳定性】快速滚动升级
向前兼容:通过协议的PB化实现
优雅重启:通过Hard Split机制实现
优雅重启流程
1. 触发优雅下线后,Worker通知Master
并且把自己置为Graceful Shutdown
状态
2. Master不再向此Worker分配新的负载
3. 此后发送给Worker的PushData响应
带上Hard_Split标记,促使Client终止
向此Worker继续推送数据
4. Client将发送CommitFiles给此
Worker,触发内存数据刷盘
5. 内存数据完成刷盘后,Worker把本地
的Partition信息存入LevelDB,此时
Worker 退出进程并等待重启
6. 重启之后,Worker从LevelDB中恢复
状态,正常服务本地Shuffle数据的读
取请求以及接收新的Shuffle Write
19. 【稳定性】拥塞控制
反压 拥塞控制
• 定义三类 Watermark
• Pause Receive Watermark
• Pause Replicate Watermark
• Resume Watermark 参考TCP拥塞控制机制
• Worker频繁检查使用的直接内存比例,
触发Pause Receive、Pause Replicate
和Resume
• 慢启动
• 拥塞避免
• 拥塞检测
Credit-Based流控:Flink Shuffle Read
20. 【稳定性】负载均衡
• 隔离坏盘 Round Robin
• 负载尽量分配给快/大盘 1. 计算每个磁盘上可分配的可用Slot
• Slot分配
• 收到RequestSlots后,Master为Shuffle的每个
PartitionLocation分配一个Slot
• 分配策略:Round Robin和Load Aware 2. 在所有Worker和所有磁盘之间以轮询方式分配Slot,分配一个
后递减并且在磁盘或Worker上没有可用Slot时排除
3. 如果集群的总可用Slot不够则对未分配的Slot重新运行算法
Load Aware
1. Worker实时监控每块磁盘健康状态,近期的读写速度以及磁盘
可用容量,并且通过心跳发送给Master,其中健康状态通过本地
的DeviceMonitor定期检查,读写速度通过统计最近时间窗口的平
均FlushTime和FetchTime计算
2. Master具有集群磁盘的全局视野,针对每次RequestSlots,
Master筛选出空间充足的磁盘,根据读写速度进行分组,以组间
递减的方式进行分配
21. 03 Apache Celeborn生产实践
22. Celeborn Kubernetes部署
• Celeborn K8S部署架构:Celeorn/Spark/Flink/MR On K8S
23. Celeborn集群部署、升级和扩缩容
Celeborn集群部署
Celeborn集群升级-重启
• 镜像升级原地升级
• Celeborn集群独立部署,HDFS和YARN混部
• 配置修改原地重启
• Master和Worker集群单独部署
• Rolling Upgrade
• Master节点应部署在相对稳定/独立的集群节点
• 预镜像下载
• 对外统一暴露Service或通过对多Master挂SLB方式
• 并行Rolling
对外提供服务(引擎侧配置Service/SLB地址)
• Graceful Shutdown
• Master节点增加或迁移时对应用透明
• terminationGracePeriodSeconds(90s)
• 3个Master集群:3Master HA+独立部署
• 4个Worker集群:独立部署+SSD盘
滚动升级策略
• 尽量先升级一台非Leader的Master节点
• Celeborn集群分布
• 按照升级单Worker节点-多个Worker节点-全部升级策略
• 常熟集群
• 低优集群:Spark(低优)/Flink/MR
• 高优集群:Spark(高优)
• 嘉定集群:Spark(低优)
集群扩容
• 增加Replica数量
集群缩容
• Master decommission worker
• Worker IDLE->K8S减少Replica
扩缩容能力规划
支持弹性扩缩容
24. Celeborn Shuffle流量
Shuffle Service流量
• Shuffle总量:25000TB+
• Celeborn: 10000TB,40%
• ESS:5000TB,20%
• Local Shuffle:75000TB,30%
• Other:2500TB,10%
25. Celeborn作业数量
Shuffle Service Application数量
• Application总量:125000+
• Celeborn:25000,20%
• ESS:20000,15%
• Local Shuffle:80000,64%
• Other:20,<1%
26. Celeborn失败作业占比
Application 运行失败占比
• 失败作业数量低于30
• 失败根因异常种类包括
• Register Shuffle
• Create Partition Reader
• Fetch Chunk
• Ask Timeout
• Lookup Timeout
• Limit Zero Inflight Requests
• Shuffle Data Lost
• Push Data
• Push Merged Data
• Revive Push Merged Data
• Await Result
• ...
• Ask和Lookup Timeout占大多数
27. Celeborn失败作业明细
Application 运行失败明细
• 查询Spark Application元仓表
• app_id;Application ID
• app_name:Application名称
• cluster_name:集群名称
• day:作业运行日
• end_time:Application结束时间
• failure_reason:失败异常堆栈
28. Celeborn集群监控
Celeborn集群Metrics
开启Metrics
• metrics_IsActiveMaster_Value
• celeborn.metrics.enabled: true
• metrics_WorkerCount_Value
• celeborn.metrics.extraLabels:<label1_key>:<label
• metrics_NettyMemory_value
1_value>[,<label2_key>:<label2_value>]*
• metrics_DiskBuffer_value
• metrics_ReadBufferDispatcherRequestsLength_value
• 日志
1. Disk余量监控(日志,未来支持全局/单个Worker)
2. OutOfDirectMemoryError
Celeborn应用Metrics
• metrics_ApplicationCount_value
• metrics_RegisteredShuffleCount_value
• metrics_FlushDataTime
• metrics_OpenStreamTime
• metrics_ReserverSlotsTime
• metrics_OpenStreamTime
业务侧指标
• 作业变慢
• 作业失败率上升
Celeborn提示集群存在压力,需要扩容
• metrics_ReadBufferDispatcherRequestsLength_value
• metrics_NettyMemory_value
• metrics_DiskBuffer_value
• metrics_FlushDataTime
• metrics_OpenStreamTime
• metrics_ReserverSlotsTime
• metrics_OpenStreamTime
29. Celeborn监控面板
• Celeborn Master集群监控
30. Celeborn监控面板
• Celeborn Worker集群监控
31. Celeborn Shuffle治理
• Shuffle卡顿Application详
情
• Shuffle卡顿数量分
布
• Shuffle卡顿变化趋
势
• Shuffle平均卡顿时
间
32. Celeborn元仓建设
Celeborn元仓背景
Celeborn元仓链路
• Celeborn缺乏观测运行状态集群资源使用情况,同时缺乏 • Celeborn支持/metrics/prometheus接口提供Master和
作业细粒度的运行统计信息,导致难以平衡作业Shuffle运
Worker监控指标
行时的资源使用
• Collector通过Java Agent访问Rest API收集监控指标数据
• 如何基于集群合理分配作业资源以及诊断治理作业是建设
汇报到指定Kafka Topic
Celeborn元仓的主要目标
• Kafka消息通过Routine Load/Flink方式导入StarRocks
• 为了提高Celeborn集群分配作业资源的掌控,收集运行状
态监控指标以及故障异常堆栈,按照各维度进行汇总同时 • BMD Doctor基于SpringBoot构建元仓数据服务,其中
Data Service采用MyBatis框架访问StarRocks
持久化,通过数据服务提供数据实时/历史查询数据应用
33. Celeborn元仓概览
Celeborn资源消耗概况
Celeborn App运行概况
34. Celeborn混沌测试
Celeborn混沌测试背景 Celeborn混沌测试流程
• 仅依靠UT、集成测试、e2e测试等无法保证服
务可靠性即覆盖线上复杂环境,如坏盘、CPU
过载、网络过载、机器挂掉等 • 定义测试Plan来描述事件类型、事件触发的顺序及持续时间,
事件类型包括节点异常、磁盘异常、IO异常以及CPU过载等
• 模拟生产环境,测试环境模拟线上可能出现的
异常,同时保证满足Celeborn运行的最小运行
环境,即至少3个Master节点和2个Worker节
点可用,并且每个Worker节点至少有一块盘
• 客户端向Scheduler提交Plan
• Scheduler根据Plan描述给每个节点的Runner发送Operation
• Runner负责具体执行并汇报当前节点的状态
• 触发Operation之前,Scheduler推演此事件发生产生的结果,
若导致无法满足RSS的最小可运行环境,拒绝此事件
35. Celeborn解耦Spark发版
Celeborn Spark Client解耦
• --jars/--conf spark.jars指定Celeborn Spark Client JAR
• [SPARK-45762] Support shuffle managers defined in
user jars by changing startup order
• 解决spark.executor.userClassPathFirst=true时
CelebornShuffleHandle的Classloader不一致问题
36. Celeborn开启Jemalloc
开启Jemalloc
• CELEBORN_PREFER_JEMALLOC=true
• Jemalloc是高效的内存分配器,用于替代标准的
malloc函数,提升内存管理性能和减少内存碎片
37. Celeborn Stage重算
Spark Stage重算流程
CIP-4 Re-run Spark Stage for Celeborn Shuffle Fetch Failure
1. Spark Shuffle Id传递给CelebornShuffleHandle,并且按照常规方式进行分配
2. Shuffle Writer发送 (appShuffleId, appShuffleIdentifier, isWriter=true) 到LifecycleManager
• 如果appShuffleId或者appShuffleIdentifier没有记录,则采用AtomicInteger生成一个新的Shuffle Id,保
存appShuffleId->appShuffleIdentifier->(shuffleId, true)映射,返回给Shuffle Writer
• 如果找到appShuffleIdentifier,则将shuffleId返回给Shuffle Writer
3. Shuffle Writer使用shuffleId写入数据到Celeborn
4. Shuffle Reader发送 (appShuffleId, null, isWriter=false) 到LifecycleManager
5. 查找与appShuffleId关联的最新完成的Shuffle Id(所有Shuffle写任务在Celeborn中都已完成)返回给Shuffle
Reader
6. Shuffle Reader使用shuffleId从Celeborn读取Shuffle数据
7. 在Fetch Failure期间,Shuffle Reader发送ReportFetchFailure(包含appShuffleId和shuffleId)到
LifecycleManager,通过反射清理MapOutputTracker上的状态(仅一次),同时将shuffleId的Fetch状态更新
为false,如果Fetch状态已经为false,则不执行任何操作
8. DAG Scheduler以Fetch Failed完成任务,重新提交所有与相应shuffleMapTask相关的MapTasks进行新的
Stage尝试
38. Celeborn开启Stage重算
开启Spark Stage重算
• celeborn.client.spark.fetch.throwsFetchFailure=true
• 测试TPCDS q11 1T with 8 workers and replication off
• Kill 6 workers out of 8
• spark.stage.maxConsecutiveAttempts=100
• 测试ETL Spark SQL Application and replication off
• Kill 2 workers out of 3
39. 04 Apache Celeborn规划展望
40. Celeborn Roadmap
• Celeborn性能
1. Skew Join优化
•
• Celeborn存储
1. Object Storage
2. IO调度
• 带优先级的IO调度能力
• Celeborn引擎
1. Tez/Trino
2. Native SDK+Velox(Or other engine)
3. Flink Hybrid Shuffle+Reduce Partition
4. Spilled Data/Cached Data
• Celeborn资源调度
1. 资源弹性扩缩容
2. 灰度升级能力
• Application
• Tenant
Celeborn其他能力
1. Celeborn Dashboard
2. Celeborn Security
3. Celeborn Chaos Testing
41. 加入我们
Apache Celeborn 社区 哔哩哔哩
• GitHub:
https://github.com/apache/celeborn •
• 钉钉群: 41594456 • 微信群:brick_carrier • 微信公众号:Apache Celeborn
微信公众号:哔哩哔哩技术
42. logo