使用Apache Arrow助力构建数据系统

如果无法正常显示,请先停止浏览器的去广告插件。
分享至:
1. 使用Apache Arrow 助力构建数据系统 李晨曦 炎凰数据 研发工程师 DataFunSummit # 2023
2. 目录 CONTENT 01 02 为什么要构建新的数据系统 使用Apache Arrow 构建数据系统 03 04 什么是Apache Arrow 为什么是Apache Arrow 一些Tips
3. 01 为什么要构建新的数据系统 DataFunSummit # 2023
4. One Size Fits All Or Not? OLTP/OLAP 批处理/流处理 SQL/NoSQL …… VS HTAP 流批一体 NewSQL ……
5. 数据库的黄金时代 https://dbdb.io/browse?start-min=2020
6. 为什么要构建一个新的数据系统?
7. 为什么要构建一个新的数据系统? 读时建模Schema On Read Schema on Read读时建模技 术在异构日志处理领域的实践
8. 为什么要构建一个新的数据系统? 支持动态数据表模式的数据查询引擎 SELECT * FROM my_eventset
9. 为什么要构建一个新的数据系统? 支持动态数据表模式的数据查询引擎 SELECT _datatype, http.url, http.method, url, method FROM my_eventset
10. 为什么要构建一个新的数据系统? 支持动态模式的数据查询引擎 SELECT _datatype, COALESCE(http.url, url) AS url, COALESCE(http.method, method) AS method FROM my_eventset
11. 数据系统构建分解 内存中过 数据模型 滤、聚合、 内存数据格式 类型系统 排序算子 查询语言 运算表达式 元数据管理 并发控制 时间类型 关联查询 API 标量函数 子查询 外存算子 持久化 索引 查询计划 冷热分层 存储压缩优化 及查询执行 数据交换 窗口函数 高级分析函数 …… 物化视图 资源管理 跨平台运行 算子向量化执行 x86/ARM SSE/AVX/NEON 优化器 分布式执行
12. 02 什么是 Apache Arrow 为什么是 DataFunSummit # 2023
13. 第一步 内存中过 数据模型 滤、聚合、 内存数据格式 类型系统 排序算子 查询语言 运算表达式 元数据管理 并发控制 时间类型 关联查询 API 标量函数 子查询 外存算子 持久化 索引 查询计划 冷热分层 存储压缩优化 及查询执行 数据交换 窗口函数 高级分析函数 …… 物化视图 资源管理 跨平台运行 算子向量化执行 x86/ARM SSE/AVX/NEON 优化器 分布式执行
14. 内存数据格式 session_id timestamp ip Row 1 1432 2023/10/23 10:44 101.229.90.66 Row 2 1578 2023/10/22 17:31 220.181.108.17 Row 3 1532 2023/10/23 08:09 27.115.41.185 Row 4 1398 2023/10/22 13:26 118.31.77.1 行式 列式 1432 Row 1 2023/10/23 10:44 101.229.90.66 Row 2 1578 1532 1578 2023/10/22 17:31 2023/10/23 10:44 1532 timestamp 2023/10/22 17:31 2023/10/23 08:09 2023/10/23 08:09 2023/10/22 13:26 27.115.41.185 101.229.90.66 1398 Row 4 session_id 1578 220.181.108.17 Row 3 1432 2023/10/22 13:26 118.31.77.1 ip 220.181.108.17 27.115.41.185 118.31.77.1
15. 内存数据格式:列式存储 • 场景OLAP • 降低IO、内存开销 • 数据访问局部性&向量化 session_id timestamp ip Row 1 1432 2023/10/23 10:44 101.229.90.66 Row 2 1578 2023/10/22 17:31 220.181.108.17 Row 3 1532 2023/10/23 08:09 27.115.41.185 Row 4 1398 2023/10/22 13:26 118.31.77.1 行式 列式 1432 Row 1 2023/10/23 10:44 101.229.90.66 Row 2 1578 1532 1578 2023/10/22 17:31 2023/10/23 10:44 1532 timestamp 2023/10/22 17:31 2023/10/23 08:09 2023/10/23 08:09 2023/10/22 13:26 27.115.41.185 101.229.90.66 1398 Row 4 session_id 1578 220.181.108.17 Row 3 1432 2023/10/22 13:26 118.31.77.1 ip 220.181.108.17 27.115.41.185 118.31.77.1
16. 要继续造轮子么?
17. 是什么 • 高性能列式内存格式标准 • 支持跨语言跨系统互操作 • 支持现代硬件 • 支持高性能传输,零拷贝,无需序列化反序列化 复制&转换
18. 是什么 • 高性能列式内存格式标准 • 支持跨语言跨系统互操作 • 支持现代硬件 • 支持高性能传输,零拷贝,无需序列化反序列化 • 完备的生态系统
19. 活跃的社区
20. 数据系统构建任务 内存中过 数据模型 滤、聚合、 内存数据格式 类型系统 排序算子 查询语言 运算表达式 元数据管理 并发控制 时间类型 关联查询 API 标量函数 子查询 外存算子 持久化 索引 查询计划 冷热分层 存储压缩优化 及查询执行 数据交换 窗口函数 10人年? 高级分析函数 …… 物化视图 资源管理 跨平台运行 算子向量化执行 x86/ARM SSE/AVX/NEON 优化器 分布式执行
21. 数据系统构建任务 with Apache Arrow 内存中过 数据模型 滤、聚合、 内存数据格式 类型系统 排序算子 查询语言 运算表达式 元数据管理 并发控制 时间类型 关联查询 API 标量函数 子查询 外存算子 持久化 索引 查询计划 冷热分层 存储压缩优化 及查询执行 数据交换 窗口函数 高级分析函数 …… 物化视图 资源管理 跨平台运行 算子向量化执行 x86/ARM SSE/AVX/NEON 优化器 分布式执行
22. 03 使用Apache Arrow 构建数据系统 DataFunSummit # 2023
23. 使用Apache Arrow构建数据系统 数据存储 Parquet/CSV/JSO N… 索引 时间/倒排/B树 /LSM树… 代码 UDF 查询 硬件资源 SQL/API/ DataFrame… CPU/内存/GPU 逻辑计划 优化 物理计划 优化 执行 表达式执 行 聚合函数 … 传输 排序 物化视图
24. 数据存储 • 存储模型:事件(Event) • Apache Parquet • 列式存储 • 支持查询下推 • Arrow Record Batch ? 数据存储 Parquet/CSV/JSO N… 索引 时间/倒排/B树 /LSM树… 代码 UDF 查询 硬件资源 SQL/API/ DataFrame… CPU/内存/GPU 逻辑计划 优化 物理计划 优化 执行 表达式执 行 聚合函数 … 传输 Source: Querying Parquet with Millisecond Latency 排序 物化视图
25. 数据存储 • 存储模型:事件(Event) • Apache Parquet • 列式存储 • 支持查询下推 • Arrow Record Batch ? 数据存储 Parquet/CSV/JSO N… 索引 时间/倒排/B树 /LSM树… 代码 UDF 查询 硬件资源 SQL/API/ DataFrame… CPU/内存/GPU 逻辑计划 优化 物理计划 优化 执行 表达式执 行 聚合函数 … 传输 排序 物化视图
26. 索引/代码/硬件资源 • 索引:时间/倒排 • 代码:UDF • 硬件资源:内存/线程/GPU ? 数据存储 Parquet/CSV/JSO N… ? 索引 时间/倒排/B树 /LSM树… 代码 ? UDF ? 硬件资源 CPU/内存/GPU 逻辑计划 优化 物理计划 优化 查询 SQL/API/ DataFrame… 执行 表达式执 行 聚合函数 … 传输 排序 物化视图
27. 查询:SQL解析 • SQL语句->抽象语法树 ? 数据存储 Parquet/CSV/JSO N… ? 索引 时间/倒排/B树 /LSM树… 代码 ? UDF ? 硬件资源 CPU/内存/GPU 逻辑计划 优化 物理计划 优化 查询 SQL/API/ DataFrame… 执行 表达式执 行 聚合函数 … 传输 排序 ? 物化视图
28. 逻辑计划&优化 • 抽象语法树->逻辑计划 • 优化 Aggregate: MAX(_time) Selection: ip = '101.229.90.66' TableScan: dataset Aggregate: MAX(_time) Selection: ip = '101.229.90.66' TableScan: dataset, columns: ip,_time, expressions: ip = '101.229.90.66' ? 数据存储 Parquet/CSV/JSO N… ? 索引 时间/倒排/B树 /LSM树… 代码 ? ? 硬件资源 CPU/内存/GPU UDF ? 逻辑计划 优化 物理计划 优化 查询 SQL/API/ DataFrame… ? 执行 表达式执 行 聚合函数 … 传输 排序 ? 物化视图
29. 物理计划&执行 • Acero • Apache Arrow提供的C++原生流式查询引擎 • 高可扩展性 class ARROW_ACERO_EXPORT ExecNode { public: const NodeVector& inputs() const { return inputs_; } const ExecNode* output() const { return output_; } const std::shared_ptr<Schema>& output_schema() const { return output_schema_; } virtual Status InputReceived(ExecNode* input, ExecBatch batch) = 0; virtual Status InputFinished(ExecNode* input, int total_batches) = 0; virtual Status StartProducing() = 0; virtual void PauseProducing(ExecNode* output, int32_t counter) = 0; virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0; Status StopProducing(); }; ? 数据存储 Parquet/CSV/JSO N… ? 索引 时间/倒排/B树 /LSM树… 代码 ? ? 硬件资源 CPU/内存/GPU UDF 优化 ? ? 优化 ? 物理计划 ? 执行 表达式执 行 void register_schemaless_sink_node(ExecFactoryRegistry *registry) { registry->AddFactory("schemaless_consuming_sink", SchemalessConsumingSinkNode::Make) } ? SQL/API/ DataFrame… ? 逻辑计划 查询 聚合函数 … 传输 排序 物化视图
30. 物理计划&执行 • 逻辑计划->物理计划 EventSetScan Node ? 数据存储 Parquet/CSV/JSO N… ? 索引 时间/倒排/B树 /LSM树… 代码 ? ? 硬件资源 CPU/内存/GPU UDF Selection Node 优化 ? ? 优化 ? 物理计划 Aggregate Node ? 执行 表达式执 行 ? SQL/API/ DataFrame… ? 逻辑计划 查询 聚合函数 … Schemaless SinkNode 传输 排序 物化视图
31. 物理计划&执行 • 支持动态模式改造 ? 数据存储 class ARROW_ACERO_EXPORT ExecNode { public: const NodeVector& inputs() const { return inputs_; } const ExecNode* output() const { return output_; } const std::shared_ptr<Schema>& output_schema() const { return output_schema_; } virtual Status InputReceived(ExecNode* input, ExecBatch batch) = 0; virtual Status InputFinished(ExecNode* input, int total_batches) = 0; virtual Status StartProducing() = 0; virtual void PauseProducing(ExecNode* output, int32_t counter) = 0; virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0; Status StopProducing(); }; Parquet/CSV/JSO N… ? 索引 时间/倒排/B树 /LSM树… 代码 ? ? 硬件资源 CPU/内存/GPU UDF 优化 ? ? 优化 ? 物理计划 ? 执行 表达式执 行 ? SQL/API/ DataFrame… ? 逻辑计划 查询 聚合函数 … 传输 排序 物化视图
32. 物理计划&执行 • 扩展算子功能 • 支持动态模式 • 支持物化视图 ? ? 数据存储 索引 Parquet/CSV/JSO N… struct Aggregator : public KernelState { virtual Status Consume(); virtual Status MergeFrom(); virtual Status Finalize(); virtual Result<std::shared_ptr<Schema>> IntermediateSchema(); virtual Status ConsumeIntermediate(); virtual Result<std::vector<Datum>> FinalizeIntermediate(); }; 代码 时间/倒排/B树 /LSM树… ? CPU/内存/GPU UDF 优化 ? ? 优化 ? 物理计划 ? 表达式执 行 ? … 聚合函数 传输 查询 ? SQL/API/ DataFrame… ? 逻辑计划 执行 ? 硬件资源 ? ? 物化视图 ? 排序
33. 传输 • Arrow Flight • 使用 gRPC 或 REST 接口标准化高性能数据交换框架 • Arrow Flight SQL ? ? 数据存储 硬件资源 Parquet/CSV/J SON… CPU/内存/GPU 索引 ? 时间/倒排/B树 /LSM树… ? ? … 聚合函数 传输 查询 ? SQL/API/DataF rame… ? 优化 ? ? 优化 ? 物理计划 表达式执 行 ? UDF 逻辑计划 执行 代码 ? ? 物化视图 ? 排序 ?
34. 04 一些Tips DataFunSummit # 2023
35. 踩过的一些坑 • 更新频繁,开发接口还不够稳定:少魔改,多扩展 • 复杂类型处理还不够完备:Union、List • 需要时间和大规模数据的检验 https://arrow.apache.org/docs/cpp/acero/overview.html#what-is-acero
36. One More Thing: 内存中过 数据模型 滤、聚合、 内存数据格式 类型系统 排序算子 查询语言 运算表达式 元数据管理 并发控制 时间类型 关联查询 API 标量函数 子查询 外存算子 持久化 索引 查询计划 冷热分层 存储压缩优化 及查询执行 数据交换 窗口函数 高级分析函数 …… 物化视图 资源管理 跨平台运行 算子向量化执行 x86/ARM SSE/AVX/NEON 优化器 分布式执行
37. 感谢观看

Home - Wiki
Copyright © 2011-2024 iteam. Current version is 2.137.1. UTC+08:00, 2024-11-17 12:49
浙ICP备14020137号-1 $Map of visitor$