暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

AQS其实没那么复杂

程序猿研习社 2021-09-06
234
本文分两个个部分讲解:
  • AQS简介以及类结构

  • 从源码看应用案例


看过很多关于java中AQS的文章,每次看完总有种意犹未尽的感觉,这次决定通过自己的思路和学习路线来重新理解它,欢迎各位同学一起讨论,PS:本文简单易懂,目的是把阅读的你看懂看会,不会把你看蒙;


01

AQS简介类结构


AQS

全称为:AbstractQueuedSynchronizer,大体从字面翻译就是“基于队列的抽象同步器”,我们都直到队列的性质就是先进先出(FIFO),这个在AQS中用Node来组成;

至于AQS的作用,官方文档是这样描述的:Provides a framework for implementing blocking locks and related synchronizers that rely on first-in-first-out (FIFO) wait queues;翻译过来大概的意思就是:提供一个框架,用于实现阻塞锁和依赖先进先出(FIFO)等待队列的相关同步器;简单点说就是:你用AQS作为基础,去实现各种各样的同步器,例如:Lock、信号量、CountDownLatch等;


类结构

    public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    略去中间无关代码...
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;


        //内部类ConditionObject
    public class ConditionObject implements Condition, java.io.Serializable {
         private transient Node firstWaiter;
    private transient Node lastWaiter;
                略去各种方法...
        }       
        static final class Node {
         volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
        }        
    复制


    从上面的类图可以看出,AQS的结构也不复杂,通过声明一个volatile类型(保证可见性)的变量state,然后用CAS(保证原子性)操作这个state,使用内部类Node将请求资源的线程封装,使用内部类ConditionObject来实现线程间的通信;废话少说来看代码


    02


    应用案例


    ReentrantLock

    直接看lock方法,有公平锁和非公平锁两种实现:

    先看公平锁的实现:

    1.进入acquire方法,这里会调用tryAcquire方法

    2.进入tryAcquire方法

    1和2是获取锁成功的情况,在aquire方法中,当tryAquire失败时,首先会进入addWaiter方法,可以看到将线程包装,放进队列

    enq方法中,首次进入会初始化一个空的Node做为head,此时tail也指向head

    然后将线程被包装的Node添加到尾部:

    从addWaiter方法出来后会进入aquireQueued方法,改方法也简单,通过两次循环将等待“锁”的线程,这里说的等待锁就是获取state变量时,state不是等于0那个线程第一次循环将Node的状态设置为signal=-1(shouldParkAfterFaildAquire)

    第二次循环进入parkAndCheckInterupt将线程挂起


    总结lock方法过程很简单

    • state=0则表示可以获取资源,当前线程将state置为1

    • 如果state=1,表示别人已经设置过state了,则将当前线程封装进Node,加入队列尾部,并将线程挂起


    再来看unlock方法,首先进入release方法

    release方法中,先调用tryRelease尝试释放锁,如果成功,则调用unparkSuccessor来唤醒下一个Node

    其中,tryRelease方法先判断当前释放锁的线程是否是持有锁的线程,正所谓:解铃还须系铃人;c=0说明释放后已经没有其他线程持有该锁(共享),或者某个线程已经释放该可重入锁,所以设置当前持有锁的线程为null;最后setState更新state值

    随后进入的unparkSuccessor中,先判断后一个节点的状态是否大于0,大于0说明下一个节点的状态为CANCELLED,则从尾节点反向遍历,找到第一个状态小于0的节点唤醒它;


    总结一下unlock方法很简单

    尝试释放锁(更新state),唤醒下一个Node,就两步...

    再看一下非公平锁的实现

    可以看到非公平锁多了if里的代码,线程进来直接获取锁,如果compareAndSetState成功,则将持有锁的线程设为自己,否则调用acquire(到这就和公平锁的实现一样了),所以可以看到,ReentrantLock的公平和非公平指的就是抢锁的一瞬间;

    打个比方就是:一群人排队上厕所,突然跑过来一个人直接插队,如果他正赶上里面的人用完了厕所那他就可以使用厕所,如果插队的瞬间里面人还没用完厕所,那这个插队的人就得乖乖排队;


    Condition

    该接口用来实现线程间的通信,实现类为AQS的内部类ConditionObject,来看一个案例:实现一个阻塞队列,这里我们主要完成take和put方法

      public class ConditionTst {
      Lock lock = new ReentrantLock();
      Condition pc = lock.newCondition();
      Condition cc = lock.newCondition();
      Queue<Integer> list = new LinkedList<>();


      public static void main(String[] args) throws Exception{
      ConditionTst cts = new ConditionTst();
      new Thread(new Runnable() {
      @Override
      public void run() {
      while (true) {
      try {
      Thread.sleep(100);
      cts.put(new Random().nextInt(10));
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }
      }).start();


      new Thread(new Runnable() {
      @Override
      public void run() {
      while (true) {
      try {
      Thread.sleep(1000);
      Integer take = cts.take();
      System.out.println("消费线程取出了"+take);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }
      }).start();




      }


      public Integer take() throws InterruptedException {
      lock.lock();
      try {
      if (list.size() == 0) {
      cc.await();
      }
      Integer num = list.poll();
      pc.signal();
      return num;
      } finally {
      lock.unlock();
      }
      }
      public void put(Integer num) throws InterruptedException {
      lock.lock();
      try {
      if (list.size() == 10) {
      System.out.println("队列已满,等待消费,当前元素为:"+list);
      pc.await();
      }
      list.add(num);
      cc.signal();
      } finally {
      lock.unlock();
      }
      }
      复制

      运行结果如图:

      这里,队列空则消费者等待通知生产者生产,队列满则生产者等待通知消费者消费,直接看源码它是如何做到的;

      先看await方法,关键的就这三部:加入等待队列、释放锁、挂起线程

      1.加入等待队列:创建一个Node并将lastWaiter指向它

      2.释放锁:也就明白为什么Condition要配合Lock来使用了吧,await要在获取锁的线程中进行,否则抛出异常

      3.挂起线程:判断线程是否在AQS的同步队列,如果不在则挂起,什么时候能从While循环里出来呢?就是另外一个线程调用signal之后,下面会说;

      当线程从挂起恢复后,进入aquireQueued方法也是获取锁,成功则返回,失败则判断是否挂起

      再看signal方法,先判断当前线程是否是持有锁的线程,如果不是则抛异常;然后,唤醒firstWaiter节点,也就是等待队列的第一个Node

      来看transferForSinal方法做了什么:将被唤醒的Node加入同步队列,然后唤醒改Node

      加入同步队列

      总结一下await和signal方法

      await:加入等待队列尾部(lastWaiter)、释放锁、挂起线程

      sinal:将firstWaiter加入同步队列、并唤醒firstWaiter节点

      两者一组合起来就能达到:我等你,你叫醒我的效果了


      CountDownLatch

      其实上面讲的和这个还有信号量等等都大同小异,换汤不换药,这里我们实现一个模拟百米赛跑的功能,其中countDown方法和await方法估计大家也能猜出个八九不离十,也就是:每次调countDown方法都把await减1,调await的方法在state到达0之前就挂起,这里就不再赘述,感兴趣的同学运行一下下面的代码,点开源码看一下;

        public class ConditionTst3 {
        控制所有人都完成,比赛结束
        static CountDownLatch countDownLatch = new CountDownLatch(10);
        发令枪,所有准备好的选手(线程)必须等待该信号
        static CountDownLatch startLatch = new CountDownLatch(1);
        模拟跑道的线程池
        static ExecutorService executorService = Executors.newFixedThreadPool(10);


        public static void main(String[] args) throws InterruptedException {


        ConditionTst3 tst = new ConditionTst3();
        for (int i = 1; i <= 10; i++) {
        RunTask runTask = tst.new RunTask(startLatch, countDownLatch, i);
        executorService.submit(runTask);
        }
        executorService.shutdown();
        比赛开始信号
        startLatch.countDown();
        System.out.println("比赛开始,等待选手全部完成");
        裁判计分中
        countDownLatch.await();
        System.out.println("比赛结束");


        }


        class RunTask implements Runnable {
        private CountDownLatch startLatch;
        private CountDownLatch countDownLatch;
        private Integer num;


        public RunTask(CountDownLatch startLatch, CountDownLatch countDownLatch, Integer num) {
        this.num = num;
        this.startLatch = startLatch;
        this.countDownLatch = countDownLatch;
        }


        @Override
        public void run() {
        try {
        System.out.println(num + "号选手已准备好");
        //比赛选手等待发令枪
        startLatch.await();
        long start = System.currentTimeMillis();
        Thread.sleep(new Random().nextInt(5000));
        long end = System.currentTimeMillis();
        System.out.println(num + "号的成绩已冲线,成绩是:" + (end - start));
        //该选手完成比赛
        countDownLatch.countDown();
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        }
        }


        }
        复制


        总结:

        在学习AQS时,尽量站在设计者的角度思考,这个东西的核心是什么,把核心搞明白其他的各种实现就迎刃而解了;

        • 用volatile修饰的state以保证可见性

        • 用CAS操作这个state以保证原子性

        • 使用Node类封装线程

        • 使用前后指针构成队列

        • 其实看上面列出的几项,一句话总结也就是:线程获取锁失败阻塞,别人释放锁就通知阻塞线程


        文章转载自程序猿研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论