Java AQS 介绍(二)-Java专区论坛-技术-SpringForAll社区

Java AQS 介绍(二)

/**
 * 根据上一个节点的状态,判断当前线程是否应该被阻塞
 * SIGNAL -1 :当前节点释放或者取消时,必须 unpark 他的后续节点。
 * CANCELLED 1 :由于超时(timeout)或中断(interrupt),该节点被取消。节点永远不会离开此状态。特别是,具有取消节点的线程永远不会再次阻塞。
 * CONDITION -2 :该节点目前在条件队列。 但它不会被用作同步队列节点,直到转移,转移时的状态将被设置为 0 。
 * PROPAGATE -3 :releaseShared 应该被传播到其他节点。 
 * 0:都不是
 *
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前一个节点的等待状态
    int ws = pred.waitStatus;
    // 前一个节点需要 unpark 后续节点
    if (ws == Node.SIGNAL)
        return true;
    // 当前节点处于取消状态
    if (ws > 0) {
        do {
            // 将取消的节点从队列中移除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 设置前一个节点为 SIGNAL 状态
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire 方法中,会判断前一个节点的状态,同时取消在队列中当前节点前面无效的节点。

图片[1]-Java AQS 介绍(二)-Java专区论坛-技术-SpringForAll社区

再继续阅读 出队列 acquireQueued 方法,发现有一个 finally 会判断状态后执行 cancelAcquire(node); ,也就是上面流程图中下面的红色方块。

cancelAcquire(Node node)
final boolean acquireQueued(final Node node, int arg) {
    // 是否拿到资源
    boolean failed = true;
    try {
        // 省略
        // 在 finally 会将当前节点置为取消状态
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


private void cancelAcquire(Node node) {
    // 节点不存在 直接返回
    if (node == null)
        return;

    // 取消节点关联线程
    node.thread = null;

    //跳过已经取消的节点,获取当前节点之前的有效节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 获取当前节点之前的有效节点的下一个节点
    Node predNext = pred.next;

    // 当前节点设置为取消
    node.waitStatus = Node.CANCELLED;

    // 当前节点如果是尾节点,则将最后一个有效节点设置为尾节点,并将 predNext 设置为空
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // pred 不是头节点(node 的上一个有效节点 不是 head) && ( pred的状态是 SIGNAL ||  pred 的状态设置为 SIGNAL 成功 ) && pred 的绑定线程不为空
        if (pred != head && 
        ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 
        pred.thread != null) {
            // 当前节点的后继节点
            Node next = node.next;
            // 后继节点不为空 且 状态有效 将 pred 的 后继节点设置为 当前节点的后继节点
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // node 的上一个有效节点 是 head, 或者其他情况 唤醒当前节点的下一个有效节点
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

private void unparkSuccessor(Node node) {

    // 判断当前节点状态
    int ws = node.waitStatus;
    if (ws < 0)
        // 将节点状态更新为 0 
        compareAndSetWaitStatus(node, ws, 0);

    // 下一个节点, 一般是下一个节点应该就是需要唤醒的节点,即颁发证书。
    Node s = node.next;
    // 大于 0  CANCELLED : 线程已取消
    // 但是有可能 后继节点 为空或者被取消了。
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾节点开始遍历,直到定位到 t.waitStatus <= 0 的节点
        // 定位到后并不会停止,会继续执行,相当于找到最开始的那个需要唤醒的节点
        // t.waitStatus <= 0 : SIGNAL( -1 后续线程需要释放) 
        //                     CONDITION ( -2 线程正在等待条件) 
        //                     PROPAGATE ( -3 releaseShared 应该被传播到其他节点)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 定位到需要唤醒的节点后 进行 unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}

流程分析:

  1. 找到当前节点的前一个非无效节点 pred;

  2. 当前节点如果是尾节点,则将最后一个有效节点设置为尾节点,并将 predNext 设置为空;

  3. pred 不是头节点 && ( pred 的状态是 SIGNAL || pred 的状态设置为 SIGNAL 成功 ) && pred 的绑定线程不为空;

  4. 其他情况。

下面分别画图:

图片[2]-Java AQS 介绍(二)-Java专区论坛-技术-SpringForAll社区
图片[3]-Java AQS 介绍(二)-Java专区论坛-技术-SpringForAll社区
图片[4]-Java AQS 介绍(二)-Java专区论坛-技术-SpringForAll社区

Q: 通过图可以看出来,只操作了 next 指针,但是没有操作 prev 指针,这是为什么呢?

A:出队列:acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法中,shouldParkAfterFailedAcquire 方法会判断前一个节点的状态,同时取消在队列中当前节点前面无效的节点。这时候会移除之前的无效节点,此处也是为了防止指向一个已经被移除的节点。同时保证 prev 的稳定,有利于从 tail 开始遍历列表,这块在 unparkSuccessor(node); 中也可以看到是从后往前表里列表。

Q: unparkSuccessor(Node node) 为什么从后往前遍历?

A:

图片[5]-Java AQS 介绍(二)-Java专区论坛-技术-SpringForAll社区

addWaiter(Node.EXCLUSIVE) 插入新节点时,使用的是 尾插法,看红框部分,此时有可能还未指向 next。

Q: node.next = node; 这块导致 head 不是指向最新节点,链表不就断了么?

A: acquireQueued 方法介绍中,里面有个循环,会不断尝试获取资源,成功之后会设置为 head。并且在 shouldParkAfterFailedAcquire 中也会清除当前节点前的无效节点。

释放独占资源 release
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

以独占模式释放。 通过释放一个或多个线程,如果实现 tryRelease 返回 true。 这种方法可以用来实现方法 Lock.unlock 。

  1. tryRelease(arg) 操作释放资源,同样是由子类实现,后面介绍子类时会进行说明。返回 true 说明资源现在已经没有线程持有了,其他节点可以尝试获取;

  2. 释放成功,且 head != null && h.waitStatus != 0, 会继续执行 unparkSuccessor(h);

  3. 这块会看到 只要 tryRelease(arg) 操作释放资源成功, 后面无论执行是否成功,都会返回 true,unparkSuccessor(h) 相当于只是附加操作。

共享模式

获取共享资源 acquireShared
public final void acquireShared(int arg) {
    // 小于 0 表示获取资源失败
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    // 添加到节点 此处是共享节点
    final Node node = addWaiter(Node.SHARED);
    // 根据是否拿到资源 判断是否需要取消
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 返回前一个节点
            final Node p = node.predecessor();
            if (p == head) {
                // 再次尝试获取共享资源
                int r = tryAcquireShared(arg);
                // 表示获取成功
                if (r >= 0) {
                    // 设置当前节点为头节点 并尝试唤醒后续节点
                    setHeadAndPropagate(node, r);
                    // 释放头节点 GC 会回收
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. tryAcquireShared(arg),尝试获取资源,这块由子类实现;

  2. 返回值分为 3 种:

1. 小于 0: 表示失败;

2. 等于 0: 表示共享模式获取资源成功,但后续的节点不能以共享模式获取成功;

3. 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。

  1. 在失败后会使用 doAcquireShared(arg); 不断获取资源;

  2. final Node node = addWaiter(Node.SHARED); 同样会创建节点;

  3. 在循环中不断判断前一个节点如果是 head,则尝试获取资源;

  4. 在共享模式下获取到资源后会使用 setHeadAndPropagate(node, r); 设置头节点,同时唤醒后续节点。

设置头节点,并传播唤醒后续节点
// node 是当前节点
// propagate 是 前一步 tryAcquireShared 的返回值 进来时 >=0
// 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。
private void setHeadAndPropagate(Node node, int propagate) {
    // 记录下当前头节点
    Node h = head; // Record old head for check below
    // 设置传入 node 为头节点
    setHead(node);
    // 判断条件,唤醒后续节点
    // propagate > 0 有后续资源
    // h == null 旧的头节点 因为前面 addWaiter, 肯定不会为空,应该是防止 h.waitStatus < 0 空指针的写法
    // (h = head) == null 当前的 头节点,再判断状态
    // waitStatus < 0 后续节点就需要被唤醒
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 后续节点为共享,则需要唤醒
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
doReleaseShared() 释放共享资源
private void doReleaseShared() {
    // 循环
    for (;;) {
        // 从头开始
        Node h = head;
        // 判断队列是否为空,就是刚初始化
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // SIGNAL( -1 后续线程需要释放)
            if (ws == Node.SIGNAL) {
                // 将等待状态更新为 0 如果失败,会循环
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒后续节点, 同时将当前节点设置为 取消
                unparkSuccessor(h);
            }
            // 如果状态是 0 则会更新状态为 PROPAGATE
            // PROPAGATE ( -3 releaseShared 应该被传播到其他节点)
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 判断头节点有没有变化,有变化 是因为竞争,别的线程获取到了锁,会继续循环
        // 没有变化直接结束
        if (h == head)                   // loop if head changed
            break;
    }
}
  1. 从头节点开始进行,如果 h != null && h != tail 说明队列不是空或者刚初始化;

  2. 节点状态为 SIGNAL( -1 )说明后续线程需要释放;

  3. 会更改当前节点状态,成功后唤醒后续节点,失败则继续循环;

  4. 节点状态如果是 0 则更新为 PROPAGATE,会将状态传播。

释放共享资源 releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 释放共享资源
        doReleaseShared();
        return true;
    }
    return false;
}

以共享模式释放。 通过释放一个或多个线程,如果实现 tryReleaseShared 返回 true。

总结

Q: AQS 到底是什么?

A: AQS 内部提供了一个先入先出(FIFO)双向等待队列,内部依靠 Node 实现,并提供了在独占模式共享模式下的出入队列的公共方法。而关于状态信息 state 的定义是由子类实现。tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared 等尝试获取资源操作都是由子类进行定义和实现的。而 AQS 中提供了子类获取资源之后的相关操作,包括节点 Node 的出入队列,自旋获取资源等等。

Q: AQS 获取资源失败后会如何操作?

A: 线程获取资源失败后,会放到等待队列中,在队列中会不断尝试获取资源(自旋),说明线程只是进入等待状态,后面还是可以再次获取资源的。

Q: AQS 等待队列的数据结构是什么?

A: CLH 变体的先入先出(FIFO)双向等待队列。(CLH 锁是一个自旋锁。能确保无饥饿性。提供先来先服务的公平性。是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。)

Q: AQS 等待队列中的节点如何获取获取和释放资源的?

A: 可以看下独占模式中的讲述过程,通过代码梳理。

本文分别从 独占模式共享模式介绍的 AQS 基本逻辑,并通过源码和作图理解基本思路。但是并没有对需要子类实现的业务逻辑做介绍。这块会在后面介绍 ReentrantLockCountDownLatch 等子类的时候做介绍。

请登录后发表评论

    没有回复内容