面试官:concurrentHashMap读取数据需要加锁么?

平常在需要使用一些简单的本地缓存时会用到ConcurrentHashMap,但一直没深入了解过它的原理,之前面试时也有一定几率碰到,回答的总是一知半解的,最近深入的了解一下ConcurrentHashMap的原理,把它完整的梳理成一篇文章。
文章会从查询、存放、扩容、计数四个方面来深入解析ConcurrentHashMap,我先抛出几个问题,大家可以先思考一下,带着问题来阅读这样可以更好帮助大家理解concurrentHashMap,如果都了解那么大神可以出门右拐了;如果不了解可以详细看一下本篇文章,看完本章内容详细你能完整的回答全部问题,如果只想知道答案的,可以直接下滑到底部

  1. concurrentHashMap的查询过程是什么样的?
  2. concurrentHashMap的散列算法和hashMap的有什么区别?
  3. concurrentHashMap查询时会加锁么?
  4. concurrentHashMap的K/V允许为空么?为什么?
  5. concurrentHashMap什么时候初始化?
  6. concurrentHashMap哪些时机会进行扩容?
  7. concurrentHashMap如何实现并发扩容的?
  8. concurrentHashMap的size方法是准确的么?
  9. concurrentHashMap中计数的桶是如何扩容的?
  10. concurrentHashMap同一个桶中,链表和红黑树可以共存么?

平时在使用concurrentHashMap时,最常用的就是获取值,存放值以及查询存放了多少个值。代码如下:

ConcurrentHashMap<String,String> map = new ConcurrentHashMap();

map.get("key");

map.put("key","value");
map.putIfAbsent("key","value");

map.size();

属性

这些是concurrentHashMap内部的属性,为了更好的理解源码,我先贴出来,大家可以先熟悉一下。

private static final int MAXIMUM_CAPACITY = 1 << 30;


private static final int DEFAULT_CAPACITY = 16;


private static int RESIZE_STAMP_BITS = 16;


private static final int MIN_TRANSFER_STRIDE = 16;


private transient volatile int transferIndex;



private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;


private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;


static final int TREEIFY_THRESHOLD = 8;


static final int MIN_TREEIFY_CAPACITY = 64;


static final int MOVED     = -1; 


static final int TREEBIN   = -2; 


static final int RESERVED  = -3; 


static final int HASH_BITS = 0x7fffffff; 


transient volatile Node<K,V>[] table;


private transient volatile Node<K,V>[] nextTable;


private transient volatile long baseCount;


static final int NCPU = Runtime.getRuntime().availableProcessors();






private transient volatile int sizeCtl;

查询值

get 方法

对于查询流程:

  1. 调用spread方法对key进行散列运算,获取key的散列值;
  2. 先判断是否为空,并且该key对应下标的位置是否为空,如果都为空,则该值不存在;
  3. 再判断要查询的key是否在数组上,如果在则直接返回数组上的值;
  4. 如果不在则判断对应的节点的hash是否小于0;下面会有很多地方用到该数值,请大家务必记住,后面也会反复提示,小于零的含义分别是:
    // 当前位置已经被扩容时迁移走了 static final int MOVED = -1;
    // 当前位置是一棵树 static final int TREEBIN = -2;
    // 当前位置被占用,还未赋值 static final int RESERVED = -3;
  5. 因为不同的情况封装为不同的node类型,下面会详解各种类型的find方法
  6. 如果上述条件都不满足,则该key的值在链表中,从链表中查询数据。

具体代码以及注释如下:

