前言
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。
Sentinel是面向分布式服务架构的流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统自适应保护等多个维度来帮助您保障微服务的稳定性。
所以合理的使用Sentinel,并掌握其背后的底层原理就显得尤为重要!
近日,在进行线上稳定性巡检的时候,发现线上服务偶尔会出现调用Ab接口熔断的情况。查看线上配置的Sentinel熔断规则如下
发现似乎是没有问题的。
查看线上日志,发现熔断之前的慢调用也不是很多,按照1s的统计窗口来计算的话,远远达不到配置的50%的比例。
很是不解,于是决定看下源码一探究竟。
为了帮大家节省时间,这里先直接上结论:
Sentinel的慢调用比例熔断规则统计的时候,不是等到滑动窗口结束了再去根据这一整个窗口的数据来做判断,而是每次请求都会做判断。
比如拿最上面的配置规则来做例子的话,如果当前窗口的刚开始的前几个请求中(大于5)慢调用比例刚好超过了50%,那么就会触发熔断,断路器直接打开,3s内的所有请求都走降级,然后3s后断路器进入半开状态,如果下一个请求正常了,那么断路器就关闭。
下面是源码探险之旅
梦开始的地方:
public String callAb(Long userId, String key) {
//省略...
}
项目中通过@SentinelResource注解来进行资源管控,如果触发熔断或者降级会走注解中配置的blockHandlerClass的blockHandler降级方法,这是我们最常用的Sentinel使用方式。
然后再SentinelAutoConfiguration类中可以看到会注入一个切面处理的类SentinelResourceAspect。
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
顾名思义,这个类就是处理@SentinelResource注解包裹的资源的。
下面来看下这个SentinelResourceAspect。
可以发现SphU.entry() 是一个核心的逻辑, 从SphU.entry()方法往下执行会进入到Sph.entry(),Sph的默认实现类是CtSph,而最终会进入CtSph的entry方法:
可以发现,这里主要就是对资源进行封装,然后调用entryWithPriority()。
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 获取上下文
Context context = ContextUtil.getContext();
// 省略部分代码...
// 获取处理责任链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// 省略部分代码...
new CtEntry(resourceWrapper, chain, context);
try {
// 链式处理资源
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
看了这段代码,可以发现,这里会先去获取对应的资源(也就是@SentinelResource包裹的方法,在Sentinel中把这类方法抽象成一个resource)处理的责任链,然后通过对资源进行链式处理。
public class DefaultSlotChainBuilder implements SlotChainBuilder {
public ProcessorSlotChain build() {
// 处理链
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
public voidaddLast(AbstractLinkedProcessorSlot<?>protocolProcessor) {
end.setNext(protocolProcessor);
end= protocolProcessor;
}
走到这里就很清晰了,发现这里其实就和Sentinel官网的一张图对应上了。
这里就是一个典型的责任链模式。
我们先接着往下看责任链是如何工作的,
也就是chain.entry(),
发现chain.entry()处理的时候都调用fireEntry(),调用责任链的next节点进行处理。
现在我们重点关注下慢查询和熔断相关的逻辑
根据官网的介绍,也就是下面二个ProcessorSlot。
StatisticSlot:用于记录、统计不同纬度的 runtime 指标监控信息。
DegradeSlot:通过统计信息以及预设的规则,来做熔断降级。
StatisticSlot:
先来看StatisTicSlot对应entry()。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 先走下面的规则校验的slot
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 校验通过就会走下面的node的两个统计方法
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
// 省略部分代码...
} catch (BlockException e) {
// 后续slot如果校验不通过,也就是触发了限流或者熔断就会走到这里
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// 统计被block的次数
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
// 省略部分代码...
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
看下entry方法,该方法首先会触发后续slot的entry方法,即SystemSlot、FlowSlot、DegradeSlot等的规则,如果规则不通过,就会抛出BlockException,则会在node中统计被block的数量。反之会在node中统计通过的请求数和线程数等信息。
我们可以看到node.addPassRequest()这段代码是在fireEntry执行之后执行的,这意味着,当前请求通过了sentinel的流控等规则,此时需要将当次请求记录下来,也就是执行 node.addPassRequest()这行代码,我们跟进去看看:
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
这里就是我们发现Sentinel内部有两个窗口的概念,一种是秒级别的,一种是分钟级别的。
继续跟进去,发现会进入ArrayMetric。
private final LeapArray<MetricBucket> data;
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
这里就发现了统计的两个核心类LeapArray和MetricBucket,我们先看MetricBucket。
private final LongAdder[] counters;
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
发现MetricBucket会用MetricEvent来区分统计的信息类别,并用LongAdder来记录信息确保线程安全。
然后我们回过头去下看data.currentWindow()这个获取当前窗口的处理,也就是LeapArray类的逻辑。
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
// timeMillis就是当前时间戳
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 根据当前时间戳来计算时间窗口的角标,来定位时间窗口
int idx = calculateTimeIdx(timeMillis);
// 计算当前窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
// 获取当前窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 如果为空,说明之前还没初始化,就初始化一个新的窗口
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// cas更新到array中
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} // 如果old窗口的开始时间和计算得到的开始时间相等,说明old窗口就是当前时间窗口
else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
// 如果old窗口的开始时间比计算得到的开始时间要小,说明old已经过期了,需要重新设置一个时间窗口
if (updateLock.tryLock()) {
try {
// 重设时间窗口
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 这种可能是时间回拨导致的,当前时间戳回退,直接返回一个新的窗口
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
还有一点补充一下,秒级别的滑动窗口,在内部会初始化两个小的窗口,每个窗口的时间为500ms,代码如下:
// 这里SampleCountProperty.SAMPLE_COUNT默认为2
private transient volatile Metric rollingCounterInSecond = new
ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public LeapArray(int sampleCount, int intervalInMs) {
// 窗口个数,1s默认为两个,所以每个小窗口长度为500ms
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
统计数据的StatisticSlot搞明白了,最后我们看熔断降级相关的规则判断DegradeSlot。
DegradeSlot:
熔断相关的逻辑主要在DegradeSlot.exit()方法中。
public ResponseTimeCircuitBreaker(DegradeRule rule) {
// 慢调用比例的滑动窗口
// 可以发现小窗口只有一个,窗口大小是熔断规则配置的长度,这里是30s;这点比StatisticSlot的滑动窗口要简单一些
this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs()));
}
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
// 省略部分代码…
if (curEntry.getBlockError() == null) {
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
// 熔断规则校验
circuitBreaker.onRequestComplete(context);
}
}
fireExit(context, r, count, args);
}
// 慢调用比例熔断规则校验ResponseTimeCircuitBreaker.onRequestComplete
public void onRequestComplete(Context context) {
// 这里小窗口只有1个
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
// 获取rt
long rt = completeTime - entry.getCreateTimestamp();
// 根据rt时长记录这次请求是否是慢调用
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
// 总调用次数+1
counter.totalCount.add(1);
// 慢调用熔断规则判断
handleStateChangeWhenThresholdExceeded(rt);
}
// 慢调用熔断规则判断
private void handleStateChangeWhenThresholdExceeded(long rt) {
if (currentState.get() == State.OPEN) {
return;
}
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// TODO: improve logic for half-open recovery
if (rt > maxAllowedRt) {
fromHalfOpenToOpen(1.0d);
} else {
fromHalfOpenToClose();
}
return;
}
List<SlowRequestCounter> counters = slidingCounter.values();
long slowCount = 0;
long totalCount = 0;
for (SlowRequestCounter counter : counters) {
slowCount += counter.slowCount.sum();
totalCount += counter.totalCount.sum();
}
// 如果当前窗口的总调用次数小于配置的最小调用数(也就是管理后台配置的5)直接return,不做校验
if (totalCount < minRequestAmount) {
return;
}
// 如果当前窗口的总调用次数不小于配置的最小调用数,开始进行慢调用比例校验
// 计算慢调用比例
double currentRatio = slowCount * 1.0d / totalCount;
// 如果慢调用比例大于设置的阈值就走transformToOpen进行熔断处理
if (currentRatio > maxSlowRequestRatio) {
transformToOpen(currentRatio);
}
}
// 慢调用比例熔断处理,打开熔断开关,下次调用直接被熔断
protected void transformToOpen(double triggerValue) {
// 一开始熔断器肯定是关闭状态也就是CLOSE
State cs = currentState.get();
switch (cs) {
case CLOSED:
// 关闭状态 —> 打开状态
fromCloseToOpen(triggerValue);
break;
case HALF_OPEN:
// 半开状态 --> 打开状态
fromHalfOpenToOpen(triggerValue);
break;
default:
break;
}
}
// 熔断器打开的时候会设置下一次重试的时机(当前时间+熔断时长),也就是半开状态的时机,此时会放行一个请求,如果这个请求不再是慢调用请求,那么熔断器关闭,否则熔断器打开。
protected void updateNextRetryTimestamp() {
this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs;
}
// 在DegradeSlot.entry()会调用tryPass方法,这里会判断是否把熔断器从打开状态 —-> 半开状态
// 熔断器打开的状态下,这里会先判断当前时间是否大于上面设置的下次重试时间,如果大于就把熔断器状态变为半开。
public boolean tryPass(Context context) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for probing.
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
上面的代码很长,总结一下。
在给一个资源配置慢调用比例类型的熔断规则前提下:
1. Sentinel会根据我们配置的规则,创建对应的滑动窗口来统计相应的慢调用请求数和总请求数;这里的滑动窗口的统计窗口个数为1,窗口长度为我们设置的长度;在本文的案例中,窗口长度为3s,小窗口个数为1。
2. 对应的资源每次请求结束的时候,会在DegradeSlot.exit() 方法中做统计,先获取当前滑动窗口,然后根据rt来判断是否是慢调用,然后记录在1中的统计窗口中。
3. 如果当前请求是慢调用就会走慢调用规则判断,默认一开始熔断器是close关闭状态,此时会计算当前窗口的总调用次数是否不小于配置的最小调用数(5),如果是的话就会进行慢调用比例判断,如果慢调用比例刚好也大于我们配置的慢调用比例(50%),此时就会把熔断器状态从close->open,并根据我们配置的熔断时长,设置下一次半开状态的时间(当前时间+熔断时长(3s))。
4. 下次该资源的请求过来的时候先走DegradeSlot.entry()中的判断逻辑。
如果熔断器是close状态,那么直接放行该请求;
如果熔断器是open状态,且熔断时间还没结束那么该请求就会直接熔断,抛出DegradeException熔断异常,在最外层的会catch住该异常走我们配置的降级逻辑;
如果熔断器是open状态,且熔断时间已经结束,那么就会把熔断状态从open->half-open也就是改为半开状态,此时会放行一个请求,如果该请求rt不再是慢调用请求,那么就吧熔断状态改为close关闭状态。
真相大白
到此就真相大白了,通过以上源码分析可以找到线上Ab接口为什么会偶发熔断了。
Sentinel的慢调用比例熔断规则统计的时候,不是等到滑动窗口结束了再去根据这一整个窗口的数据来做判断,而是每次请求都会做判断。
比如拿最上面的配置规则来做例子的话,如果当前窗口的刚开始的前几个请求中(大于5)慢调用比例刚好超过了50%,那么就会触发熔断,断路器直接打开,3s内的所有请求都走降级,然后3s后断路器进入半开状态,如果下一个请求正常了,那么断路器就关闭。
*文/mmmooo