/ 今日科技快讯 /
近日,在2021年第一季度检测中,腾讯应用宝、小米应用商店、OPPO软件商店、华为应用市场和vivo应用商店发现问题数量分别占比14.22%、13.81%、12.80%、11.37%和11.17%,存在上架审核不严格,存量问题清理不彻底,登记核验APP开发运营者信息不准确,误导用户下载等问题。工信部已督促相关平台企业进行全面整改,严格落实企业主体责任。
/ 作者简介 /
本篇文章来自MxsQ同学的投稿,和大家分享了Java并发开发中锁分配的解决方案AQS的相关内容,相信会对大家有所帮助!同时也感谢作者贡献的精彩文章!
MxsQ的博客地址:
https://juejin.cn/user/3491704658736942
/ 什么是AQS /
并发使计算机得以充分利用计算能力,有效率地完成各类程序任务。当深入地学习Java中的并发,不可避免地将学习到锁 —— 使并发的资源能被正确访问的手段。锁的学习也将分为两部分,一部分是如何加解锁,另一部分是把锁分配给谁。
AQS(AbstractQueuedSynchronizer)也叫“抽象队列同步器”,它提供了“把锁分配给谁"这一问题的一种解决方案,使得锁的开发人员可以将精力放在“如何加解锁上”,避免陷于把锁进行分配而带来的种种细节陷阱之中。
例如JUC中,如CountDownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock等并发工具,均是借助AQS完成他们的所需要的锁分配问题。
/ 基于CAS的状态更新 /
AQS要把锁正确地分配给请求者,就需要其他的属性来维护信息,那么自身也要面对并发问题,因为信息将会被更改,而且可能来源于任意线程。
AQS使用了CAS (compare and set) 协助完成自身要维护的信息的更新(后续的源码处处可见)。CAS的意义为:期望对象为某个值并设置为新的值。那么,如果不为期望的值或更新值失败,返回false;如果为期望的值并且设置成功,那么返回true。用例子表达就是“我认为我的家门是开着的,我将把它关上”。那么只有在家门是开着的,并且我把他关上了,这句断言为ture。
CAS是硬件层面上提供的原子操作保证,意味着任意时刻只有一个线程能访问CAS操作的对象。那么,AQS使用CAS的原因在于:
CAS足够快 如果并发时CAS失败时,可能通过自旋再次尝试,因为AQS知道维护信息的并发操作需要等待的时间非常短 AQS对信息的维护不能导致其它线程的阻塞
static final class Node {
// 表示线程取消申请锁
static final int CANCELLED = 1;
// 表示线程正在申请锁,等待被分配
static final int SIGNAL = -1;
// 表示线程在等待某些条件达成,再进入下一阶段
static final int CONDITION = -2;
// 表示把对当前节点进行的操作,继续往队列传播下去
static final int PROPAGATE = -3;
// 表示当前线程的状态
volatile int waitStatus;
// 指向前一个节点,也叫前驱节点
volatile Node prev;
// 指向后一个节点,也叫后继节点
volatile Node next;
// 节点代表的线程
volatile Thread thread;
// 指向下一个代表要等待某些条件达成时,才进行下阶段的线程的节点
Node nextWaiter;
}复制
private transient volatile Node head;
private transient volatile Node tail;
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 进入到这里,说明没有head节点,CAS操作创建一个head节点
// 失败也不要紧,失败说明发生了并发,会走到下面的else
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 把Node加入到尾部,保证加入到为止,并发会重走
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}复制