public V get(Object key) {



    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;

    int h = spread(key.hashCode());


    if ((tab = table) != null && 
        (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {

        if ((eh = e.hash) == h) {

            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }


        else if (eh < 0)

            return (p = e.find(h, key)) != null ? p.val : null;

        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }

    return null;
}

看完这里就简单的回答第一个问题了,但此时回答还不够详细,例如散列算法、是否加锁等内容还没解释。

spread 散列方法

散列算法与hashMap类似,但多了一步和Integer.MAX_VALUE进行与运算,保证得到一个正整数的结果。因为负数有特殊的含义:

  • static final int MOVED = -1; // 当前位置已经被扩容时迁移走了
  • static final int TREEBIN = -2; // 当前位置是一棵树
  • static final int RESERVED = -3; // 当前位置被占用,还未赋值

这三个值在源码中会非常高频的出现,文中会反复强调。

static final int spread(int h) {



    return (h ^ (h >>> 16)) & HASH_BITS;
}

看完这里可以回答第二个问题,concurrentHashMap的散列算法比HashMap多了一步需要保证为正数,因为负数有其他的含义。

迁移(ForwardingNode) find 方法

当该节点的hash为MOVED=-1说明该节点已经被迁移走了,要查询数据时需要去新数组上查询数据。

  1. 新数组、新数组中该下标位置的节点都为空,说明不存在该key,直接返回null
  2. 死循环查找,类似get方法,如果直接查询到了就返回结果
  3. 如果找到节点的hash还是小于0的,说明新数组中也出现了这三种情况(迁移走、树、还未赋值)
  4. 既不在数组上,又不是负数,那么只能是链表了,从链表中查询。

下面是具体代码,我在代码上每一行都写上注释了。

Node<K,V> find(int h, Object k) {

    outer: for (Node<K,V>[] tab = nextTable;;) {


        Node<K,V> e; int n;

        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;

        for (;;) {


            int eh; K ek;

            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;

            if (eh < 0) {

                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    return e.find(h, k);
            }

            if ((e = e.next) == null)
                return null;
        }
    }
}

占用的(ReservationNode) find 方法

ReservationNode是被占用但还未赋值的实现,因为没被赋值呢,直接返回空即可。

static final class ReservationNode<K,V> extends Node<K,V> {
    ReservationNode() {
        super(RESERVED, null, null, null);
    }

    Node<K,V> find(int h, Object k) {
        return null;
    }
}

(TreeBin) find方法

这里是TreeBin的实现,大家都知道当数组大于64并且链表长度超过8会被转换成红黑树。在源码中不仅仅会被转成红黑树还维护着一个双向链表,后续会详细讲解。查询过程:

  1. 拿到双向链表的头节点;
  2. 判断当前桶的状态,是否有写锁或等待锁;
  3. 如果有则判断该节点的key是否就是要查询的key,是直接返回,不是的话往下遍历一个节点;
  4. 使用cas尝试设置读锁,设置成功则从红黑树中查询当前key
  5. 判断自己为最后一个释放读锁的线程并且有其他线程正在请求写锁,唤醒请求写锁的线程。
TreeNode<K,V> root;

volatile TreeNode<K,V> first;
volatile Thread waiter;

volatile int lockState;
static final int WRITER = 1; 
static final int WAITER = 2; 
static final int READER = 4; 

final Node<K,V> find(int h, Object k) {
    if (k != null) {

        for (Node<K,V> e = first; e != null; ) {


            int s; K ek;


            if (((s = lockState) & (WAITER|WRITER)) != 0) {

                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                e = e.next;
            }

            else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                                         s + READER)) {
                TreeNode<K,V> r, p;
                try {

                    p = ((r = root) == null ? null :
                         r.findTreeNode(h, k, null));
                } finally {

                    Thread w;
                    if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                        (READER|WAITER) && (w = waiter) != null)

                        LockSupport.unpark(w);
                }
                return p;
            }
        }
    }
    return null;
}

看完这里就可以回答第三个问题,get的时候也可能会加一把读锁。

存储值

对于存放值比较常用的三个方法put、putIfAbsent、putAll,通过下面的代码可以看到他们三个都会调用putVal

put方法

public V put(K key, V value) {


    return putVal(key, value, false);
}


public V putIfAbsent(K key, V value) {
    return putVal(key, value, true);
}


public void putAll(Map<? extends K, ? extends V> m) {
    tryPresize(m.size());
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
        putVal(e.getKey(), e.getValue(), false);
}

putVal

