ConcurrentHashMap源码分析

释放双眼,带上耳机,听听看~!

1
2
3
4
1public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
2    implements ConcurrentMap<K,V>, Serializable {
3
4

ConcurrentHashMap源码分析

一些成员变量


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1//扩容的最大容量限制
2private static final int MAXIMUM_CAPACITY = 1 << 30;
3//默认容量
4private static final int DEFAULT_CAPACITY = 16;
5//扩容因子,当达到容量达到n*LOAD_FACTOR时,就会扩容
6private static final float LOAD_FACTOR = 0.75f;
7//超过这个值会扩容或建红黑树
8static final int TREEIFY_THRESHOLD = 8;
9//从链表升级为红黑树的阈值
10static final int MIN_TREEIFY_CAPACITY = 64;
11//多线程扩容,这个表示每个线程最少负责迁移数据的数量
12private static final int MIN_TRANSFER_STRIDE = 16;
13//下面都是hash值
14static final int MOVED     = -1; //表示在扩容
15static final int TREEBIN   = -2; //表示是树节点
16static final int RESERVED  = -3;
17//hash值取除最高一位以外的31位
18static final int HASH_BITS = 0x7fffffff;
19//CPU数量
20static final int NCPU = Runtime.getRuntime().availableProcessors();
21//装载数据的数组
22   transient volatile Node<K,V>[] table;
23//扩容用的新数组
24   private transient volatile Node<K,V>[] nextTable;
25//没有并发时的计数
26   private transient volatile long baseCount;
27//达到该值就会扩容
28   private transient volatile int sizeCtl;
29
30

构造方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1public ConcurrentHashMap(int initialCapacity) {
2       if (initialCapacity < 0)
3           throw new IllegalArgumentException();
4       //如果initialCapacity大于MAXIMUM_CAPACITY的一半就直接赋值为MAXIMUM_CAPACITY
5       //否则赋值为1.5倍initialCapacity+1,然后向上去最近2的次方的数
6       int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
7                  MAXIMUM_CAPACITY :
8                  tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
9       this.sizeCtl = cap;
10   }
11
12public ConcurrentHashMap(int initialCapacity,
13                            float loadFactor, int concurrencyLevel) {
14       if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
15           throw new IllegalArgumentException();
16       if (initialCapacity < concurrencyLevel)   // Use at least as many bins
17           initialCapacity = concurrencyLevel;   // as estimated threads
18       long size = (long)(1.0 + (long)initialCapacity / loadFactor);
19       int cap = (size >= (long)MAXIMUM_CAPACITY) ?
20           MAXIMUM_CAPACITY : tableSizeFor((int)size);
21       this.sizeCtl = cap;
22   }
23
24

数据都封装到Node类中


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1static class Node<K,V> implements Map.Entry<K,V> {
2       final int hash;//哈希值
3       final K key;//键
4       volatile V val;//数据
5       volatile Node<K,V> next;//下一个节点
6       Node(int hash, K key, V val, Node<K,V> next) {
7           this.hash = hash;
8           this.key = key;
9           this.val = val;
10           this.next = next;
11       }
12  ...
13}
14
15

下面从增删查分析源码

分析之前,先了解下ConcurrentHashMap类中CAS操作,主要有以下三个方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
2       //返回第i个Node元素,i左移ASHIFT位+第一个元素的地址ABASE
3       return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
4   }
5
6   static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
7                                       Node<K,V> c, Node<K,V> v) {
8       //CAS方式更新第i个位置的元素
9       return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
10   }
11   static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
12       //cas赋值数组第i个元素
13       U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
14   }
15
16

ASHIFT、ABASE在静态代码块中进行赋值


1
2
3
4
5
6
7
8
9
10
11
12
1Class<?> ak = Node[].class;
2  //arrayBaseOffset: 返回当前数组第一个元素地址相对于数组起始地址的偏移值
3  ABASE = U.arrayBaseOffset(ak);
4  //arrayIndexScale: 返回当前数组一个元素占用的字节数,在本例中返回4。
5  int scale = U.arrayIndexScale(ak);
6  if ((scale & (scale - 1)) != 0)//必须是2的指数
7      throw new Error("data type scale not a power of two");
8  //Integer.numberOfLeadingZeros:返回int数字的高位直到第一个非0位的位数,如4,返回29(100,前面29个0)
9  //ASHIFT作用:可方便地利用左移ASHIFT位拿到数组第i个元素的地址偏移量
10  ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
11
12

