深入了解ConcurrentHashMap_網頁設計公司

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

節能減碳愛地球是景泰電動車的理念,是創立景泰電動車行的初衷,滿意態度更是服務客戶的最高品質,我們的成長來自於你的推薦。

在上一篇文章【簡單了解系列】從基礎的使用來深挖HashMap里,我從最基礎的使用中介紹了HashMap,大致是JDK1.7和1.8中底層實現的變化,和介紹了為什麼在多線程下可能會造成死循環,擴容機制是什麼樣的。感興趣的可以先看看。

我們知道,HashMap是非線程安全的容器,那麼為什麼ConcurrentHashMap能夠做到線程安全呢?

底層結構

首先看一下ConcurrentHashMap的底層數據結構,在Java8中,其底層的實現方式與HashMap一樣的,同樣是數組、鏈表再加紅黑樹,具體的可以參考上面的HashMap的文章,下面所有的討論都是基於Java 1.8。

transient volatile Node<K,V>[] table;

volatile關鍵字

對比HashMap的底層結構可以發現,table的定義中多了一個volatile關鍵字。這個關鍵字是做什麼的呢?我們知道所有的共享變量都存在主內存中,就像table。

而線程對變量的所有操作都必須在線程自己的工作內存中完成,而不能直接讀取主存中的變量,這是JMM的規定。所以每個線程都會有自己的工作內存,工作內存中存放了共享變量的副本。而正是因為這樣,才造成了可見性的問題。

ABCD四個線程同時在操作一個共享變量X,此時如果A從主存中讀取了X,改變了值,並且寫回了內存。那麼BCD線程所得到的X副本就已經失效了。此時如果沒有被volatile修飾,那麼BCD線程是不知道自己的變量副本已經失效了。繼續使用這個變量就會造成數據不一致的問題。

內存可見性

而如果加上了volatile關鍵字,BCD線程就會立馬看到最新的值,這就是內存可見性。你可能想問,憑什麼加了volatile的關鍵字就可以保證共享變量的內存可見性?

那是因為如果變量被volatile修飾,在線程進行寫操作時,會直接將新的值寫入到主存中,而不是線程的工作內存中;而在讀操作時,會直接從主存中讀取,而不是線程的工作內存。

基礎使用

首先這個使用與HashMap沒有任何區別,只是實現改成了ConcurrentHashMap。

Map<String, String> map = new ConcurrentHashMap<>();
map.put("微信搜索", "SH的全棧筆記");
map.get("微信搜索"); // SH的全棧筆記

取值

首先我們來看一下get方法的使用,源碼如下。

