由字节跳动数据湖团队贡献的 RFC-29 Bucket Index 在近期合入 Hudi 主分支,本文详细介绍 Hudi Bucket Index 产生的背景与实践经验。
文 | 字节跳动数据平台数据湖团队
Hudi提供类似 Hive 的分区组织方式,与 Hive 不同的是,Hudi 分区由多个 File Group 构成,每个 File Group 由 File ID进行标识。File Group 内的文件分为 Base File ( parquet 格式) 和 Delta File( log 文件),Delta File 记录对 Base File 的修改。Hudi 使用了 MVCC 的设计,可以通过 Compaction 任务把 Delta File 和 Base File 合并成新的 Base File,并通过 Clean 操作删除不需要的旧文件。 Hudi 通过索引机制将给定的 Hudi 记录一致地映射到 File ID,从而提供高效的 Upsert。Record Key和 File Group/File ID 之间的这种映射关系,一旦在 Record 的第一个版本确定后,就永远不会改变。简而言之,包含一组记录的所有版本必然在同一个 File Group 中。
DATA
索引的作用
从 400 个文件中读出 100,000 条数据
与 100 条更新的数据做分布式关联,取最新值
将更新后的 100,000 条数据写入临时目录,最后覆盖原先的数据
索引的类型
原理 | 特点 | |
Bloom Filter Index | 每个 Parquet 文件维护一个 Bloom Filter,在 File Group 映射阶段,把所有可能更新的分区的文件的 Bloom Filter 加载进来,用来判断 Record Key 是否存在 | 轻量级,默认的索引方式 包含在数据文件的footer中。默认配置,不依赖外部系统,数据和索引保持一致性 |
HBase Index | 维护每一个 Record Key 的 Partition Path 和 File Group,在插入 File Group定位阶段所有 task 向 HBase 发送 Batch Get 请求,获取 Record Key 的 Mapping 信息。 | 重量级,Record Key 到 File Group 的 mapping 记录在 HBase。对于小批次的keys,查询效率高,依赖外部系统。Hbase Index 会引入额外的外部系统,从而提升运维代价。 |
DATA
索引带来的性能收益是非常巨大的, 尽管 Hudi 已支持 Bloom Filter Index、Hbase index类型,但在字节跳动大规模数据入湖、探索分析等场景中,我们仍然碰到了现有索引类型无法解决的挑战,因此在实践中我们开发了 Bucket Index 的索引方式。
业务场景挑战
随着入湖的数据量增加,Hudi 中生成了约 40,000 个 File Group。虽然该业务部门使用了 Hudi 索引避免了全局合并操作,但是随着 File Group 的数量以及存储的数据量增加,定位 File Group 的时间也在增加,这造成了 Upsert 速度逐渐缓慢的情况,这严重影响了任务产出时间,甚至导致任务无法跑下去。
分析与对策
原先的业务场景使用了默认的 Bloom Filter Index 的索引方式。在观察中,团队发现最终在数据量约 30TB 的场景下,定位 Record 的性能会非常糟糕,此时一共产生了约 5 千亿条记录分布在40,000 个 File Group 中。
在 5 千亿条记录的数据规模下,团队发现定位缓慢的问题来自 Bloom Filter Index 的假阳性。当 Bloom Filter 发生假阳性时, Hudi 需要确定该 Record Key 是否真的存在。这个操作需要读取文件里的实际数据一条一条做对比,而实际数据量规模很大,这会导致查询 Record Key 跟 File ID 的映射关系代价非常大,因此造成了索引的性能下滑。
团队也调研了 Hudi 的另外一种索引方式 Hbase Index。这是一种 HBase 外置存储系统索引。但由于业务方不希望引入 HBase 这一额外依赖,且担心运维 Hbase 过程中存在新的问题,认为 Hbase Index 整体不够轻量,因此在整个业务场景中也无法作为 Bloom Filter 索引的替代。
在这样的场景下,字节跳动需要一个更加轻量且高效的索引方式,并且能够避免在大数据场景下的插入性能问题。
在不断实践中,字节跳动数据湖团队在逻辑层开发了一种基于哈希的索引,使得在插入过程中,定位传入 Record 的待写入文件位置信息时,无需读历史的 Record ,并贡献到了社区的 RFC-29。
改造过后,索引层变成了一层简单的哈希操作,可以直接通过对索引键的哈希操作来找到文件所在的位置。
DATA
Bucket Index 是一种基于哈希的索引,借鉴了数据库里的 Hash Index。给定 n 个桶, 用 Hash 函数决定某个记录属于哪个桶。最终所有分区被分成 N 个桶,每个桶对应一个 File Group。
相比较 Bloom Filter Index 来说,Hash Index 在逻辑层面提供了 Record Key 跟 File Group 的映射关系, 不存在假阳性问题。相同 key 的数据一定是落在同一个桶里面。最终一分区内的结构如下,目前一个 Partition 里面 Bucket 和 File Group 是一一对应的关系。
Bucket Index 数据写入原理
hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets
Bucket Index 查询优化原理
select * from T1 where city = beijing
select city from T1 group by city
select count(*) from T1 join T2 where T1.city = T2.city
Bucket Index 的实践与未来规划
在实践过程中,我们也发现了 Bucket Index 的一些实践建议以及未来的方向。一个关键的问题,是如何确定 numBuckets 的值,目前 Bucket Index 的桶数量 ,需要根据预估的数据量提前在建表时进行确定,且建表后不可更改,对于这种限制,我们目前有下面的解决方案。
要设置合理的桶数量,需要预测表的目标大小和未来数据增长情况。
桶的数量过小会降低整体引擎的并行速度,原因不难理解:当数据量增大时, 单个 File Group 对应的数据将增大,而 Hudi 表是以 File Group 为单位将数据切割生成 inputSplit 的,单个 File Group 数据过大将导致查询并发降低,性能下降。 一般说来建议单个桶的大小控制在 3GB 左右。
同时我们也应该避免桶的数量过多,过多的桶数量则会造成单个桶的数据量太小,造成小文件情况。基于这样的范围,当目标表的大小可以被预测时,我们可以比较容易得到一个合适的 Bucket Index 的桶数量值。
当然,我们也意识到这样的做法并不是一个灵活的方法。在未来,我们将推出可扩展的 Hash Index 桶方法来彻底解决这个问题。我们将支持已有的 Hudi 表在建表后直接扩展桶的数量,以避免当业务数据暴增时单个文件太大,影响查询以及 Compaction 性能。我们的后续优化将利用 Hashmap 的扩容过程,将分桶数按倍数做到轻量级扩容。当桶的数量在初期预测设置较小时,今后也能动态扩容,可以彻底解决预估桶数量不准确带来的烦恼。
DATA
产品介绍
火山引擎湖仓一体分析服务LAS
湖仓一体分析服务 LAS(Lakehouse Analytics Service)是面向湖仓一体架构的Serverless数据处理分析服务,提供一站式的海量数据存储计算和交互分析能力,完全兼容 Spark、Presto、Flink 生态,帮助企业轻松完成数据价值洞察。后台回复数字“4”了解产品
火山引擎 E-MapReduce
支持构建开源Hadoop生态的企业级大数据分析系统,完全兼容开源,提供 Hadoop、Spark、Hive、Flink集成和管理,帮助用户轻松完成企业大数据平台的构建,降低运维门槛,快速形成大数据分析能力。后台回复数字“3”了解产品
- End -