中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Map 大家族的那點事兒 ( 7 ) :ConcurrentHashMap

2018-09-13    來源:importnew

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

我們上述所講的Map都是非線程安全的,這意味著不應該在多個線程中對這些Map進行修改操作,輕則會產(chǎn)生數(shù)據(jù)不一致的問題,甚至還會因為并發(fā)插入元素而導致鏈表成環(huán)(插入會觸發(fā)擴容,而擴容操作需要將原數(shù)組中的元素rehash到新數(shù)組,這時并發(fā)操作就有可能產(chǎn)生鏈表的循環(huán)引用從而成環(huán)),這樣在查找時就會發(fā)生死循環(huán),影響到整個應用程序。

Collections.synchronizedMap(Map<K,V> m)可以將一個Map轉(zhuǎn)換成線程安全的實現(xiàn),其實也就是通過一個包裝類,然后把所有功能都委托給傳入的Map實現(xiàn),而且包裝類是基于synchronized關鍵字來保證線程安全的(時代的眼淚Hashtable也是基于synchronized關鍵字),底層使用的是互斥鎖(同一時間內(nèi)只能由持有鎖的線程訪問,其他競爭線程進入睡眠狀態(tài)),性能與吞吐量差強人意。

public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
    return new SynchronizedMap<>(m);
}
private static class SynchronizedMap<K,V>
    implements Map<K,V>, Serializable {
    private static final long serialVersionUID = 1978198479659022715L;
    private final Map<K,V> m;     // Backing Map
    final Object      mutex;        // Object on which to synchronize
    SynchronizedMap(Map<K,V> m) {
        this.m = Objects.requireNonNull(m);
        mutex = this;
    }
    SynchronizedMap(Map<K,V> m, Object mutex) {
        this.m = m;
        this.mutex = mutex;
    }
    public int size() {
        synchronized (mutex) {return m.size();}
    }
    public boolean isEmpty() {
        synchronized (mutex) {return m.isEmpty();}
    }
    ............
}

然而ConcurrentHashMap的實現(xiàn)細節(jié)遠沒有這么簡單,因此性能也要高上許多。它沒有使用一個全局鎖來鎖住自己,而是采用了減少鎖粒度的方法,盡量減少因為競爭鎖而導致的阻塞與沖突,而且ConcurrentHashMap的檢索操作是不需要鎖的。

在Java 7中,ConcurrentHashMap把內(nèi)部細分成了若干個小的HashMap,稱之為段(Segment),默認被分為16個段。對于一個寫操作而言,會先根據(jù)hash code進行尋址,得出該Entry應被存放在哪一個Segment,然后只要對該Segment加鎖即可。

理想情況下,一個默認的ConcurrentHashMap可以同時接受16個線程進行寫操作(如果都是對不同Segment進行操作的話)。

分段鎖對于size()這樣的全局操作來說就沒有任何作用了,想要得出Entry的數(shù)量就需要遍歷所有Segment,獲得所有的鎖,然后再統(tǒng)計總數(shù)。事實上,ConcurrentHashMap會先試圖使用無鎖的方式統(tǒng)計總數(shù),這個嘗試會進行3次,如果在相鄰的2次計算中獲得的Segment的modCount次數(shù)一致,代表這兩次計算過程中都沒有發(fā)生過修改操作,那么就可以當做最終結(jié)果返回,否則,就要獲得所有Segment的鎖,重新計算size。

本文主要討論的是Java 8的ConcurrentHashMap,它與Java 7的實現(xiàn)差別較大。完全放棄了段的設計,而是變回與HashMap相似的設計,使用buckets數(shù)組與分離鏈接法(同樣會在超過閾值時樹化,對于構造紅黑樹的邏輯與HashMap差別不大,只不過需要額外使用CAS來保證線程安全),鎖的粒度也被細分到每個數(shù)組元素(個人認為這樣做的原因是因為HashMap在Java 8中也實現(xiàn)了不少優(yōu)化,即使碰撞嚴重,也能保證一定的性能,而且Segment不僅臃腫還有弱一致性的問題存在),所以它的并發(fā)級別與數(shù)組長度相關(Java 7則是與段數(shù)相關)。

/**
 * The array of bins. Lazily initialized upon first insertion.
 * Size is always a power of two. Accessed directly by iterators.
 */
transient volatile Node<K,V>[] table;

尋址


ConcurrentHashMap的散列函數(shù)與HashMap并沒有什么區(qū)別,同樣是把key的hash code的高16位與低16位進行異或運算(因為ConcurrentHashMap的buckets數(shù)組長度也永遠是一個2的N次方),然后將擾亂后的hash code與數(shù)組的長度減一(實際可訪問到的最大索引)進行與運算,得出的結(jié)果即是目標所在的位置。

// 2^31 - 1,int類型的最大值
// 該掩碼表示節(jié)點hash的可用位,用來保證hash永遠為一個正整數(shù)
static final int HASH_BITS = 0x7fffffff;
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

下面是查找操作的源碼,實現(xiàn)比較簡單。

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            // 先嘗試判斷鏈表頭是否為目標,如果是就直接返回
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            // eh < 0代表這是一個特殊節(jié)點(TreeBin或ForwardingNode)
            // 所以直接調(diào)用find()進行遍歷查找
            return (p = e.find(h, key)) != null ? p.val : null;
        // 遍歷鏈表
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

一個普通的節(jié)點(鏈表節(jié)點)的hash不可能小于0(已經(jīng)在spread()函數(shù)中修正過了),所以小于0的只可能是一個特殊節(jié)點,它不能用while循環(huán)中遍歷鏈表的方式來進行遍歷。

TreeBin是紅黑樹的頭部節(jié)點(紅黑樹的節(jié)點為TreeNode),它本身不含有key與value,而是指向一個TreeNode節(jié)點的鏈表與它們的根節(jié)點,同時使用CAS(ConcurrentHashMap并不是完全基于互斥鎖實現(xiàn)的,而是與CAS這種樂觀策略搭配使用,以提高性能)實現(xiàn)了一個讀寫鎖,迫使Writer(持有這個鎖)在樹重構操作之前等待Reader完成。

