点击关注上方蓝字,阅读更多干货~
1.背景
2.ETCD
2.1 etcd简介
type KeyValue struct {
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"`
ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"`
Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
V2版本的client通过HTTP/1.1协议,长连接定时轮询server,最新V3版本的client采用了HTTP/2的gRPC协议,双向stream的API,实现了服务端推送功能、多路复用等机制。同时Watch也从client轮询优化成server流式推送,极大降低server端 socket、内存等资源。
etcd分布式锁在官方提供的etcdctl中本身就集成了,有赖于http api的支持,各语言的client能够非常容易的进行接入与扩展,下面我们分析一下etcd v3版本client的Mutex实现,我们的源码来自最新(2022.10.20)的main分支上的代码。(https://github.com/etcd-io/etcd)
type Mutex struct {
s *Session // Session对象
pfx string // 锁的前缀,如 "business/lock/"
myKey string // 当前持有锁的客户端的 leaseid 值(完整 Key 的组成为 pfx+"/"+leaseid)
myRev int64 // 自增revision
hdr *pb.ResponseHeader // etcd Server中的信息
}
tryAcquire是试图获取锁并返回函数,拼接生成全局唯一的key,以及可以进行重入判断,设置租约,放入到etcd中。
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
//申明并赋值session和client
s := m.s
client := m.s.Client()
//赋值myKey为pfx+"/"+leaseid,leaseid是一个64位的整数值,每个客户端唯一
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
//比较 Key的revision是否为0,为0表示不存在该key
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
//put key,设置租约
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
//再获取锁,判断是否存在相同租约,租约一致,那么是可以重入的.
get := v3.OpGet(m.myKey)
//通过前缀获取最先创建的key,获取当前锁的真正持有者
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
//Txn进行事务处理,判断前面的cmp条件,成立(不存在该key)执行Then中的存入key,
//不成立(不存在该key)执行Else中的获取
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
return resp, nil
}
Lock函数是分布式锁的主逻辑,其中包括了获取锁成功与否的判断,以及没有获取锁后的阻塞操作。
func (m *Mutex) Lock(ctx context.Context) error {
//上方代码块的逻辑,获取key
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
//操作失败,则获取else返回的值,即已有的revision
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
//获取锁成功
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// 代码走到这里说明没有获得锁,需要等待之前的锁被释放,即revision 小于当前revision 的 kv 被删除
// 阻塞等待其他程序释放以为pfx前缀锁,并删除其他revisions
_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
return werr
}
//..异常处理省略
}
图1 etcd分布式锁流程图
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
//获取当前线程
Thread currentThread = Thread.currentThread();
//通过能否在map中取到该线程的LockData信息,来判断该线程是否已经持有锁
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
//因为当前线程的锁存在,lockcount自增后返回,变成重入锁
lockData.lockCount.incrementAndGet();
return true;
}
//当前线程的锁不存在,进行加锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
//加锁成功,保存到map中
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
这个方法主要是创建临时顺序节点,并且在未获取到锁时,会进行阻塞。
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
//开始时间
final long startMillis = System.currentTimeMillis();
//将超时时间统一转化为毫秒单位
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//节点数据,这里为null
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
//重试次数
int retryCount = 0;
//锁路径
String ourPath = null;
//是否获取到锁
boolean hasTheLock = false;
//是否完成
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
//创建临时顺序节点,并返回节点路径
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//依据返回的节点路径,判断是否抢到了锁;阻塞,直到获取锁成功
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
//在会话过期时,可能导致driver找不到临时有序节点,进行重试,失败抛出NoNodeException,进行重试
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
//获取到锁,则返回节点路径,供调用方记录到map中
if ( hasTheLock )
{
return ourPath;
}
return null;
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
//是否获取到锁
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
//给前一个节点设置了监听器,当该节点被删除时,将会触发watcher中的回调
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
//一直尝试获取锁
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//返回basePath下所有的临时有序节点,并且按照后缀从小到大排列
List<String> children = getSortedChildren();
//取出当前线程创建出来的临时有序节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
//判断当前节点是否处于排序后的首位,如果处于首位,则代表获取到了锁
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
//获取到锁之后,则终止循环
haveTheLock = true;
}
else
{
//这里代表没有获取到锁
//获取比当前节点索引小的前一个节点
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
//如果前一个节点不存在,则直接抛出NoNodeException,catch中不进行处理,在下一轮中继续获取锁
//如果前一个节点存在,则给它设置一个监听器,监听它的释放事件
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
//判断是否超时
if ( millisToWait <= 0 )
{
//获取锁超时,删除刚才创建的临时有序节点
doDelete = true;
break;
}
//没超时的话,在millisToWait内进行等待
wait(millisToWait);
}
else
{
//无限期阻塞等待,监听到前一个节点被删除时,才会触发唤醒操作
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
//如果前一个节点不存在,则直接抛出NoNodeException,catch中不进行处理,在下一轮中继续获取锁
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
//删除刚才创建出来的临时有序节点
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
Redis是一个开源、用C语言编写、遵守BSD协议、高性能的(key/value)分布式内存数据库。基于内存运行,它可以用作数据库、缓存和消息中间件,支持多种类型的数据结构,是当前最热门的NoSQL数据库之一。
redisdb的struct中存着dict *expires,字典的键为key,字典的value为过期事件 UNIX时间戳。检查给定的键是否在过期字典中,如果存在就获取键的过期时间,检查当前UNIX时间戳是否大于键的过期时间,是就过期,否则未过期,在分布式锁中可以控制锁的时间。
redis和zookeeper同样,本身的命令并没有提供分布式锁的能力,我们通过redis官方推荐的java客户端redisson,分析其源码Reentrant Lock中实现分布式锁的方式。下面我们的源码来自最新(2022.10.20)的master分支上的代码(https://github.com/redisson/redisson)。
lock方法入口,会调用获取锁方法,创建这个key的订阅消息并发布。在获取锁失败后(锁被占用)会阻塞,并等待其他获得锁的线程的消息队列触发,再进行锁的获取。
/**
* 获取锁,注意这里没有等待时间,只有指定锁的最大持有时间
* 通过interruptibly参数配置支持中断
*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 尝试获取锁,返回的ttl为空代表获取锁成功,返回的ttl代表已经存在的KEY的剩余存活时间
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 订阅redisson_lock__channel:{$KEY},redisson client通过Redis的订阅发布,获取到解锁的事件通知
// 这个方法会在LockPubSub中注册一个entryName -> RedissonLockEntry的哈希映射
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
//实际上内部是netty的时间轮算法HashedWheelTimer-newTimeout,触发的延迟任务,目的是确保redis命令执行完成后再执行后续操作
//时间是getTimeout() + getRetryInterval() * getRetryAttempts();注意这个时间是redis的baseconfig中的时间
pubSub.timeout(future);
RedissonLockEntry entry;
// 同步订阅执行,获取注册订阅Channel的响应,区分是否支持中断
if (interruptibly) {
//内部就是CompletableFuture<V>的get方法,阻塞线程,直到计算完成(订阅成功)返回结果
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
// 走到下面的循环说明Redis已经存在对应的KEY
// 有其他客户端已经获取到锁,此客户端线程的调用需要阻塞等待获取锁
try {
while (true) {
// 死循环中获取锁,这个死循环或者抛出中断异常,或者获取到锁成功break跳出
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 返回的ttl为空,说明获取到锁,跳出死循环
if (ttl == null) {
break;
}
// 这个ttl来源于等待存在的锁的KEY的存活时间,直接使用许可为0的信号量进行阻塞等待
// 直到锁释放事件订阅LockPubSub的onMessage()方法回调激活getLatch().release()进行解锁才会往下走
if (ttl >= 0) {
try {
//tryAcquire尝试在特定时间内获取1个许可,可中断
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
//获取一个许可,在提供一个许可前一直将线程阻塞,除非线程被中断。
entry.getLatch().acquire();
} else {
//获取许可,不可以被打断
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 获取到锁或者抛出中断异常,退订redisson_lock__channel:{$KEY},不再关注解锁事件
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
// leaseTime>0 说明没过期
if (leaseTime > 0) {
// 实质是异步执行加锁Lua脚本
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 否则,已经过期了,传参变为新的时间,internalLockLeaseTime是读取的配置文件里的时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired,如果ttlRemainingFuture是null的话,就是获得锁了
if (ttlRemaining == null) {
if (leaseTime > 0) {
//有剩余时间的话,内部锁剩余时间就是leaseTime
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//看门狗续期,本文章由于篇幅所限,不介绍这个机制
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
/*
KEYS[1] 就是 Collections.singletonList(getName()),表示分布式锁的key;
ARGV[1] 就是internalLockLeaseTime,即锁的租约时间(持有锁的有效时间),默认30s;
ARGV[2] 就是getLockName(threadId),是获取锁时set的唯一值 value,即UUID+threadId
*/
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// 1.如果缓存中的key不存在,则执行 hincrby 命令(hincrby key UUID+threadId 1,创建并+1), 设值重入次数1
// 然后通过 pexpire 命令设置锁的过期时间(即锁的租约时间)
// 返回空值 nil ,表示获取锁成功
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果key已经存在,并且value也匹配,表示是当前线程持有的锁,则执行 hincrby 命令,重入次数加1,并且设置失效时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果key已经存在,但是value不匹配,说明锁已经被其他线程持有,通过 pttl 命令获取锁的剩余存活时间并返回,至此获取锁失败
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
选择AP模型放弃一致性实现分布式锁,就是redis的实现。放弃一致性具体体现在Redis的主从同步是异步进行的,如果向master发送请求修改了数据后,master突然出现异常,发生高可用切换,缓冲区的数据可能无法同步到新的master(原replica)上,导致数据不一致。针对这个问题Martin Kleppmann又提出了红锁算法(https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html),然而这个实现至少需要3个独立的redis实例,且依赖系统时钟为人诟病,后续又有在redis基础上提供付费方案Tair。尤其是基础组件需要特别考虑保A弃C的场景,因为这把复杂性暴露给了调用方。
选择CP模型放弃可用性实现分布式锁,一般用于对数据质量要求很高的场合中,而分布式锁的大多数场景恰恰符合要求数据强一致。etcd的Raft和zookeeper的ZAB来保证强一致性,主节点由于某些原因宕机,系统会在从节点中选取出来数据比较新的一个从节点作为新的主节点,从而避免数据丢失等问题。但是放弃可用性一旦网络发生分区,节点之间的信息同步时间可以无限制地延长,如果是热点业务不能及时修复,那也会导致不可预期的损失。
InterProcessMutex:重入锁
InterProcessSemaphoreMutex:不可重入锁
InterProcessReadWriteLock:读写锁
InterProcessMultiLock:联锁
Reentrant Lock::重入锁
Fair Lock:公平锁
MultiLock:联锁
RedLock:红锁
ReadWriteLock:读写锁
Semaphore:信号量
PermitExpirableSemaphore:可过期性信号量
CountDownLatch:闭锁
etcd | zookeeper | redis | |
命令支持 | 原生自带分布式锁命令 | 需要各语言client | 需要各语言client |
CAP | CP | CP | AP |
一致性算法 | Raft | ZAB | 无 |
监听机制 | 基于HTTP/2 -gRPC api的推送watcher | 自己实现的观察者模式watcher | 发布订阅模式 |
编写语言 | GO | JAVA | C |
并发性能 | 中 | 中 | 高 |
运维成本 | 低 | 中 | 低 |
SysDictGroupInfo sysDictGroupInfo = new SysDictGroupInfo();
BeanUtils.copyProperties(sysDictGroupInfoVo, sysDictGroupInfo);
//数据库更新
super.updateById(sysDictGroupInfo);
//redis同步更新
redissonUtil.set(SYS_DICT_GROUP_INFO + sysDictGroupInfo.getId(), JSON.toJSONString(sysDictGroupInfo));
在我们团队中,我们选择了redisson来实现分布式锁。原因如下:
由于项目之中本身就包含了redis组件,不包含etcd和zookeeper,不需要额外引入组件。
在Java项目中,redisson客户端已经实现了非常丰富的分布式锁API,不需要额外封装。
在中台项目中,后期需要支撑大量QPS,redis的分布式锁性能无疑是最好的。
//上写锁(这边是示例,各种重入锁都行)
RReadWriteLock XXXLock = redissonClient.getReadWriteLock(LOCK);
try {
try {
//最多尝试20秒获取锁,2秒钟以后自动解锁
if (dicItemLock.writeLock().tryLock(20,2, TimeUnit.SECONDS)){
//业务
}else{
//在waittime中还没有获取到锁
throw new BadRequestException("请稍后再试");
}
}catch (Exception e) {
//看情况需不需要捕获,不需要就不用catch
}
finally {
XXXLock.writeLock().unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("tryLock fail lockname:{}, reason:{}", LOCK ,e.getMessage());
throw new BadRequestException("请稍后再试");
}
tryLock中的参数看具体业务。
本文作者
杰拉尔,来自缦图互联网中心中台团队。
--------END--------
也许你还想看