本期内容:
客服工单中的多线程
线程池
CompletableFuture
总结
在客服工单系统中,为了方便客服人员处理用户进线反馈问题,需要将工单的日志、工单的基础信息、订单信息、商品信息、客户基本信息、订单的类型、订单的物流信息等重要的数据都要加载出来放在工单详情中供客服人员查阅以便快速处理用户的问题。
下图展示了工单的详情:
工单的详情数据展示的技术方案有两种可以采用:
(1)采用单线程的方式:工单详情所需数据在主逻辑中顺序加载到内存中,然后再封装页面需要展示的数据
(2)采用多线程的方式:工单详情中订单、商家、用户、日志等数据采用多线程分别的加载数据到内存中,然后再封装页面展示的数据
为此,分别对这两种方式做数据加载的实验:
单线程方式(ms) | 多线程方式(ms) | |
---|---|---|
第一次加载 | 400 | 50 |
第二次加载 | 390 | 46 |
第三次加载 | 410 | 55 |
第四次加载 | 420 | 53 |
通过实验对比可以发现,单线程加载数据的方式相比多线程加载数据的方式来说其加载速度明显慢很多。
如果我们客服工单系统中的工单详情数据采用单线程的方式加载数据,那么我们客服在使用的时候会出来短暂的页面加载转圈的过程,这样一方面客服人员使用的体验度下降,另一方面也会影响其工作效率,为此我们客服工单系统采用多线程的方式加载工单详情数据。
自定义线程池的核心代码
public class CommonPool {
public static final ThreadPoolExecutor QUERY_MY_INFO_LOCAL_THREAD_POOL;
static {
ThreadFactory queryMyInfoThreadFactory = new MyThreadFactory("queryMyInfo");
QUERY_MY_INFO_LOCAL_THREAD_POOL = new ThreadPoolExecutor(NumberConstant.TWENTY, NumberConstant.SIXTY, NumberConstant.SIXTY, TimeUnit.SECONDS, new LinkedBlockingDeque<>(NumberConstant.ONE_HUNDRED_FIFTY), queryMyInfoThreadFactory, handler);
QUERY_MY_INFO_LOCAL_THREAD_POOL.prestartAllCoreThreads();
}
public static <V> FutureTask<V> queryMyInfoExecute(Callable<V> callable) {
FutureTask<V> futureTask = new FutureTask<>(callable);
QUERY_MY_INFO_LOCAL_THREAD_POOL.execute(futureTask);
return futureTask;
}
//自定义线程工厂
public static class MyThreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private String threadName;
public MyThreadFactory() {
}
public MyThreadFactory(String threadName) {
this.threadName = threadName;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadName + "-thread-" + mThreadNum.getAndIncrement());
log.info(t.getName() + " has been created");
return t;
}
}
//自定义拒绝策略
public static class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}
private void doLog(Runnable r, ThreadPoolExecutor e) {
log.error(r.toString() + " rejected");
}
}
}
异步获取业务数据核心代码
//获取商品的数据
CompletableFuture<ItemDTO> itemFuture = createItemFuture(orderFullDTO.getItemId());
//开启异步线程查询商品数据
private CompletableFuture<ItemDTO> createItemFuture(Long itemId) {
return CompletableFuture.supplyAsync(() -> itemWrapper.getItemById(itemId), CommonPool.ASYNC_TASK_THREAD_POOL);
}
//获取异步线程返回的商品数据
private ItemDTO getItemFutureResult(CompletableFuture<ItemDTO> itemFuture, long orderId) {
ItemDTO itemDTO = null;
try {
itemDTO = itemFuture.get(200, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("AggregationServiceImpl getItemFutureResult itemFuture执行超时, orderId:{}", orderId, e);
}
return itemDTO;
}
产生的业务价值:
(1)大幅度的提升了客服工单系统的响应速度,提高了客服人员的工作效率和对系统的体验度
(2)通过提供更多的详细数据展示帮助客服人员快速的处理客户反馈的问题,不仅提升了客户对玩物得志平台的满足度,而且留存更多的优质客户
使用线程池:
一方面可以帮助使用者创建、管理、销毁线程;
另一方面可以通过设置线程加载数据的超时时间来防止由于某个外部接口超时问题导致无法正常的返回业务数据到页面的问题。
下图展示了线程池的原理:
在线程池底层中一个内核级的线程处理多个用户级的任务,设想一个内核线程处理多个任务并且让这个内核级线程一致处于工作状态,那么一方面就可以减少内核级的线程数量,另一方面可以有效的减少内核线程的频繁创建和销毁、减少了上下文的切换,充分发挥内核线程的最大功效。
于是我们采用线程池来协助我们创建、管理线程。
定义好线程池之后,可以将任务交给我们的线程池处理业务任务,线程池工作原理入如下:
线程池工作原理:
(1)通过execute方法提交任务时,当线程池中的线程数小于corePoolSize时,新提交的任务将通过创建一个新线程来执行,即使此时线程池中存在空闲线程。
(2)通过execute方法提交任务时,当线程池中线程数量达到corePoolSize时,新提交的任务将被放入workQueue中,等待线程池中线程调度执行。
(3)通过execute方法提交任务时,当workQueue已存满,且maximumPoolSize大于corePoolSize时,新提交的任务将通过创建新线程执行。
(4)当线程池中的线程执行完任务空闲时,会尝试从workQueue中取头结点任务执行。
(5)通过execute方法提交任务,当线程池中线程数达到maxmumPoolSize,并且workQueue也存满时,新提交的任务由RejectedExecutionHandler执行拒绝操作。
(6)当线程池中线程数超过corePoolSize,并且未配置allowCoreThreadTimeOut=true,空闲时间超过keepAliveTime的线程会被销毁,保持线程池中线程数为corePoolSize。
线程池的状态与工作能力:
线程池在每个状态下的工作能力:
Java5版本引入了Future接口,可用于启动一个异步任务,并在后续获得这个异步任务的结果,Future接口可用于两个或多个异步任务的并行执行,提高了编写多线程代码的灵活性。
但在实际应用时很难在某些场景下优雅地做到真正的异步。
为此JDK8中新引入CompletableFuture类,下图展示了其实现接口情况:
可以看出CompletableFuture由于实现了Future接口,其拥有回调功能;实现CompletionStage接口使得CompletableFuture能够实现对异步执行的结果进行异步的处理。
与传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。
我们的项目中使用的是CompletableFuture.supplyAsync(),其可以用来创建CompletableFuture实例。
通过该方法创建的CompletableFuture实例会异步执行当前传入的计算任务,在调用端可以通过get或join获取最终计算结果。
(1)CompletableFuture方式一:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
supplyAsync(Supplier supplier)构建CompletableFuture对象只需传入一个Supplier实例(一般使用lamda表达式),通过代码可以发现此时框架会默认使用ForkJoin的线程池来执行被提交的任务
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
所以当supplyAsync入参只有supplier时,会默认使用asyncPool作为线程池(一般情况下为ForkJoinPool的commonPool),并调用内部方法asyncSupplyStage执行具体的逻辑
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
在asyncSupplyStage方法中,程序会创建一个空的CompletableFuture返回给调用方。
同时该CompletableFuture和传入的supplier会被包装在一个AsyncSupply实例对象中,然后一起提交到线程池中进行处理。
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
当supplyAsync返回时,调用方只会拿到一个空的CompletableFuture实例。
业务计算完成之后,最终的结果会被set到对应的CompletableFuture的result字段中。调用方通过get就能取到该CompletableFuture的result字段的值。
因此虽然实际创建CompletableFuture的线程和进行任务计算的线程不同,但是最终会通过result来进行结果的传递。
(2) 构建CompletableFuture方式二:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
方式二构建CompletableFuture对象可以指定自定义的线程池,然后将任务提交给该线程池执行,它和方式一唯一的区别就是方式二可以自定义线程池。
Java8引入了CompletableFuture可以很好的做到真正的异步,同时也提供了更加灵活的使用方式来对异步执行的结果和异步线程的处理和调配。
(1)某个业务需要加载多方数据的时候,考虑使用多线程的方式加载数据,这样不仅提高了数据加载速度并且大幅度的提升了系统使用者的体验度。
(2)创建多线程推荐使用线程池的方式创建,目的是让线程池统一的创建、管理、销毁线程。
(3)推荐使用自定义线程池,不推荐使用Executors工具类方式创建线程池。因为Executors工具类创建线程池一方面容易出现OOM(Executors工具类中线程池允许请求队列长度支持Integer.MAX_VALUE,有些线程池支持创建线程数为Integer.MAX_VALUE),另一方面工具类中的拒绝策略都是默认策略,很多业务场景不适用。
(4)CompletableFuture不仅可以很优雅的实现异步执行任务,而且还可以根据实际情况自定义线程池类型。