本期内容:
背景
ReentrantLock与AQS的关联
lock加锁
unLock解锁
小结
一、背景
AQS全称AbstractQueuedSynchronizer,名为抽象同步队列。它是一种由Doug Lea开发的,基于先进先出(FIFO)等待队列的实现了阻塞锁及相关同步操作的一种框架。
JUC下的相关并发工具类如CountDownLatch、ReentrantLock、Worker等都是依赖此框架,本文将借助ReentrantLock的lock()方法和unLock()方法详细分析AQS保证线程同步的原理,让我们抱着知其然更要知其所以然的谦卑学习态度逐行走进Doug Lea的世界。
二、ReentrantLock与AQS的关联
ReentrantLock是一种基于AQS的用于保证线程同步的一种JDK原生工具类,其内部基于AQS实现了公平锁FairSync和非公平锁NonfairSync,先对它混个脸熟:
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// 篇幅过长。。。
}
public static void main(String[] args) {
// 公平锁,参数为false则创建的是非公平锁
ReentrantLock reentrantLock = new ReentrantLock(true);
// 加锁
reentrantLock.lock();
// 业务逻辑
doSomeThing();
// 解锁
reentrantLock.unlock();
}
public static void doSomeThing() {
}
// 这是AQS提供的加锁方法,这里先直接介绍各主要方法是干什么的,后面会一个一个进行拆解
public final void acquire(int arg) { // arg:1
// 1、尝试加锁
if (!tryAcquire(arg) &&
//2、加锁失败入队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 3、还原用户行为
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) { // acquires:1
// 1、获取当前线程
final Thread current = Thread.currentThread();
// 2、获取锁状态state,先记住一个点:无锁时默认state=0,上锁成功后state+1
int c = getState();
// 3、无锁,去看能不能加锁
if (c == 0) {
// 4、hasQueuedPredecessors()是个非常重要的的方法,这里的作用的是看要不要入队,如果不用则去尝试加锁,加锁其实就是利用CAS重新设置state的值
if (!hasQueuedPredecessors() &&
// 5、CAS加锁(修改state状态)
compareAndSetState(0, acquires)) {
// 6、将持有锁的线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 7、判断当前线程是不是持有锁的线程,如果是,则将state+1。这里也可以看出来ReentrantLock是支持重入的
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 上面的获取锁状态
protected final int getState() {
return state;
}
// 就是AQS里面的一个int变量
private volatile int state;
static final Node EXCLUSIVE = null;
private Node addWaiter(Node mode) { // mode就是EXCLUSIVE常量null
// 1、创建一个新的节点,下面贴出了该构造方法,节点存储了当前线程thread还有前后节点的引用pred和next
Node node = new Node(Thread.currentThread(), mode);
// 2、如果对尾tail不为null,说明队列已经初始化,那么将新节点的pred指向tail,tail的next指向新节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 3、队列未初始化(tail为null)或者两个线程同时入队时某一线程入队失败(CAS入队),再次维护队列关系
enq(node);
return node;
}
// 上面的构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 通过循环CAS维护队列关系
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 1、初始化队列
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2、维护队列关系
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {// node:新节点, arg:1
// 1、是否失败的一个标识,用于执行cancelAcquire()出队操作(后面分析,这边先看当前正常流程)
boolean failed = true;
try {
// 2、是否被打断的一个标识
boolean interrupted = false;
for (;;) {
// 3、如果当前节点的前节点是head那么重新执行tryAcquire()去尝试加锁,为什么这样设计呢?
// 还是那句话,通过自旋保证线程不立即park,毕竟线程挂起可是个重量级操作
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 3.1、加锁成功将当前节点设置为head并将其next置为null以便于垃圾回收
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 4、将新节点的前节点的waitStatus置为-1,0是默认状态,-1是线程挂起状态(后面分析)
if (shouldParkAfterFailedAcquire(p, node) &&
// 5、阻塞当前线程,下面贴出了该方法,
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 6、特殊情况,出队操作,相当复杂,后面分析
cancelAcquire(node);
}
}
// 阻塞操作
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 还原用户行为,interrupted()重复调用会改变线程中断状态
return Thread.interrupted();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //pred:前节点, node:当前节点
int ws = pred.waitStatus;
// 1、已经被设置了,直接返回
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 2、跳过取消的节点,对应解锁时的第2种情况,记得回头过来体会一下~
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 3、利用CAS修改前节点的waitStatus状态为-1(Node.SIGNAL)
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 看下5)就知道为什么要调用该方法了,因为interrupt()重复调用会改变线程中断状态,这里用于还原用户行为,好好体会下~
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
private void cancelAcquire(Node node) {// node是当前节点
// 1、前置判断
if (node == null)
return;
// 2、既然要出队了,那么对线程的引用自然就无意义了
node.thread = null;
// 3、找到前置有效节点,并且将下面出队的第二种情况的节点移除队列
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 4、要出队了,将节点状态修改为取消,即1
node.waitStatus = Node.CANCELLED;
// 5、出队第一种情况:当前节点为tail尾节点,利用CAS取消与前节点前后关系
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 6、出队第二种情况:当前节点即不是尾节点也不是head头节点的下一个节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 为什么当前节点不是tail尾节点了还要判断next != nul呢?
// 在看看下面的addWaiter()方法
if (next != null && next.waitStatus <= 0)
// 将当前节点的前节点的next指向当前节点的下一个节点
// 那当前节点的下一个节点的pred为什么不用指向当前节点的前节点呢
// 这个是由其他线程操作的,看看这里面的序号3,还有一种情况就是其他线程调用shouldParkAfterFailedAcquire()时,里面有标记哦
// 回头看看,好好体会下~
compareAndSetNext(pred, predNext, next);
} else {
// 7、出队的第三种情况:当前节点为头节点,头节点要出队了,自然要唤醒下一个线程了
// 具体代码实现见后面的解锁唤醒线程
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node; // 不是原子操作,当这一步还没执行到的时候上述方法的next就会为null
return node;
}
}
enq(node);
return node;
}
final void lock() {
// 1、先直接加锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 2、加锁失败再走公平锁的原有逻辑
acquire(1);
}
// 老规矩,先大致了解下释放锁的主要流程
public final boolean release(int arg) {// arg:1
// 1、释放锁,即将锁状态state改为0
if (tryRelease(arg)) {
Node h = head;
// 2、锁释放成功,看是否要唤醒下一个线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) { // releases:1
// 1、将当前锁状态state减1
int c = getState() - releases;
// 2、如果不是当前持有锁的线程来释放锁直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 3、如果state为0了,表示释放锁成功,那么将持有锁的线程置为nul,从这里也可以看出ReentrantLock是支持重入的
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 4、重新设置state的值
setState(c);
return free;
}
protected final void setState(int newState) {
state = newState;
}
private void unparkSuccessor(Node node) { // node为当前节点
// 1、当前节点释放锁了,将其状态改为初始值0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 2、找到下一个节点
// 既然要唤醒下一个节点了,那个节点s怎么还会为null呢?这个问题前面已经回答了,详见cancelAcquire()方法
// 下个节点s的状态waitStatus大于0说明该节点被取消了
if (s == null || s.waitStatus > 0) {
s = null;
// 从队尾往前遍历找到离当前节点最近的一个正在挂起的线程
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 3、如果有正在挂起的线程,将其唤起
if (s != null)
LockSupport.unpark(s.thread);
}
牛年邀牛人
一起战斗、一起成长
技术、产品、UED、运营、职能等海量岗位
玩物得志期待你的加入