导读
本文主要介绍了Doris SQL解析的原理。
重点讲述了生成单机逻辑计划,生成分布式逻辑计划,生成分布式物理计划的过程。对应于代码实现是Analyze,SinglePlan,DistributedPlan,Schedule四个部分。
Analyze负责对AST进行前期的一些处理,SinglePlan根据AST进行优化生成单机查询计划,DistributedPlan将单机的查询计划拆成分布式的查询计划,Schedule阶段负责决定查询计划下发到哪些机器上执行。
由于SQL类型有很多,本文侧重介绍查询SQL的解析,从算法原理和代码实现上深入讲解了Doris的SQL解析原理。
1. Doris简介
Doris分成两部分FE和BE,FE 负责存储以及维护集群元数据、接收、解析、查询、设计规划整体查询流程,BE 负责数据存储和具体的实施过程。
在 Doris 的存储引擎中,用户数据被水平划分为若干个数据分片(Tablet,也称作数据分桶)。每个 Tablet 包含若干数据行。多个 Tablet 在逻辑上归属于不同的分区Partition。一个 Tablet 只属于一个 Partition。而一个 Partition 包含若干个 Tablet。Tablet 是数据移动、复制等操作的最小物理存储单元。
Doris广泛应用于小米的各条业务线上,为小米金融,手机,AIOT,游戏等各大部门提供数据计算支持
2. SQL解析简介
这个过程包括以下四个步骤:词法分析,语法分析,生成逻辑计划,生成物理计划。
如图1所示:
2.1 词法分析
词法分析主要负责将字符串形式的sql识别成一个个token,为语法分析做准备。
select ...... from ...... where ....... group by ..... order by ......
SQL 的 Token 可以分为如下几类:
○ 关键字(select、from、where)
○ 操作符(+、-、>=)
○ 开闭合标志((、CASE)
○ 占位符(?)
○ 注释
○ 空格
......
2.2 语法分析
语法分析主要负责根据语法规则,将词法分析生成的token转成抽象语法树(Abstract Syntax Tree),如图2所示。
图 2 抽象语法树示例
2.3 逻辑计划
3. 设计目标
最大化计算的并行性
最小化数据的网络传输
最大化减少需要扫描的数据
4. 总体架构
如图4所示,Parse阶段本文不详细讲,Analyze负责对AST进行前期的一些处理,SinglePlan根据AST进行优化生成单机查询计划,DistributedPlan将单机的查询计划拆成分布式的查询计划,Schedule阶段负责决定查询计划下发到哪些机器上执行。
由于SQL类型有很多,本文侧重介绍查询SQL的解析。
图5展示了一个简单的查询SQL在Doris的解析实现。
5. Parse阶段
词法分析采用jflex技术,语法分析采用java cup parser技术,最后生成抽象语法树(Abstract Syntax Tree)AST,这些都是现有的、成熟的技术,在这里不进行详细介绍。
6. Analyze阶段
抽象语法树是由StatementBase这个抽象类表示。这个抽象类包含一个最重要的成员函数analyze(),用来执行Analyze阶段要做的事。
不同类型的查询select, insert, show, set, alter table, create table等经过Parse阶段后生成不同的数据结构(SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt等),这些数据结构继承自StatementBase,并实现analyze()函数,对特定类型的SQL进行特定的Analyze。
例如:select类型的查询,会转成对select sql的子语句SelectList, FromClause, GroupByClause, HavingClause, WhereClause, SortInfo等的analyze()。然后这些子语句再各自对自己的子结构进行进一步的analyze(),通过层层迭代,把各种类型的sql的各种情景都分析完毕。例如:WhereClause进一步分析其包含的BetweenPredicate(between表达式), BinaryPredicate(二元表达式), CompoundPredicate(and or组合表达式), InPredicate(in表达式)等。
对于查询类型的SQL,包含以下几项重要工作:
· 元信息的识别和解析:识别和解析sql中涉及的 Cluster, Database, Table, Column 等元信息,确定需要对哪个集群的哪个数据库的哪些表的哪些列进行计算。
· SQL 的合法性检查:窗口函数不能 DISTINCT,投影列是否有歧义,where语句中不能含有grouping操作等。
· SQL 简单重写:比如将 select * 扩展成 select 所有列,count distinct转成bitmap或者hll函数等。
· 函数处理:检查sql中包含的函数和系统定义的函数是否一致,包括参数类型,参数个数等。
· Table 和 Column 的别名处理
· 类型检查和转换:例如二元表达式两边的类型不一致时,需要对其中一个类型进行转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)。
对AST 进行analyze后,会再进行一次rewrite操作,进行精简或者是转成统一的处理方式。目前rewrite的算法是基于规则的方式,针对AST的树状结构,自底向上,应用每一条规则进行重写。如果重写后,AST有变化,则再次进行analyze和rewrite,直到AST无变化为止。
例如:常量表达式的化简:1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。将一些语句转成统一的处理方式,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。
7. 生成单机逻辑Plan阶段
这部分工作主要是根据AST抽象语法树生成代数关系,也就是俗称的算子数。树上的每个节点都是一个算子,代表着一种操作。
图7 单机逻辑计划示例
如果不进行优化,生成的关系代数下发到存储中执行的代价非常高。
对于查询:
select a.siteid, a.pv from table1 a join table2 b on a.siteid = b.siteid where a.citycode=122216 and b.username="test" order by a.pv limit 10
未优化的关系代数,如图8所示,需要将所有列读出来进行一系列的计算,在最后选择输出siteid, pv两列,大量无用的列数据浪费了计算资源。
Doris在生成代数关系时,进行了大量的优化,将投影列和查询条件尽可能放到扫描操作时执行。
图8 未优化的关系代数
具体来说这个阶段主要做了如下几项工作:
· Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化。
· 投影下推:BE 在 Scan 时只会 Scan 必须读取的列。
· 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点。
· 分区,分桶裁剪:根据过滤条件中的信息,确定需要扫描哪些分区,哪些桶的tablet。
· Join Reorder:对于 Inner Join, Doris 会根据行数调整表的顺序,将大表放在前面。
· Sort + Limit 优化成 TopN:对于order by limit语句会转换成TopN的操作节点,方便统一处理。
· MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的物化视图。
图9展示了优化的示例,Doris是在生成关系代数的过程中优化,边生成边优化。
图 9 单机查询计划优化的过程
8. 生成分布式Plan阶段
有了单机的PlanNode树之后,就需要进一步根据分布式环境,拆成分布式PlanFragment树(PlanFragment用来表示独立的执行单元),毕竟一个表的数据分散地存储在多台主机上,完全可以让一些计算并行起来。
图 10 bucket shuffle join示例
图 11 HashJoinNode创建分布式逻辑计划核心流程
图 12 从单机计划到分布式计划
9. Schedule阶段
哪个 BE 执行哪个 PlanFragment
每个 Tablet 选择哪个副本去查询
如何进行多实例并发
图13展示了创建分布式物理计划的核心流程:
a. prepare阶段:给每个PlanFragment创建一个FragmentExecParams结构,用来表示PlanFragment执行时所需的所有参数;如果一个PlanFragment包含有DataSinkNode,则找到数据发送的目的PlanFragment,然后指定目的PlanFragment的FragmentExecParams的输入为该PlanFragment的FragmentExecParams。
b. computeScanRangeAssignment阶段:针对不同类型的join进行不同的处理。
c. computeFragmentExecParams阶段:这个阶段解决PlanFragment下发到哪个BE上执行,以及如何处理实例并发问题。确定了每个tablet的扫描地址之后,就可以以地址为维度,将FragmentExecParams生成多个实例,也就是FragmentExecParams中包含的地址有多个,就生成多个实例FInstanceExecParam。如果设置了并发度,那么一个地址的执行实例再进一步的拆成多个FInstanceExecParam。针对bucket shuffle join和colocate join会有一些特殊处理,但是基本思想一样。FInstanceExecParam创建完成后,会分配一个唯一的ID,方便追踪信息。如果FragmentExecParams中包含有ExchangeNode,需要计算有多少senders,以便知道需要接受多少个发送方的数据。最后FragmentExecParams确定destinations,并把目的地址填充上去。
d. create result receiver阶段:result receiver是查询完成后,最终数据需要输出的地方。
e. to thrift阶段:根据所有PlanFragment的FInstanceExecParam创建rpc请求,然后下发到BE端执行。这样一个完整的SQL解析过程完成了。
图 13 创建分布式物理计划核心流程
图 14 生成物理计划的过程
10. 总结
Doris遵守了SQL解析的常用方法,但根据底层存储架构,以及分布式的特点,在SQL解析这块进行了大量的优化,实现了最大并行度和最小化网络传输,给SQL执行层面减少很多负担。
往期文章