ForwardingNode是一個在數(shù)據(jù)轉(zhuǎn)移過程(由擴容引起)中使用的臨時節(jié)點,它會被插入到頭部。它與TreeBin(和TreeNode)都是Node類的子類。

為了判斷出哪些是特殊節(jié)點,TreeBin和ForwardingNode的hash域都只是一個虛擬值:

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }
    ......
    /**
     * Virtualized support for map.get(); overridden in subclasses.
     */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}
/*
 * Encodings for Node hash fields. See above for explanation.
 */
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations    
static final class TreeBin<K,V> extends Node<K,V> {
    ....
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        ....
    }   
    
    ....     
}
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
    .....
}

可見性


我們在get()函數(shù)中并沒有發(fā)現(xiàn)任何與鎖相關的代碼,那么它是怎么保證線程安全的呢?一個操作ConcurrentHashMap.get("a"),它的步驟基本分為以下幾步:

  • 根據(jù)散列函數(shù)計算出的索引訪問table。
  • 從table中取出頭節(jié)點。
  • 遍歷頭節(jié)點直到找到目標節(jié)點。
  • 從目標節(jié)點中取出value并返回。

所以只要保證訪問table與節(jié)點的操作總是能夠返回最新的數(shù)據(jù)就可以了。ConcurrentHashMap并沒有采用鎖的方式,而是通過volatile關鍵字來保證它們的可見性。在上文貼出的代碼中可以發(fā)現(xiàn),table、Node.val和Node.next都是被volatile關鍵字所修飾的。

volatile關鍵字保證了多線程環(huán)境下變量的可見性與有序性,底層實現(xiàn)基于內(nèi)存屏障(Memory Barrier)。

為了優(yōu)化性能,現(xiàn)代CPU工作時的指令執(zhí)行順序與應用程序的代碼順序其實是不一致的(有些編譯器也會進行這種優(yōu)化),也就是所謂的亂序執(zhí)行技術。亂序執(zhí)行可以提高CPU流水線的工作效率,只要保證數(shù)據(jù)符合程序邏輯上的正確性即可(遵循happens-before原則)。不過如今是多核時代,如果隨便亂序而不提供防護措施那是會出問題的。每一個cpu上都會進行亂序優(yōu)化,單cpu所保證的邏輯次序可能會被其他cpu所破壞。

內(nèi)存屏障就是針對此情況的防護措施?梢哉J為它是一個同步點(但它本身也是一條cpu指令)。例如在IA32指令集架構中引入的SFENCE指令,在該指令之前的所有寫操作必須全部完成,讀操作仍可以亂序執(zhí)行。LFENCE指令則保證之前的所有讀操作必須全部完成,另外還有粒度更粗的MFENCE指令保證之前的所有讀寫操作都必須全部完成。

內(nèi)存屏障就像是一個保護指令順序的柵欄,保護后面的指令不被前面的指令跨越。將內(nèi)存屏障插入到寫操作與讀操作之間,就可以保證之后的讀操作可以訪問到最新的數(shù)據(jù),因為屏障前的寫操作已經(jīng)把數(shù)據(jù)寫回到內(nèi)存(根據(jù)緩存一致性協(xié)議,不會直接寫回到內(nèi)存,而是改變該cpu私有緩存中的狀態(tài),然后通知給其他cpu這個緩存行已經(jīng)被修改過了,之后另一個cpu在讀操作時就可以發(fā)現(xiàn)該緩存行已經(jīng)是無效的了,這時它會從其他cpu中讀取最新的緩存行,然后之前的cpu才會更改狀態(tài)并寫回到內(nèi)存)。

例如,讀一個被volatile修飾的變量V總是能夠從JMM(Java Memory Model)主內(nèi)存中獲得最新的數(shù)據(jù)。因為內(nèi)存屏障的原因,每次在使用變量V(通過JVM指令use,后面說的也都是JVM中的指令而不是cpu)之前都必須先執(zhí)行load指令(把從主內(nèi)存中得到的數(shù)據(jù)放入到工作內(nèi)存),根據(jù)JVM的規(guī)定,load指令必須發(fā)生在read指令(從主內(nèi)存中讀取數(shù)據(jù))之后,所以每次訪問變量V都會先從主內(nèi)存中讀取。相對的,寫操作也因為內(nèi)存屏障保證的指令順序,每次都會直接寫回到主內(nèi)存。

不過volatile關鍵字并不能保證操作的原子性,對該變量進行并發(fā)的連續(xù)操作是非線程安全的,所幸ConcurrentHashMap只是用來確保訪問到的變量是最新的,所以也不會發(fā)生什么問題。

出于性能考慮,Doug Lea(java.util.concurrent包的作者)直接通過Unsafe類來對table進行操作。

Java號稱是安全的編程語言,而保證安全的代價就是犧牲程序員自由操控內(nèi)存的能力。像在C/C++中可以通過操作指針變量達到操作內(nèi)存的目的(其實操作的是虛擬地址),但這種靈活性在新手手中也經(jīng)常會帶來一些愚蠢的錯誤,比如內(nèi)存訪問越界。

Unsafe從字面意思可以看出是不安全的,它包含了許多本地方法(在JVM平臺上運行的其他語言編寫的程序,主要為C/C++,由JNI實現(xiàn)),這些方法支持了對指針的操作,所以它才被稱為是不安全的。雖然不安全,但畢竟是由C/C++實現(xiàn)的,像一些與操作系統(tǒng)交互的操作肯定是快過Java的,畢竟Java與操作系統(tǒng)之間還隔了一層抽象(JVM),不過代價就是失去了JVM所帶來的多平臺可移植性(本質(zhì)上也只是一個c/cpp文件,如果換了平臺那就要重新編譯)。

對table進行操作的函數(shù)有以下三個,都使用到了Unsafe(在java.util.concurrent包隨處可見):

