cover_image

Flink在同程艺龙实时计算平台的研发与应用实践

Flink小分队 同程旅行技术中心
2019年09月03日 09:43


背景介绍

图片


        在2015年初,为了能够采集到用户在PC,APP等平台上的行为轨迹,我们开始开发实时应用。那时可选的技术架构还是比较少的,实时计算框架这块,当时比较主流的有Storm和Spark-streaming。综合考虑实时性,接入难度,我们最终选择使用基于Storm构建了第一个版本的用户行为轨迹采集框架。后续随着实时业务的增多,我们发现Storm已经远远不能满足我们对数据端到端处理准确一次(Exactly-Once)语义的需求,并且对于流量高峰来临时也不能平滑的背压(BackPressure),在大规模集群的支持上Storm也存在问题。经过充分的调研后,我们在2018年初选择基于Flink开发同程艺龙新一代实时计算平台。目前实时计算平台已支撑近千个实时任务运行,服务公司的市场、机票、火车票、酒店、金服、国旅、研发等各个业务条线。下面主要结合实时计算平台来分享下我们在Flink落地过程中的一些实践经验及思考。


平台建设

图片

        

        在开发实时计算平台前,我们有过大量实时应用业务的经验,我们发现使用实时计算的业务方主要有两类:一类的大数据业务是基于Lambda 架构开发的,这部分业务是需要有一个实时计算的组件来帮他们把以前离线的一套数据同步清洗(如:sqoop、hive)转换成实时任务。有时在这个过程中也需要组件来支持实时的过滤聚合。这部分业务方大多是数仓&分析,他们对SQL比较熟悉,更倾向于用SQL解决一切问题;另一部分业务方主要是数据开发&挖掘,他们的业务场景更复杂,业务需求变化及应用迭代很频繁,更关注实时应用的性能,他们喜欢用编程语言如:Java,scala来开发实时应用。

       

       为了更好的为两类用户提供支持,实时计算平台同时支持两种类型的任务:FlinkSQL和 FlinkStream。平台整体架构如图2-1所示:

图片 

图片

图2-1

图片      

1

FlinkSQL

     

01

        上图的后端RTC-FlinkSQL模块即是用来执行提交FlinkSQL任务的服务,SQL属于声明式语言,经过30、40 年的发展,具有很高的易用性、灵活性和表达性。虽然Flink提供了Table &SQL API,但是我们当时基于的Flink1.4及1.6版本本身语法也不支持像Create Table这样的DDL语法,并且在需要关联到外部数据源的时候Flink也没有提供SQL相关的实现方式。此外根据其提供的API接口编写TableSource和TableSink 异常繁琐,不仅要了解Flink各种Operator的API,还要对各个组件的相关接入和调用方式有一定了解(比如Kafka、RocketMQ、Elasticsearch、HBase、HDFS等),因此对于只熟悉SQL进行数据分析的人员直接编写FlinkSQL任务需要较大的学习成本。

        鉴于以上原因,我们构建了实时计算平台的RTC-FlinkSQL开发模块并对FlinkSQL进行扩展,让这部分用户在使用FlinkSQL的时候只需要关心做什么,而不需要关心怎么做。不需要过多的关心程序的实现,而是专注于业务逻辑。


02

    【 四步实现FlinkSQL提交模块】

  • 构建于Apache Calcite、Apache Flink之上

  • 将SQL映射成Flink JobGraph

     ※ parser:通过Calcite api实现解析,最终得到SqlNode集合

     ※ validator:从SqlNode中提取执行的SQL和Source、Sink、维表 对应的配置信息

     ※ executor:利用validator获取的信息借助Flink的API得到对应的JobGraph

  • 通过Yarn Client提交构建好的Flink任务,提交成功返回ApplicationID

  • 利用YARN返回的ApplicationID获取JobId之后通过Flink RESTful API监控程序的运行状况


03

【在原有FlinkSQL的基础上做了很多扩展】

图片

支持创建源表语句

        这里主要是根据上述validator阶段获取的Source配置信息,根据指定参数实例化出该对象,然后调用registerTableSource方法将TableSource注册到environment。从而完成了源表的注册。

图片 

图片

图片      

图片  支持创建输出表语句

          Flink Table输出Operator基类是TableSink,我们这里继承的是AppendStreamTableSink,根据上述validator阶段获取的Sink配置信息,根据指定参数实例化出该对象,然后调用registerTableSink方法将TableSink注册到environment。

图片 

图片

