平常在需要使用一些简单的本地缓存时会用到ConcurrentHashMap,但一直没深入了解过它的原理,之前面试时也有一定几率碰到,回答的总是一知半解的,最近深入的了解一下ConcurrentHashMap的原理,把它完整的梳理成一篇文章。
文章会从查询、存放、扩容、计数四个方面来深入解析ConcurrentHashMap,我先抛出几个问题,大家可以先思考一下,带着问题来阅读这样可以更好帮助大家理解concurrentHashMap,如果都了解那么大神可以出门右拐了;如果不了解可以详细看一下本篇文章,看完本章内容详细你能完整的回答全部问题,如果只想知道答案的,可以直接下滑到底部
- concurrentHashMap的查询过程是什么样的?
- concurrentHashMap的散列算法和hashMap的有什么区别?
- concurrentHashMap查询时会加锁么?
- concurrentHashMap的K/V允许为空么?为什么?
- concurrentHashMap什么时候初始化?
- concurrentHashMap哪些时机会进行扩容?
- concurrentHashMap如何实现并发扩容的?
- concurrentHashMap的
size
方法是准确的么? - concurrentHashMap中计数的桶是如何扩容的?
- concurrentHashMap同一个桶中,链表和红黑树可以共存么?
平时在使用concurrentHashMap时,最常用的就是获取值,存放值以及查询存放了多少个值。代码如下:
ConcurrentHashMap<String,String> map = new ConcurrentHashMap();
map.get("key");
map.put("key","value");
map.putIfAbsent("key","value");
map.size();
属性
这些是concurrentHashMap内部的属性,为了更好的理解源码,我先贴出来,大家可以先熟悉一下。
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
private static int RESIZE_STAMP_BITS = 16;
private static final int MIN_TRANSFER_STRIDE = 16;
private transient volatile int transferIndex;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int TREEIFY_THRESHOLD = 8;
static final int MIN_TREEIFY_CAPACITY = 64;
static final int MOVED = -1;
static final int TREEBIN = -2;
static final int RESERVED = -3;
static final int HASH_BITS = 0x7fffffff;
transient volatile Node<K,V>[] table;
private transient volatile Node<K,V>[] nextTable;
private transient volatile long baseCount;
static final int NCPU = Runtime.getRuntime().availableProcessors();
private transient volatile int sizeCtl;
查询值
get 方法
对于查询流程:
- 调用spread方法对key进行散列运算,获取key的散列值;
- 先判断是否为空,并且该key对应下标的位置是否为空,如果都为空,则该值不存在;
- 再判断要查询的key是否在数组上,如果在则直接返回数组上的值;
- 如果不在则判断对应的节点的hash是否小于0;下面会有很多地方用到该数值,请大家务必记住,后面也会反复提示,小于零的含义分别是:
// 当前位置已经被扩容时迁移走了 static final int MOVED = -1;
// 当前位置是一棵树 static final int TREEBIN = -2;
// 当前位置被占用,还未赋值 static final int RESERVED = -3; - 因为不同的情况封装为不同的node类型,下面会详解各种类型的find方法;
- 如果上述条件都不满足,则该key的值在链表中,从链表中查询数据。
具体代码以及注释如下:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null &&
(n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
看完这里就简单的回答第一个问题了,但此时回答还不够详细,例如散列算法、是否加锁等内容还没解释。
spread 散列方法
散列算法与hashMap类似,但多了一步和Integer.MAX_VALUE进行与运算,保证得到一个正整数的结果。因为负数有特殊的含义:
- static final int MOVED = -1; // 当前位置已经被扩容时迁移走了
- static final int TREEBIN = -2; // 当前位置是一棵树
- static final int RESERVED = -3; // 当前位置被占用,还未赋值
这三个值在源码中会非常高频的出现,文中会反复强调。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
看完这里可以回答第二个问题,concurrentHashMap的散列算法比HashMap多了一步需要保证为正数,因为负数有其他的含义。
迁移(ForwardingNode) find 方法
当该节点的hash为MOVED=-1
说明该节点已经被迁移走了,要查询数据时需要去新数组上查询数据。
- 新数组、新数组中该下标位置的节点都为空,说明不存在该key,直接返回null
- 死循环查找,类似
get
方法,如果直接查询到了就返回结果 - 如果找到节点的hash还是小于0的,说明新数组中也出现了这三种情况(迁移走、树、还未赋值)
- 既不在数组上,又不是负数,那么只能是链表了,从链表中查询。
下面是具体代码,我在代码上每一行都写上注释了。
Node<K,V> find(int h, Object k) {
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
占用的(ReservationNode) find 方法
ReservationNode是被占用但还未赋值的实现,因为没被赋值呢,直接返回空即可。
static final class ReservationNode<K,V> extends Node<K,V> {
ReservationNode() {
super(RESERVED, null, null, null);
}
Node<K,V> find(int h, Object k) {
return null;
}
}
(TreeBin) find方法
这里是TreeBin的实现,大家都知道当数组大于64并且链表长度超过8会被转换成红黑树。在源码中不仅仅会被转成红黑树还维护着一个双向链表,后续会详细讲解。查询过程:
- 拿到双向链表的头节点;
- 判断当前桶的状态,是否有写锁或等待锁;
- 如果有则判断该节点的key是否就是要查询的key,是直接返回,不是的话往下遍历一个节点;
- 使用cas尝试设置读锁,设置成功则从红黑树中查询当前key
- 判断自己为最后一个释放读锁的线程并且有其他线程正在请求写锁,唤醒请求写锁的线程。
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
static final int WRITER = 1;
static final int WAITER = 2;
static final int READER = 4;
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
看完这里就可以回答第三个问题,get的时候也可能会加一把读锁。
存储值
对于存放值比较常用的三个方法put、putIfAbsent、putAll
,通过下面的代码可以看到他们三个都会调用putVal
put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
public V putIfAbsent(K key, V value) {
return putVal(key, value, true);
}
public void putAll(Map<? extends K, ? extends V> m) {
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}
putVal
下面是真实的添加值,由于该方法比较长,大家可以分块看,我在代码里对每一块进行了分割,它主要做了以下几步动作:
- 通过
spread
方法拿到散列值,该方法上面已经解释过了; - 懒加载,调用initTable方法初始化数组;
- 因为上层是一个for循环,初始化成功后再次循环就进入到第二块,采用cas方法设置node值,如果成功则跳出循环,失败的继续往下走;
- 判断当前是否为MOVE状态,这里前面有反复提到,此时的状态为该桶已经被迁移走了,调用helpTransfer帮助扩容。扩容后面会详细解释;
- 接下来还会根据
onlyIfAbsent
判断当前值是否存在,因为onlyIfAbsent为true是不对旧值进行覆盖的,直接返回即可。这行代码是新版本优化的代码; - 上述经历了,cas尝试存放节点、判断是否被迁移以及是否已经存在该key的节点都失败,此时说明该桶有hash冲突,采用
synchronized
进行加锁操作;此时又有一个DCL检测;判断当前桶的状态是否大于0,大于0说明是正常的节点,遍历当前链表把节点放到链表尾部; - 前面判断了hash值大于0,当hash值等于
TREEBIN(-3)
说明为红黑树,根据红黑树的逻辑进行添加; - 数据添加完成,判断链表是否大于
TREEIFY_THRESHOLD(8)
,调用treeifyBin尝试转为红黑树; - 如果是覆盖原有key的值,则直接返回
- 进行addCount计数
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
--------------------------------------- 第一块 --------------------------------
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
--------------------------------------- 第二块 --------------------------------
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
--------------------------------------- 第三块 --------------------------------
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else if (onlyIfAbsent
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
--------------------------------------- 第四块 --------------------------------
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
--------------------------------------- 第五块 --------------------------------
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
--------------------------------------- 第五块 --------------------------------
addCount(1L, binCount);
return null;
}
initTable 初始化数组
这里需要用到属性sizeCtrl,它的数值有以下含义:
// 标识数组初始化和扩容的标识信息
// 等于 -1 代表正在初始化
// 小于 -1 代表正在扩容低16位代表当前数组正在扩容的线程个数(如果1个线程扩容,值为-2,如果2个线程扩容,值为-3)
// 等于 0 代表没有初始化
// 大于 0 如果数组没有初始化,则代表初始化的长度,如果数组已经初始化了,代表下次扩容的阈值
private transient volatile int sizeCtl;
该方法步骤如下:
- 判断是否当前table为空,
- 判断它的状态,如果小于0说明有人在扩容,让出cpu,让其他线程进行扩容;
- 使用cas尝试更改为-1代表正在初始化;
- 更改成功,采用DCL(双重检查)检查一次是否已经被初始化了,这里的代码要注意,整个JUC包下的源码非常多的地方都采用DCL。开始初始化,这里记住两个数:初始容量为16,扩容阈值(3/4)
- 更改失败,说明已经有其他线程初始化了,直接返回。
代码注释如下:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield();
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
treeifyBin 转换树
它是转为树的前置方法:
- 首先判断数组的长度是否小于MIN_TREEIFY_CAPACITY(64),如果是的话,优先扩容,因为数据存放在数组中查询起来性能会更好;
- 该位置不能空,并且它的hash>=0,说明是正常节点,小于的话说明已经被转完树、扩容了;
- 拿到该节点的锁,并且DCL检查,首先组装一个双向链表,再调用
setTabAt
转换红黑树。
判断是否需要转红黑树还是扩容;tab为数组,index为索引下标。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
扩容
tryPresize尝试扩容
扩容前操作,它有两个入口,putAll
、长度大于8
都会调用该方法;由于该方法也过长,我对代码进行了分割,大家一块一块看。
- MAXIMUM_CAPACITY 数组长度的最大值,如果比它的一半还大,则需要扩容的数组长度为MAXIMUM_CAPACITY,否则就设置为size最接近的2的N次幂的那个值。
- 判断是否没初始化,对数组进行初始化,因为putAll会直接调用这个方法,所以有可能没初始化。
- 如果要扩容的数值比sc小或者比最大值还大 什么都不做。
- 开始扩容,并设置当前扩容线程数量,扩容时sc的低位代表扩容线程数,第一个线程扩容时扩容线程数量+2,后续帮助扩容的线程,扩容线程数都是+1。这里贴的是jdk8的代码,中间有一段代码是bug,后面新版本已经删掉。
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
sc == rs + 1 ||
sc == rs + MAX_RESIZERS ||
(nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
transfer 扩容
这里就是真正的扩容方法了,这个方法超级长,还是会把该方法进行分块,帮助大家更好分析代码。
- 计算线程当前要迁移桶的数量,这里会根据cpu的数量进行计算,充分利用cpu资源;
- 如果新数组还没初始化,则进行初始化;
- 循环迁移,通过cas尝试领取任务;
- 判断要迁移的节点是否为空或者已经被迁移走了,如果是则下一次循环;
- 锁住当前桶,DCL检测,开始迁移。
- 扩容完成finishing = advance = true;并从头检查一遍。注意:这里的判断 线程数-2==0 时才会走到这一步,说明最后一个线程来检查。
该方法代码特别长,注释标记的比较多,如果有不清楚的地方可以留言讨论。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
--------------------------------------- 第一块 --------------------------------
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
--------------------------------------- 第二块 --------------------------------
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
--------------------------------------- 第三块 --------------------------------
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n;
}
}
--------------------------------------- 第四块 --------------------------------
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true;
else {
--------------------------------------- 第五块 --------------------------------
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
helpTransfer 帮助扩容
该方法在介绍的源码中,只有在put时,发现该桶已经被移走了才会触发,在扩容时sc低位代表正在扩容的数量,协助扩容数量会+1,第一次扩容的线程数量会+2。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
sc == rs + 1 ||
sc == rs + MAX_RESIZERS ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
计数
size 方法
面试时特别爱问concurrenHashMap是如何统计数值的,对于size方法,它是直接调用的sumCount
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
sumCount 获取总数
sumCount方法的实现非常简单,它就是取baseCount
与数组counterCells中存的值的和。
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
addCount 计数
对于concurrentHashMap取数来讲是很容易看到的,我们接下来看一下它是如何计数的。 这个方法有两个作用:
- 计数器,如果添加元素成功,对计数器 + 1
- 检验当前ConcurrentHashMap是否需要扩容
这里先说计数器:
- 判断counterCells是否不为空 || 通过cas直接给baseCount计数失败;
- 如果进入到该if分支,则会调用fullAddCount对counterCells初始化,后续计数随机拿一个counterCell通过cas进行计数;
是否需要扩容: 这里主要是对check进行判断,当check小于0时是移出节点,不需要进行扩容判断,只有大于等于0时才需要扩容。
代码注释如下:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells)!= null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table)!= null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT)!= rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
fullAddCount 方法
这个方法是对CounterCell整体的初始化;
- 第一块代码代表着对数组中对象初始化;
- 第二块代码代表着对数组进行进行扩容每次为上一次的2倍;这里的扩容条件是cas失败,没线程扩容,数组长度小于cpu核数;
- 第三块代码代表着对数组进行初始化,第一次初始化的值为2;
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
CounterCell[] cs; CounterCell c; int n; long v;
if ((cs = counterCells)!= null && (n = cs.length) > 0) {
if ((c = cs[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
CounterCell r = new CounterCell(x);
if (cellsBusy == 0 &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs; int m, j;
if ((rs = counterCells)!= null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
collide = false;
}
--------------------------------------- 第一块 --------------------------------
else if (!wasUncontended)
wasUncontended = true;
else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))
break;
else if (counterCells!= cs || n >= NCPU)
collide = false;
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == cs)
counterCells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = ThreadLocalRandom.advanceProbe(h);
}
}
--------------------------------------- 第二块 --------------------------------
else if (cellsBusy == 0 && counterCells == cs &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
if (counterCells == cs) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
--------------------------------------- 第三块 --------------------------------
else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}
问题
现在回头看文章开头的十个问题。
1、concurrentHashMap的查询过程是什么样的?
1、获取key散列值计算下标位置,判断该位置的key是否与要查询的key相等,如果相等直接返回;
2、如果该位置的节点hash值小于0,根据不同含义封装的Node进行策略查找。小于0有特殊含义;被迁移走、未赋值、一颗树;
3、上述两种情况都不符合,说明是在链表中,逐个查询。
2、 concurrentHashMap的散列算法和hashMap的有什么区别?
concurrentHashMap的散列算法需要保证计算计算的结果为正数,而hashMap不需要。
3、concurrentHashMap查询过程会加锁么?
在查询的过程中有可能加锁,当要查询的位置是一棵树时,如果该树正在被其他线程的写锁、等待写锁占用,则直接从双向链表中查询一个节点判断是否符合,如果不符合则尝试获取读锁,读锁获取成功则拿到值进行返回。
4、concurrentHashMap的K/V允许为空么?为什么?
不允许为空,如果为空会抛出
NullPointerException
异常。同时如果存入一个null的话会产生歧义,concurrentHashMap是一个线程安全的工具类,出现歧义的话没法使用了。
5、concurrentHashMap什么时候初始化数组?
在真正插入数据时进行初始化。如:
putVal
、putAll
方法。
6、 concurrentHashMap哪些时机会进行扩容?
1、达到阈值时进行扩容,阈值为当前数组长度的3/4; 2、链表转红黑树时会检查数组长度是否小于64,如果小于也会进行扩容。
7、concurrentHashMap如何实现并发扩容的?
根据扩容时机会有一个线程发起扩容,扩容时sizeCtl的低16位会记录扩容的线程数量,第一个线程会记录成2个线程,后续的协助线程都记录成1个线程。开始扩容时会根据cpu以及数组长度计算出每个线程每次需要搬迁节点的
步长
,每个线程都会从尾部开始分配要迁移的桶,每次分配前面计算的步长
个桶,在搬迁的过程锁住每个桶,桶下的数据只会迁移到新数组中的当前index,或者index+n(n:代表原数组长度)的位置;所有线程都完成迁移后,最后一个线程会从尾部到头检查一遍。
8、concurrentHashMap的size
方法是准确的么?
是准确的。
9、concurrentHashMap中计数的桶是如何扩容的?
内部维护了一个计数字段、一个计数数组;如果没并发时,通过cas维护计数字段即可,并发高时就需要初始化计数数组,并且随着并发度会逐渐对该数组进行扩容,扩容的长度不超过cpu核数,每次扩容是原来的2倍,初始化长度为2。
10、concurrentHashMap同一个桶中,链表和红黑树可以共存么?
可以的,TreeBin其实既维护了链表又维护了红黑树;当链表转为红黑树时是先转为双向链表再转为红黑树的。
总结
至此就解析完了concurrentHashMap绝大部分代码,由于代码量太大、代码写的也比较晦涩并夹杂着一些bug,已经尽力解析成让大家能懂的注释。
最近看了很多juc相关代码,它的整体套路爱用死循环、一个参数代表多种含义,大家自己看juc源码时可以从以下几点入手:
- 对一个实体中的成员属性研究清楚代表什么含义,看起源码来会事半功倍;
- 在一个死循环中有多个if去处理不同的事情,大家可以优先去看每个if是做什么的,这样可以让大家更快的理解这个方法都能干什么事。
- 实在看不懂的代码,可以多个jdk版本同时看一下,毕竟是人写的代码就会有bug,在新版本中有对bug进行修复。
没有回复内容