面试官:CAS和AQS都是什么,有什么用,谁来实现,源码怎么写的?

CAS

全称是:Compare And Swap 比较并交换,是乐观锁的主要实现方式。cas实现了在多线程环境下即使不使用锁也可以让线程间同步。

在Java中使用CAS设计到三个参数,变量的内存地址,期望修改的值,要修改的值。

只有内存中的值和期望修改的值相等时,才会将内存中的值修改为要修改的值,这样代表cas操作成功。

如果修改失败,一般都会自旋重试,正常情况下都会设置线程阻塞,用来防止CPU空转导致占用率飙高的问题。

CAS的ABA问题

ABA问题通俗的解释为:

  • 线程A执行了CAS的操作,将内存地址A中期望的0修改为1,成功!
  • 线程B将内存地址A中期望的1又修改为0,成功!
  • 线程C将内存地址A中期望的0修改为1,成功!

我们可以发现,线程C看似是成功的,但是在它执行之前,内存中的数值已经被修改过一次了,这样虽然结果是CAS成功的,但是过程中可能蕴含着一些不可控的因素在里边,显然是不可以接受的。

解决方法,可以在每次修改的时候都增加一次版本号,每次更新都+1,在执行修改之前,再次判断版本号是否和期望的版本号一致。

AtomicStampedReference<T> 中就携带的有版本号,可以帮助我们快速解决ABA问题。


AQS

aqs是抽象同步队列,通过Node实现了队列,内部使用unsafe提供的CAS api实现的cas操作,也是java无锁编程的重要实现,最常用的有RLock,线程池

Node节点的几种状态

  • INIT:Node节点初始化时的状态,其实就是waitStatus的默认值(int 默认值=0)
  • CANCELLED:代表当前节点的任务是取消了的
  • SIGNAL:代表当前线程需要唤醒下一个线程
  • CONDITION:表示当前Node节点的线程处于等待状态,等待其它节点的唤醒
  • PROPAGATE:共享模式下的Node状态

AQS应用总流程

  • 首先使用AQS必须要子类实现以下两组中其中一组方法,如果不实现,则会调用父类AbstractQueuedSynchronizer 的方法,则会直接抛出UnsupportedOperationException异常。

    • tryAcquire(arg) tryRelease(arg) :以非共享模式尝试上锁和尝试解锁
    • tryAcquireShared(arg) tryReleaseShard(arg) :以共享模式尝试上锁和尝试解锁
  • 应用类获取锁:

    • 一般都会先调用父类AbstractQueuedSynchronizercompareAndSetState即CAS操作,进行上锁。
      • 成功则代表这个线程cas操作成功了,也就是拿到锁了
      • 失败则代表这个锁已经被别的线程抢走了。
      • 这个时候会调用父类AbstractQueuedSynchronizeracquire方法。
        • 再次尝试获取锁,如果失败了,并且成功添加到双向链表中,则当前线程阻塞等待。
        • 如果尝试获取锁成功了,则说明刚才拿到锁的线程已经释放掉了,当前线程上锁成功。
  • 应用类释放锁:

    • 应用类首先会调用release方法,这个时候aqs会去调用由应用类实现的tryRelease方法,并尝试进行解锁。
      • 如果解锁成功的话:会去唤醒当前节点的下一个节点,依次循环此流程。
      • 如果解锁失败的话:那说明子类有自己的逻辑。

一、类信息

AbstractQueuedSynchronizer:AQS的主类。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

}

AbstractQueuedSynchronizer简称AQS,集成自:AbstractOwnableSynchronizer类,实现了序列化的接口。

AbstractOwnableSynchronizer父类内部维护了一个属性:

private transient Thread exclusiveOwnerThread;

用于存放当前执行的线程。独占模式同步的当前所有者。


Node:用于存放任务线程,以及实现双向链表类。

static final class Node {}

这个类是AQS的精髓,重要的信息都在内部进行存放,其中几个重要的属性如下:

  • waitStatus:描述了当前节点的运行状态,共有以下几种状态。

    • 初始化、默认状态:Node节点初始化时的状态(并没有真实属性表示),其实就是waitStatus的默认值(int 默认值=0)

    • CANCELLED:代表当前节点的任务是取消了的

    • SIGNAL:代表当前线程需要唤醒下一个线程

    • CONDITION:表示当前Node节点的线程处于等待状态,等待其它节点的唤醒

    • PROPAGATE:共享模式下的Node状态

  • prev:链表的上一个节点

  • next:链表的下一个节点

  • thread:工作线程


二、内部属性

head、tail:用于保存双线链表的头尾指针

private transient volatile Node head;

private transient volatile Node tail;

阻塞队列(因为队列中的元素会被阻塞掉,所以也可以称之为阻塞队列,实际上就是一个双向链表)的头尾,在使用时初始化。

如果head存在,则head的waitStatus属性保证不为CANCELLED。

state:队列的同步状态

示例:在应用类ReentrantLock用于保存锁的获取状态以及获取次数。

private volatile int state;

CAS操作的必要属性

private static final Unsafe unsafe = Unsafe.getUnsafe();

private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

这些字段会在类初始化的时候,也就是static代码块中进行赋值:

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

三、需要实现的方法

在AQS中,部分的方法要求子类去实现,否则调用时会抛出异常,像:tryXXX()这些方法,详细列表如下:

tryAcquire(int arg) 尝试使用独占锁的方式进行加锁。

tryAcquireShared(int arg) 尝试使用共享方式进行加锁。

tryRelease(int arg) 释放独占锁。

