cover_image

关于Sentinel的那些事

mmmooo 得物技术
2021年12月02日 10:31



图片

前言

随着微服务的流行,服务和服务之间的稳定性变得越来越重要。

Sentinel是面向分布式服务架构的流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统自适应保护等多个维度来帮助您保障微服务的稳定性。

所以合理的使用Sentinel,并掌握其背后的底层原理就显得尤为重要!



近日,在进行线上稳定性巡检的时候,发现线上服务偶尔会出现调用Ab接口熔断的情况。查看线上配置的Sentinel熔断规则如下 

图片

发现似乎是没有问题的。

查看线上日志,发现熔断之前的慢调用也不是很多,按照1s的统计窗口来计算的话,远远达不到配置的50%的比例。

很是不解,于是决定看下源码一探究竟。



为了帮大家节省时间,这里先直接上结论: 

Sentinel的慢调用比例熔断规则统计的时候,不是等到滑动窗口结束了再去根据这一整个窗口的数据来做判断,而是每次请求都会做判断。

比如拿最上面的配置规则来做例子的话,如果当前窗口的刚开始的前几个请求中(大于5)慢调用比例刚好超过了50%,那么就会触发熔断,断路器直接打开,3s内的所有请求都走降级,然后3s后断路器进入半开状态,如果下一个请求正常了,那么断路器就关闭。 



下面是源码探险之旅 

梦开始的地方:

