cover_image

走近java并发同步器AQS

宽治 缦图coder
2024年01月31日 06:05



图片

点击关注上方蓝字,阅读更多干货~


图片



1.前言


AQS作为JDK提供的并发同步器的抽象类,其有许多实现类,可以应用于不同的业务场景,本篇文章会通过可重入锁的实现ReentrantLock为切人点,深入了解AQS内部实现机制。希望通过阅读这篇文章,能帮助你在将来的业务场景中更加得心应手的运用基于AQS的同步工具类



2.AQS的定义


AQS(AbstractQueuedSynchronizer)抽象队列同步器,提供了基于同步状态、阻塞与唤醒线程及队列模型的基础框架。JDK中许多并发工具类的实现都基于AQS,如ReentrantLock、Semaphore、CountDownLatch等,它们都是基于AQS的抽象,实现不同的锁机制。
从类命名上我们可以得到几个信息:
  1. 它是一个抽象类(可以提供实现的方法和抽象方法)

  2. 它应该是基于队列实现的(队列特性 FIFO)

  3. 它可以解决线程同步问题



3.AQS的特性


3.1 两个队列


  • 一个是同步队列,属于双向队列

  • 一个是条件队列,属于单向队列

图片

图1 同步队列数据结构

通过同步队列完成当前节点的状态记录,同时每个节点还对应一个线程对象,我们可以把它理解为每个节点某一时段都代表一个线程。
  • 节点的加入,表示线程需要等待获取锁资源,进行入队列

  • 节点的退出,表示线程获取到锁资源,进行出队列并执行任务


3.2 一个状态


volatile int state,其是一个整数类型,意为同步状态,也可以理解为资源。该字段的几个方法均为final,即禁止子类覆盖。

资源的状态state维护交由子类调用,由子类通过state判断是否获取锁,以及释放锁是否成功。state代表一种资源,具体的资源分配情况由具体的实现操作。state的更新是基于cas更新的,通过volatile关键词修饰,保障并发时的可见性和有序性。



4.数据结构


4.1 队列定义


  • 双向队列

基于队列,自然链表是最先想到的实现方式,其定义如下:

// AQS静态内部类,作为链表/队列中的数据节点static final class Node {  // 表明是一个共享锁    static final Node SHARED = new Node();  // 表明是一个排它锁    static final Node EXCLUSIVE = null;
// 上面的节点状态,具体枚举定义见下表,通过cas更新 volatile int waitStatus;
// 链表前继节点 volatile Node prev; // 链表后继节点 volatile Node next; // 节点对应的线程对象 volatile Thread thread;
// 下一个等待节点,在条件队列中使用;同时也是用来区分排它锁节点和共享锁节点的标识 Node nextWaiter; //// 此处省略了一些构造函数}

其中节点状态waitStatus的枚举状态值如下:

变量名

变量值
含义

CANCELLED

1

表明当前节点的线程已被取消

SIGNAL

-1

表明下一个节点需要前一节点唤醒,这样下一个节点便可以安心睡眠了

CONDITION

-2

表明线程在等待条件,条件队列才用的上,如ReentrantLock的Condition

PROPAGATE

-3

表明下一个共享节点应该被无条件传播,当需要唤醒下一个共享节点时,会一直传播唤醒下一个直到非共享节点

-
0

初始值,刚竞争资源进入队列的时候的初始状态


  • 条件队列
// AQS内部类public class ConditionObject implements Condition{  // 条件队列第一个节点     private transient Node firstWaiter;  // 条件队列最后一个节点    private transient Node lastWaiter;  // 注意:上文提到Node结构中有一个nextWaiter节点,一个使用场景便是条件队列的下一个节点(看作单向链表结构)。
// 当前线程等待,进入条件队列 public final void await(){} public final long awaitNanos(long nanosTimeout){} // 唤醒基于当前条件等待的一个线程,从第一个开始,加入到同步队列中,等待获取锁资源 public final void signal() {} // 唤醒所有条件等待线程,加入到同步队列中 public final void signalAll() {}}

ConditionObject中提到的await和signal开头的方法,类似于Object的wait()和notify() 方法,都需要获取到锁后调用。


4.2 核心变量

类型
名称
含义

Node

head

队列的节点类型,等待(双向)队列的头节点

Node

tail

队列的节点类型,等待(双向)队列的尾节点

int

state

同步器状态

