总篇118篇 2021年第9篇
物化视图这一使用的功能想必大家都不陌生,我们可以通过使用物化视图,将预先设定好的复杂SQL逻辑,以增量迭代的形式实时(按照事务地)更新结果集,从而通过查询结果集来避免每次查询复杂的开销,从而节省时间与计算资源。
事实上,很多数据库系统和OLAP引擎都不同程度地支持了物化视图。另一方面,Streaming SQL本身就和物化视图有着很深的联系,那么基于Apche Flink(下称Flink) SQL去做一套实时物化视图系统是一件十分自然而然的事情了。
本文介绍了汽车之家(下称之家)在基于Flink的实时物化视图的一些实践经验与探索,并尝试让用户直接以批处理SQL的思路开发Flink Streaming SQL 任务。希望能给大家带来一些启发,共同探索这一领域。
系统分析与问题拆解
Flink在Table & SQL模块做了大量的工作,Flink SQL已经实现了一套成熟与相对完备的SQL系统,同时,我们也在Flink SQL上有着比较多的技术和产品积累,直接基于Flink SQL本身就已经解决了构建实时物化系统的大部分问题,而唯一一个需要我们解决的问题是如何不重不漏地生成数据源表对应的语义完备的Changelog DataStream,包括增量和全量历史两部分。
虽然规约到只剩一个问题,但是这个问题解决起来还是比较困难的,那我们将这个问题继续拆解为以下几个子问题:
问题解决与系统实现
问题一 基于数据传输平台的增量数据读取
增量数据加载还是相对比较好解决的,我们直接复用实时数据传输平台的基础建设。数据传输平台[1]已经将Mysql/SqlServer/TiDB等增量数据以统一的数据格式写入到特定的Kafka Topic中,我们只要获取到对应的Kafka Topic就可以进行读取即可。问题二 支持checkpoint的全量数据加载
第一版我们用Legacy Source 写了一套BulkLoadSourceFunction
,这一版的思路比较朴素,就是全量从数据源表进行查询。这个版本确实能完成全量数据的加载,但是问题也是比较明显的。如果在bulk load阶段作业发生了重启,我们就不得不重新进行全量数据加载。对于数据量大的表,这个问题带来的后果还是比较严重的。对于第一版的固有问题,我们一直都没有特别好的对策,直到Flink-CDC[2]2.0的发布。我们参考了Flink-CDC的全量数据加载阶段支持Checkpoint的思路,基于FLIP-27开发了新的BulkLoadSource
。第二版不论在性能上还是可用性上,对比第一版都有了大幅提升。问题三 基于全局版本的轻量CDC数据整合算法
这三个子问题中,问题三的难度是远大于前面两个子问题的。这个问题的朴素思路或许很简单,我们只要按照Key缓存全部数据,然后根据增量数据流来触发Changelog DataStream更新即可。事实上我们也曾按照这个思路开发了一版整合逻辑的算子。这版算子对于小表还是比较work的,但是对于大表,这种思路固有的overhead开始变得不可接受。我们曾用一张数据量在12亿,大小约120G的SqlServer表进行测试,本身就巨大的数据再加上JVM上不可避免的膨胀,状态大小变得比较夸张。经过这次测试,我们一致认为这样粗放的策略似乎不适合作为生产版本发布,于是我们不得不开始重新思考数据整合的算法与策略。在谈论我们的算法设计思路之前,我不得不提到DBLog[3]的算法设计, 这个算法的核心思路利用watermark对历史数据进行标识,并和对应的增量数据进行合并,达到不使用锁即可完成整个增量数据和历史数据的整合,Flink-CDC也是基于这个思路进行的实现与改进。在相关资料搜集和分析的过程中,我们发现我们的算法思路与DBLog的算法的核心思路非常相似, 但是是基于我们的场景和情况进行了设计与特化。- 增量数据需要来自于数据传输平台的Kafka Topic
结合上述情况进行分析,我们来规约一下这个算法必须要达成的目标- 保证数据的Changelog Stream,数据完整,Event(RowKind)语义完备
- 保证算法实现不依赖任何来自于Flink外部的系统或者功能
经过大家的分析与讨论后,我们设计出了一套数据整合的算法,命名为Global Version Based Pause-free Change-Data-Capture Algorithm。算法原理
我们同时读入BulkLoadSource
的全量数据与RealtimeChangelogSource
增量数据,并根据主键进行KeyBy与Connect,而算法的核心逻辑主要由之后的KeyedCoProcess阶段完成。下面交待几个关键的字段值:- SearchTs: 全量数据从数据源查询出来的时间戳
- Watermark:基于增量数据在数据库里产生的时间戳生成
- Version:全序版本号,全量数据是0,即一定最小版本
KeyedCoProcess 收到全量数据后,不会直接发送,而是先缓存起来,等到Watermark的值大于该SearchTs后发送。在等待的期间,如果有对应的Changlog Data,就将被缓存的Version0全量数据丢弃,然后处理Changelog Data并发送。在整个数据处理的流程中,全量数据和增量数据都是同时进行消费与处理的,完全不需要引入暂停阶段来进行数据的整合。
算法实现
我们决定以Flink Connector的形式开展算法的实现,我们以接入SDK的名字Estuary为该Connector命名。通过使用DataStreamScanProvider
,来完成Source内部算子间的串联,Source的算子组织如下图(chain到一起的算子已拆开展示)
BulkLoadSource
/ChangelogSource
主要负责数据的读入和统一格式处理;BulkNormalize
/ChangelogNormalize
主要是负责处理数据运行时信息的添加与覆盖,主键语义处理等工作;WatermarkGenerator
是针对算法工作需求定制的Watermark生成逻辑的算子;而VersionBasedKeyedCoProcess
就是核心的处理合并逻辑和RowKind语义完备性的算子。算法实现的过程中还是有很多需要优化或者进行权衡的点。全量数据进入CoProcess数据后,会首先检查当前是否处理过更大版本的数据,如果没有的话才进行处理,数据首先会被存入State中并根据SearchTs + T(T是我们设置的固有时延)注册EventTimeTimer。如果没有高版本的数据到来,定时器触发发送Version 0的数据,否则直接抛弃改为发送RowKind语义处理好的高版本增量数据。另一方面,避免状态的无限增长,当系统判定BulkLoad阶段结束后,会结束对多相关Flink State的使用,存在的State只要等待TTL过期即可。另外,我们针对在数据同步且下游Sink支持Upsert能力的场景下,开发了特别优化的超轻量模式,可以以超低的overhead完成全量+增量的数据同步。开发完成后,我们的反复测试修改与验证,完成MVP版本的开发实时物化视图实践
MVP版本发布后,我们与用户同学一起,进行了基于Flink的物化视图试点。
基于多数据源复杂逻辑的Data Pipeline实时化
下面是用户的一个真实生产需求:有三张表,分别来自于TiDB*/SqlServer/Mysql,数据行数分别为千万级/亿级/千万级,计算逻辑相对复杂,涉及到去重,多表Join。原有通过离线批处理产生T+1的结果表。而用户希望提升该Pipeline的延迟。* 由于我们使用的TiCDC Update数据尚不包含-U部分,故TiDB表的整合算法还是采取Legacy Mode进行加载
我们与用户沟通,建议他们以批处理的思路去编写Flink SQL,把结果的明细数据的数据输出到StarRocks中。用户也在我们的协助下,较为快速地完成了SQL的开发,任务的计算拓补图如下:
结果是相当让人惊喜的!我们成功地在保证了数据准确性的情况下,将原来天级延迟的Pipeline提升至10s左右的延迟。数据也从原来查询Hive变为查询StarRocks,不论从数据接入,数据预计算,还是数据计算与查询,实现了全面的实时化。另一方面,三张表每秒的增量最大不超过300条,且该任务不存在更新放大的问题,所以资源使用相当的少。根据监控反馈的信息,初始化阶段完成后,整个任务TM部分只需要使用1个Cpu(on YARN),且Cpu使用常态不超过20%。对比原来批处理的资源使用,无疑也是巨大提升。
数据湖场景优化
正如上文提到的,对于数据同步,我们做了专门的优化。只需要使用专用的Source表,就可以一键开启历史数据+增量数据数据同步,大大简化了数据同步的流程。我们目前尝试使用该功能将数据同步至基于Iceberg的数据湖中,从数据同步层面大幅提升数据新鲜度。限制与不足
虽然我们在这个方向的探索取得了一定成果,但是仍有一定的限制和不足
服务器时钟的隐式依赖
仔细阅读上面算法原理,我们会发现,不论是SearchTs的生成还是Watermark的生成,实际上最后都依赖了服务器系统的时钟*而非依赖类似Time Oracle机制。我们虽然算法实现上引入固有延迟去规避这个问题,但是如果服务器出现非常严重时钟不一致,超过固有延迟的话,此时watermark是不可靠的,有可能会造成处理逻辑的错误。一致性与事务
事实上我们目前这套实现没有任何事务相关的保证机制,仅能承诺结果的最终一致性,最终一致性其实是一种相当弱的保证。就拿上文提到的例子来说,如果其中一张表存在2个小时的消费延迟,另一张表基本不存在延迟,这个时候两表Join产生的结果其实是一种中间状态,或者说对于外部系统应该是不可见的。为了完成更高的一致性保证,避免上面问题的产生,我们自然会想到引入事务提交机制。然而目前我们暂时没有找到比较好的实现思路,但是可以探讨下我们目前的思考。如何定义事务
事务这个概念想必大家或多或少都有认识,在此不多赘述。如何数据库系统内部定义事务是一件特别自然且必要的事情,但是如何在这种跨数据源场景下定义事务,其实是一件非常困难的事情。还是以上文的例子来展开,我们能看到数据源来自各种不同数据库,我们其实对于单表记录了对应的事务信息,但是确实没有办法定义来自不同数据源的统一事务。我们目前的朴素思路是根据数据产生的时间为基准,结合checkpoint统一划定Epoch,实现类似Epoch-based Commit的提交机制。但是这样做又回到前面提到的问题,需要对服务器时间产生依赖,无法从根源保证正确性。跨表事务
对于Flink物化视图一致性提交这个问题,TiFlink[4]已经做了很多相关工作。但是我们的Source来自不同数据源,且读取自Kafka,所以问题变得更为复杂,还是上面提到的例子,两张表Join过后,如果想保证一致性,不只是Source和Sink算子,整个关系代数算子体系都需要考虑引入事务提交的概念和机制,从而避免中间状态的对外部系统的发布。更新放大
这个问题其实比较好理解。现在有两张表join,对于左表的每一行数据,对应右表都有n(n > 100)条数据与之对应。那么现在更新左表的任意一行,都会有2n的更新放大。状态大小
目前整套算法在全量同步阶段的Overhead虽然可控,但是仍有优化空间。我们目前实测,对于一张数据量在1亿左右的表,在全量数据阶段,需要最大为1.5G左右的State。我们打算在下个版本继续优化状态大小,最直接的思路就是BulkSource
通知KeyedCoProcess
哪些主键集合是已经处理完毕的,这样可以使对应的Key提早进入全量阶段完成模式,从而进一步优化状态大小。总结与展望
本文分析了基于Flink物化视图实现的问题与挑战,着重介绍了处理生成完整的Changelog DataStream的算法与实现和在业务上的收益,也充分阐述了目前的限制与不足。
虽然这次实践的结果称不上完备,存在一些问题亟待解决,但是我们仍看到了巨大的突破与进步,不论是从技术还是业务使用上。我们充分相信未来这项技术会越来越成熟,越来越被更多人认可和使用,也通过此次探索充分验证了流处理和批处理的统一性。我们目前的实现还处在早期版本,仍有着工程优化和bug fix的空间与工作(比如前文提到的两表的推进的skew太大问题,可以尝试引入Coordinator进行调节与对齐),但是相信随着不断的迭代与发展,这项工作会变得越来越稳固,从而支撑更多业务场景,充分提升数据处理的质量与效率!引用
[1] http://mp.weixin.qq.com/s/KQH-relbrZ2GUqdmaTWx6Q[2] http://github.com/ververica/flink-cdc-connectors[3] http://arxiv.org/pdf/2010.12597.pdf[4] http://zhuanlan.zhihu.com/p/422931694作者简介