@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    // 從tab數(shù)組中獲取一個引用,遵循Volatile語義
    // 參數(shù)2是一個在tab中的偏移量,用來尋找目標對象
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    // 通過CAS操作將tab數(shù)組中位于參數(shù)2偏移量位置的值替換為v
    // c是期望值,如果期望值與實際值不符,返回false
    // 否則,v會成功地被設置到目標位置,返回true
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    // 設置tab數(shù)組中位于參數(shù)2偏移量位置的值,遵循Volatile語義
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

如果對Unsafe感興趣,可以參考這篇文章:Java Magic. Part 4: sun.misc.Unsafe

初始化


ConcurrentHashMap與HashMap一樣是Lazy的,buckets數(shù)組會在第一次訪問put()函數(shù)時進行初始化,它的默認構造函數(shù)甚至是個空函數(shù)。

/**
 * Creates a new, empty map with the default initial table size (16).
 */
public ConcurrentHashMap() {
}

但是有一點需要注意,ConcurrentHashMap是工作在多線程并發(fā)環(huán)境下的,如果有多個線程同時調(diào)用了put()函數(shù)該怎么辦?這會導致重復初始化,所以必須要有對應的防護措施。

ConcurrentHashMap聲明了一個用于控制table的初始化與擴容的實例變量sizeCtl,默認值為0。當它是一個負數(shù)的時候,代表table正處于初始化或者擴容的狀態(tài)。-1表示table正在進行初始化,-N則表示當前有N-1個線程正在進行擴容。

在其他情況下,如果table還未初始化(table == null),sizeCtl表示table進行初始化的數(shù)組大。ㄋ詮臉嬙旌瘮(shù)傳入的initialCapacity在經(jīng)過計算后會被賦給它)。如果table已經(jīng)初始化過了,則表示下次觸發(fā)擴容操作的閾值,算法stzeCtl = n - (n >>> 2),也就是n的75%,與默認負載因子(0.75)的HashMap一致。

private transient volatile int sizeCtl;

初始化table的操作位于函數(shù)initTable(),源碼如下:

/**
 * Initializes table, using the size recorded in sizeCtl.
 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl小于0,這意味著已經(jīng)有其他線程進行初始化了
        // 所以當前線程讓出CPU時間片
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // 否則,通過CAS操作嘗試修改sizeCtl
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 默認構造函數(shù),sizeCtl = 0,使用默認容量(16)進行初始化
                    // 否則,會根據(jù)sizeCtl進行初始化
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 計算閾值,n的75%
                    sc = n - (n >>> 2);
                }
            } finally {
                // 閾值賦給sizeCtl
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

sizeCtl是一個volatile變量,只要有一個線程CAS操作成功,sizeCtl就會被暫時地修改為-1,這樣其他線程就能夠根據(jù)sizeCtl得知table是否已經(jīng)處于初始化狀態(tài)中,最后sizeCtl會被設置成閾值,用于觸發(fā)擴容操作。

擴容


ConcurrentHashMap觸發(fā)擴容的時機與HashMap類似,要么是在將鏈表轉(zhuǎn)換成紅黑樹時判斷table數(shù)組的長度是否小于閾值(64),如果小于就進行擴容而不是樹化,要么就是在添加元素的時候,判斷當前Entry數(shù)量是否超過閾值,如果超過就進行擴容。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        // 小于MIN_TREEIFY_CAPACITY,進行擴容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                // 將鏈表轉(zhuǎn)換成紅黑樹...
            }
        }
    }
}
...
final V putVal(K key, V value, boolean onlyIfAbsent) {
    ...
    addCount(1L, binCount); // 計數(shù)
    return null;
}
private final void addCount(long x, int check) {
    // 計數(shù)...
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // s(元素個數(shù))大于等于sizeCtl,觸發(fā)擴容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // 擴容標志位
            int rs = resizeStamp(n);
            // sizeCtl為負數(shù),代表正有其他線程進行擴容
            if (sc < 0) {
                // 擴容已經(jīng)結(jié)束,中斷循環(huán)
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 進行擴容,并設置sizeCtl,表示擴容線程 + 1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 觸發(fā)擴容(第一個進行擴容的線程)
            // 并設置sizeCtl告知其他線程
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            // 統(tǒng)計個數(shù),用于循環(huán)檢測是否還需要擴容
            s = sumCount();
        }
    }
}

可以看到有關sizeCtl的操作牽涉到了大量的位運算,我們先來理解這些位運算的意義。首先是resizeStamp(),該函數(shù)返回一個用于數(shù)據(jù)校驗的標志位,意思是對長度為n的table進行擴容。它將n的前導零(最高有效位之前的零的數(shù)量)和1 << 15做或運算,這時低16位的最高位為1,其他都為n的前導零。

static final int resizeStamp(int n) {
    // RESIZE_STAMP_BITS = 16
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

初始化sizeCtl(擴容操作被第一個線程首次進行)的算法為(rs << RESIZE_STAMP_SHIFT) + 2,首先RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS = 16,那么rs << 16等于將這個標志位移動到了高16位,這時最高位為1,所以sizeCtl此時是個負數(shù),然后加二(至于為什么是2,還記得有關sizeCtl的說明嗎?1代表初始化狀態(tài),所以實際的線程個數(shù)是要減去1的)代表當前有一個線程正在進行擴容,

這樣sizeCtl就被分割成了兩部分,高16位是一個對n的數(shù)據(jù)校驗的標志位,低16位表示參與擴容操作的線程個數(shù) + 1。

可能會有讀者有所疑惑,更新進行擴容的線程數(shù)量的操作為什么是sc + 1而不是sc - 1,這是因為對sizeCtl的操作都是基于位運算的,所以不會關心它本身的數(shù)值是多少,只關心它在二進制上的數(shù)值,而sc + 1會在低16位上加1。

tryPresize()函數(shù)跟addCount()的后半段邏輯類似,不斷地根據(jù)sizeCtl判斷當前的狀態(tài),然后選擇對應的策略。

private final void tryPresize(int size) {
    // 對size進行修正
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    // sizeCtl是默認值或正整數(shù)
    // 代表table還未初始化
    // 或還沒有其他線程正在進行擴容
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            // 設置sizeCtl,告訴其他線程,table現(xiàn)在正處于初始化狀態(tài)
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        // 計算下次觸發(fā)擴容的閾值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 將閾值賦給sizeCtl
                    sizeCtl = sc;
                }
            }
        }
        // 沒有超過閾值或者大于容量的上限,中斷循環(huán)
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // 進行擴容,與addCount()后半段的邏輯一致
        else if (tab == table) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

擴容操作的核心在于數(shù)據(jù)的轉(zhuǎn)移,在單線程環(huán)境下數(shù)據(jù)的轉(zhuǎn)移很簡單,無非就是把舊數(shù)組中的數(shù)據(jù)遷移到新的數(shù)組。但是這在多線程環(huán)境下是行不通的,需要保證線程安全性,在擴容的時候其他線程也可能正在添加元素,這時又觸發(fā)了擴容怎么辦?有人可能會說,這不難啊,用一個互斥鎖把數(shù)據(jù)轉(zhuǎn)移操作的過程鎖住不就好了?這確實是一種可行的解決方法,但同樣也會帶來極差的吞吐量。

互斥鎖會導致所有訪問臨界區(qū)的線程陷入阻塞狀態(tài),這會消耗額外的系統(tǒng)資源,內(nèi)核需要保存這些線程的上下文并放到阻塞隊列,持有鎖的線程耗時越長,其他競爭線程就會一直被阻塞,因此吞吐量低下,導致響應時間緩慢。而且鎖總是會伴隨著死鎖問題,一旦發(fā)生死鎖,整個應用程序都會因此受到影響,所以加鎖永遠是最后的備選方案。

Doug Lea沒有選擇直接加鎖,而是基于CAS實現(xiàn)無鎖的并發(fā)同步策略,令人佩服的是他不僅沒有把其他線程拒之門外,甚至還邀請它們一起來協(xié)助工作。

那么如何才能讓多個線程協(xié)同工作呢?Doug Lea把整個table數(shù)組當做多個線程之間共享的任務隊列,然后只需維護一個指針,當有一個線程開始進行數(shù)據(jù)轉(zhuǎn)移,就會先移動指針,表示指針劃過的這片bucket區(qū)域由該線程負責。

這個指針被聲明為一個volatile整型變量,它的初始位置位于table的尾部,即它等于table.length,很明顯這個任務隊列是逆向遍歷的。

/**
 * The next table index (plus one) to split while resizing.
 */
