cover_image

工作流编排利器-Conductor的落地

王凯文 北京顺丰同城科技技术团队
2025年02月07日 02:25
1、背景介绍   

1.1 现状  

智域系统支持动态和静态的管理小哥的排班计划,同时也对小哥的收派件的路径进行智能调度来提升收派效率,智能调度的业务场景对算法模型的数据产出的有一定的时效性的要求和需要对不同算法任务进行编排关联,最终快速、准确的产出调度所需的数据。此外,公司内部的供应链管理的相关系统,例如仓网规划、需求预测、库存计划等也对算法任务的编排有一定的需求。
另外,智域系统每天还需要处理大量的订单数据,网点数据,用来计算出前日的各种数据指标,例如网点维度的订单达成率,订单超时率,根据这些指标再进行派件人员优化,区域调整等。定时任务会在每天凌晨拉起需要处理的任务,由于处理的数据量大,很多任务一跑就是十来个小时,如果再由于一些错误,导致任务失败重跑,那么可能当天就无法计算出前日的数据,导致整体指标出现错误偏差。

1.2 问题难点  

上述业务场景存在着一定的业务痛点,主要体现在以下几点:
1、难以统一调度管理:不同业务线实现的技术方案各不相同,难以统一的进行管理调度复用
2、重复造轮子:各个业务线都要对任务进行任务管理、任务调度、任务重试等任务管理功能的统一开发    
3、复杂的订单处理流程:对于订单数据处理往往有强关联的依赖关系,一个处理任务的数据源往往需要其他处理任务先处理完,纷繁复杂的数据处理任务在调度时机的选择上还需要投入大量的精力
4、机器资源严重不均衡:在处理任务大数据量任务以及算法任务时,往往会随机选择集群中的几台机器进行执行,导致cpu飙升,严重时会导致集群雪崩。
5、并行任务受阻:受限于平台单机器资源,任务并行计算能力不足。

1.3 解决方案  

通过分析业务场景中存在的问题,发现主要问题在于没有一个统一的平台来负责任务的管理调度和相关的任务补偿处理平台。需要一款系统,支持工作流调度,任务编排,任务管理,容错率高,有可视化界面,上手难度低等特点。

2、Conductor介绍  

2.1 选型   

我们选取了4款常用的工作流引擎进行对比

图片

从上述背景描述的业务场景中,可以知道,我们使用的是实时调度的场景,需要将众多的任务进行编排,支持fork-join,可以水平无限扩展,这四款中只有Conductor是最适合的实时调度的场景,另外三个适合的是大数据批量处理场景,另外从开发语言,star数,上手难度等方面综合考虑    
经过综合评估,我们选择了Conductor作为工作流编排工具,主要原因如下:
  • 高度可扩展性:Conductor支持水平扩展,能够轻松应对高峰期的流量冲击,满足我们的业务需求。
  • 灵活性:Conductor提供了丰富的API和DSL,支持复杂的业务逻辑编排,能够适应不同的业务场景。
  • 易用性:Conductor内置多种任务类型,易于集成现有的微服务架构,降低了开发和运维成本。
  • 社区支持:Conductor拥有活跃的社区和丰富的文档资源,便于获取技术支持和解决方案。

    综上所述,Conductor在可扩展性、灵活性、易用性和社区支持方面均表现出色,符合我们项目的实际需求,是最优的技术选型。


2.2 Conductor概念   

Conductor是由netflix开源的一个微服务编排引擎,在netflix内容平台工程团队的许多业务场景提供了很多支持,比如内容检查(检查文件的输入是否正确\完整)、编码(将原有的文件进行视频编码)、发布等业务流程,这些业务流程由微服务任务异步驱动的。其中一些业务流程是需要维持数天的长期业务流程。与我们公司面临的场景很类似。    

2.3 Conductor运行  

