在日常Flink使用过程中,我们经常遇到Flink任务中某些Slot或者TM负载过重的问题,对日常的资源调配、运维以及降本都带来了很大的影响,所以我们对Flink的task部署机制进行了梳理和调研,准备在后续的工作中进行优化。由于jobGraph的生成以及任务提交流程因任务部署方式而不同,对我们后续的分析也没有影响,这里忽略前置流程,直接从Dispatcher出发,重点关注submit后executionGraph构建以及后续的任务部署过程。
2.1 SchedulerNG
在Dispatcher收到submit请求后,先是启动了JobManagerRunner,再启动JobMaster,在初始化jobMaster的过程中,我们注意到这里开始了整个作业的Scheduling第一步,创建SchedulerNG。
this.schedulerNG =
createScheduler(
slotPoolServiceSchedulerFactory,
executionDeploymentTracker,
jobManagerJobMetricGroup,
jobStatusListener);
我们这次主要跟踪构建executionGraph,然后根据Scheduling策略发起的整个部署过程。
现阶段(1.13)SchedulerNG默认实现是DefaultScheduler,初始化过程中就会开始构建我们的ExecutionGraph,ExecutionGraph中有几个重要元素
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology){
1. executionGraph第一步拿到了jobGraph中的有序JobVertex列表
2. 接着一对一创建ExecutionJobVertex
3. 根据producer并行度生成producedDataSets(IntermediateDataSet)
4. 再根据自身并行度生成所属的ExecutionVertex[]
5. 构建stateBackend信息和checkpointStorage信息等
6. 最后完成executionGraph的拓扑构建executionTopology
}
Pipeline region的构建内嵌在executionGraph的初始化过程中,我们知道Flink中各个节点之间的链接都会有IntermediateDataSet这一种逻辑结构,用来表示JobVertex的输出,即该JobVertex中包含的算子会产生的数据集。这个数据集的ResultPartitionType有几种类型:
BLOCKING:都上游处理完数据后,再交给下游处理。这个数据分区可以被消费多次,也可以并发消费。这个分区并不会被自动销毁,而是交给调度器判断。
BLOCKING_PERSISTENT:类似于Blocking,但是其生命周期由用户端指定。调用JobMaster或者ResourceManager的API来销毁,而不是由调度器控制。
PIPELINED:流交换模式。可以用于有界和无界流。这种分区类型的数据只能被每个消费者消费一次。且这种分区可以保留任意数据。
PIPELINED_BOUNDED:该策略在PIPELINED的基础上保留有限制的buffer,避免对barrier造成阻塞。
PIPELINED_APPROXIMATE:和PIPELINED_BOUNDED类似,可以支持下游task重启后继续消费,用来支持task failover后的Approximate Local-Recovery策略。
第一步 先根据executionTopology构建rawPipelinedRegions,多个vertex能否组合成一个pipeline region的关键在于这个vertex的consumedResult.getResultType().isReconnectable(),如果支持重连,那么两个vertex之间就会进行拆分,划到不同的region。这里的isReconnectable就和我们的ResultPartitionType类型有关,流处理中的PIPELINED和PIPELINED_BOUNDED都是默认的false,在这种情况下所有的vertex其实都会放入同一个region。故我们日常的flink作业其实都只会生成一个pipeline region。
第二步 根据不同的pipeline region构建自己的resultPartition信息,这个是为了构建后续的PartitionReleaseStrategy,决定一个resultPartition何时finish以及被release
第三步 对vertex的coLocation情况进行校验,保证co-located tasks必须在同一个pipeline Region里。这里是因为后续的scheduling strategy里会保证不同pipeline region的调度部署是阶段隔离的,可能无法满足colocation-constraint
@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
.filter(this::isSourceRegion)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
但在实现上这里有几个重要元素需要了解:
LocalInputPreferredSlotSharingStrategy :在Flink内部,所有的slot分配都是基于sharingslot来操作的,在满足co-location的基础上,Flink期望将producer和consumeNode task尽可能的分布在一起,以减少数据传输成本。
SlotProfile:slot的资源信息,对task -> logical slot -> physical slot的mapping有非常重要的作用,包含了task的资源信息,slot的物理资源信息,倾向的location(TaskManagerLocation),倾向的allocation以及整个executionGraph之前分配过的allocation(用于黑名单,重启后尽量避免分配在之前的slot里)。
ResourceProfileRetriever: 用于获取executionVertex的实际资源信息。默认是unknown,如果有明细配置会用于后续的executionSlotSharingGroup资源构建。
ExecutionSlotSharingGroup:Flink task资源申请的最终逻辑载体,用于将sharing到一起的task(execution group)组合成一个group用于生成资源,后续部署也会绑定对应的task。
在JobMaster完成自身构建之后,就委托SchedulerNG来开始了整个job的Scheduling:
@Override
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
transitionToRunning();
schedulingStrategy.startScheduling();
}
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
final List<SchedulingPipelinedRegion> regionsSorted =
SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
schedulingTopology, regions);
final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
for (SchedulingPipelinedRegion region : regionsSorted) {
maybeScheduleRegion(region, consumableStatusCache);
}
}
final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region), id -> deploymentOption);
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
private List<SlotExecutionVertexAssignment> allocateSlots(
final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
return executionSlotAllocator.allocateSlotsFor(
executionVertexDeploymentOptions.stream()
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList()));
}
我们对整个Flink task的部署过程完成梳理后,重新对我们一开始的问题进行思考:
问题的产生在于大量的task集中分配到了统一个sharedSlot,这个我们可以发现其实是在ExecutionSlotSharingGroup的构建过程中产生的。我们看下源码,可以很直接的看到整个group的分配是一个roundRobin过程,而executionVertices来自于有序拓扑结构,中间传递过程也保证了有序性,所以最终会导致大量的task分配的index靠前的group中,最后落到了同一个slot。
为了避免这种情况,我们的做法其实有比较多,一种是在保证各种constraint的同时添加随机性,以打散各个不均匀的task;还有一种就是构建基于load-balance的分配过程,以尽可能的将task分布均匀。
附Flink部分源码:
private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
final List<SchedulingExecutionVertex> executionVertices) {
for (SchedulingExecutionVertex executionVertex : executionVertices) {
final SlotSharingGroup slotSharingGroup =
getSlotSharingGroup(executionVertex.getId());
final List<ExecutionSlotSharingGroup> groups =
executionSlotSharingGroups.computeIfAbsent(
slotSharingGroup.getSlotSharingGroupId(), k -> new ArrayList<>());
ExecutionSlotSharingGroup group = null;
for (ExecutionSlotSharingGroup executionSlotSharingGroup : groups) {
if (isGroupAvailableForVertex(
executionSlotSharingGroup, executionVertex.getId())) {
group = executionSlotSharingGroup;
break;
}
}
if (group == null) {
group = new ExecutionSlotSharingGroup();
group.setResourceProfile(slotSharingGroup.getResourceProfile());
groups.add(group);
}
addVertexToExecutionSlotSharingGroup(executionVertex, group);
}
}
这个问题主要是在于说有一些过重的task对应的slot都分配在了同一个tm上,导致整个tm压力过大,资源难以协调。在整个过程中其实我们有看到tm信息的交互,在co-location constraint上。我们看下该hint职责:
The co-location group is used to make sure that the i-th subtasks for iteration head and iteration tail are scheduled on the same TaskManager.
也就是说其实是为了解决算子间相同index的task数据传递之类的问题,但对于task的均衡负载无法介入。对此我们尝试去做的事情:
Flink开源社区较活跃,Task侧的部署链路也一直在演进中,持续跟进并深入了解内部实现逻辑能更好的支持我们解决Flink个性化调度策略上的一些问题。后续我们也准备进一步完善Flink在operator级别的细粒度资源配置能力,降低资源使用率的同时进一步提高Flink作业稳定性。
线下活动推荐:
时间:4月9日(周日)14:00-18:00
主题:得物技术沙龙-安全专场
地点:上海市杨浦区黄兴路221号互联宝地C2栋5楼 培训教室
活动亮点:本次沙龙聚焦于行业安全前沿最佳实践,将通过得物安全白皮书分享、企企业安全体系建设经验、零信任安全介绍、数据安全治理手段等多个维度,来讲述安全管理在当前企业中遇到的挑战和解决方案。
报名方式:点击阅读原文或下图
*文/ 昭只
关注得物技术,每周一三五晚18:30更新技术干货