图片 

   图片  支持创建自定义函数

        继承ScalarFunction或者继承TableFunction,需要从用户提交的SQL中获取要使用的自定义函数类名, 之后通过反射获取实例,判断自定义Function属于上述哪种类型,然后调用TableEnvironment.registerFunction即可完成了UDF的注册,最后用户就可以在SQL中使用自定义的UDF。

图片 

图片

图片 

   图片  支持维表关联

        

        使用Calcite对上述validator阶段获取的可执行SQL进行解析,将SQL解析出一个语法树,通过迭代的方式,搜索到对应的维表,并结合上述validator阶段获取的维表信息实例化对应的SideOperator对象,之后通过RichAsyncFunction算子生成新的DataStream,最后重新注册表并执行其他SQL,我们同时支持账号密码直连和公司研发提供的DAL方式

  图片

图片

图片


04

       如图2-2所示,可以方便地在实时计算平台上FlinkSQL编辑器内完成FlinkSQL任务的开发,目前线上运行有500+的FlinkSQL任务在运行。


图片 

图片图2-2

图片      

2

FlinkStream


        除了FlinkSQL外,平台上还有一半的实时任务是一些业务场景更复杂,通过代码来编写开发的任务。对此我们提供了RTC-FlinkStream模块来让用户上传自己本地打包后的FAT-JAR,通过资源管理平台来让用户对JAR做版本管理控制,方便用户选择运行指定的任务版本,FlinkStream任务开发界面如图2-3所示。


图片 

图片图2-3

图片 

        这部分任务有些对资源使用需求比较大,我们提供了任务容器配置的参数来让用户灵活的配置其Task并发,并且提供了自定义时间周期触发保存点(savepoint)的功能。


易用性提升

图片

      

       平台开发难度相对低,难的是如何提升平台的易用性,因为开源组件如Apache Flink核心关注数据的处理流程,对于易用性这部分稍显不足,所以在实时平台功能开发过程中要修改Flink组件的源码来提升其易用性。


1

        就以Flink任务运行的指标(Metrics)监控来说,当Flink程序提交至集群之后,我们需要的是收集任务的实时运行Metrics数据,通过这些数据可以实时监控任务的运行状况,例如,算子的CPU耗时、JVM内存、线程数等。这些实时Metrics指标对任务的运维、调优等有着至关重要的作用,方便及时发现报警,进行调整。通过对比现有的指标采集系统,包括InfluxDB、StatsD、Datadog等系统再结合公司的指标收集系统,我们最终决定采用Prometheus作为指标系统。但是在开发过程中我们发现Flink只支持Prometheus的拉模式收集数据,此模式需要提前知道集群的运行主机以及端口等信息,适合于单集群模式。而作为企业用户,更多的是将Flink任务部署在YARN等集群上,此时,Flink的JobManager、TaskManager的运行是由YARN统一调度,主机以及是端口都是动态的,而Flink只支持的拉模式难以满足我们需求。所以我们通过增加Prometheus的Pushgateway来进行指标的收集,此模式属于推模式,架构如图3-1下所示。同时,我们也积极的向社区贡献了这个新特性[4] ,目前PR已经被合并,详情见FLINK-9187

图片 

图片图3-1

图片      

2

        在完成Flink Pushgateway的相关工作后,为了方便用户查看自己Flink任务的吞吐量,处理延迟等重要监控信息,我们为用户配置了监控页面,方便用户在实时计算平台上快速定位出任务性能问题,如通过我们实时平台监控页面提供的图表3-2,具体指标为flink_taskmanager_job_task_buffers_outPoolUsage来快速判断实时任务的Operator是否存在反压情况[2]. 

       在使用过程中我们也发现了Flink Metrics中衡量端到端的Opertor Latency的指标存在漂移,导致监控不准确问题。我们也修复了该问题[5]并反馈给了社区,详情见FLINK-11887


图片 

图片图3-2

图片 


3

        提升平台易用性还有一个重要的地方就是日志,日志分为操作日志,启动日志,业务日志,运行历史等日志信息。其中比较难处理的就是用户代码中打印的业务日志。因为Flink任务是分布式执行的,不同的TaskManager的处理节点都会有一份日志,业务看日志要分别打开多个TaskManager的日志页面。并且Flink任务是属于长运行的任务,用户代码中打印的日志是打印在Flink WebUI上。此时会面临一个问题,当任务运行的时间越长,日志量会越来越多,原生自带的日志页面将无法打开。为了方便用户查看日志,解决用户无法获取到实时任务的日志信息,同时也为了方便用户根据关键词进行历史日志的检索,我们在实时计算平台为用户提供了一套实时日志系统功能,开发人员可以实时地搜索任务的日志。


       并且系统采用无侵入式架构,架构图见图3-3,在用户程序无感知的情况下,实时采集日志,并同步到Elasticsearch中,当业务需要检索日志时,可通过Elasticsearch语法进行检索。

