cover_image

OPPO大数据诊断平台设计与实践

OPPO数智技术 安第斯智能云 2022年12月27日 12:01

01 背景


随着欧加集团大数据业务的发展,现阶段公司大数据平台20+个组件,1EB+级别数据量,平台1000人均日活,服务已经有相当大的规模。在这样的业务背景下,越来越多的用户在使用大数据平台时,发现难以定位问题。基于此,我们设计大数据诊断平台,旨在提升用户解决问题效率,降低用户异常成本。代号“罗盘”,意为用户定位问题,给出优化方案。


此前业务存在问题现状总结如下:


1、问题定位效率相对低。平台组件多,从上层调度器、Livy客户端到中层计算引擎Spark,最后底层Hadoop系统;用户作业日志量大,没法串联一起,问题上下文关联难;用户人员角色非单一研发角色人员,自行分析能力有限,需平台方提供协助解决,沟通与定位让双方工作量只增不减;缺乏自动化工具定位问题等等。各种因素说明,海量作业调度,多种类型运行环境,TB级别日志量,依靠人力盘查作业问题是非常耗的。


2、异常问题类型多,缺乏有效知识库,高效重复利用已有的解决方案。从作业调度任务系统到计算引擎层,常见的业务问题常见如:晚点溯源、高频失败、运行耗时长、数据倾斜、暴力扫描、shuffle失败、CPU浪费、内存浪费、内存溢出等,需将问题数量降低收敛。


3、异常任务、不合理任务成本多。用户任务在执行周期内发生异常或者配置不合理,将导致任务浪费资源,产生许多额外的成本,需将此类问题成本损失降至最低。


图片


总体上希望,从问题出发、经过快速定位、优化方案、问题收敛环节,最后达到降本增效目的。


02 业界产品


基于以上问题,我们调研了业界有关大数据诊断系统,目前比较类似的是Dr. Elephant开源系统,Dr. Elephant一个Hadoop和Spark的性能监控调优工具。它能自动采集Airflow、Azkaban、Oozie等调度系统作业流及计算引擎Spark和Hadoop MR的运行指标,分析作业的异常和性能结果,指导开发者进行作业调优,从而提升开发者工作效率和集群资源利用率。


图片


工作原理:


Dr. Elephant定期从Yarn资源管理中心拉取近期成功和失败的作业列表。每个作业会实时从历史服务器中获取到元数据、配置及调度器作业信息以及监控数据。一旦获取到所有的元数据信息,Dr. Elephant就基于这些元数据运行启发式算法,并生成一份该作业的诊断报告。对该作业报告,进行标记和评级,分为五个级别来评定作业存在新能问题严重程度。


核心功能:


  • 集成多个调度器框架如Azkaban、Airflow、Oozie等;

  • 统计历史作业和工作流的性能指标;

  • Job级别工作流对比;

  • 支持多个计算引擎框架性能诊断(Spark、Tez、MapReduce、TonY);

  • 基于自定义规则的可配置启发式插件,用户诊断作业;

  • 提供REST API, 用户能通过API获取所有信息;


欠缺功能:


  • 支持Spark, Hadoop系统版本比较低,对于新版本Spark, Hadoop兼容性不友好;

  • 不支持Spark, Hadoop新版本的特性的诊断;

  • 诊断指标比较少,其中Spark相关指标仅4个,对于高度依赖Spark引擎是非常欠缺的;

  • 不支持日志级别问题诊断,不能够诊断调度器运行任务或者App应用程序的出现的异常;

  • 调度器和作业App元数据的关联在一些场景下不支持;

  • 不支持异常资源的管理,达到降本增效指引目的;

  • 对Spark History服务接口频繁调用影响History服务的稳定性;

  • 缺乏有效的降本增效流程辅组工具;


综上所述,结合我们有大规模Spark集群调度特点,业界产品对我们解决业务痛点效果不佳, 我们决定自研诊断系统来解决业务带来的挑战。


03 技术方案


由上述可知,系统在业务层面既能快速定位解决用户问题,又能帮助用户管理异常资源;架构层面支持Spark, Hadoop多指标诊断又不影响第三方系统性能问题,我们采用非入侵的方式设计诊断系统。


