JUC源码分析-容器-ConcurrentSkipListMap和ConcurrentSkipListSet
概述
ConcurrentSkipListMap:线程安全的有序Map,TreeMap是线程不安全的有序Map. 数据使用表单项链表存储,它借助一种跳表(Skip Lisy)的数据结构,可简单理解为 附加的索引,后面有详细分析。
ConcurrentSkipListSet:借助ConcurrentSkipListMap 实现的线程安全的有序Set,它们俩的关系就像 TreeMap 和TreeSet 。本章我们重点分析ConcurrentSkipListMap。
核心属性和数据结构
SkipList(跳表)是一个有层级的 阶梯状的索引。
Node类,数据节点, 有以下属性
final K key; 实例key
volatile Object value; 实例值
volatile Node<K,V> next; 指向下个节点 ,用于构建链表
private transient volatile HeadIndex<K,V> head;//指向最高层的索引头。
Index: 普通的索引节点。 属性如下
final Node<K,V> node;// 数据节点
final Index<K,V> down;//下层节点 ,方便到下层节点寻找。
volatile Index<K,V> right;//右侧节点, 通过 compartTo比较 key大于 node key的节点。
HeadIndex 索引头部节点 继承 Index 用于构建SkipList 的 每一个层级的头节点。独有属性如下
final int level;// 索引层级 ,从1开始
Skip List
首先来看一下本章最重要的一种数据结构—跳表(Skip List),它是替代平衡树的一种据结构,和红黑树不相同的是,跳表对于树的平衡的实现是基于一种随机化的算法的,这样也就是说跳表的插入和删除的工作是比较简单的。跳表具有以下特性:
- 跳表分为多层,层级越高跳跃性越大,数据越少
- 跳表的层级是通过“掷硬币”方式来决定增长的,也就是说在增加元素时,概率增长层数
- 跳表的第一层包含所有元素
- 每层的元素集合必须包含序数最小的元素
- 查找数据时,按照从上到下,从左往右的顺序查找
- 时间复杂度O(log n),空间复杂度O(n)
图中链表按照顺序排列,找到节点15需要5次比较
加上跳表后,找到节点15需要2次比较(向下查找不算比较次数)
源码分析
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
doPut 分为3个步骤
- 按照顺序找到前置节点
- 根据前置节点插入新节点
- 生成新的索引层或插入索引节点
先分析第一个步骤
private Node<K,V> findPredecessor(Comparable<? super K> key) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {//
Index<K,V> q = head;//head指向当前最高级别索引层的第一个节点
Index<K,V> r = q.right;// 找到下个节点
for (;;) {
if (r != null) {// 下个节点不为空
Node<K,V> n = r.node;
K k = n.key;
if (n.value == null) {// n被删除
if (!q.unlink(r))
break; // restart 删除节点失败,从头部开始
r = q.right; // reread r 重读取q的下个节点
continue;
}
if (key.compareTo(k) > 0) { //链表是升序排列,比当前节点大,寻找下个节点
q = r;
r = r.right;
continue;
}
}
//要写入的节点key小于等于 r.node.key,q是要寻找的前驱节点。
Index<K,V> d = q.down;
if (d != null) {//如果有down节点 向下寻找
q = d;
r = d.right;
} else
return q.node;//返回前置节点
}
}
}
逻辑总结:
先向右找,如果右侧节点 的key值比当前节点值小,继续向右寻找,否则向下寻找。
有可能出现 对比次数 比遍历链表多的情况
如下 查找结点6 经过4次对比。遍历链表只有2次对比。
如下图,将索引分为2个部分 part1和part2, part1部分索引节点比较多,part2部分索引比较少,当要查找的节点位于part2的部分,查询这个节点的次数少,反之,查询节点的次数多,有可能比遍历链表的比较次数还多,如上图的例子。
再分析第二步骤
private V doPut(K kkey, V value, boolean onlyIfAbsent) {//onlyIfAbsent==false
Comparable<? super K> key = comparable(kkey);//转换成Comparable
for (;;) {//处理CAS 链接节点
Node<K,V> b = findPredecessor(key);//找到前置节点
Node<K,V> n = b.next;
for (;;) {
if (n != null) {//检查前置节点,处理不一致,并发,删除
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;//并发的情况下,可能插入了其他节点
Object v = n.value;
if (v == null) { // n is deleted n节点被删除
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
if (c > 0) {//继续向后寻找
b = n;
n = f;
continue;
}
if (c == 0) {//相等
if (onlyIfAbsent || n.casValue(v, value))// key相等 更新value值成功,返回原来value
return (V)v;
else
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
//创建 新节点
Node<K,V> z = new Node<K,V>(kkey, value, n);//n是z的next节点
if (!b.casNext(n, z))//修改b的next节点为z
break; // restart if lost race to append to b
int level = randomLevel();//随机一个层级
if (level > 0)//生成新的索引层或插入索引节点
insertIndex(z, level);
return null;
}
}
}
返回 值不等于null 说明 节点已经存在,否则 节点不存在,新插入节点。
当节点存在 onlyIfAbsent==true 不替换value ,否则替换。
private int randomLevel() {
int x = randomSeed;
x ^= x << 13;
x ^= x >>> 17;
randomSeed = x ^= x << 5;
if ((x & 0x80000001) != 0) // test highest and lowest bits
return 0;
int level = 1;
while (((x >>>= 1) & 1) != 0) ++level;
return level;
}
这个随机生成层级值得方法比较难以理解,总之就是50%的几率返回0,25%的几率返回1,12.5%的几率返回2...最大返回31。
分析第三步骤
这个步骤理解起来最困难,是分析跳表实现的关键
图解2种情况。
当生成一个索引节点时,增加一层 所有的层都要插入这个节点,下图是没有插入新节点前的情况。
当加入一个节点16,并增加一层时,需要为所有层级创建节点,如下图 红框内的节点,先创建高层的head节点,设置索引节点为右节点,旧的最高层head为下层节点. 最后把红框内的节点(高层级节点除外)与相应的层级链接。
insertIndex
private void insertIndex(Node<K,V> z, int level) {
HeadIndex<K,V> h = head;//当前head指向最高层级的首节点
int max = h.level;//获取层级值
if (level <= max) {//插入一个索引节点
Index<K,V> idx = null;
for (int i = 1; i <= level; ++i)//有多少个
idx = new Index<K,V>(z, idx, null);
addIndex(idx, h, level);//向从1到level的所有层级插入索引节点 level 是最大的需要链接节点层级值,用于限制链接层级数。
} else { //加入新的层级
level = max + 1;//不管生成的层级多大 只能在当前最大级加1
Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];
Index<K,V> idx = null;
for (int i = 1; i <= level; ++i)//创建红框内的节点
idxs[i] = idx = new Index<K,V>(z, idx, null);
HeadIndex<K,V> oldh;
int k;
for (;;) {
oldh = head;
int oldLevel = oldh.level;
if (level <= oldLevel) { // lost race to add level
k = level;
break;
}
HeadIndex<K,V> newh = oldh;
Node<K,V> oldbase = oldh.node;
for (int j = oldLevel+1; j <= level; ++j)//正常情况下只能增加一层
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);//将head节点和对应的索引节点链接起来
if (casHead(oldh, newh)) {//如果更新head成功,oldLevel将作为限制层级数
k = oldLevel;//创建head的时候已经将head和索引节点链接上了,后面只需要将还没链接的层级处理即可。
break;
}
}
addIndex(idxs[k], oldh, k);
}
}
addIndex(idxs[k], oldh, k)就不分析了,知道了要干的事,分析起来有也比较容易。
节点查询
public V get(Object key) {
return doGet(key);
}
private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey);
for (;;) {//循环 直到 返回结果
Node<K,V> n = findNode(key);
if (n == null)// 如果没找到返回 null
return null;
Object v = n.value;
if (v != null)//找到了 值不是null 返回 value
return (V)v;
}
}
private Node<K,V> findNode(Comparable<? super K> key) {
for (;;) {
Node<K,V> b = findPredecessor(key);// 找到前置节点
Node<K,V> n = b.next;// 下个节点
for (;;) {// 从前置节点向后遍历,直到找到或发现不存在(链表是升序的)
if (n == null)
return null;
Node<K,V> f = n.next;
if (n != b.next) // 下个节点改变 出现不一致读,重试
break;
Object v = n.value;
if (v == null) { // n 被删除
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b 被删除 重试
break;
int c = key.compareTo(n.key); //比较
if (c == 0) //找到
return n;
if (c < 0)//不存在
return null;
b = n;//向后遍历
n = f;
}
}
}
逻辑总结:
- 找到前置节点
- 从前置节点向后遍历,直到找到或发现不存在
节点查询 主要依赖findPredecessor(key); 实现。
ConcurrentSkipListSet源码分析
public ConcurrentSkipListSet() {
m = new ConcurrentSkipListMap<E,Object>();
}
插入节点
public boolean add(E e) {
return m.putIfAbsent(e, Boolean.TRUE) == null;
}
public V putIfAbsent(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, true);//注意 这里 onlyIfAbsent==true
}
ConcurrentSkipListSet 插入节点最终依赖ConcurrentSkipListMap doPut 实现。
前面已经讲过,当节点存在 onlyIfAbsent==true 不替换value ,否则替换。 这里 onlyIfAbsent==true, 实现了 只有不存在节点才替换的功能。
ConcurrentSkipListSet 的其他方法比较简单,留给 各位童鞋 自己分析。
总结: ConcurrentSkipListMap使用 SkipList(跳表)结构实现,理解SkipList(跳表)数据结构,掌握 插入和查询节点的步骤是本文的重点。