用最通俗易懂的话来说,数据倾斜无非就是大量的相同key被partition分配到一个分区里,造成了'一个人累死,其他人闲死'的情况,这种情况是我们不能接受的,这也违背了并行计算的初衷,首先一个节点要承受着巨大的压力,而其他节点计算完毕后要一直等待这个忙碌的节点,也拖累了整体的计算时间,可以说效率是十分低下的。
(1)绝大多数task执行得都非常快,但个别task执行的极慢。
(2)原本能正常执行的Spark作业,某天突然爆出OOM(内存溢出)异常。观察异常栈,是我们写的业务代码造成的。
(1)增加jvm内存,这适用于第一种情况(唯一值非常少,极少数值有非常多的记录值(唯一值少于几千)),这种情况下,往往只能通过硬件的手段来进行调优,增加jvm内存可以显著的提高运行效率。
(2)增加reduce的个数,这适用于第二种情况(唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一),我们知道,这种情况下,最容易造成的结果就是大量相同key被partition到一个分区,从而一个reduce执行了大量的工作,而如果我们增加了reduce的个数,这种情况相对来说会减轻很多,毕竟计算的节点多了,就算工作量还是不均匀的,那也要小很多。
(1)数据倾斜只会发生在shuffle中,下面是常用的可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是代码中使用了这些算子的原因。
2 、造成数据倾斜的原因
2)、业务数据本身的特性
3)、建表时考虑不周
hive.map.aggr=true(是否在Map端进行聚合,默认为true),这个设置可以将顶层的聚合操作放在Map阶段执行,从而减轻清洗阶段数据传输和Reduce阶段的执行时间,提升总体性能
Set hive.groupby.skewindata=true(hive自动进行负载均衡)
b、大小表Join,开启mapjoin
set hive.auto.convert.join=true;
hive.mapjoin.smalltable.filesize=25000000( 即25M)
select
/*+mapjoin(b)*/
a.field1 as field1,
b.field2 as field2,
b.field3 as field3
from a left join b
on a.field1 = b.field1;
select field1,field2,field3…
from log a left join user b on a.userid is not null and a.userid=b.userid
union select field1,field2,field3 from log where userid is null;
select a,count(distinct b) from t group by a
可以写成 select a,sum(1) from (select a,b from t group by a,b) group by a;
select count (distinct key) from a
可以写成 Select sum(1) from (Select key from a group by key) t
select * from log a
join users b
on a.user_id is not null
and a.user_id = b.user_idunion allselect * from log a
where a.user_id is null;
select * from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
select * from log a
left outer join users b
on a.user_id = b.user_id;
select /*+mapjoin(x)*/* from log a
left outer join (
select /*+mapjoin(c)*/d.*
from ( select distinct user_id from log ) c
join users d
on c.user_id = d.user_id
) x
on a.user_id = b.user_id;
spark.shuffle.statistics.verbose=true
spark.sql.autoBroadcastJoinThreshold=524288000
操作步骤:
1、对KEY赋值为1,便于下一步进行计数
2、对KEY进行累计
3、对KEY和VALUE交换
4、针对KEY按照字典进行倒排
5、将KEY和VAlUE位置交换,还原到真实的<KEY,VALUE>
6、从已排序的RDD中,直接取前N条
建议打散key进行二次聚合:采用对 非constant值、与key无关 的列进行hash取模,不要使用rand类函数。
dataframe
.groupBy(col("key"), pmod(hash(col("some_col")), 100)).agg(max("value").as("partial_max"))
.groupBy(col("key")).agg(max("partial_max").as("max"))
目前支持该模式下的倾斜window,(仅支持3.0)
select (... row_number() over(partition by ... order by ...) as rn)
where rn [==|<=|<] k and other conditionsspark.sql.rankLimit.enabled=true (目前支持基于row_number的topK计算逻辑)
Spark 2.4开启参数
spark.sql.adaptive.enabled=true
spark.shuffle.statistics.verbose=true
spark.sql.adaptive.skewedJoin.enabled=true
spark.sql.adaptive.allowAdditionalShuffle=true
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.enhance.enabled=true (通用倾斜算法,可处理更多场景)
spark.sql.adaptive.forceOptimizeSkewedJoin=true(允许插入额外shuffle,可处理更多场景)
其他参数:
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (默认为256MB,分区大小超过该阈值才可被识别为倾斜分区,如果希望调整的倾斜分区小于该阈值,可以酌情调小)
spark.sql.adaptive.skewJoin.inflation.enabled=true(默认false,由于采样计算会导致性能回归,正常任务不要开启)
spark.sql.adaptive.skewJoin.inflation.factor=50(默认为100,预估的分区输出大小超过中位数Xfactor才可被识别为膨胀分区,由于预估算法存在误差,一般不要低于50)
spark.sql.adaptive.shuffle.sampleSizePerPartition=500(默认100,每个Task中的采样数,基于该采样数据预估Join之后的分区大小,如果Task数量不大,可以酌情调大)
由于Join语义限制,对于A left join skewed B之类的场景,无法对B进行划分处理,否则会导致数据正确性问题,这也是Spark项目所面临的难题。如果开启以上功能依然不能处理数据倾斜,可以通过开启倾斜key检测功能来定位是哪些key导致了倾斜或膨胀,继而进行过滤等处理。
spark.sql.adaptive.shuffle.detectSkewness=true(默认false,由于采样计算会导致性能回归,正常任务不要开启)
spark.sql.adaptive.shuffle.sampleSizePerPartition=100(默认100,每个Task中的采样数,如果Task数量不大,可以酌情调大)