下面是真实的添加值,由于该方法比较长,大家可以分块看,我在代码里对每一块进行了分割,它主要做了以下几步动作:

  1. 通过spread方法拿到散列值,该方法上面已经解释过了;
  2. 懒加载,调用initTable方法初始化数组;
  3. 因为上层是一个for循环,初始化成功后再次循环就进入到第二块,采用cas方法设置node值,如果成功则跳出循环,失败的继续往下走;
  4. 判断当前是否为MOVE状态,这里前面有反复提到,此时的状态为该桶已经被迁移走了,调用helpTransfer帮助扩容。扩容后面会详细解释;
  5. 接下来还会根据onlyIfAbsent判断当前值是否存在,因为onlyIfAbsent为true是不对旧值进行覆盖的,直接返回即可。这行代码是新版本优化的代码;
  6. 上述经历了,cas尝试存放节点、判断是否被迁移以及是否已经存在该key的节点都失败,此时说明该桶有hash冲突,采用synchronized进行加锁操作;此时又有一个DCL检测;判断当前桶的状态是否大于0,大于0说明是正常的节点,遍历当前链表把节点放到链表尾部;
  7. 前面判断了hash值大于0,当hash值等于TREEBIN(-3)说明为红黑树,根据红黑树的逻辑进行添加;
  8. 数据添加完成,判断链表是否大于TREEIFY_THRESHOLD(8),调用treeifyBin尝试转为红黑树;
  9. 如果是覆盖原有key的值,则直接返回
  10. 进行addCount计数
final V putVal(K key, V value, boolean onlyIfAbsent) {

    if (key == null || value == null) throw new NullPointerException();


    int hash = spread(key.hashCode());

    int binCount = 0;
--------------------------------------- 第一块 --------------------------------

    for (Node<K,V>[] tab = table;;) {




        Node<K,V> f; int n, i, fh; K fk; V fv;


        if (tab == null || (n = tab.length) == 0)

            tab = initTable();

--------------------------------------- 第二块 --------------------------------





        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))

                break;                  
        }

--------------------------------------- 第三块 --------------------------------


        else if ((fh = f.hash) == MOVED)

            tab = helpTransfer(tab, f);



        else if (onlyIfAbsent 
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;

--------------------------------------- 第四块 --------------------------------


        else {

            V oldVal = null;

            synchronized (f) {

                if (tabAt(tab, i) == f) {

                    if (fh >= 0) {

                        binCount = 1;

                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;


                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {

                                oldVal = e.val;

                                if (!onlyIfAbsent)

                                    e.val = value;
                                break;
                            }


                            Node<K,V> pred = e;

                            if ((e = e.next) == null) {

                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }

                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;

                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;

                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }

                }
            }
--------------------------------------- 第五块 --------------------------------


            if (binCount != 0) {

                if (binCount >= TREEIFY_THRESHOLD)

                    treeifyBin(tab, i);
                if (oldVal != null)

                    return oldVal;
                break;
            }
        }
    }
--------------------------------------- 第五块 --------------------------------

    addCount(1L, binCount);
    return null;
}

initTable 初始化数组

这里需要用到属性sizeCtrl,它的数值有以下含义:
// 标识数组初始化和扩容的标识信息
// 等于 -1 代表正在初始化
// 小于 -1 代表正在扩容低16位代表当前数组正在扩容的线程个数(如果1个线程扩容,值为-2,如果2个线程扩容,值为-3)
// 等于 0 代表没有初始化
// 大于 0 如果数组没有初始化,则代表初始化的长度,如果数组已经初始化了,代表下次扩容的阈值
private transient volatile int sizeCtl;

该方法步骤如下:

  1. 判断是否当前table为空,
  2. 判断它的状态,如果小于0说明有人在扩容,让出cpu,让其他线程进行扩容;
  3. 使用cas尝试更改为-1代表正在初始化;
  4. 更改成功,采用DCL(双重检查)检查一次是否已经被初始化了,这里的代码要注意,整个JUC包下的源码非常多的地方都采用DCL。开始初始化,这里记住两个数:初始容量为16,扩容阈值(3/4)
  5. 更改失败,说明已经有其他线程初始化了,直接返回。

