1 什么是向量化计算
1.1 并行数据处理:SIMD指令
1.2 向量化执行框架:数据局部性与运行时开销
1.3 如何使用向量化计算
2 为什么要做Spark向量化计算
3 Spark向量化计算如何在美团实施落地
3.1 整体建设思路
3.2 Spark+Gluten+Velox计算流程
3.3 阶段划分
4 美团Spark向量化计算遇到的挑战
4.1 稳定性问题
4.2 支持ORC并优化读写性能
4.3 Native HDFS客户端优化
4.4 Shuffle重构
4.5 适配HBO
4.6 一致性问题
5 上线效果
6 未来规划
6.1 Spark向量化之后对开源社区的跟进策略
6.2 提升向量化覆盖率的策略
7 致谢
让我们从一个简单问题开始:假设要实现“数组a+b存入c”,设三个整型数组的长度都是100,那么只需将“c[i] = a[i] + b[i]”置于一个100次的循环内,代码如下:
void addArrays(const int* a, const int* b, int* c, int num) {
for (int i = 0; i < num; ++i) {
c[i] = a[i] + b[i];
}
}
我们知道:计算在CPU内完成,逻辑计算单元操作寄存器中的数据,算术运算的源操作数要先放置到CPU的寄存器中,哪怕简单的内存拷贝也需要过CPU寄存器。所以,完成“c[i] = a[i] + b[i]”需经三步:
其中,加载和存储对应访存指令(Memory Instruction),计算是算术加指令,循环执行100次上述三步骤,就完成了“数组a + 数组b => 数组c”。该流程即对应传统的计算架构:单指令单数据(SISD)顺序架构,任意时间点只有一条指令作用于一条数据流。如果有更宽的寄存器(超机器字长,比如256位16字节),一次性从源内存同时加载更多的数据到寄存器,一条指令作用于寄存器x和y,在x和y的每个分量(比如32位4字节)上并行进行加,并将结果存入寄存器z的各对应分量,最后一次性将寄存器z里的内容存入目标内存,那么就能实现单指令并行处理数据的效果,这就是单指令多数据(SIMD)。
单指令多数据对应一类并行架构(现代CPU一般都支持SIMD执行),单条指令同时作用于多条数据流,可成倍的提升单核计算能力。SIMD非常适合计算密集型任务,它能加速的根本原因是“从一次一个跨越到一次一组,从而实现用更少的指令数完成同样的计算任务。”
执行引擎常规按行处理的方式,存在以下三个问题:
图2:row by row VS blcok by block
同一列数据在循环里被施加相同的计算,批量迭代将减少函数调用次数,通过模版能减少虚函数调用,降低运行时开销。针对固定长度类型的列很容易被并行处理(通过行号offset到数据),这样的执行框架也有利于让编译器做自动向量化代码生成,显著减少分支,减轻预测失败的惩罚。结合模板,编译器会为每个实参生成特定实例化代码,避免运行时查找虚函数表,并且由于编译器知道了具体的类型信息,可以对模板函数进行内联展开。
图3:向量化执行框架示例
自动向量化(Auto-Vectorization)。当循环内没有复杂的条件分支,没有数据依赖,只调用简单内联函数时,通过编译选项(如gcc -ftree-vectorize
、-O3),编译器可以将顺序执行代码翻译成向量化执行代码。可以通过观察编译hint输出和反汇编确定是否生产了向量化代码。
g++ test.cpp -g -O3 -march=native -fopt-info-vec-optimized
,执行后有类似输出“test.cpp:35:21: note: loop vectorized”。gdb test + (gdb) disassemble /m function_name
,看到一些v打头的指令(例如vmovups、vpaddd、vmovups等)。使用封装好的函数库,如Intel Intrinsic function、xsimd等。这些软件包中的内置函数实现都使用了SIMD指令进行优化,相当于high level地使用了向量化指令的汇编,详见:https://www.intel.com/content/www/us/en/docs/intrinsics-guide/index.html。
通过asm内嵌向量化汇编,但汇编指令跟CPU架构相关,可移植性差。
编译器暗示:
如果循环内有复杂的逻辑或条件分支,那么将难以向量化处理。
以下是一个向量化版本数组相加的例子,使用Intel Intrinsic function:
#include <immintrin.h> // 包含Intrinsic avx版本函数的头文件
void addArraysAVX(const int* a, const int* b, int* c, int num) {
assert(num % 8 == 0); // 循环遍历数组,步长为8,因为每个__m256i可以存储8个32位整数
for (int i = 0; i < num; i += 8) {
__m256i v_a = _mm256_load_si256((__m256i*)&a[i]); // 加载数组a的下一个8个整数到向量寄存器
__m256i v_b = _mm256_load_si256((__m256i*)&b[i]); // 加载数组b的下一个8个整数到向量寄存器
__m256i v_c = _mm256_add_epi32(v_a, v_b); // 将两个向量相加,结果存放在向量寄存器
_mm256_store_si256((__m256i*)&c[i], v_c); // 将结果向量存储到数组c的内存
}
}
int main(int argc, char* argv[]) {
const int ARRAY_SIZE = 64 * 1024;
int a[ARRAY_SIZE] __attribute__((aligned(32))); // 按32字节对齐,满足某些向量化指令的内存对齐要求
int b[ARRAY_SIZE] __attribute__((aligned(32)));
int c[ARRAY_SIZE] __attribute__((aligned(32)));
srand(time(0));
for (int i = 0; i < ARRAY_SIZE; ++i) {
a[i] = rand(); b[i] = rand(); c[i] = 0; // 对数组a和b赋随机数初始值
}
auto start = std::chrono::high_resolution_clock::now();
addArraysAVX(a, b, c, ARRAY_SIZE);
auto end = std::chrono::high_resolution_clock::now();
std::cout << "addArraysAVX took " << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << " microseconds." << std::endl;
return 0;
}
addArraysAVX函数中的_mm256_load_si256、_mm256_add_epi32、_mm256_store_si256都是Intrinsic库函数,内置函数命名方式是:
编译:g++ test.cpp -O0 -std=c++11 -mavx2 -o test。选项-O0用于禁用优化(因为开启优化后有可能自动向量化),-mavx2用于启用AVX2指令集。
从业界发展情况来看,近几年OLAP引擎发展迅速,该场景追求极致的查询速度,向量化技术在Clickhouse、Doris等Native引擎中得到广泛使用,降本增效的趋势也逐渐扩展到数仓生产。2022年6月DataBricks发表论文《Photon- A Fast Query Engine for Lakehouse Systems》,Photon是DataBricks Runtime中C++实现的向量化执行引擎,相比DBR性能平均提升4倍,并已应用在Databricks商业版上,但没有开源。2021年Meta开源Velox,一个C++实现的向量化执行库。2022 Databricks Data & AI Summit 上,Intel 与Kyligence介绍了合作开源项目Gluten,旨在为Spark SQL提供Native Vectorized Execution。Gluten+Velox的组合,使Java栈的Spark也可以像Doris、Clickhouse等Native引擎一样发挥向量化执行的性能优势。
从美团内部来看,数仓生产有数万规模计算节点,很多业务决策依赖数据及时产出,若应用向量化执行技术,在不升级硬件的情况下,既可获得可观的资源节省,也能加速作业执行,让业务更快看到数据和做出决策。根据Photon和Gluten的公开数据,应用向量化Spark执行效率至少可以提升一倍,我们在物理机上基于TPC-H测试Gluten+Velox相比Spark 3.0也有1.7倍性能提升。
通过Spark的plugin功能,Gluten将Spark和向量化执行引擎(Native backend,如Velox)连接起来,分为Driver plugin和Executor Plugin。在Driver端,SparkContext初始化时,Gluten的一系列规则(如ColumnarOverrideRules)通过Spark Extensions注入,这些规则会对Spark的执行计划进行校验,并把Gluten支持的算子转换成向量化算子(如FileScan会转换成NativeFileScan),不能转换的算子上报Fallback的原因,并在回退的部分嵌入Column2Row、Row2Column算子,生成Substrait执行计划。在Executor端,接收到Driver侧的LaunchTask RPC消息传输的Substrait执行计划后,再转换成Native backend的执行计划,最终通过JNI调用Native backend执行。
图5:Spark+Gluten+Velox架构图
在我们开始Spark向量化项目时,开源版本的Gluten和Velox还没有在业界Spark生产环境大规模实践过,为了降低风险最小代价验证可行性,我们把落地过程分为以下五个阶段逐步进行:
cat /proc/cpuinfo | grep --color -wE "bmi|bmi2|f16c|avx|avx2|sse"
性能收益验证。由于向量化版本和原生Spark分别使用堆外内存和堆内内存,引入翻倍内存的配置,以及一些高性能feature支持不完善,一开始生产环境测试性能结果不及预期。我们逐个分析解决问题,包括参数对齐、去掉arrow中间数据转换、shuffle batch fetch、Native HDFS客户端优化、使用jemelloc、算子优化、内存配置优化、HBO适配等。本阶段将平均资源节省从-70%提升到40%以上。
一致性验证。主要是问题修复,对所有非SLA作业进行大规模测试,筛选出稳定运行、数据完全一致、有正收益的作业。
灰度上线。将向量化执行环境发布到所有服务器,对符合条件的作业分批上线,增加监控报表,收集收益,对性能不及预期、发生数据不一致的作业及时回退原生Spark上。此过程用户无感知。
整个实施过程中,我们通过收益转化漏斗找到收益最大的优化点,指导项目迭代。下图为2023年某一时期的相邻转化情况。
聚合时Shuffle阶段OOM。在Spark中,Aggregation一般包括Partial Aggregation、Shuffle、Final Aggregation三个阶段,Partial Aggregation在Mapper端预聚合以降低Shuffle数据量,加速聚合过程、避免数据倾斜。Aggregation需要维护中间状态,如果Partial Aggregation占用的内存超过一定阈值,就会提前触发Flush同时后续输入数据跳过此阶段,直接进入ShuffleWrite流程。Gluten使用Velox默认配置的Flush内存阈值(Spark堆外内存*75%),由于Velox里Spill功能还不够完善(Partial Aggregation不支持Spill),这样大作业场景,后续ShuffleWrite流程可能没有足够的内存可以使用(可用内存<25%*Spark堆外内存),会引起作业OOM。我们采用的策略是通过在Gluten侧调低Velox Partial Aggregation的Flush阈值,来降低Partial Aggregation阶段的内存占用,避免大作业OOM。这个方案在可以让大作业运行通过,但是理论上提前触发Partial Aggergation Flush会降低Partial Aggretation的效果。更合理的方案是Partial Aggregation支持Spill功能,Gluten和Velox社区也一直在完善对向量化算子Spill功能的支持。
Velox的DWIO模块原生只支持DWRF和Parquet两种数据格式,美团大部分表都是基于ORC格式进行存储的。DWRF文件格式是Meta内部所采用的ORC分支版本,其文件结构与ORC相似,比如针对ORC文件的不同区域,可通过复用DWRF的Reader来完成相关数据内容的读取。
基于这些优化,改造后的Velox版ORC Reader读取时间减少一半,性能提升一倍。
首先介绍一下HDFS C++客户端对ORC文件读取某一列数据的过程。第一步,读取文件的最后一个字节来确定PostScript长度,读取PostScript内容;第二步,通过PostScript确定Footer的存储位置,读取Footer内容;第三步,通过Footer确定每个Stripe的元数据信息,读取StripeFooter;第四步,通过StripeFooter确定每个Column的Stream位置,读取需要的Stream数据。
小数据量随机读放大。客户端想要读取[offset ~ readEnd]区间内的数据,发送给DN的实际读取区间却是[offset ~ endOfCurBlock],导致[readEnd ~ endOfCurBlock]这部分数据做了无效读取。这样设计主要是为了优化顺序读场景,通过预读来加快后续访问,然而针对随机读场景(小数据量下比较普遍),该方式却适得其反,因为预读出的数据很难被后续使用,增加了读放大行为。我们优化为客户端只向DN传递需要读取的数据区间,DN侧不提前预取,只返回客户端需要的数据。
Gluten在shuffle策略的支持上,没有预留好接口,每新增一种shuffle模式需要较大改动。美团有自研的Shuffle Service,其他公司也可能有自己的Shuffle Service(如Celeborn),为了更好适配多种shuffle模式,我们提议对shuffle接口重新梳理,并主导了此讨论和设计。
Gluten中的shuffle抽象第一层是数据格式(Velox是RowVector,Gluten引入的Arrow是RecordBatch),第二层是分区方式(RoundRobin、SinglePart、Hash、Range),如果要支持新shuffle模式(local、remote),需要实现2*4*2=16个writer,将会有大量冗余代码。分区具体实现应该与数据格式和shuffle模式无关,我们用组合模式替代继承模式。另外,我们在shuffle中直接支持了RowVector,避免Velox和Arrow对应数据类型之间的额外转换开销。
HBO(Historical Based Optimization)是通过作业历史运行过程中资源的实际使用量,来预测作业下一次运行需要的资源并设置资源相关参数的一种优化手段。美团过去在原生Spark上通过调配堆内内存取得了8%左右的内存资源节省。
Gluten主要使用堆外内存(off-heap),这与原生Spark主要使用堆内内存(on-heap)不同。初期出于稳定性考虑Gluten和原生Spark的运行参数整体一致,总内存大小相同,Gluten off-heap 占比75%, on-heap占比25%。这样配置既会导致内存利用率不高(原生Spark的内存使用率58%,向量化版作业内存使用率 38%),也会使一部分作业on-heap内存配置偏低,在算子回退时导致任务OOM。
我们把HBO策略推广到堆外内存,向量化计算的内存节省比例从30%提升到40%,由于heap内存配置不合理的OOM问题全部消除。
select A, B, count(distinct userId), sum(amt) from t group by 1,2
,Gluten会把count(distinct userId) 变为count(userId),通过把userId加到GroupingKey里来实现distinct语义。具体处理过程如下:表1:示例SQL在Spark中的处理步骤
这个方案的弊端有两个:1)HashTable的内存占用会变大,需要同时开启HashAggregate算子的Spill功能避免OOM;2)直接修改了Velox的HashAggregate算子内部代码,从Velox自身的角度来看,没有单独针对Distinct相关的聚合做处理,随着后续迭代,可能影响所有用到Intermediate Aggregation的聚合过程。
鉴于此,Gluten社区提供了一个更加均衡的解决方案,针对这类Distinct Aggregation,生成执行计划时,Spark的Partial Merge Aggregation不再生成Intermediate Aggregation,改为生成Final Aggregation(不会提前flush、不使用merge_sum),同时配合聚合函数的Partial Companion函数来返回Intermediate结果。这样就从执行计划转换策略层面规避这个问题,避免对Velox里Final Aggregation内部代码做过多的改动。
SELECT concat(col2, cast(max(col1) as string)) FROM (VALUES (CAST(5.08 AS FLOAT), 'abc_')) AS tab(col1, col2) group by col2;
在Spark中返回abc_5.08,在Gluten中返回abc_5.079999923706055。浮点数5.08不能用二进制分数精确表达,近似表示成5.0799999237060546875。Velox通过函数folly::to<std::string>(T val)
来实现float类型到string类型的转换,这个函数底层依赖开源库google::double-conversion, folly里默认设置了输出宽度参数DoubleToStringConverter::SHORTEST(可以准确表示double类型的最小宽度)
,转换时经过四舍五入之后返回 5.079999923706055。我们把宽度参数改为DoubleToStringConverter::SHORTEST_SINGLE(可以准确表示float类型的最小宽度)
,转换时经过四舍五入之后返回 5.08。我们已上线了2万多ETL作业,平均内存资源节省40%+,平均执行时间减少13%,证明Gluten+Velox的向量化计算方案生产可行。向量化计算除了能提高计算效率,也能提高读写数据的效率,如某个作业的Input数据有30TB,过去需要执行7小时,绝大部份时间花在了读数据和解压缩上面。使用向量化引擎后,因为上文提到的ISA-L解压缩优化,列转行的开销节省,以及HDFS Native客户端优化,执行时间减少到2小时内。
我们已上线向量化计算的Spark任务只是小部分,计划2024年能让绝大部分的SQL任务运行在向量化引擎上。
扩大向量化算子和UDF范围。我们整理了影响权重最高的几十个算子回退问题与Gluten社区一起解决,对于大量内部UDF,则会探索用大模型来将UDF批量改写为C++版本的向量化实现。
扩大File format支持向量化范围。美团内部有约20%的表为textfile格式,还有接近10%的表使用内部开发的format,只能按行读取也不支持下推,加上行转列都会有额外性能开销,影响最终效果。我们将会把textfile全部转为ORC,为自研format提供C++客户端,进一步提升向量化计算性能。
美团核心本地商业-基础研发平台招聘引擎技术专家,如果你对大数据计算感兴趣,具有主流大数据计算引擎(包括但不限于Spark、Hive、Flink、Hudi、Iceberg等)的实际应用经验和原理了解,有引擎优化或平台化的相关经验,欢迎加入我们,详情参见:招聘信息。