本文主要分享字节跳动在使用 Flink State 上的实践经验,内容包括 Flink State 相关实践以及部分字节内部在引擎上的优化,希望可以给 Flink 用户的开发及调优提供一些借鉴意义。
Flink 作业需要借助 State 来完成聚合、Join 等有状态的计算任务,而 State 也一直都是作业调优的一个重点。目前 State 和 Checkpoint 已经在字节跳动内部被广泛使用,业务层面上 State 支持了数据集成、实时数仓、特征计算、样本拼接等典型场景;作业类型上支持了 Map-Only 类型的通道任务、ETL 任务,窗口聚合计算的指标统计任务,多流 Join 等存储数据明细的数据拼接任务。
以 WordCount 为例,假设我们需要统计 60 秒窗口内 Word 出现的次数:
select
word,
TUMBLE_START(eventtime, INTERVAL '60' SECOND) as t,
count(1)
from
words_stream
group by
TUMBLE(eventtime, INTERVAL '60' SECOND), word
每个还未触发的 60s 窗口内,每个 Word 对应的出现次数就是 Flink State,窗口每收到新的数据就会更新这个状态直到最后输出。为了防止作业失败,状态丢失,Flink 引入了分布式快照 Checkpoint 的概念,定期将 State 持久化到 Hdfs 上,如果作业 Failover,会从上一次成功的 checkpoint 恢复作业的状态(比如 kafka 的 offset,窗口内的统计数据等)。
在不同的业务场景下,用户往往需要对 State 和 Checkpoint 机制进行调优,来保证任务执行的性能和 Checkpoint 的稳定性。阅读下方内容之前,我们可以回忆一下,在使用 Flink State 时是否经常会面临以下问题:
由于 OperatorState 背后的 StateBackend 只有 DefaultOperatorStateBackend,所以用户使用时通常指定的 FsStateBackend 和 RocksDBStateBackend 两种,实际上指定的是 KeyedState 对应的 StateBackend 类型:
RocksDB 是嵌入式的 Key-Value 数据库,在 Flink 中被用作 RocksDBStateBackend 的底层存储。如下图所示,RocksDB 持久化的 SST 文件在本地文件系统上通过多个层级进行组织,不同层级之间会通过异步 Compaction 合并重复、过期和已删除的数据。在 RocksDB 的写入过程中,数据经过序列化后写入到 WriteBuffer,WriteBuffer 写满后转换为 Immutable Memtable 结构,再通过 RocksDB 的 flush 线程从内存 flush 到磁盘上;读取过程中,会先尝试从 WriteBuffer 和 Immutable Memtable 中读取数据,如果没有找到,则会查询 Block Cache,如果内存中都没有的话,则会按层级查找底层的 SST 文件,并将返回的结果所在的 Data Block 加载到 Block Cache,返回给上层应用。
这里介绍一下大家在大状态场景下经常需要调优的 RocksDBKeyedStateBackend 增量快照。RocksDB 具有 append-only 特性,Flink 利用这一特性将两次 checkpoint 之间 SST 文件列表的差异作为状态增量上传到分布式文件系统上,并通过 JobMaster 中的 SharedStateRegistry 进行状态的注册和过期。
如上图所示,Task 进行了 3 次快照(假设作业设置保留最近 2 次 Checkpoint):
增量快照涉及到 Task 多线程上传/下载增量文件,JobMaster 引用计数统计,以及大量与分布式文件系统的交互等过程,相对其他的 StateBackend 要更为复杂,在 100+GB 甚至 TB 级别状态下,作业比较容易出现性能和稳定性瓶颈的问题。
用户在使用 State 时,会发现操作 State 并不是一件很"容易"的事情,如果使用 FsStateBackend,会经常遇到 GC 问题、频繁调参等问题;如果使用 RocksDBStateBackend,涉及到磁盘读写,对象序列化,在缺乏相关 Metrics 的情况下又不是很容易进行性能问题的定位,或者面对 RocksDB 的大量参数不知道如何调整到最优。
目前字节跳动内有 140+ 作业的状态大小达到了 TB 级别,单作业的最大状态为 60TB,在逐步支持大状态作业的实践中,我们积累了一些 State 的调优经验,也做了一些引擎侧的改造以支持更好的性能和降低作业调优成本。
我们都知道 FsStateBackend 适合小状态的作业,而 RocksDBStateBackend 适合大状态的作业,但在实际选择 FsStateBackend 时会遇到以下问题:
针对上面 FsStateBackend 中存在的若干个问题,可以看出 FsStateBackend 的维护成本还是相对较高的。在字节内部,我们暂时只推荐部分作业总状态小于 1GB 的作业使用 FsStateBackend,而对于大流量业务如短视频、直播、电商等,我们更倾向于推荐用户使用 RocksDBStateBackend 以减少未来的 GC 风险,获得更好的稳定性。
随着内部硬件的更新迭代,ssd 的推广,长远来看我们更希望将 StateBackend 收敛到 RocksDBStateBackend 来提高作业稳定性和减少用户运维成本;性能上期望在小状态场景下,RocksDBStateBackend 可以和 FsStateBackend 做到比较接近或者打平。
社区版本的 Flink 使用 RocksDBStateBackend 时,如果遇到性能问题,基本上是很难判断出问题原因,此时建议打开相关指标进行排查[1]。另外,在字节跳动内部,造成 RocksDBStateBackend 性能瓶颈的原因较多,我们构建了一套较为完整的 RocksDB 指标体系,并在 Flink 层面上默认透出了部分关键的 RocksDB 指标,并新增了 State 相关指标,部分指标的示意图如下:
造成 RocksDB 性能瓶颈的常见如下:
除了以上指标外,另外一个可以相配合的方法是火焰图,常见方法比如使用阿里的 arthas[2]。火焰图内部会展示 Flink 和 RocksDB 的 CPU 开销,示意图如下:
如上所示,可以看出火焰图中 Compaction 开销是占比非常大的,定位到 Compaction 问题后,我们可以再根据 Value Size、RocksDB 容量大小、作业并行度和资源等进行进一步的分析。
除了 Flink 中提供的 RocksDB 参数[3]之外,RocksDB 还有很多调优参数可供用户使用。用户可以通过自定义 RocksDBOptionsFactory 来做 RocksDB 的调优[4]。经过内部的一些实践,我们列举两个比较有效的参数:
这里要注意一点,由于很多参数都以内存或磁盘来换取性能上的提高,所以以上参数的使用需要结合具体的性能瓶颈分析才能达到最好的效果,比如在上方的火焰图中可以明显地看到 snappy 的压缩占了较大的 CPU 开销,此时可以尝试 compression 相关的参数。
使用 RocksDB State 的相关 API,Key 和 Value 都是需要经过序列化和反序列化,如果 Java 对象较复杂,并且用户没有自定义 Serializer,那么它的序列化开销也会相对较大。比如去重操作中常用的 RoaringBitmap,在序列化和反序列化时,MB 级别的对象的序列化开销达到秒级别,这对于作业性能是非常大的损耗。因此对于复杂对象,我们建议:
if (mapState.contains(userKey)) {
UV userValue = mapState.get(userKey);
}
更多关于序列化的性能和指导可以参考社区的调优文档[7]。
上面提到 RocksDB 的序列化开销可能会比较大,字节跳动内部在 StateBackend 和 Operator 中间构建了 StateBackend Cache Layer,负责缓存算子内部的热点数据,并且根据 GC 情况进行动态扩缩容,对于有热点的作业收益明显。
同样,对于用户而言,如果作业热点明显的话,可以尝试在内存中构建一个简单的 Java 对象的缓存,但是需要注意以下几点:
Checkpoint 持续时间和很多因素相关,比如作业反压、资源是否足够等,在这里我们从 StateBackend 的角度来看看如何提高 Checkpoint 的成功率。一次 Task 级别的快照可以划分为以下几个步骤:
字节跳动内部,我们也针对这四个步骤构建了相关的监控看板:
生产环境中,「等待 checkpointLock」和「同步阶段」更多是在业务逻辑上的耗时,通常耗时也会相对较短;从 StateBackend 的层面上,我们可以对「收集 Barrier」和「异步阶段」这两个阶段进行优化来降低 Checkpoint 的时长。
减少 Barrier 对齐时间的核心是降低 in-flight 的 Buffer 总大小,即使是使用社区的 Unaligned Checkpoint 特性,如果 in-flight 的 Buffer 数量过多,会导致最后写入到分布式存储的状态过大,有时候 in-flight 的 Buffer 大小甚至可能超过 State 本身的大小,反而会对异步阶段的耗时产生负面影响。
如果在你的集群中,所有 Flink 作业都使用同一个 DFS 集群,那么业务增长到一定量级后,DFS 的 IO 压力和吞吐量会成为「异步阶段」中非常重要的一个参考指标。尤其是在 RocksDBStateBackend 的增量快照中,每个 Operator 产生的状态文件会上传到 DFS中,上传文件的数量和作业并行度、作业状态大小呈正比。而在 Flink 并行度较高的作业中,由于各个 Task 的快照基本都在同一时间发生,所以几分钟内,对 DFS 的写请求数往往能够达到几千甚至上万。
在社区版本的增量快照中,RocksDB 新生成的每个 SST 文件都需要上传到 DFS,以 HDFS 为例,HDFS 的默认 Block 大小通常在 100+MB(字节跳动内部是 512MB),而 RocksDB 生成的文件通常为 100MB 以下,对于小数据量的任务甚至是 KB 级别的文件大小,Checkpoint 产生的大量且频繁的小文件请求,对于 HDFS 的元数据管理和 NameNode 访问都会产生比较大的压力。
社区在 FLINK-11937 中提出了将小文件合并上传的思路,类似的,在字节内部的实现中,我们将小文件合并的逻辑抽象成 Strategy,这样我们可以根据 SST 文件数量、大小、存活时长等因素实现符合我们自己业务场景的上传策略。
除了 State 性能以及 DFS 瓶颈之外,StateBackend 的恢复速度也是实际生产过程中考虑的一个很重要的点,我们在生产过程中会发现,由于某些参数的设置不合理,改变作业配置和并发度会导致作业在重启时,从快照恢复时性能特别差,恢复时间长达十分钟以上。
Union State 的特点是在作业恢复时,每个并行度恢复的状态是所有并行度状态的并集,这种特性导致 Union State 在 JobMaster 状态分配和 TaskManager 状态恢复上都比较重:
Union State 在实际使用中,除恢复速度慢的问题外,如果使用不当,对于 DFS 也会产生大量的压力,所以建议在高并行度的作业中,尽量避免使用 Union State 以降低额外的运维负担。
RocksDBStateBackend 中支持的增量快照和全量快照(或 Savepoint),这两种快照的差异导致了它们在不同场景下的恢复速度也不同。其中增量快照是将 RocksDB 底层的增量 SST 文件上传到 DFS;而全量快照是遍历 RocksDB 实例的 Key-Value 并写入到 DFS。
以是否扩缩容来界定场景,这两种快照下的恢复速度如下:
这里比较麻烦的一点是扩缩容恢复时比较容易遇到长尾问题,由于单个并行度状态过大而导致整体恢复时间被拉长,目前在社区版本下还没有比较彻底的解决办法,我们也在针对大状态的作业进行恢复速度的优化,在这里基于社区已支持的功能,在扩缩容场景下给出一些加快恢复速度的建议:
本篇文章中,我们介绍了 State 和 RocksDB 的相关概念,并针对字节跳动内部在 State 应用上遇到的问题,给出了相关实践的建议,希望大家在阅读本篇文章之后,对于 Flink State 在日常开发工作中的应用,会有更加深入的认识和了解。
目前,字节跳动流式计算团队同步支持的火山引擎流式计算 Flink 版正在公测中,支持云中立模式,支持公共云、混合云及多云部署,全面贴合企业上云策略,欢迎申请试用: