Apache Celeborn在B站的最佳实践

如果无法正常显示,请先停止浏览器的去广告插件。
分享至:
1. Apache Celeborn在B站的 最佳实践 蒋晓峰-哔哩哔哩-资深开发工程师 DataFunCon # 2024
2. Contents 目录 Apache Celeborn 背景概览 Apache Celeborn 生产实践 Apache Celeborn 原理剖析 Apache Celeborn 规划展望
3. 01 Apache Celeborn背景概览
4. 传统Shuffle痛点  传统Shuffle过程  传统Shuffle缺陷 • Mapper Task基于Partition ID对Shuffle数据本地排序, 写到本地磁盘,同时写一个索引文件记录文件里属于每一 个Partition的Offset和Length,生成数据文件和Index文 件 1. 依赖大容量本地盘/云盘存储Shuffle数据,数据驻留直至 消费完成,限制存算分离 • Reduce Task从每个Shuffle数据文件里读取Partition数据 2. Mapper排序占用较大内存,甚至触发外排,IO放大 3. Shuffle Read高网络连接,逻辑连接数是M*N 4. 磁盘随机读,存在大量的随机读盘 5. 数据单副本,容错性较低 6. 和NodeManager同一进程,影响YARN调度的稳定性  传统Shuffle问题 1. 不够高效 • 写放大 • 随机读 • 高网络连接 3. 不够弹性 • 依赖大容量本地盘 • 无法存算分离 2. 不够稳定 • Fetch Failure • OOM
5. Celeborn简介概览  Celeborn:大数据引擎统一中间数据服务 https://github.com/apache/celeborn • 阿里云捐赠给Apache基金会的通用Remote Shuffle Service • 1800+ commits • 旨在解决大数据引擎处理中间数据遇到的性能、稳定性及弹性问题,提升 大数据计算引擎的性能、稳定性和弹性 • 89 Contributor&16 committer • 引擎无关,支持Spark,Flink及MapReduce等多引擎 • Shuffle+Spill Data:解除对大容量本地盘的依赖 • 810+ stars&330+ forks
6. Celeborn核心优势  Externel Shuffle Service(ESS) • ESS Shuffle服务内嵌在NodeManager中 • Shuffle Write阶段:Map Task产生的Shuffle数据 写入到本地存储介质 • Shuffle Read阶段:Reduce Task读取Shuffle数据 文件时从ESS服务获取 • 存在诸多弊端包括异构存储导致长尾任务现象严重、 磁盘和网络开销大及可靠性差等  Celeborn优势 • 使用Push-Style Shuffle代替Pull-Style,减少Mapper内存压力 • 支持IO聚合,Shuffle Read连接数从M*N降到N,同时更改随机 读为顺序读 • 支持两副本机制,降低Fetch Fail概率 • 支持计算与存储分离架构,部署Shuffle Service至特殊硬件环境, 与计算集群分离 • 解决Spark On Kubernetes对本地磁盘的依赖
7. Celeborn VS External Shuffle Service  Celeborn VS Externel Shuffle Service性能 • 测试1.1T/2.2T/3.3T的纯Shuffle场景,对比ESS, Celeborn 0.2.1, Celeborn 0.3.0 • Celeborn相比ESS有明显性能优势,随着Shuffle 规模变大优 势愈加明显 • 对比ESS,Celeborn优势主要在Shuffle Read阶段;对于 Shuffle Write阶段,Celeborn通过一层网络,ESS直接写本 地文件,最新版本没有明显性能降低 • 测试10T的TPCDS,对比ESS, Celeborn单副本和两副本 • 单副本20%提升,两副本15%提升
8. 02 Apache Celeborn原理剖析
9. Celeborn设计架构  服务端 • Master • 管理集群状态 • 分配负载 • 基于Raft实现高可用 • Worker • 接收&存储&服务Shuffle数据 • 多层存储 • Memory • Local Disks • DFS/Object Store  客户端 • Lifecycle Manager • 管理当前作业的Shuffle Metadata • Shuffle元数据转移,降低Master负载 • Shuffle Client • 存在于Executor或者TaskManager上 • Push/Fetch Shuffle数据
10. Celeborn Shuffle流程  Shuffle核心流程 1. Mapper请求LifecycleManager注册Shuffle 2. LifecycleManager向Master请求Slot 3. Worker预留Slot并且创建相应的文件 4. Mapper从LifecycleManager获取Worker位置 5. Mapper将数据推送到指定的Worker 6. Worker合并并且复制数据到其对等节点 7. Worker定期将数据刷新到磁盘 8. Mapper任务完成触发MapperEnd事件 9. 当所有Mapper任务完成时,Worker提交文件 10. Reducer请求文件位置 11. Reducer读取Shuffle数据
11. 【性能】核心设计:Push/聚合/Split  Push Shuffle+Partition数据聚合  存算分离  写放大  磁盘随机小IO  网络高连接小 IO  Partition Split切分:支持Soft和Hard两种模式 避免磁盘写爆
12. 【性能】核心设计:异步化  异步Push  异步Flush  异步Commit  异步Fetch
13. 【性能】支持Spark AQE  Partition范围读  Partition合并  Join Strategy切换  Skew Join优化  Mapper范围读 1. Partition Split切分 2. Sort On Read 3. Range Read
14. 【性能】列式Shuffle  行列转换:写时做行专列,读时做列转行 行列转换:(Int, String, Decimal)  代码生成:消除解释执行开销 • 解释执行 • 代码生成 开启列式Shuffle,测试3T TPCDS  Shuffle Size缩减40%  行列转换开销低于5%
15. 【性能】对接Native引擎 • Celeborn+Gluten:社区合作&优势结合&优化正交 • Gluten Columnar Shuffle高性能的关键在于Hash- based Shuffle、Native Partitioner、零拷贝等设计, 存在的限制包括: 1. 依赖大容量本地盘存储Shuffle数据,资源利用 率低 2. Shuffle Write内存紧张时Spill到磁盘,增加额 外磁盘IO 3. Shuffle Read有大量的网络连接和大量磁盘随 机读,导致较差的稳定性和性能 • Gluten+Celeborn Columnar Shuffle整体设计 1. Shuffle Writer复用Native Partitioner,拦截 本地IO并且改为推向Celeborn集群 2. Celeborn集群做数据重组包括聚合相同 Partition 数据和多备份 3. Shuffle Reader从特定Celeborn Worker上面 顺序读取数据并且反序列化为Column Batch • Gluten+Celeborn Columnar Shuffle性能  HDD:提升8%~12%  SSD:持平
16. 【性能】多层存储  灵活适配硬件环境,并且尽可能让数据存放在更快的存储层  存储介质:内存/本地盘/DFS(HDFS)/Object Store(OSS)  灵活配置:可任意选择1~3层存储配置  快存储优先:小Shuffle缓存,让数据尽可能存在快存储
17. 【稳定性】容错  两副本  Revive机制  磁盘防爆  Revive:Shuffle Client在发生PushData失败时 触发 ,LifecycleManager重新选择一对Worker, 同时当前Partition发生一次切分  Worker自检  集群健康检测  RPC重试  Batch Revive:Shuffle Client缓存若干Revive 请求后一并发给 LifecycleManager
18. 【稳定性】快速滚动升级  向前兼容:通过协议的PB化实现  优雅重启:通过Hard Split机制实现  优雅重启流程 1. 触发优雅下线后,Worker通知Master 并且把自己置为Graceful Shutdown 状态 2. Master不再向此Worker分配新的负载 3. 此后发送给Worker的PushData响应 带上Hard_Split标记,促使Client终止 向此Worker继续推送数据 4. Client将发送CommitFiles给此 Worker,触发内存数据刷盘 5. 内存数据完成刷盘后,Worker把本地 的Partition信息存入LevelDB,此时 Worker 退出进程并等待重启 6. 重启之后,Worker从LevelDB中恢复 状态,正常服务本地Shuffle数据的读 取请求以及接收新的Shuffle Write
19. 【稳定性】拥塞控制  反压  拥塞控制 • 定义三类 Watermark • Pause Receive Watermark • Pause Replicate Watermark • Resume Watermark 参考TCP拥塞控制机制 • Worker频繁检查使用的直接内存比例, 触发Pause Receive、Pause Replicate 和Resume • 慢启动 • 拥塞避免 • 拥塞检测  Credit-Based流控:Flink Shuffle Read
20. 【稳定性】负载均衡 • 隔离坏盘  Round Robin • 负载尽量分配给快/大盘 1. 计算每个磁盘上可分配的可用Slot • Slot分配 • 收到RequestSlots后,Master为Shuffle的每个 PartitionLocation分配一个Slot • 分配策略:Round Robin和Load Aware 2. 在所有Worker和所有磁盘之间以轮询方式分配Slot,分配一个 后递减并且在磁盘或Worker上没有可用Slot时排除 3. 如果集群的总可用Slot不够则对未分配的Slot重新运行算法  Load Aware 1. Worker实时监控每块磁盘健康状态,近期的读写速度以及磁盘 可用容量,并且通过心跳发送给Master,其中健康状态通过本地 的DeviceMonitor定期检查,读写速度通过统计最近时间窗口的平 均FlushTime和FetchTime计算 2. Master具有集群磁盘的全局视野,针对每次RequestSlots, Master筛选出空间充足的磁盘,根据读写速度进行分组,以组间 递减的方式进行分配
21. 03 Apache Celeborn生产实践
22. Celeborn Kubernetes部署 • Celeborn K8S部署架构:Celeorn/Spark/Flink/MR On K8S
23. Celeborn集群部署、升级和扩缩容  Celeborn集群部署  Celeborn集群升级-重启 • 镜像升级原地升级 • Celeborn集群独立部署,HDFS和YARN混部 • 配置修改原地重启 • Master和Worker集群单独部署 • Rolling Upgrade • Master节点应部署在相对稳定/独立的集群节点 • 预镜像下载 • 对外统一暴露Service或通过对多Master挂SLB方式 • 并行Rolling 对外提供服务(引擎侧配置Service/SLB地址) • Graceful Shutdown • Master节点增加或迁移时对应用透明 • terminationGracePeriodSeconds(90s) • 3个Master集群:3Master HA+独立部署 • 4个Worker集群:独立部署+SSD盘  滚动升级策略 • 尽量先升级一台非Leader的Master节点 • Celeborn集群分布 • 按照升级单Worker节点-多个Worker节点-全部升级策略 • 常熟集群 • 低优集群:Spark(低优)/Flink/MR • 高优集群:Spark(高优) • 嘉定集群:Spark(低优)  集群扩容 • 增加Replica数量  集群缩容 • Master decommission worker • Worker IDLE->K8S减少Replica  扩缩容能力规划  支持弹性扩缩容
24. Celeborn Shuffle流量  Shuffle Service流量 • Shuffle总量:25000TB+ • Celeborn: 10000TB,40% • ESS:5000TB,20% • Local Shuffle:75000TB,30% • Other:2500TB,10%
25. Celeborn作业数量  Shuffle Service Application数量 • Application总量:125000+ • Celeborn:25000,20% • ESS:20000,15% • Local Shuffle:80000,64% • Other:20,<1%
26. Celeborn失败作业占比  Application 运行失败占比 • 失败作业数量低于30 • 失败根因异常种类包括 • Register Shuffle • Create Partition Reader • Fetch Chunk • Ask Timeout • Lookup Timeout • Limit Zero Inflight Requests • Shuffle Data Lost • Push Data • Push Merged Data • Revive Push Merged Data • Await Result • ... • Ask和Lookup Timeout占大多数
27. Celeborn失败作业明细  Application 运行失败明细 • 查询Spark Application元仓表 • app_id;Application ID • app_name:Application名称 • cluster_name:集群名称 • day:作业运行日 • end_time:Application结束时间 • failure_reason:失败异常堆栈
28. Celeborn集群监控  Celeborn集群Metrics  开启Metrics • metrics_IsActiveMaster_Value • celeborn.metrics.enabled: true • metrics_WorkerCount_Value • celeborn.metrics.extraLabels:<label1_key>:<label • metrics_NettyMemory_value 1_value>[,<label2_key>:<label2_value>]* • metrics_DiskBuffer_value • metrics_ReadBufferDispatcherRequestsLength_value • 日志 1. Disk余量监控(日志,未来支持全局/单个Worker) 2. OutOfDirectMemoryError  Celeborn应用Metrics • metrics_ApplicationCount_value • metrics_RegisteredShuffleCount_value • metrics_FlushDataTime • metrics_OpenStreamTime • metrics_ReserverSlotsTime • metrics_OpenStreamTime  业务侧指标 • 作业变慢 • 作业失败率上升  Celeborn提示集群存在压力,需要扩容 • metrics_ReadBufferDispatcherRequestsLength_value • metrics_NettyMemory_value • metrics_DiskBuffer_value • metrics_FlushDataTime • metrics_OpenStreamTime • metrics_ReserverSlotsTime • metrics_OpenStreamTime
29. Celeborn监控面板 • Celeborn Master集群监控
30. Celeborn监控面板 • Celeborn Worker集群监控
31. Celeborn Shuffle治理 • Shuffle卡顿Application详 情 • Shuffle卡顿数量分 布 • Shuffle卡顿变化趋 势 • Shuffle平均卡顿时 间
32. Celeborn元仓建设  Celeborn元仓背景  Celeborn元仓链路 • Celeborn缺乏观测运行状态集群资源使用情况,同时缺乏 • Celeborn支持/metrics/prometheus接口提供Master和 作业细粒度的运行统计信息,导致难以平衡作业Shuffle运 Worker监控指标 行时的资源使用 • Collector通过Java Agent访问Rest API收集监控指标数据 • 如何基于集群合理分配作业资源以及诊断治理作业是建设 汇报到指定Kafka Topic Celeborn元仓的主要目标 • Kafka消息通过Routine Load/Flink方式导入StarRocks • 为了提高Celeborn集群分配作业资源的掌控,收集运行状 态监控指标以及故障异常堆栈,按照各维度进行汇总同时 • BMD Doctor基于SpringBoot构建元仓数据服务,其中 Data Service采用MyBatis框架访问StarRocks 持久化,通过数据服务提供数据实时/历史查询数据应用
33. Celeborn元仓概览  Celeborn资源消耗概况  Celeborn App运行概况
34. Celeborn混沌测试  Celeborn混沌测试背景  Celeborn混沌测试流程 • 仅依靠UT、集成测试、e2e测试等无法保证服 务可靠性即覆盖线上复杂环境,如坏盘、CPU 过载、网络过载、机器挂掉等 • 定义测试Plan来描述事件类型、事件触发的顺序及持续时间, 事件类型包括节点异常、磁盘异常、IO异常以及CPU过载等 • 模拟生产环境,测试环境模拟线上可能出现的 异常,同时保证满足Celeborn运行的最小运行 环境,即至少3个Master节点和2个Worker节 点可用,并且每个Worker节点至少有一块盘 • 客户端向Scheduler提交Plan • Scheduler根据Plan描述给每个节点的Runner发送Operation • Runner负责具体执行并汇报当前节点的状态 • 触发Operation之前,Scheduler推演此事件发生产生的结果, 若导致无法满足RSS的最小可运行环境,拒绝此事件
35. Celeborn解耦Spark发版  Celeborn Spark Client解耦 • --jars/--conf spark.jars指定Celeborn Spark Client JAR • [SPARK-45762] Support shuffle managers defined in user jars by changing startup order • 解决spark.executor.userClassPathFirst=true时 CelebornShuffleHandle的Classloader不一致问题
36. Celeborn开启Jemalloc  开启Jemalloc • CELEBORN_PREFER_JEMALLOC=true • Jemalloc是高效的内存分配器,用于替代标准的 malloc函数,提升内存管理性能和减少内存碎片
37. Celeborn Stage重算  Spark Stage重算流程 CIP-4 Re-run Spark Stage for Celeborn Shuffle Fetch Failure 1. Spark Shuffle Id传递给CelebornShuffleHandle,并且按照常规方式进行分配 2. Shuffle Writer发送 (appShuffleId, appShuffleIdentifier, isWriter=true) 到LifecycleManager • 如果appShuffleId或者appShuffleIdentifier没有记录,则采用AtomicInteger生成一个新的Shuffle Id,保 存appShuffleId->appShuffleIdentifier->(shuffleId, true)映射,返回给Shuffle Writer • 如果找到appShuffleIdentifier,则将shuffleId返回给Shuffle Writer 3. Shuffle Writer使用shuffleId写入数据到Celeborn 4. Shuffle Reader发送 (appShuffleId, null, isWriter=false) 到LifecycleManager 5. 查找与appShuffleId关联的最新完成的Shuffle Id(所有Shuffle写任务在Celeborn中都已完成)返回给Shuffle Reader 6. Shuffle Reader使用shuffleId从Celeborn读取Shuffle数据 7. 在Fetch Failure期间,Shuffle Reader发送ReportFetchFailure(包含appShuffleId和shuffleId)到 LifecycleManager,通过反射清理MapOutputTracker上的状态(仅一次),同时将shuffleId的Fetch状态更新 为false,如果Fetch状态已经为false,则不执行任何操作 8. DAG Scheduler以Fetch Failed完成任务,重新提交所有与相应shuffleMapTask相关的MapTasks进行新的 Stage尝试
38. Celeborn开启Stage重算  开启Spark Stage重算 • celeborn.client.spark.fetch.throwsFetchFailure=true • 测试TPCDS q11 1T with 8 workers and replication off • Kill 6 workers out of 8 • spark.stage.maxConsecutiveAttempts=100 • 测试ETL Spark SQL Application and replication off • Kill 2 workers out of 3
39. 04 Apache Celeborn规划展望
40. Celeborn Roadmap • Celeborn性能 1. Skew Join优化 • • Celeborn存储 1. Object Storage 2. IO调度 • 带优先级的IO调度能力 • Celeborn引擎 1. Tez/Trino 2. Native SDK+Velox(Or other engine) 3. Flink Hybrid Shuffle+Reduce Partition 4. Spilled Data/Cached Data • Celeborn资源调度 1. 资源弹性扩缩容 2. 灰度升级能力 • Application • Tenant Celeborn其他能力 1. Celeborn Dashboard 2. Celeborn Security 3. Celeborn Chaos Testing
41. 加入我们  Apache Celeborn 社区  哔哩哔哩 • GitHub: https://github.com/apache/celeborn • • 钉钉群: 41594456 • 微信群:brick_carrier • 微信公众号:Apache Celeborn 微信公众号:哔哩哔哩技术
42. logo

inicio - Wiki
Copyright © 2011-2025 iteam. Current version is 2.139.0. UTC+08:00, 2025-01-09 18:08
浙ICP备14020137号-1 $mapa de visitantes$