货拉拉数据平台上有众多的任务,任务和任务之间有着复杂依赖关系,这些任务关系实际上构成了一个有向无环图(DAG),任务成百上千,我们很难用“一张图”就直观的画出整个链路的结构和依赖关系,因此我们需要使用一些特定的方法对整个链路进行分析,来帮助我们解决以下问题:
如何确定终端任务的大致产出时间?
当某一天终端任务产出延迟了,如何确认到底是链路的哪一个环节导致了它产出变慢?
链路和集群资源的关系是怎么样的?我们能否通过分析链路避免资源抢占?
如何定位集群资源高峰是哪个任务产生的?
任务实际运行时间
我们的数据平台任务可以有设定执行时间,在上游依赖完成的情况下,如果还没到执行时间,任务是不会进行的。任务的状态又分为“就绪”,“运行中”,“成功”,“失败”等。如果不考虑任务下发出现问题,任务处于“就绪”状态有以下可能:
当上游依赖任务还未产出时,哪怕设定时间已到,当天实例也不会运行
当上游依赖已跑完,但还未到设定时间时,当天实例也不会运行
某任务实例实际开始运行时间≈max(上游最后一个实例产出时间,该任务任务设定时间
因为任务可以设定执行时间,我们经常会遇到这么一个问题:上游某个任务已经运行完了,但是下游因为设定时间较晚,等待了很久后才执行。有的时候任务的等待是没有必要的,因为任务
保障链路复杂度
以下是2022.11.20统计的部分关键终端任务及上游任务个数
保障链路名称 | 链路任务个数 | 根节点任务个数 | 最大链路层数 | 要求产出时间 |
---|---|---|---|---|
P0核心报表 | 807 | 290 | 22 | 6:30 |
P1核心报表 | 1338 | 413 | 23 | 7:30 |
用户画像链路 | 657 | 269 | 18 | 6:00 |
日常运营核心报表 | 756 | 280 | 21 | 6:30 |
链路分析
“如果每个任务的一个紧挨着一个运行就好了。”
原始链路
原始链路包含10个任务,其中10号任务作为终端任务,为了优化终端任务的产出时间,我们需要调整每个任务的启动时间,但因为链路之间相互依赖,任务的调整会牵扯到其上游和旁支的链路任务,因此一共做了三次调整动作。
在经过三次调整动作后,我们实现了10号任务提前30分钟产出。
调整后链路
但公司级重要的报表上游有几百上千个任务,成百上千个分支,如果每个链路分支都要用以上方法人为一个个去判断优化和调整,那是不可能的,因此我们需要寻找自动化的链路优化方式,能够轻松识别出来哪些任务需要调整。
我们首先识别出来终端任务在链路理想状态下能够提前多久产出,再自下而上优化上游任务运行时间,优化的时间根据下游调整时间和任务之间等待时间比较获得,在此思路下我们需要做以下两点假设:
以1分钟作为任务有无等待的边界,小于一分钟时,我们认定两任务之间无等待。
当计算出来往前调整时间小于0时,我们默认不做调整。
我们判断出最长链路相加70分钟,因此终端任务预计最早能在1:10分产出,能优化30分钟。
优化过程:
任务10往前调整30分钟
优化任务10上游:可优化gap=30
任务9:wait=0.5<1,任务9往前调整30分钟
任务8:wait=20>1,任务8往前调整30-20=10分钟
优化任务8上游:可优化gap=10分钟
任务5:wait=0.5<1,任务5往前调整10分钟
优化任务9上游:可优化gap=30
任务4:wait=0.5<1,任务4往前调整30分钟
任务7:wait=15>1,任务7往前调整30-15=15分钟
任务6:wait=30>1,任务往前调整30-30=0分钟
优化任务4上游:可优化gap=30分钟
任务1:wait=40>1,任务4往前调整30-40<0分钟
任务2:wait=10>1,任务2往前调整30-10=20分钟
优化任务7上游:可优化gap=15分钟
任务2:wait=20>1,任务2往前调整15-20<0分钟
优化任务6上游:可优化gap=0分钟
任务3:无需优化
最终红色字体就是每个任务我们需要调整的时间。
整个算法思路的流程图如下:
以上例子和优化算法的实现如下:
import networkx as nx
import Pandas as pd
# 生成以上链路
edges = pd.DataFrame(
{
"source": [1, 2, 2, 3, 4, 7, 6, 9, 5, 8],
"target": [4, 4, 7, 6, 9, 9, 9, 10, 8, 10],
"weight": [5, 10, 10, 40, 45, 20, 20, 10, 25, 5],
"wait": [40, 10, 20, 0.5, 0.5, 15, 30, 0.5, 0.5, 20],
"adjust": [0,0,0,0,0,0,0,0,0,0],
}
)
G_test = nx.from_pandas_edgelist(edges, create_using=nx.DiGraph, edge_attr=True)
G_test.add_edge(10,100000,weight=0.5,adjust=-30,wait=0.5)
# 优化算法函数
def opt_layer(G,pre_nodes):
while pre_nodes!=[]:
print('开始优化',pre_nodes,'上游。')
next_opt_nodes = []
for i in pre_nodes:
adjust_compare = pd.DataFrame(dict(G[i])).loc['adjust',:].min()
print('adjust_compare:',adjust_compare)
pre_nodes = list(G.predecessors(i))
next_opt_nodes = list(set(next_opt_nodes+pre_nodes))
for j in pre_nodes:
if G[j][i]['wait'] < 1:
G[j][i]['adjust'] = adjust_compare
print(j,'-',i,'wait<1')
print(j,'-',i,'adjust:',G[j][i]['adjust'])
else:
new_adjust = math.floor(G[j][i]['wait']) + adjust_compare
G[j][i]['adjust'] = min([new_adjust,G[j][i]['adjust'],0])
print(j,'-',i,'adjust>=1')
print(j,'-',i,'adjust:',G[j][i]['adjust'])
pre_nodes = next_opt_nodes
print('---------------------------------')
return G
# 开始优化
G_opt = opt_layer(G_test,[10])
运行以上代码,可以输出一个任务优化汇总的Dataframe,其中adjust就是任务需要向前调整的运行时间。
链路任务构成:
在识别出了最长链路之后,我们可以根据最长链路进行一些分析。从任务层面,我们发现在最长链路上,一些重要的节点任务是经常出现的,比如我们的订单宽表任务和司机宽表任务,所以我们可以根据这些节点的产出时间,来定位我们链路的延迟原因和健康情况。
链路总体时长构成:
链路总体时长=任务等待时间+任务运行时间
任务等待时间优化:修改调度时间,减少任务之间空泡。
任务运行时间优化:MR container数量的控制,数据倾斜优化,file merge优化,计算逻辑优化等等。
在发现订单宽表和司机宽表这些任务经常作为重要枢纽任务后,我们可以针对整个链路的任务进行节点度的分析,DAG中节点的度数反映了该节点的依赖复杂程度。我们发现节点度数高的任务结束时间经常和集群资源的高峰时间重合。
下游任务个数 | 任务名称 | 开始时间 | 结束时间 |
---|---|---|---|
129 | dwb层_订单基础宽表 | 1:29:22 | 1:49:21 |
85 | 城市维表 | 0:35:05 | 0:35:58 |
58 | 城市层级关系表 | 0:36:49 | 0:37:55 |
54 | DWS层-新司机宽表 | 2:48:32 | 2:55:12 |
49 | dwd-全量用户表 | 0:18:51 | 0:24:48 |
48 | 实时看板所有业务线所有指标汇总 | 2:35:21 | 2:37:12 |
47 | DIM层-城市维度表(全) | 0:18:13 | 0:19:00 |
32 | dwb层-无忧搬家订单基础宽表 | 1:02:45 | 1:08:08 |
31 | 司机基本信息表 | 0:35:26 | 0:41:16 |
29 | 多业务-城市大区全国维度信息 | 0:36:50 | 0:37:57 |
27 | dwb-货运用户订单基表 | 1:49:28 | 2:06:59 |
25 | 棱镜埋点汇总表增量更新 | 0:20:01 | 0:20:19 |
22 | 多业务-订单明细数据(数据按业务线加倍,计算必须加业务线) | 1:49:37 | 2:10:27 |
22 | DWB层-司机认证信息表 | 0:50:05 | 0:56:38 |
21 | 数智中心 | 0:41:41 | 0:42:11 |
20 | DWD层-新版围栏表 | 0:36:07 | 0:36:43 |
20 | dwb层_订单ab单基础表 | 1:49:28 | 2:06:55 |
20 | 风控订单基础宽表 | 2:13:06 | 2:43:21 |
19 | DWB层-司机车辆信息表 | 0:45:03 | 0:52:05 |
19 | 多业务分车型报表车型维度辅助计算表 | 1:10:33 | 1:11:24 |
17 | 搬家业务操作日志表 | 0:11:05 | 0:14:32 |
16 | 数智中心城市维表 | 0:35:07 | 0:35:36 |
目前分析造成资源高峰的原因有两个:
度数很大的节点完成的时候,下游可能会有大量任务涌入,造成任务挤兑,资源高峰。
大数据量表对应任务完成的时候,下游任务可能不多但数据量大同样需要耗费大量资源。
根节点存在外部依赖任务,此类任务无法通过修改调度时间控制运行时间。
特殊的抽数任务,此类任务调度时间不能修改,如:必须在几点后进行抽数的任务。
任务挤兑导致的资源高峰,存在不确定性,比如高峰可能并非由单个节点产生,因此并非所有资源高峰都能找到“罪魁祸首”。
只针对一天的链路运行情况做分析可能会存在特殊性,因此最好分析过去一段时间的最长链路,以避免某天链路的特殊性。
基于以上的分析,我们可以有一些链路优化的后续思路:
比较当天和以前一段时间的最长链路,我们可以定位到当天延迟产出原因,也可以分析最长链路哪些处理时间在恶化。
最长链路上的任务相当于是直接影响报表产出时间的重要任务,我们可以给经常出现在最长链路上的任务倾斜更多资源,比如保障经常出现在最长链路上任务的task数量稳定,以保障其稳定产出。
为了避免资源挤兑,可以梳理出链路中的哪些任务下游占用资源较多,错峰让这些任务产出。比如货运场景下,埋点、lbs和订单宽表三个任务因为数据量较大、下游较多,他们的下游运行往往会造成资源挤兑,我们可以让以上三个任务分开时间产出,以保证两两间下游不会挤兑。
宽表的设计能够节约重复计算导致的资源浪费,方便了下游使用,但同时会导致下游资源的挤兑,针对宽表下游,我们可以打散运行时间,将不重要或没有产出时间要求的任务,放在白天运行。
https://networkx.org/documentation/stable/index.html
作者简介:
杨舒林,大数据高级数据仓库开发工程师。现就职于货拉拉科技,目前专注于大数据数据治理工作,负责保障链路治理、数据质量治理和指标监控等工作。