public V get(Object key) {
  Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  int h = spread(key.hashCode());
  if ((tab = table) != null && (n = tab.length) > 0 &&
      (e = tabAt(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
      if ((ek = e.key) == key || (ek != null && key.equals(ek)))
        return e.val;
    }
    else if (eh < 0)
      return (p = e.find(h, key)) != null ? p.val : null;
    while ((e = e.next) != null) {
      if (e.hash == h &&
          ((ek = e.key) == key || (ek != null && key.equals(ek))))
        return e.val;
    }
  }
  return null;
}

大概解釋一下這個過程發生了什麼,首先根據key計算出哈希值,如果找到了就直接返回值。如果是紅黑樹的話,就在紅黑樹中查找值,否則就按照鏈表的查找方式查找。

這與HashMap也差不多的,元素會首先以鏈表的方式進行存儲,如果該桶中的元素數量大於TREEIFY_THRESHOLD的值,就會觸發樹化。將當前的鏈錶轉換為紅黑樹。因為如果數量太多的話,鏈表的查詢效率就會變得非常低,時間複雜度為O(n),而紅黑樹的查詢時間複雜度則為O(logn),這個閾值在Java 1.8中的默認值為8,定義如下。

static final int TREEIFY_THRESHOLD = 8;

賦值

put的源碼就不放出來了,放在這大家估計也不會一行一行的去看。所以我就簡單的解釋一下put的過程發生了什麼事,並貼上關鍵代碼就好了。

整個過程,除開併發的一些細節,大致的流程和1.8中的HashMap是差不多的。

  • 首先會根據傳入的key計算出hashcode,如果是第一次被賦值,那自然需要進行初始化table
  • 如果這個key沒有存在過,直接用CAS在當前槽位的頭節點創建一個Node,會用自旋來保證成功
  • 如果當前的Node的hashcode是否等於-1,如果是則證明有其它的線程正在執行擴容操作,當前線程就加入到擴容的操作中去
  • 且如果該槽位(也就是桶)上的數據結構如果是鏈表,則按照鏈表的插入方式,直接接在當前的鏈表的後面。如果數量大於了樹化的閾值就會轉為紅黑樹。
  • 如果這個key存在,就會直接覆蓋。
  • 判斷是否需要擴容

看到這你可能會有一堆的疑問。

例如在多線程的情況下,幾個線程同時來執行put操作時,怎麼保證只執行一次初始化,或者怎麼保證只執行一次擴容呢?萬一我已經寫入了數據,另一個線程又初始化了一遍,豈不是造成了數據不一致的問題。同樣是多線程的情況下, 怎麼保證put值的時候不會被其他線程覆蓋。CAS又是什麼?

接下來我們就來看一下在多線程的情況下,ConcurrentHashMap是如何保證線程安全的。

初始化的線程安全

首先我們來看初始化的源碼。

private final Node<K,V>[] initTable() {
  Node<K,V>[] tab; int sc;
  while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0)
      Thread.yield(); // lost initialization race; just spin
    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
      try {
        if ((tab = table) == null || tab.length == 0) {
          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
          @SuppressWarnings("unchecked")
          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
          table = tab = nt;
          sc = n - (n >>> 2);
        }
      } finally {
        sizeCtl = sc;
      }
      break;
    }
  }
  return tab;
}

可以看到有一個關鍵的變量,sizeCtl,其定義如下。

private transient volatile int sizeCtl;

sizeCtl使用了關鍵字volatile修飾,說明這是一個多線程的共享變量,可以看到如果是首次初始化,第一個判斷條件if ((sc = sizeCtl) < 0)是不會滿足的,正常初始化的話sizeCtl的值為0,初始化設定了size的話sizeCtl的值會等於傳入的size,而這兩個值始終是大於0的。

CAS

然後就會進入下面的U.compareAndSwapInt(this, SIZECTL, sc, -1)方法,這就是上面提到的CAS,Compare and Swap(Set),比較並交換,Unsafe是位於sun.misc下的一個類,在Java底層用的比較多,它讓Java擁有了類似C語言一樣直接操作內存空間的能力。

例如可以操作內存、CAS、內存屏障、線程調度等等,但是如果Unsafe類不能被正確使用,就會使程序變的不安全,所以不建議程序直接使用它。

compareAndSwapInt的四個參數分別是,實例、偏移地址、預期值、新值。偏移地址可以快速幫我們在實例中定位到我們要修改的字段,此例中便是sizeCtl。如果內存當中的sizeCtl是傳入的預期值,則將其更新為新的值。這個Unsafe類的方法可以保證這個操作的原子性。當你在使用parallelStream進行併發的foreach遍歷時,如果涉及到修改一個整型的共享變量時,你肯定不能直接用i++,因為在多線程下,i++每次操作不能保證原子性。所以你可能會用到如下的方式。

AtomicInteger num = new AtomicInteger();
arr.parallelStream().forEach(item -> num.getAndIncrement());

你可能會好奇,為什麼使用了AtomicInteger就可以保證原子性,跟Unsafe類和CAS又有什麼關係,讓我們接着往下,看getAndIncrement方法的底層實現。

public final int getAndIncrement() {
  return unsafe.getAndAddInt(this, valueOffset, 1);
}

