CAS
全称是:Compare And Swap 比较并交换,是乐观锁的主要实现方式。cas实现了在多线程环境下即使不使用锁也可以让线程间同步。
在Java中使用CAS设计到三个参数,变量的内存地址,期望修改的值,要修改的值。
只有内存中的值和期望修改的值相等时,才会将内存中的值修改为要修改的值,这样代表cas操作成功。
如果修改失败,一般都会自旋重试,正常情况下都会设置线程阻塞,用来防止CPU空转导致占用率飙高的问题。
CAS的ABA问题
ABA问题通俗的解释为:
- 线程A执行了CAS的操作,将内存地址A中期望的0修改为1,成功!
- 线程B将内存地址A中期望的1又修改为0,成功!
- 线程C将内存地址A中期望的0修改为1,成功!
我们可以发现,线程C看似是成功的,但是在它执行之前,内存中的数值已经被修改过一次了,这样虽然结果是CAS成功的,但是过程中可能蕴含着一些不可控的因素在里边,显然是不可以接受的。
解决方法,可以在每次修改的时候都增加一次版本号,每次更新都+1,在执行修改之前,再次判断版本号是否和期望的版本号一致。
在AtomicStampedReference<T>
中就携带的有版本号,可以帮助我们快速解决ABA问题。
AQS
aqs是抽象同步队列,通过Node实现了队列,内部使用unsafe提供的CAS api实现的cas操作,也是java无锁编程的重要实现,最常用的有RLock,线程池
Node节点的几种状态
- INIT:Node节点初始化时的状态,其实就是
waitStatus
的默认值(int 默认值=0) - CANCELLED:代表当前节点的任务是取消了的
- SIGNAL:代表当前线程需要唤醒下一个线程
- CONDITION:表示当前Node节点的线程处于等待状态,等待其它节点的唤醒
- PROPAGATE:共享模式下的Node状态
AQS应用总流程
-
首先使用AQS必须要子类实现以下两组中其中一组方法,如果不实现,则会调用父类
AbstractQueuedSynchronizer
的方法,则会直接抛出UnsupportedOperationException
异常。- tryAcquire(arg) tryRelease(arg) :以非共享模式尝试上锁和尝试解锁
- tryAcquireShared(arg) tryReleaseShard(arg) :以共享模式尝试上锁和尝试解锁
-
应用类获取锁:
- 一般都会先调用父类
AbstractQueuedSynchronizer
的compareAndSetState
即CAS操作,进行上锁。- 成功则代表这个线程cas操作成功了,也就是拿到锁了
- 失败则代表这个锁已经被别的线程抢走了。
- 这个时候会调用父类
AbstractQueuedSynchronizer
的acquire
方法。- 再次尝试获取锁,如果失败了,并且成功添加到双向链表中,则当前线程阻塞等待。
- 如果尝试获取锁成功了,则说明刚才拿到锁的线程已经释放掉了,当前线程上锁成功。
- 一般都会先调用父类
-
应用类释放锁:
- 应用类首先会调用release方法,这个时候aqs会去调用由应用类实现的tryRelease方法,并尝试进行解锁。
- 如果解锁成功的话:会去唤醒当前节点的下一个节点,依次循环此流程。
- 如果解锁失败的话:那说明子类有自己的逻辑。
- 应用类首先会调用release方法,这个时候aqs会去调用由应用类实现的tryRelease方法,并尝试进行解锁。
一、类信息
AbstractQueuedSynchronizer:AQS的主类。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}
AbstractQueuedSynchronizer
简称AQS,集成自:AbstractOwnableSynchronizer
类,实现了序列化的接口。
AbstractOwnableSynchronizer
父类内部维护了一个属性:
private transient Thread exclusiveOwnerThread;
用于存放当前执行的线程。独占模式同步的当前所有者。
Node:用于存放任务线程,以及实现双向链表类。
static final class Node {}
这个类是AQS的精髓,重要的信息都在内部进行存放,其中几个重要的属性如下:
-
waitStatus:描述了当前节点的运行状态,共有以下几种状态。
-
初始化、默认状态:Node节点初始化时的状态(并没有真实属性表示),其实就是
waitStatus
的默认值(int 默认值=0) -
CANCELLED:代表当前节点的任务是取消了的
-
SIGNAL:代表当前线程需要唤醒下一个线程
-
CONDITION:表示当前Node节点的线程处于等待状态,等待其它节点的唤醒
-
PROPAGATE:共享模式下的Node状态
-
-
prev:链表的上一个节点
-
next:链表的下一个节点
-
thread:工作线程
二、内部属性
head、tail:用于保存双线链表的头尾指针
private transient volatile Node head; private transient volatile Node tail;
阻塞队列(因为队列中的元素会被阻塞掉,所以也可以称之为阻塞队列,实际上就是一个双向链表)的头尾,在使用时初始化。
如果head存在,则head的waitStatus属性保证不为CANCELLED。
state:队列的同步状态
示例:在应用类ReentrantLock用于保存锁的获取状态以及获取次数。
private volatile int state;
CAS操作的必要属性
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
这些字段会在类初始化的时候,也就是static代码块中进行赋值:
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
三、需要实现的方法
在AQS中,部分的方法要求子类去实现,否则调用时会抛出异常,像:tryXXX()
这些方法,详细列表如下:
tryAcquire(int arg)
尝试使用独占锁的方式进行加锁。
tryAcquireShared(int arg)
尝试使用共享方式进行加锁。
tryRelease(int arg)
释放独占锁。
tryReleaseShard(int arg)
释放共享锁。
这样做的目的可能是因为:AQS因为有两套实现,共享和独占,如果以接口的设计那么就过于的死板,子类必须实现所有的方法才可以使用AQS,即使子类只需要独占模式,这样设计的好处在于,如果子类只希望使用独占模式,则只需要实现独占模式的方法即可,更加的灵活,而且子类实现,也可以扩展自己其它的逻辑。
四、AQS使用流程及原理
AQS本身是一个抽象类,并不能直接拿来使用,所以此处使用实现类:ReentrantLock进行举例。
上锁的流程
应用类首先会去调用aqs实现的cas方法判断是否可以直接加锁成功。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
-
实际上这个
compareAndSetState
是由UnSafe类实现的navite
方法,直接调用操作系统原语进行操作,这里不深究。 -
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
```
以下为AQS的实现。
AQS的独占锁方法acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
这个tryAcquire()
方法是子类实现的,其实还是调用的cas操作,此处不深究。主要看入队列和封装waiter的操作。
按照顺序,先看addWaiter()
方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq():初始化双向链表 && Node入队
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
到此addWaiter方法就执行完毕,任务线程已经包装成Node进入了队列中。
再回过头来看acquire()
方法的acquireQueued()
方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire():判断当前Node的任务线程是否需要进行阻塞,以及移除状态为取消的Node
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
else {
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt():挂起线程,并返回当前线程是否被中断。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
释放锁的流程
在aqs中释放锁会先去调用:release()
方法对锁进行一个释放。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0){
unparkSuccessor(h);
}
return true;
}
return false;
}
unparkSuccessor(h):清除头节点执行状态(waitStatus),唤醒阻塞线程
private void unparkSuccessor(Node node) {
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
解锁流程到此结束。
锁相关概念
公平锁
等待获取锁的线程拥有特定顺序,挨个排队拿锁。
非公平锁
不按照特定的顺序进行拿锁,谁先抢到就是谁的。
可重入锁
已经拿到过锁的线程,不需要再次拿锁,就可以访问共享资源。
不可重入锁
已经拿到锁的线程,如果要访问共享资源,则需要再次获取锁。
独占锁
自己已经拿到锁了,别的线程再过来那就拿不到。
偏向锁
拿到过一次锁,再去获取锁如果锁指向的是自己,就直接执行流程,不用等待拿锁了。
乐观锁
每次在修改数据之前,获取一个版本号,一般会和数据一起保存,在修改的时候判断版本号是否和携带的版本号一致,如果一致就修改,否则就失败,抛一个版本号异常。适用于并发修改量低的场景。
悲观锁
在每次修改之前都获取锁,以此来保证在修改数据时不会被其它线程影响,性能开销比较大。适用于并发修改量大,且需要保证数据xxx的场景。
抽象类和接口的区别
接口,我觉的是一套规范的具体定义,比如说数据库的驱动,要链接数据库的话,数据库厂商必须实现特定的接口,才可以构建出一个完整的数据库驱动。
抽象类,其实从名字上就可以看出来,它是一件事情,或一个功能的抽象实现,就好比,AQS的这个类,拥有两套实现,公平锁和非公平锁,如果aqs定义为是一个接口的话,那么就过于的具体,不具备抽象的概念,实现类就必须同时实现两套加锁逻辑,抽象类的话可以定义方法的修饰符为protected这样的话,子类就按需实现即可。
没有回复内容