架构层主要由同步工作流层任务元数据模块、同步Yarn/Spark App元数据模块、关联工作流层/引擎层App元数据模块、工作流任务异常检测模块,引擎层异常检测模块,Portal展示模块组成。同时调度器(Scheduler Server)可以适配多个开源调度器项目, 如内部系统Oflow、Airflow、DolphinScheduler等。


图片

整体架构图


整体架构分3层:

  • 第一层为对接外部系统,调度器、Yarn、HistoryServer、HDFS等系统,同步元数据、集群状态、运行环境状态、日志等到诊断系统分析;

  • 第二层为架构层, 包括数据采集、元数据关联&模型标准化、异常检测、诊断Portal模块;

  • 第三层为基础组件层,包括MySQL、 Elasticsearch、Kafka、Redis等组件。


具体模块流程阶段:

(1)数据采集阶段:从调度系统将用户、DAG、作业、执行记录等工作流元数据同步至诊断系统;定时同步Yarn ResourceManager、Spark HistoryServer App元数据至诊断系统,标志作业运行指标存储路径,为后续数据处理阶段作基础;


(2)数据关联&模型标准化阶段:将分步采集的工作流执行记录、Spark App、Yarn App、集群运行环境配置等数据通过ApplicationID介质进行关联,此时,工作流层与引擎层元数据已关联完毕,得到数据标准模型 (user, dag, task, application, clusterConfig, time) ;


(3)工作流层&引擎层异常检测阶段:至此已经获得数据标准模型,针对标准模型进一步Workflow异常检测流程,同时平台维护着一套沉淀多年的数据治理知识库,加载知识库到标准模型,通过启发式规则,对标准模型的指标数据、日志同时进行异常挖掘,结合集群状态及运行是环境状态,分析得出工作流层、引擎层异常结果;


图片


(4)业务视图:存储、分析数据,提供给用户任务概览、工作流层任务诊断、引擎层作业Application诊断,工作流层展示调度器执行任务引发的异常,如任务失败、回环任务、基线偏离任务等问题,计算引擎层展示Spark作业执行引发的耗时、资源使用、运行时问题;


图片


04 实践效果


我们从四个方面简述诊断平台带来的效果:诊断平台UI、效率分析、成本分析、稳定性分析、降本增效分析。


(1)诊断平台UI


图片


引擎层分析主要展示Spark计算过程中异常、不合理的作业,并给作业记录异常标签,如CPU浪费、数据倾斜、Task长尾、大表扫描等异常类型标签,这些标签是数据标准模型经过工作流层、引擎层异常检测得出,同时可以让用户清楚作业的问题原因。


(2)效率分析


长尾Task分析

图片


原因:长尾任务是由于作业运行过程中,一个Task或多个Task单元执行时间过长,拖延整个任务运行时间。

危害:作业执行时间过长,资源浪费

诊断:从时间角度计算,执行时间过长原因在于Task读取数据量多或者数据读取慢。如果读取数据过多,那么将出现数据倾斜,按数据倾斜方式处理;如果读取数据过慢,那么Hadoop集群的节点负载高或者有网络丢包问题等,导致数据读取慢,可以联系运维处理。


HDFS卡顿分析

图片


原因:HDFS卡顿是Spark作业中Task最小执行单元读取数据速率比其他Task慢,低于阈值;

危害:作业执行时间过长,浪费资源;

诊断:作业数据所在机器网络IO问题或者集群配置不一致问题,导致Task从Hadoop读取数据速率低下。这种情况一般伴随着长尾Task出现,同时表现Task执行时间过长、读取数据量少,导致整个数据处理Task无法高效利用回收。这种情况需排查数据在节点配置及机器硬件配置;


推测执行过多分析

图片


原因:推测执行(speculative)是指作业执行单元Task在同一个Stage中的执行时间相比其他Task执行时间长,在其他Executor发起相同Task执行,先完成的Task将Kill另个Task, 并取得结果。这样情况下如果作业大部分Task都发起推测执行,超过一定比例,就是推测执行过多的表现;

危害:任务执行时间长,资源浪费恶化;

诊断:机器配置不同、网络波动、集群负载高、作业数据倾斜等都会引起推测执行,过多的慢任务执行推测将会导致资源恶化,推测执行其实是对资源的压榨、用空间换取时间的做法。解决执行推测要从多方面入手,结合集群状态环境。


全局排序异常分析

图片


原因:Spark Stage中的Task只有一个时,而且处理的数量级别大,Stage中的所有数据都集中在一个Task中,这种情况即发生全局排序异常。

