TLDR
The root cause was that the destruction order among Dubbo, the thread pools, the RocketMQ producers, and the RocketMQ consumers was never explicitly orchestrated.
The orchestration principle: components that depend on Dubbo services (e.g. RocketMQ/Kafka consumers, scheduled jobs) must be destroyed before Dubbo, while components that Dubbo depends on (e.g. thread pools, RocketMQ/Kafka producers) must be destroyed after Dubbo. The Spring container itself should only be destroyed after every component that depends on Spring has been destroyed.
Investigation
Graceful-shutdown analysis of checkin-service
We first suspected that Dubbo's graceful shutdown was not taking effect. We run Dubbo 2.6.6, and graceful shutdown evolved across Dubbo versions: the capability first appeared in 2.5.x and only matured in 2.7.x, so the versions in between are known to be flawed. After studying how the JVM, Spring, and Dubbo each implement graceful shutdown, we reached the following conclusions:
The JVM exposes Runtime.getRuntime().addShutdownHook() as the graceful-shutdown hook mechanism; when multiple hooks are registered, they are started in parallel, with no ordering guarantee between them.
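The hook API can be exercised directly; the sketch below is a minimal, self-contained illustration (the class and hook names are ours, purely illustrative), showing both registration and the removal call that the patch described later relies on:

```java
public class ShutdownHookDemo {

    // Registers two independent hooks, then removes the second one.
    // The JVM starts all still-registered hooks *concurrently* at shutdown,
    // with no ordering guarantee between them.
    static boolean registerAndRemove() {
        Thread springLikeHook = new Thread(() -> System.out.println("closing application context"));
        Thread dubboLikeHook = new Thread(() -> System.out.println("destroying dubbo resources"));
        Runtime.getRuntime().addShutdownHook(springLikeHook);
        Runtime.getRuntime().addShutdownHook(dubboLikeHook);
        // removeShutdownHook returns true only if the hook was previously registered
        return Runtime.getRuntime().removeShutdownHook(dubboLikeHook);
    }

    public static void main(String[] args) {
        System.out.println("removed dubbo-like hook: " + registerAndRemove());
        // Only the spring-like hook remains, so only it runs at JVM exit.
    }
}
```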
Spring likewise registers a JVM shutdown hook to close the container gracefully:
private void refreshContext(ConfigurableApplicationContext context) {
    refresh(context);
    if (this.registerShutdownHook) {
        try {
            context.registerShutdownHook();
        }
        catch (AccessControlException ex) {
            // Not allowed in some environments.
        }
    }
}
/**
* Register a shutdown hook with the JVM runtime, closing this context
* on JVM shutdown unless it has already been closed at that time.
* <p>Delegates to {@code doClose()} for the actual closing procedure.
* @see Runtime#addShutdownHook
* @see #close()
* @see #doClose()
*/
public void registerShutdownHook() {
    if (this.shutdownHook == null) {
        // No shutdown hook registered yet.
        this.shutdownHook = new Thread() {
            @Override
            public void run() {
                synchronized (startupShutdownMonitor) {
                    doClose();
                }
            }
        };
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    }
}
Spring's shutdown flow is shown in the figure:
Dubbo has provided a graceful-shutdown scheme since 2.5.x. The flow is as follows:
When a Provider receives the shutdown signal, it:
unregisters all of its services from the registry;
sends a read-only event to all connected clients, so no new requests are sent to it;
waits a short period for requests that have already arrived, then closes the request-processing thread pool;
disconnects all clients.
When a Consumer receives the shutdown signal, it:
stops listening for dynamic configuration from the config center;
rejects newly arriving requests by returning an invocation exception directly;
waits for requests it has already sent to complete, force-closing the connection if the response times out.
Official documentation on the graceful-shutdown design:
https://cn.dubbo.apache.org/zh-cn/docsv2.7/user/examples/graceful-shutdown/
This scheme lets Dubbo finish in-flight requests before destroying itself, so requests being processed are not interrupted (graceful shutdown does not guarantee that every request completes; it only waits a bounded time, 10 seconds by default). Its flaw is that Dubbo, too, runs its shutdown flow from a JVM shutdown hook, and as noted above, JVM shutdown hooks run in parallel. In a Spring environment, Dubbo's hook therefore races Spring's hook, and the result is that a bean that a Dubbo method depends on may already have been destroyed by Spring while the request is still being processed.
Dubbo 2.6.x added a ShutdownHookListener that listens for Spring's ContextClosedEvent and starts Dubbo's destruction flow:
private static class ShutdownHookListener implements ApplicationListener {
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextClosedEvent) {
            // we call it anyway since Dubbo shutdown hook make sure its destroyAll() is re-entrant.
            // pls. note we should not remove Dubbo shutdown hook when Spring framework is present, this is because
            // its shutdown hook may not be installed.
            DubboShutdownHook shutdownHook = DubboShutdownHook.getDubboShutdownHook();
            shutdownHook.destroyAll();
        }
    }
}
The problem in this version: although Dubbo 2.6.x listens for the Spring context-closed event to start its destruction flow, AbstractConfig still keeps the JVM shutdown hook. The Dubbo developers explain why in the comment above: the Dubbo shutdown hook must not be removed, because Spring's own shutdown hook may never have been installed. And indeed, as the code shows, whether Spring registers its shutdown hook is controlled by a flag:
// Spring Boot
public class SpringApplication {

    private void refreshContext(ConfigurableApplicationContext context) {
        refresh(context);
        if (this.registerShutdownHook) {
            try {
                context.registerShutdownHook();
            }
            catch (AccessControlException ex) {
                // Not allowed in some environments.
            }
        }
    }
}
The consequence is that Dubbo's JVM shutdown hook still exists, so the Spring and Dubbo destruction flows can still run concurrently; the 2.5.x problem remains unsolved.
Dubbo 2.7.x final design: at startup, Dubbo proactively registers Spring's JVM shutdown hook and removes its own. This both covers the case where the Spring framework never registered a JVM hook and guarantees that Dubbo completes its graceful shutdown before Spring is destroyed.
Our project is on 2.6.6, so we back-ported a patch modeled on the 2.7.x approach: after the Spring container starts, we remove Dubbo's own JVM shutdown hook.
@Slf4j
public class DefaultSpringDubboConfigurations {

    @Bean
    DubboShutdownListener dubboShutdownListener() {
        return new DubboShutdownListener();
    }

    public static class DubboShutdownListener implements ApplicationListener, PriorityOrdered {

        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            if (event instanceof ApplicationStartedEvent) {
                Runtime.getRuntime().removeShutdownHook(DubboShutdownHook.getDubboShutdownHook());
                log.info("Dubbo default shutdown hook removed, will be managed by Spring");
            }
            ....
        }

        @Override
        public int getOrder() {
            return 0;
        }
    }
}
Summary: from all of the above, Dubbo's own graceful-shutdown flow appears to be working correctly.
Analysis of checkin-service error stacks
Analyzing the error stacks produced while the container shut down, the errors fall into three categories:
1. RocketMQ producer errors while sending messages
Error log
2023-03-13 19:37:45.220] [MQ_ASYNC_EXECUTOR_4] [TID: 7d33b1dd10f3464aa7bc16b4428c3782.149.16787074650500403] [] com.kuaikan.checkin.component.MqSendComponent.lambda$asyncSendRankData$1(MqSendComponent.java:94)
The producer service state not OK, SHUTDOWN_ALREADY :
See https://github.com/alibaba/RocketMQ/issues/43 for further details.
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.makeSureStateOK(DefaultMQProducerImpl.java:402) ~[rocketmq-client-3.2.6.jar!/:?]
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:479) ~[rocketmq-client-3.2.6.jar!/:?]
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1031) ~[rocketmq-client-3.2.6.jar!/:?]
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1025) ~[rocketmq-client-3.2.6.jar!/:?]
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:95) ~[rocketmq-client-3.2.6.jar!/:?]
at com.kuaikan.checkin.component.MqSendComponent.lambda$asyncSendRankData$1(MqSendComponent.java:91) ~[classes!/:?]
at org.apache.skywalking.apm.plugin.wrapper.SwRunnableWrapper.run(SwRunnableWrapper.java:43) [?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
Root cause
checkin-service uses the team's KkMqProducer wrapper for its RocketMQ producers. The wrapper's init() method, called during initialization, registers a JVM shutdown hook, so the RocketMQ producer is actually destroyed in parallel with Dubbo, possibly even earlier. And because Dubbo waits for remaining requests during its own shutdown, a request still being processed may need to send an MQ message after the producer has already shut down; invoking a shut-down producer yields exactly the error in the log.
public class KkMqProducer {

    public boolean init() throws MQClientException {
        log.info("kk producer init...");
        log.info(this.namesrcAddr);
        this.defaultMQProducer = new DefaultMQProducer(this.producerGroup);
        this.defaultMQProducer.setNamesrvAddr(this.namesrcAddr);
        this.defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                KkMqProducer.this.destroy();
            }
        }));
        this.defaultMQProducer.start();
        log.info("kk producer start success");
        return true;
    }
}
@Bean
public KkMqProducer userBarrageGoodSyncProducer(
        String userBarrageGoodSyncMqGroup,
        String userBarrageGoodSyncNameServer) {
    KkMqProducer kkMqProducer = new KkMqProducer();
    kkMqProducer.setProducerGroup(userBarrageGoodSyncMqGroup);
    kkMqProducer.setNamesrcAddr(userBarrageGoodSyncNameServer);
    return kkMqProducer;
}
Solution
Override init() and delete the code that registers the JVM hook, relying solely on @Bean(destroyMethod = "destroy") to guarantee the producer bean is destroyed only after Dubbo has been destroyed.
2. RocketMQ consumer errors while consuming messages
Error log
2023-03-16 11:23:59.248] [ConsumeMessageThread_7] [TID: N/A] [91e64298-a856-4596-96be-b7d3dc46b62d_-1044242708] com.kuaikan.checkin.biz.impl.StairUserTaskActivityBiz.addUserTask(StairUserTaskActivityBiz.java:142)
No provider available in [invoker :interface com.kuaikan.user.service.UserService -> zookeeper://ip:port/com.alibaba.dubbo.registry.RegistryService?anyhost=true&application=checkin-service :
at com.alibaba.dubbo.rpc.cluster.support.AvailableCluster$1.doInvoke(AvailableCluster.java:48) ~[dubbo-2.6.6.kk.jar!/:2.6.6.kk]
at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244) ~[dubbo-2.6.6.kk.jar!/:2.6.6.kk]
at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75) ~[dubbo-2.6.6.kk.jar!/:2.6.6.kk]
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52) ~[dubbo-2.6.6.kk.jar!/:2.6.6.kk]
at com.alibaba.dubbo.common.bytecode.proxy8.getBUserBaseInfo(proxy8.java) ~[dubbo-2.6.6.kk.jar!/:2.6.6.kk]
Root cause
First, all of checkin-service's RocketMQ consumers are registered this way:
public BaseMqConsumer stairUserFavAlbumTask(
        String group,
        String namesrv,
        String topic) {
    BaseMqConsumer task = new BaseMqConsumer();
    ....
    ....
    return task;
}

public class BaseMqConsumer {

    private DefaultMQPushConsumer defaultMQPushConsumer;

    public boolean destroy() {
        log.info("{} shutdown", this.getClass().getSimpleName());
        defaultMQPushConsumer.shutdown();
        return true;
    }
}
So when the container shuts down, Spring invokes the destroy method declared on the @Bean annotation, and since that runs later than the ContextClosedEvent, the service can still receive a RocketMQ message to process after the Dubbo resources have already been destroyed. Invoking a Dubbo service bean at that point produces the "No provider available in xx" exception.
Solution
The correct destruction order: MQ consumers must be destroyed before the Dubbo resources, while MQ producers must be destroyed after them. So we listen for Spring's ContextClosedEvent and give our listener a higher precedence than the team's existing DefaultSpringDubboConfigurations class:
@Slf4j
public class RocketConsumerShutdownConfig
        implements ApplicationListener<ContextClosedEvent>, PriorityOrdered {

    private List<BaseMqConsumer> baseMqConsumers;

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        log.info("received context close event , begin shutdown rocketmq consumer !");
        for (BaseMqConsumer baseMqConsumer : ListUtils.emptyIfNull(baseMqConsumers)) {
            try {
                Method method = BaseMqConsumer.class.getDeclaredMethod("destroy");
                method.setAccessible(true);
                method.invoke(baseMqConsumer);
                log.info("shut down rocketmq consumer:{}", baseMqConsumer.getClass().getName());
            } catch (NoSuchMethodException e) {
                log.error("invoked shutdown method fail !NoSuchMethodException", e);
            } catch (InvocationTargetException e) {
                log.error("invoked shutdown method fail !InvocationTargetException", e);
            } catch (IllegalAccessException e) {
                log.error("invoked shutdown method fail !IllegalAccessException", e);
            }
        }
    }

    @Override
    public int getOrder() {
        return -1;
    }
}
P.S. Without the team's customized DefaultSpringDubboConfigurations class, implementing the Ordered interface would have been enough; PriorityOrdered would not be needed. This is because Dubbo's own ContextClosedEvent listener implements neither interface, so it defaults to the lowest precedence; any listener implementing Ordered with anything above the lowest precedence therefore runs before Dubbo's. A listener implementing PriorityOrdered, however, always runs before listeners that only implement Ordered, regardless of its order value.
Spring's comparison code for ordering listeners (and runners) is as follows:
private void callRunners(ApplicationContext context, ApplicationArguments args) {
    List<Object> runners = new ArrayList<>();
    runners.addAll(context.getBeansOfType(ApplicationRunner.class).values());
    runners.addAll(context.getBeansOfType(CommandLineRunner.class).values());
    AnnotationAwareOrderComparator.sort(runners);
    for (Object runner : new LinkedHashSet<>(runners)) {
        if (runner instanceof ApplicationRunner) {
            callRunner((ApplicationRunner) runner, args);
        }
        if (runner instanceof CommandLineRunner) {
            callRunner((CommandLineRunner) runner, args);
        }
    }
}

private int doCompare(Object o1, Object o2, OrderSourceProvider sourceProvider) {
    boolean p1 = (o1 instanceof PriorityOrdered);
    boolean p2 = (o2 instanceof PriorityOrdered);
    if (p1 && !p2) {
        return -1;
    }
    else if (p2 && !p1) {
        return 1;
    }
    int i1 = getOrder(o1, sourceProvider);
    int i2 = getOrder(o2, sourceProvider);
    return Integer.compare(i1, i2);
}
/**
 * Determine the order value for the given object.
 * <p>The default implementation checks against the given {@link OrderSourceProvider}
 * using {@link #findOrder} and falls back to a regular {@link #getOrder(Object)} call.
 * @param obj the object to check
 * @return the order value, or {@code Ordered.LOWEST_PRECEDENCE} as fallback
 */
private int getOrder(Object obj, OrderSourceProvider sourceProvider) {
    Integer order = null;
    if (obj != null && sourceProvider != null) {
        Object orderSource = sourceProvider.getOrderSource(obj);
        if (orderSource != null) {
            if (orderSource.getClass().isArray()) {
                Object[] sources = ObjectUtils.toObjectArray(orderSource);
                for (Object source : sources) {
                    order = findOrder(source);
                    if (order != null) {
                        break;
                    }
                }
            }
            else {
                order = findOrder(orderSource);
            }
        }
    }
    return (order != null ? order : getOrder(obj));
}

protected int getOrder(Object obj) {
    if (obj != null) {
        Integer order = findOrder(obj);
        if (order != null) {
            return order;
        }
    }
    return Ordered.LOWEST_PRECEDENCE;
}
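The precedence rules above can be exercised without Spring by mimicking the comparator. In the sketch below, the Ordered/PriorityOrdered interfaces are our own stand-ins for Spring's (illustrative, not Spring's actual types), and the compare logic mirrors doCompare above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OrderingDemo {

    // Minimal stand-ins for Spring's Ordered / PriorityOrdered (illustrative only).
    interface Ordered { int getOrder(); }
    interface PriorityOrdered extends Ordered {}

    static class DubboLikeListener {}                          // implements neither -> lowest precedence
    static class OrderedListener implements Ordered {          // any explicit order beats the default
        public int getOrder() { return 1000; }
    }
    static class PriorityListener implements PriorityOrdered { // always before plain Ordered
        public int getOrder() { return 0; }
    }

    // Mirrors OrderComparator.doCompare: PriorityOrdered first, then order value,
    // with Integer.MAX_VALUE (LOWEST_PRECEDENCE) as the fallback.
    static int compare(Object o1, Object o2) {
        boolean p1 = o1 instanceof PriorityOrdered, p2 = o2 instanceof PriorityOrdered;
        if (p1 && !p2) return -1;
        if (p2 && !p1) return 1;
        return Integer.compare(order(o1), order(o2));
    }

    static int order(Object o) {
        return (o instanceof Ordered) ? ((Ordered) o).getOrder() : Integer.MAX_VALUE;
    }

    static List<String> sortedNames() {
        List<Object> listeners = new ArrayList<>(Arrays.asList(
                new DubboLikeListener(), new OrderedListener(), new PriorityListener()));
        listeners.sort(OrderingDemo::compare);
        List<String> names = new ArrayList<>();
        for (Object l : listeners) names.add(l.getClass().getSimpleName());
        return names;
    }

    public static void main(String[] args) {
        // Sorts to: PriorityListener, OrderedListener, DubboLikeListener
        System.out.println(sortedNames());
    }
}
```

This is exactly why our RocketConsumerShutdownConfig (PriorityOrdered) fires before the patched Dubbo listener, which in turn fires before any listener implementing neither interface.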
3. Thread-pool errors
Error log
[DubboServerHandler-10.75.5.190:20909-thread-494] [TID: 74a51f3fa66444228f6db925e09231b9.1380.16789370329540317] [b08e5a8c-f510-404c-8c65-4ac3415dfa0b] com.kuaikan.checkin.common.Executors.lambda$getThreadPoolExecutor$0(Executors.java:66) [Executors CHECKIN_SA_UPLOAD_INFO SERVER AS POOL EXHAUST,activeCount: 0, taskCount: 84, completedTaskCount: 84, workQueue: 0]
[DubboServerHandler-10.75.5.190:20909-thread-494] [TID: 74a51f3fa66444228f6db925e09231b9.1380.16789370329540317] [b08e5a8c-f510-404c-8c65-4ac3415dfa0b] com.kuaikan.checkin.util.FutureResultGetter.getQuietly(FutureResultGetter.java:49) [[FutureResultGetter
java.util.concurrent.TimeoutException: null
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) ~[?:1.8.0_191]
Root cause
All of checkin's thread pools are created via the team's Executors#getThreadPoolExecutor(), and that class uses a static block to register a JVM shutdown hook that destroys every pool:
public class Executors {

    static {
        Runtime.getRuntime()
                .addShutdownHook(
                        new Thread(
                                () -> {
                                    EXECUTOR_CHECK_TASK_MAP.forEach(
                                            (k, v) -> {
                                                ThreadPoolUtils.stop(v);
                                            });
                                }));
    }
}
Because JVM shutdown hooks run in parallel, this pool-destruction hook runs concurrently with the Spring container's shutdown, so the thread pools may be destroyed before Dubbo. Submitting a task to such a pool then triggers its rejection policy. The error log shows that, besides the line printed by the rejection policy, callers also observe a timeout. This is because we run async work via CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) and fetch results via CompletableFuture.get(long timeout, TimeUnit unit). That method parks the calling thread with LockSupport.parkNanos(this, nanos), and the matching LockSupport.unpark(w) is only invoked after the async task completes:
public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                d.completeValue(f.get());
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        // postComplete() is what eventually unparks threads blocked in get()
        d.postComplete();
    }
}
Since the thread pool had already been destroyed by the time Dubbo ran its own destruction, the rejected task never completes the future, so the thread blocked in CompletableFuture.get(long timeout, TimeUnit unit) is never unparked by completion; it can only wait out the timeout and throw. That is why every caller observes a TimeoutException.
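This failure mode can be reproduced in isolation. The sketch below assumes a log-and-discard rejection handler (our actual handler is assumed to behave similarly): a task submitted after shutdown is silently dropped, the future is never completed, and get() can only time out:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ShutdownPoolTimeoutDemo {

    // Returns true if get() times out because the task was silently discarded.
    static boolean timesOutAfterDiscard() {
        // Rejection handler that only logs and discards (assumption, mimicking the log above).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                (r, e) -> System.out.println("rejected: pool already shut down"));
        pool.shutdown(); // simulate the pool's JVM shutdown hook having already fired

        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "done", pool);
        try {
            f.get(200, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException expected) {
            return true; // nothing ever completes the future, so only the timeout wakes the caller
        } catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("caller saw TimeoutException: " + timesOutAfterDiscard());
    }
}
```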
Solution
Remove the JVM hook registered for the thread pools and shut the pools down from a ContextClosedEvent listener instead. Since this config does not implement PriorityOrdered, it fires only after Dubbo's destruction flow has completed, guaranteeing the pools stay alive while Dubbo drains its remaining requests.
@Slf4j
public class ExecutorShutdownConfig implements ApplicationListener<ContextClosedEvent> {

    private static final AtomicBoolean isClose = new AtomicBoolean(false);

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        if (!isClose.compareAndSet(false, true)) {
            return;
        }
        log.info("start stop all thread pool! ");
        Executors.stopAllThreadPool();
        log.info("all thread pool stop !");
    }
}
Result
[WARN 2023-03-21 20:26:42.103] [NettyServerWorker-10-3] [TID: N/A] [] com.alibaba.dubbo.remoting.transport.AbstractServer.disconnected(AbstractServer.java:205) [ [DUBBO] All clients has disconnected from /ip:port. You can graceful shutdown now., dubbo version: 2.6.6.kk, current host: ]
[INFO 2023-03-21 20:26:42.232] [Thread-50] [TID: N/A] [] com.kuaikan.common.config.dubbo.DefaultSpringDubboConfigurations$DubboShutdownListener.onApplicationEvent(DefaultSpringDubboConfigurations.java:35) [Dubbo destroy finished]
[INFO 2023-03-21 20:26:42.233] [Thread-50] [TID: N/A] [] com.kuaikan.checkin.config.ExecutorShutdownConfig.onApplicationEvent(ExecutorShutdownConfig.java:26) [start stop all thread pool! ]
[INFO 2023-03-21 20:26:42.247] [Thread-50] [TID: N/A] [] com.kuaikan.checkin.config.ExecutorShutdownConfig.onApplicationEvent(ExecutorShutdownConfig.java:28) [all thread pool stop !]
Conclusion
The overall fix: instead of multiple JVM shutdown hooks running the teardown in parallel, register only Spring's single shutdown hook, and within Spring's shutdown sequence explicitly order the destruction of each component, so that everything is destroyed in the correct order.