private transient volatile int transferIndex;
/**
 * 一個線程需要負責的最小bucket數(shù)
 */
private static final int MIN_TRANSFER_STRIDE = 16;
	
/**
 * The next table to use; non-null only while resizing.
 */
private transient volatile Node<K,V>[] nextTable;

一個已經(jīng)遷移完畢的bucket會被替換成ForwardingNode節(jié)點,用來標記此bucket已經(jīng)被其他線程遷移完畢了。我們之前提到過ForwardingNode,它是一個特殊節(jié)點,可以通過hash域的虛擬值來識別它,它同樣重寫了find()函數(shù),用來在新數(shù)組中查找目標。

數(shù)據(jù)遷移的操作位于transfer()函數(shù),多個線程之間依靠sizeCtl與transferIndex指針來協(xié)同工作,每個線程都有自己負責的區(qū)域,一個完成遷移的bucket會被設置為ForwardingNode,其他線程遇見這個特殊節(jié)點就跳過該bucket,處理下一個bucket。

transfer()函數(shù)可以大致分為三部分,第一部分對后續(xù)需要使用的變量進行初始化:

/**
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 根據(jù)當前機器的CPU數(shù)量來決定每個線程負責的bucket數(shù)
    // 避免因為擴容線程過多,反而影響到性能
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 初始化nextTab,容量為舊數(shù)組的一倍
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // 初始化指針
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab

第二部分為當前線程分配任務和控制當前線程的任務進度,這部分是transfer()的核心邏輯,描述了如何與其他線程協(xié)同工作:

// i指向當前bucket,bound表示當前線程所負責的bucket區(qū)域的邊界
for (int i = 0, bound = 0;;) {
    Node<K,V> f; int fh;
    // 這個循環(huán)使用CAS不斷嘗試為當前線程分配任務
    // 直到分配成功或任務隊列已經(jīng)被全部分配完畢
    // 如果當前線程已經(jīng)被分配過bucket區(qū)域
    // 那么會通過--i指向下一個待處理bucket然后退出該循環(huán)
    while (advance) {
        int nextIndex, nextBound;
        // --i表示將i指向下一個待處理的bucket
        // 如果--i >= bound,代表當前線程已經(jīng)分配過bucket區(qū)域
        // 并且還留有未處理的bucket
        if (--i >= bound || finishing)
            advance = false;
        // transferIndex指針 <= 0 表示所有bucket已經(jīng)被分配完畢
        else if ((nextIndex = transferIndex) <= 0) {
            i = -1;
            advance = false;
        }
        // 移動transferIndex指針
        // 為當前線程設置所負責的bucket區(qū)域的范圍
        // i指向該范圍的第一個bucket,注意i是逆向遍歷的
        // 這個范圍為(bound, i),i是該區(qū)域最后一個bucket,遍歷順序是逆向的
        else if (U.compareAndSwapInt
                 (this, TRANSFERINDEX, nextIndex,
                  nextBound = (nextIndex > stride ?
                               nextIndex - stride : 0))) {
            bound = nextBound;
            i = nextIndex - 1;
            advance = false;
        }
    }
    // 當前線程已經(jīng)處理完了所負責的所有bucket
    if (i < 0 || i >= n || i + n >= nextn) {
        int sc;
        // 如果任務隊列已經(jīng)全部完成
        if (finishing) {
            nextTable = null;
            table = nextTab;
            // 設置新的閾值
            sizeCtl = (n << 1) - (n >>> 1);
            return;
        }
        // 工作中的擴容線程數(shù)量減1
        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
            // (resizeStamp << RESIZE_STAMP_SHIFT) + 2代表當前有一個擴容線程
            // 相對的,(sc - 2) !=  resizeStamp << RESIZE_STAMP_SHIFT
            // 表示當前還有其他線程正在進行擴容,所以直接返回
            if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                return;
            // 否則,當前線程就是最后一個進行擴容的線程
            // 設置finishing標識
            finishing = advance = true;
            i = n; // recheck before commit
        }
    }
    // 如果待處理bucket是空的
    // 那么插入ForwardingNode,以通知其他線程
    else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);
    // 如果待處理bucket的頭節(jié)點是ForwardingNode
    // 說明此bucket已經(jīng)被處理過了,跳過該bucket
    else if ((fh = f.hash) == MOVED)
        advance = true; // already processed

最后一部分是具體的遷移過程(對當前指向的bucket),這部分的邏輯與HashMap類似,拿舊數(shù)組的容量當做一個掩碼,然后與節(jié)點的hash進行與操作,可以得出該節(jié)點的新增有效位,如果新增有效位為0就放入一個鏈表A,如果為1就放入另一個鏈表B,鏈表A在新數(shù)組中的位置不變(跟在舊數(shù)組的索引一致),鏈表B在新數(shù)組中的位置為原索引加上舊數(shù)組容量。

這個方法減少了rehash的計算量,而且還能達到均勻分布的目的,如果不能理解請去看本文中HashMap擴容操作的解釋。

else {
    // 對于節(jié)點的操作還是要加上鎖的
    // 不過這個鎖的粒度很小,只鎖住了bucket的頭節(jié)點
    synchronized (f) {
        if (tabAt(tab, i) == f) {
            Node<K,V> ln, hn;
            // hash code不為負,代表這是條鏈表
            if (fh >= 0) {
                // fh & n 獲得hash code的新增有效位,用于將鏈表分離成兩類
                // 要么是0要么是1,關于這個位運算的更多細節(jié)
                // 請看本文中有關HashMap擴容操作的解釋
                int runBit = fh & n;
                Node<K,V> lastRun = f;
                // 這個循環(huán)用于記錄最后一段連續(xù)的同一類節(jié)點
                // 這個類別是通過fh & n來區(qū)分的
                // 這段連續(xù)的同類節(jié)點直接被復用,不會產(chǎn)生額外的復制
                for (Node<K,V> p = f.next; p != null; p = p.next) {
                    int b = p.hash & n;
                    if (b != runBit) {
                        runBit = b;
                        lastRun = p;
                    }
                }
                // 0被放入ln鏈表,1被放入hn鏈表
                // lastRun是連續(xù)同類節(jié)點的起始節(jié)點
                if (runBit == 0) {
                    ln = lastRun;
                    hn = null;
                }
                else {
                    hn = lastRun;
                    ln = null;
                }
                // 將最后一段的連續(xù)同類節(jié)點之前的節(jié)點按類別復制到ln或hn
                // 鏈表的插入方向是往頭部插入的,Node構造函數(shù)的第四個參數(shù)是next
                // 所以就算遇到類別與lastRun一致的節(jié)點也只會被插入到頭部
                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                    int ph = p.hash; K pk = p.key; V pv = p.val;
                    if ((ph & n) == 0)
                        ln = new Node<K,V>(ph, pk, pv, ln);
                    else
                        hn = new Node<K,V>(ph, pk, pv, hn);
                }
                // ln鏈表被放入到原索引位置,hn放入到原索引 + 舊數(shù)組容量
                // 這一點與HashMap一致,如果看不懂請去參考本文對HashMap擴容的講解
                setTabAt(nextTab, i, ln);
                setTabAt(nextTab, i + n, hn);
                setTabAt(tab, i, fwd); // 標記該bucket已被處理
                advance = true;
            }
            // 對紅黑樹的操作,邏輯與鏈表一樣,按新增有效位進行分類
            else if (f instanceof TreeBin) {
                TreeBin<K,V> t = (TreeBin<K,V>)f;
                TreeNode<K,V> lo = null, loTail = null;
                TreeNode<K,V> hi = null, hiTail = null;
                int lc = 0, hc = 0;
                for (Node<K,V> e = t.first; e != null; e = e.next) {
                    int h = e.hash;
                    TreeNode<K,V> p = new TreeNode<K,V>
                        (h, e.key, e.val, null, null);
                    if ((h & n) == 0) {
                        if ((p.prev = loTail) == null)
                            lo = p;
                        else
                            loTail.next = p;
                        loTail = p;
                        ++lc;
                    }
                    else {
                        if ((p.prev = hiTail) == null)
                            hi = p;
                        else
                            hiTail.next = p;
                        hiTail = p;
                        ++hc;
                    }
                }
                // 元素數(shù)量沒有超過UNTREEIFY_THRESHOLD,退化成鏈表
                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                setTabAt(nextTab, i, ln);
                setTabAt(nextTab, i + n, hn);
                setTabAt(tab, i, fwd);
                advance = true;
            }

計數(shù)


在Java 7中ConcurrentHashMap對每個Segment單獨計數(shù),想要得到總數(shù)就需要獲得所有Segment的鎖,然后進行統(tǒng)計。由于Java 8拋棄了Segment,顯然是不能再這樣做了,而且這種方法雖然簡單準確但也舍棄了性能。

Java 8聲明了一個volatile變量baseCount用于記錄元素的個數(shù),對這個變量的修改操作是基于CAS的,每當插入元素或刪除元素時都會調(diào)用addCount()函數(shù)進行計數(shù)。

private transient volatile long baseCount;
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 嘗試使用CAS更新baseCount失敗
    // 轉(zhuǎn)用CounterCells進行更新
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // 在CounterCells未初始化
        // 或嘗試通過CAS更新當前線程的CounterCell失敗時
        // 調(diào)用fullAddCount(),該函數(shù)負責初始化CounterCells和更新計數(shù)
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 統(tǒng)計總數(shù)
        s = sumCount();
    }
    if (check >= 0) {
    	// 判斷是否需要擴容,在上文中已經(jīng)講過了
    }
}

counterCells是一個元素為CounterCell的數(shù)組,該數(shù)組的大小與當前機器的CPU數(shù)量有關,并且它不會被主動初始化,只有在調(diào)用fullAddCount()函數(shù)時才會進行初始化。

CounterCell是一個簡單的內(nèi)部靜態(tài)類,每個CounterCell都是一個用于記錄數(shù)量的單元:

/**
 * Table of counter cells. When non-null, size is a power of 2.
 */