通过Conductor提供的api通过JSON按照Conductor的DSL定义复杂的业务流(workflow),并将不同的微服务中的任务定义成Conductor的worker任务,并由拆分的微服务实现处理单个任务;基于JSON DSL定义执行业务流;所有业务流的运行状态是可查询的并且是可追溯的;提供了暂停、恢复、重启等控制API来方便业务方接入开发;复用现有的服务,并且提供了方便的管理途径;提供了多任务并行处理的能力,扩展数百万个并发运行流程的能力。引擎的核心是状态机服务,即Decider服务。当工作流事件发生时(例如任务完成,失败等),Decider将工作流蓝图与工作流的当前状态相匹配,识别下一个状态,并安排适当的任务,或更新工作流的状态。

2.4 Conductor核心思路   图片

Conductor架构图
引擎的核心是基于有向无环图(DAG)算法实现的状态机服务(Decider)和基于分布式K-V存储的队列服务(Queue)。    
当工作流事件发生时比如开启一个工作流,或者工作流中任务状态更新,Decider从存储引擎中拿出相应的工作流定义,并基于工作流的定义与工作流的当前状态相匹配,根据条件获取下一个状态,来决定是继续安排适当的任务即推入相应的任务队列并更新工作流的状态。
工作流是由不同的task任务(task)组成的,这些任务由worker应用程序调用相应的注册API将不同的任务注册到Conductor中,任务名称是唯一的,Conductor通过不同的任务名称来产生不同的任务队列,worker也是根据不同的任务名称去拉取该worker需要处理的任务,并通过实现Conductor的worker接口来对该任务进行相应的业务处理,处理完后通过API同Conductor进行通信在Queue服务中更新相应的任务状态

2.5 Conductor定义  

图片
      一个工作流是由很多Task来组成的,定义了每个task在这个工作流中是如何工作的,所有定义都是通过Json格式来定义的,当定义好工作流后,我们就可以给工作流一个输入来让工作流运行,会生成一条唯一的工作流id,然后对应的worker机器则会去拉取相应的任务来执行,当所有的任务都按照工作流定义执行完成后,该工作流执行完成了。    

2.6 Conductor任务管理  

图片
Conductor支持任务的失败重试,worker从Conductor拉取任务后,会在本地处理任务,当处理失败的时候,会设置Conductor task的状态为Failed, Conductor服务端收到该任务失败后,会根据task定义中的重试次数来进行重试,worker会再次拉取该任务进行执行,这里的worker不一定是第一次执行失败的Conductor,而是所有处理该任务的worker机器都有可能会拉取到该任务。    图片
任务生存时间是指该任务从初始态到最终态的时间,任务最终态包括完成,失败,超时,任务生存的最长时间则是任务超时时间,当任务超过任务定义中的超时时间后,则会将任务定义为超时。    图片
Worker的最长执行时间也是在定义task的时候设置的,当一个任务在一个worker上执行时间超过设置的阈值之后,则会将该任务标记为失败,再根据重试策略重新执行该任务。

3、Conductor管控智域系统所有算法任务  

3.1 抽象业务模型  

通过理解Conductor的基本原理,我们现在来介绍如何将Conductor融入智域系统现有算法业务中。
一个模型流程可能包含任意个算法任务步骤,首先将一个模型任务拆分成足够独立的算法任务,并且基于Conductor 约定的JSON DSL将算法任务定义成一个Conductor task,将一整个模型定义成Conductor workflow,将模型定义好之后,通过API注册到Conductor中,并在某一适合通过API开启工作流从而开启一个算法模型。
下面模型设计为例:    
首先注册每个算法模型不同步骤的task任务,这些task任务后续要编排到不同的Conductor的工作流中。根据业务模型执行顺序以及策略模型的树形拓扑结构,将不同的算法步骤编排成顺序执行;图片
Middle-model服务是一个运行中台策略计算模型的服务,负责将算法的不同步骤通过拖拽的方式进行服务编排。一个简单的计算模型组成一个工作流。
将从输入源拿取数据、缺失值处理、异常值处理、数据标准化和处理完后的数据抽象为Conductor的工人任务,分别在不同的服务中实现。将这些任务按照Conductor的JSON DSL的方式定义到Conductor中,不同的模型可能需要不同的任务组合,按照产品的模型设计进行模型的生成,通过注册工具流API将任务组合成的工作流注册到Conductor中,并开启。    
将截取好的树形结构按照Conductor DSL封装为Conductor支持的workflow定义,并通过API注册到Conductor中。由worker服务节点将通过API拉取通过Conductor调的分配到不同队列中的中的任务拉取下来执行,worker服务处理完任务之后将任务状态返回给Conductor方便Conductor进行下一步策略模型调度。
图片