代码注释如下:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;

    while ((tab = table) == null || tab.length == 0) {


        if ((sc = sizeCtl) < 0)

            Thread.yield(); 


        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {

                if ((tab = table) == null || tab.length == 0) {

                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;


                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];


                    table = tab = nt;


                    sc = n - (n >>> 2);
                }
            } finally {

                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

treeifyBin 转换树

它是转为树的前置方法:

  1. 首先判断数组的长度是否小于MIN_TREEIFY_CAPACITY(64),如果是的话,优先扩容,因为数据存放在数组中查询起来性能会更好;
  2. 该位置不能空,并且它的hash>=0,说明是正常节点,小于的话说明已经被转完树、扩容了;
  3. 拿到该节点的锁,并且DCL检查,首先组装一个双向链表,再调用setTabAt转换红黑树。

判断是否需要转红黑树还是扩容;tab为数组,index为索引下标。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n;

    if (tab != null) {


        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)

            tryPresize(n << 1);



        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {



            synchronized (b) {

                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;

                    for (Node<K,V> e = b; e != null; e = e.next) {

                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }

                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

扩容

tryPresize尝试扩容

扩容前操作,它有两个入口,putAll长度大于8都会调用该方法;由于该方法也过长,我对代码进行了分割,大家一块一块看。

  1. MAXIMUM_CAPACITY 数组长度的最大值,如果比它的一半还大,则需要扩容的数组长度为MAXIMUM_CAPACITY,否则就设置为size最接近的2的N次幂的那个值。
  2. 判断是否没初始化,对数组进行初始化,因为putAll会直接调用这个方法,所以有可能没初始化。
  3. 如果要扩容的数值比sc小或者比最大值还大 什么都不做。
  4. 开始扩容,并设置当前扩容线程数量,扩容时sc的低位代表扩容线程数,第一个线程扩容时扩容线程数量+2,后续帮助扩容的线程,扩容线程数都是+1。这里贴的是jdk8的代码,中间有一段代码是bug,后面新版本已经删掉。
private final void tryPresize(int size) {


    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :

        tableSizeFor(size + (size >>> 1) + 1);


    int sc;
    while ((sc = sizeCtl) >= 0) {

        Node<K,V>[] tab = table; int n;

        if (tab == null || (n = tab.length) == 0) {

            n = (sc > c) ? sc : c;


            if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {

                    if (table == tab) {
                        @SuppressWarnings("unchecked")

                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];

                        table = nt;

                        sc = n - (n >>> 2);
                    }
                } finally {

                    sizeCtl = sc;
                }
            }
        }



        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;


        else if (tab == table) {


            int rs = resizeStamp(n);




            if (sc < 0) {

                Node<K,V>[] nt;

                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 

                    sc == rs + 1 ||

                    sc == rs + MAX_RESIZERS || 

                    (nt = nextTable) == null ||

                    transferIndex <= 0)  
                    break;

                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }





            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))

                transfer(tab, null);
        }
    }
}


static final int resizeStamp(int n) {



    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

transfer 扩容

这里就是真正的扩容方法了,这个方法超级长,还是会把该方法进行分块,帮助大家更好分析代码。

  1. 计算线程当前要迁移桶的数量,这里会根据cpu的数量进行计算,充分利用cpu资源;
  2. 如果新数组还没初始化,则进行初始化;
  3. 循环迁移,通过cas尝试领取任务;
  4. 判断要迁移的节点是否为空或者已经被迁移走了,如果是则下一次循环;
  5. 锁住当前桶,DCL检测,开始迁移。
  6. 扩容完成finishing = advance = true;并从头检查一遍。注意:这里的判断 线程数-2==0 时才会走到这一步,说明最后一个线程来检查。

该方法代码特别长,注释标记的比较多,如果有不清楚的地方可以留言讨论。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {


    int n = tab.length, stride;




    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 

--------------------------------------- 第一块 --------------------------------


    if (nextTab == null) {            
        try {

            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      

            sizeCtl = Integer.MAX_VALUE;
            return;
        }

        nextTable = nextTab;

        transferIndex = n;
    }
--------------------------------------- 第二块 --------------------------------




    int nextn = nextTab.length;


    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

    boolean advance = true;

    boolean finishing = false; 
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;

        while (advance) {
            int nextIndex, nextBound;

            if (--i >= bound || finishing)
                advance = false;

            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }


            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {

                bound = nextBound;
                i = nextIndex - 1;

                advance = false;
            }
        }
--------------------------------------- 第三块 --------------------------------


        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;

            if (finishing) {

                nextTable = null;

                table = nextTab;

                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }

            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {

                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;

                finishing = advance = true;
                i = n; 
            }
        }