private transient volatile CounterCell[] counterCells;
/**
 * A padded cell for distributing counts.  Adapted from LongAdder
 * and Striped64.  See their internal docs for explanation.
 */
@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

注解@sun.misc.Contended用于解決偽共享問題。所謂偽共享,即是在同一緩存行(CPU緩存的基本單位)中存儲了多個變量,當其中一個變量被修改時,就會影響到同一緩存行內(nèi)的其他變量,導致它們也要跟著被標記為失效,其他變量的緩存命中率將會受到影響。解決偽共享問題的方法一般是對該變量填充一些無意義的占位數(shù)據(jù),從而使它獨享一個緩存行。

ConcurrentHashMap的計數(shù)設計與LongAdder類似。在一個低并發(fā)的情況下,就只是簡單地使用CAS操作來對baseCount進行更新,但只要這個CAS操作失敗一次,就代表有多個線程正在競爭,那么就轉(zhuǎn)而使用CounterCell數(shù)組進行計數(shù),數(shù)組內(nèi)的每個ConuterCell都是一個獨立的計數(shù)單元。

每個線程都會通過ThreadLocalRandom.getProbe() & m尋址找到屬于它的CounterCell,然后進行計數(shù)。ThreadLocalRandom是一個線程私有的偽隨機數(shù)生成器,每個線程的probe都是不同的(這點基于ThreadLocalRandom的內(nèi)部實現(xiàn),它在內(nèi)部維護了一個probeGenerator,這是一個類型為AtomicInteger的靜態(tài)常量,每當初始化一個ThreadLocalRandom時probeGenerator都會先自增一個常量然后返回的整數(shù)即為當前線程的probe,probe變量被維護在Thread對象中),可以認為每個線程的probe就是它在CounterCell數(shù)組中的hash code。