危害:任务处理时间长、消耗资源大

诊断:全局排序异常并没有发挥Spark并发计算特性,Task处理数据漫长,非常消耗资源,解决这个问题需要对作业进行重新分区,并发计算数据。


(3)成本分析


CPU浪费分析

图片


原因:Spark Driver/Executor cores参数配置不合理导致CPU空闲浪费

危害:没用高效利用资源

诊断:通过Spark Application采集指标,分析Spark Driver、Spark Executor执行过程中的CPU的运行时间(单位: vcore·second)占比,如果空闲时间超过一定的比例,判定为浪费,用户根据比例降低启用CPU数量。

计算Application CPU浪费过程中,采集到Executor执行开始和结束时间、Executor执行所有Job开始和结束时间、Job内部真正执行Task CPU时间,  最终获得以下指标:


  • 所有Executor的并发个数Count,每个Executor固定核数ExecutorCores

  • 所有Executor内Job真正执行时间和JobTime(计算Job开始结束时间交叉和)

  • 所有Executor内Task个数 TaskCount及每个Task执行CPU时间

图片

总CPU计算时间估算为:

图片

实际使用CPU计算时间为:

图片

CPU浪费百分比:

图片


如果空闲比很大,可以适当降低参数spark.executor.cores的值,降低并发度,或者减少RDD分区数和Shuffle参数spark.sql.shuffle.partitions。


内存浪费分析


图片


原因:分析Driver/Executor内存使用峰值占总内存比例,当空闲比例值超过阈值,为内存浪费

危害:没用高效利用资源

诊断:采集Spark Application Driver/Executor的相关内存指标,与CPU浪费计算同理,获得Executor指标如下:


  • 所有Executor个数Count, 每个Executor内存ExecutorMemory

  • 每个Executor执行时间

图片
  • 每个Executor执行过程内存峰值 

图片

总的内存时间估算为:

图片

实际内存时间为:

图片

浪费内存百分比:

图片


如果空闲比很大,可以适当降低参数spark.executor.memory的值;


(4)稳定性分析


全表扫描问题


图片


原因:SparkSQL查询大表数据时,没有进行分区条件筛选,或者SQL比较复杂时,发生了全表扫描;

危害:作业执行时间长,集群负载高,影响其他作业执行

诊断:Spark SQL扫描数据表时,尽管现在Spark对优化器已经有不少的优化,如谓词下推、列裁剪、常量合并等,但都相对简单,在没分区的大表或者用户Join大表和小表时,会出现全表扫描或者分区不合理暴力扫描情况。一旦执行了这种作业,一方面用户长时间才能得到数据结果,另一方面平台方承载作业扫描全表的压力,作业会占用集群主要资源,拖慢其他作业。因此用户需要根据具体业务做条件限制,调整Spark SQL以及对表分区等。


数据倾斜分析


图片


原因:数据倾斜是Task计算过程中Key分布不均造成的,个别Key的数据特别多,超出计算节点的计算能力;

危害:会导致任务内存溢出、计算资源利用率低、作业执行时间超出预期;

诊断:数据倾斜发生时,大量的Map Stage数据发送到Reduce Stage,Reduce Stage节点需要处理大量数据,其他依赖该节点将处于长时间等待状态。比如Stage1依赖Stage0的执行执行结果,如果Stage0发生数据倾斜,导致执行过长或者直接挂起,Stage1将处于等待状态,整个作业也一直挂起,这是资源将被这个作业占有,但只有极少数Task在执行,造成计算资源浪费,利用率低;大量数据将集中在少数计算节点上,当数据量超出单个节点的内存范围,最终内存溢出,导致任务失败。一般出现在SQL字段:join on, group by, partition by, count distinct等,解决数据倾斜常用方式有:


  • 增大并行度spark.sql.shuffle.partitions,使得数据再次分配到不同Task;

  • 过滤异常值的数据,过多冗余值也会导致数据倾斜;

  • SQL中group by或者RDD的reduceByKey添加key的随机数打散Map, Reduce两个阶段数据,最后在Reduce阶段将随机数去掉;

  • 表Join关联时,可以使用Broadcast方式广播小表数据,避免shuffle, 就不会发生数据倾斜;


Shuffle失败分析


图片


原因:由于作业配置、网络、操作系统、硬件多个因素,Shuffle在节点之间传输数据会失败

