cover_image

《sentinel在信也消息中台的应用》

末小将 拍码场
2022年04月29日 06:05

1.背景

1.1 通道切换概述

消息中台融合了包括短信、语音、邮件、微信、5G消息、站内信、APP Push等在内的各种信息的触达,每一种信息都拥有至少一个可用通道用以处理消息的触达。通道的提供形式不限于自研或接入外部三方的处理能力,为了提高消息触达的成功率,系统设计者或使用者在处理某类消息时会挂载多个通道以提高消息触达的成功率。比如,当通道A不可用时,可自动切换到其他可用通道来处理消息的发送。

保证消息的成功发出只是系统要考虑的一个方面,从系统本身的角度来讲,当通道A不可用时,接下来的消息发送不再走通道A。系统希望它能被标记为不可用且这种标记是暂时的。当它可用后能在第一时间发现并自动添加到可用通道列表里。这样做的好处 1、在某个通道不可用的时间内极大的减小了消息发送的试错成本 2、加大系统发送服务的健壮性 3、不需要人工干预,可自动恢复通道。要做到这一点仅依靠我们系统当前的能力是远远不够的,为此我们借助了sentinel的能力。

2.sentinel介绍

Sentinel是阿里开源的项目,提供了流量控制、熔断降级、系统负载保护等多个维度来保障服务之间的稳定性。sentinel 具有 1)丰富的使用场景:突发流量控制在系统容量可以承受的范围、消息削峰填谷、实时熔断下游不可用应用等;2)完备的实时监控;3)完善的SPI扩展点;4)广泛的开源生态。

目标: 1)应对突发流量,提高系统稳定性 2)熔断不可用的下游服务,降低系统压力,大大降低程序的试错成本 3)自动探活检测。

2.1 技术原理

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,它其实是采用一种责任链的模式进行工作。processorSlot是 Sentinel 实现限流降级、熔断降级、系统自适应降级等功能的切入点,我们可以对处理器插槽分为两个部分:
NodeSelectorSlot: 负责收集资源的路径,并为当前创建DefaultNode,并将 DefaultNode 赋值给 Context.curEntry.curNode。
ClusterBuilderSlot: 创建的ClusterNode将持有每个调用来源的 StatisticNode
StatisticSlot: 用于实现指标数据统计,先是调用后续的 ProcessorSlot#entry 判断是否放行请求,再根据判断结果进行相应的指标数据统计操作。
AuthoritySlot: 实现黑白名单降级
SystemSlot: 实现系统自适应降级
FlowSlot: 实现限流降级
DegradeSlot: 实现熔断降级

基于这个插槽的功能,我们可以把Sentinel提供的 ProcessorSlot 可以分为两类,一类是辅助完成资源指标数据统计的切入点,一类是实现降级功能的切入点。sentinel在使用的过程中会为每个资源创建且仅创建一个 ProcessorSlotChain,只要名称相同就认为是同一个资源。ProcessorSlotChain 被缓存在 CtSph.chainMap 静态字段,key 为资源 ID。

2.2 sentinel的工作流

ContextUtil.enter("上下文名称,例如:sentinel_spring_web_context");
Entry entry = null;
try {
entry = SphU.entry("资源名称,例如:/test/demo", EntryType.IN);
// 执行业务方法
return doSomething();
} catch (Exception e) {
if (!(e instanceof BlockException)) {
Tracer.trace(e);
}
throw e;
} finally {
if (entry != null) {
entry.exit(1);
}
ContextUtil.exit();
}

该段代码,执行逻辑分5个部分

1、调用 ContextUtil#enter 方法:为当前调用链路创建Context,同时为Conetxt创建EntranceNode

private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
//资源入口,生成EntranceNode
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
//如果没有Context的话,生成context并写入到ContextHolder中
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
//生成ResourceWrapper,生成EntranceNode
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
Constants.ROOT.addChild(node);
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}

2、调用 SphU#entry 方法:为资源创建 ResourceWrapper 对象 然后构造一个全局且唯一的 ProcessorSlotChain, 为资源创建 CtEntry 并将 CtEntry 赋值给当前调用链路的 Context.curEntry,最后调用 ProcessorSlotChain#entry 方法完成一次单向链表的 entry 方法调用

    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