Thread

exclusiveOwnerThread

继承的变量,表示排它锁模式下线程的持有者


4.3 核心方法


返回值

方法名
说明

void

acquire(int arg)

获取排它锁

boolean

tryAcquire(int arg)

注意:需要子类实现,通过state控制尝试获取锁

boolean

release(int arg)

释放排它锁

boolean

tryRelease(int arg)

注意:需要子类实现,通过维护state尝试释放锁

void

acquireShared(int arg)

获取共享锁

int

tryAcquireShared(int arg)

注意:需要子类实现,通过维护state尝试获取锁

boolean

releaseShared

释放共享锁

boolean

releaseShared

注意:需要子类实现,通过state控制尝试释放锁

还有一些其它方法,如:可中断式获取锁、超时获取锁,此处就不一一展开说明了。


此处补充说明一下共享锁对应的共享节点,共享节点的唤醒是传播的,唤醒的共享节点会加入唤醒的“池子”里进行唤醒后续节点,循环往复可以起到强大的效果,CountDownLatch就是使用的共享锁模式。感兴趣的读者可以自己读下源码,通过下面ReentrantLock的源码分析过程后相信会容易些。


5.AQS的实现与应用


AQS提供了多种实现,用于不同的业务场景,下面我们一起来看一下常见的几种:

AQS实现

应用场景

ReentrantLock

可重入锁,对资源的互斥访问,支持多条件、超时、尝试获取等,如精准阻塞唤醒线程的生产消费模型的实现

ReentrantReadWriteLock

可重入读写锁,用于读写场景,如读多写少的业务场景,利用读读不互斥、读写互斥的特性实现高性能的数据一致性

CountDownLatch

计数器或闭锁,多个线程各持有一个资源,所有线程资源释放后唤醒最终等待的线程,起到线程间通讯的作用,如多线程分片计算最后统计的场景

Semaphore

信号灯、信号量,主要用于控制可以同时访问某种资源的线程个数,如做流量分流,对于公共资源有限的场景,以及数据库连接等

ThreadPoolExecutor

线程池的运用,内部Worker利用AQS实现对独占线程变量的操控

来看一个电商系统中的业务场景:我们在定时批量上架商品的业务场景中,假设上架商品数量多且涉及商品校验等IO操作时,我们需要考虑上架的实时性。


针对该场景很容易想到加入线程池的处理,但因为还要做到批量上架的结果统计,所以又涉及到了多线程之间的协作通讯。此时,线程池加上CountDownLatch是个不错的选择,代码片段如下:
// 代码片段 此处String代替商品modelpublic int handlerOnShelf(List<String> productList) {    AtomicInteger count = new AtomicInteger();    // MAX_HANDLE_THREAD_SIZE 最大处理线程数量    int groupSize = productList.size() / MAX_HANDLE_THREAD_SIZE == 0 ? productList.size()            : productList.size() / MAX_HANDLE_THREAD_SIZE;    // 定义计数器/闭锁    CountDownLatch latch = new CountDownLatch(groupSize);    // 将商品列表进行分组处理    List<List<String>> productListGroups = Lists.partition(productList, groupSize);    for (List<String> group : productListGroups) {        ThreadPool.execute(() -> {            try {                // 处理商品上架并统计                doHandlerOnShelf(group, count);            } catch (Exception e) {             } finally {                // 完成一组校验,调用countDown方法                latch.countDown();            }        });    }    try {        // 等待所有组数据处理完成        latch.await();    } catch (InterruptedException e) { }    // 返回统计结果    return count.get();}

此场景中,CountDownLatch便起到了线程间协作通讯的计数器功能。



6.从ReentrantLock源码看AQS


ReentrantLock意为可重入锁,是一种排它锁的实现,只能一个线程可访问,如果其它线程来竞争资源的话会进入同步队列进行等待,其包括公平与非公平两种方式。

对于这种对临界资源加锁互斥的实现,还有常见的JVM提供的synchronized,源码分析前,我们先对比下两者特性:


ReentrantLock

synchronized

实现机制

依赖于AQS

JVM实现基于对象的监视器锁

可重入性

可重入

可重入

灵活性

更加灵活,支持公平与非公平、超时尝试、多条件的锁等待和唤醒、可中断

相对没那么灵活

加/释放锁

显示调用api加锁,且需要显示释放,同时需要确保异常后也能释放

