Java AQS 介绍(一)-Java专区论坛-技术-SpringForAll社区

Java AQS 介绍(一)

前言

AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS 。是在 JUC 包下面一个非常重要的基础组件,JUC 包下面的并发锁 ReentrantLock CountDownLatch 等都是基于 AQS 实现的。所以想进一步研究锁的底层原理,非常有必要先了解 AQS 的原理。

公众号:liuzhihangs,记录工作学习中的技术、开发及源码笔记;时不时分享一些生活中的见闻感悟。欢迎大佬来指导!

介绍

先看下 AQS 的类图结构,以及源码注释,有一定的大概了解之后再从源码入手,一步一步研究它的底层原理。

图片[1]-Java AQS 介绍(一)-Java专区论坛-技术-SpringForAll社区

” 源码注释

提供了实现阻塞锁和相关同步器依靠先入先出(FIFO)等待队列(信号量,事件等)的框架。 此类中设计了一个对大多数基于 AQS 的同步器有用的原子变量来表示状态(state)。 子类必须定义 protected 方法来修改这个 state,并且定义 state 值在对象中的具体含义是 acquired 或 released。 考虑到这些,在这个类中的其他方法可以实现所有排队和阻塞机制。 子类可以保持其他状态字段,但只能使用方法 getState 、setState 和 compareAndSetState 以原子方式更新 state 。

子类应被定义为用于实现其封闭类的同步性能的非公共内部辅助类。 类 AbstractQueuedSynchronizer 没有实现任何同步接口。 相反,它定义了一些方法,如 acquireInterruptibly 可以通过具体的锁和相关同步器来调用适当履行其公共方法。

此类支持独占模式和共享模式。 在独占模式下,其他线程不能获取成功,共享模式下可以(但不一定)获取成功。 此类不“理解”,在机械意义上这些不同的是,当共享模式获取成功,则下一个等待的线程(如果存在)也必须确定它是否能够获取。 线程在不同模式下的等待共享相同的 FIFO 队列。 通常情况下,实现子类只支持其中一种模式,但同时使用两种模式也可以,例如 ReadWriteLock 。 仅共享模式不需要定义支持未使用的模式的方法的子类。

这个类中定义了嵌套类 AbstractQueuedSynchronizer.ConditionObject ,可用于作为一个 Condition 由子类实现,并使用 isHeldExclusively 方法说明当前线程是否以独占方式进行,release()、 getState() acquire() 方法用于操作 state 原子变量。

此类提供检查和监视内部队列的方法,以及类似方法的条件对象。 根据需要进使用以用于它们的同步机制。

要使用这个类用作同步的基础上,需要重新定义以下方法,如使用,通过检查和或修改 getState 、setState 或 compareAndSetState 方法:

tryAcquire

tryRelease

tryAcquireShared

tryReleaseShared

isHeldExclusively

通过上面的注释可以得出大概的印象:

  1. 内部依靠先入先出(FIFO) 等待队列。

  2. 存在 state 表示状态信息。state 值只能用 getState 、setState 和 compareAndSetState 方法以原子方式更新。

  3. 支持独占模式和共享模式,但具体需要子类实现具体支持哪种模式。

  4. 嵌套 AbstractQueuedSynchronizer.ConditionObject 可以作为 Condition 由子类实现。

  5. 子类需要重新定义 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared、isHeldExclusively 方法。

队列节点 Node

图片[2]-Java AQS 介绍(一)-Java专区论坛-技术-SpringForAll社区

Node 节点,包含以下元素:

prev :上一个节点

next :下一个节点

thread :持有线程

waitStatus :节点状态

nextWaiter : 下一个处于 CONDITION 状态的节点

组合成等待队列则如下:

图片[3]-Java AQS 介绍(一)-Java专区论坛-技术-SpringForAll社区

下面是等待队列节点的 Node 属性:

static final class Node {
    // 节点正在共享模式下等待的标记
    static final Node SHARED = new Node();
    // 指示节点正在以独占模式等待的标记
    static final Node EXCLUSIVE = null;