--------------------------------------- 第四块 --------------------------------



        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);


        else if ((fh = f.hash) == MOVED)

            advance = true; 
        else {

--------------------------------------- 第五块 --------------------------------


            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;

                    if (fh >= 0) {





                        int runBit = fh & n;
                        Node<K,V> lastRun = f;

                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }

                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }

                        setTabAt(nextTab, i, ln);

                        setTabAt(nextTab, i + n, hn);

                        setTabAt(tab, i, fwd);

                        advance = true;
                    }

                    else if (f instanceof TreeBin) {

                        TreeBin<K,V> t = (TreeBin<K,V>)f;

                        TreeNode<K,V> lo = null, loTail = null;

                        TreeNode<K,V> hi = null, hiTail = null;

                        int lc = 0, hc = 0;

                        for (Node<K,V> e = t.first; e != null; e = e.next) {

                            int h = e.hash;

                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);




                            if ((h & n) == 0) {

                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else

                                    loTail.next = p;

                                loTail = p;

                                ++lc;
                            }


                            else {

                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }

                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;

                        setTabAt(nextTab, i, ln);

                        setTabAt(nextTab, i + n, hn);

                        setTabAt(tab, i, fwd);

                        advance = true;
                    }
                }
            }
        }
    }
}

helpTransfer 帮助扩容