可以看到,底層調用的是Unsafe類的方法,這不就聯繫上了嗎,而getAndIncrement的實現又長這樣。

public final int getAndAddInt(Object var1, long var2, int var4) {
  int var5;
  do {
    var5 = this.getIntVolatile(var1, var2);
  } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  return var5;
}

沒錯,這裏底層調用了compareAndSwapInt方法。可以看到這裏加了while,如果該方法返回false就一直循環,直到成功為止。這個過程有個的名字,叫自旋。特別高端啊,說人話就是無限循環。

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

透過選單樣式的調整、圖片的縮放比例、文字的放大及段落的排版對應來給使用者最佳的瀏覽體驗,所以不用擔心有手機版網站兩個後台的問題,而視覺效果也是透過我們前端設計師優秀的空間比例設計,不會因為畫面變大變小而影響到整體視覺的美感。

什麼情況會返回false呢?那就是var5變量存儲的值,和現在內存中實際var5的值不同,說明這個變量已經被其他線程修改過了,此時通過自旋來重新獲取,直到成功為止,然後自旋結束。

結論

聊的稍微有點多,這小節的問題是如何保證不重複初始化。那就是執行首次擴容時,會將變量sizeCtl設置為-1,因為其被volatile修飾,所以其值的修改對其他線程可見。

其它線程再調用初始化時,就會發現sizeCtl的值為-1,說明已經有線程正在執行初始化的操作了,就會執行Thread.yield(),然後退出。

yield相信大家都不陌生,和sleep不同,sleep可以讓線程進入阻塞狀態,且可以指定阻塞的時間,同時釋放CPU資源。而yield不會讓線程進入阻塞狀態,而且也不能指定時間,它讓線程重新進入可執行狀態,讓出CPU調度,讓CPU資源被同優先級或者高優先級的線程使用,稍後再進行嘗試,這個時間依賴於當前CPU的時間片劃分。

如何保證值不被覆蓋

我們在上一節舉了在併發下i++的例子,說在併發下i++並不是一個具有原子性的操作,假設此時i=1,線程A和線程B同時取了i的值,同時+1,然後此時又同時的寫回。那麼此時i++的值會是2而不是3,在併發下1+1+1=2是可能出現的。

讓我們來看一下ConcurrentHashMap在目標key已經存在時的賦值操作,因為如果不存在會直接調用Unsafe的方法創建一個Node,所以後續的線程就會進入到下面的邏輯中來,由於太長,我省略了一些代碼。

......
V oldVal = null;
synchronized (f) {
  if (tabAt(tab, i) == f) {
    if (fh >= 0) {
      binCount = 1;
      for (Node<K,V> e = f;; ++binCount) {
        ......
      }
    }
    else if (f instanceof TreeBin) {
      Node<K,V> p;
      binCount = 2;
      if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
        oldVal = p.val;
        if (!onlyIfAbsent)
          p.val = value;
      }
    }
  }
}
if (binCount != 0) {
  if (binCount >= TREEIFY_THRESHOLD)
    treeifyBin(tab, i);
  if (oldVal != null)
    return oldVal;
  break;
}

上述代碼在賦值的邏輯外層包了一個synchronized,這個有什麼用呢?

synchronized關鍵字

這個地方也可以換一個方式來理解,那就是synchronized如何保證線程安全的。線程安全,我認為更多的是描述一種風險。在堆內存中的數據由於可以被任何線程訪問到,在沒有任何限制的情況下存在被意外修改的風險。

synchronized是通過對共享資源加鎖的方式,使同一時間只能有一個線程能夠訪問到臨界區(也就是共享資源),共享資源包括了方法、鎖代碼塊和對象。

那是不是使用了synchronized就一定能保證線程安全呢?不是的,如果不能正確的使用,很可能就會引發死鎖,所以,保證線程安全的前提是正確的使用synchronized

