cover_image

由一个线上问题引出的ParallelStream最佳实践

俶真 玩物少年们
2021年02月04日 02:00

简介

Java8发布自2012年,距今已经有相当长的一段时间了,作为现在互联网公司当中最流行的一个Java版本,其许多特性也被大众广泛认可。Stream作为其中一个实用的特性,也常常被人津津乐道。ParallelStream是Stream中并行的一个版本,它有自己不可替代的地位,但是滥用会出现线上问题哦~

之前线上遇到了一个超时问题,最后排查到是因为ParallelStream导致的,这次采坑的经历也更大家分享一下,顺便说一下ParallelStream的原理与使用方式。


问题分析

不久前的一个晚上,线上一个节点出现了异常抖动,并导致了一些请求的超时,但持续时间非常短。

1.案发现场

按照现有的监控,我们能得到的一些指标

  • 那个节点rt上涨,并伴随着各种超时

  • 线程数、cpu、load均有上涨

  • 当时没有fgc,ygc时间也不长

  • io、流量、网络均正常

  • 上下文切换次数有较大幅度的下降

  • 当时下游应用、redis、数据库等都没有抖动

2.初步分析

其实上述现象比较常见,一般都是因为某些原因导致某个地方堵住了,为了保证应用的吞吐量,应用的线程数会涨上去,同时伴随着rt增高,cpu、load等指标上涨的情况。但是有一个地方很奇怪,就是当时上下游并没有存在任何hang住的地方。

因为问题持续的时间非常短,发生的时机非常随机,排查问题的利器jstack几乎没用,排查工作一度陷入僵局。只能根据日志,通过想象一点点尝试还原案发现场。日志观察也是排查这种瞬间毛刺的一个好方法,有时间可以专门出一期如何排查线上问题

3.解决问题

很幸运的是,在日志上我们发现了一些端倪。问题发生前,有一个调用频率很低的请求的日志,并且伴随着每次问题,这个请求都在之前出现,就好比连环凶杀案现场,监控总是拍到一个人,那他肯定有嫌疑。最终我们在代码里面发现了今天我们的主角的名字ParallelStream。

解决问题很简单,把ParallelStream换成普通的线程池写法就行了

至于为什么会导致这个问题,接下来我们来深入探究一下。

源码分析

1.并行与线程池


(1) Stream的实现

相对于一般的Stream来说,ParallelStream多了一层并行的语义,顾名思义,在流处理的时候它是并行的。一般涉及到并行就会与线程池相关,ParallelStream的实现也是依赖于线程池的。

 final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {        assert getOutputShape() == terminalOp.inputShape();        if (linkedOrConsumed)            throw new IllegalStateException(MSG_STREAM_LINKED);        linkedOrConsumed = true;
return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));    }
  

重点关注evaluateParallel方法,并行模式下,会走到这个分支。我们来看看evaluateParallel其中的一个实现

  @Override        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,                                         Spliterator<P_IN> spliterator) {            return new ReduceTask<>(this, helper, spliterator).invoke().get();        }   
   

invoke的实现

 public final V invoke() {        int s;        if ((s = doInvoke() & DONE_MASK) != NORMAL)            reportException(s);        return getRawResult();    }   
private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }
   

渐渐地我们引出了ForkJoin的Pool,对于ForkJoin不熟悉的小伙伴,不要着急,后面我们会详细介绍。

看到这里,我们知道了ParallelStream与ForkJoinPool的关系,但是我们在调用ParallelStream中并没有显式的传入线程池变量,那么ParallelStream用的到底是什么线程池呢?这里我就不贴冗长的源码了,直接说结果

  • 如果当前线程就是ForkJoinWorkerThread,那么线程池就是当前线程所属的ForkJoinPool

  • 反之这个线程池就是ForkJoinPool.commonPool()