這種方法將競爭數(shù)據(jù)按照線程的粒度進行分離,相比所有競爭線程對一個共享變量使用CAS不斷嘗試在性能上要效率多了,這也是為什么在高并發(fā)環(huán)境下LongAdder要優(yōu)于AtomicInteger的原因。

fullAddCount()函數(shù)根據(jù)當前線程的probe尋找對應的CounterCell進行計數(shù),如果CounterCell數(shù)組未被初始化,則初始化CounterCell數(shù)組和CounterCell。該函數(shù)的實現(xiàn)與Striped64類(LongAdder的父類)的longAccumulate()函數(shù)是一樣的,把CounterCell數(shù)組當成一個散列表,每個線程的probe就是hash code,散列函數(shù)也僅僅是簡單的(n - 1) & probe

CounterCell數(shù)組的大小永遠是一個2的n次方,初始容量為2,每次擴容的新容量都是之前容量乘以二,處于性能考慮,它的最大容量上限是機器的CPU數(shù)量。

所以說CounterCell數(shù)組的碰撞沖突是很嚴重的,因為它的bucket基數(shù)太小了。而發(fā)生碰撞就代表著一個CounterCell會被多個線程競爭,為了解決這個問題,Doug Lea使用無限循環(huán)加上CAS來模擬出一個自旋鎖來保證線程安全,自旋鎖的實現(xiàn)基于一個被volatile修飾的整數(shù)變量,該變量只會有兩種狀態(tài):0和1,當它被設置為0時表示沒有加鎖,當它被設置為1時表示已被其他線程加鎖。這個自旋鎖用于保護初始化CounterCell、初始化CounterCell數(shù)組以及對CounterCell數(shù)組進行擴容時的安全。

CounterCell更新計數(shù)是依賴于CAS的,每次循環(huán)都會嘗試通過CAS進行更新,如果成功就退出無限循環(huán),否則就調(diào)用ThreadLocalRandom.advanceProbe()函數(shù)為當前線程更新probe,然后重新開始循環(huán),以期望下一次尋址到的CounterCell沒有被其他線程競爭。

如果連著兩次CAS更新都沒有成功,那么會對CounterCell數(shù)組進行一次擴容,這個擴容操作只會在當前循環(huán)中觸發(fā)一次,而且只能在容量小于上限時觸發(fā)。

