本文主要介绍了 JUC 中的 AbstractQueuedSynchronizer
的实现基础、其和 CLH 队列锁之间的关联、独占锁模式及共享锁模式加解锁的过程等。不包含 ConditionObject
的分析。
简介
AQS 提供了实现同步阻塞队列的基本框架,是 juc 包中其他众多同步原语和锁实现的基础。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization. S 以上片段摘录自 AQS 源码中的注释,可以总结出以下几点:
- AQS 提供了一种基于先入先出 FIFO 等待队列的线程同步框架;
- AQS 被设计为其他同步器实现的基础,其他同步器需要在内部实现 AQS 的模板方法,来实现自己的同步原语。
Node 数据结构
Node
结构是 AQS 阻塞队列实现的基础结构,是 AQS 排队、获取锁、释放锁等行为被操作的对象。 Node
代表的其实就是想要获取锁的线程,每一个 Node
对象都是对一个想要获取锁的线程的封装和表示,其实现基于 CLH Queue Lock 中的节点数据结构,关于 CLH Queue Lock,可以参考笔者之前的博客(深入理解 CLH Queue Lock | caffcen’s blog):
static final class Node {
static final Node SHARED = new Node(); // 节点处于共享等待的标识
static final Node EXCLUSIVE = null; // 节点处于独占等待的标识
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus; // 线程的状态
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread; // 节点对应的线程
Node nextWaiter; // 指向下一个等待在条件变量上的节点
}
waitStatus
可能有几种取值:
0
: 节点初始化时候的值CANCELLED = 1
: 表示节点由于超时或者被打断,『取消』排队SIGNAL = -1
: 当前节点如果是SIGNAL
则表示后继结点需要被唤醒,或者换过来说,如果当前节点想要被唤醒并尝试获取锁,则需要前驱节点的waitStatus
是SIGNAL
CONDITION = -2
: 表示节点位于条件变量的等待队列中PROPAGATE = -3
: 仅在释放共享锁时使用到
Node
类是构建起 AQS 阻塞双向队列的基础节点数据结构, prev
和 next
变量实现了双向队列, waitStatus
表示节点对应的线程的等待状态。
AQS 队列
AQS 双向等待 FIFO 队列的实现基于上面提到 Node
数据结构:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
}
state
字段是 AQS 的同步状态字段,主要通过 compareAndSetState
(内部实现基于 Unsafe 的 CAS 操作)进行原子地更新。
线程节点入队
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 获取队列尾部节点
// 如果没有初始化,则原子地设置 head
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; // node 将要成为队列的新 tail,因此更新 t 为 node 的前驱节点
if (compareAndSetTail(t, node)) { // 原子地更新
t.next = node;
return t; // 更新成功则返回 node 的前驱节点,否则进入下一次循环直至成功
}
}
}
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 尝试原子地更新尾结点,如果成功则直接返回,否则调用 enq 入队
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速入队失败,调用 enq 入队
enq(node);
return node;
}
}
线程节点出队
节点出队列则很简单,把 head
设为当前节点即可:
To dequeue, you just set the head field.
模板方法
Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead it defines methods such as acquireInterruptibly that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.
AQS 是非常优秀的模板模式的实现和应用,它提供以下模板方法供实现类实现,以达到不同的同步原语以及同步器机制:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
protected boolean tryAcquire(int arg); // 尝试获取独占锁
protected boolean tryRelease(int arg); // 尝试释放独占锁
protected int tryAcquireShared(int arg); // 尝试获取共享锁
protected boolean tryReleaseShared(int arg); // 尝试释放共享锁
protected boolean isHeldExclusively(); // 返回共享资源是否以独占的方式被占用
}
ReentrantLock 如何实现 AQS
我们以 ReentrantLock
为例来看一下 juc 包中的同步器和 AQS 是如何搭配使用的:
public class ReentrantLock implements Lock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {...}
protected final boolean tryRelease(int releases) {...}
protected final boolean isHeldExclusively() {...}
}
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {...}
}
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {...}
}
}
ReentrantLock
内部类 Sync
继承了 AQS,同时实现了 tryRelease
接口来释放锁,同时,为了同时实现公平锁和非公平锁的加锁逻辑, ReentrantLock
还提供了 NonfairSync
和 FairSync
来实现不同的 tryAcquire
逻辑。
可见 AQS 负责维护队列状态、更新同步状态等底层操作,这些操作对任何的同步器都是相同的,并且暴露出了 5 个模板接口供实现类实现,这样实现类仅仅需要关注自己的同步原语的实现即可,不用关注相同的底层逻辑。
独占锁模式
独占锁模式下的加锁操作
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
// 尝试获取锁,如果获取成功直接返回
// tryAcquire 由具体的同步器实现
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 无法获取到资源,先加入队列,再尝试获取
selfInterrupt(); // 中断当前线程
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 取前置节点
if (p == head && tryAcquire(arg)) {
// 只有前置节点已经获取了锁,即前置节点成为了队列的 head 也就是出队时,尝试获取锁
setHead(node); // 获取锁成功,出队列,返回
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取锁失败,调用 shouldParkAfterFailedAcquire 看是否需要睡眠当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 取前置节点的 waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true; // 前置节点的 SIGNAL 已设置,当前可以安全地睡眠,返回 true
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 前驱节点对应的线程不再尝试获取锁,不断往前遍历知道找到一个状态为非取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 设置前驱节点的状态为 SIGNAL,之后当前线程才能够被唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}
总结一下加锁过程:
- 调用
acquire
获取锁,acquire
内部调用同步器内部类实现的tryAcquire
尝试获取锁,加锁成功则直接返回,否则将线程节点加入等待队列中,并调用acquireQueued
继续尝试获取锁; acquireQueued
有一个无限循环来不断尝试获取锁。当前驱节点为head
时才会调用tryAcquire
来获取锁,这和 CLH 队列锁的加锁操作如出一辙:只有等前驱节点成功获取锁并解锁后,当前节点才能够取获取锁。这里贴出了 CLH 队列锁加锁的代码供对比参考:
public class CLHLock implements Lock {
public void lock() {
QNode qnode = myNode.get();
qnode.locked = true;
QNode pred = tail.getAndSet(qnode);
myPred.set(pred);
while (pred.locked) {
}
}
}
acquireQueued
如果加锁成功,那么则会将head
设为当前节点,将当前节点出队并返回;否则调用shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire
方法是用来在当前线程没有成功获取到锁时,检查并更新节点状态的。其有几个功能:- 判断当前线程是否能够『安全地』休眠,即判断前驱结点的
waitStatus
是否为SIGNAL
; - 如果前驱节点取消了排队,则不断往前寻找一个未取消的节点,当做当前节点的前置节点;
- 如果前驱节点的
waitStatus
不为SIGNAL
,则原子性的更新为SIGNAL
,方便当前节点之后被阻塞 - 返回当前线程是否能够休眠;
- 判断当前线程是否能够『安全地』休眠,即判断前驱结点的
总的来说,AQS 共享锁的加锁逻辑和 CLH 队列锁的加锁逻辑本质上是相同的:当前线程只有在它的前驱节点的线程获取到锁之后,才可以尝试去获取锁,且它们都采用的是一种自旋忙等待的方式,不断地测试前置节点的状态来实现的。
不同点在于 AQS 通过 shouldParkAfterFailedAcquire
方法可以让当前忙等待的线程被阻塞,好让出 CPU 时间。
独占锁模式下的解锁操作
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
// 调用同步器实现的 tryRelease
if (tryRelease(arg)) {
Node h = head;
// 如果成功释放,则调用 unparkSuccessor 唤醒可能在睡眠的后继节点线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 如果后继节点取消了排队,则从 tail 开始往队列头部遍历,找到 node 后第一个未取消排队的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果后继节点存在,则唤醒它
if (s != null)
LockSupport.unpark(s.thread);
}
}
独占模式下解锁的操作则简单很多,主要工作是唤醒当前节点的后继节点。
共享锁模式
共享锁模式下的加锁操作
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
// 调用同步器实现的 tryAcquireShared 方法
if (tryAcquireShared(arg) < 0)
// 失败,调用 doAcquireShared 方法继续不断尝试
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 获取锁失败,现将线程入队
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 和独占模式相同,只有前置节点为 head 时才能够尝试获取锁
if (p == head) {
int r = tryAcquireShared(arg);
// r >= 0 表示获取锁成功
if (r >= 0) {
// 当前线程出队,并将 release 信息传播给后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 获取锁失败,判断是否需要休眠
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置 head 为当前节点,表示当前节点已经获取到了临界资源,出队
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 当满足任一条件时,则唤醒后继节点让其尝试进入临界区
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 当 waitStatus 为 SIGNAL 时,需要唤醒后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// recheck 保证线程安全
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 为了保证将唤醒操作传播出去,需要设置 waitStatus 为 PROPAGATE
continue; // loop on failed CAS
}
// 如果 head 变更,说明有其他节点也成功释放了临界资源,它将会负责唤醒后继的阻塞节点,当前线程可以退出循环
if (h == head) // loop if head changed
break;
}
}
}
共享模式和独占模式加锁的区别主要是:
- 可以有多个线程同时尝试获取锁;独占模式只能有一个;
- 共享锁模式下,如果一个线程成功获取到了锁,那么它还会同时尝试去唤醒其他等待着的线程;而独占模式只有在解锁后才会去唤醒队列中阻塞的节点。
共享锁模式比较重要的是 setHeadAndPropagate
和 doReleaseShared
。 setHeadAndPropagate
会进行两个操作:
- 将获取了锁的线程出队,即将
head
设为当前节点; - 满足一下条件时,调用
doReleaseShared
唤醒队列中阻塞的线程:propagate > 0
: 表示还有资源剩余可以索取h == null
h.waitStatus < 0
(h = head) == null
h.waitStatus < 0
doReleaseShared
则是在尝试不断地唤醒符合条件的节点。
总结一下共享模式加锁:
- 调用
acquireShared
加锁。会调用同步器实现的tryAcquireShared
方法尝试获取锁,如果成功直接返回;否则调用doAcquireShared
doAcquireShared
首先会将当前线程入队;之后则会尝试获取锁:- 先判断自己的前驱节点是否是
head
,是的话则尝试获取锁,如果成功则调用setHeadAndPropagate
,将head
设置为自己,同时唤醒队列中阻塞的节点 - 如果自己的前驱节点不是
head
,又或者获取锁失败,则调用shouldParkAfterFailedAcquire
判断自己是否需要休眠,这和独占锁模式是一模一样的
- 先判断自己的前驱节点是否是
setHeadAndPropagate
首先将当前获得了共享锁的节点出队,之后在满足条件的情况下,尝试唤醒队列中被阻塞的线程doReleaseShared
不断尝试唤醒被阻塞的线程,直到head
因为某个线程也成功地获得到了共享锁而被更新为止
因为共享锁模式下,共同持有锁的多个线程势必将带来比较复杂的竞争,笔者对这部分内容的理解也不是很透彻,推荐阅读文末 Reference 中的美团技术博客(从ReentrantLock的实现看AQS的原理及应用 - 美团技术团队),其中有较好的分析。
共享锁模式下的解锁操作
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
}
解锁操作很简单,调用了同步器实现的 tryReleaseShared
方法,如果成功释放了共享资源则调用 doReleaseShared
唤醒后续线程。