前言
上次的文章《并发编程的锁和内存模型》介绍了synchronize和各种锁(偏向锁,自旋锁,轻量级锁以及重量级锁),介绍了Java内存模型的三大特性,引入了volitile这个关键字,详细说明了它的作用和原理,另外介绍了JUC里很多地方用到的思想或叫算法:CAS(比较并交换),今天的重点是JUC里的难点AQS(I think),这块不得不看源码了。
面试环节
- 面试官:那我记得还有一个和CAS名字很像的叫AQS,你能说下吗?
- 我: 可以。
1、AQS(AbstractQueuedSynchronizer)即队列同步器,它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并发包的 作者Doug Lea期望它成为实现大部分同步需求的基础,然而如他所料,AQS是JUC并发包的核心基础组件。
2、AQS解决了在实现同步器时涉及的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。它不仅能够极大的减少实现工作,而且也不必处理在多个位置上发生的竞争问题。在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩性。因此JUC中,所有基于AQS构建的同步器均可以获得这个优势。
3、AQS的主要使用方式是继承,子类通过继承同步器,并实现它的抽象方法来管理同步状态。AQS使用一个int类型的成员变量state来表示同步状态: 1、当state>0时,表示已经获取了锁。 2、当state=0时,表示释放了锁。
它提供了三个方法,来对同步状态state进行操作,并且AQS可以确保对state的操作时安全的:
getState();
setState(int newState);
compareAndSetState(int expect, int update);
4、另外,AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作。
(1)如果当前线程获取同步状态(锁)失败时,AQS则会将当前线程以及等待状态等信息构造一个节点(Node)并将其加入同步队列,同时会阻塞当前线程。
(2)当同步状态(锁)释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。 - 面试官:你说下AQS都提供了哪些有用的方法?
- 我:AQS主要提供了如下方法(API):
1、getState():返回同步状态的值。
2、setState(int newState):设置当前同步状态。
3、compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能保证状态设置的原子性。
4、tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态。
5、tryRelease(int arg):独占式释放同步状态。
6、tryAcquireShared(int arg):共享式获取同步状态,返回值如果大于等于0,表示获取成功,否则获取失败。
7、tryReleaseShared(int arg):共享式释放同步状态。 8、isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占。
9、acquire(int arg):独占式获取同步状态。如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待。该方法将会调用可重写的tryAcquire(int arg)方法。
10、acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断。当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法抛出 InterruptedException异常并返回。
11、tryAcquireNanos(int arg, long nanos):超时获取同步状态。如果当前线程在 nanos 时间内没有获取到同步状态,那么将会返回 false ,已经获取则返回 true 。
12、acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态; 13、acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断。
14、tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制。
15、release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。
16、releaseShared(int arg):共享式释放同步状态。 - 面试官:你刚才提到AQS的的内部维护了一个FIFO队列,你能讲下这个队列吗?
- 我:该队列就是CLH队列。CLH队列是一个FIFO双向队列(学过数据结构的都应该了解),AQS依赖它来完成同步状态的管理。
1、当前线程如果获取同步状态失败时,AQS会将当前线程已等待状态等信息构造成一个节点(Node)将其加入到CLH同步队列中,同时会阻塞当前线程。
2、当同步状态释放时,会把首节点唤醒(公平锁),先进的先出,使其再次尝试获取同步状态。 - 我:在CLH同步队列中,一个节点(Node)表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next)。其定义如下: Node是AbstractQueuedSynchronizer的静态内部类:
static final class Node {
// 共享
static final Node SHARED = new Node();
// 独占
static final Node EXCLUSIVE = null;
/**
* 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态
*/
static final int CANCELLED = 1;
/**
* 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/**
* 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
*/
static final int CONDITION = -2;
/**
* 表示下一次共享式同步状态获取,将会无条件地传播下去
*/
static final int PROPAGATE = -3;
/** 等待状态 */
volatile int waitStatus;
/** 前驱节点,当节点添加到同步队列时被设置(尾部添加) */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 等待队列中的后续节点。如果当前节点是共享的,那么字段将是一个 SHARED 常量,也就是说节点类型(独占和共享)和等待队列中的后续节点共用同一个字段 */
Node nextWaiter;
/** 获取同步状态的线程 */
volatile Thread thread;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
说明:
1、waitStatus字段:等待状态,用来控制线程的阻塞和唤醒,有5种状态:INITAL/CANCELLED/SINGAL/CONDITION/PROPAGATE。
2、thread字段:Node节点对应的线程Thread。
3、nextWaiter字段:Node节点获取同步状态的模型。tryAcquire(int args)和tryAcquireShared(int args)方法,分别为独占式和共享式获取同步状态。在获取失败时, 它们都会调用addWaiter(Node mode)方法入队。而nextWaiter就是用来表示哪种模式: SHARED:枚举共享模式,EXCLUSIVE:枚举独占模式。
4、predecessor()方法:获取Node节点的前一个Node节点。在方法内部,Node p = prev的本地拷贝是为了避免并发情况下,prev判断完==null时,恰好被修改,从而保证线程安全。
- 面试官:那CLH队列是怎么进行入队和出队操作的呢
- 我: 学习数据结构的我们都知道,CLH入队很简单,如下图所示。
1、tail指向新节点。
2、新节点的rev指向当前最后的节点。
3、当前最后一个节点的next指向入队的节点。
实际上,入队逻辑实现的addWaiter(Node)方法,需要考虑并发情况。它通过CAS方式,来保证正确的添加Node。代码如下:
1: private Node addWaiter(Node mode) {
2: // 新建节点
3: Node node = new Node(Thread.currentThread(), mode);
4: // 记录原尾节点
5: Node pred = tail;
6: // 快速尝试,添加新节点为尾节点
7: if (pred != null) {
8: // 设置新 Node 节点的尾节点为原尾节点
9: node.prev = pred;
10: // CAS 设置新的尾节点
11: if (compareAndSetTail(pred, node)) {
12: // 成功,原尾节点的下一个节点为新节点
13: pred.next = node;
14: return node;
15: }
16: }
17: // 失败,多次尝试,直到成功
18: enq(node);
19: return node;
20: }
出队:CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的下一个节点(Node.next)。而后继节点将会在获取同步状态成功时,将自己设置为首节点(head)。 这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可。注意:在这个过程中是不需要使用CAS来保证的,因为只有一个线程,能够成功获取到同步状态。 setHead(Node node)方法,实现上述的出队逻辑,如下图所示:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
- 面试官:你能跟我说下AQS是怎么获取和释放同步状态的呢?
- 我:前面说到,AQS的设计模式是模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态,AQS提供了大量模板方法实现同步,主要分为三类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程情况。
1、独占式 :同一时刻仅有一个线程持有同步状态。
独占式获取同步状态
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1、tryAcquire:尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。
2、addWaiter:如果tryAcquire返回false(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部。
3、acquireQueued:当前线程会根据公平性原则来进行自旋,直至获取锁为止。
4、selfInterrupt:产生一个中断
5、下面看下acquireQueued方法:这个方法为一个自旋的过程,当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自省的观察,当条件满足获取到同步状态后,就可以退出自旋的过程。从下面代码中可以看到,当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,理由:保持FIFO同步队列原则。头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中断标志
boolean interrupted = false;
/*
* 自旋过程,其实就是一个死循环而已
*/
for (;;) {
//当前线程的前驱节点
final Node p = node.predecessor();
//当前线程的前驱节点是头结点,且同步状态成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取失败,线程等待--具体后面介绍
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
还有独占式获取响应中断和独占式超时获取的方法,这里就不详细描述了。
独占式释放同步状态
当线程获取同步状态后,执行完相应逻辑后就需要释放同步状态。AQS提供了release(int arg)方法释放同步状态:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
该方法同样是调用tryRelease(int arg)方法来释放同步状态,释放成功后,会调用unparkSuccessor(Node node)方法唤醒后继节点。
总结:在AQS内部维护了一个FIFO同步队列,当线程获取同步状态失败后,则会加入到这个CLH队列的队尾并一直保持着自旋。在CLH同步队列中的线程在自旋时会判断其前驱节点是否为首节点, 如果是首节点就不断尝试获取同步状态,获取成功就退出CLH同步队列。当线程执行完逻辑后,会释放同步状态,释放后会唤醒其后继节点。
2、共享式
共享式与独占式最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。例如读操作可以多个线程同时读,写操作同一时刻只能有一个线程写。
共享式获取同步状态
AQS提供acquireShared(int arg)方法共享式获取同步状态:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//获取失败,自旋获取同步状态
doAcquireShared(arg);
}
上述方法是先调用tryAcquireShared(int arg)方法尝试获取同步状态,获取失败就调用doAcquireShared(int arg)自旋获取同步状态。
private void doAcquireShared(int arg) {
/共享式节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//前驱节点
final Node p = node.predecessor();
//如果其前驱节点,获取同步状态
if (p == head) {
//尝试获取同步
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
addWaiter(Node.SHARED)这里先把当前线程加入CLH同步队列的队尾,然后循环(自旋)尝试获取同步状态:node.predecessor()表示当前节点的前驱结点,if (p ==head)如果前驱结点 是首节点的话,则调用tryAcquireShared(int args)方法尝试获取同步状态,获取成功(r >=0)就退出循环(自旋),在退出前唤醒下一个等待的节点(也就是设置下一个节点的前驱节点为首节点)。
共享式释放同步状态
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
- 面试官:从你上面列举的源码里,我看到shouldParkAfterFailedAcquire这个方法,貌似是用来阻塞线程的,你能说下AQS是怎么阻塞和唤醒线程的吗?
- 我:在上面讲到,在线程获取同步状态时失败后,则加入CLH同步队列,通过自旋方式不断获取同步状态,但是在自旋过程中则需要判断当前线程是否需要阻塞。在获取同步状态 失败后,线程并不是马上阻塞,需要检查该线程的状态,检查方法为shouldParkAfterFailedAcquire(Node pre, Node node),该方法主要靠前驱结点判断当前线程是否应该被阻塞。
例如上面源码的一部分:
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驱节点
int ws = pred.waitStatus;
//状态为signal,表示当前线程处于等待状态,直接放回true
if (ws == Node.SIGNAL)
return true;
//前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
//前驱节点状态为Condition、propagate
else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这段代码主要检查当前线程是否需要被阻塞,具体规则如下:
1、如果当前线程的前驱节点状态为SINNAL,表示当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞。
2、如果当前线程的前驱节点状态为CANCELLED(ws >0),则表示该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态<=0,返回false。
3、如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false。如果shouldParkAfterFailedAcquire(Node pre, Node node)方法返回false,则调用parkAndCheckInterrupt()方法阻塞当前线程。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
parkAndCheckInterrupt()方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LOckSupport工具类的park()方法来阻塞。
- 面试官:下面我们聊聊AQS在JUC里的应用-ReentrantLock吧。你能说下对ReentrantLock的了解吗?
- 我:ReentrantLock是可重入锁,是一种递归无阻塞的同步机制。它可以等同于synchronized的使用,但是提供了比synchronized更强大、灵活的机制,可以减少死锁的概率。
1、ReentrantLock将由最近成功获取锁定并且还没释放该锁定的线程所拥有。如果锁定没有被线程占有,调用lock的线程将成功获取锁。如果当前线程已经拥有该锁,再调用lock()方法会立即返回。
2、Reentrant提供了公平锁和非公平锁的选择。构造方法接收一个可选的公平参数(true表示公平锁,否则为非公平锁)。公平锁和非公平锁的区别在于公平锁的锁获取是有顺序的。但是公平锁的效率往往没有非公平锁高,在多线程访问的情况下,公平锁表现出较低的吞吐量。
下面来看看源码吧
1、lock方法
public void lock() {
sync.lock();
}
Sync为ReentrantLock里的一个内部类,它继承AQS,它有两个子类:公平锁FairSync和非公平锁NonFairSync。ReenTrantLock里面大部分的功能都是委托给Sync来实现的,同时Sync 定义了lock()抽象方法由其子类来实现,默认实现了nonfairTryAcquire(int acquires)方法。 我们来看非公平锁的lock方法
final void lock() {
//尝试获取锁
if (compareAndSetState(0,1))
setExclusiveOwnerThread(Thread.currentThread());
else
//获取失败,调用AQS的acquire(int arg)方法
acquire(1);
}
首先尝试获取锁,如果获取成功,设置锁被当前线程独占。如果获取失败,则调用acquire(1),该方法定义在AQS中,如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里先调用tryAcquire(int arg),在AQS中讲过,这个方法需要同步组件自己实现。在NonfairSync中的实现见下:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//state==0 表示该锁处于空闲状态
if (c == 0) {
//用CAS方式占用该锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断锁持有的线程是否为当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//成功获得锁的线程再次获得锁,增加同步状态
setState(nextc);
return true;
}
return false;
}
2、释放锁unlock方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//减掉releases
int c = getState() - releases;
//如果释放的不是持有锁的线程,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//state == 0 表示已经释放完全了,其他线程可以获取同步状态了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//重新设置同步状态的值
setState(c);
return free;
}
unlock()内部使用Sync的release(int arg)释放锁,release(int arg)是在AQS中定义的。tryRelease也是需要同步组件自己实现。如果释放成功,再判断首节点后面还有等待同步状态的线程,则调用unparkSuccessor(Node node)方法唤醒下一个线程。
- 面试官:那么ReentrantLock和Synchronized有什么异同呢?
- 我:首先他们肯定具有相同的功能和内存语义。不同之处在于以下几点:
1、与synchronized相比,ReentrantLock提供了更多更加全面的功能,具备更强的扩展性。例如时间锁等候,可中断锁等候和锁投票。
2、ReentrantLock还提供了条件Condition,对线程的等待唤醒操作更加详细灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合。
3、ReentrantLock提供了可轮询的锁请求,它会尝试去获取锁,如果成功则继续,否则等到下次运行时处理,而synchronized则一旦进入锁请求要么成功要么阻塞, 所以相对于synchronized来说,ReentrantLock会不容易死锁些。
4、ReentrantLock支持更加灵活的同步代码块,但是使用synchronized时,只能在一个synchronized块结构中获取和释放。
5、ReentrantLock支持中断处理,且性能相对好一些。 - 面试官:那你用过读写锁ReentrantReadWirteLock吗?能介绍下吗?
- 我:直接来看源码吧。ReentrantReadWrite的前面几行很简单,看下Sync类,先看下Sync的所有属性:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/*
下面这块就是将state一分为二,高16位用于共享模式,低16位用于独占模式。
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 取c的高16位值,代表读锁的获取次数 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 取c的低16位值,代表写锁的冲入次数,因为写锁是独占模式 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/**
这个静态内部类的实例用来记录每个线程持有的读锁数量(读锁重入)
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
/**
ThreadLocal的子类
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
组合使用上面两个类,用一个ThreadLocal来记录当前线程持有的读锁数量
*/
private transient ThreadLocalHoldCounter readHolds;
/**
用于缓存,记录“最后一个获取读锁的线程”的读锁重入次数,所以不管哪个线程获取到读锁后,就把这个值占用,这样
就不用到ThreadLocal中查询map了。在获取-释放读锁的这段时间,如果没有其他线程获取读锁的话,此缓存可以帮助提高性能。
*/
private transient HoldCounter cachedHoldCounter;
/**
第一个获取读锁的线程(并且其未获取读锁),以及它持有的读锁数量。
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
//初始化readHolds这个ThreadLocal属性
readHolds = new ThreadLocalHoldCounter();
//为了保证readHolds内存可见性
setState(getState()); // ensures visibility of readHolds
}
ReentrantReadWriteLock与ReentrantLock一样,其锁主体依然是Sync,它的读锁、写锁都是依靠Sync来实现的,所以ReentrantReadWriteLock实际上只有一个锁,只是在获取读取锁 和写入锁的方式上不一样。它的读写锁其实就是两个类ReadLock和WriteLock。在ReentrantLock中使用一个int类型的state来表示同步状态,该值表示锁被一个线程重复获取的次数。但是读写锁ReentrantReadWriteLock内部维护着一对锁,需要用一个变量维护多种状态,所以读写锁采用“按位切割使用”的方式来维护这个变量,将其切分为两部分,高16位表示读,低16位表示写。分割之后,通过位运算确定读锁和写锁的状态。假如当前同步状态为S,那么写状态=S & 0x0000FFFF(将高16位抹去),读状态=S >>>16(无符号补0右移16位)。
- 我:下面看下写锁的获取。写锁就是一个支持可重入的排他锁。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//当前锁个数
int c = getState();
//写锁
int w = exclusiveCount(c);
if (c != 0) {
// c !=0 && w == 0表示存在读锁,当前线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//超出最大范围
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试获取写锁
setState(c + acquires);
return true;
}
//是否需要阻塞
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置获取锁的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
- 我:下面看下写锁的释放
protected final boolean tryRelease(int releases) {
//如果释放的线程不是锁的持有者,抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//同步状态更新
int nextc = getState() - releases;
//若写锁的新线程数为0,则将锁的持有者设置为null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
- 我:下面看下读锁的获取
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
//当前线程
Thread current = Thread.currentThread();
//锁的个数
int c = getState();
//计算写锁,如果存在写锁且锁的持有者不是当前线程,直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//计算读锁
int r = sharedCount(c);
//readerShouldBlock:读锁是否需要等待(公平锁原则),且小于最大线程数,且CAS设置读取锁状态成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
如果锁没有被任何线程获取,那么当前线程就是第一个获取读锁的线程
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
//如果获取读锁的线程为第一次获取读锁的线程,则 firstReaderHoldCount重入数+1
else if (firstReader == current) {
firstReaderHoldCount++;
}
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
- 我:下面看下读锁的释放
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
//当前线程
Thread current = Thread.currentThread();
//如果想要释放锁的线程为第一个获取锁的线程
if (firstReader == current) {
// 仅获取了一次,则需要将firstReader设置为Null,否则-1
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
//获取rh对象,并更新“当前线程获取锁的信息”
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//CAS更新同步状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
- 我:在读锁获取锁和释放锁的过程中,可以看到一个变量rh(HoldCounter),该变量在读锁中很重要。为了更好理解HoldCounter,我们暂且认为它不是一个锁的概率,而相当于一个计数器。一次共享锁的操作就相当于在该计数器的操作。获取共享锁,则该计数器+1,释放共享锁,该计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作,所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须与线程绑定在一起,否则操作其他线程锁就会抛出异常。
/**
HoldConter定义比较简单,就是一个计数器count和线程id两个变量。
*/
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
/**
通过ThreadLocal,HoldCounter就可以与线程绑定了,故而,HoldCounter应该就是绑定线程上的一个计数器,而ThreadLocalHoldCounter则是线程绑定的ThreadLocal。
*/
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
- 面试官:你能说下线程之间的同步除了Object的wait/notify,还有其他什么方法吗?
- 我:还有Condition。Lock提供了条件Condition,对线程的等待唤醒操作更加详细和灵活。Condition是一种广义上的条件队列。他为线程提供了一种更为灵活的等待/通知模式, 线程在调用await方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。一张图对比Condition与Object的监视器方法。
- 面试官:Condition提供了哪些方法来阻塞和唤醒线程?
- 我:Condition提供了一系列的方法来阻塞和唤醒线程:
1、await():造成当前线程在接到信号或被中断之前一直处于等待状态。
2、awiat(long time, TimeUnit unit):造成当前线程在接到信号、被中断或到达执行等待时间之间一直处于等待状态。
3、awaitNanos(long nanosTimeout):造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在 nanosTimeout之前唤醒,那么返回值=nanosTimeout-消耗时间,如果返回值<=0,则可以认定它已经超时了。
4、awaitUninterruptibly():造成当前线程在接到信号之前一直处于等待状态。(该方法对中断不敏感)。
5、awaitUntil(Date deadline):造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true, 否则表示到了指定时间,返回false。
6、signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
7、signalAll():唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。 - 面试官:Condition是怎么实现线程的阻塞和唤醒的?(原理)
- 我:先看下源码.获取一个Condition必须通过Lock的newCondition方法,该方法定义在接口Lock下,返回的结果是绑定到此Lock实例的新Condition实例。 Condition为一个接口,仅有一个实现类ConditionObject。ConditionObject又是AQS的一个内部类。 在ReentrantLock中
public Condition newCondition() {
return sync.newCondition();
}
在Sync中
final ConditionObject newCondition() {
return new ConditionObject();
}
在AbstractQueuedSynchronizer中:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter;
private transient Node lastWaiter;
public ConditionObject() { }
- 我接着说: AQS等待队列与Condition队列是两个相互独立的队列。
1、await()就是在当前线程持有锁的基础上释放锁资源,并新建Condition节点加入到Condition的队列尾部,阻塞当前线程。
2、signal()就是将Condition的头结点移动到AQS等待节点尾部,让其等待再次获取锁。
以下是AQS与Condition队列的出入节点的示意图,可以通过这几张图看出线程节点在两个队列中的出入关系和条件。
1、初始化状态:AQS等待队列有3个Node,Condition队列有1个Node(也可能一个都没有)。
2、节点1执行Condition.await()
(1)将head后移。
(2)释放节点1的锁,并从AQS等待队列中移除。
(3)将节点1加入到Condition的等待队列中。
(4)更新lastWaiter为节点1。
3、节点2执行Condition.signal()操作
(5)将firstWaiter后移。
(6)将节点4移除Condition队列。
(7)将节点4加入到AQS的等待队列中去。
(8)更新AQS的等待队列的tail。
- 面试官:说来容易,还得实践,你能用Condition实现下生产者消费者吗?
- 我:拿起了笔,花了五分钟写了个Demo:
/**
* Condition实现简单的生产者消费者
*/
public class ConditionDemo {
private LinkedList<String> buffer; // 容器
private int maxSize; //容量
private Lock lock;
private Condition fullCondition;
private Condition notFullCondition;
ConditionDemo(int maxSize) {
this.maxSize = maxSize;
buffer = new LinkedList<>();
lock = new ReentrantLock();
fullCondition = lock.newCondition();
notFullCondition = lock.newCondition();
}
/**
* 生产者
* @param produceStr
* @throws InterruptedException
*/
public void set(String produceStr) throws InterruptedException {
//获得锁
lock.lock();
try {
while (maxSize == buffer.size()) {
notFullCondition.await();
}
buffer.add(produceStr);
fullCondition.signal();
} finally {
//释放锁
lock.unlock();
}
}
/**
* 消费者
* @return
* @throws InterruptedException
*/
public String get() throws InterruptedException {
String consumeStr;
lock.lock();
try {
while (buffer.size() == 0) {
fullCondition.await();
}
consumeStr = buffer.pollFirst();
notFullCondition.signal();
} finally {
lock.unlock();
}
return consumeStr;
}
}
往期精彩回顾
关注公众号阅读更多精彩文章
今天面试了吗系列
Java并发编程系列:
tech.souyunku.com684490…
redis:
tech.souyunku.com684490…
spring:
tech.souyunku.com684490…
mybatis:
tech.souyunku.com684490…
数据库系列
mysql索引:
tech.souyunku.com684490…
数据库锁:
tech.souyunku.com684490…
分库分表:
tech.souyunku.com684490…
数据库事务:
tech.souyunku.com684490…
线上问题系列
tech.souyunku.com684490…