AQS简介以及类结构
从源码看应用案例
看过很多关于java中AQS的文章,每次看完总有种意犹未尽的感觉,这次决定通过自己的思路和学习路线来重新理解它,欢迎各位同学一起讨论,PS:本文简单易懂,目的是把阅读的你看懂看会,不会把你看蒙;
01
—
AQS简介类结构
AQS
全称为:AbstractQueuedSynchronizer,大体从字面翻译就是“基于队列的抽象同步器”,我们都直到队列的性质就是先进先出(FIFO),这个在AQS中用Node来组成;
至于AQS的作用,官方文档是这样描述的:Provides a framework for implementing blocking locks and related synchronizers that rely on first-in-first-out (FIFO) wait queues;翻译过来大概的意思就是:提供一个框架,用于实现阻塞锁和依赖先进先出(FIFO)等待队列的相关同步器;简单点说就是:你用AQS作为基础,去实现各种各样的同步器,例如:Lock、信号量、CountDownLatch等;
类结构
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
略去中间无关代码...
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
//内部类ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
略去各种方法...
}
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
复制
从上面的类图可以看出,AQS的结构也不复杂,通过声明一个volatile类型(保证可见性)的变量state,然后用CAS(保证原子性)操作这个state,使用内部类Node将请求资源的线程封装,使用内部类ConditionObject来实现线程间的通信;废话少说来看代码
02
—
应用案例
ReentrantLock
直接看lock方法,有公平锁和非公平锁两种实现:
先看公平锁的实现:
1.进入acquire方法,这里会调用tryAcquire方法
2.进入tryAcquire方法
1和2是获取锁成功的情况,在aquire方法中,当tryAquire失败时,首先会进入addWaiter方法,可以看到将线程包装,放进队列
enq方法中,首次进入会初始化一个空的Node做为head,此时tail也指向head
然后将线程被包装的Node添加到尾部:
从addWaiter方法出来后会进入aquireQueued方法,改方法也简单,通过两次循环将等待“锁”的线程,这里说的等待锁就是获取state变量时,state不是等于0那个线程第一次循环将Node的状态设置为signal=-1(shouldParkAfterFaildAquire)
第二次循环进入parkAndCheckInterupt将线程挂起
总结lock方法过程很简单
state=0则表示可以获取资源,当前线程将state置为1
如果state=1,表示别人已经设置过state了,则将当前线程封装进Node,加入队列尾部,并将线程挂起
再来看unlock方法,首先进入release方法
release方法中,先调用tryRelease尝试释放锁,如果成功,则调用unparkSuccessor来唤醒下一个Node
其中,tryRelease方法先判断当前释放锁的线程是否是持有锁的线程,正所谓:解铃还须系铃人;c=0说明释放后已经没有其他线程持有该锁(共享),或者某个线程已经释放该可重入锁,所以设置当前持有锁的线程为null;最后setState更新state值
随后进入的unparkSuccessor中,先判断后一个节点的状态是否大于0,大于0说明下一个节点的状态为CANCELLED,则从尾节点反向遍历,找到第一个状态小于0的节点唤醒它;
总结一下unlock方法很简单
尝试释放锁(更新state),唤醒下一个Node,就两步...
再看一下非公平锁的实现
可以看到非公平锁多了if里的代码,线程进来直接获取锁,如果compareAndSetState成功,则将持有锁的线程设为自己,否则调用acquire(到这就和公平锁的实现一样了),所以可以看到,ReentrantLock的公平和非公平指的就是抢锁的一瞬间;
打个比方就是:一群人排队上厕所,突然跑过来一个人直接插队,如果他正赶上里面的人用完了厕所那他就可以使用厕所,如果插队的瞬间里面人还没用完厕所,那这个插队的人就得乖乖排队;
Condition
该接口用来实现线程间的通信,实现类为AQS的内部类ConditionObject,来看一个案例:实现一个阻塞队列,这里我们主要完成take和put方法
public class ConditionTst {
Lock lock = new ReentrantLock();
Condition pc = lock.newCondition();
Condition cc = lock.newCondition();
Queue<Integer> list = new LinkedList<>();
public static void main(String[] args) throws Exception{
ConditionTst cts = new ConditionTst();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(100);
cts.put(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
Integer take = cts.take();
System.out.println("消费线程取出了"+take);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
public Integer take() throws InterruptedException {
lock.lock();
try {
if (list.size() == 0) {
cc.await();
}
Integer num = list.poll();
pc.signal();
return num;
} finally {
lock.unlock();
}
}
public void put(Integer num) throws InterruptedException {
lock.lock();
try {
if (list.size() == 10) {
System.out.println("队列已满,等待消费,当前元素为:"+list);
pc.await();
}
list.add(num);
cc.signal();
} finally {
lock.unlock();
}
}
复制
运行结果如图:
这里,队列空则消费者等待通知生产者生产,队列满则生产者等待通知消费者消费,直接看源码它是如何做到的;
先看await方法,关键的就这三部:加入等待队列、释放锁、挂起线程
1.加入等待队列:创建一个Node并将lastWaiter指向它
2.释放锁:也就明白为什么Condition要配合Lock来使用了吧,await要在获取锁的线程中进行,否则抛出异常
3.挂起线程:判断线程是否在AQS的同步队列,如果不在则挂起,什么时候能从While循环里出来呢?就是另外一个线程调用signal之后,下面会说;
当线程从挂起恢复后,进入aquireQueued方法也是获取锁,成功则返回,失败则判断是否挂起
再看signal方法,先判断当前线程是否是持有锁的线程,如果不是则抛异常;然后,唤醒firstWaiter节点,也就是等待队列的第一个Node
来看transferForSinal方法做了什么:将被唤醒的Node加入同步队列,然后唤醒改Node
加入同步队列
总结一下await和signal方法
await:加入等待队列尾部(lastWaiter)、释放锁、挂起线程
sinal:将firstWaiter加入同步队列、并唤醒firstWaiter节点
两者一组合起来就能达到:我等你,你叫醒我的效果了
CountDownLatch
其实上面讲的和这个还有信号量等等都大同小异,换汤不换药,这里我们实现一个模拟百米赛跑的功能,其中countDown方法和await方法估计大家也能猜出个八九不离十,也就是:每次调countDown方法都把await减1,调await的方法在state到达0之前就挂起,这里就不再赘述,感兴趣的同学运行一下下面的代码,点开源码看一下;
public class ConditionTst3 {
控制所有人都完成,比赛结束
static CountDownLatch countDownLatch = new CountDownLatch(10);
发令枪,所有准备好的选手(线程)必须等待该信号
static CountDownLatch startLatch = new CountDownLatch(1);
模拟跑道的线程池
static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws InterruptedException {
ConditionTst3 tst = new ConditionTst3();
for (int i = 1; i <= 10; i++) {
RunTask runTask = tst.new RunTask(startLatch, countDownLatch, i);
executorService.submit(runTask);
}
executorService.shutdown();
比赛开始信号
startLatch.countDown();
System.out.println("比赛开始,等待选手全部完成");
裁判计分中
countDownLatch.await();
System.out.println("比赛结束");
}
class RunTask implements Runnable {
private CountDownLatch startLatch;
private CountDownLatch countDownLatch;
private Integer num;
public RunTask(CountDownLatch startLatch, CountDownLatch countDownLatch, Integer num) {
this.num = num;
this.startLatch = startLatch;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println(num + "号选手已准备好");
//比赛选手等待发令枪
startLatch.await();
long start = System.currentTimeMillis();
Thread.sleep(new Random().nextInt(5000));
long end = System.currentTimeMillis();
System.out.println(num + "号的成绩已冲线,成绩是:" + (end - start));
//该选手完成比赛
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制
总结:
在学习AQS时,尽量站在设计者的角度思考,这个东西的核心是什么,把核心搞明白其他的各种实现就迎刃而解了;
用volatile修饰的state以保证可见性
用CAS操作这个state以保证原子性
使用Node类封装线程
使用前后指针构成队列
其实看上面列出的几项,一句话总结也就是:线程获取锁失败阻塞,别人释放锁就通知阻塞线程