    // 指示线程已取消
    static final int CANCELLED =  1;
    // 指示后续线程需要唤醒
    static final int SIGNAL    = -1;
    // 指示线程正在等待条件
    static final int CONDITION = -2;
    // 指示下一次acquireShared应该无条件传播
    static final int PROPAGATE = -3;
    
    /**
     * 状态字段,仅使用以下值
     * SIGNAL -1 :当前节点释放或者取消时,必须 unpark 他的后续节点。
     * CANCELLED 1 :由于超时(timeout)或中断(interrupt),该节点被取消。节点永远不会离开此状态。特别是,具有取消节点的线程永远不会再次阻塞。
     * CONDITION -2 :该节点目前在条件队列。 但它不会被用作同步队列节点,直到转移,转移时的状态将被设置为 0 。
     * PROPAGATE -3 :releaseShared 应该被传播到其他节点。 
     * 0:都不是
     * 值以数字表示以简化使用,大多数时候可以检查符号(是否大于0)以简化使用
     */
    volatile int waitStatus;

    // 上一个节点
    volatile Node prev;

    // 下一个节点
    volatile Node next;

    // 节点持有线程
    volatile Thread thread;

    // 链接下一个等待条件节点,或特殊值共享
    Node nextWaiter;

    // 节点是否处于 共享状态 是, 返回 true
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 返回前一个节点, 使用时 前一个节点不能为空
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

在 Node 节点中需要重点关注 waitStatus

  1. 默认状态为 0;

  2. waitStatus > 0 (CANCELLED 1) 说明该节点超时或者中断了,需要从队列中移除;

  3. waitStatus = -1 SIGNAL 当前线程的前一个节点的状态为 SIGNAL,则当前线程需要阻塞(unpark);

  4. waitStatus = -2 CONDITION -2 :该节点目前在条件队列;

  5. waitStatus = -3 PROPAGATE -3 :releaseShared 应该被传播到其他节点,在共享锁模式下使用。

了解完 Node 的结构之后,再了解下 AQS 结构,并从常用方法入手,逐步了解具体实现逻辑。

AbstractQueuedSynchronizer

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    // 等待队列的头,延迟初始化。 除了初始化,它是仅经由方法setHead修改。 注意:如果头存在,其waitStatus保证不会是 CANCELLED 状态
    private transient volatile Node head;

    // 等待队列的尾部,延迟初始化。 仅在修改通过方法ENQ添加新节点等待。
    private transient volatile Node tail;

    // 同步状态 
    private volatile int state;

    // 获取状态
    protected final int getState() {
        return state;
    }

    // 设置状态值
    protected final void setState(int newState) {
        state = newState;
    }

    // 原子更新状态值
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

}

在 AQS 中主要参数为:

head :等待队列头

tail : 等待队列尾

state : 同步状态

通过注释了解到,在 AQS 里主要分为两种操作模式,分别是:独占模式、共享模式,下面分别从两个不同的角度去分析源码。

acquire : 以独占模式获取,忽略中断。 通过调用至少一次实施 tryAcquire ,在成功时返回。 否则,线程排队,可能重复查封和解封,调用 tryAcquire 直到成功为止。 这种方法可以用来实现方法 Lock.lock 。

 

release : 以独占模式释放。 通过疏通一个或多个线程,如果实现 tryRelease 返回 true。 这种方法可以用来实现方法 Lock.unlock 。

acquireShared : 获取在共享模式下,忽略中断。 通过至少一次第一调用实现 tryAcquireShared ,在成功时返回。 否则,线程排队,可能重复查封和解封,调用 tryAcquireShared 直到成功为止。

releaseShared : 以共享模式释放。 通过疏通一个或多个线程,如果实现 tryReleaseShared 返回 true。

无论是共享模式还是独占模式在这里面都会用到 addWaiter 方法,将当前线程及模式创建排队节点。

独占模式

