背景说明
Blink提交采用进程模型(包装flink info/run命令)进行作业执行计划的生成和作业的提交,这个基本是大数据计算引擎jstorm/spark/flink的共识,采用该方式的优点在于:
简单:
用户只需在自己的jar包中进行逻辑处理
干净
技术演进
JVM 进程冷启动层面优化
仅能做到作业级别class复用:
原因:由于不同用户作业很大可能依赖不同版本的包,做class缓存时就会存在冲突。
间接带来的问题:
每个作业需要缓存大量class,对brs服务磁盘带来巨大压力(单个作业缓存数据100MB以上)。
1、有较多限制,同时业内并没有大规模使用:
2、在我们场景测试下来,效果并不好
常驻服务生成
2、面临核心痛点问题:
引擎版本间的兼容性没有保证,如何支持引擎多版本?
方案1:多进程方案(每个版本额外启动一个常驻进程)
方案2:多classloader方案
先做下简要背景说明,作业包可分为下面4种情况
flink/lib依赖包
launcher包:包涵和引擎交互的包。如plan/jobgraph的生成、资源plan apply到jobgraph中、热更新等
user jar用户jar包:作业级别
connnector/backend插件包
可以简单使用该classloader层级关系做隔离
由于每个作业的user jar包不同,则version classloader没法复用
version classloader用完及释放,此时和进程模型相比也就没有太大区别,即性能会不好。
思路:
由于version级别的classloader,很少或者不变动,可复用。
request级别的classloader每次用完立即释放
由于每个作业的用户jar不同,没法复用
launcher包的功能如何暴露给spring boot server(即blink rest server)使用呢?
spring boot server通过反射调用launcher包中的方法即可;
优点:
为啥hive/spark/flink计算引擎都是通过自定义classloader方案,不采用类似上面的方案,如下图1所示呢?
那么计算引擎使用图2的方案存在什么问题呢?
先铺垫下基础知识, classloader类加载机制3原则:
全盘负责:所谓全盘负责,就是当一个类加载器负责加载某个Class时,该Class所依赖和引用其他Class也将由该类加载器负责载入,除非显示使用另外一个类加载器来载入,如class.forName(, classloader)。
双亲委派:所谓的双亲委派,则是先让父类加载器试图加载该Class,只有在父类加载器无法加载该类时才尝试从自己的类路径中加载该类。通俗的讲,就是某个特定的类加载器在接到加载类的请求时,首先将加载任务委托给父加载器,依次递归,如果父加载器可以完成类加载任务,就成功返回;只有父加载器无法完成此加载任务时,才自己去加载。
缓存机制:缓存机制将会保证所有加载过的Class都会被缓存,当程序中需要使用某个Class时,类加载器先从缓存区中搜寻该Class,只有当缓存区中不存在该Class对象时,系统才会读取该类对应的二进制数据,并将其转换成Class对象,存入缓冲区中。这就是为很么修改了Class后,必须重新启动JVM,程序所做的修改才会生效的原因。
举例:A, B, C三个类依赖关系如下图,但是类B对应的jar在两个classloader中都有。
此时B在进程启动时,已经被父classloader加载。然后调用user code时,调用了A -> B -> C。由于B已经被父classloader加载,根据全盘负责原则此时C将交给父classloader加载,而父classloader没有该C的jar包,则报ClassNotFoundExceotion。
但是用户就很困惑,调用链明明是我的代码,而且我的包中已经有该class,为什么会报这个错呢?
解决办法:
将B从父classpath去除。不可行,这样父classloader在进程启动前,就报ClassNotFoundExceotion了;
对user code中B 做shade改包名,一般该解法可行。但是比较trick的是用户代码依赖的B不是依赖形式使用,而是以hard code编码方式。如果让用户改动依赖代码,就很麻烦。
最终临时是将该依赖打入到父classpath。但是对于引擎来说,就会有较大改动。如果是广泛使用的包,又会很容易和其他用户作业冲突。
通过多版本classloader方案优化后,经测试简单作业plan耗时从10秒降低到1秒以内,有数量级级别的提升。
同时,从背景说明章节的图中可看到绝大多数作业都为简单作业;
作业提交和jobgraph生成解耦
blink 采用single job的session模式,提交作业时先拉起JobManager,然后同步方式等pod拉起之后(拉起需要申请pod比较耗时),之后在编译作业生成jobgraph。如果发现不兼容再退出JM作业,则前面耗时的工作白做了。
基于此,我们实现flink支持k8s per job模式,解耦作业提交和jobgraph生成。在客户端提前生成jobgraph,如果不兼容直接报错了,无需拉起JobManager。
解耦后,可以做很多优化。运维态不变更作业。可以直接复用已经生成的jobgraph,无需再重复生成等。
同时,为了统一代码栈,降低开发成本,也扩展datastream作业支持per job模式提交。
结语
参考文档链接:
[1]https://www.yuque.com/g/jackylau-sc7w6/bve18l/rgy8y7e47abmw17c/collaborator/join?token=dGXoLPcmNkj0ILEP#%20《包冲突常见解法》
🚀🚀🚀参与ImageSearch 图像搜索评测,赢取Kindle Paperwhite4、评测局定制卫衣、云小宝帆布包、图搜6个月免费试用等多重好礼🎁
点击阅读原文查看详情。