3.2 业务模型注入Conductor  

通过使用Cconductor做任务编排和流程控制,我们把模型运行从业务拆分出去,耗时较长的模型任务单独的机器部署worker处理,通过worker容器化扩缩容的方式实现任务并行执行能力的提升。最终的业务架构模型设计如下:    图片

3.3 过程中遇到的问题  

问题1:
Conductor对长时间运行的任务会通过队列的长度和返回时间,会重复将任务加入到队列中,为了保持任务的至少一次处理,从而导致middle-model需要针对这种情况进行幂等的保证,通过redis的setex将taskId保存在redis中,在每次任务处理的之前检查该任务是否已经在处理或已经处理过,如果正在处理需要直接返回Conductor任务状态为IN_PROGRESS并设置下一次重新推送队列时间,解决多次任务推动的问题。
问题2:
由于任务想要并行处理,需要在不同的任务之间设置不同的FORK-JOIN任务,需要通过设置哪些任务可以并行,哪些任务需要串行,需要通过Conductor的FORK-JOIN任务注册到原有的工作流中来解决。
问题3:
工作流的结果如何第一时间通知业务方。为了解决这个问题当时想了两个方案,在每个工作流的最后面增加一个http任务,工作流成功后调用业务方接口将工作流结果通知业务方,但是这种方法没法解决在中间某个任务失败后是走不到最后的http任务的;Conductor官方给的解决方案是实现WorkflowStatusListener这个接口,这个接口包含两个方法,我们通过实现这个接口在工作流完成和工作流终止两种状态时候通过http请求回调业务方。    

4、Conductor处理智域系统单体大任务  

4.1 场景分析  

     对于一家物流公司而言,快递员是其运营的核心资本。在全国范围内,数以十万计的快递员如何高效运作,一直是公司关注的重点课题。长期以来,快递员的收件和派件工作通常被分配在固定的区域内进行,这种模式虽然稳定,但也暴露出了一些问题:
  • 新员工适应期:新入职的快递员由于对技术和路线不够熟悉,导致在同一区域内的工作效率与老员工相比存在差距。
  • 区域负荷波动:随着电商活动频繁,尤其是“双十一”、“618”等大型促销期间,某些区域的快递量激增,使得现有快递员难以应对。
  • 外部因素影响:如天气变化等不可控因素也会影响快递员的工作效率。

    为了解决上述问题,基于商圈规划的调度机制一直是我们物流平台中心研究的重点问题,旨在通过综合考虑多个变量来实现更合理的任务分配:
  • 个人效率:基于历史数据评估每位快递员的实际工作表现。
  • 区域规模:考虑到不同区域的地理特征及复杂程度。
  • 包裹数量:预测特定时间段内各区域的包裹量。
  • 天气状况:结合短期天气预报,调整配送计划。
  • 促销活动:针对特定时间点(如电商大促)提前规划人员配置。
  • 突发事件:建立应急响应机制,确保面对突发状况时仍能快速调整策略。

目标是能够做到不仅按天,甚至细化到每个班次,都能够精准地指派合适的快递员前往相应的区域执行收派件任务,从而最大化整体运营效率与服务质量,
     想要实现上述的内容,必须每时每刻基于海量的数据来计算得出,包括计算全国的小哥信息,快递信息,网点信息等,再综合汇总的数据,进行一系列的任务才能产出最后的结果,在这样一个大型的系统架构中,时刻会产出各种的中间表,每个表和每个表之间的依赖错综复杂,犹如一个网状结构充斥在系统的每个角落。对应到代码层面就是一个一个的CT任务,