@SentinelResource(value = "pandora.abService.callAb", blockHandler = "callAb", blockHandlerClass = SentinelFallbackHandler.class) public String callAb(Long userId, String key) {     //省略... } 

项目中通过@SentinelResource注解来进行资源管控,如果触发熔断或者降级会走注解中配置的blockHandlerClass的blockHandler降级方法,这是我们最常用的Sentinel使用方式。


然后再SentinelAutoConfiguration类中可以看到会注入一个切面处理的类SentinelResourceAspect。

@Bean @ConditionalOnMissingBean public SentinelResourceAspect sentinelResourceAspect() {    return new SentinelResourceAspect(); } 

顾名思义,这个类就是处理@SentinelResource注解包裹的资源的。


下面来看下这个SentinelResourceAspect。

图片

可以发现SphU.entry() 是一个核心的逻辑, 从SphU.entry()方法往下执行会进入到Sph.entry(),Sph的默认实现类是CtSph,而最终会进入CtSph的entry方法:

图片

可以发现,这里主要就是对资源进行封装,然后调用entryWithPriority()。

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)     throws BlockException {     // 获取上下文    Context context = ContextUtil.getContext();     // 省略部分代码...     // 获取处理责任链    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);     // 省略部分代码...     new CtEntry(resourceWrapper, chain, context);     try {         // 链式处理资源        chain.entry(context, resourceWrapper, null, count, prioritized, args);     } catch (BlockException e1) {         e.exit(count, args);         throw e1;     } catch (Throwable e1) {         // This should not happen, unless there are errors existing in Sentinel internal.         RecordLog.info("Sentinel unexpected exception", e1);     }     return e; } 

看了这段代码,可以发现,这里会先去获取对应的资源(也就是@SentinelResource包裹的方法,在Sentinel中把这类方法抽象成一个resource)处理的责任链,然后通过对资源进行链式处理。


我们先看下lookProcessChain(resourceWrapper) 获取了哪些处理链:

public class DefaultSlotChainBuilder implements SlotChainBuilder {    @Override    public ProcessorSlotChain build() {        // 处理链        ProcessorSlotChain chain = new DefaultProcessorSlotChain();        chain.addLast(new NodeSelectorSlot());        chain.addLast(new ClusterBuilderSlot());        chain.addLast(new LogSlot());        chain.addLast(new StatisticSlot());        chain.addLast(new SystemSlot());        chain.addLast(new AuthoritySlot());        chain.addLast(new FlowSlot());        chain.addLast(new DegradeSlot());        return chain;    } } @Override public voidaddLast(AbstractLinkedProcessorSlot<?>protocolProcessor) { end.setNext(protocolProcessor); end= protocolProcessor; }

走到这里就很清晰了,发现这里其实就和Sentinel官网的一张图对应上了。

图片

这里就是一个典型的责任链模式。


我们先接着往下看责任链是如何工作的,

也就是chain.entry(),

发现chain.entry()处理的时候都调用fireEntry(),调用责任链的next节点进行处理。

图片
图片

现在我们重点关注下慢查询和熔断相关的逻辑

根据官网的介绍,也就是下面二个ProcessorSlot。


StatisticSlot:用于记录、统计不同纬度的 runtime 指标监控信息。

DegradeSlot:通过统计信息以及预设的规则,来做熔断降级。



StatisticSlot:

先来看StatisTicSlot对应entry()。

@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,                   boolean prioritized, Object... args) throws Throwable {     try {         // 先走下面的规则校验的slot        // Do some checking.         fireEntry(context, resourceWrapper, node, count, prioritized, args);         // 校验通过就会走下面的node的两个统计方法        // Request passed, add thread count and pass count.         node.increaseThreadNum();         node.addPassRequest(count);         // 省略部分代码...     } catch (BlockException e) {         // 后续slot如果校验不通过,也就是触发了限流或者熔断就会走到这里        // Blocked, set block exception to current entry.         context.getCurEntry().setBlockError(e);         // 统计被block的次数        // Add block count.         node.increaseBlockQps(count);         if (context.getCurEntry().getOriginNode() != null) {             context.getCurEntry().getOriginNode().increaseBlockQps(count);         }         // 省略部分代码...         throw e;     } catch (Throwable e) {         // Unexpected internal error, set error to current entry.         context.getCurEntry().setError(e);         throw e;     } } 

看下entry方法,该方法首先会触发后续slot的entry方法,即SystemSlot、FlowSlot、DegradeSlot等的规则,如果规则不通过,就会抛出BlockException,则会在node中统计被block的数量。反之会在node中统计通过的请求数和线程数等信息。


我们可以看到node.addPassRequest()这段代码是在fireEntry执行之后执行的,这意味着,当前请求通过了sentinel的流控等规则,此时需要将当次请求记录下来,也就是执行 node.addPassRequest()这行代码,我们跟进去看看:

@Override public void addPassRequest(int count) {     rollingCounterInSecond.addPass(count);     rollingCounterInMinute.addPass(count); } 

这里就是我们发现Sentinel内部有两个窗口的概念,一种是秒级别的,一种是分钟级别的。


继续跟进去,发现会进入ArrayMetric。

private final LeapArray<MetricBucket> data; 
@Override public void addPass(int count) {    WindowWrap<MetricBucket> wrap = data.currentWindow();    wrap.value().addPass(count); }

这里就发现了统计的两个核心类LeapArray和MetricBucket,我们先看MetricBucket。 

private final LongAdder[] counters; 
public MetricBucket add(MetricEvent event, long n) {    counters[event.ordinal()].add(n);    return this; }
public enum MetricEvent {    PASS,    BLOCK,    EXCEPTION,    SUCCESS,    RT,    OCCUPIED_PASS }

发现MetricBucket会用MetricEvent来区分统计的信息类别,并用LongAdder来记录信息确保线程安全。


然后我们回过头去下看data.currentWindow()这个获取当前窗口的处理,也就是LeapArray类的逻辑。 

public WindowWrap<T> currentWindow() {     return currentWindow(TimeUtil.currentTimeMillis()); } // timeMillis就是当前时间戳public WindowWrap<T> currentWindow(long timeMillis) {     if (timeMillis < 0) {         return null;     }     // 根据当前时间戳来计算时间窗口的角标,来定位时间窗口    int idx = calculateTimeIdx(timeMillis);     // 计算当前窗口的开始时间    long windowStart = calculateWindowStart(timeMillis);     while (true) {         // 获取当前窗口        WindowWrap<T> old = array.get(idx);         if (old == null) {             // 如果为空,说明之前还没初始化,就初始化一个新的窗口            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));             // cas更新到array中             if (array.compareAndSet(idx, null, window)) {                 return window;             } else {                 Thread.yield();             }         }  // 如果old窗口的开始时间和计算得到的开始时间相等,说明old窗口就是当前时间窗口            else if (windowStart == old.windowStart()) {             return old;         } else if (windowStart > old.windowStart()) {             // 如果old窗口的开始时间比计算得到的开始时间要小,说明old已经过期了,需要重新设置一个时间窗口            if (updateLock.tryLock()) {                 try {                     // 重设时间窗口                    return resetWindowTo(old, windowStart);                } finally {                     updateLock.unlock();                 }             } else {                 Thread.yield();             }         } else if (windowStart < old.windowStart()) {             // 这种可能是时间回拨导致的,当前时间戳回退,直接返回一个新的窗口            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));        }     } } 

还有一点补充一下,秒级别的滑动窗口,在内部会初始化两个小的窗口,每个窗口的时间为500ms,代码如下:

// 这里SampleCountProperty.SAMPLE_COUNT默认为2 private transient volatile Metric rollingCounterInSecond = new  ArrayMetric(SampleCountProperty.SAMPLE_COUNT,     IntervalProperty.INTERVAL); 
public ArrayMetric(int sampleCount, int intervalInMs) {    this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); }
public LeapArray(int sampleCount, int intervalInMs) {    // 窗口个数,1s默认为两个,所以每个小窗口长度为500ms    this.windowLengthInMs = intervalInMs / sampleCount;    this.intervalInMs = intervalInMs;    this.sampleCount = sampleCount;    this.array = new AtomicReferenceArray<>(sampleCount); }

统计数据的StatisticSlot搞明白了,最后我们看熔断降级相关的规则判断DegradeSlot。



DegradeSlot:

熔断相关的逻辑主要在DegradeSlot.exit()方法中。


public ResponseTimeCircuitBreaker(DegradeRule rule) {
   // 慢调用比例的滑动窗口    // 可以发现小窗口只有一个,窗口大小是熔断规则配置的长度,这里是30s;这点比StatisticSlot的滑动窗口要简单一些    this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs())); }

@Override public void exit(Context context, ResourceWrapper r, int count, Object... args) {    Entry curEntry = context.getCurEntry();        // 省略部分代码…        if (curEntry.getBlockError() == null) {        // passed request        for (CircuitBreaker circuitBreaker : circuitBreakers) {                    // 熔断规则校验            circuitBreaker.onRequestComplete(context);        }    }        fireExit(context, r, count, args); }

// 慢调用比例熔断规则校验ResponseTimeCircuitBreaker.onRequestComplete @Override public void onRequestComplete(Context context) {
   // 这里小窗口只有1个    SlowRequestCounter counter = slidingCounter.currentWindow().value();        Entry entry = context.getCurEntry();    if (entry == null) {        return;    }    long completeTime = entry.getCompleteTimestamp();    if (completeTime <= 0) {        completeTime = TimeUtil.currentTimeMillis();    }        // 获取rt    long rt = completeTime - entry.getCreateTimestamp();        // 根据rt时长记录这次请求是否是慢调用    if (rt > maxAllowedRt) {        counter.slowCount.add(1);    }        // 总调用次数+1    counter.totalCount.add(1);
   // 慢调用熔断规则判断    handleStateChangeWhenThresholdExceeded(rt); }

// 慢调用熔断规则判断 private void handleStateChangeWhenThresholdExceeded(long rt) {    if (currentState.get() == State.OPEN) {        return;    }        if (currentState.get() == State.HALF_OPEN) {        // In detecting request        // TODO: improve logic for half-open recovery        if (rt > maxAllowedRt) {            fromHalfOpenToOpen(1.0d);        } else {            fromHalfOpenToClose();        }        return;    }
   List<SlowRequestCounter> counters = slidingCounter.values();    long slowCount = 0;    long totalCount = 0;    for (SlowRequestCounter counter : counters) {        slowCount += counter.slowCount.sum();        totalCount += counter.totalCount.sum();    }    // 如果当前窗口的总调用次数小于配置的最小调用数(也就是管理后台配置的5)直接return,不做校验    if (totalCount < minRequestAmount) {        return;    }    // 如果当前窗口的总调用次数不小于配置的最小调用数,开始进行慢调用比例校验    // 计算慢调用比例    double currentRatio = slowCount * 1.0d / totalCount;    // 如果慢调用比例大于设置的阈值就走transformToOpen进行熔断处理    if (currentRatio > maxSlowRequestRatio) {        transformToOpen(currentRatio);    } }
// 慢调用比例熔断处理,打开熔断开关,下次调用直接被熔断protected void transformToOpen(double triggerValue) {    // 一开始熔断器肯定是关闭状态也就是CLOSE    State cs = currentState.get();    switch (cs) {        case CLOSED:            // 关闭状态 —> 打开状态            fromCloseToOpen(triggerValue);            break;        case HALF_OPEN:            // 半开状态 --> 打开状态            fromHalfOpenToOpen(triggerValue);            break;        default:            break;    } }

// 熔断器打开的时候会设置下一次重试的时机(当前时间+熔断时长),也就是半开状态的时机,此时会放行一个请求,如果这个请求不再是慢调用请求,那么熔断器关闭,否则熔断器打开。

protected void updateNextRetryTimestamp() {     this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs; } 

// 在DegradeSlot.entry()会调用tryPass方法,这里会判断是否把熔断器从打开状态 —-> 半开状态

// 熔断器打开的状态下,这里会先判断当前时间是否大于上面设置的下次重试时间,如果大于就把熔断器状态变为半开。 

public boolean tryPass(Context context) {     // Template implementation.     if (currentState.get() == State.CLOSED) {         return true;     }     if (currentState.get() == State.OPEN) {         // For half-open state we allow a request for probing.         return retryTimeoutArrived() && fromOpenToHalfOpen(context);     }     return false; } 

上面的代码很长,总结一下。

在给一个资源配置慢调用比例类型的熔断规则前提下: 

1. Sentinel会根据我们配置的规则,创建对应的滑动窗口来统计相应的慢调用请求数和总请求数;这里的滑动窗口的统计窗口个数为1,窗口长度为我们设置的长度;在本文的案例中,窗口长度为3s,小窗口个数为1。

2. 对应的资源每次请求结束的时候,会在DegradeSlot.exit() 方法中做统计,先获取当前滑动窗口,然后根据rt来判断是否是慢调用,然后记录在1中的统计窗口中。

3. 如果当前请求是慢调用就会走慢调用规则判断,默认一开始熔断器是close关闭状态,此时会计算当前窗口的总调用次数是否不小于配置的最小调用数(5),如果是的话就会进行慢调用比例判断,如果慢调用比例刚好也大于我们配置的慢调用比例(50%),此时就会把熔断器状态从close->open,并根据我们配置的熔断时长,设置下一次半开状态的时间(当前时间+熔断时长(3s))。

4. 下次该资源的请求过来的时候先走DegradeSlot.entry()中的判断逻辑。

  • 如果熔断器是close状态,那么直接放行该请求;

  • 如果熔断器是open状态,且熔断时间还没结束那么该请求就会直接熔断,抛出DegradeException熔断异常,在最外层的会catch住该异常走我们配置的降级逻辑;

  • 如果熔断器是open状态,且熔断时间已经结束,那么就会把熔断状态从open->half-open也就是改为半开状态,此时会放行一个请求,如果该请求rt不再是慢调用请求,那么就吧熔断状态改为close关闭状态。



真相大白

到此就真相大白了,通过以上源码分析可以找到线上Ab接口为什么会偶发熔断了。


Sentinel的慢调用比例熔断规则统计的时候,不是等到滑动窗口结束了再去根据这一整个窗口的数据来做判断,而是每次请求都会做判断。


比如拿最上面的配置规则来做例子的话,如果当前窗口的刚开始的前几个请求中(大于5)慢调用比例刚好超过了50%,那么就会触发熔断,断路器直接打开,3s内的所有请求都走降级,然后3s后断路器进入半开状态,如果下一个请求正常了,那么断路器就关闭。 


图片

*文/mmmooo


继续滑动看下一个
得物技术
向上滑动看下一个