(2) CommonPool

 static {        //冗长的参数设置代码省略
commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; //默认线程工厂 defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); modifyThreadPermission = new RuntimePermission("modifyThread");
//开始建commonPool啦 common = java.security.AccessController.doPrivileged (new java.security.PrivilegedAction<ForkJoinPool>() { //重点关注makeCommonPool public ForkJoinPool run() { return makeCommonPool(); }}); int par = common.config & SMASK; // report 1 even if threads disabled commonParallelism = par > 0 ? par : 1; }

private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; //从环境变量中获取参数 try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = new DefaultCommonPoolForkJoinWorkerThreadFactory(); else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } //并行度为处理器数-1,最小是1 if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
 
  • 这个线程池commonPool的初始化在ForkJoinPool#makeCommonPool

  • 并行度默认处理器数量减1(不是核数减1)可以通过java.util.concurrent.ForkJoinPool.common.parallelism参数手动设置


(3) 小结

简单总结一下,ParallelStream背后依赖于ForkJoinPool.commonPool线程池,这个池子大小为处理器数减1,可以通过环境变量控制


2.ForkJoin

(1) 简介

ForkJoin作为java8的另一大特性,我们另写一篇文章专门探究都是不过分的,这里也只能简单入个门,帮助理解ParallelStream。

先来看一张著名的图

图片

用大白话来解构一下这张图就是领导手里有一个任务,他拆成两个任务分给手下两个人,等他们都完成了,领导再把两个手下的任务报告合并起来,作为这整个任务的报告,其中“任务”类比为复杂的计算任务,“手下”与“领导”都是线程。大家是不是觉得虽然有3个人(三个线程),但是这个领导啥都不干(主线程啥都没干),就干等着,十分不合理?所以实际情况是领导(主线程)把任务拆分成两个(Fork),一个给手下,一个给自己。至此这个复杂的任务已经拆分了一次了,那么要是“手下”还是觉得任务太难太多怎么办呢?他自己也可以当一次“领导”,按照上面的步骤把任务拆分给“手下”。这个任务中用到的人数(线程数),我们称为并行度(parallelism)

在ForkJoinPool中,每个线程都有自己的任务队列,并且实际上任务会有快有慢,为了更好的利用线程,这里会有一种“工作窃取”机制——有些“手下”干得慢,就帮别的“手下”多干点活儿。

ParallelStream的底层线程池就是ForkJoinPool,进行遍历的时候,也会对集合中的元素进行拆分,并行着进行遍历。


(2) 工作窃取

普通的线程池,应用所有worker公用一个任务队列,所有没有窃取这种说法(任务是大家的,不是自己的)。但是ForkJoinPool每个Woker会有自己的工作队列(workQueue),所以会有窃取的概念。

  //开始跑任务    public void run() {        if (workQueue.array == null) { // only run once            Throwable exception = null;            try {                onStart();                //重点关注                pool.runWorker(workQueue);            } catch (Throwable ex) {                exception = ex;            } finally {                try {                    onTermination(exception);                } catch (Throwable ex) {                    if (exception == null)                        exception = ex;                } finally {                    pool.deregisterWorker(this, exception);                }            }        }    }
final void runWorker(WorkQueue w) { w.growArray(); // allocate queue int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask<?> t;;) { //scan获取任务(工作窃取在吗在这儿) if ((t = scan(w, r)) != null) w.runTask(t); else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { //各种不为空判断 int ss = w.scanState; // initially non-negative for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; if ((q = ws[k]) != null) { //找个队列(k的计算一直要往上追溯,实际是个随机值) if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //b是尾端下标,ASHIFT是数组元素偏移量,ABASE是基础的偏移量,使用偏移量方便后面用cas if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { if (ss >= 0) { //>=0说明非空闲,那肯定有任务可以偷 if (U.compareAndSwapObject(a, i, t, null)) { //cas获取任务 q.base = b + 1; if (n < -1) // signal others signalWork(ws, q); return t; } } else if (oldSum == 0 && // try to activate w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); //明明有任务,偷懒不干活,叫起来干活 } //任务获取失败,说明发生竞争,重新开始随机 if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; } if ((k = (k + 1) & m) == origin) { // continue until stable 没任务可偷,k自增遍历下个队列,与上掩码m和起始值origin相同说明已经遍历一个来回了 if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; //如果跑了一圈还是闲着的,那就算了,退出循环。 int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }
      

整个执行任务的过程围绕工作窃取展开,随机从一个队列中获取任务,直到循环了一圈都没拿到任务。一般这个循环会按照线性自增进行遍历,但如果发生明显的竞争,则重新随机地选择一个队列开始。


代码还是相对晦涩的,尤其是变量命名,但细想这些变量命名又是一些常用变量的缩写,也算是雅信达了,大家有时间可以品一品。


(3) 小结

ForkJoin源码有助于我们更加深入理解ParallelStream


3.注意的地方

其实看完原理的分析,这个ParallelStream可能会涉及到的问题,就一目了然了


(1) 非线程安全集合的使用

我们在使用ParallelStream的过程中,稍微不注意,就会使用一些线程不安全的集合操作。比如说在ParallelStream中对ArrayList进行一些操作等。因为ParallelStream中的操作可以理解为都是并行的,使用线程不安全的集合就会造成问题。常见的一些线程不安全的集合,例如ArrayList,HashMap,HashSet等,在普通Stream中使用是没有问题的,但是在ParallelStream中使用,就会有问题。我们在使用过程中一定要注意这个问题,不然就会导致一系列问题,报并发异常,集合少元素,集合存在元素为空等现象。

举一反三,一般的多线程编程过程中,也要注意这一点,code review的时候经常会看到有通过在多线程里面往ArrayList里面塞元素,这其实是有问题的。


但是,注意到了这一点,我们就能随心所欲地使用ParallelStream了吗?答案也不是的,ParallelStream存在一个致命的问题。


(2) 全局公用的线程池CommonPool

这个问题藏得比较隐蔽。我们知道ParallelStream底层使用的是ForkJoinPool的CommonPool,这个线程池其实并不只有ParallelStream在用,CompletableFuture也会使用的,但是好就好在CompletableFuture是可以用线程池作为入参的,也就是说CompletableFuture是可以做到底层不使用CommonPool的。

但是,只有ParallelStream使用ForkJoinPool.CommonPool难道就不会有问题了吗?本身线程池的创建时参数的设置就是十分复杂的,只用一个线程池会有以下一系列的问题

  • 一些本身rt不低的但需要减少rt的请求,伴随着不小的并发,使用这个线程数为执行器数减1的线程池,除了第一次请求,后续请求都不会达到想要的效果。可以理解为,本身一个部门就没多少人(CommonPool线程池数量不多),一下子来了超多的任务(请求高并发),任务肯定就积压了(实际吞吐量远不如一个请求一个线程)

  • 因为不同类型的(cpu密集,io密集),不同要求(优先rt,优先吞吐量)的任务公用一个线程池,任务之间会相互影响。你的请求平时量不大你想要低rt,用到了ParallelStream,别人的请求量很大,想要减少线程切换提高吞吐量,也用到了ParallelStream,这两者在一起就会相互影响,最后rt不低,吞吐量不高。


(3) 小结

并发编程有许多要注意的地方,ParallelStream本质也是并发编程,一些注意的点不能忘记;

作为开发来说,你不知道你之前的程序员会用ParallelStream干了什么,也不知道你之后的程序员会干些什么。那么编写使用ParallelStream的代码就会非常危险。


稍佳实践

说了那么多注意点,那ParallelStream他的最佳实践是什么呢?这边抛砖引玉,简单列举一些稍佳的实践。

1.cpu密集型任务

ForkJoinPool.CommonPool的线程数就很明确得说明了,这个线程池其实更适合cpu密集型的任务,同样的,ParallelStream也是更加适合进行一些cpu密集的操作。比如说统计一篇文章里面每个字母出现的次数等等之类的,这就有发挥大家的想象了。


2.使用ManagedBlocker

这个ManagedBlocker其实是ForkJoinPool中的一个特性,他可以显式地告诉线程池当前执行卡住了,如果并行度不够,那就要另外重新再开线程保证任务的执行。

public static interface ManagedBlocker {        /**         * Possibly blocks the current thread, for example waiting for         * a lock or condition.         *         * @return {@code true} if no additional blocking is necessary         * (i.e., if isReleasable would return true)         * @throws InterruptedException if interrupted while waiting         * (the method is not required to do so, but is allowed to)         * 具体用于阻塞的方法,需要自己实现,比如说等待一把锁,或者一个Condition         */        boolean block() throws InterruptedException;
/** * Returns {@code true} if blocking is unnecessary. * @return {@code true} if blocking is unnecessary * 返回当前阻塞是不是必要的 */ boolean isReleasable(); }
  

这个应该是官方给出的最优的解法了,这样一来,我们的ForkJoinPool就可以用来执行一些io密集型的任务了。但就实际效果来看,不是很建议,首先使用难度高不说,存在着很多更好,更简单的解决方案(没必要硬着头皮用)


3.自定义线程池提交ParallelStream

我们在前面“Stream的实现-doInvoke的实现”中,我们知道,ParallelStream优先使用的是当前线程所在的ForkJoinPool,所以我们可以通过向自定义线程池提交ParallelStream任务的方式,指定ParallelStream底层使用的线程池。

ForkJoinPool forkJoinPool = new ForkJoinPool(4);forkJoinPool.submit(() -> xxx.parallelStream().collect(Collectors.toList())).get();

但这个代码就有点脱了裤子放屁的感觉了,为什么不直接使用普通的线程池,普通的提交任务,而是用ParallelStream呢?


总结

ParallelStream作为并行计算的一把利器,其设计的出发点是好的,但是使用场景的错误,以及使用方式的错误也让它变得不是那么可靠。我们应该理解其原理,更加正确,更加合理的使用这把双刃剑。


若有收获,就点个赞吧



图片


继续滑动看下一个
玩物少年们
向上滑动看下一个