static final class Node {
// 表明是共享锁节点
static final Node SHARED = new Node();
// 表明是独占锁节点
static final Node EXCLUSIVE = null;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 如果插入尾部成功,就直接返回
pred.next = node;
return node;
}
}
// 通过CAS自旋确保入队
enq(node);
return node;
}复制
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
// 首先,找到当前节点前面未取消等待的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 方便操作
Node predNext = pred.next;
// 记录当前节点状态为取消,这样,如果发生并发,也能正确地处理掉
node.waitStatus = Node.CANCELLED;
//如果当前节点为tail,通过CAS将tail设置为找到的没被取消的pred节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// ①
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 移除掉找到的CANCELLED节点,整理CLH队列
compareAndSetNext(pred, predNext, next);
} else {
// 表示当pred头节点,唤醒下一节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}复制
pred不为头节点 pred记录的线程不为空 及pred的状态为SIGNAL,即等待分配到锁 或及pred的状态小于0是,能通过CAS设置为SIGNAL

public final void acquire(int arg) {
// 如果获取到锁,获取锁的成程序就执行下去
// 如果获取不到锁,插入代表当前线程的Node节点放入队列中,并请求锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中断
selfInterrupt();
}复制
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 自旋
// 读取前驱结点,因为前驱节点可能发生了改变,如取消等待操作
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 只有当前驱节点为head时,才有资格获取锁
// 设置head为当前节点
setHead(node);
p.next = null; // help GC
failed = false;
// 返回是否发生过中断
return interrupted;
}
// 更新当前节点状态,并检查线程是否发生过中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 说明发生了意料之外的异常,将节点移除,避免影响到其他节点
cancelAcquire(node);
}
}复制
只有当自己的前驱节点为head时,才有资格去获取锁,这表达了FIFO。 获取锁成功后,会返回线程是否被中断过,结合acquire()看,如果线程被中断过,会让线程回到中断状态。 以acquireQueued()看,请求锁是的过程是公平的,按照队列排列顺序申请锁。 以acquire()看,请求锁的过程是不公平的,因为acquire()会先尝试获取锁再入队,意味着将在某一时刻,有线程完成插队。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
前驱节点状态为SIGNAL直接返回
return true;
if (ws > 0) {
// 这里和cancelAcquire()类似,整合移除node之前被取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// CAS设置前驱节点状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 挂起当前线程
LockSupport.park(this);
return Thread.interrupted();
}复制

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// CAS 修改节点状态为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 如果s的后继节点为空或者状态大于0
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 从tail开始,找到最靠近head的状态不为0的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒节点中记录的线程
LockSupport.unpark(s.thread);
}复制
public final boolean release(int arg) {
// 子类的实现,尝试解锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 释放锁,唤醒下一线程
unparkSuccessor(h);
return true;
}
return false;
}复制
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
......
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 与acquireQueued()主要不同,向上抛出了异常
throw new InterruptedException();
......
}复制
acquireInterruptibly() doAcquireInterruptibly()
doAcquireNanos() doAcquireSharedNanos()
private void doAcquireShared(int arg) {
......
// 与独占锁相比差异为这一段
if (p == head) {
// 尝试获取锁,r表示资源情况
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取到了锁,重新设置head,并传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
......
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 重新设置head
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// ①
Node s = node.next;
if (s == null || s.isShared())
// 唤醒其他的Node
doReleaseShared();
}
}复制
有更多的资源,即 propagate > 0 旧的head为空或未被取消 新的head为空或未被取消
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 设置头为 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒下一节点
unparkSuccessor(h);
}
else if (ws == 0 &&
// ②
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}复制

acquireSharedInterruptibly() doAcquireSharedInterruptibly()
tryAcquireSharedNanos() doAcquireSharedNanos()
条件处理
await()系列:表示等待条件的完成 signal()、signalAll():表示条件达成的信号
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
}
static final class Node {
// 表示下一个CONDITION状态的节点
Node nextWaiter;
}复制
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
// 如果尾部节点已经不为CONDITION,那么把这些节点移除
unlinkCancelledWaiters();
// 重新指向尾部节点
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 作为头节点
firstWaiter = node;
else
// 作为下一节点
t.nextWaiter = node;
// 更新尾部节点
lastWaiter = node;
return node;
}复制
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 从头结点开始,移除所有不为Node.CONDITION的节点
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}复制

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 移除first节点的下一记录
first.nextWaiter = null;
} while (!transferForSignal(first)/*加入CLH队列*/ &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 更新 node 的状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将节点加入CLH队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果取消等待或者不能设置为SIGNAL,唤起线程
LockSupport.unpark(node.thread);
return true;
}复制
public final void await() throws InterruptedException {
if (Thread.interrupted())
// 如果线程中断了,抛出异常
throw new InterruptedException();
// 加入到CONDITION队列中
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
// 记录中断的场景
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 自旋
// 如果没有被加入到CLH队列中,那么挂起线程
LockSupport.park(this);
// 更新中断场景
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 尝试获取锁,此时Node已经在CLH队列中了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 根据中断场景做不同的处理
reportInterruptAfterWait(interruptMode);
}复制

tryAcquire(int):获取独占锁 tryRelease(int):释放独占锁 tryAcquireShared(int):获取共享锁 tryReleaseShared(int):释放共享锁
public static class MySync extends AbstractQueuedSynchronizer{
public void lock(){
acquire(0);
}
public void unlock(){
release(0);
}
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int arg) {
return compareAndSetState(1, 0);
}
}复制
维护一个CLH队列里,记录每一个需要获取锁的线程;在首次请求锁时,是不公平的;在队列里的锁请求时,是公平的。 当Node锁代表的线程没有请求到锁时,将被挂起,等被唤醒后,尝试再次请求锁,如果还是没有获取到锁,重复此过程。 当一个Node入队时,将从队尾移除取消等待的节点,直到找到第一个未取消等待的节点,插入此节点后。 当释放锁时,从CLH队里头部开始,找到第一个未取消等待的节点,唤醒。 对于共享锁,如果需要等待条件,则Node进入一个单项队列,自旋,挂起;待条件达成后,将Node加入到CLH队里,请求锁;若请求到锁,继续执行线程。
通过CAS和自旋控制自身状态并发,足够快 支持重入性判断,通过控制isHeldExclusively(),其代码位于操作CONDITION节点的各处,较零碎,因此没有将代码放出。可在tryAcquire()等子类的加锁方法中,借助setExclusiveOwnerThread()和getExclusiveOwnerThread()一起实现是否可重入 支持中断。 支持锁的获取时间控制。
复制