这里CAS操作数组,可以这么理解:拿到数组第一个元素的偏移地址p,然后拿到每个数据所占的字节数c,那么第i个元素偏移地址就是p+i*c。所以可以利用左移ASHIFT位来拿到第i个元素的偏移量,再加上ABASE,就是第i个元素相对于数组起始地址的偏移量。

1. 增加数据


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
1public V put(K key, V value) {
2      //插入数据,存在就覆盖
3      return putVal(key, value, false);
4  }
5
6final V putVal(K key, V value, boolean onlyIfAbsent) {
7      if (key == null || value == null) throw new NullPointerException();
8//【标记1】对key的哈希值进行处理,使得更离散
9      int hash = spread(key.hashCode());
10      int binCount = 0;//统计链表长度
11      for (Node<K,V>[] tab = table;;) {
12          Node<K,V> f; int n, i, fh;
13          if (tab == null || (n = tab.length) == 0)
14              tab = initTable();//【标记2】一开始没初始化数组
15          else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//用(n - 1) & hash取到数组下标,发现其元素为null
16              if (casTabAt(tab, i, null,
17                           new Node<K,V>(hash, key, value, null)))
18                  break;//直接CAS成功添加了一个新节点
19          }
20          else if ((fh = f.hash) == MOVED)//MOVED是数据迁移标志
21              tab = helpTransfer(tab, f);//【标记3】正在数据迁移,去帮助数据迁移
22          else {
23              V oldVal = null;
24              synchronized (f) {//锁住当前节点
25                  if (tabAt(tab, i) == f) {
26                      if (fh >= 0) {//说明是链表结构
27                          binCount = 1;//计算链表节点数量
28                          for (Node<K,V> e = f;; ++binCount) {
29                              K ek;
30                              if (e.hash == hash &&
31                                  ((ek = e.key) == key ||
32                                   (ek != null && key.equals(ek)))) {
33                                  oldVal = e.val;//找到原来的值
34                                  if (!onlyIfAbsent)
35                                      e.val = value;//只用onlyIfAbsent为false才更新值
36                                  break;
37                              }
38                              Node<K,V> pred = e;
39                              if ((e = e.next) == null) {//到了链尾
40                                  //这里新建一个节点插入
41                                  pred.next = new Node<K,V>(hash, key,
42                                                            value, null);
43                                  break;
44                              }
45                          }
46                      }
47                      else if (f instanceof TreeBin) {
48                          Node<K,V> p;
49                          binCount = 2;
50                          if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
51                                                         value)) != null) {
52                              oldVal = p.val;
53                              if (!onlyIfAbsent)
54                                  p.val = value;
55                          }
56                      }
57                  }
58              }
59              if (binCount != 0) {
60                  if (binCount >= TREEIFY_THRESHOLD)
61                      treeifyBin(tab, i);//【标记4】超过阀值开始考虑建红黑树
62                  if (oldVal != null)
63                      return oldVal;
64                  break;
65              }
66          }
67      }
68      //当对应的key原先不存在才到这里
69//【标记5】增加计数
70      addCount(1L, binCount);
71      return null;
72  }
73
74

ConcurrentHashMap插入需要保证线程安全,所以操作数据用了CAS方式,另外当插入的时候如果正好在扩容,就会去帮忙扩容,最后插入成功后会进行判断是否需要扩容,是否要增加计数。

【标记1】hash值处理,使映射更加离散


1
2
3
4
5
6
7
1static final int spread(int h) {
2    //异或,然后再取低31位
3    //HASH_BITS = 0x7fffffff;
4    return (h ^ (h >>> 16)) & HASH_BITS;
5}
6
7

【标记2】初始化数组


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1private final Node<K,V>[] initTable() {
2     Node<K,V>[] tab; int sc;
3     while ((tab = table) == null || tab.length == 0) {
4         if ((sc = sizeCtl) < 0)
5             Thread.yield(); // 被其他线程抢先了,睡眠一下,醒来再自旋判断
6         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS方式设置为-1,防止多线程同时初始化
7             try {
8                 if ((tab = table) == null || tab.length == 0) {
9                     //如果sizeCtl小于等于0 ,会赋值为DEFAULT_CAPACITY
10                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
11                     //申请数组空间
12                     @SuppressWarnings("unchecked")
13                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
14                     table = tab = nt;
15                     //n-0.25n = 0.75n
16                     sc = n - (n >>> 2);
17                 }
18             } finally {
19                 sizeCtl = sc;
20             }
21             break;
22         }
23     }
24     return tab;
25 }
26
27

