简介
Java8发布自2012年,距今已经有相当长的一段时间了,作为现在互联网公司当中最流行的一个Java版本,其许多特性也被大众广泛认可。Stream作为其中一个实用的特性,也常常被人津津乐道。ParallelStream是Stream中并行的一个版本,它有自己不可替代的地位,但是滥用会出现线上问题哦~
之前线上遇到了一个超时问题,最后排查到是因为ParallelStream导致的,这次采坑的经历也更大家分享一下,顺便说一下ParallelStream的原理与使用方式。
问题分析
不久前的一个晚上,线上一个节点出现了异常抖动,并导致了一些请求的超时,但持续时间非常短。
1.案发现场
按照现有的监控,我们能得到的一些指标
那个节点rt上涨,并伴随着各种超时
线程数、cpu、load均有上涨
当时没有fgc,ygc时间也不长
io、流量、网络均正常
上下文切换次数有较大幅度的下降
当时下游应用、redis、数据库等都没有抖动
2.初步分析
其实上述现象比较常见,一般都是因为某些原因导致某个地方堵住了,为了保证应用的吞吐量,应用的线程数会涨上去,同时伴随着rt增高,cpu、load等指标上涨的情况。但是有一个地方很奇怪,就是当时上下游并没有存在任何hang住的地方。
因为问题持续的时间非常短,发生的时机非常随机,排查问题的利器jstack几乎没用,排查工作一度陷入僵局。只能根据日志,通过想象一点点尝试还原案发现场。日志观察也是排查这种瞬间毛刺的一个好方法,有时间可以专门出一期如何排查线上问题
3.解决问题
很幸运的是,在日志上我们发现了一些端倪。问题发生前,有一个调用频率很低的请求的日志,并且伴随着每次问题,这个请求都在之前出现,就好比连环凶杀案现场,监控总是拍到一个人,那他肯定有嫌疑。最终我们在代码里面发现了今天我们的主角的名字ParallelStream。
解决问题很简单,把ParallelStream换成普通的线程池写法就行了
至于为什么会导致这个问题,接下来我们来深入探究一下。
源码分析
1.并行与线程池
(1) Stream的实现
相对于一般的Stream来说,ParallelStream多了一层并行的语义,顾名思义,在流处理的时候它是并行的。一般涉及到并行就会与线程池相关,ParallelStream的实现也是依赖于线程池的。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
重点关注evaluateParallel方法,并行模式下,会走到这个分支。我们来看看evaluateParallel其中的一个实现
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
invoke的实现
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
渐渐地我们引出了ForkJoin的Pool,对于ForkJoin不熟悉的小伙伴,不要着急,后面我们会详细介绍。
看到这里,我们知道了ParallelStream与ForkJoinPool的关系,但是我们在调用ParallelStream中并没有显式的传入线程池变量,那么ParallelStream用的到底是什么线程池呢?这里我就不贴冗长的源码了,直接说结果
如果当前线程就是ForkJoinWorkerThread,那么线程池就是当前线程所属的ForkJoinPool
反之这个线程池就是ForkJoinPool.commonPool()
(2) CommonPool
static {
//冗长的参数设置代码省略
commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
//默认线程工厂
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
//开始建commonPool啦
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
//重点关注makeCommonPool
public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
//从环境变量中获取参数
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = new DefaultCommonPoolForkJoinWorkerThreadFactory();
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
//并行度为处理器数-1,最小是1
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
这个线程池commonPool的初始化在ForkJoinPool#makeCommonPool
并行度默认处理器数量减1(不是核数减1)可以通过java.util.concurrent.ForkJoinPool.common.parallelism参数手动设置
(3) 小结
简单总结一下,ParallelStream背后依赖于ForkJoinPool.commonPool线程池,这个池子大小为处理器数减1,可以通过环境变量控制
2.ForkJoin
(1) 简介
ForkJoin作为java8的另一大特性,我们另写一篇文章专门探究都是不过分的,这里也只能简单入个门,帮助理解ParallelStream。
先来看一张著名的图
用大白话来解构一下这张图就是领导手里有一个任务,他拆成两个任务分给手下两个人,等他们都完成了,领导再把两个手下的任务报告合并起来,作为这整个任务的报告,其中“任务”类比为复杂的计算任务,“手下”与“领导”都是线程。大家是不是觉得虽然有3个人(三个线程),但是这个领导啥都不干(主线程啥都没干),就干等着,十分不合理?所以实际情况是领导(主线程)把任务拆分成两个(Fork),一个给手下,一个给自己。至此这个复杂的任务已经拆分了一次了,那么要是“手下”还是觉得任务太难太多怎么办呢?他自己也可以当一次“领导”,按照上面的步骤把任务拆分给“手下”。这个任务中用到的人数(线程数),我们称为并行度(parallelism)
在ForkJoinPool中,每个线程都有自己的任务队列,并且实际上任务会有快有慢,为了更好的利用线程,这里会有一种“工作窃取”机制——有些“手下”干得慢,就帮别的“手下”多干点活儿。
ParallelStream的底层线程池就是ForkJoinPool,进行遍历的时候,也会对集合中的元素进行拆分,并行着进行遍历。
(2) 工作窃取
普通的线程池,应用所有worker公用一个任务队列,所有没有窃取这种说法(任务是大家的,不是自己的)。但是ForkJoinPool每个Woker会有自己的工作队列(workQueue),所以会有窃取的概念。
//开始跑任务
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
//重点关注
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
//scan获取任务(工作窃取在吗在这儿)
if ((t = scan(w, r)) != null)
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
//各种不为空判断
int ss = w.scanState; // initially non-negative
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
if ((q = ws[k]) != null) {
//找个队列(k的计算一直要往上追溯,实际是个随机值)
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
//b是尾端下标,ASHIFT是数组元素偏移量,ABASE是基础的偏移量,使用偏移量方便后面用cas
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
if (ss >= 0) {
//>=0说明非空闲,那肯定有任务可以偷
if (U.compareAndSwapObject(a, i, t, null)) {
//cas获取任务
q.base = b + 1;
if (n < -1) // signal others
signalWork(ws, q);
return t;
}
}
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
//明明有任务,偷懒不干活,叫起来干活
}
//任务获取失败,说明发生竞争,重新开始随机
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
if ((k = (k + 1) & m) == origin) { // continue until stable
没任务可偷,k自增遍历下个队列,与上掩码m和起始值origin相同说明已经遍历一个来回了
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
//如果跑了一圈还是闲着的,那就算了,退出循环。
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
整个执行任务的过程围绕工作窃取展开,随机从一个队列中获取任务,直到循环了一圈都没拿到任务。一般这个循环会按照线性自增进行遍历,但如果发生明显的竞争,则重新随机地选择一个队列开始。
代码还是相对晦涩的,尤其是变量命名,但细想这些变量命名又是一些常用变量的缩写,也算是雅信达了,大家有时间可以品一品。
(3) 小结
ForkJoin源码有助于我们更加深入理解ParallelStream
3.注意的地方
其实看完原理的分析,这个ParallelStream可能会涉及到的问题,就一目了然了
(1) 非线程安全集合的使用
我们在使用ParallelStream的过程中,稍微不注意,就会使用一些线程不安全的集合操作。比如说在ParallelStream中对ArrayList进行一些操作等。因为ParallelStream中的操作可以理解为都是并行的,使用线程不安全的集合就会造成问题。常见的一些线程不安全的集合,例如ArrayList,HashMap,HashSet等,在普通Stream中使用是没有问题的,但是在ParallelStream中使用,就会有问题。我们在使用过程中一定要注意这个问题,不然就会导致一系列问题,报并发异常,集合少元素,集合存在元素为空等现象。
举一反三,一般的多线程编程过程中,也要注意这一点,code review的时候经常会看到有通过在多线程里面往ArrayList里面塞元素,这其实是有问题的。
但是,注意到了这一点,我们就能随心所欲地使用ParallelStream了吗?答案也不是的,ParallelStream存在一个致命的问题。
(2) 全局公用的线程池CommonPool
这个问题藏得比较隐蔽。我们知道ParallelStream底层使用的是ForkJoinPool的CommonPool,这个线程池其实并不只有ParallelStream在用,CompletableFuture也会使用的,但是好就好在CompletableFuture是可以用线程池作为入参的,也就是说CompletableFuture是可以做到底层不使用CommonPool的。
但是,只有ParallelStream使用ForkJoinPool.CommonPool难道就不会有问题了吗?本身线程池的创建时参数的设置就是十分复杂的,只用一个线程池会有以下一系列的问题
一些本身rt不低的但需要减少rt的请求,伴随着不小的并发,使用这个线程数为执行器数减1的线程池,除了第一次请求,后续请求都不会达到想要的效果。可以理解为,本身一个部门就没多少人(CommonPool线程池数量不多),一下子来了超多的任务(请求高并发),任务肯定就积压了(实际吞吐量远不如一个请求一个线程)
因为不同类型的(cpu密集,io密集),不同要求(优先rt,优先吞吐量)的任务公用一个线程池,任务之间会相互影响。你的请求平时量不大你想要低rt,用到了ParallelStream,别人的请求量很大,想要减少线程切换提高吞吐量,也用到了ParallelStream,这两者在一起就会相互影响,最后rt不低,吞吐量不高。
(3) 小结
并发编程有许多要注意的地方,ParallelStream本质也是并发编程,一些注意的点不能忘记;
作为开发来说,你不知道你之前的程序员会用ParallelStream干了什么,也不知道你之后的程序员会干些什么。那么编写使用ParallelStream的代码就会非常危险。
稍佳实践
说了那么多注意点,那ParallelStream他的最佳实践是什么呢?这边抛砖引玉,简单列举一些稍佳的实践。
1.cpu密集型任务
ForkJoinPool.CommonPool的线程数就很明确得说明了,这个线程池其实更适合cpu密集型的任务,同样的,ParallelStream也是更加适合进行一些cpu密集的操作。比如说统计一篇文章里面每个字母出现的次数等等之类的,这就有发挥大家的想象了。
2.使用ManagedBlocker
这个ManagedBlocker其实是ForkJoinPool中的一个特性,他可以显式地告诉线程池当前执行卡住了,如果并行度不够,那就要另外重新再开线程保证任务的执行。
public static interface ManagedBlocker {
/**
* Possibly blocks the current thread, for example waiting for
* a lock or condition.
*
* @return {@code true} if no additional blocking is necessary
* (i.e., if isReleasable would return true)
* @throws InterruptedException if interrupted while waiting
* (the method is not required to do so, but is allowed to)
* 具体用于阻塞的方法,需要自己实现,比如说等待一把锁,或者一个Condition
*/
boolean block() throws InterruptedException;
/**
* Returns {@code true} if blocking is unnecessary.
* @return {@code true} if blocking is unnecessary
* 返回当前阻塞是不是必要的
*/
boolean isReleasable();
}
这个应该是官方给出的最优的解法了,这样一来,我们的ForkJoinPool就可以用来执行一些io密集型的任务了。但就实际效果来看,不是很建议,首先使用难度高不说,存在着很多更好,更简单的解决方案(没必要硬着头皮用)
3.自定义线程池提交ParallelStream
我们在前面“Stream的实现-doInvoke的实现”中,我们知道,ParallelStream优先使用的是当前线程所在的ForkJoinPool,所以我们可以通过向自定义线程池提交ParallelStream任务的方式,指定ParallelStream底层使用的线程池。
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
forkJoinPool.submit(() -> xxx.parallelStream().collect(Collectors.toList())).get();
但这个代码就有点脱了裤子放屁的感觉了,为什么不直接使用普通的线程池,普通的提交任务,而是用ParallelStream呢?
总结
ParallelStream作为并行计算的一把利器,其设计的出发点是好的,但是使用场景的错误,以及使用方式的错误也让它变得不是那么可靠。我们应该理解其原理,更加正确,更加合理的使用这把双刃剑。
若有收获,就点个赞吧