Java多线程之CompletableFuture
01
CompletableFuture中的方法
1.1 简介
CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。Future接口相对简单些,提供了取消(cancel)、获取结果(get)、检测是否完成或者取消(isCancelled、isDone)这些方法,Future接口提供了异步获取结果的能力。CompletableFuture还能够将任务放到不同的线程中执行, 既可以在当前线程中直接执行任务,也可以将其放到其他任务线程中执行,这个过程是自动的,无需干预。
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。CompletionStage相当于“阶段”,一个CompletionStage对象是异步计算中的一个阶段,当 一个CompletionStage完成时会触发下一个动作或计算。 CompletionStage解决了Future的一些问题: Future没有提供通知机制,Future是否执行完任务需要不断的检查或者调用get()方法阻塞任务执行,CompletionStage完美的解决了该问题,前一个任务 执行成功后可以自动触发下一个任务的执行,中间无需等待。CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果 。
1.2 创建CompletableFuture
CompletableFuture 在创建时,如果传入线程池,那么会使用指定的线程池工作。如果没传入,那么会使用默认的 ForkJoinPool。ForkJoinPool的优势在于,可以充分利用多核cpu的优势,把一个任务拆分成多个小任务,把多个小任务放到多个处理器核心上并行执行;当多个小任务执行完成之后,再将这些执行结果合并起来即可。ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTasktask) 或invoke(ForkJoinTasktask)方法来执行指定任务了。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。
1.2.1 构造函数创建
最简单的方式就是通过构造函数创建一个CompletableFuture实例。如下代码所示。由于新创建的CompletableFuture还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。
CompletableFuture<String> future = new CompletableFuture();
String result = future.join();
System.out.println(result);
此时,如果在另外一个线程中,主动设置该CompletableFuture的值,则上面线程中的结果就能返回。
future.complete("test");
1.2.2 supplyAsync创建
CompletableFuture.supplyAsync()也可以用来创建CompletableFuture实例。通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。supplyAsync有两种签名:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
第一种只需传入一个Supplier实例(一般使用lamda表达式),此时框架会默认使用ForkJoin线程池来执行被提交的任务。第二种可以指定自定义的线程池,然后将任务提交给该线程池执行。下面为使用supplyAsync创建CompletableFuture的示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("compute test");
return "test";
}
);
String result = future.join();
System.out.println("get result: " + result);
在示例中,异步任务中会打印出compute test,并返回test作为最终计算结果。所以,最终的打印信息为get result: test
1.2.3 runAsync创建
CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。同supplyAsync()类似,runAsync()也有两种签名:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
下面为使用runAsync()的例子:
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
System.out.println("compute test");
});
System.out.println("get result: " + future.join());
由于任务没有返回值, 所以最后的打印结果是get result: null
1.3 异步回调方法
同Future相比,CompletableFuture最大的不同是支持流式(Stream)的计算处理,多个任务之间,可以前后相连,从而形成一个计算流。比如:任务1产生的结果,可以直接作为任务2的入参,参与任务2的计算,以此类推。CompletableFuture中常用的流式连接函数包括:
thenApply——有入参有返回
thenApplyAsync——有入参有返回
thenAccept——有入参无返回
thenAcceptAsync——有入参无返回
thenRun——无入参无返回
thenRunAsync——无入参无返回
thenCombinethen
CombineAsyncthen
Composethen
ComposeAsyncwhen
Completewhen
CompleteAsync
handle
handleAsync
其中,带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的。除此之外,两者没有其他区别。因此,为了快速理解,在接下来的介绍中,我们主要介绍不带Async的版本。
1.3.1 thenApply / thenAccept / thenRun互相依赖
这里将thenApply / thenAccept / thenRun放在一起讲,因为这几个连接函数之间的唯一区别是提交的任务类型不一样 :
thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果
thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果
thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<Integer> future2 = future1.thenApply((p)->{
System.out.println("compute 2");
return p+10;
});
System.out.println("result: " + future2.join());
在上面的示例中,future1通过调用thenApply将后置任务连接起来,并形成future2。该示例的最终打印结果为11,可见程序在运行中,future1的结果计算出来后,会传递给通过thenApply连接的任务,从而产生future2的最终结果为1+10=11。当然,在实际使用中,我们理论上可以无限连接后续计算任务,从而实现链条更长的流式计算。
需要注意的是,通过thenApply / thenAccept / thenRun连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算。因此,这组函数主要用于连接前后有依赖的任务链。
1.3.1.1 thenApply
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,测试用例如下:
@Test
public void test6() throws Exception {
ForkJoinPool pool=new ForkJoinPool();
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
return 1.2;
},pool);
//cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中
CompletableFuture cf2=cf.thenApply((result)->{
System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
return "test:"+result;
}).thenAccept((result)-> {
//接收上一个任务的执行结果作为入参,但是没有返回值
System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(result);
System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
}).thenRun(()->{
//无入参,也没有返回值
System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("thenRun do something");
System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
});
System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
//等待子任务执行完成
System.out.println("run result->"+cf.get());
System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
//cf2 等待最后一个thenRun执行完成
System.out.println("run result->"+cf2.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
1.3.2 exceptionally有返回
exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果
@Test
public void test2() throws Exception {
ForkJoinPool pool=new ForkJoinPool();
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
if(true){
throw new RuntimeException("test");
}else{
System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
return 1.2;
}
},pool);
//cf执行异常时,将抛出的异常作为入参传递给回调方法
CompletableFuture<Double> cf2= cf.exceptionally((param)->{
System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("error stack trace->");
param.printStackTrace();
System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
return -1.1;
});
//cf正常执行时执行的逻辑,如果执行异常则不调用此逻辑
CompletableFuture cf3=cf.thenAccept((param)->{
System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("param->"+param);
System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
});
System.out.println("main thread start,time->"+System.currentTimeMillis());
//等待子任务执行完成,此处无论是cf2和cf3都可以实现job2退出,主线程才退出,如果是cf,则主线程不会等待job2执行完成自动退出了
//cf2.get时,没有异常,但是依然有返回值,就是cf的返回值
System.out.println("run result->"+cf2.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
1.3.3 whenComplete无返回
whenComplete主要用于注入任务完成时的回调通知逻辑。这个解决了传统future在任务完成时,无法主动发起通知的问题。前置任务会将计算结果或者抛出的异常作为入参传递给回调通知函数。
以下为示例:
@Test
public void test10() throws Exception {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
if(false){
throw new RuntimeException("test");
}else{
System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
return 1.2;
}
});
//cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法,如果是正常执行的则传入的异常为null
CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{
System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
if(b!=null){
System.out.println("error stack trace->");
b.printStackTrace();
}else{
System.out.println("run succ,result->"+a);
}
System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
});
//等待子任务执行完成
System.out.println("main thread start wait,time->"+System.currentTimeMillis());
//如果cf是正常执行的,cf2.get的结果就是cf执行的结果
//如果cf是执行异常,则cf2.get会抛出异常
System.out.println("run result->"+cf2.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
1.3.4 handle有返回
handle与whenComplete的作用有些类似,但是handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果。handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了
以下为示例:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<Integer> future2 = future1.handle((r, e)->{
if(e != null){
System.out.println("compute failed!");
return r;
} else {
System.out.println("received result is " + r);
return r + 10;
}
});
System.out.println("result: " + future2.join());
在以上示例中,打印出的最终结果为11。说明经过handle计算后产生了新的结果
@Test
public void test10() throws Exception {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
if(true){
throw new RuntimeException("test");
}else{
System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
return 1.2;
}
});
//cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法,如果是正常执行的则传入的异常为null
CompletableFuture<String> cf2=cf.handle((a,b)->{
System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
if(b!=null){
System.out.println("error stack trace->");
b.printStackTrace();
}else{
System.out.println("run succ,result->"+a);
}
System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
if(b!=null){
return "run error";
}else{
return "run succ";
}
});
//等待子任务执行完成
System.out.println("main thread start wait,time->"+System.currentTimeMillis());
//get的结果是cf2的返回值,跟cf没关系了
System.out.println("run result->"+cf2.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
1.4 异步组合方法
1.4.1 thenCombine / thenAcceptBoth / runAfterBoth互相不依赖
这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果thenCombine最大的不同是连接任务可以是一个独立的。
CompletableFuture(或者是任意实现了CompletionStage的类型),从而允许前后连接的两个任务可以并行执行(后置任务不需要等待前置任务执行完成),最后当两个任务均完成时,再将其结果同时传递给下游处理任务,从而得到最终结果。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 2");
return 10;
});
CompletableFuture<Integer> future3 = future1.thenCombine(future2, (r1, r2)->r1 + r2);
System.out.println("result: " + future3.join());
上面示例代码中,future1和future2为独立的CompletableFuture任务,他们分别会在各自的线程中并行执行,然后future1通过thenCombine与future2连接,并且以lamda表达式传入处理结果的表达式,该表达式代表的任务会将future1与future2的结果作为入参并计算他们的和。因此,上面示例代码中,最终的打印结果是11。
一般,在连接任务之间互相不依赖的情况下,可以使用thenCombine来连接任务,从而提升任务之间的并发度。
注意,thenAcceptBoth、thenAcceptBothAsync、runAfterBoth、runAfterBothAsync的作用与thenConbime类似,唯一不同的地方是任务类型不同,分别是BiConumser、Runnable
@Test
public void test7() throws Exception {
ForkJoinPool pool=new ForkJoinPool();
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
return 1.2;
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
return 3.2;
});
//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
CompletableFuture<Double> cf3=cf.thenCombine(cf2,(a,b)->{
System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
System.out.println("job3 param a->"+a+",b->"+b);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
return a+b;
});
//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
CompletableFuture cf4=cf.thenAcceptBoth(cf2,(a,b)->{
System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
System.out.println("job4 param a->"+a+",b->"+b);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
});
//cf4和cf3都执行完成后,执行cf5,无入参,无返回值
CompletableFuture cf5=cf4.runAfterBoth(cf3,()->{
System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("cf5 do something");
System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
});
System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
//等待子任务执行完成
System.out.println("cf run result->"+cf.get());
System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
System.out.println("cf5 run result->"+cf5.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
1.4.2 applyToEither / acceptEither / runAfterEither
这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务(其他线程依然会继续执行),其区别在于applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:
@Test
public void test8() throws Exception {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
return 1.2;
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
return 3.2;
});
//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
CompletableFuture<Double> cf3=cf.applyToEither(cf2,(result)->{
System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
System.out.println("job3 param result->"+result);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
return result;
});
//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
CompletableFuture cf4=cf.acceptEither(cf2,(result)->{
System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
System.out.println("job4 param result->"+result);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
});
//cf4和cf3都执行完成后,执行cf5,无入参,无返回值
CompletableFuture cf5=cf4.runAfterEither(cf3,()->{
System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("cf5 do something");
System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
});
System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
//等待子任务执行完成
System.out.println("cf run result->"+cf.get());
System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
System.out.println("cf5 run result->"+cf5.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
1.4.3 thenCompose互相依赖
前面讲了thenCombine主要用于没有前后依赖关系之间的任务进行连接。那么,如果两个任务之间有前后依赖关系,但是连接任务又是独立的CompletableFuture,该怎么实现呢?
先来看一下直接使用thenApply实现:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<CompletableFuture<Integer>> future2 = future1.thenApply( (r) -> CompletableFuture.supplyAsync(()->r+10));
System.out.println(future2.join().join());
可以发现,上面示例代码中,future2的类型变成了CompletableFuture嵌套,而且在获取结果的时候,也需要嵌套调用join或者get。这样,当连接的任务越多时,代码会变得越来越复杂,嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose的主要目的就是解决这个问题(这里也可以将thenCompose的作用类比于stream接口中的flatMap,因为他们都可以将类型嵌套展开)。
看一下通过thenCompose如何实现上面的代码:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<Integer> future2 = future1.thenCompose((r)->CompletableFuture.supplyAsync(()->r+10));
System.out.println(future2.join());
通过示例代码可以看出来,很明显,在使用了thenCompose后,future2不再存在CompletableFuture类型嵌套了,从而比较简洁的达到了我们的目的。thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该result的新的CompletableFuture实例;如果该CompletableFuture实例为null,则返回一个void的CompletableFuture实例 ,然后执行这个新任务。
@Test
public void test9() throws Exception {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
return 1.2;
});
CompletableFuture<String> cf2= cf.thenCompose((param)->{
System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
return CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
return "job3 test";
});
});
System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
//等待子任务执行完成
System.out.println("cf run result->"+cf.get());
System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
System.out.println("cf2 run result->"+cf2.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
1.4.4 allOf / anyOf
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
anyOf返回的CompletableFuture是多个任务只要其中一个执行完成就会执行,其get返回的是已经执行完成的任务的执行结果,如果该任务执行异常,则抛出异常。
@Test
public void test11() throws Exception {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
return 1.2;
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
return 3.2;
});
CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
try {
Thread.sleep(1300);
} catch (InterruptedException e) {
}
// throw new RuntimeException("test");
System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
return 2.2;
});
//allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null
//anyOf是只有一个任务执行完成,无论是正常执行或者执行异常,都会执行cf4,cf4.get的结果就是已执行完成的任务的执行结果
CompletableFuture cf4=CompletableFuture.allOf(cf,cf2,cf3).whenComplete((a,b)->{
if(b!=null){
System.out.println("error stack trace->");
b.printStackTrace();
}else{
System.out.println("run succ,result->"+a);
}
});
System.out.println("main thread start cf4.get(),time->"+System.currentTimeMillis());
//等待子任务执行完成
System.out.println("cf4 run result->"+cf4.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
join()与get()区别在于join()返回计算的结果或者抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常get() 可以指定超时时间。
02
实现原理
下面以如下代码为例,介绍CompletableFuture的实现原理。
public static void main(String[] args) throws Exception {
CompletableFuture f=CompletableFuture.supplyAsync(()->{
System.out.println("实验");
return "1";
}).thenAccept((x)->{
System.out.println(x);
});
}
2.1 静态方法supplyAsync()
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
//asyncPool是一个线程池,
//它根据配置或者CPU的个数来决定是使用ForkJoinPool还是ThreadPerTaskExecutor作为线程池的实现
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException();
//创建一个新的CompletableFuture对象,所以后面调用thenAccept()方法其实使用的是这个新建的对象
CompletableFuture<U> d = new CompletableFuture<U>();
//在线程池中执行AsyncSupply
e.execute(new AsyncSupply<U>(d, f));
return d;
}
CompletableFuture提供了属性asyncPool记录线程池对象,当调用静态方法时,CompletableFuture都是使用该线程池来执行异步任务。
创建asyncPool时,会判断机器的cpu核心数,来决定是否使用ForkJoinPool线程池;ThreadPerTaskExecutor为一对一执行器。
asyncSupplyStage()方法返回一个新建的CompletableFuture对象,在CompletableFuture中大部分public方法都会返回一个新建的CompletableFuture对象,所以在链式调用中每次调用的方法都是新建对象的方法,而不是最初那个对象的。最后将AsyncSupply()对象放入了线程池中。
2.2 AsyncSupply.run()方法的实现:
public void run() {
CompletableFuture<T> d; Supplier<T> f;
//dep是在asyncSupplyStage()方法中新建的那个CompletableFuture对象
//fn是自定义的Supplier对象
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
//CompletableFuture的result属性表示Supplier对象的执行结果,
//更准确的说,result属性是用来记录我们编写的lambda表达式的运算结果的
if (d.result == null) {
try {
//使用CAS将Supplier的执行结果放入到属性result中
d.completeValue(f.get());
} catch (Throwable ex) {
//如果抛出异常,将异常对象记录到属性result
d.completeThrowable(ex);
}
}
//下面详细介绍postComplete()方法
d.postComplete();
}
}
2.3 postComplete()方法
//postComplete()方法的作用是检查是否有下一个任务需要执行,如果需要便会触发该任务的执行
//每调用一个CompletableFuture实例方法,CompletableFuture都将该实例方法要执行的任务封装成一个Completion对象,
//并将Completion对象压入到当前CompletableFuture对象的栈中(栈顶使用stack属性记录),
//也就是将后一个CompletableFuture对象的任务压入到前一个对象的栈中,
//每次执行完当前任务后,都会调用postComplete()方法检查栈顶是否有待执行任务
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
//循环检查栈顶是否有待执行任务
while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d;
Completion t;
//casStack()将栈顶切换为下一个次栈顶元素
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
//当后一个CompletableFuture对象的栈中的任务需要嵌入到当前栈中执行时,
//postComplete()获取到这些任务并调用pushStack()放入自己的栈中,
//然后在本方法里面执行这些任务
pushStack(h);
continue;
}
h.next = null;
// detach
}
//如果栈中有待执行任务,调用tryFire()方法执行该任务,
//如果tryFire()的入参为NESTED(值为-1),且返回的不是null,
//说明后一个CompletableFuture对象的栈中的任务需要嵌入执行
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
run()方法执行了当前Supplier对象,并记录下执行结果。之后调用postComplete()检查栈顶是否有待执行任务。后面介绍thenAccept()方法的实现原理时,我们可以看到任务是如何被放到栈中的。
这里还有一点需要注意,调用tryFire()方法时,如果入参为NESTED,表示后一个CompletableFuture的栈中的任务需要嵌入到当前线程中(或者说是嵌入到当前栈中)执行,这相当于当前线程执行完了所有任务后,检查到后一个对象的栈中还有任务,那么便“偷”一个任务过来执行。
tryFire()有多个不同的实现,这里不再一一介绍源码。
2.4 thenAccept()方法的实现原理:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
//新建一个CompletableFuture对象d,本方法将该对象返回给调用方
CompletableFuture<Void> d = new CompletableFuture<Void>();
//uniAccept()检查前一个任务是否执行完成,如果执行成功了,那么执行Consumer对象
//注意:入参this指的是supplyAsync()方法里面创建的CompletableFuture对象
//这里逻辑比较绕,一定分清楚现在调用的thenAccept()方法是属于是supplyAsync()方法里面创建的CompletableFuture对象的
if (e != null || !d.uniAccept(this, f, null)) {
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
//将UniAccept对象压入到CompletableFuture对象的栈中
//这个栈是supplyAsync()方法里面创建的CompletableFuture对象的
//push()方法使用CAS将对象c压入到栈
push(c);
// SYNC = 0
c.tryFire(SYNC);
}
return d;
}
final <S> boolean uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c) {
Object r;
Throwable x;
//a.result表示上一个任务的执行结果,如果为null,表示任务还在执行中
//上一个任务没有执行完,直接退出当前方法
if (a == null || (r = a.result) == null || f == null)
return false;
//result = null表示当前任务还没执行
//能够执行下面的逻辑,说明上一个任务执行完毕,可以执行当前任务了
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
//执行Consumer任务
f.accept(s);
//向result属性设置结果
completeNull();
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
thenAccept()方法首先检查前一个CompletableFuture的任务是否执行完成,如果没有,则创建UniAccept对象,并将该对象压入栈中,如果执行完成,则执行Consumer任务,并设置执行结果。设置执行结果使用completeNull()方法,该方法通过CAS将NIL对象设置到result属性,NIL表示的是空结果,当任务返回null或者任务没有执行结果时,都会设置result=NIL,这意味着只要是result为null便表示当前任务没有执行完成或者还没有执行。
static final AltResult NIL = new AltResult(null);
在上面代码中,UniAccept对象压入栈之后,便调用UniAccept.tryFire()方法,下面来看一下该方法的实现原理:
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
//dep表示thenAccept()方法中新建的CompletableFuture对象d
//src表示supplyAsync()方法中创建的CompletableFuture对象
if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this))
//uniAccept()在上面已经介绍过了
//这里调用uniAccept()方法可以确保在多线程环境下,Consumer对象已经可以被执行
return null;
dep = null; src = null; fn = null;
//postFire()做收尾工作,检查对象a和对象d的栈是否有待执行任务,如果有分别调用postComplete()方法,
//在调用前还会检查mode的值,如果mode为NESTED(-1),则说明栈的任务由别的线程执行,不再执行
return d.postFire(a, mode);
}
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) {
if (mode < 0 || a.result == null) {
a.cleanStack();//清理栈,将执行过的任务从栈中清除
} else {
a.postComplete();//执行栈的任务,就本小节的例子来看,a栈的任务都已经执行过了
}
if (result != null && stack != null) {
if (mode < 0) {
return this;
//直接返回当前CompletableFuture对象,该对象的栈任务由其他线程执行
} else {
postComplete();//执行栈任务
}
return null;
}
}
}
2.5 总结
每个CompletableFuture的方法都会新建CompletableFuture对象,并将当前方法需要执行的任务封装成Completion对象,之后将Completion压入到上个方法中创建的CompletableFuture对象的栈中,这样每个CompletableFuture对象的栈都保存了下一个待执行的任务,通过栈将每个任务串在一起,形成一个链条。
当执行完当前CompletableFuture对象的任务后,接着查看栈中是否有任务,如果有则直接执行,如果没有就退出。后一个CompletableFuture对象也是一样,首先检查前一个任务是否执行完成,如果没有则将任务压入前一个对象的栈中,之后退出,如果前一个任务执行完成,则执行当前任务,然后查看栈中的任务。
03
如何合理的配置线程池
注:使用线程池时,任务已经执行完毕,isDone方法返回可能为false。
3.1 分析线程池
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
任务的优先级:高,中和低。
任务的执行时间:长,中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
任务性质不同的任务可以用不同规模的线程池分开处理
CPU密集型任务配置尽可能少的线程数量,如配置N * cpu + 1个线程的线程池。
O密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*N cpu。
混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。(我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。)
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
3.2 使用有界队列
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。
案例:有一次使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。
-也许你还想看-
k8s开发实践之代码编译与镜像构建
基于内容的图像检索技术研究