自動擴容的線程安全

除了初始化、併發的寫入值,還有一個問題值得關注,那就是在多線程下,ConcurrentHashMap是如何保證自動擴容是線程安全的。

擴容的關鍵方案是transfer,但是由於代碼太多了,貼在這個地方可能會影響大家的理解,感興趣的可以自己的看一下。

還是大概說一下自動擴容的過程,我們以一個線程來舉例子。在putVal的最後一步,會調用addCount方法,然後在方法里判讀是否需要擴容,如果容量超過了實際容量 * 負載因子(也就是sizeCtl的值)就會調用transfer方法。

計算分區的範圍

因為ConcurrentHashMap是支持多線程同時擴容的,所以為了避免每個線程處理的數量不均勻,也為了提高效率,其對當前的所有桶按數量(也就是上面提到的槽位)進行分區,每個線程只處理自己分到的區域內的桶的數據即可。

當前線程計算當前stride的代碼如下。

stride = (NCPU > 1) ? (n >>> 3) / NCPU : n);

如果計算出來的值小於設定的最小範圍,也就是private static final int MIN_TRANSFER_STRIDE = 16;,就把當前分區範圍設置為16。

初始化nextTable

nextTable也是一個共享變量,定義如下,用於存放在正在擴容之後的ConcurrentHashMap的數據,當且僅當正在擴容時才不為空。

private transient volatile Node<K,V>[] nextTable;

如果當前transfer方法傳入的nextTab(這是個局部變量,比上面提到的nextTable少了幾個字母,不要搞混了)是null,說明是當前線程是第一個調用擴容操作的線程,就需要初始化一個size為原來容量2被的nextTable,核心代碼如下。

Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 可以看到傳入的初始化容量是n << 1。

初始化成功之後就更新共享變量nextTable的值,並設置transferIndex的值為擴容前的length,這也是一個共享的變量,表示擴容使還未處理的桶的下標。

設置分區邊界

一個新的線程加入擴容操作,在完成上述步驟后,就會開始從現在正在擴容的Map中找到自己的分區。例如,如果是第一個線程,那麼其取到的分區就會如下。

start = nextIndex - 1;
end = nextIndex > stride ? nextIndex - stride : 0;
// 實際上就是當還有足夠的桶可以分的時候,線程分到的分區為 [n-stride, n - 1]

可以看到,分區是從尾到首進行的。而如果是首次進入的線程,nextIndex 的值會被初始化為共享變量transferIndex 的值。

Copy分區內的值

當前線程在自己劃分到的分區內開始遍歷,如果當前桶是null,那麼就生成一個 ForwardingNode,代碼如下。

ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

並把當前槽位賦值為fwd,你可以把ForwardingNode理解為一個標誌位,如果有線程遍歷到了這個桶, 發現已經是ForwardingNode了,就代表這個桶已經被處理過了,就會跳過這個桶。

如果這個桶沒有被處理過,就會開始給當前的桶加鎖,我們知道ConcurrentHashMap會在多線程的場景下使用,所以當有線程正在擴容的時候,可能還會有線程正在執行put操作,所以如果當前Map正在執行擴容操作,如果此時再寫入數據,很可能會造成的數據丟失,所以要對桶進行加鎖。

總結

對比在1.7中採用的Segment分段鎖的臃腫設計,1.8中直接使用了CASSynchronized來保證併發下的線程安全。總的來說,在1.8中,ConcurrentHashMap和HashMap的底層實現都差不多,都是數組、鏈表和紅黑樹的方式。其主要區別就在於應用場景,非併發的情況可以使用HashMap,而如果要處理併發的情況,就需要使用ConcurrentHashMap。關於ConcurrentHashMap就先聊到這裏。

本文使用 mdnice 排版

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

搬家費用:依消費者運送距離、搬運樓層、有無電梯、步行距離、特殊地形、超重物品等計價因素後,評估每車次單