获取独占资源 acquire
public final void acquire(int arg) {
    // tryAcquire 尝试获取 state,获取失败则会加入到队列
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

在独占模式下会尝试获取 state,当获取失败时会调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。

  1. tryAcquire(arg),尝试获取 state 这块由子类自己实现,不同的子类逻辑不同,这块在介绍子类代码时会说明。

  2. 获取 state 失败后,会进行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这块代码可以拆分为两块:addWaiter(Node.EXCLUSIVE),acquireQueued(node, arg)。

  3. addWaiter(Node.EXCLUSIVE) 返回的是当前新创建的节点。

  4. acquireQueued(node, arg) 线程获取锁失败,使用 addWaiter(Node.EXCLUSIVE) 放入等待队列,而 acquireQueued(node, arg) 使用循环,不断的为队列中的节点去尝试获取资源,直到获取成功或者被中断。

总结获取资源主要分为三步:

  1. 尝试获取资源

  2. 入队列

  3. 出队列

尝试获取资源 tryAcquire(arg),由子类实现,那下面则着手分别分析 入队列出队列

入队列:addWaiter(Node.EXCLUSIVE)

使用 addWaiter(Node.EXCLUSIVE) 方法将节点插入到队列中,步骤如下:

  1. 根据传入的模式创建节点

  2. 判断尾节点是否存在,不存在则需要使用 enq(node) 方法初始化节点,存在则直接尝试插入尾部。

  3. 尝试插入尾部时使用 CAS 插入,防止并发情况,如果插入失败,会调用 enq(node) 自旋直到插入。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 定位到队列末尾的 node
    Node pred = tail;
    if (pred != null) {
        // 新节点的上一个节点 指向尾节点
        node.prev = pred;
        // 使用 CAS 设置尾节点,tail 如果等于 pred 则更新为 node
        if (compareAndSetTail(pred, node)) {
            // 更新成功则将 pred 的下一个节点指向 node
            pred.next = node;
            return node;
        }
    }
    // 尾节点没有初始化,或竞争失败,自旋
    enq(node);
    return node;
}

/**
 * tailOffset 也就是成员变量 tail 的值
 * 此处相当于:比较 tail 的值和 expect 的值是否相等, 相等则更新为 update
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 尾节点为空 需要初始化头节点,此时头尾节点是一个
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不为空 循环赋值
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

看完代码和注释肯定还是有点模糊,现在用图一步一步进行说明。

因为根据初始尾节点是否为空分为两种情况,这里使用两幅图:

  1. 第一幅为第一次添加节点,此时 head 会延迟初始化;

  2. 第二幅图为已经存在队列,进行插入节点;

  3. 注意看代码,enq 方法返回的是之前的尾节点

  4. addWaiter 方法 返回的是当前插入的新创建的节点

图片[4]-Java AQS 介绍(一)-Java专区论坛-技术-SpringForAll社区
图片[5]-Java AQS 介绍(一)-Java专区论坛-技术-SpringForAll社区

节点添加到队列之后,返回当前节点,而下一步则需要调用方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 不断的去获取资源。

出队列:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

方法会通过循环不断尝试获取拿到资源,直到成功。代码如下:

final boolean acquireQueued(final Node node, int arg) {
    // 是否拿到资源
    boolean failed = true;
    try {
        // 中断状态
        boolean interrupted = false;
        // 无限循环
        for (;;) {
            // 当前节点之前的节点
            final Node p = node.predecessor();
            // 前一个节点是头节点, 说明当前节点是 头节点的 next 即真实的第一个数据节点 (因为 head 是虚拟节点)
            // 然后再尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 获取成功之后 将头指针指向当前节点
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // p 不是头节点, 或者 头节点未能获取到资源 (非公平情况下被别的节点抢占) 
            // 判断 node 是否要被阻塞,
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 不断获取本节点的上一个节点是否为 head,因为 head 是虚拟节点,如果当前节点的上一个节点是 head 节点,则当前节点为 第一个数据节点

  2. 第一个数据节点不断的去获取资源,获取成功,则将 head 指向当前节点;

  3. 当前节点不是头节点,或者 tryAcquire(arg) 失败(失败可能是非公平锁)。这时候需要判断前一个节点状态决定当前节点是否要被阻塞(前一个节点状态是否为 SIGNAL)。

请登录后发表评论

    没有回复内容