上一篇文章给大家详细介绍了下跳表的概念,还不清楚的小伙伴可以去看看,跳表(SkipList)了解下?
回顾
这里再贴下跳表结构图以及插入节点的GIF图,大家再眼熟下:
通过之前的学习,我们知道跳表采用空间换时间的算法,他的插入和查找的效率O(logn),效率不低于红黑树,但是其原理和实现的复杂度要比红黑树简单多了。一般来说会操作链表List,就会对SkipList毫无压力。
ConcurrentSkipListMap数据结构
从名字就可以猜到,ConcurrentSkipListMap的数据结构使用的是跳表,看下ConcurrentSkipListMap的数据结构图:
其实ConcurrentSkipListMap头部注解也画了个图:
重要的内部类
ConcurrentSkipListMap包含了很多内部类,下面我们介绍3个重要的内部类:Index、HeadIndex、Node。
其实看上面的数据结构图就知道了这三个内部类主要是干嘛的了
Index
1 | static class Index<K,V> { |
可以看到,Index节点包括一个Node节点的引用、一个指向下层的down域和一个指向下一个Index的right域,即对应数据结构中的Index节点。sun.misc.Unsafe这个包熟悉吧?CAS嘛,不清楚的同学可以去看看我写的 CAS机制是什么鬼? 修改right域的时候确保原子性。
HeadIndex
1 | /** |
配合数据结构图可以知道,HeadIndex主要是表示当前层级的,继承自Index类,并且在Index类的基础上添加了level域。
Node
1 | static final class Node<K,V> { |
Node就是存放原始数据的节点了,包含了key、value、next域,链表结构,同样使用CAS确保修改value和next的原子性。
属性
1 | public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> |
构造函数
四个构造函数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53/**
* Constructs a new, empty map, sorted according to the
* {@linkplain Comparable natural ordering} of the keys.
*/
public ConcurrentSkipListMap() {
this.comparator = null;
initialize();
}
/**
* Constructs a new, empty map, sorted according to the specified
* comparator.
*
* @param comparator the comparator that will be used to order this map.
* If {@code null}, the {@linkplain Comparable natural
* ordering} of the keys will be used.
*/
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
this.comparator = comparator;
initialize();
}
/**
* Constructs a new map containing the same mappings as the given map,
* sorted according to the {@linkplain Comparable natural ordering} of
* the keys.
*
* @param m the map whose mappings are to be placed in this map
* @throws ClassCastException if the keys in {@code m} are not
* {@link Comparable}, or are not mutually comparable
* @throws NullPointerException if the specified map or any of its keys
* or values are null
*/
public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
this.comparator = null;
initialize();
putAll(m);
}
/**
* Constructs a new map containing the same mappings and using the
* same ordering as the specified sorted map.
*
* @param m the sorted map whose mappings are to be placed in this
* map, and whose comparator is to be used to sort this map
* @throws NullPointerException if the specified sorted map or any of
* its keys or values are null
*/
public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
this.comparator = m.comparator();
initialize();
buildFromSorted(m);
}
可以看到都调用了initialize()方法,进行初始化工作。
1 | private void initialize() { |
put操作
简单判断下非空,然后执行doPut()函数,看过JDK源码的应该很熟悉这种操作了。1
2
3
4
5public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
doPut()源码很多,咋们拆开来看。
1 | private V doPut(K key, V value, boolean onlyIfAbsent) { |
可以看到doPut有三个入参K、V、onlyIfAbsent。KV大家都知道,这个boolean类型的onlyIfAbsent的作用是如果存在当前key时,该做何动作?当onlyIfAbsent为false时,替换value,为true时,则返回该value。
主题其中还调用了findPredecessor()方法,传入key来确认位置,该方法就是确认key插入位置的。
findPredecessor()源码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null) // 键为空,抛出空异常
throw new NullPointerException(); // don't postpone errors
for (;;) { //死循环
// 从head节点开始,head是level最高级别的headIndex
for (Index<K,V> q = head, r = q.right, d;;) {
// r != null,表示该节点右边还有节点,需要比较
if (r != null) {
// n为当前Node节点
Node<K,V> n = r.node;
// 为当前key
K k = n.key;
// value == null,表示该节点已经被删除了
if (n.value == null) {
// 通过unlink()方法过滤掉该节点
if (!q.unlink(r))
break; // restart
// r为rightIndex节点
r = q.right; // reread r
continue;
}
// value != null,节点存在
// 如果key 大于r节点的key 则往前进一步
if (cpr(cmp, key, k) > 0) {
// 向右移动
q = r;
r = r.right;
continue;
}
}
// 到达最右边,如果dowm == null,表示指针已经达到最下层了,直接返回q对应的Node节点
if ((d = q.down) == null)
return q.node;
// 向下移动
q = d;
// d的right节点
r = d.right;
}
}
}
findPredecessor()方法就是寻找前辈节点。从最高层HeadIndex开始向右一个个比较,直到right域为null或者右边的key大于当前的key,然后通过down域向下寻找,直到down为null,找到后返回原始数据的Node节点。
这个过程中还会执行另一个操作unlink(),判断节点的value是否为null,如果为null,表示该节点已经被删除了,通过调用unlink()方法删除该节点。
OK,现在通过findPredecessor()方法找到前辈节点后,继续走。
1 | // 无限循环 |
找到前辈节点后,通过以上步骤找到合适的位置插入节点。new一个Node节点通过casNext()方法加入到原始数据Node链表中。
在原始数据插入节点后,还远远不够,毕竟这是跳表,不是链表。还需要新建Index索引,新建Index之前,还需要确认新节点的高度(level),所以就采用如下方式来生成随机数。
1 | int rnd = ThreadLocalRandom.nextSecondarySeed(); |
生成随机数之后进行判断,0x80000001这个数字比较特殊,为-2147483647,生成随机数为正偶数才会更新层级(通过最高位和最低位不为1验证)。
1 | if ((rnd & 0x80000001) == 0) { // test highest and lowest bits |
之后的while循环用来计算跳表的level,level等于从低2位开始向左有多少个连续的1的个数。
算出level之后,会有两种情况,一是level高于最高层次的话就需要新增一层,二是小于等于当前最高层次。
1 | // idx:新添加的index的level层index |
当level小于等于当前层级,就会从下到上构建,每一层初始化一个Index,该Index的down域指向下一层的节点,right域为null,node直线新插入的Node。
当level大于当前层级时,就新增一层。就行以下逻辑操作:
1.初始化一个长度为level+1长度的Index数组, 从下到到上构建新的节点Index,并赋值down域right域node域。
2.通过一个死循环,从最高层处理,新增一个HeadIndex,该HeadIndex的node域指向之前的最高层的Node,right域指向刚才常见的对应层次的Index,level为当前层次,down域指向前的最高层的HeadIndex。最后通过CAS操作吧当前的head与新加入层的head进行替换。
通过以上步骤,我们已经将Node插入到原始数据链表,确定了level,并且生成了相应的Index以及HeadIndex。现在还差最后一步了,就是把这些Index插入到相对应的层中。
1 |
|
这段源码主要是有两步,一是找到相应层次的该节点插入的位置,第二部分在该位置插入,然后下移。
到这里put操作就结束了,代码量还是很多的,理解了就行,稍微总结下。
1.先通过findPredecessor()方法找到前辈节点Node,这个过程会通过unlink()方法删除一些已经被标记的节点。
2.找到前辈节点后,新建Node节点插入到原始数据链表中。
3.随机生成一个数,判定是否需要添加该跳表的层级,并生成插入Node的Index和HeadIndex。
4.将生成的Index链表插入到跳表结构中。
get操作
看源码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) == 0) {
"unchecked") V vv = (V)v; (
return vv;
}
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
代码量直接比put操作少三分之二了,简单了很多,过程就和doPut操作的第一步差不多。
通过findPredecessor()方法找到前辈节点,然后从前驱结点开始往后查找,找到与key相等的结点,则返回该结点,否则,返回null。在这个过程中会删除一些已经标记为删除状态的结点。
remove操作
1 | final V doRemove(Object key, Object value) { |
doRemove操作和get差不多,找到需要删除节点的前辈节点,如果在查找过程中发现有已经删除的节点,就帮助清除节点。找到后不会直接删除,先利用 CAS 给这个节点添加一个删除标识(next 节点改为一个指向自身的节点),然后再利用 CAS 解除它的链接;如果途中 CAS 执行失败,则调用findNode方法来清除已经删除的节点。
最后检查该节点是不是这层唯一的index,如果是的话,调用tryReduceLevel()方法把这层干掉,完成删除。
从remove源码可以看出,remove方法仅仅是把Node的value设置null,并没有真正删除该节点Node,其实从上面的put操作、get操作我们可以看出,他们在寻找节点的时候都会判断节点的value是否为null,如果为null,则调用unLink()方法取消关联关系,完成删除操作。
tryReduceLevel降级操作
remove里面的删除层级操作。
如果最高的前三个HeadIndex不为空,并且其right域都为null,那么就将level减少1层,并将head设置为之前head的下一层,设置完成后,还有检测之前的head的right域是否为null,如果为null,则减少层级成功,否则再次将head设置为h。
h.level > 3代表跳表只有在层级大于3时才可以降级。
1 | private void tryReduceLevel() { |
OK,这篇文章到此也就结束了,ConcurrentSkipListMap过程不算复杂,重要的还是理解。