使用起来简单,不用显示的加锁和释放锁

使用场景

对锁的使用场景需要更加灵活,如可以通过多条件精准阻塞唤醒线程,如jdk本身提供的一些阻塞队列

本来是重量级锁,优化增加了偏向、轻量级锁等,在线程不怎么竞争的情况下或灵活度要求不那么高的场景下更推荐,如常见单例模式的DCL实现


6.1 继承AQS


我们内部定义了一个继承AQS的类:

// ReentrantLock抽象静态内部类abstract static class Sync extends AbstractQueuedSynchronizer {  // 获取锁的抽象方法。为什么抽象往下看  abstract void lock();
// 非公平的获取锁 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // cas尝试,成功的话更新锁的持有线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 可重入锁的体现 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
// 关键:释放锁,该访问是实现了AQS的tryRelease方法的 protected final boolean tryRelease(int releases){ int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果释放后锁没有了,持有锁的线程标识也置为null if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
// 创建条件,对应AQS的条件队列 final ConditionObject newCondition() { return new ConditionObject(); }}


Sync是个抽象类,包含公平锁FairSync非公平锁NonFairSync两个实现:
// 非公平锁实现static final class NonfairSync extends Sync {  // 非公平获取锁,不用排队,来到就可以试一试尝试获取锁  final void lock() {      // 尝试修改AQS的state从0到1,成功的话表示获取同步锁成功,并设置当前锁持有线程        if (compareAndSetState(0, 1))            setExclusiveOwnerThread(Thread.currentThread());        else      // 否则调用AQS的竞争锁方法            acquire(1);        }
protected final boolean tryAcquire(int acquires) { // 调用Sync的非同步锁尝试获取,实现见Sync return nonfairTryAcquire(acquires); }}
// 公平锁实现static final class FairSync extends Sync {  // 公平锁,没有尝试修改状态,直接获取锁    final void lock() {      // 内部会调用下面的tryAcquire(int acquires)方法        acquire(1);    }    protected final boolean tryAcquire(int acquires) {        final Thread current = Thread.currentThread();        int c = getState();        if (c == 0) {        // 注意:hasQueuedPredecessors() 与非公平锁的区别的地方,对于公平锁,如果队列有节点,直接跳过尝试获取资源            if (!hasQueuedPredecessors() &&                compareAndSetState(0, acquires)) {                setExclusiveOwnerThread(current);                return true;            }        }        else if (current == getExclusiveOwnerThread()) {            int nextc = c + acquires;            if (nextc < 0)                throw new Error("Maximum lock count exceeded");            setState(nextc);            return true;        }        return false;    }}

通过ReentrantLock的内部类及方法定义我们可以看出:

  1. 非公平与公平的区别在于公平锁会调用hasQueuedPredecessors()方法,该方法含义为只要有其它线程比当前线程等待锁时间久就会返回true,即只要同步队列有其它线程节点在等待,则当前线程会直接进入等待,不会尝试获取一次锁。

  2. 可重入锁体现在getExclusiveOwnerThread()方法,在没有资源的情况下,再比较当前线程是否为持有锁的线程,如果是则表示已经持有当前锁,可直接调整资源使用情况。


6.2 核心变量

// ReentrantLock的成员变量,核心的方法都在Sync类体现了private final Sync sync;


6.3 核心方法


返回值

方法名

说明
-

ReentrantLock()

构造器,默认为非公平锁;有参构造器可以指定使用公平锁

void

lock()

加锁

void

lockInterruptibly()

加锁,可中断,线程被中断会抛异常

boolean

tryLock()

尝试获取锁,不会进入AQS同步器队列,仅尝试cas state,成功与失败都会立马返回

void

unlock()

释放锁

Condition

newCondition()

获取条件对象



7.源码分析


7.1 模拟锁竞争


启动两个线程(线程1、线程2)模拟锁竞争(非公平锁为例):

public static void testReentrantLock() {    ReentrantLock reentrantLock = new ReentrantLock();    new Thread(() -> {        System.out.println("线程1开始竞争锁.");        // 排它锁        reentrantLock.lock();        try {            System.out.println("线程1获取锁成功了, 开始执行任务..");            Thread.sleep(5000);        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            System.out.println("线程1完成了要释放锁...");            reentrantLock.unlock();        }    }, "1").start();
new Thread(() -> { try { // 为了让线程1先获取到资源 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("线程2开始竞争锁."); reentrantLock.lock(); try { System.out.println("线程2获取锁成功了, 开始执行任务.."); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("线程2完成了要释放锁..."); reentrantLock.unlock(); } }, "2").start();
}

7.2 输出结果


  • 线程1开始竞争锁...

  • 线程1获取锁成功了, 开始执行任务...

  • 线程2开始竞争锁...

  • 线程1完成了要释放锁...

  • 线程2获取锁成功了, 开始执行任务...

  • 线程2完成了要释放锁...


7.3 调用过程


图片

图2 多线程调用过程

中间一列是标记AQS的队列与同步状态变化情况,其中浅蓝色为同步器状态,浅橙色为同步队列状态。


7.4 模拟锁竞争的源码分析


  • 竞争锁资源
线程1持有锁执行任务时,线程2竞争锁资源。
// java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() { // cas尝试获取资源,案例中线程1会获取成功,直接返回; // 线程2会进入else逻辑,竞争锁资源。这里主要看线程2的逻辑 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 进入AQS竞争锁,继续往下看 acquire(1);}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) { // 此处的tryAcquire(arg)会尝试cas资源,由AQS子类ReentrantLock实现的 // 1、如果竞争成功则直接返回 // 2、否则调用addWaiter(Node.EXCLUSIVE),创建一个排它锁节点 // 3、再调用acquireQueued进入队列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
// 继续看 addWaiter(Node.EXCLUSIVE) 创建排它锁节点private Node addWaiter(Node mode) { // 创建节点,传入当前线程,表明线程与节点的对应,mode是排它锁的标识 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 如果尾节点不空 if (pred != null) { // 将当前线程2所在节点的前继节点指向尾节点 node.prev = pred; // cas将当前线程2所在节点设置成尾节点,成功的话则返回true if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 上面cas操作失败的情况下,会通过轮询不断尝试直至成功,并返回节点 // 当前案例会进入当前方法,因为线程1虽然持有锁,但是没有队列,所以pred=null(tail也为null) enq(node); return node;}
// 不断轮询设置尾节点的操作private Node enq(final Node node) { for (;;) { // 无限循环 Node t = tail; if (t == null) { // 没有尾节点,cas一个新节点作为头节点,并且将尾节点也指向它 // 注意:该节点没有对应的线程,可以看作是线程1的 if (compareAndSetHead(new Node())) tail = head; } else { // 再一次循环到此,将线程2的节点设置成尾节点,直至成功。 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}
// 继续看调用acquireQueued进入队列,已经通过 addWaiter(Node.EXCLUSIVE) 创建排它锁节点final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 中断标识,但如果中断也不会抛异常 boolean interrupted = false; for (;;) { // 循环 // 获取线程2节点的前继节点 final Node p = node.predecessor(); // 如果p节点是头节点,此例是的,所以会再次尝试获取下锁 // 聪明啊,真是不放过没一次机会去尝试,如果刚巧线程1此刻执行完任务释放了锁,直接成功获取锁;当然主要应该为了唤醒后获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 线程1还没有执行完任务,所以会进入到这里,if中两个判断的方法源码见下文 // parkAndCheckInterrupt()调用LockSupport.park(this);进入线程等待状态,等待唤醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 如果线程终端,设置interrupted interrupted = true; } } finally { if (failed) // 如果中断/异常,会进行取消节点 cancelAcquire(node); }}
// 继续看 shouldParkAfterFailedAcquire(p, node)private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前继节点的等待状态,AQS介绍已经说过,默认是0,会经过下面的cas改成SIGNAL(-1) int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 如果已经是-1,表示后继节点可以安心睡觉了,前节点锁释放后会唤醒后继节点 return true; if (ws > 0) { // ws大于1表示节点状态已经取消了,可以跳过该节点了 do { // 比较难看懂,从后往前看,pred = pred.prev表示前节点指到再前一个节点; // node.prev = pred当前node节点的前节点指向刚刚的pred。加上后面那句pred.next = node; // 其实就是删除中间的取消节点。 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将waitStatue cas成signal状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
// 继续看,shouldParkAfterFailedAcquire调用后,最终会返回true,紧接着调用parkAndCheckInterrupt(),进入睡眠状态private final boolean parkAndCheckInterrupt() { // 进入awit,正常唤醒LockSupport.unpark(线程) LockSupport.park(this); // 是否是中断返回 return Thread.interrupted(); }
这里再说明下shouldParkAfterFailedAcquire(p, node) 与parkAndCheckInterrupt()的方法:
  1. shouldParkAfterFailedAcquire(p, node)是在进行前继节点waitStatus的赋值。赋值为-1,表示当前节点要进入睡眠或等待了,需要前继节点在释放锁的时候唤醒。如果cas设置失败了,会有调用该方法的外层进行无限循环确保赋值成功。

  2. parkAndCheckInterrupt()方法,是在shouldParkAfterFailedAcquire返回成功后才会调用,保证当前线程进入队列睡眠前,waitStatus的正确赋值;再进入方法调用LockSupport.park(this),进入睡眠或等待。


竞争调用链参考如下:

图片

图3 锁竞争调用链

一段时间后,线程1完成执行任务了,接下来开始释放锁资源并唤醒线程2。


  • 释放锁资源

// java.util.concurrent.locks.ReentrantLock
public void unlock() { // 核心方法调用,进行锁释放 sync.release(1); }


接着,进入AQS类操作:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) { // 尝试释放锁,由AQS子类ReentrantLock实现 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 唤醒h节点线程 return true; } return false;}


下面我们继续看ReentrantLock的tryRelease实现,可见释放锁的时候,对于公平和非公平锁,都是调用Sync类定义的方法

// java.util.concurrent.locks.ReentrantLock#Sync
protected final boolean tryRelease(int releases) { // 减后得到目前还被锁定的资源 int c = getState() - releases; // 如果当前线程不是队列锁持有者,抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; // 如果没有锁了,队列锁持有者置空 setExclusiveOwnerThread(null); } // 不用cas更新状态,会成功,因为当前线程是持有排它锁的 setState(c); return free;}


tryRelease()成功的话,会获取头节点,如果队列有节点(此例中的线程2),会继续调用AQS的unparkSuccessor(h)方法。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) // 更新为0,进行复位 compareAndSetWaitStatus(node, ws, 0);
// 注意:获取头节点的下一个节点进行唤醒的,因为头节点是持有锁的节点。 Node s = node.next; // s.waitStatus表示已经被取消了,会循环从后到前,找到第一个等待中的线程节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 进行唤醒(此处,是唤醒了线程2的) LockSupport.unpark(s.thread);}


此处,我们再补充说明点:
  1. 关于unparkSuccessor方法,我们可以从源码看到其唤醒的是头节点的后继节点,除非后继节点被取消了。但是这并不与是否公平锁冲突,因为在唤醒前已经进行了资源的释放。如果是非公平的锁,此时其它线程可能刚好申请资源拿到锁,那此处唤醒的线程将仍拿不到锁。

  2. 关于队列,第一个线程获取锁后,没有队列,head与tail定义均为null。仅有一个锁持有标识,通过调用getExclusiveOwnerThread()获取。当线程2竞争时,锁已经被其它线程持有了,此时会创建两个节点,一个头节点,没有线程数据,一个是当前未竞争到锁的线程的节点,追加到头节点之后,含有线程数据。


至此,线程1释放锁并唤醒线程2成功。


调用链参考如下:

图片

图4 锁释放调用链
那么,到这里我们仅剩最后一步了,唤醒等待的线程2:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 中断标识,但不会抛异常 boolean interrupted = false; for (;;) { // 循环,被唤醒后会继续进入循环,就可以通过调用tryAcquire(arg)成功了,将当前节点更新为头节点,并释放前节点帮助GC final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && // 上次等待的地方 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}// 至此,唤醒的线程2,获取锁成功,可以执行任务了。

至此,我们通过源码分析了线程2如何竞争锁失败进入队列,线程1释放锁的时候,如何唤醒线程2。



8.总结


通过对ReentrantLock分析,相信你对AQS的同步机制有了更好的理解。AQS对同步状态和队列进行了定义和抽象,JDK基于此提供了如ReentrantLock、CountDownLatch、Semaphore等一系列的实现,让我们可以更加方便的运用到日常的开发当中。



本文作者


宽治,来自缦图互联网中心中台团队。



--------END--------



也许你还想看

  | 从volatile关键字到多核CPU的线程可见性
  | 你还在用MysqlDump同步数据么?
  | 浅谈Spring源码之BeanDefinition



继续滑动看下一个
缦图coder
向上滑动看下一个