//获得resource
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException
{
Context context = ContextUtil.getContext();
//生成SlotChain
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
//生成entry
Entry e = new CtEntry(resourceWrapper, chain, context);
//开始处理具体逻辑
chain.entry(context, resourceWrapper, null, count, prioritized, args);
return e;
}

//生成SlotChain,默认使用SPI机制,加载
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}

chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}

3、如果抛出异常,且异常类型非 BlockException 异常,则调用 Tracer#trace 方法记录异常

4、调用 Entry#exit 方法

protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException {
if (context != null) {
//......
// 1、调用 ProcessorSlotChain 的 exit 方法
if (chain != null) {
chain.exit(context, resourceWrapper, count, args);
}
// 2、将当前 CtEntry 的父节点设置为 Context 的当前节点
context.setCurEntry(parent);
if (parent != null) {
((CtEntry)parent).child = null;
}
if (parent == null) {
// Default context (auto entered) will be exited automatically.
if (ContextUtil.isDefaultContext(context)) {
ContextUtil.exit();
}
}
// 清理当前Contex entry 的引用以避免重复退出。
clearEntryContext();
}
}

5、调用 ContextUtil#exit 方法

public static void exit() {
Context context = contextHolder.get();
if (context != null && context.getCurEntry() == null) {
contextHolder.set(null);
}
}

如果 Context.curEntry 为空,则说明所有 SphU#entry 都对应执行了一次 Entry#exit 方法,此时就可以将 Context 从 ThreadLocal 中移除。

SPI Slot加载顺序如下

    NodeSelectorSlot -> ClusterBuilderSlot -> StatisticSlot -> AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot

3.方案实现

3.1 方案调研

消息中台承载着整个集团绝大部分的消息发送,面对的压力是很大的。我们的日发送量最大的情况下达到了上千万,且这个数量并不是均匀分布的,绝大多数情况下会在某个时间段内激增。在保证系统的稳定性的前提下,也要保证消息发送的成功率。这对系统提出了很高的要求。系统要做到:

  1. 短时间处理上千万的消息,且保证系统稳定性;

  2. 三方通道的不稳定、不确定性,加大了系统的不稳定性的概率,需要一个监控机制对外部服务进行监控预警、探活等;

  3. 当三方通道不可用时,消除人工处理带来的时间成本和发送的低成功率;

  4. 不同三方通道提供服务的能力不同,并不能一概而论,需要更为灵活的配置熔断、限流规则;

  5. 能够实时监控通道服务。
    基于上述几点的需求,消息中台研发人员从自研、使用sentinel以及hystrix,这三个方面进行了非常认真仔细的探讨,最后我们选择了使用了sentinel。

3.2 流程设计

sentinel项目中使用的架构图
图片

功能逻辑图
图片

3.3 技术实现

规则初始化与变更

从上面的架构图中可以知道我们的初始规则是通过dashboard后台配置的,然后实时同步到apollo。

dashboard#rules -> apollo#sentinel-data

系统启动时这些规则会被load到sentinel server中,为此我们为每一条规则创建一个内存映射map<Channel,List<FlowRule>>,它的Key是通道服务的唯一标识,value则是配置的规则,是一个List类型。

apollo#sentinel-data -> load#rules -> careate#Map<Channel,List<FlowRule>>

规则使用

sendMsg -> loadResourse -> loadRule -> send -> callback -> sentinel#resetRule -> sentinel#loadRule

消息发送方式我们考虑的两种情况:

1. 单个服务单个通道
2. 单个服务多个通道

单个服务单个通道,我们设定一般的熔断处理措施 就可以,因为他只有一个通道,当唯一的通道不能提供服务时这个发送消息的链路就已经断掉了。配置熔断规则之后利用sentinel本身的探活机制尽早发现并恢复服务,然后在有效期内利用重试机制发送之前未发送出去的消息。

单个服务多个通道,这是我们主要考虑的一种情况,比如存在这样的场景:某个发送服务S1下挂载了通道A、B、C、D,起初的情况是A、B、C、D均可对外按照预先设定好的比例提供通道服务发送消息

图片

某个时间段,通道A不能提供服务了
图片

可以知道我们熔断的粒度放在通道服务上,对有多个发送通道的服务来讲,也必须要满足至少两点:

1、明确的知道A已经不能提供通道服务了,后续的消息发送不再走A发送;
2、当A恢复时,要尽可能的快被告知,其后续发送恢复走A发送

解决熔断之后自动探测恢复,这一点是我们很想要的。sentinel可以帮助我们做这样的事情。在配置好熔断规则之后能够保证:

 1、当服务通道A发送熔断后, 后续的发送直接走其他可用的服务通道,比如 B、C、D,这时候我们要添加它的callback处理逻辑。
2、sentinel的探活机制会在设定时长进行探活,服务恢复后,并可重新使用通道 A

有一个重要的业务场景需要依赖sentinel的处理机制-服务授权。比如,在某天三方通道服务有升级或是明确告知会有5分钟的时间不能对外提供服务,这个时候我们需要在后台禁用掉该通道,以免消息发送继续走这个通道达到熔断。将该服务添加到sentinel的limitApp规则中,重新reLoader。待服务恢复后再手动把它从limitApp中remove掉。
图片

红色标注的逻辑部分是手动在后台禁用某个通道服务所做的事情,他需要将该通道对应的资源应用添加到limiter中,并重新reload当前发送服务对应的资源,它的目的就是人为的暂时性的下掉不可用的通道。深绿色的部分则是后台手动开启通道的逻辑,开启后也需要重新reload当前发送服务的资源。浅绿色部分是正常发送逻辑,浅红色部分则是熔断后的处理逻辑。

示例代码

1、核心包引入

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.2</version>
</dependency>

2、加载规则类

/**
* 动态加载规则
*/

public class LoadChannelRule {

private static Map<String, List<FlowRule>> rulesMaps;

private static Map<Stirng, ResourceChannel) resMaps;

static {
rules = new HashMap<>();
resMaps = new HashMap<>();
}

private void loadChannelRules(String pid) {
//......
/**
* 获取apollo规则
*/

rules = apolloApi.getRules(pid);
//初始化resource
initResource(rulesMaps);
//为资源加载规则
load(rulesMaps);
//设置resource
setSentinelResourse(rulesMaps)
//......
}

}

4、规则处理


private void getFlowRule(Map<String, List<FlowRule>> rulesMaps, String pid){
//......
//获取Apollo规则
}

private void buildFlowRuleMap(Map<String, List<FlowRule>> rulesMaps){
//......
//构建(reload)规则
}

private void reLoadFlowRule(Map<String, List<FlowRule>> rulesMaps){
//......
//构建(reload)规则
}

private void initResource(Map<String, List<FlowRule>> rulesMaps){
//......
//资源key的初始化
}
private void initResourceToApp(Map<String, List<FlowRule>> rulesMaps){
//......
//将资源应用化
}

通道开启/禁用


public class ChannelResourceLimiter {

public void checkAuthorityItems(String pid) throws Exception {
String origin = pid;
String resourceName = pid;
ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
ContextUtil.enter("entrance", origin);
try {
AuthorityRule ruleA = new AuthorityRule()
.setResource(resourceName)
.setLimitApp(origin + "," + pid)
.as(AuthorityRule.class)
.setStrategy(RuleConstant.AUTHORITY_WHITE);

AuthorityRuleManager.loadRules(Collections.singletonList(ruleA));
authoritySlot.checkBlackWhiteAuthority(resourceWrapper, ContextUtil.getContext());
} finally {
ContextUtil.exit();
}
}
}

6、callback处理

//serverId 服务id  pid 通道id  jsonBody消息体
private String callbackChanenl(String pid, String jsonBody){
//......
//获取该服务的所有通道
List<ChannelService> listChannel = getFlowRule();
//拿掉通道id 为 pid
markChannel(listChannel,pid);
//获取第一个可用的通道
ChannelService channelService = getFirstChannel(listChannel);
//处理发送
sendServiceApi(channelService, jsonBody);
//......
}

文章参考:

git地址:https://github.com/alibaba/Sentinel.git

文档中心:https://github.com/alibaba/Sentinel/wiki

招聘信息

Java、大数据、前端、测试等各种技术岗位热招中,欢迎扫码了解~

图片


更多福利请关注官方订阅号“拍码场

图片

好内容不要独享!快告诉小伙伴们吧!



继续滑动看下一个
拍码场
向上滑动看下一个