数组一开始初始化容量是根据sizeCtl的值来决定的,然后sizeCtl会被更新为容量的0.75倍。

【标记3】帮助数据迁移


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
2      Node<K,V>[] nextTab; int sc;
3      //必须要原先tab不为null,而且f是ForwardingNode,并且新数组已经被赋值
4      if (tab != null && (f instanceof ForwardingNode) &&
5          (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
6          int rs = resizeStamp(tab.length);
7          while (nextTab == nextTable && table == tab &&
8                 (sc = sizeCtl) < 0) {
9              //下面的条件不怎么看懂,大概是在控制帮助数据迁移的线程数量
10              // 最后的transferIndex <= 0说明数据迁移已经全部由其他线程完成或正在进行
11              if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
12                  sc == rs + MAX_RESIZERS || transferIndex <= 0)
13                  break;
14              if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
15                  transfer(tab, nextTab);//进行数据迁移
16                  break;
17              }
18          }
19          return nextTab;
20      }
21//上面条件不满足就返回旧的数组
22      return table;
23  }
24
25

【标记4】超过阀值开始考虑建红黑树


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
1private final void treeifyBin(Node<K,V>[] tab, int index) {
2      Node<K,V> b; int n, sc;
3      if (tab != null) {
4          if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
5              tryPresize(n << 1);//小于MIN_TREEIFY_CAPACITY就只是数组扩容
6          else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
7              ...链表转红黑树
8          }
9      }
10  }
11
12private final void tryPresize(int size) {
13      //如果size大于一般MAXIMUM_CAPACITY就直接取MAXIMUM_CAPACITY,
14      //否则1.5倍size+1,然后向上取最近的2的次方的数
15      int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
16          tableSizeFor(size + (size >>> 1) + 1);//
17      int sc;
18      while ((sc = sizeCtl) >= 0) {//大于0说明没有正在扩容
19          Node<K,V>[] tab = table; int n;
20          //table为null说明是从ConcurrentHashMap(Map<? extends K, ? extends V> m)构造方法进来的
21          if (tab == null || (n = tab.length) == 0) {
22              n = (sc > c) ? sc : c;
23              if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
24                  //赋值为-1,待会可以退出出循环,且阻止其他线程进入这
25                  try {
26                      if (table == tab) {
27                          @SuppressWarnings("unchecked")
28                          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
29                          table = nt;
30                          sc = n - (n >>> 2);//实际是0.75n
31                      }
32                  } finally {
33                      sizeCtl = sc;
34                  }
35              }
36          }
37          else if (c <= sc || n >= MAXIMUM_CAPACITY)//小于阈值或大于最大容量就不在扩容了
38              break;
39          else if (tab == table) {
40              int rs = resizeStamp(n);
41              if (sc < 0) {//小于0说明已经开始数据迁移了,新的数组肯定不为空
42                  Node<K,V>[] nt;
43                  if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
44                      sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
45                      transferIndex <= 0)
46                      break;
47                  if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
48                      transfer(tab, nt);//开始数据迁移
49              }
50              //sc还大于0,说明现在才进行数据迁移,而新的数组还没申请
51              else if (U.compareAndSwapInt(this, SIZECTL, sc,
52                                           (rs << RESIZE_STAMP_SHIFT) + 2))
53                  transfer(tab, null);
54          }
55      }
56  }
57
58