该方法在介绍的源码中,只有在put时,发现该桶已经被移走了才会触发,在扩容时sc低位代表正在扩容的数量,协助扩容数量会+1,第一次扩容的线程数量会+2。

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;

    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {

        int rs = resizeStamp(tab.length);

        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {


            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 
                sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || 
                transferIndex <= 0)
                break;

            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

计数

size 方法

面试时特别爱问concurrenHashMap是如何统计数值的,对于size方法,它是直接调用的sumCount

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

sumCount 获取总数

sumCount方法的实现非常简单,它就是取baseCount与数组counterCells中存的值的和。

final long sumCount() {

    CounterCell[] as = counterCells; CounterCell a;

    long sum = baseCount;
    if (as != null) {

        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

addCount 计数

对于concurrentHashMap取数来讲是很容易看到的,我们接下来看一下它是如何计数的。 这个方法有两个作用:

  • 计数器,如果添加元素成功,对计数器 + 1
  • 检验当前ConcurrentHashMap是否需要扩容

这里先说计数器:

  1. 判断counterCells是否不为空 || 通过cas直接给baseCount计数失败;
  2. 如果进入到该if分支,则会调用fullAddCount对counterCells初始化,后续计数随机拿一个counterCell通过cas进行计数;

是否需要扩容: 这里主要是对check进行判断,当check小于0时是移出节点,不需要进行扩容判断,只有大于等于0时才需要扩容。

代码注释如下:

private final void addCount(long x, int check) {

    CounterCell[] as; long b, s;


    if ((as = counterCells)!= null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {

        CounterCell a; long v; int m;

        boolean uncontended = true;

        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||




                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {



            fullAddCount(x, uncontended);
            return;
        }

        if (check <= 1)
            return;

        s = sumCount();
    }

    if (check >= 0) {

        Node<K,V>[] tab, nt; int n, sc;

        while (s >= (long)(sc = sizeCtl) && (tab = table)!= null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {

            int rs = resizeStamp(n);

            if (sc < 0) {

                if ((sc >>> RESIZE_STAMP_SHIFT)!= rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    break;

                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }

            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);

            s = sumCount();
        }
    }
}

fullAddCount 方法

这个方法是对CounterCell整体的初始化;

  1. 第一块代码代表着对数组中对象初始化;
  2. 第二块代码代表着对数组进行进行扩容每次为上一次的2倍;这里的扩容条件是cas失败,没线程扩容,数组长度小于cpu核数;
  3. 第三块代码代表着对数组进行初始化,第一次初始化的值为2;
private final void fullAddCount(long x, boolean wasUncontended) {

    int h;

    if ((h = ThreadLocalRandom.getProbe()) == 0) {

        ThreadLocalRandom.localInit();

        h = ThreadLocalRandom.getProbe();

        wasUncontended = true;
    }

    boolean collide = false;

    for (;;) {


        CounterCell[] cs; CounterCell c; int n; long v;

        if ((cs = counterCells)!= null && (n = cs.length) > 0) {

            if ((c = cs[(n - 1) & h]) == null) {

                if (cellsBusy == 0) {

                    CounterCell r = new CounterCell(x);

                    if (cellsBusy == 0 &&
                            U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {

                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells)!= null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {

                            cellsBusy = 0;
                        }

                        if (created)
                            break;

                        continue;
                    }

                    collide = false;
                }
--------------------------------------- 第一块 --------------------------------


                else if (!wasUncontended)
                    wasUncontended = true;

                else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))
                    break;

                else if (counterCells!= cs || n >= NCPU)
                    collide = false;

                else if (!collide)
                    collide = true;

                else if (cellsBusy == 0 &&
                        U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                    try {

                        if (counterCells == cs)
                            counterCells = Arrays.copyOf(cs, n << 1);
                    } finally {

                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;
                }

                h = ThreadLocalRandom.advanceProbe(h);
            }
        }
--------------------------------------- 第二块 --------------------------------


        else if (cellsBusy == 0 && counterCells == cs &&
                U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {

                if (counterCells == cs) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {

                cellsBusy = 0;
            }

            if (init)
                break;
        }

--------------------------------------- 第三块 --------------------------------


        else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
            break;
    }
}

问题

现在回头看文章开头的十个问题。

1、concurrentHashMap的查询过程是什么样的?

1、获取key散列值计算下标位置,判断该位置的key是否与要查询的key相等,如果相等直接返回;
2、如果该位置的节点hash值小于0,根据不同含义封装的Node进行策略查找。小于0有特殊含义;被迁移走、未赋值、一颗树;
3、上述两种情况都不符合,说明是在链表中,逐个查询。


2、 concurrentHashMap的散列算法和hashMap的有什么区别?

concurrentHashMap的散列算法需要保证计算计算的结果为正数,而hashMap不需要。


3、concurrentHashMap查询过程会加锁么?

在查询的过程中有可能加锁,当要查询的位置是一棵树时,如果该树正在被其他线程的写锁、等待写锁占用,则直接从双向链表中查询一个节点判断是否符合,如果不符合则尝试获取读锁,读锁获取成功则拿到值进行返回。


4、concurrentHashMap的K/V允许为空么?为什么?

不允许为空,如果为空会抛出NullPointerException异常。同时如果存入一个null的话会产生歧义,concurrentHashMap是一个线程安全的工具类,出现歧义的话没法使用了。


5、concurrentHashMap什么时候初始化数组?

在真正插入数据时进行初始化。如:putValputAll方法。


6、 concurrentHashMap哪些时机会进行扩容?

1、达到阈值时进行扩容,阈值为当前数组长度的3/4; 2、链表转红黑树时会检查数组长度是否小于64,如果小于也会进行扩容。


7、concurrentHashMap如何实现并发扩容的?

根据扩容时机会有一个线程发起扩容,扩容时sizeCtl的低16位会记录扩容的线程数量,第一个线程会记录成2个线程,后续的协助线程都记录成1个线程。开始扩容时会根据cpu以及数组长度计算出每个线程每次需要搬迁节点的步长,每个线程都会从尾部开始分配要迁移的桶,每次分配前面计算的步长个桶,在搬迁的过程锁住每个桶,桶下的数据只会迁移到新数组中的当前index,或者index+n(n:代表原数组长度)的位置;所有线程都完成迁移后,最后一个线程会从尾部到头检查一遍。


8、concurrentHashMap的size方法是准确的么?

是准确的。


9、concurrentHashMap中计数的桶是如何扩容的?

内部维护了一个计数字段、一个计数数组;如果没并发时,通过cas维护计数字段即可,并发高时就需要初始化计数数组,并且随着并发度会逐渐对该数组进行扩容,扩容的长度不超过cpu核数,每次扩容是原来的2倍,初始化长度为2。


10、concurrentHashMap同一个桶中,链表和红黑树可以共存么?

可以的,TreeBin其实既维护了链表又维护了红黑树;当链表转为红黑树时是先转为双向链表再转为红黑树的。

总结

至此就解析完了concurrentHashMap绝大部分代码,由于代码量太大、代码写的也比较晦涩并夹杂着一些bug,已经尽力解析成让大家能懂的注释。
最近看了很多juc相关代码,它的整体套路爱用死循环、一个参数代表多种含义,大家自己看juc源码时可以从以下几点入手:

  1. 对一个实体中的成员属性研究清楚代表什么含义,看起源码来会事半功倍;
  2. 在一个死循环中有多个if去处理不同的事情,大家可以优先去看每个if是做什么的,这样可以让大家更快的理解这个方法都能干什么事。
  3. 实在看不懂的代码,可以多个jdk版本同时看一下,毕竟是人写的代码就会有bug,在新版本中有对bug进行修复。

链接:https://juejin.cn/post/7403586214861275175

请登录后发表评论

    没有回复内容