fullAddCount()函數(shù)的主要流程如下:

  • 首先檢查當前線程有沒有初始化過ThreadLocalRandom,如果沒有則進行初始化。ThreadLocalRandom負責更新線程的probe,而probe又是在數(shù)組中進行尋址的關鍵。
  • 檢查CounterCell數(shù)組是否已經(jīng)初始化,如果已初始化,那么就根據(jù)probe找到對應的CounterCell。
    • 如果這個CounterCell等于null,需要先初始化CounterCell,通過把計數(shù)增量傳入構造函數(shù),所以初始化只要成功就說明更新計數(shù)已經(jīng)完成了。初始化的過程需要獲取自旋鎖。
    • 如果不為null,就按上文所說的邏輯對CounterCell實施更新計數(shù)。
  • CounterCell數(shù)組未被初始化,嘗試獲取自旋鎖,進行初始化。數(shù)組初始化的過程會附帶初始化一個CounterCell來記錄計數(shù)增量,所以只要初始化成功就表示更新計數(shù)完成。
  • 如果自旋鎖被其他線程占用,無法進行數(shù)組的初始化,只好通過CAS更新baseCount。
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    // 當前線程的probe等于0,證明該線程的ThreadLocalRandom還未被初始化
    // 以及當前線程是第一次進入該函數(shù)
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        // 初始化ThreadLocalRandom,當前線程會被設置一個probe
        ThreadLocalRandom.localInit();      // force initialization
        // probe用于在CounterCell數(shù)組中尋址
        h = ThreadLocalRandom.getProbe();
        // 未競爭標志
        wasUncontended = true;
    }
    // 沖突標志
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        // CounterCell數(shù)組已初始化
        if ((as = counterCells) != null && (n = as.length) > 0) {
            // 如果尋址到的Cell為空,那么創(chuàng)建一個新的Cell
            if ((a = as[(n - 1) & h]) == null) {
                // cellsBusy是一個只有0和1兩個狀態(tài)的volatile整數(shù)
                // 它被當做一個自旋鎖,0代表無鎖,1代表加鎖
                if (cellsBusy == 0) {            // Try to attach new Cell
                    // 將傳入的x作為初始值創(chuàng)建一個新的CounterCell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    // 通過CAS嘗試對自旋鎖加鎖
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        // 加鎖成功,聲明Cell是否創(chuàng)建成功的標志
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            // 再次檢查CounterCell數(shù)組是否不為空
                            // 并且尋址到的Cell為空
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                // 將之前創(chuàng)建的新Cell放入數(shù)組
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            // 釋放鎖
                            cellsBusy = 0;
                        }
                        // 如果已經(jīng)創(chuàng)建成功,中斷循環(huán)
                        // 因為新Cell的初始值就是傳入的增量,所以計數(shù)已經(jīng)完畢了
                        if (created)
                            break;
                        // 如果未成功
                        // 代表as[(n - 1) & h]這個位置的Cell已經(jīng)被其他線程設置
                        // 那么就從循環(huán)頭重新開始
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            // as[(n - 1) & h]非空
            // 在addCount()函數(shù)中通過CAS更新當前線程的Cell進行計數(shù)失敗
            // 會傳入wasUncontended = false,代表已經(jīng)有其他線程進行競爭
            else if (!wasUncontended)       // CAS already known to fail
                // 設置未競爭標志,之后會重新計算probe,然后重新執(zhí)行循環(huán)
                wasUncontended = true;      // Continue after rehash
            // 嘗試進行計數(shù),如果成功,那么就退出循環(huán)
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            // 嘗試更新失敗,檢查counterCell數(shù)組是否已經(jīng)擴容
            // 或者容量達到最大值(CPU的數(shù)量)
            else if (counterCells != as || n >= NCPU)
                // 設置沖突標志,防止跳入下面的擴容分支
                // 之后會重新計算probe
                collide = false;            // At max size or stale
            // 設置沖突標志,重新執(zhí)行循環(huán)
            // 如果下次循環(huán)執(zhí)行到該分支,并且沖突標志仍然為true
            // 那么會跳過該分支,到下一個分支進行擴容
            else if (!collide)
                collide = true;
            // 嘗試加鎖,然后對counterCells數(shù)組進行擴容
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    // 檢查是否已被擴容
                    if (counterCells == as) {// Expand table unless stale
                        // 新數(shù)組容量為之前的1倍
                        CounterCell[] rs = new CounterCell[n << 1];
                        // 遷移數(shù)據(jù)到新數(shù)組
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    // 釋放鎖
                    cellsBusy = 0;
                }
                collide = false;
                // 重新執(zhí)行循環(huán)
                continue;                   // Retry with expanded table
            }
            // 為當前線程重新計算probe
            h = ThreadLocalRandom.advanceProbe(h);
        }
        // CounterCell數(shù)組未初始化,嘗試獲取自旋鎖,然后進行初始化
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    // 初始化CounterCell數(shù)組,初始容量為2
                    CounterCell[] rs = new CounterCell[2];
                    // 初始化CounterCell
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            // 初始化CounterCell數(shù)組成功,退出循環(huán)
            if (init)
                break;
        }
        // 如果自旋鎖被占用,則只好嘗試更新baseCount
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

對于統(tǒng)計總數(shù),只要能夠理解CounterCell的思想,就很簡單了。仔細想一想,每次計數(shù)的更新都會被分攤在baseCount和CounterCell數(shù)組中的某一CounterCell,想要獲得總數(shù),把它們統(tǒng)計相加就是了。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
 final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

其實size()函數(shù)返回的總數(shù)可能并不是百分百精確的,試想如果前一個遍歷過的CounterCell又進行了更新會怎么樣?盡管只是一個估算值,但在大多數(shù)場景下都還能接受,而且性能上是要比Java 7好上太多了。

添加元素


添加元素的主要邏輯與HashMap沒什么區(qū)別,有所區(qū)別的復雜操作如擴容和計數(shù)我們上文都已經(jīng)深入解析過了,所以整體來說putVal()函數(shù)還是比較簡單的,可能唯一需要注意的就是在對節(jié)點進行操作的時候需要通過互斥鎖保證線程安全,這個互斥鎖的粒度很小,只對需要操作的這個bucket加鎖。

public V put(K key, V value) {
    return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0; // 節(jié)點計數(shù)器,用于判斷是否需要樹化
    // 無限循環(huán)+CAS,無鎖的標準套路
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 初始化table
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // bucket為null,通過CAS創(chuàng)建頭節(jié)點,如果成功就結(jié)束循環(huán)
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // bucket為ForwardingNode
        // 當前線程前去協(xié)助進行擴容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 節(jié)點是鏈表
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 找到目標,設置value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 未找到節(jié)點,插入新節(jié)點到鏈表尾部
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 節(jié)點是紅黑樹
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 根據(jù)bucket中的節(jié)點數(shù)決定是否樹化
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // oldVal不等于null,說明沒有新節(jié)點
                // 所以直接返回,不進行計數(shù)
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 計數(shù)
    addCount(1L, binCount);
    return null;
}

至于刪除元素的操作位于函數(shù)replaceNode(Object key, V value, Object cv),當table[key].val等于期望值cv時(或cv等于null),更新節(jié)點的值為value,如果value等于null,那么刪除該節(jié)點。

remove()函數(shù)通過調(diào)用replaceNode(key, null, null)來達成刪除目標節(jié)點的目的,replaceNode()的具體實現(xiàn)與putVal()沒什么差別,只不過對鏈表的操作有所不同而已,所以就不多敘述了。

并行計算