真正开始数据迁移是在transfer()方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
1private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
2      int n = tab.length, stride;
3      //cpu数量为1就赋值为n,否则赋值为(n >>> 3) / NCPU ,但最小为MIN_TRANSFER_STRIDE
4      if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
5          stride = MIN_TRANSFER_STRIDE; // subdivide range
6      if (nextTab == null) { //第一个线程开始扩容时,nextTab为null
7          try {
8              @SuppressWarnings("unchecked")
9              //容量为原来的两倍
10              Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
11              nextTab = nt;
12          } catch (Throwable ex) {      // try to cope with OOME
13              sizeCtl = Integer.MAX_VALUE;
14              return;
15          }
16          nextTable = nextTab;
17          transferIndex = n;//从数组末尾开始数据迁移
18      }
19      int nextn = nextTab.length;
20      //用于标记旧数组中某位置已完成数据迁移
21      ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
22      boolean advance = true;
23      boolean finishing = false; // to ensure sweep before committing nextTab
24      for (int i = 0, bound = 0;;) {
25          Node<K,V> f; int fh;
26          while (advance) {
27              int nextIndex, nextBound;
28              if (--i >= bound || finishing)//一开始-1会小于0
29                  advance = false;//小于界限或已经完成就赋值为false跳出循环
30              //transferIndex<=0说明所有数据迁移都有其他线程完成或进行中
31              else if ((nextIndex = transferIndex) <= 0) {
32                  i = -1;
33                  advance = false;
34              }
35              //CAS更新transferIndex值
36              else if (U.compareAndSwapInt
37                       (this, TRANSFERINDEX, nextIndex,
38                        nextBound = (nextIndex > stride ?
39                                     nextIndex - stride : 0))) {
40                  bound = nextBound;//赋值迁移数据的界限
41                  i = nextIndex - 1;//赋值迁移数据的开始下标
42                  advance = false;//跳出循环开始迁移
43              }
44          }
45          if (i < 0 || i >= n || i + n >= nextn) {
46              int sc;
47              if (finishing) {//数据迁移完成
48                  nextTable = null;//
49                  table = nextTab;
50                  sizeCtl = (n << 1) - (n >>> 1);//0.75n
51                  return;
52              }
53              if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
54                  if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
55                      return;//这里大概会筛选出一个线程进行一遍数据迁移检查
56                  finishing = advance = true;
57                  i = n; // 赋值为n,重新检查一遍是否所有数据已经迁移完成
58              }
59          }
60          else if ((f = tabAt(tab, i)) == null)
61              //原先为null,说明没有数据可迁移,直接cas赋值为fwd
62              advance = casTabAt(tab, i, null, fwd);
63          else if ((fh = f.hash) == MOVED)
64              //已经是ForwardNode,进入下次循环
65              advance = true; // already processed
66          else {
67              synchronized (f) {//锁住f
68                  if (tabAt(tab, i) == f) {//如果不等,说明第i个节点已变成ForwardingNode了
69                      //分成两条链表
70                      //ln是表示原先节点的哈希值&n为0
71                      //hn表示原先节点的哈希值&n为1
72                      Node<K,V> ln, hn;
73                      if (fh >= 0) {//说明是链表
74                          int runBit = fh & n;
75                          Node<K,V> lastRun = f;
76                          for (Node<K,V> p = f.next; p != null; p = p.next) {
77                              int b = p.hash & n;
78                              if (b != runBit) {
79                                  runBit = b;
80                                  lastRun = p;
81                              }
82                          }
83                          //lastRun节点及其后面的节点的hash&n都是0或都是1
84                          if (runBit == 0) {
85                              ln = lastRun;
86                              hn = null;
87                          }
88                          else {
89                              hn = lastRun;
90                              ln = null;
91                          }
92                          for (Node<K,V> p = f; p != lastRun; p = p.next) {
93                              int ph = p.hash; K pk = p.key; V pv = p.val;
94                              if ((ph & n) == 0)
95                                  ln = new Node<K,V>(ph, pk, pv, ln);
96                              else
97                                  hn = new Node<K,V>(ph, pk, pv, hn);
98                          }
99                          //设置新数组的第i位元素为ln链表
100                          setTabAt(nextTab, i, ln);
101                          //设置新数组的第i+n位元素为hn链表
102                          setTabAt(nextTab, i + n, hn);
103                          //设置旧数组中第i个元素为ForwardingNode
104                          setTabAt(tab, i, fwd);
105                          advance = true;
106                      }
107                      else if (f instanceof TreeBin) {//红黑树操作
108                          ...
109                      }
110                  }
111              }
112          }
113      }
114  }
115
116