图片 

图片

图片 

3-3

4


         计算组件往往处于大数据的中间位置,上游承接MQ等实时数据源,下游对接HDFS、HBase等大数据存储,通过Flink这些实时组件将数据源和数据目标串联在一起。为了避免混乱,这个过程往往需要通过数据血缘来做管理。然而常见的数据血缘管理的开源项目如Apache Atlas等并未提供对Flink的支持,而Flink自身也没有提供相应的Hook来抽取用户代码的中的数据源等信息。为了解决这个问题,我们修改了Flink Client提交过程,在CliFrontend中增加一个notify环节,通过ContextClassLoader和反射在Flink任务提交阶段将Flink生成的StreamGraph内的各个StreamNode抽取出来,这样就可以在提交时候获取出用户编写的Flink任务代码中关键数据源等配置信息,从而为后续的Flink数据血缘管理提供支持。其关键代码如下:

图片 

图片

图片      

        Flink采用了Chandy-Lamport的快照算法来保证一致性和容错性,在实时任务的运行期间是通过Checkpoint[1]机制来保障的。如果升级程序,重启程序,任务的运行周期结束,window内的状态或使用mapstate的带状态算子(Operator)所保存的数据就会丢失了,为了解决这个问题,给用户提供平滑升级程序方案从而保障数据准确处理,我们实时计算平台提供了从外部触发Savepoint功能,在用户手动重启任务的时候,可以选择最近一段时间内执行成功的保存点来恢复自己的程序。平台从保存点恢复任务操作如图3-4所示。

图片 

图片图3-4

图片 

        虽然我们提供了通用的实时计算平台,但是有些用户想使用Flink,除此之外还需要在平台上增加些更符合其业务特点的功能,对此我们也开放了我们实时计算平台的API接口给到业务方,让业务根据其自身场景特点来加速实时应用的变现和落地。


       稳定性优化

图片

     

        前面介绍了我们在实时计算平台易用性方面如:SQL,监控,日志,血缘,保存点等功能点上做的开发工作,其实除了平台功能开发之外还有更多的工作内容是用户没有感知到的。如保障实时应用运行稳定性,在这方面我们积累了很多实践经验,与此同时我们也在Github上建立了Tongcheng-Elong组织,并将修复后的源代码贡献到Apache社区。其中有十几个patch已经被社区接收合并。接下来分享一些我们遇到的稳定性问题和提供的解决方案。


1

Flink的“ 空跑”问题

   

01

        我们在集群运维过程中发现,在偶发的情况下,Flink任务会在YARN集群上空跑。此时,在YARN层面的现象是任务处于RUNNING状态,但是进入到Flink WebUI,会发现此时所有的TaskManager全部退出,并没有任务在运行。这个情况下,会造成的YARN资源的浪费,同时也给运维人员带来困扰,为什么TaskManager都退出了,JobManager不退出呢?甚至给平台监控任务运行状态带来误判,认为任务还在运行,但实际任务早挂了。

       这个问题比较难定位,首先发生这种情况不多,但是一旦出现影响很大。其次,没有异常堆栈信息,无法定位到具体的根本原因。我们的解决方法是通过修改源码,在多个可能的地方增加日志埋点,以观察并了解任务退出时JobManager所执行的处理逻辑。最终我们定位到当任务失败时,在默认的重试策略之后,会将信息归档到HDFS上。由于是串行执行,所以如果在归档过程中发生异常,则会中断正常处理逻辑从而导致通知JobManager的过程不能成功执行。具体的执行逻辑见图4-1。

图片 

图片图4-1

图片 


02

       梳理清楚逻辑之后,我们发现社区也没有修复这个问题。同样,我们也积极向社区进行提交PR修复[6][7][8]。修复这个问题,需要通过3个PR,逐步进行完善,详情见FLINK-12246、FLINK-12219、FLINK-12247

       我们的存储组件比较多,在使用Flink-Connector来读写相关存储组件的如:RocketMQ、HDFS、Kudu、Elasticsearch也发现过这些Connector的Source/Sink存在问题,我们在修复之后也提交了PR反馈到社区:

①  RocketMQSource的connector从savepoint恢复异常问题,及RocketMQSource不能严格保证数据不丢失问题。我们修复这些问题后为业务用户提供基于我们自己版本稳定的connector SDK;

  BucketingSink写入HDFS时出现的client无限续租导致的文件卡在openforwrite状态问题[3],我们也维护了自己的filesystem connector SDK提供给业务用户,在异常发生时主动释放租约;

