cover_image

记线上Dubbo调用异常排查处理

郑亚腾 微鲤技术团队
2023年03月23日 10:00


图片





01



简单介绍


  • suishen-esb中,提供了Dubbo对Hystrix的集成;

  • Hystrix内部使用了线程池完成具体的任务执行;


  • 每一个远程service使用独立的线程池;
  • 内部封装中,线程池的核心线程数和最大线程数默认为30,等待队列使用SynchronousQueue(不接受等待任务),拒绝策略为AbortPolicy(线程池无法接受时抛出异常);
  • 当瞬时并发数超出最大线程数时,新提交的任务会被线程池直接拒绝(RejectedExecutionException),导致dubbo调用失败。





02


事件脉络


  • 用户反馈使用异常,紧急查看日志



org.apache.dubbo.rpc.RpcException: Failed to invoke the method validLoginAuthentication in the service weli.wormhole.rpc.user.center.api.IAuthenticationService. Tried 1 times of the providers [10.65.0.205:11090] (1/4) from the registry node1.zk.all.platform.wtc.hwhosts.com:2181 on the consumer 10.65.0.34 using the dubbo version 2.7.3-SNAPSHOT. Last error is: validLoginAuthentication_1 could not be queued for execution and fallback failed.      at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:113)    at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:248)    at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:78)    at org.apache.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:55)    at org.apache.dubbo.common.bytecode.proxy14.validLoginAuthentication(proxy14.java)    at weli.peanut.web.interceptor.VerifyLoginInterceptor.preHandle(VerifyLoginInterceptor.java:134)    at org.springframework.web.servlet.HandlerExecutionChain.applyPreHandle(HandlerExecutionChain.java:134)    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:958)    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)    at javax.servlet.http.HttpServlet.service(HttpServlet.java:620)    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)    at 
org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:98)    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408)    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1040)    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:607)    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1721)    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1679)    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)    at java.lang.Thread.run(Thread.java:748)Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: validLoginAuthentication_1 could not be queued 
for execution and fallback failed.      at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:818)    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:793)    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1454)    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1379)    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)    at rx.observers.Subscribers$5.onError(Subscribers.java:230)    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at 
rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142)    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)    at rx.Observable.unsafeSubscribe(Observable.java:10158)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at 
rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)    at 
rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)    at rx.Observable.subscribe(Observable.java:10247)    at rx.Observable.subscribe(Observable.java:10214)    at rx.internal.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:51)    at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:411)    at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:377)    at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:343)    at suishen.esb.hystrix.dubbo.filter.HystrixFilter.invoke(HystrixFilter.java:46)    at com.alibaba.dubbo.rpc.Filter.invoke(Filter.java:29)    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)    at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:92)    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)    at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54)    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)    at org.apache.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:58)    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$CallbackRegistrationInvoker.invoke(ProtocolFilterWrapper.java:157)    at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)    at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:82)    ... 
36 common frames omittedCaused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1bf63b71 rejected from java.util.concurrent.ThreadPoolExecutor@6fb1f813[Running, pool size = 30, active threads = 30, queued tasks = 0, completed tasks = 0]      at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)    at rx.Observable.unsafeSubscribe(Observable.java:10151)    ... 91 common frames omitted


根据日志,发现是由于线程池打满造成了任务的拒绝执行,起初认为是提供方的dubbo线程池被打满,迅速排查中台日志;

{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo监控10.65.3.23 核心线程数:400,历史最高线程数:400,最大线程数:400,活跃线程数:0,当前线程数:400,队列大小:0,任务总量:48363297,已完成任务量:48363297","logger_name":"wormholeBusiness","thread_name":"qbScheduler-6","level":"INFO","level_value":20000}{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo监控10.65.3.44 核心线程数:400,历史最高线程数:400,最大线程数:400,活跃线程数:2,当前线程数:400,队列大小:0,任务总量:48371189,已完成任务量:48371187","logger_name":"wormholeBusiness","thread_name":"qbScheduler-3","level":"INFO","level_value":20000}


  • 发现中台服务正常,dubbo空闲线程也比较充裕;
  • 再回头看调用方异常信息,发现调用方使用了Hystrix,抛出异常的是Hystrix内部的线程池;
  • 此时紧急增加节点,重启服务后,业务开始正常。




03


问题分析


  • 根据日志分析,是由HystrixFilter执行了HystrixCommand.execute()造成了异常。

@Activate(group = Constants.CONSUMER)public class HystrixFilter implements Filter {
public HystrixFilter() { ApplicationContext springContext = ApplicationContextHolder.getContext(); if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) { BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory(); if (beanFactory instanceof DefaultListableBeanFactory) { BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class); beanDefinitionBuilder.setDestroyMethodName("preDestroy"); beanDefinitionBuilder.setScope("singleton");
((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());
//触发初始化 beanFactory.getBean(HystrixSpringService.class.getSimpleName()); } } }
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 异步调用使用hystrix做熔断没有意义 if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY))) { return invoker.invoke(invocation); }
DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation); return command.execute(); }
}
  • 当执行的dubbo调用为同步调用时,会创建DubboHystrixCommand,交由Hystrix执行远程调用。

public class DubboHystrixCommand extends HystrixCommand<Result> {
private static Logger logger = Logger.getLogger(DubboHystrixCommand.class); private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30; private static final int CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = 30000; private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20; private Invoker invoker; private Invocation invocation;
public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName())) .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(), invocation.getArguments() == null ? 0 : invocation.getArguments().length))) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() //10秒钟内请求失败上限值,超过此值熔断器发挥作用 .withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10) //熔断器中断请求30秒后会进入半打开状态,放部分流量过去重试 .withCircuitBreakerSleepWindowInMilliseconds(getCircuitBreakerSleepWindowInMilliseconds(invoker.getUrl())) //错误率达到50开启熔断保护 .withCircuitBreakerErrorThresholdPercentage(50) //使用dubbo的超时,禁用这里的超时 .withExecutionTimeoutEnabled(false)) //根据dubbo配置设置线程池大小 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));

this.invoker = invoker; this.invocation = invocation; }
/** * 获取每秒请求失败的阈值,超过此阈值熔断器开始生效 * * @param url * @return */ private static int getFailQpsThreshold(URL url) { if (url != null) { int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD); if (logger.isDebugEnabled()) { logger.debug("FailQpsThreshold: " + threshold); } return threshold; }
return DEFAULT_FAIL_QPS_THRESHOLD; }
/** * 获取熔断器中断请求窗口大小 * * @param url * @return */ private static int getCircuitBreakerSleepWindowInMilliseconds(URL url) { if (url != null) { int circuitBreakerSleepWindowInMilliseconds = url.getParameter("CircuitBreakerSleepWindowInMilliseconds", CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS); if (logger.isDebugEnabled()) { logger.debug("circuitBreakerSleepWindowInMilliseconds: " + circuitBreakerSleepWindowInMilliseconds); } return circuitBreakerSleepWindowInMilliseconds; }
return CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS;
}
/** * 获取线程池大小 * * @param url * @return */ private static int getThreadPoolCoreSize(URL url) { if (url != null) { int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE); if (logger.isDebugEnabled()) { logger.debug("ThreadPoolCoreSize: " + size); } return size; }
return DEFAULT_THREADPOOL_CORE_SIZE;
}
@Override protected Result run() throws Exception { Throwable exception = null;
Result result = null;
try { result = invoker.invoke(invocation); exception = result.getException(); } catch (Exception e) { exception = e; }
// 这里打印异常是为了记录异常,再抛出异常是为了触发fallback if (exception != null) { Logs.error("Dubbo Exception: ", exception); throw new Exception(exception); }
return result; }
@Override protected Result getFallback() { return new RpcResult((Object) null); }}public class DubboHystrixCommand extends HystrixCommand<Result> {
private static Logger logger = Logger.getLogger(DubboHystrixCommand.class); private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30; private static final int CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = 30000; private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20; private Invoker invoker; private Invocation invocation;
public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName())) .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(), invocation.getArguments() == null ? 0 : invocation.getArguments().length))) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() //10秒钟内请求失败上限值,超过此值熔断器发挥作用 .withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10) //熔断器中断请求30秒后会进入半打开状态,放部分流量过去重试 .withCircuitBreakerSleepWindowInMilliseconds(getCircuitBreakerSleepWindowInMilliseconds(invoker.getUrl())) //错误率达到50开启熔断保护 .withCircuitBreakerErrorThresholdPercentage(50) //使用dubbo的超时,禁用这里的超时 .withExecutionTimeoutEnabled(false)) //根据dubbo配置设置线程池大小 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));

this.invoker = invoker; this.invocation = invocation; }
/** * 获取每秒请求失败的阈值,超过此阈值熔断器开始生效 * * @param url * @return */ private static int getFailQpsThreshold(URL url) { if (url != null) { int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD); if (logger.isDebugEnabled()) { logger.debug("FailQpsThreshold: " + threshold); } return threshold; }
return DEFAULT_FAIL_QPS_THRESHOLD; }
/** * 获取熔断器中断请求窗口大小 * * @param url * @return */ private static int getCircuitBreakerSleepWindowInMilliseconds(URL url) { if (url != null) { int circuitBreakerSleepWindowInMilliseconds = url.getParameter("CircuitBreakerSleepWindowInMilliseconds", CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS); if (logger.isDebugEnabled()) { logger.debug("circuitBreakerSleepWindowInMilliseconds: " + circuitBreakerSleepWindowInMilliseconds); } return circuitBreakerSleepWindowInMilliseconds; }
return CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS;
}
/** * 获取线程池大小 * * @param url * @return */ private static int getThreadPoolCoreSize(URL url) { if (url != null) { int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE); if (logger.isDebugEnabled()) { logger.debug("ThreadPoolCoreSize: " + size); } return size; }
return DEFAULT_THREADPOOL_CORE_SIZE;
}
@Override protected Result run() throws Exception { Throwable exception = null;
Result result = null;
try { result = invoker.invoke(invocation); exception = result.getException(); } catch (Exception e) { exception = e; }
// 这里打印异常是为了记录异常,再抛出异常是为了触发fallback if (exception != null) { Logs.error("Dubbo Exception: ", exception); throw new Exception(exception); }
return result; }
@Override protected Result getFallback() { return new RpcResult((Object) null); }}
看到这里终于找到了根本问题:
  • 内部封装中,线程池的核心线程数和最大线程数默认为30,等待队列使用SynchronousQueue(不接受等待任务),拒绝策略为AbortPolicy(线程池无法接受时抛出异常);
  • 当瞬时并发数超出最大线程数时,新提交的任务会被线程池直接拒绝(RejectedExecutionException),导致dubbo调用失败。




04


处理


1、修改HystrixFilter,增加是否使用Hystrix的开关(方法参数 hystrixOpen):对于签名验证等核心接口,配置 hystrixOpen=false 后将跳过Hystrix,直接在调用线程中同步执行dubbo调用。


@Activate(group = Constants.CONSUMER)public class HystrixFilter implements Filter {
public HystrixFilter() { ApplicationContext springContext = ApplicationContextHolder.getContext(); if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) { BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory(); if (beanFactory instanceof DefaultListableBeanFactory) { BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class); beanDefinitionBuilder.setDestroyMethodName("preDestroy"); beanDefinitionBuilder.setScope("singleton");
((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());
//触发初始化 beanFactory.getBean(HystrixSpringService.class.getSimpleName()); } } }
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 异步调用使用hystrix做熔断没有意义 if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY)) || "false".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), "hystrixOpen"))) { return invoker.invoke(invocation); }
DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation); return command.execute(); }
}
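2、调整DubboHystrixCommand中线程池参数,增加最大线程数、线程存活时间配置。注意:Hystrix只有在allowMaximumSizeToDivergeFromCoreSize=true时maximumSize才会生效,否则线程池仍固定为coreSize,突发流量依旧会被拒绝;另外下面代码并未设置maxQueueSize,等待队列仍是默认的SynchronousQueue(maxQueueSize=-1)。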
public class DubboHystrixCommand extends HystrixCommand<Result> {
private static Logger logger = Logger.getLogger(DubboHystrixCommand.class); private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 30; private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 50; private static final int DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES = 5;
private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20; private Invoker invoker; private Invocation invocation;
public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName())) .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(), invocation.getArguments() == null ? 0 : invocation.getArguments().length))) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() //10秒钟内请求失败上限值,超过此值熔断器发挥作用 .withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10) //熔断器中断请求30秒后会进入半打开状态,放部分流量过去重试 .withCircuitBreakerSleepWindowInMilliseconds(30000) //错误率达到50开启熔断保护 .withCircuitBreakerErrorThresholdPercentage(50) //使用dubbo的超时,禁用这里的超时 .withExecutionTimeoutEnabled(false)) //根据dubbo配置设置线程池大小 .andThreadPoolPropertiesDefaults(getThreadPoolSetter(invoker.getUrl())));

this.invoker = invoker; this.invocation = invocation; }
/** * 获取每秒请求失败的阈值,超过此阈值熔断器开始生效 * * @param url * @return */ private static int getFailQpsThreshold(URL url) { if (url != null) { int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD); if (logger.isDebugEnabled()) { logger.debug("FailQpsThreshold: " + threshold); } return threshold; }
return DEFAULT_FAIL_QPS_THRESHOLD; }
private static HystrixThreadPoolProperties.Setter getThreadPoolSetter(URL url) { return HystrixThreadPoolProperties.Setter() .withCoreSize(getThreadPoolProperties(url, "threadPoolCoreSize", DEFAULT_THREAD_POOL_CORE_SIZE)) .withMaximumSize(getThreadPoolProperties(url, "threadPoolMaxSize", DEFAULT_THREAD_POOL_MAX_SIZE)) .withKeepAliveTimeMinutes(getThreadPoolProperties(url, "threadPoolKeepAliveTimeMinutes", DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES)); }
/** * 获取线程池大小 * * @param url * @return */ private static int getThreadPoolProperties(URL url, String name, int defaultProperties) { if (url != null) { int size = url.getParameter(name, defaultProperties); if (logger.isDebugEnabled()) { logger.debug(name + ": " + size); } return size; } return defaultProperties; }
@Override protected Result run() throws Exception { Throwable exception;
Result result = null; try { result = invoker.invoke(invocation); exception = result.getException(); } catch (Exception e) { exception = e; }
// 有异常抛出 if (exception != null) { Logs.error("dubbo exception: ", exception); throw new RuntimeException(exception); }
return result; }
}


作者 | 郑亚腾 资深服务端开发工程师
本文来自微鲤团队,转载请注明出处。

继续滑动看下一个
微鲤技术团队
向上滑动看下一个