2. 删除操作


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
1public V remove(Object key) {
2     return replaceNode(key, null, null);
3 }
4
5final V replaceNode(Object key, V value, Object cv) {
6     int hash = spread(key.hashCode());
7     for (Node<K,V>[] tab = table;;) {
8         Node<K,V> f; int n, i, fh;
9         if (tab == null || (n = tab.length) == 0 ||
10             (f = tabAt(tab, i = (n - 1) & hash)) == null)
11             break;//数组为null或数组第(n - 1) & hash下标下的元素为null直接退出
12         else if ((fh = f.hash) == MOVED)
13             tab = helpTransfer(tab, f);//帮忙迁移数据
14         else {
15             V oldVal = null;
16             boolean validated = false;
17             synchronized (f) {
18                 //多线程原因要判断是否是同个对象,因为可能被其他线程删除、更换了原来的元素
19                 if (tabAt(tab, i) == f) {
20                     if (fh >= 0) {//链表结构
21                         validated = true;//表示真正操作了
22                         for (Node<K,V> e = f, pred = null;;) {
23                             K ek;
24                             if (e.hash == hash &&
25                                 ((ek = e.key) == key ||
26                                  (ek != null && key.equals(ek)))) {
27                                 V ev = e.val;
28                                 //cv为null或当和原先的值和cv相等才替换
29                                 if (cv == null || cv == ev ||
30                                     (ev != null && cv.equals(ev))) {
31                                     oldVal = ev;
32                                     if (value != null)
33                                         e.val = value;
34                                     else if (pred != null)//断开链表
35                                         pred.next = e.next;
36                                     else//是链表头,直接把表头赋值为下一个节点
37                                         setTabAt(tab, i, e.next);
38                                 }
39                                 break;
40                             }
41                             pred = e;
42                             if ((e = e.next) == null)
43                                 break;//这里就是没有找到要删除的元素
44                         }
45                     }
46                     else if (f instanceof TreeBin) {//红黑树操作
47                         validated = true;
48                         TreeBin<K,V> t = (TreeBin<K,V>)f;
49                         TreeNode<K,V> r, p;
50                         if ((r = t.root) != null &&
51                             (p = r.findTreeNode(hash, key, null)) != null) {
52                             V pv = p.val;
53                             if (cv == null || cv == pv ||
54                                 (pv != null && cv.equals(pv))) {
55                                 oldVal = pv;
56                                 if (value != null)
57                                     p.val = value;
58                                 else if (t.removeTreeNode(p))
59                                     setTabAt(tab, i, untreeify(t.first));
60                             }
61                         }
62                     }
63                 }
64             }
65             if (validated) {//如果真正操作了
66                 if (oldVal != null) {//原先不为null
67                     if (value == null)//而且新赋值为null说明是删除元素
68                         addCount(-1L, -1);//【标记5】减少计数
69                     return oldVal;//返回旧的值
70                 }
71                 break;
72             }
73         }
74     }
75     return null;
76 }
77
78

3. 查询数据


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1public V get(Object key) {
2     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
3     int h = spread(key.hashCode());
4     if ((tab = table) != null && (n = tab.length) > 0 &&
5         (e = tabAt(tab, (n - 1) & h)) != null) {
6         //第一个元素就是要找的元素
7         if ((eh = e.hash) == h) {
8             if ((ek = e.key) == key || (ek != null && key.equals(ek)))
9                 return e.val;
10         }
11         else if (eh < 0)//说明不是链表结构,调用不同节点类型的find方法
12             return (p = e.find(h, key)) != null ? p.val : null;
13         //到这里说明是链表结构
14         while ((e = e.next) != null) {
15             if (e.hash == h &&
16                 ((ek = e.key) == key || (ek != null && key.equals(ek))))
17                 return e.val;
18         }
19     }
20     return null;//没有找到元素
21 }
22
23

4. 计数操作

ConcurrentHashMap的计数方式是,先尝试在baseCount(一个int类型)上计数、如果遇到并发就在尝试在CounterCell上计数。而ConcurrentHashMap要拿到当前所有元素个数的时候就是baseCount+每个CounterCell的计数


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1@sun.misc.Contended static final class CounterCell {
2     volatile long value;
3     CounterCell(long x) { value = x; }
4 }
5
6final long sumCount() {
7     CounterCell[] as = counterCells; CounterCell a;
8     long sum = baseCount;
9     if (as != null) {
10         for (int i = 0; i < as.length; ++i) {
11             if ((a = as[i]) != null)
12                 sum += a.value;
13         }
14     }
15     //sum=baseCount + counterCells[0..n].value
16     return sum;
17 }
18
19