危害:作业异常退出,资源浪费

诊断:作业计算过程中,Shuffle作为Spark MapReduce框架中的数据纽带,经常出现失败问题,问题可以分Shuffle Read和Shuffle Write两部分。


图片


由图看出,Shuffle Write的分区(partition)数量跟MapTask(RDD)的数量一致,文件被分割后,经算子计算的中间排序结果临时存放在各个Executor所在的本地磁盘,可以理解为Shuffle Write做了本地磁盘保存文件操作。Shuffle Read的分区数有Spark提供的一些参数控制,参数不合理将会导致Reduce Task异常,如数据倾斜,甚至OOM造成Executor退出,下游网络连接不上。由诊断抓取异常了解到原因后,从Shuffle的数据量和处理Shuffle数据的分区数两个角度给出方案:


  • 减少shuffle数据量,使用Broadcast Join或者去掉不必要字段等;

  • 有group by、Join、 reduce by、partition by等算子操作可以通过shuffle的partitions参数,根据数据量或计算复杂度提高参数值,另外控制好并行度以及运行任务的总核数,官方推荐运行Task为核数的2-3倍;

  • 提高Executor的内存,防止内存溢出或者JVM Crash;

  • 提高Spark网络RPC通信时间配置,可以让数据处理完成等;


内存溢出


图片


原因:Spark内存使用超出了容量造成内存溢出

危害:作业异常退出,资源浪费

诊断:按照Spark内存模型,用户实际使用内存如下

图片


图片


用户作业内存溢出分堆内和堆外两种方式:

  • 堆外内存溢出:表现为作业被Yarn节点Kill, 主要原因是MonitorMemory超出申请内存限制

  • 堆内内存溢出:表现为JVM内存空间不足或者GC超出限制,任务内的数据量过多导致


定位到原因后,可以有多种处理方式:

  • 提高executorMemory, 堆内内存增大;

  • 降低executorCores, 减少并行度,处理数据量变少;

  • 重新分配分区(repartition), 对每个Task产生的RDD、Dataframe数据量减少等;

  • 提高executorMemoryOverhead参数,堆外内存增大;

  • 处理数据倾斜,如group by、reduce by等热点key打散;


SQL其他常见问题分析


图片


原因:SQL执行过程中没权限、表不存在、语法错误等;

危害:任务执行异常退出,浪费资源

诊断:具有SQL失败特征从指标数据或者日志提取,用户根据问题去申请相应权限、创建表或者修正语法问题,能快速解决问题。


(5)降本增效


以上讲述了常见的问题案例场景,这里不再多介绍,接下来我们分析下降本增效。


通过作业层和引擎层分析识别异常、不合理任务,累计识别任务的内存、CPU资源,转化为相应的成本,通过任务元数据关联,按个人、业务、部门三个维度汇总给用户,并设置排名等机制,推进数据治理。


图片


以下通过长期推进治理,可以看成本趋势,用户聚焦的任务问题得以改善。


图片


05 总结与规划


  • OPPO大数据任务诊断平台主要围绕离线调度任务、计算引擎两个方面对问题进行定位分析,使用丰富的知识库,提供给用户解决优化方案,同时达到降本增效的目的。

  • 技术方面采用非入侵方案对接其他系统,保证了其他系统的安全性。系统架构基于启发式规则定位、分析问题方式,但知识库比较依赖人员经验的积累,更深层次问题需要数据挖掘算法扩大检测范围,智能化诊断。

  • 另外,除了对Spark任务问题诊断,OPPO大数据诊断平台还针对Flink任务进行异常、资源问题诊断,整体平台包含Spark、Flink两种计算引擎诊断,届时将会对平台(罗盘)进行开源。



#作者简介


BobZhuang  OPPO高级数据平台工程师


专注大数据分布式系统研发,曾就职于Kingsoft公司。


Xiaoyou Wang  OPPO数据平台工程师


2019年加入OPPO,负责大数据系统相关设计和开发工作,拥有丰富的后端研发经验。


推荐阅读


图片


图片


图片

大数据 · 目录
上一篇OPPO实时计算平台基于云原生的作业弹性伸缩设计与实践下一篇OPPO大数据平台在亚马逊云科技上的成本优化最佳实践

微信扫一扫
关注该公众号

继续滑动看下一个
安第斯智能云
向上滑动看下一个