作者:vivo 互联网服务器团队-Li Haoxuan
一、关于 Caffeine Cache
二、Caffeine Cache 配置套路
public Caffeine<K, V> maximumWeight( long maximumWeight) {
requireState(this.maximumWeight == UNSET_INT,
"maximum weight was already set to %s", this.maximumWeight);
requireState(this.maximumSize == UNSET_INT,
"maximum size was already set to %s", this.maximumSize);
this.maximumWeight = maximumWeight;
requireArgument(maximumWeight >= 0, "maximum weight must not be negative");
return this;
}
default V get(K key) {
return cache().computeIfAbsent(key, mappingFunction());
}
public BoundedLocalCache<K, V> cache() {
return cache;
}
public UnboundedLocalCache<K, V> cache() {
return cache;
}
public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
super K1, V1> loader) { CacheLoader<?
requireWeightWithWeigher();
"unchecked") (
Caffeine<K1, V1> self = (Caffeine<K1, V1>) this;
return isBounded() || refreshes()
? new BoundedLocalCache.BoundedLocalLoadingCache<>(self, loader)
: new UnboundedLocalCache.UnboundedLocalLoadingCache<>(self, loader);
}
boolean isBounded() {
return (maximumSize != UNSET_INT)
|| (maximumWeight != UNSET_INT)
|| (expireAfterAccessNanos != UNSET_INT)
|| (expireAfterWriteNanos != UNSET_INT)
|| (expiry != null)
|| (keyStrength != null)
|| (valueStrength != null);
}
boolean refreshes() {
// 调用了 refreshAfter 就会返回 false
return refreshNanos != UNSET_INT;
}
public @Nullable V computeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction,
boolean recordStats, boolean recordLoad) {
// 常用的 LoadingCache#get 方法 recordStats、recordLoad 都为 true
// mappingFunction 即 builder 中传入的 CacheLoader 实例包装
requireNonNull(key);
requireNonNull(mappingFunction);
// 默认的 ticker read 返回的是 System.nanoTime();
// 关于其他的 ticker 见文末参考文献,可以让使用者自定义超时的计时方式
long now = expirationTicker().read();
// data 是 ConcurrentHashMap<Object, Node<K, V>>
// key 根据代码目前都是 LookupKeyReference 对象
// 可以发现 LookupKeyReference 保存的是 System.identityHashCode(key) 结果
// 关于 identityHashCode 和 hashCode 的区别可阅读文末参考资料
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
if (node != null) {
V value = node.getValue();
if ((value != null) && !hasExpired(node, now)) {
// isComputingAsync 中将会判断当前是否为异步类的缓存实例
// 是的话再判断 node.getValue 是否完成。BoundedLocaCache 总是返回 false
if (!isComputingAsync(node)) {
// 此处在 BoundedLocaCache 中也是直接 return 不会执行
tryExpireAfterRead(node, key, value, expiry(), now);
setAccessTime(node, now);
}
// 异步驱逐任务提交、异步刷新操作
// CacheLoader#asyncReload 就在其中的 refreshIfNeeded 方法被调用
afterRead(node, now, recordStats);
return value;
}
}
if (recordStats) {
// 记录缓存的加载成功、失败等统计信息
mappingFunction = statsAware(mappingFunction, recordLoad);
}
// 这里2.8.0版本不同实现类生成的都是 WeakKeyReference
Object keyRef = nodeFactory.newReferenceKey(key, keyReferenceQueue());
// 本地缓存没有,使用加载函数读取到缓存
return doComputeIfAbsent(key, keyRef, mappingFunction,
new long[] { now }, recordStats);
}
boolean hasExpired(Node<K, V> node, long now) {
return
(expiresAfterAccess() &&
(now - node.getAccessTime() >= expiresAfterAccessNanos()))
| (expiresAfterWrite() &&
(now - node.getWriteTime() >= expiresAfterWriteNanos()))
| (expiresVariable() &&
(now - node.getVariableTime() >= 0));
}
Object keyRef, V doComputeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction,
long[] now, boolean recordStats) {
"unchecked") (
V[] oldValue = (V[]) new Object[1];
"unchecked") (
V[] newValue = (V[]) new Object[1];
"unchecked") (
K[] nodeKey = (K[]) new Object[1];
"unchecked", "rawtypes"}) ({
Node<K, V>[] removed = new Node[1];
int[] weight = new int[2]; // old, new
RemovalCause[] cause = new RemovalCause[1];
// 对 data 这个 ConcurrentHashMap 调用 compute 方法,计算 key 对应的值
// compute 方法的执行是原子的,并且会对 key 加锁
// JDK 注释说明 compute 应该短而快并且不要在其中更新其他的 key-value
Node<K, V> node = data.compute(keyRef, (k, n) -> {
if (n == null) {
// 没有值的时候调用 builder 传入的 CacheLoader#load 方法
// mappingFunction 是在 LocalLoadingCache#newMappingFunction 中创建的
newValue[0] = mappingFunction.apply(key);
if (newValue[0] == null) {
return null;
}
now[0] = expirationTicker().read();
// builder 没有指定 weigher 时,这里默认为 SingletonWeigher,总是返回 1
weight[1] = weigher.weigh(key, newValue[0]);
n = nodeFactory.newNode(key, keyReferenceQueue(),
newValue[0], valueReferenceQueue(), weight[1], now[0]);
setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
return n;
}
// 有值的时候对 node 实例加同步块
synchronized (n) {
nodeKey[0] = n.getKey();
weight[0] = n.getWeight();
oldValue[0] = n.getValue();
// 设置驱逐原因,如果数据有效直接返回
if ((nodeKey[0] == null) || (oldValue[0] == null)) {
cause[0] = RemovalCause.COLLECTED;
} else if (hasExpired(n, now[0])) {
cause[0] = RemovalCause.EXPIRED;
} else {
return n;
}
// 默认的配置 writer 是 CacheWriter.disabledWriter(),无操作;
// 自己定义的 CacheWriter 一般用于驱逐数据时得到回调进行外部数据源操作
// 详情可以参考文末的资料
writer.delete(nodeKey[0], oldValue[0], cause[0]);
newValue[0] = mappingFunction.apply(key);
if (newValue[0] == null) {
removed[0] = n;
n.retire();
return null;
}
weight[1] = weigher.weigh(key, newValue[0]);
n.setValue(newValue[0], valueReferenceQueue());
n.setWeight(weight[1]);
now[0] = expirationTicker().read();
setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
return n;
}
});
// 剩下的代码主要是调用 afterWrite、notifyRemoval 等方法
// 进行后置操作,后置操作中将会再次尝试缓存驱逐
// ...
return newValue[0];
}
如果 expireAfterWrite 周期 < refreshAfterWrite 周期会如何?
此时查询失效数据时总是会调用 load 方法,refreshAfterWrite 根本没用!
如果 CacheLoader#asyncReload 有额外操作,导致它自身实际执行查询耗时超过 expireAfterWrite 又会如何?
还是 CacheLoader#load 生效,refreshAfterWrite 还是没用!
if (n == null) {
// 这部分代码其他后续线程进入后已经有值,不再执行
}
synchronized (n) {
// ...
if ((nodeKey[0] == null) || (oldValue[0] == null)) {
cause[0] = RemovalCause.COLLECTED;
} else if (hasExpired(n, now[0])) {
cause[0] = RemovalCause.EXPIRED;
} else {
// 未失效时在这里返回,不会触发 load 函数
return n;
}
// ...
}
三、参考资料
END
猜你喜欢
vivo互联网技术
vivo移动互联网是基于vivo 智能手机所建立的完整移动互联网生态圈,围绕vivo大数据运营,打造包括应用、游戏、资讯、品牌、电商、内容、金融、搜索的全方位服务生态,满足海量用户的多样化需求。
点一下,代码无 Bug