AQS,是Abstract Queue Synchronizer的简称,即抽象的队列同步器。
实现了一个依赖于先进先出(FIFO)等待队列的用于实现阻塞锁和相关同步器(semaphores,events,etc)的框架。该类被设计为大多数类型的同步器的基础,这些同步器依赖于单个原子int值来表示状态。子类必须定义改变这个状态的受保护的方法,这些方法定义了这个状态对于被获取或释放的对象意味着什么。基于此,这个类中的其他方法执行所有的排队和阻塞机制。
子类应该定义为非公共的内部帮助器类,用于实现其封闭类的同步属性。类AbstractQueuedSynchronizer不实现任何同步接口。相反,它定义了像acquireInterruptibly(int)这样的方法,这些方法可以被具体的锁和相关的同步器调用来实现它们的公共方法。
接下来,我们来看看ReentrantLock是如何借助AQS来实现独占锁的。
加锁流程
// ReentrantLock
public void lock() {
sync.lock();
}
// ReentrantLock 内部类 NonfairSync
final void lock() {
if (compareAndSetState(0, 1)) // 1
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 2
}
在第一步中,一上来就使用CAS对 state 变量进行设值,如果设值成功的话,lock 方法直接返回,表明了加锁成功。此处体现了非公平锁的特性。
第一步没有成功获得锁的话,该线程将进入队列等待获得锁。即第二步的操作。
// AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 1
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 2
selfInterrupt();
}
对于AQS中的acquire方法,实现了申请锁的过程。
其中的 tryAcquire 是一个抽象方法,对于ReentrantLock类来说,在 NonfairSync 类中实现。用于维护表示加锁状态的原子变量 state
// NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ReentrantLock 中实现了 AbstractQueuedSynchronizer 的 Sync,
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 1
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 2
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
第一步先通过 getState() 方法获取表示加锁状态的原子变量 state,如果 state = 0 则说明当前没有线程持有锁。先通过 CAS 对 state 变量进行设值(即申请锁的一个操作),设值成功则直接返回 true。回到上层调用的 acquire 方法中,将直接返回,表明成功获得锁。
第二步判断当前线程是否持有锁的线程,如果是说明进行锁的重入操作,增加 state 变量,如果 state > 1,state 将表示当前持有锁的线程进行锁重入的次数,此时 nonfairTryAcquire 方法返回true,回到上层调用的 acquire 方法中,将直接返回,表明成功获得锁。
除了以上两种情况之外,该线程无法获得锁,返回false。需要进入等待队列中申请锁的流程。
再看一下 AQS 的 acquire 方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
当 tryAcquire 返回 false 时,将调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 将当前线程入队,开始申请锁的生涯。。
先来看一下 addWaiter 方法。
/**
* 已给定的 mode 为当前线程创建一个节点,并入队
* Creates and enqueues node for current thread and given mode.
* mode 分为 Node.EXCLUSIVE 排他锁;Node.SHARED 共享锁
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
该方法的职责就是将当前线程进行包装为Node对象,并入队到等待锁的队列中。
当 tail 为空时,说明等待队列为空,需要进行必要的头结点初始化。
都已经准备进入锁等待队列的线程,说明在这之前存在其他线程持有锁,那么为什么此处会出现 tail 节点为空的情况?
原因:假设线程A持有锁,线程B尝试加锁时发现锁被其他线程征用了。线程B需要进入锁等待队列,当在准备入队时,线程A释放了锁,此时 tail 会成为空。这也是这里需要额外判断 tail 的值的原因
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter 方法将返回代表当前线程的Node对象,传给 acquireQueued 方法以实现排队等待锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 1
if (p == head && tryAcquire(arg)) { // 2
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 3
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1、 获取当前线程节点的前驱结点
2、 前驱节点为 head,说明在当前线程之前没有其他线程入队,此时再次尝试加锁。加锁成功则返回false,回到上层调用的 acquire 方法中直接返回表示加锁成功
3、 如果前驱节点不是 head,说明当前线程还未获得申请锁的资格,需要挂起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如果获取不到资源,当前线程将在队列上阻塞等待。阻塞由LockSupport支持来实现,如果在等待过程中线程发生中断不会立即响应,而是在获取到资源之后,重新设置上中断状态。由上层应用代码来决定如何处理。
要理解 acquireQueued 方法的逻辑,我们有必要先来了解在队列中表示线程的 Node 节点的状态。
对于普通队列,初始的节点状态为0;对于条件队列,初始的节点状态为CONDITION
// 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
// 表示后续节点在等待当前节点唤醒。后续节点入队时会将前去节点的状态更新为SIGNAL。看shouldParkAfterFailedAcquire方法
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
// 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
// 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
因此,我们可以总结出 acquireQueued 方法的整体逻辑:通过死循环,实现线程的自旋过程。如果当前线程的前驱结点是队首,那么尝试获取资源,获取到则线程返回。如果当前线程的前驱结点不是队首或者获取资源失败,先检查前驱结点状态,当前驱结点状态为Node.SIGNAL,挂起当前线程;如果前驱结点状态 > 0说明前驱结点被取消,需要维护队列,将当前节点的前驱结点指向到有效节点上;如果前驱结点的状态为0或者Node.PROPAGATE时,将被修改为Node.SIGNAL,当前线程进入下一个循环,下个循环中,如果前驱结点状态为Node.SIGNAL,将挂起当前线程,进入WAITING状态,等待前驱结点唤醒或者线程被中断;而且不会立即响应中断,而是在获取到资源后将中断状态返回
解锁流程
释放共享资源
public final boolean release(int arg) {
// 调用子类同步器实现的tryRelease释放共享资源
if (tryRelease(arg)) {
Node h = head;
// 如果发生过锁竞争,等待队列不为空,且当前线程的后继节点线程需要被唤醒(后继节点入队时,会修改前驱结点的waitStatus = Node.SIGNAL)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// Sync#tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 重入体现在这里,只有当state = 0时才释放锁。意味着lock、unlock要成对出现。否则将无法正确释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
// AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 为什么从后往前遍历?因为节点入队时,是通过CAS的方式设置tail节点后,再设置此前tail.next的值。这两个操作不是原子的,可能出现链表中插入了节点,但是遍历不到最后一个节点。具体可以看看enq(Node node)方法
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
取消获取资源
对于取消获取资源的操作,发生在中断或者timeout的场景中。
实际上,取消等待就是将需要取消的节点从CLH队列中出队。分三种情况讨论:
- 当前节点node是tail节点,直接出队即可。设置node前驱结点的next为null
- 当前节点node不是tail也不是head的后继节点:需要更改前驱结点的状态为Node.SIGNAL,前驱结点的next为node.next
- 当前节点时head的后继节点,直接唤醒当前节点的后继节点线程。由该线程进行当前节点node的出队。体现在AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire方法上
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// 进行出队操作,分三种场景考虑
// If we are the tail, remove ourselves.
// 场景一:当前节点是尾结点,很好处理,直接出队即可
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
// 场景二:当前节点node不是tail而且不是head的后继节点。
int ws;
if (pred != head &&
// 修改node前驱结点状态为SIGNAL,并设置前驱结点的next为node.next
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 场景三:node是head的后继节点,直接唤醒node的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}