智域系统在每天的CT任务中,需要处理全国的所有快递网点,小哥,订单等数据,单机处理CT任务耗时太长,开多线程处理受限于单机负载,而且失败重试成本很高。尤其是在服务重启上线的时候,会导致服务在运行中的ct任务中断,并且重启上线的时候ct任务不会重试,只能等待下个周期,这样就会到导致很多依赖于ct 任务的场景会将ct任务失败加入到系统设计的范畴内,即影响了系统的复杂度,也会对数据产出产生影响。
    引入了Conductor后,这里我们就可以根据工作流的特性,来处理这种问题。在工作流编排过程中,为了处理这种大任务,业内比较通用的做法就是Fan-out Fan-in的任务编排模式,将大任务分解成小任务,然后并行运行小任务,最后再聚合结果
图片    
通过使用 DAG(有向无环图)编排 Fan-out Fan-in 任务,Conductor 可以有效地处理复杂的任务流程。Fan-out Fan-in 模式通常用于将一个任务分解成多个子任务,并最终聚合这些子任务的结果。这种模式在许多场景下都非常有用,例如数据处理、订单处理和批处理任务等。子任务的拆分方式分为静态和动态两种,分别对应静态 DAG 和动态 DAG。
静态 DAG:静态 DAG 是指在工作流定义阶段就确定了所有子任务的数量和结构。这种模式适用于那些预先知道子任务数量和依赖关系的情况。例如,在订单处理系统中,如果每个订单都需要经过支付确认、库存检查和发货三个步骤,那么这些步骤可以预先定义为静态 DAG。静态 DAG 的优点在于其结构清晰、易于维护。
动态 DAG:动态 DAG 是指在工作流执行过程中根据实际情况动态生成子任务。这种模式适用于那些子任务数量和结构在运行时才能确定的情况。例如,在数据处理流水线中,可能需要根据不同的数据源和数据量动态生成不同数量的处理任务。动态 DAG 的优点在于其灵活性高,可以根据实际情况进行动态调整。
动态 DAG Fan-out Fan-in 与 MapReduce:动态 DAG Fan-out Fan-in 模式可以理解为一种类似于 MapReduce 的处理方式。在这种模式下,每个子任务可以看作是一个“Map”任务,负责处理一部分数据;而最终的聚合结果则是一个“Reduce”任务,负责汇总所有子任务的结果。这种模式非常适合处理大规模数据集,因为它可以充分利用并行计算的优势。
例如,在数据处理流水线中,假设需要处理一批日志文件,每个文件需要进行数据清洗、转换和分析。可以将每个文件的处理任务作为一个“Map”任务,然后将所有处理结果汇总到一个“Reduce”任务中进行最终的汇总和分析。这种方式不仅可以提高处理效率,还可以确保数据的一致性和完整性。    
通过这种方式,Conductor 不仅能够处理静态的工作流,还能灵活应对动态变化的任务需求,从而实现更高效的任务编排和管理。

4.2 业务改进  

图片
     CT任务由之前的单机执行,变为使用了Conductor动态DAG的模式,当某个机器执行一个CT任务时,会将这个任务进行拆分成一个一个的小任务,然后注册为Conductor的工作流,之后集群内的所有节点都会执行这些小任务,都执行完成后再进行汇总。          
     上图是具体的设计流程,分片键是处理任务的维度,可以按照时间进行分片,可以按照网点进行分片,也可以按照城市进行分片,例如按照网点维度进行分片,CT任务需要处理3万多个网点数量,每个分片默认处理60个网点的数量,拆分成具体的任务后,将任务名称以及该任务名称对应的需要处理的60个网点的编号存储在redis中,然后提交工作流任务,具体的任务是在fork-join中执行的,fork任务就是具体的分片任务,pod接收到分片任务后会从redis中取出自己需要处理的网点信息,然后再执行具体的处理逻辑,当所有的任务都处理完成后,再执行fork任务来进行汇总。    

4.3 业务实施  

     Conductor对于ct业务来说,接入还是有一定繁琐的,每次新建一个ct任务的时候,都得先去Conductor平台创建任务,再创建工作流,然后需要对接Conductor sdk,去拉取对应的任务,再处理任务,这么一套流程下来对新人很不友好。
     所以我们对ct任务的Conductor进行了封装,让业务屏蔽和Conductor的一切感知,只需要引入Conductor的sdk,然后继承一个类,就可以使用了,所以业务接入基本是0成本。