tryReleaseShard(int arg) 释放共享锁。

这样做的目的可能是因为:AQS因为有两套实现,共享和独占,如果以接口的设计那么就过于的死板,子类必须实现所有的方法才可以使用AQS,即使子类只需要独占模式,这样设计的好处在于,如果子类只希望使用独占模式,则只需要实现独占模式的方法即可,更加的灵活,而且子类实现,也可以扩展自己其它的逻辑。


四、AQS使用流程及原理

AQS本身是一个抽象类,并不能直接拿来使用,所以此处使用实现类:ReentrantLock进行举例。

上锁的流程

应用类首先会去调用aqs实现的cas方法判断是否可以直接加锁成功。

final void lock() {

    if (compareAndSetState(0, 1))

        setExclusiveOwnerThread(Thread.currentThread());
    else

        acquire(1);
}
  • 实际上这个compareAndSetState是由UnSafe类实现的navite方法,直接调用操作系统原语进行操作,这里不深究。

  • protected final boolean compareAndSetState(int expect, int update) {
    
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

```

以下为AQS的实现。

AQS的独占锁方法acquire()

public final void acquire(int arg) {



    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){

        selfInterrupt();
    }
}

这个tryAcquire()方法是子类实现的,其实还是调用的cas操作,此处不深究。主要看入队列和封装waiter的操作。

按照顺序,先看addWaiter()方法

private Node addWaiter(Node mode) {

    Node node = new Node(Thread.currentThread(), mode);




    Node pred = tail;
    if (pred != null) {

        node.prev = pred;

        if (compareAndSetTail(pred, node)) {


            pred.next = node;

            return node;
        }
    }

    enq(node);

    return node;
}

enq():初始化双向链表 && Node入队

private Node enq(final Node node) {

    for (;;) {

        Node t = tail;

        if (t == null) { 


            if (compareAndSetHead(new Node()))
                tail = head;
        } 

        else {

            node.prev = t;

            if (compareAndSetTail(t, node)) {

                t.next = node;

                return t;
            }
        }
    }
}

到此addWaiter方法就执行完毕,任务线程已经包装成Node进入了队列中。

再回过头来看acquire()方法的acquireQueued()方法

final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;
    try {

        boolean interrupted = false;
        for (;;) {

            final Node p = node.predecessor();

            if (p == head && tryAcquire(arg)) {

                setHead(node);


                p.next = null; 
                failed = false;

                return interrupted;
            }




            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){

                interrupted = true;
            }
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire():判断当前Node的任务线程是否需要进行阻塞,以及移除状态为取消的Node

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

    int ws = pred.waitStatus;

    if (ws == Node.SIGNAL)

             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
        return true;

    if (ws > 0) {

             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */

        do {




            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);

        pred.next = node;
    }

    else {

             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }

    return false;
}

parkAndCheckInterrupt():挂起线程,并返回当前线程是否被中断。

private final boolean parkAndCheckInterrupt() {

    LockSupport.park(this);

    return Thread.interrupted();
}

释放锁的流程

在aqs中释放锁会先去调用:release() 方法对锁进行一个释放。

public final boolean release(int arg) {


    if (tryRelease(arg)) {

        Node h = head;

        if (h != null && h.waitStatus != 0){

            unparkSuccessor(h);
        }

        return true;
    }

    return false;
}

unparkSuccessor(h):清除头节点执行状态(waitStatus),唤醒阻塞线程

private void unparkSuccessor(Node node) {

         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */


    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);


         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */

    Node s = node.next;






    if (s == null || s.waitStatus > 0) {
        s = null;


        for (Node t = tail; t != null && t != node; t = t.prev)

            if (t.waitStatus <= 0)
                s = t;
    }

    if (s != null)

        LockSupport.unpark(s.thread);
}

解锁流程到此结束。


锁相关概念

公平锁

等待获取锁的线程拥有特定顺序,挨个排队拿锁。

非公平锁

不按照特定的顺序进行拿锁,谁先抢到就是谁的。

可重入锁

已经拿到过锁的线程,不需要再次拿锁,就可以访问共享资源。

不可重入锁

已经拿到锁的线程,如果要访问共享资源,则需要再次获取锁。

独占锁

自己已经拿到锁了,别的线程再过来那就拿不到。

偏向锁

拿到过一次锁,再去获取锁如果锁指向的是自己,就直接执行流程,不用等待拿锁了。

乐观锁

每次在修改数据之前,获取一个版本号,一般会和数据一起保存,在修改的时候判断版本号是否和携带的版本号一致,如果一致就修改,否则就失败,抛一个版本号异常。适用于并发修改量低的场景。

悲观锁

在每次修改之前都获取锁,以此来保证在修改数据时不会被其它线程影响,性能开销比较大。适用于并发修改量大,且需要保证数据xxx的场景。

抽象类和接口的区别

接口,我觉的是一套规范的具体定义,比如说数据库的驱动,要链接数据库的话,数据库厂商必须实现特定的接口,才可以构建出一个完整的数据库驱动。

抽象类,其实从名字上就可以看出来,它是一件事情,或一个功能的抽象实现,就好比,AQS的这个类,拥有两套实现,公平锁和非公平锁,如果aqs定义为是一个接口的话,那么就过于的具体,不具备抽象的概念,实现类就必须同时实现两套加锁逻辑,抽象类的话可以定义方法的修饰符为protected这样的话,子类就按需实现即可。

链接:https://juejin.cn/post/7280429214607786042

请登录后发表评论

    没有回复内容