③  Flink写Kudu时提供的KuduSink性能过低问题,我们也提出通过异步刷新模式来提高Sink的写入性能[9];

④  Flink-Elasticsearch6-connector写入线程死锁问题。ES是实时分析这边重要的存储组件,而在我们实际的实践过程中会发现本来运行正常的Flink程序会偶尔出现程序hang住,所有的数据处理都停止,消费MQ数据速度降为0,除非重启任务否则无法恢复。这个问题一旦出现,严重影响线上实时应用的稳定性。Flink和ElasticSearch社区也有多个issue讨论类似的问题。在经过分析后,我们发现问题主要原因是Elasticsearch6的core模块在线程池重构后 Bulk Interval FlushTask和RetryHandler共用相同线程池导致的,具体的执行逻辑见图4-2。


图片

图片

4-2

图片 


        对于该问题的临时解决方案是在使用Elasticsearch 6.x的RestHighLevelClient 的时候暂时停止使用setBulkFlushInterval配置, 而是通过Flink自身的checkpoint机制来触发数据定时Flush到ElasticSearch Server端。真正彻底解决办法是构建单独的线程池提供给ReryHandler来使用。随后我们也向Elasticsearch社区提交了issue及PR来修复这个问题 [10]。在这个过程中发现也顺便修复了Flink在任务重试时候transport client 线程泄露[11]等问题详情见FLINK-11235。


2


Flink与ZK网络问题

    

我们也遇到了Flink与ZK网络问题,当Jobmanager与ZK的连接中断之后,会将正在运行的任务立即停止。当集群中任务很多时,可能由于网络抖动等原因瞬断时,会导致任务的重启。而在我们集群上有上千的Flink应用,一旦出现网络抖动,会使得大量Flink任务重启,这个问题对集群和任务的稳定性影响比较大。根本原因是Flink底层采用Curator的LeaderLatch做分布式锁服务,在Curator-2.x的版本中对于网络瞬断没有容忍性,当因为网络抖动、机器繁忙、zk集群短暂无响应都会导致curator将状态置为suspended,正是这个suspended状态导致了所有任务的重启。我们的解决办法是先升级Curator版本到4.x[12],然后在提升版本后再用CuratorFrameworkFactory来构造CuratorFramework时,通过使用ConnectionStateErrorPolicy将StandardConnectionStateErrorPolicy替换为SessionConnectionStateErrorPolicy,前者将suspended和lost都作为error,后者只是将lost作为error,而只有发生error的时候才会取消leadership,所以在经过修改之后,在进入suspended状态时,不再发生leadership的取消和重新选举。我们把这个问题和我们的解决办法也反馈给了社区,详情见FLINK-10052。


总结

图片

      

       本文大致介绍了Flink在同程艺龙实时计算平台实践过程中的一些工作和踩过的坑。对于大数据基础设施来说平台是基础,除此之外还需要投入很多精力来提高Flink集群的易用性和稳定性,这个过程中要紧跟开源社区,因为随着同程艺龙在大数据这块应用场景越来越多,会遇到很多其它公司没有遇到甚至没有发现的问题,这个时候基础设施团队要有能力主动解决这些影响稳定性的风险点,而不是被动的等待社区来提供patch。由于在Flink在1.8版本之前社区方向主要集中在Flink Stream处理这块,我们也主要应用Flink的流计算来替换storm及spark streaming。但是随着近期Flink1.9的发布,Blink分支合并进入Flink主分支,我们也打算在Flink Batch这块尝试一些应用来落地。


图片

作者:数据中心-Flink小分队(谢磊、周生乾、李苏兴)


Reference

[1]https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

[2]https://www.cnblogs.com/AloneAli/p/10840803.html

[3]https://www.cnblogs.com/AloneAli/p/10840956.html

[4]https://issues.apache.org/jira/browse/FLINK-9187

[5]https://issues.apache.org/jira/browse/FLINK-11887

[6]https://issues.apache.org/jira/browse/FLINK-12246

[7]https://issues.apache.org/jira/browse/FLINK-12219

[8]https://issues.apache.org/jira/browse/FLINK-12247

[9]https://issues.apache.org/jira/browse/BAHIR-202

[10]https://github.com/elastic/elasticsearch/issues/44556

[11]https://issues.apache.org/jira/browse/FLINK-11235

[12]https://issues.apache.org/jira/browse/FLINK-10052

继续滑动看下一个
同程旅行技术中心
向上滑动看下一个