在互联网-电商这个业务模块,,特别 C 端模块,整体用户基数大,特别是在做营销活动的场景,甚至做秒杀活动的时候,访问量很高,而且 C 端是典型的大部分数据是多读少写场景,所以需要扛高并发,并且需要支持高性能,其中一个主要实现之一:添加策略缓存,在整个访问全链路上,各个地方添加缓存,对我们后端同学来说,常见的数据可能放入分布式缓存,甚至放入应用的本地缓存中,以便支持高吞吐量,低延迟的业务需求。
支持 key/value 形式的数据结构
支持线程安全
支持缓存过期时间
支持缓存淘汰策略
支持缓存大小控制
支持性能处理方面
支持统计和可观察性
支持 key/value 形式的数据结构(支持)
支持线程安全(支持)
缓存过期时间(本身不支持,需要额外单独实现)
缓存淘汰策略(本身不支持,需要额外单独实现)
缓存大小控制(本身不支持,需要额外单独实现)。
支持 key/value 形式的数据结构(支持)
支持线程安全(支持) 缓存过期时间 (支持,常见使用惰性删除,读请求中混杂着写操作)
缓存淘汰策略(支持,常见使用LRU算法)
缓存大小控制(支持)
支持 key/value 形式的数据结构(支持)
支持线程安全(支持)
缓存过期时间(支持,使用异步淘汰数据的策略,缩短 get 请求的执行时长,间接提升了响应性能)
缓存淘汰策略(支持,使用一种高效的近似 LFU 算法,W-TinyLFU 算法 特点:高命中率、低内存占用【利用堆外缓存降低内存缓存大小,减少 GC 频率】)
缓存大小控制(支持) 近似统计频率(采用 Count–Min Sketch 算法【类似布隆过滤器】降低频率信息带来的内存消耗)
支持热点缓存(支持,维护一个PK机制保证新进入的热点数据能够缓存)
支持异步(支持,eg:支持自动异步回源) 性能优化方面(很多地方采用了异步)。
通过统一的 SDK 封装支持(目标:尽量和业务解耦;方便接入;统一的可维护性)
接入方式:AOP(可以融合多种缓存能力支持:分布式缓存 OR 本地缓存 OR 组合使用)
本地缓存:推荐选型Caffeine
KEY:使用SpEL动态表达式
可支持动态调整参数【例如:通过Apollo上配置动态支持】
支持缓存降级:支持降级
支持统计各业务key类型的命中率:支持
支持统计各业务key类型的QPS:支持
正常的使用缓存的场景,多数是多读少写,我们可以支持TTL的设置。
在缓存失效的时候我们做异步回源,不过有部分业务需求,需要更高的强实效性和数据一致性 。
例如 B 端有相关的变更,部分业务场景,我们需要及时的支持数据一致性的闭环。
例如:强实效性的业务数据,EG:定时发布页面功能,修改敏感信息等。
本地缓存数据一致性主要交互流程
发布/订阅是一种消息模式,消息的发送者不会将消息直接发送给特定的接收者,而是通过消息通道广播出去,让订阅该消息主题的订阅者消费到,到达解耦 下面是主要相关伪代码参考示例 生产者:发布消息-主要部分代码示例参考;
/**
* 生产者:发布消息-代码示例
* @param channel:通道:类似topic(CommonCacheConstant.CHANGE_CHANNEL)
* @param message:信息本体
*/
redisTemplate.convertAndSend(String channel, Object message);
消费者:监听消息和处理消息-主要部分代码示例参考;
/**
* 消息者:订阅消息-代码示例,通过Redis消息监听容器(加载了RedisConnectionFactory和MessageListenerAdapter)
* @param RedisConnectionFactory:redis链接工厂
* @param ChangeListener(MessageListenerAdapter):消息监听器
*/
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,ChangeListener changeListener)
{
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(changeListener, new PatternTopic(CommonCacheConstant.CHANGE_CHANNEL));
return container;
}
/**
* 消息监听器-代码示例
*/
public class ChangeListener extends MessageListenerAdapter implements MessageListener
{
/**
* 处理目标消息
*/
@Override
public void onMessage(Message message, @Nullable byte[] pattern)
{
String partKey = message.toString();
//log.info("更新清除本地缓存:"+ partKey);
Set<String> deleteKeySet = new HashSet<>();
//TODO 封装deleteKeySet
//例如:删除目标的本地缓存
caffeineClient.invalidateAll(deleteKeySet);
}
}
Redis 的发布的消息,接入使用相对比较简单,不过也有些缺点:例如:不会持久化,消息有被丢失的风险等 若对数据一致性要求比较高的业务场景,建议可以使用 MQ 的类似相关功能 很多 MQ 都有类似功能,我们以 RocketMQ 为示例-使用其中的广播模式 下面是主要伪代码参考示例 生产者:发送消息-主要部分代码示例参考:
/**
* 生产者:发送消息-代码示例
*/
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
//设置生产者组
defaultMQProducer.setProducerGroup("producerGroupTest");
//启动生产者
defaultMQProducer.start();
//构建消息 topic tag 内容
Message msg = new Message("TopicTest" , "TagA" ,
("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//同步发送,且返回结果
SendResult sendResult = defaultMQProducer.send(msg);
System.out.println("发送结果"+sendResult);
//关闭生产者
defaultMQProducer.shutdown();
消息者:收到消息并广播模式-处理消费消息-主要部分代码示例参考
/**
* 消息者:收到消息并广播模式-处理消费消息-代码示例
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer");
//核心代码-设置:广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅主题和标签
consumer.subscribe("TopicTest","TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费信息处理:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* MQ Message model
*/
public enum MessageModel {
/**
* broadcast
*/
BROADCASTING("BROADCASTING"),
/**
* clustering
*/
CLUSTERING("CLUSTERING");
}
Apollo 的配置示例如下(例如:变更配置后,闭环触发操作本地缓存数据一致性) 下面是主要伪代码参考示例
/**
* 添加Apollo里配置的目标key的变更监听-主要部分代码示例参考
*/
ConfigPropertySourceFactory configPropertySourceFactory = SpringInjector.getInstance(ConfigPropertySourceFactory.class);
List<ConfigPropertySource> configPropertySources = configPropertySourceFactory.getAllConfigPropertySources();
for (ConfigPropertySource configPropertySource : configPropertySources)
{
configPropertySource.addChangeListener(changeEvent ->
{
for (String changedKey : changeEvent.changedKeys())
{
String newValue = changeEvent.getChange(changedKey).getNewValue();
//监听变更后的action-TODO,例如:deleta/updagte本地缓存
doChange(changedKey, newValue);
}
});
}
基本流程 提供方启动时广播自己的地址 消费方启动时广播订阅请求 提供方收到订阅请求时,单播自己的地址给订阅者,如果设置了 unicast=false,则广播给订阅者 消费方收到提供方地址时,连接该地址进行 RPC 调用。下面是主要主要伪代码参考示例
/**
*dubbo-广播模式-内部的主要部分代码示例参考
*/
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
/**
* 循环遍历所有的服务提供者,轮询请求过程中,任意一个抛出异常,并不会中断后面的请求,只有在所有请求处理完成后,才会去检查异常。只有所有的请求都成功的情况下,才会将最后一次调用的结果返回。
*/
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
if (exception != null) {
throw exception;
}
return result;
}
下面是主要伪代码参考示例
/**
* 手工应用的POD-IP传入和通过应用名称,从ZK中获取pod的ip列表集合,轮询遍历pod,调用统一的rpc接口(背后操作清除目标的本地缓存)
*/
public CacheGetResp execute(String cacheKey, Integer cacheOpsType,
String appName, List<String> podIpList) {
CacheGetResp resp = new CacheGetResp();
resp.setKey(cacheKey);
Map<String, String> ipValueMap = Maps.newHashMap();
//支持手工ip传人
if (CollectionUtils.isEmpty(podIpList)) {
//通过应用名称,从zk中获取pod的ip列表集合
podIpList = queryIpListFromZK(appName);
}
if (CollectionUtils.isNotEmpty(podIpList)) {
podIpList.forEach(ip -> {
//构造RPC统一代理请求操作,调用dubbo分组的对于应用的清除本地缓存的实现
String value = opsLocalCache(cacheKey, cacheOpsType, appName, ip);
ipValueMap.put(ip, value);
});
}
resp.setValue(JSON.toJSONString(ipValueMap));
return resp;
}
/**
* 构造RPC统一代理请求,通过dubbo分组调用目标应用里的通用接口服务-清除目标本地缓存的实现
*/
private String opsLocalCache(String key,String appName, String podIp) {
String consumerUrl =
String.format(CONSUMER_URL_FMT, podIp, ICacheManagerService.class.getName(), appName);
ICacheManagerService cacheManagerService =
proxy.getProxy(protocol.refer(ICacheManagerService.class, URL.valueOf(consumerUrl)));
}
/**
* 通过提供统一的SDK,给业务各应用方接入,最终触发操作-调用本地缓存的清除入口,实现本地缓存的数据一致性
* 当前可以采用dubbo group来开箱实现,业务方无感实现
*/
public ServiceBean<ICacheManagerService> cacheManagerServiceBean(ApplicationContext applicationContext) {
ServiceBean<ICacheManagerService> serviceBean = new ServiceBean<>();
serviceBean.setInterface(ICacheManagerService.class);
CacheManagerServiceImpl ref =
applicationContext.getAutowireCapableBeanFactory().createBean(CacheManagerServiceImpl.class);
serviceBean.setRef(ref);
//约定大于配置,用应用的ID,作为dubbo分组的group,实现开箱实现,业务方无感
serviceBean.setGroup(getAppId());
serviceBean.setApplicationContext(applicationContext);
return serviceBean;
}
/**
* 通过应用名称,从zk中获取pod的ip列表集合
*/
public List<String> queryIpListFromZK(String appName) {
String path = String.format("/dubbo/%s/providers", ICacheManagerService.class.getName());
List<String> urlList = zkClient.getChildren(path);
List<String> ipList = Lists.newArrayList();
for (String url : urlList) {
URL current = URL.valueOf(URLDecoder.decode(url, "UTF-8"));
if (Objects.nonNull(current) && appName.equals(current.getParameter("application"))) {
ipList.add(current.getHost());
}
}
return ipList;
}
可以尽量和业务解耦,可以抽象为公共 SDK,让应用方简单接入即可 若使用在Caffeine 框架,也可以利用它本身提供了一些的打点监控策略,例如通过recordStats()Api 进行开启,默认是使用 Caffeine 自带的,也可以自定义实现。在 StatsCounter 接口中,定义了需要打点的方法目前来说有如下几个:
recordHits:记录缓存命中;
recordMisses:记录缓存未命中;
本地缓存的可观察性,包含两部分。
最后实现的效果示例如下(例如 grafana 配置为例子)。
建议尽量选择比较成熟的缓存框架,例如Caffeine。
整体实际项目使用用途下的综合建议。
建议可以使用 Redis 的发布订阅模式来实现。
建议可以尝试使用,dubbo的广播模式或者自定义实现方式(广播轮询pod+RPC调用+dubbo分组) 目前本地缓存一致性的预案使用了该方案。