我们日常的业务站点中大量使用了Redis,其中有相当一部分的站点中使用到了Redisson组件,而Redisson中最为重要的除了对于缓存数据的存储以外,还有对于Redis的分布式锁的使用。
这里为大家展示一下Redisson在处理分布式锁时的特别之处,整个过程并不复杂,但是思路十分的独特,希望大家能够将这些独特的方式运用到自己的工作当中。
作为一把分布式锁必须拥有一些基本的特性,互斥,防死锁,可重入
。这些特性在Redisson以外的Redis分布式锁工具中一般都会被实现,但是如果只是实现了这三个特性依然会存在Redis分布式锁的续期问题。Redisson的看门狗机制(Watch Dog)对此提供了比较好的解决方案。
举个例子,我们有一个简单的秒杀逻辑,秒杀商品处理状态需要1-10秒不确定的时间处理,我们的目的是在最节约资源的情况下实现这一业务逻辑。
case1: 假设我们同时有两个线程A、B在两台机器上秒杀,A先抵达,A会先上锁LockA,此时节点宕机或关闭,由于A线程没有执行unlock操作导致订单锁被长时间占用,最终B线程修改状态命令丢失,秒杀商品卡死。
case2:上面的问题很好解决,对锁加一个过期时间即可,但是就会引入另外一个问题,我们如何设置合理的过期时间。A先抵达,A上锁LockA(expiredTime=10s),A线程开始修改订单状态,实际耗时12s,由于最后执行时间已超过锁的过期时间,B已经提前获取锁资源,最终秒杀商品超卖。
我们从现象入手,如果我们调用RLock.tryLock(),我们就会观察到Redis中出现了一个Hash。不难发现以下几个特征。
所以Redisson锁的本质就是向Redis中插入了这组键值对,这会帮助我们理解之后可重入&看门狗的实现
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
//hincrby 命令用于为哈希表中的字段值加上指定增量值。
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//pexpire 命令和 expire 命令的作用类似,但是它以毫秒为单位设置 key 的生存时间。
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
//KEYS[1] = getName(); 锁的名称
//ARGV[1] = internalLockLeaseTime; 上锁时间
//ARGV[2] = {UUID}:{ThreadId}
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
如果获取到分布式锁的节点宕机,且这个锁刚好处于锁住的状态,就会出现锁死的情况,为了避免这种情况发生,通常会设置一个expireTime让Lock自动过期,但是如果超过设定的过期时间,程序依然没有执行完,则会导致其他线程误获得锁,这个时候就需要看门狗机制了
看门狗会不断延长上锁的时间,来规避这个问题,默认情况下它的过期时间为30s,但是也可以通过修改一些配置参数另外指定。
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
//如果我们传入了leaseTime过期时间,则不会走到看门狗的逻辑,这个点需要十分注意
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
//正常上锁
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining) {
// 进入到看门狗的主逻辑
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//看门狗renew逻辑
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//设置一个定时任务,不断去check
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
//如果发先threadId已经为空,则直接跳出
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
// 重新renew过期时间
renewExpiration();
}
});
}
//每次重置过期时间时将之前的 过期时间/3
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
以上就是Redisson锁相关的一些内容,Redisson相比其他的Redis工具,在互斥,防死锁,可重入的基础上又添加了WatchDog机制。通过ThreadId实现了可重入,让同一个线程能够上锁多次,又通过对于线程的监听和循环不断延长过期时间,既保证了节点宕机后锁的过期,同时也保证了不会无限的让锁一直存在。由于篇幅有限,只介绍了非常浅显的内容,代码背后的执行逻辑和内部结构并不完整,感兴趣的话大家可以继续探索,谢谢!
Java、大数据、前端、测试等各种技术岗位热招中,欢迎扫码了解~