简介
在日常开发中经常会遇到需要在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。
在CountDownLatch出现之前一般都使用线程的join()方法来实现这一点,但是join方法不够灵活,不能够满足不同场景的需要,所以JDK提供了CountDownLatch这个类.
CountDownLatch是一种同步辅助工具,允许一个或多个线程等待,直到其他线程执行的一组操作完成。
CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。其它线程调用countDown方法会将计数器减1,当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
注意:这是一种一次性工具,即无法重置计数。
如果需要能重置计数的,就要用CyclicBarrier.
应用场景
1、 等待所有子线程执行完毕后,主线程再往下执行.
2、 让所有线程等待,直到latch计数为0后,再同时执行.
例子
下面的例子是利用CountDownLatch实现所有子线程同时启动,主线程等待所有子线程全部执行完毕后,主线程再往下执行.
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch downLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new Thread(new worker(startLatch,downLatch),"线程:"+i).start();
}
TimeUnit.SECONDS.sleep(1); //休眠1秒,保证所有线程已经调用了await方法
System.out.println("所有任务都已启动");
startLatch.countDown();
System.out.println("等待所有任务完成");
//主线程阻塞等待,直到所有子线程完成
downLatch.await();
System.out.println("所有任务完成");
}
}
class worker implements Runnable{
private CountDownLatch startLatch;
private CountDownLatch downLatch;
public worker(CountDownLatch startLatch, CountDownLatch downLatch) {
this.startLatch = startLatch;
this.downLatch = downLatch;
}
@Override
public void run(){
System.out.println(Thread.currentThread().getName()+"准备工作完成");
try {
startLatch.await(); //等待所有工人准备完毕后再开始工作
} catch (Exception e) {
e.printStackTrace();
}
work();
try {
System.out.println(Thread.currentThread().getName()+"工作完成");
downLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
private void work() {
System.out.println(Thread.currentThread().getName()+"开始工作");
try {
TimeUnit.SECONDS.sleep(RandomUtil.randomInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
打印结果:
CountDownLatch与join方法的区别
一个区别是,调用一个子线程的join()方法后,该线程会一直被阻塞直到子线程运行完毕,而CountDownLatch则使用计数器来允许子线程运行完毕或者在运行中递减计数,也就是CountDownLatch可以在子线程运行的任何时候让await方法返回而不一定必须等到线程结束.
另外,使用线程池来管理线程时一般都是直接添加 Runable 到线程池,这时候就没有办法再调用线程的join方法了,就是说countDownLatch相比join方法让我们对线程同步有更灵活的控制 。
CountDownLatch实现原理
从CountDownLatch的名字就可以猜测其内部应该有个计数器,并且这个计数器是递减的.
private final Sync sync;
private static final class Sync extends AbstractQueuedSynchronizer {
//省略
}
CountDownLatch内部是使用Sync这个内部静态类来实现的,Sync又继承AbstractQueuedSynchronizer,所以底层是使用AQS实现的。
常用方法:
构造函数
通过下面的构造函数,可以发现,初始化时实际上是把计数器的值赋给了AQS的状态变量state,也就是这里使用AQS的状态值来表示计数器值。
注意:CountDownLatch初始化时,必须给定count,用来定义准备阻塞多少数量的线程.
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
void await()方法
当线程调用CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
- 当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计数器的值为0时;
- 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程就会抛出InterruptedException异常,然后返回。
下面看下在await()方法内部是如何调用 AQS 的方法的:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
从以上代码可以看到,await()方法委托sync调用了AQS的acquireSharedInterruptibly方法.
后者的代码如下:
//AQS获取共享资源时可被中断的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果线程被中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//查看当前计数器值是否为0,
//若为0,tryAcquireShared方法返回1,不符合if条件,则直接返回;
//若不为0,则进入AQS的队列等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//sync类实现的AQS的接口
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
由如上代码可知,该方法的特点是线程获取资源时可以被中断,并且获取的资源是共享资源.
acquireSharedInterruptibly()方法首先判断当前线程是否己被中断,若是则抛出异常,否则调用sync实现tryAcquireShared()方法查看当前状态值(计数器值)是否为0,是则当前线程的 await()方法直接返回,否则调用AQS的doAcquireSharedInterruptibly()方法让当前线程阻塞。另外可以看到,这里tryAcquireShared传递的arg参数没有被用到,调用tryAcquireShared的方法仅仅是为了检查当前状态值是不是为0,并没有调用CAS让当前状态值减1.
boolean await(long timeout, TimeUnit unit)方法
当线程调用了CountDownLatch对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
- 当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计数器值为0时,这时候会返回true;
- 设置的timeout时间到了,因为超时而返回false;
- 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常,然后返回。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
void countDown()方法
线程调用该方法后,计数器的值递减,递减后如果计数器值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做。
下面看下countDown()方法是如何调用AQS方法的:
public void countDown() {
//委托sync调用AQS的方法
sync.releaseShared(1);
}
//AQS的方法
public final boolean releaseShared(int arg) {
//调用sync实现的tryReleaseShared
if (tryReleaseShared(arg)) {
//AQS的释放资源方法
doReleaseShared();
return true;
}
return false;
}
在如上代码中,releaseShared首先调用了sync实现的AQS的tryReleaseShared方法.
其代码如下:
//sync的方法
protected boolean tryReleaseShared(int releases) {
//循环进行CAS,直到当前线程成功完成CAS使计数器值(状态值state)减1并更新到state
for (;;) {
int c = getState();
//如果当前状态值为0则直接返回 (1)
if (c == 0)
return false;
//使用CAS让计数器值减1 (2)
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
如上代码首先获取当前状态值(计数器值).代码(1)判断如果当前状态值为0则直接返回false,从而countDown()方法直接返回;否则执行代码(2)使用CAS将计数器值减1,CAS失败则循环重试,否则如果当前计数器值为0则返回true,返回true说明是 最后一个线程调用的countdown方法,那么该线程除了让计数器值减1外,还需要唤醒因调用CountDownLatch的await方法而被阻塞的线程,具体是调用AQS的doReleaseShared方法来激活阻塞的线程。
这里代码(1)貌似是多余的,其实不然,之所以添加代码(1)是为了防止当计数器值为0后,其他线程又调用了countDown方法,如果没有代码(1),状态值就可能会变成负数。
注意:如果CountDownLatch内部计数器由于程序错误而永远无法达到0,那么相应实例上的等待线程会一直处于WAITING状态.
避免该问题的出现有两种方法:
- 确保所有CountDownLatch.countDown()调用都位于代码中正确的位置,比如finally块中
- 使用CountDownLatch.await(long,TimeUnit)这个方法,给等待线程指定一个时间限制,如果超时了,等待线程会自动唤醒.
long getCount()方法
获取当前计数器的值,也就是AQS的state的值.下面看下代码:
public long getCount() {
return sync.getCount();
}
int getCount() {
return getState();
}
由如上代码可知,在其内部还是调用了AQS的getState方法来获取state的值(计数器当前值)。
小结
CountDownLatch底层是使用AQS实现的。使用AQS的状态变量来存放计数器的值。首先在初始化CountDownLatch时设置状态值(计数器值),当多个线程调用countDown方法时实际是原子性递减AQS的状态值。当线程调用await方法后当前线程会被放入AQS的阻塞队列等待计数器为0再返回。其他线程调用countDown 方法让计数器值递减l,当计数器值变为0时,当前线程还要调用AQS的doReleaseShared方法来激活由于调用await()方法而被阻塞的线程.