使用Apache Arrow助力构建数据系统
如果无法正常显示,请先停止浏览器的去广告插件。
1. 使用Apache Arrow
助力构建数据系统
李晨曦 炎凰数据 研发工程师
DataFunSummit # 2023
2. 目录
CONTENT
01
02
为什么要构建新的数据系统
使用Apache Arrow
构建数据系统
03
04
什么是Apache Arrow
为什么是Apache Arrow
一些Tips
3. 01
为什么要构建新的数据系统
DataFunSummit # 2023
4. One Size Fits All Or Not?
OLTP/OLAP
批处理/流处理
SQL/NoSQL
……
VS
HTAP
流批一体
NewSQL
……
5. 数据库的黄金时代
https://dbdb.io/browse?start-min=2020
6. 为什么要构建一个新的数据系统?
7. 为什么要构建一个新的数据系统?
读时建模Schema On Read
Schema on Read读时建模技
术在异构日志处理领域的实践
8. 为什么要构建一个新的数据系统?
支持动态数据表模式的数据查询引擎
SELECT * FROM my_eventset
9. 为什么要构建一个新的数据系统?
支持动态数据表模式的数据查询引擎
SELECT _datatype, http.url, http.method, url, method FROM my_eventset
10. 为什么要构建一个新的数据系统?
支持动态模式的数据查询引擎
SELECT
_datatype,
COALESCE(http.url, url) AS url,
COALESCE(http.method, method) AS method
FROM
my_eventset
11. 数据系统构建分解
内存中过
数据模型 滤、聚合、
内存数据格式 类型系统 排序算子 查询语言 运算表达式
元数据管理
并发控制
时间类型
关联查询
API
标量函数
子查询
外存算子
持久化
索引
查询计划
冷热分层
存储压缩优化
及查询执行
数据交换
窗口函数
高级分析函数
……
物化视图
资源管理
跨平台运行 算子向量化执行
x86/ARM SSE/AVX/NEON
优化器
分布式执行
12. 02
什么是
Apache Arrow
为什么是
DataFunSummit # 2023
13. 第一步
内存中过
数据模型 滤、聚合、
内存数据格式 类型系统 排序算子 查询语言 运算表达式
元数据管理
并发控制
时间类型
关联查询
API
标量函数
子查询
外存算子
持久化
索引
查询计划
冷热分层
存储压缩优化
及查询执行
数据交换
窗口函数
高级分析函数
……
物化视图
资源管理
跨平台运行 算子向量化执行
x86/ARM SSE/AVX/NEON
优化器
分布式执行
14. 内存数据格式
session_id timestamp ip
Row 1 1432 2023/10/23 10:44 101.229.90.66
Row 2 1578 2023/10/22 17:31 220.181.108.17
Row 3 1532 2023/10/23 08:09 27.115.41.185
Row 4 1398 2023/10/22 13:26 118.31.77.1
行式
列式
1432
Row 1
2023/10/23 10:44
101.229.90.66
Row 2
1578
1532
1578
2023/10/22 17:31 2023/10/23 10:44
1532
timestamp
2023/10/22 17:31
2023/10/23 08:09
2023/10/23 08:09 2023/10/22 13:26
27.115.41.185 101.229.90.66
1398
Row 4
session_id
1578
220.181.108.17
Row 3
1432
2023/10/22 13:26
118.31.77.1
ip
220.181.108.17
27.115.41.185
118.31.77.1
15. 内存数据格式:列式存储
• 场景OLAP
• 降低IO、内存开销
• 数据访问局部性&向量化
session_id timestamp ip
Row 1 1432 2023/10/23 10:44 101.229.90.66
Row 2 1578 2023/10/22 17:31 220.181.108.17
Row 3 1532 2023/10/23 08:09 27.115.41.185
Row 4 1398 2023/10/22 13:26 118.31.77.1
行式
列式
1432
Row 1
2023/10/23 10:44
101.229.90.66
Row 2
1578
1532
1578
2023/10/22 17:31 2023/10/23 10:44
1532
timestamp
2023/10/22 17:31
2023/10/23 08:09
2023/10/23 08:09 2023/10/22 13:26
27.115.41.185 101.229.90.66
1398
Row 4
session_id
1578
220.181.108.17
Row 3
1432
2023/10/22 13:26
118.31.77.1
ip
220.181.108.17
27.115.41.185
118.31.77.1
16. 要继续造轮子么?
17. 是什么
• 高性能列式内存格式标准
• 支持跨语言跨系统互操作
• 支持现代硬件
• 支持高性能传输,零拷贝,无需序列化反序列化
复制&转换
18. 是什么
• 高性能列式内存格式标准
• 支持跨语言跨系统互操作
• 支持现代硬件
• 支持高性能传输,零拷贝,无需序列化反序列化
• 完备的生态系统
19. 活跃的社区
20. 数据系统构建任务
内存中过
数据模型 滤、聚合、
内存数据格式 类型系统 排序算子 查询语言 运算表达式
元数据管理
并发控制
时间类型
关联查询
API
标量函数
子查询
外存算子
持久化
索引
查询计划
冷热分层
存储压缩优化
及查询执行
数据交换
窗口函数
10人年?
高级分析函数
……
物化视图
资源管理
跨平台运行 算子向量化执行
x86/ARM SSE/AVX/NEON
优化器
分布式执行
21. 数据系统构建任务 with Apache Arrow
内存中过
数据模型 滤、聚合、
内存数据格式 类型系统 排序算子 查询语言 运算表达式
元数据管理
并发控制
时间类型
关联查询
API
标量函数
子查询
外存算子
持久化
索引
查询计划
冷热分层
存储压缩优化
及查询执行
数据交换
窗口函数
高级分析函数
……
物化视图
资源管理
跨平台运行 算子向量化执行
x86/ARM SSE/AVX/NEON
优化器
分布式执行
22. 03
使用Apache Arrow
构建数据系统
DataFunSummit # 2023
23. 使用Apache Arrow构建数据系统
数据存储
Parquet/CSV/JSO
N…
索引
时间/倒排/B树
/LSM树…
代码
UDF
查询
硬件资源
SQL/API/
DataFrame…
CPU/内存/GPU
逻辑计划 优化
物理计划 优化
执行
表达式执
行
聚合函数
…
传输
排序
物化视图
24. 数据存储
• 存储模型:事件(Event)
• Apache Parquet
• 列式存储
• 支持查询下推
• Arrow Record Batch
?
数据存储
Parquet/CSV/JSO
N…
索引
时间/倒排/B树
/LSM树…
代码
UDF
查询
硬件资源
SQL/API/
DataFrame…
CPU/内存/GPU
逻辑计划 优化
物理计划 优化
执行
表达式执
行
聚合函数
…
传输
Source: Querying Parquet with Millisecond Latency
排序
物化视图
25. 数据存储
• 存储模型:事件(Event)
• Apache Parquet
• 列式存储
• 支持查询下推
• Arrow Record Batch
?
数据存储
Parquet/CSV/JSO
N…
索引
时间/倒排/B树
/LSM树…
代码
UDF
查询
硬件资源
SQL/API/
DataFrame…
CPU/内存/GPU
逻辑计划 优化
物理计划 优化
执行
表达式执
行
聚合函数
…
传输
排序
物化视图
26. 索引/代码/硬件资源
• 索引:时间/倒排
• 代码:UDF
• 硬件资源:内存/线程/GPU
?
数据存储
Parquet/CSV/JSO
N…
?
索引
时间/倒排/B树
/LSM树…
代码
?
UDF
?
硬件资源
CPU/内存/GPU
逻辑计划 优化
物理计划 优化
查询
SQL/API/
DataFrame…
执行
表达式执
行
聚合函数
…
传输
排序
物化视图
27. 查询:SQL解析
• SQL语句->抽象语法树
?
数据存储
Parquet/CSV/JSO
N…
?
索引
时间/倒排/B树
/LSM树…
代码
?
UDF
?
硬件资源
CPU/内存/GPU
逻辑计划 优化
物理计划 优化
查询
SQL/API/
DataFrame…
执行
表达式执
行
聚合函数
…
传输
排序
?
物化视图
28. 逻辑计划&优化
• 抽象语法树->逻辑计划
• 优化
Aggregate: MAX(_time)
Selection: ip = '101.229.90.66'
TableScan: dataset
Aggregate: MAX(_time)
Selection: ip = '101.229.90.66'
TableScan: dataset,
columns: ip,_time,
expressions: ip = '101.229.90.66'
?
数据存储
Parquet/CSV/JSO
N…
?
索引
时间/倒排/B树
/LSM树…
代码
?
?
硬件资源
CPU/内存/GPU
UDF
?
逻辑计划 优化
物理计划 优化
查询
SQL/API/
DataFrame…
?
执行
表达式执
行
聚合函数
…
传输
排序
?
物化视图
29. 物理计划&执行
• Acero
• Apache Arrow提供的C++原生流式查询引擎
• 高可扩展性
class ARROW_ACERO_EXPORT ExecNode {
public:
const NodeVector& inputs() const { return inputs_; }
const ExecNode* output() const { return output_; }
const std::shared_ptr<Schema>& output_schema() const { return
output_schema_; }
virtual Status InputReceived(ExecNode* input, ExecBatch batch) = 0;
virtual Status InputFinished(ExecNode* input, int total_batches) = 0;
virtual Status StartProducing() = 0;
virtual void PauseProducing(ExecNode* output, int32_t counter) = 0;
virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0;
Status StopProducing();
};
?
数据存储
Parquet/CSV/JSO
N…
?
索引
时间/倒排/B树
/LSM树…
代码
?
?
硬件资源
CPU/内存/GPU
UDF
优化 ?
? 优化 ?
物理计划
?
执行
表达式执
行
void register_schemaless_sink_node(ExecFactoryRegistry *registry) {
registry->AddFactory("schemaless_consuming_sink", SchemalessConsumingSinkNode::Make)
}
?
SQL/API/
DataFrame…
?
逻辑计划
查询
聚合函数
…
传输
排序
物化视图
30. 物理计划&执行
• 逻辑计划->物理计划
EventSetScan
Node
?
数据存储
Parquet/CSV/JSO
N…
?
索引
时间/倒排/B树
/LSM树…
代码
?
?
硬件资源
CPU/内存/GPU
UDF
Selection
Node
优化 ?
? 优化 ?
物理计划
Aggregate
Node
?
执行
表达式执
行
?
SQL/API/
DataFrame…
?
逻辑计划
查询
聚合函数
…
Schemaless
SinkNode
传输
排序
物化视图
31. 物理计划&执行
• 支持动态模式改造
?
数据存储
class ARROW_ACERO_EXPORT ExecNode {
public:
const NodeVector& inputs() const { return inputs_; }
const ExecNode* output() const { return output_; }
const std::shared_ptr<Schema>& output_schema() const { return
output_schema_; }
virtual Status InputReceived(ExecNode* input, ExecBatch batch) = 0;
virtual Status InputFinished(ExecNode* input, int total_batches) = 0;
virtual Status StartProducing() = 0;
virtual void PauseProducing(ExecNode* output, int32_t counter) = 0;
virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0;
Status StopProducing();
};
Parquet/CSV/JSO
N…
?
索引
时间/倒排/B树
/LSM树…
代码
?
?
硬件资源
CPU/内存/GPU
UDF
优化 ?
? 优化 ?
物理计划
?
执行
表达式执
行
?
SQL/API/
DataFrame…
?
逻辑计划
查询
聚合函数
…
传输
排序
物化视图
32. 物理计划&执行
• 扩展算子功能
• 支持动态模式
• 支持物化视图
?
?
数据存储
索引
Parquet/CSV/JSO
N…
struct Aggregator : public KernelState {
virtual Status Consume();
virtual Status MergeFrom();
virtual Status Finalize();
virtual Result<std::shared_ptr<Schema>>
IntermediateSchema();
virtual Status ConsumeIntermediate();
virtual Result<std::vector<Datum>> FinalizeIntermediate();
};
代码
时间/倒排/B树
/LSM树…
?
CPU/内存/GPU
UDF
优化 ?
? 优化 ?
物理计划
?
表达式执
行
? …
聚合函数
传输
查询
?
SQL/API/
DataFrame…
?
逻辑计划
执行
?
硬件资源
?
? 物化视图 ?
排序
33. 传输
• Arrow Flight
• 使用 gRPC 或 REST 接口标准化高性能数据交换框架
• Arrow Flight SQL
?
?
数据存储
硬件资源
Parquet/CSV/J
SON…
CPU/内存/GPU
索引
?
时间/倒排/B树
/LSM树…
?
? …
聚合函数
传输
查询
?
SQL/API/DataF
rame…
? 优化 ?
? 优化 ?
物理计划
表达式执
行
?
UDF
逻辑计划
执行
代码
?
? 物化视图 ?
排序
?
34. 04
一些Tips
DataFunSummit # 2023
35. 踩过的一些坑
• 更新频繁,开发接口还不够稳定:少魔改,多扩展
• 复杂类型处理还不够完备:Union、List
• 需要时间和大规模数据的检验
https://arrow.apache.org/docs/cpp/acero/overview.html#what-is-acero
36. One More Thing:
内存中过
数据模型 滤、聚合、
内存数据格式 类型系统 排序算子 查询语言 运算表达式
元数据管理
并发控制
时间类型
关联查询
API
标量函数
子查询
外存算子
持久化
索引
查询计划
冷热分层
存储压缩优化
及查询执行
数据交换
窗口函数
高级分析函数
……
物化视图
资源管理
跨平台运行 算子向量化执行
x86/ARM SSE/AVX/NEON
优化器
分布式执行
37. 感谢观看