文中的【标记5】处是进行计数操作,代码如下,其中参数x表示要增加的计数(正负都有可能),check大于0表示要检查是否达到阈值扩容


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1private final void addCount(long x, int check) {
2      CounterCell[] as; long b, s;
3      if ((as = counterCells) != null ||
4          !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
5          //当counterCells为null或CAS设置baseCount的值失败时,进入下面
6          CounterCell a; long v; int m;
7          boolean uncontended = true;
8          //uncontended为false条件:当counterCells不为null,而且指定下标的计数器不为null,然后cas更新计数器值失败
9          if (as == null || (m = as.length - 1) < 0 ||
10              (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
11              !(uncontended =
12                U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
13              //ThreadLocalRandom.getProbe() & m:定位要用哪个计数器
14              //如果需要用到的计数器为null或没有累加成功就进入这里
15      //【标记6】全力以赴计数操作
16              fullAddCount(x, uncontended);
17              return;
18          }
19          if (check <= 1)
20              return;
21          s = sumCount();//计算一下容量,为下面如果check>=0时,用其来判断是否要扩容
22      }
23      if (check >= 0) {//check大于等于0才检查是否要扩容
24          Node<K,V>[] tab, nt; int n, sc;
25          while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
26                 (n = tab.length) < MAXIMUM_CAPACITY) {
27              int rs = resizeStamp(n);
28              if (sc < 0) {
29                  if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
30                      sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
31                      transferIndex <= 0)
32                      break;
33                  if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
34                      transfer(tab, nt);
35              }
36              else if (U.compareAndSwapInt(this, SIZECTL, sc,
37                                           (rs << RESIZE_STAMP_SHIFT) + 2))
38                  transfer(tab, null);
39              s = sumCount();//再次计算容量,看是否继续扩容
40          }
41      }
42  }
43
44

【标记6】处,当未能直接在baseCount上计数,且不能成功在CounterCell[]数组的指定下标计数器上计数,就会进入下面方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
1private final void fullAddCount(long x, boolean wasUncontended) {
2     int h;
3     if ((h = ThreadLocalRandom.getProbe()) == 0) {
4         ThreadLocalRandom.localInit();      // 初始化线程的Probe值
5         h = ThreadLocalRandom.getProbe();
6         wasUncontended = true;
7     }
8     boolean collide = false;                // True if last slot nonempty
9     for (;;) {
10         CounterCell[] as; CounterCell a; int n; long v;
11         if ((as = counterCells) != null && (n = as.length) > 0) {
12             if ((a = as[(n - 1) & h]) == null) {//当指定下标的计数器还是null
13                 if (cellsBusy == 0) { // 为0则表示还没其他线程抢到锁
14                     CounterCell r = new CounterCell(x); // Optimistic create
15                     if (cellsBusy == 0 &&
16                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//设置cellsBusy标志位为1
17                         boolean created = false;
18                         try {               // Recheck under lock
19                             CounterCell[] rs; int m, j;
20                             if ((rs = counterCells) != null &&
21                                 (m = rs.length) > 0 &&
22                                 rs[j = (m - 1) & h] == null) {
23                                 rs[j] = r;//给指定下标元素赋值一个新的CounterCell实例
24                                 created = true;
25                             }
26                         } finally {
27                             cellsBusy = 0;
28                         }
29                         if (created)
30                             break;
31                         continue;           // Slot is now non-empty
32                     }
33                 }
34                 collide = false;
35             }
36             //进入下面说明指定下标的CounterCell不为空了
37             else if (!wasUncontended)       // 调该方法前CAS失败
38                 wasUncontended = true;      // Continue after rehash
39             else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
40                 break;//赋值成功
41             else if (counterCells != as || n >= NCPU)
42                 collide = false;            // At max size or stale
43             else if (!collide)
44                 collide = true;
45             else if (cellsBusy == 0 &&
46                      U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//到这里竞争太厉害,要扩容?
47                 try {
48                     if (counterCells == as) {// Expand table unless stale
49                         //扩容为原来的两倍
50                         CounterCell[] rs = new CounterCell[n << 1];
51                         for (int i = 0; i < n; ++i)
52                             rs[i] = as[i];//数据迁移
53                         counterCells = rs;
54                     }
55                 } finally {
56                     cellsBusy = 0;
57                 }
58                 collide = false;
59                 continue;                   // Retry with expanded table
60             }
61             //重新随机一个probe
62             h = ThreadLocalRandom.advanceProbe(h);
63         }
64         //这里是counterCells数组还没初始化
65         else if (cellsBusy == 0 && counterCells == as &&
66                  U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
67             boolean init = false;
68             try {                           // Initialize table
69                 if (counterCells == as) {//判断是否还是空
70                     CounterCell[] rs = new CounterCell[2];//一开始只有2个
71                     rs[h & 1] = new CounterCell(x);//根据最低位选一个计数器
72                     counterCells = rs;
73                     init = true;
74                 }
75             } finally {
76                 cellsBusy = 0;
77             }
78             if (init)
79                 break;//直接退出了
80         }
81         else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
82             break;   //在baseCount上计数成功
83     }
84 }
85
86

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense广告CPC(单元点击价格)为什么会减少??

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索