上期回顾
先说明下,本文的ConcurrentHashMap是基于jdk1.8的
之前的几篇文章 hashCode()和hash算法的那些事儿, HashMap扫盲 , 浅谈HashMap线程安全问题 详细给大家分析了一波HashMap源码实现,以及并发情况下HashMap的隐患,其解决方案谈到ConcurrentHashMap,这篇文章就开始谈谈ConcurrentHashMap。ConcurrentHashMap和HashMap有很多相同点,类似的地方本文就不多说了,可以看我之前的文章,就默认在看的各位是看过HashMap源码的优秀程序猿啦~
ConcurrentHashMap简介
在并发场景下,ConcurrentHashMap是一个经常被使用的数据结构。相对于HashTable和Collections.synchronizedMap(),ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力,利用了锁分段的思想提高了并发度。
在此膜拜下Doug Lea大师!!牛逼!!!
ConcurrentHashMap在JDK1.6,1.7实现主要思路:
ConcurrentHashMap采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个Map加锁的设计,分段锁大大的提高了高并发环境下的处理能力。
ConcurrentHashMap中的分段锁称为Segment
,它即类似于HashMap(JDK7与JDK8中HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock
(Segment
继承了ReentrantLock
)。
贴一个1.7的ConcurrentHashMap数据结构图:
关于1.7,本文不再累赘,具体实现大家可以百度,以后我可能也会写个基于1.7的ConcurrentHashMap。
ConcurrentHashMap在JDK1.8实现主要思路:
ConcurrentHashMap在JDK8中进行了巨大改动,光是代码量就从1000多行增加到6000行!1.8摒弃了Segment
(锁段)的概念,引入了大量的CAS
无锁操机制以及synchronized
来保证多线程操作的安全性。到这里有人就有疑问了,怎么又开始使用synchronized
啦?1.7的ReentrantLock
方式不行吗??其实是1.8对synchronized
也进行了很多优化,使用synchronized
相较于ReentrantLock的性能会持平甚至在某些情况更优。
贴一个1.8的ConcurrentHashMap数据结构图:
可以看到,和HashMap 1.8的数据结构很像。底层数据结构改变为采用数组+链表+红黑树的数据形式。
和HashMap1.8相同的一些地方
- 底层数据结构一致
- HashMap初始化是在第一次put元素的时候进行的,而不是init
- HashMap的底层数组长度总是为2的整次幂
- 默认树化的阈值为 8,而链表化的阈值为 6
- hash算法也很类似,但多了一步
& HASH_BITS
,该步是为了消除最高位上的负符号,hash的负在ConcurrentHashMap中有特殊意义表示在扩容或者是树节点
1 | static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash |
这些相同的地方我就不多说了,前几篇关于HashMap的文章都深度分析过了,有兴趣的可以去看看,老司机忽略。
一些关键属性
1 | private static final int MAXIMUM_CAPACITY = 1 << 30; //数组最大大小 同HashMap |
很多关键字都很眼熟吧。
Unsafe与CAS
在阅读ConcurrentHashMap源码过程中,你会发现有许多的U
,U.compareAndSwapXXX
的方法,比如:1
2
3static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
引用的是sun.misc.Unsafe
包里面的方法,这个包哪里还用到了?看过我之前写的 CAS机制是什么鬼? 读者可能会知道。
JDK原子变量类可以保证原子操作,看下AtomicReference
原子变量类内部源码:1
2
3public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。其中unsafe.compareAndSwapObject()
就是引用了sun.misc.Unsafe
包里面的compareAndSwapObject
方法。
TreeBin是什么鬼?
大家都知道HashMap1.8中,链表长度超过8,就会转为TreeNode(红黑树)1
2else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
但是来看看ConcurrentHashMap中的:1
2
3
4else if (f instanceof TreeBin) {
...
}
咦?我们的TreeNode
同志哪里去啦?有经验的读者可能已经推算出这个地方就是判断节点是否红黑树的依据了。
其实TreeBin就是封装TreeNode的容器,TreeBin这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象。
put操作
和HashMap差不多,最后调用的是putVal()方法,直接贴putVal()代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 根据 key 计算出 hashcode
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 判断是否需要进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// f 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 如果都不满足,则利用 synchronized 锁写入数据
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
// 当前为链表,在链表中插入新的键值对
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 当前为红黑树,将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
addCount(1L, binCount);
return null;
}
putVal操作和HashMap1.8操作过程总体上差不多的,说下过程:
spread()
方法获取hash,减小hash冲突- 判断是否初始化table数组,没有的话调用
initTable()
方法进行初始化 - 判断是否能直接将新值插入到table数组中
- 判断当前是否在扩容,
MOVED
为-1说明当前ConcurrentHashMap正在进行扩容操作,正在扩容的话就进行协助扩容 - 当table[i]为链表的头结点,在链表中插入新值,通过synchronized (f)的方式进行加锁以实现线程安全性。
- 在链表中如果找到了与待插入的键值对的key相同的节点,就直接覆盖
- 如果没有找到的话,就直接将待插入的键值对追加到链表的末尾
- 当table[i]为红黑树的根节点,在红黑树中插入新值/覆盖旧值
- 根据当前节点个数进行调整,否需要转换成红黑树(个数大于等于8,就会调用
treeifyBin
方法将tabel[i]第i个散列桶
拉链转换成红黑树) - 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就进行扩容
我们可以发现JDK8中的实现也是锁分离的思想,只是锁住的是一个Node,而不是JDK7中的Segment,而锁住Node之前的操作是无锁的并且也是线程安全的,建立在之前提到的原子操作上。
ps:ConcurrentHashMap不允许key或value为null值
addCount()方法
在put方法结尾处调用了addCount方法,把当前ConcurrentHashMap的元素个数+1这个方法一共做了两件事,更新baseCount的值,检测是否进行扩容。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54// 从 putVal 传入的参数是 1, binCount,binCount 默认是0,只有 hash 冲突了才会大于 1.且他的大小是链表的长度(如果不是红黑数结构的话)。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//如果需要检查,检查是否需要扩容,在 putVal 方法调用时,默认就是要检查的。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容) 且
// table 不是空;且 table 的长度小于 1 << 30。(可以扩容)
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据 length 得到一个标识
int rs = resizeStamp(n);
// 如果正在扩容
if (sc < 0) {
// 如果 sc 的低 16 位不等于 标识符(校验异常 sizeCtl 变化了)
// 如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
// 如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
// 如果 nextTable == null(结束扩容了)
// 如果 transferIndex <= 0 (转移状态变化了)
// 结束循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 扩容
transfer(tab, nt);
}
// 如果不在扩容,将 sc 更新:标识符左移 16 位 然后 + 2. 也就是变成一个负数。高 16 位是标识符,低 16 位初始是 2.
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 更新 sizeCtl 为负数后,开始扩容。
transfer(tab, null);
s = sumCount();
}
}
}
CounterCell类是只有一个volatile的long类型变量
1 | if ((as = counterCells) != null || |
这个if主要干的事
- counterCells初始是null,则运行后面的CAS对baseCount增加,但存在多线程可能会导致CAS增加失败,则运行fullAddCount把数量值存到counterCells数组中
- counterCells不null之前已经有过baseCount CAS失败,这种能失败大多代表并发不低,则在counterCells数组中使用随机数随便取一个索引位置之前记录的数据进行数量累加,如果在counterCells数组中CAS累加因多线程还是失败这继续fullAddCount,fullAddCount中会触发扩容等操作,因此直接return
总结下来看,addCount 方法做了两件事情:
- 对 table 的长度加一。无论是通过修改 baseCount,还是通过使用 CounterCell。当 CounterCell 被初始化了,就优先使用他,不再使用 baseCount。
- 检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。
size()方法
1 | public int size() { |
因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,CounterCell数组中保存的就是并发导致 CAS 失败的数据,所以通过累加baseCount和CounterCell数组中的数量,即可得到元素的总个数。
1 | public long mappingCount() { |
顺带说下JDK 8 推荐使用mappingCount
方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值
helpTransfer()
从字面上就可以知道,这是帮助扩容。在putVal()
方法中,如果发现线程当前 hash 冲突了,也就是当前 hash 值对应的槽位有值了,且如果这个值是 -1 (MOVED),说明 Map 正在扩容。那么就帮助 Map 进行扩容。以加快速度。具体代码如下:
1 | final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { |
总结一下:
当put元素的时候,如果发现这个节点的元素类型是 forward 的话,就帮助正在扩容的线程一起扩容,提高速度。其中, sizeCtl 是关键,该变量高 16 位保存 length 生成的标识符,低 16 位保存并发扩容的线程数,通过这连个数字,可以判断出,是否结束扩容了。这也是为什么ConcurrentHashMap扩容性能那么高的原因。
扩容方法 transfer
ConcurrentHashMap的扩容操作总体来说分为两个部分:
- 构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过
RESIZE_STAMP_SHIFT
这个常量经过一次运算来保证的。 - 将原来table中的元素复制到nextTable中,这里允许多线程进行操作。
1 | /** |
扩容过程有点复杂,这里主要涉及到多线程并发扩容,实在看不懂也没关系 ,只要记住结论就行。ForwardingNode的作用就是支持扩容操作,将已处理的节点和空节点置为ForwardingNode,并发处理时多个线程经过ForwardingNode就表示已经遍历了,就往后遍历。