1.简介
CyclicBarrier即栅栏,类似于CountDownLatch,能阻塞一组线程直到某个事件发生,栅栏与CountDownLatch的最大区别就是,所有线程必须都到达了栅栏的位置才能继续执行,CountDownLatch一般用于等待一组事件,而栅栏是等待其它线程
关于它和CountDownLatch的具体区别我们看下面:
1.使用方式不一样
CountDownLatch中,调用await方法后,线程会阻塞,需要其它的线程调用countDown,来完成count变为0的操作
在CyclicBarrier中,执行await,即减-1操作,不需要手动调用countDown
2.重用性
CountDownLatch:是一次性的,如果state等于0后,会一直为0,即如果门开了,就不会关了
CyclicBarrier:则在所有的线程通过后,会进行重置,相当于们开之后,又会重新关闭
3.锁区别
CountDownLatch:使用的是基于AQS的共享锁,所有的线程都可以执行countDown来执行减法操作,当state值不为0时,调用await后会进入CLH队列等候,当count值为0的时候,会自动唤醒所有的等待线程
CyclicBarrier:使用的是基于ReentrantLock的独占锁,当count值不为0的时候,会进入condition queue中等待,,当变成0后,最后一个线程会调用signalAll(),所有的线程会进入AQS的CLH对列中等待
1.1.使用示例
public class CyclicBarrierTest {
private final static ExecutorService EXECUTOR = Executors.newFixedThreadPool(3);
private final static CyclicBarrier BARRIER = new CyclicBarrier(3);
public static void main(String[] args) {
System.out.println("3排房间已开好");
for (int i = 0; i < BARRIER.getParties(); i++) {
final String name = "王者荣耀小刘" + i;
EXECUTOR.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(name + "已登录游戏");
BARRIER.await();
System.out.println(name + "进入三排房间");
} catch (InterruptedException e) {
System.out.println(name + "离开游戏");
} catch (BrokenBarrierException e) {
System.out.println(name + "离开游戏");
}
}
});
}
EXECUTOR.shutdown();
}
}复制
输出
3排房间已开好
王者荣耀小刘1已登录游戏
王者荣耀小刘2已登录游戏
王者荣耀小刘0已登录游戏
王者荣耀小刘0进入三排房间
王者荣耀小刘1进入三排房间
王者荣耀小刘2进入三排房间复制
2.源码分析
2.1.类属性分析
public class CyclicBarrier {
//用来控制栅栏是否被打破,当线程把中断时会被打破
private static class Generation {
boolean broken = false;
}
//锁
private final ReentrantLock lock = new ReentrantLock();
//condition队列,当count值大于0的时候,调用awiat后,都会挂起在trip上,
//等于0时调用signalAll唤醒所有的线程,进入CLH队列,抢夺式获取锁
private final Condition trip = lock.newCondition();
//总共需要等待的线程数,通过构造方法传入,一旦传入,不在改变
private final int parties;
//一个runnable对象,在所有的线程通过CyclicBarrier之前会调用这个对象的run方法,可以类似为一个前置操作,如果不需要,我们不用管
private final Runnable barrierCommand;
//当前的Generation。如果栅栏失效或者开闸之后都会自动替换掉。从而实现重置的功能
private Generation generation = new Generation();
//初始值和parties一致,每次线程await后减1,到0的时候表示所有的线程可以通过了
private int count;复制
2.2.构造方法
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
//传入的parties,即需要等待的线程数
this.parties = parties;
//初始化count等于parties
this.count = parties;
//当count值变为0的时候执行
this.barrierCommand = barrierAction;
}复制
提供了两个构造方法,唯一的区别就是,在count值变为0的时候是否会有一个前置方法会执行,这个对象相当于我们的一个钩子对象
2.3.await
在分析这个方法之前,让我们先来看
//线程被中断时会调用
private void breakBarrier() {
//标志
generation.broken = true;
//恢复count值
count = parties;
//唤醒当前这一代中所有的在condition队列上等待的线程
trip.signalAll();
}
//当count值减少为0的时候会被调用,开启下一代
private void nextGeneration() {
// signal completion of last generation
//唤醒最新的一代中所有的在condition队列上等待的线程
trip.signalAll();
// set up next generation
//恢复count值
count = parties;
//开启下一带
generation = new Generation();
}复制
上面的两个方法的调用是处于在获取锁的状态下调用的,下面让我们看await方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}复制
它提供了两个await方法,区别就是,后面一个会控制等的时间
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//上锁
lock.lock();
try {
//当前代
final Generation g = generation;
//如果当前是打破状态(调用breakBarrier()),会报异常
if (g.broken)
throw new BrokenBarrierException();
//如果当前线程被打断
if (Thread.interrupted()) {
/*
1:重置剩余的count,即让count重新等于parties
2:唤醒所有的等待线程,抛异常
3:之所以这么做是因为我们知道正常情况下所有的线程应该是互相等待,既然当前线程被打断了,那么其它线程也就不用互相等待了
*/
breakBarrier();
throw new InterruptedException();
}
//count数-1
int index = --count;
/*
1:如果count==0,即所有的线程都可以通过了,当前代的最后一个线程会执行barrierCommand
2:会唤醒其它所有已经在等待的线程
*/
if (index == 0) { // tripped
boolean ranAction = false;
try {
//执行barrierCommand
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//开启下一代
nextGeneration();
return 0;
} finally {
if (!ranAction) //即调用barrierCommand发生异常了,也会打破当前栅栏
breakBarrier();
}
}
//走到这里就说明count值还不等于0,需要等待
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed) //如果不需要管等待的时间,则直接等待,当线程中断,或者被唤醒会结束等待(await会释放锁的,这个大家要注意)
trip.await();
//等待nanos时间,当到达时间后恢复过来,当然也可以是线程中断,或被唤醒
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//如果线程被中断,且还是处于当前代中,且栅栏没有被打破
if (g == generation && ! g.broken) {
//打破栅栏
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
//否则的话则只中断当前线程
Thread.currentThread().interrupt();
}
}
/*
执行到这里说明线程从wait状态被唤醒了,有几种情况:
1:线程处于await状态,被其它线程中断了
2:线程处于awaitNanos(nanos)状态,等待的时间已经到了
3: 线程执行breakBarrier时候发生异常了
4: 调用reset()方法
*/
//如果当代被打破
if (g.broken)
throw new BrokenBarrierException();
//线程被唤醒后,已经开启了新的一代了,则直接返回count的值
if (g != generation)
return index;
//如果是因为超时异常被唤醒的,打破栅栏,抛异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//解锁
lock.unlock();
}
}复制
关于这一段代码,大家跟着相关注释读,但是需要清除一点,当线程调用await之后它会释放掉当前的锁并会进入Condition对列中等待,当最后一个线程到达时count变成0时,此时调用trip.singelAll线程并没有被唤醒而是会进入AQS对列中继续等待,当最后一个线程调用lock.unlock之后,此时在AQS中的队列中的第一个从阻塞队列中恢复,此时满足条件g!=generation,接着调用lock.unlock,最后跳出循环,后面的线程重复上面的工作
2.4.reset
public void reset() {
final ReentrantLock lock = this.lock;
//获取锁
lock.lock();
try {
//打破栅栏
breakBarrier(); // break the current generation
//开启新一代
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}复制
3.总结
借用一个网上的图,来表示CyclicBarrier的运行流程(地址是:https://mp.weixin.qq.com/s/D71xl3COuHZtPvPg29xBow)
即所有的线程都调用await方法后,才能被触发
到这里我们就比较完全的分析了CyclicBarrier的核心方法,如果分析过程存在错误的,希望大家指出来,最后谢谢大家