业务接入的大致流程
  • 引入sdk
  • 配置文件增加Conductor的服务地址
  • 写具体的业务实现,继承对应的类即可

    继承SimpleSequenceRichShardProcessor类,第一个范型是CT任务运行周期的上下文,第二是具体的分片,可以是一个网点,一个日期        图片


    在CT任务代码中,注入SimpleSequenceRichShardProcessor的实现类,然后获取对应的分片,执行SimpleSequenceRichShardProcessor.execute()方法即可,例如下图代码中手动写了A100~A600 6个网点,传入方法中即可图片


4.4 业务收益  

在接入Conductor后,我们随机挑选了一个任务进行测试,处理效率上由之前的2个小时缩短到10分钟。资源使用率上单节点处理是65%,多节点处理增加了约4%图片
单节点cpu使用率
图片
多节点cpu使用率

5、注意事项   

  • 引入了Conductor后,虽然效率上提升很明显,但是系统架构也随之变的复杂,对于问题排查又上升了一个难度,所以对于业务开发同学来说,虽然接入可以完全屏蔽Conductor,但是也不可避免的需要和Conductor打交道,例如需要调节任务的并发粒度,需要调节任务的拉取频率,需要调节拉取的机器,需要查看任务的运行状态,重试机制,任务下发机制等,这些都需要去了解。

  • CT任务的追踪和定位将会变得困难,一个CT任务会分布到整个集群中的所有节点去执行,需要根据logid,traceid去平台查找相关的日志。

  • Conductor故障恢复难度较高,Conductor的所有元数据都存储在redis中,对于服务故障我们可以通过重启,扩容来解决,重启之后不会丢失任务的状态,因为Conductor是无状态的服务,但是对于redis中的元数据丢失,则无法进行恢复,这里我们的兜底方案是在程序启动的时候,都会去校验一遍该程序对应使用的Conductor元数据,包括工作流定义,任务定义等是否存在,如果不存在,则会自动进行注册,即假如Conductor的数据全部丢失,对应使用Conductor的业务只需要重启即可。    

6、后续规划  

  • 监控完善:目前Conductor服务端监控还不完善,对于工作流的使用率,错误发生率,拉取频率,执行时间等的统计还没有渠道能展示出来,后续会将这些监控全部完善。
  • 告警优化:目前Conductor的告警依赖于服务端的错误日志,只知道任务失败,对于一些更详细的信息还不能很好的关联起来,后续会将工作流关联更多业务关注的信息。

7、总结   

接入Conductor后,收敛了智域系统的业务线设计,统一了部分设计方案;相关的算法任务通过抽象后只需要由某一个业务向开发管理,其他业务线都可以抽象后的算法任务进行复用;算法服务和业务平台服务拆分到不同机器运行,有效的保证了算法任务执行时长,并保证了平台系统的稳定性;有效的支撑单任务并行执行的能力,并提供了多任务并行的解决方案;通过Conductor依赖的dynomite处理了单点redis实现的队列容易引起数据的不一致的问题;通过Conductor的API可以方便的查询模型执行过程查询模型的执行状态,执行到某一步,并且根据Conductor的API有效的进行重试、暂停、取消等相关操作。 

极大的缩短了智域系统中心CT任务的耗时,之前下午或者傍晚才能产出的数据,现在早上就可以产出数据,并且依据现有数据可以提前规划全天的安排,使得平台不会再因为数据没有产出导致任务阻塞。即使任务计算失败,也会在极短的时间内重新执行任务,增加了服务的容错性和健壮性。

Conductor还对接了其他部门的相关业务,比如件量预测、路径规划;还有KFC运营平台的编排任务。我们目前对Conductor的使用有些成功案例,并且接入了相关的监控报警,计划未来通过业务方的反馈,进一步完善Conductor的功能,利用Conductor为我们公司的任务编排能力赋能。

继续滑动看下一个
北京顺丰同城科技技术团队
向上滑动看下一个