Java 8除了對ConcurrentHashMap重新設計以外,還引入了基于Lambda表達式的Stream API。它是對集合對象功能上的增強(所以不止ConcurrentHashMap,其他集合也都實現(xiàn)了該API),以一種優(yōu)雅的方式來批量操作、聚合或遍歷集合中的數(shù)據(jù)。

最重要的是,它還提供了并行模式,充分利用了多核CPU的優(yōu)勢實現(xiàn)并行計算。讓我們看看如下的示例代碼:

public static void main(String[] args) {
    ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    String keys = "ABCDEFG";
    for (int i = 1; i <= keys.length(); i++) {
        map.put(String.valueOf(keys.charAt(i - 1)), i);
    }
    map.forEach(2,
            (k, v) -> System.out.println("key-" + k + ":value-" + v + ". by thread->" + Thread.currentThread().getName()));
}

這段代碼通過兩個線程(包括主線程)并行地遍歷map中的元素,然后輸出到控制臺,輸出如下:

key-A:value-1. by thread->main
key-D:value-4. by thread->ForkJoinPool.commonPool-worker-2
key-B:value-2. by thread->main
key-E:value-5. by thread->ForkJoinPool.commonPool-worker-2
key-C:value-3. by thread->main
key-F:value-6. by thread->ForkJoinPool.commonPool-worker-2
key-G:value-7. by thread->ForkJoinPool.commonPool-worker-2

很明顯,有兩個線程在進行工作,那么這是怎么實現(xiàn)的呢?我們先來看看forEach()函數(shù):

public void forEach(long parallelismThreshold,
                    BiConsumer<? super K,? super V> action) {
    if (action == null) throw new NullPointerException();
    new ForEachMappingTask<K,V>
        (null, batchFor(parallelismThreshold), 0, 0, table,
         action).invoke();
}

parallelismThreshold是需要并行執(zhí)行該操作的線程數(shù)量,action則是回調(diào)函數(shù)(我們想要執(zhí)行的操作)。action的類型為BiConsumer,是一個用于支持Lambda表達式的FunctionalInterface,它接受兩個輸入?yún)?shù)并返回0個結(jié)果。

@FunctionalInterface
public interface BiConsumer<T, U> {
    /**
     * Performs this operation on the given arguments.
     *
     * @param t the first input argument
     * @param u the second input argument
     */
    void accept(T t, U u);

看來實現(xiàn)并行計算的關鍵在于ForEachMappingTask對象,通過它的繼承關系結(jié)構圖可以發(fā)現(xiàn),F(xiàn)orEachMappingTask其實就是ForkJoinTask。

集合的并行計算是基于Fork/Join框架實現(xiàn)的,工作線程交由ForkJoinPool線程池維護。它推崇分而治之的思想,將一個大的任務分解成多個小的任務,通過fork()函數(shù)(有點像Linux的fork()系統(tǒng)調(diào)用來創(chuàng)建子進程)來開啟一個工作線程執(zhí)行其中一個小任務,通過join()函數(shù)等待工作線程執(zhí)行完畢(需要等所有工作線程執(zhí)行完畢才能合并最終結(jié)果),只要所有的小任務都已經(jīng)處理完成,就代表這個大的任務也完成了。

像上文中的示例代碼就是將遍歷這個大任務分解成了N個小任務,然后交由兩個工作線程進行處理。

static final class ForEachMappingTask<K,V>
    extends BulkTask<K,V,Void> {
    final BiConsumer<? super K, ? super V> action;
    ForEachMappingTask
        (BulkTask<K,V,?> p, int b, int i, int f, Node<K,V>[] t,
         BiConsumer<? super K,? super V> action) {
        super(p, b, i, f, t);
        this.action = action;
    }
    public final void compute() {
        final BiConsumer<? super K, ? super V> action;
        if ((action = this.action) != null) {
            for (int i = baseIndex, f, h; batch > 0 &&
                     (h = ((f = baseLimit) + i) >>> 1) > i;) {
                // 記錄待完成任務的數(shù)量
                addToPendingCount(1);
                // 開啟一個工作線程執(zhí)行任務
                // 其余參數(shù)是任務的區(qū)間以及table和回調(diào)函數(shù)
                new ForEachMappingTask<K,V>
                    (this, batch >>>= 1, baseLimit = h, f, tab,
                     action).fork();
            }
            for (Node<K,V> p; (p = advance()) != null; )
                // 調(diào)用回調(diào)函數(shù)
                action.accept(p.key, p.val);
            // 與addToPendingCount()相反
            // 它會減少待完成任務的計數(shù)器
            // 如果該計數(shù)器為0,代表所有任務已經(jīng)完成了
            propagateCompletion();
        }
    }
}

其他并行計算函數(shù)的實現(xiàn)也都差不多,只不過具體的Task實現(xiàn)不同,例如search()

public <U> U search(long parallelismThreshold,
                    BiFunction<? super K, ? super V, ? extends U> searchFunction) {
    if (searchFunction == null) throw new NullPointerException();
    return new SearchMappingsTask<K,V,U>
        (null, batchFor(parallelismThreshold), 0, 0, table,
         searchFunction, new AtomicReference<U>()).invoke();
}

為了節(jié)省篇幅(說實話現(xiàn)在似乎很少有人能耐心看完一篇長文(:з」∠)),有關Stream API是如何使用Fork/Join框架進行工作以及實現(xiàn)細節(jié)就不多講了,以后有機會再說吧。

參考文獻


  • Associative array – Wikiwand
  • Hash table – Wikiwand
  • Hash function – Wikiwand
  • ConcurrentHashMap (Java Platform SE 8 )
  • LongAdder (Java Platform SE 8 )
  • Java Magic. Part 4: sun.misc.Unsafe
  • ConcurrentHashMap in Java 8 – DZone Java

標簽: linux swap 安全 代碼

版權申明:本站文章部分自網(wǎng)絡,如有侵權,請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:SpringBoot | 第十七章:web 應用開發(fā)之文件上傳

下一篇:Map 大家族的那點事兒 ( 6 ) :LinkedHashMap