JUC源码分析-容器-ConcurrentSkipListMap和ConcurrentSkipListSet

概述

ConcurrentSkipListMap:线程安全的有序Map,TreeMap是线程不安全的有序Map. 数据使用表单项链表存储,它借助一种跳表(Skip Lisy)的数据结构,可简单理解为 附加的索引,后面有详细分析。

ConcurrentSkipListSet:借助ConcurrentSkipListMap 实现的线程安全的有序Set,它们俩的关系就像 TreeMap 和TreeSet 。本章我们重点分析ConcurrentSkipListMap。

 

核心属性和数据结构

SkipList(跳表)是一个有层级的 阶梯状的索引。

Node类,数据节点, 有以下属性

final K key; 实例key

volatile Object value; 实例值

volatile Node<K,V> next; 指向下个节点 ,用于构建链表

private transient volatile HeadIndex<K,V> head;//指向最高层的索引头。

Index: 普通的索引节点。 属性如下

final Node<K,V> node;// 数据节点

final Index<K,V> down;//下层节点 ,方便到下层节点寻找。

volatile Index<K,V> right;//右侧节点, 通过 compartTo比较 key大于 node key的节点。

HeadIndex 索引头部节点 继承 Index 用于构建SkipList 的 每一个层级的头节点。独有属性如下

final int level;// 索引层级 ,从1开始

 

Skip List

首先来看一下本章最重要的一种数据结构—跳表(Skip List),它是替代平衡树的一种据结构,和红黑树不相同的是,跳表对于树的平衡的实现是基于一种随机化的算法的,这样也就是说跳表的插入和删除的工作是比较简单的。跳表具有以下特性:

  1. 跳表分为多层,层级越高跳跃性越大,数据越少
  2. 跳表的层级是通过“掷硬币”方式来决定增长的,也就是说在增加元素时,概率增长层数
  3. 跳表的第一层包含所有元素
  4. 每层的元素集合必须包含序数最小的元素
  5. 查找数据时,按照从上到下,从左往右的顺序查找
  6. 时间复杂度O(log n),空间复杂度O(n)

图中链表按照顺序排列,找到节点15需要5次比较

 

JUC源码分析-容器-ConcurrentSkipListMap和ConcurrentSkipListSet

加上跳表后,找到节点15需要2次比较(向下查找不算比较次数)

 

JUC源码分析-容器-ConcurrentSkipListMap和ConcurrentSkipListSet

源码分析

public V put(K key, V value) {

if (value == null)

throw new NullPointerException();

return doPut(key, value, false);

}

doPut 分为3个步骤

  1. 按照顺序找到前置节点
  2. 根据前置节点插入新节点
  3. 生成新的索引层或插入索引节点

先分析第一个步骤

private Node<K,V> findPredecessor(Comparable<? super K> key) {

if (key == null)

throw new NullPointerException(); // don't postpone errors

for (;;) {//

Index<K,V> q = head;//head指向当前最高级别索引层的第一个节点

Index<K,V> r = q.right;// 找到下个节点

for (;;) {

if (r != null) {// 下个节点不为空

Node<K,V> n = r.node;

K k = n.key;

if (n.value == null) {// n被删除

if (!q.unlink(r))

break; // restart 删除节点失败,从头部开始

r = q.right; // reread r 重读取q的下个节点

continue;

}

if (key.compareTo(k) > 0) { //链表是升序排列,比当前节点大,寻找下个节点

q = r;

r = r.right;

continue;

}

}

//要写入的节点key小于等于 r.node.key,q是要寻找的前驱节点。

Index<K,V> d = q.down;

if (d != null) {//如果有down节点 向下寻找

q = d;

r = d.right;

} else

return q.node;//返回前置节点

}

}

}

逻辑总结:

先向右找,如果右侧节点 的key值比当前节点值小,继续向右寻找,否则向下寻找。

 

有可能出现 对比次数 比遍历链表多的情况

如下 查找结点6 经过4次对比。遍历链表只有2次对比。

JUC源码分析-容器-ConcurrentSkipListMap和ConcurrentSkipListSet

 

如下图,将索引分为2个部分 part1和part2, part1部分索引节点比较多,part2部分索引比较少,当要查找的节点位于part2的部分,查询这个节点的次数少,反之,查询节点的次数多,有可能比遍历链表的比较次数还多,如上图的例子。

JUC源码分析-容器-ConcurrentSkipListMap和ConcurrentSkipListSet

 

 

再分析第二步骤

private V doPut(K kkey, V value, boolean onlyIfAbsent) {//onlyIfAbsent==false

Comparable<? super K> key = comparable(kkey);//转换成Comparable

for (;;) {//处理CAS 链接节点

Node<K,V> b = findPredecessor(key);//找到前置节点

Node<K,V> n = b.next;

for (;;) {

if (n != null) {//检查前置节点,处理不一致,并发,删除

Node<K,V> f = n.next;

if (n != b.next) // inconsistent read

break;//并发的情况下,可能插入了其他节点

Object v = n.value;

if (v == null) { // n is deleted n节点被删除

n.helpDelete(b, f);

break;

}

if (v == n || b.value == null) // b is deleted

break;

int c = key.compareTo(n.key);

if (c > 0) {//继续向后寻找

b = n;

n = f;

continue;

}

if (c == 0) {//相等

if (onlyIfAbsent || n.casValue(v, value))// key相等 更新value值成功,返回原来value

return (V)v;

else

break; // restart if lost race to replace value

}

// else c < 0; fall through

}

//创建 新节点

Node<K,V> z = new Node<K,V>(kkey, value, n);//n是z的next节点

if (!b.casNext(n, z))//修改b的next节点为z

break; // restart if lost race to append to b

int level = randomLevel();//随机一个层级

if (level > 0)//生成新的索引层或插入索引节点

insertIndex(z, level);

return null;

}

}

}

返回 值不等于null 说明 节点已经存在,否则 节点不存在,新插入节点。

当节点存在 onlyIfAbsent==true 不替换value ,否则替换。

 

 

 

private int randomLevel() {

int x = randomSeed;

x ^= x << 13;

x ^= x >>> 17;

randomSeed = x ^= x << 5;

if ((x & 0x80000001) != 0) // test highest and lowest bits

return 0;

int level = 1;

while (((x >>>= 1) & 1) != 0) ++level;

return level;

}

这个随机生成层级值得方法比较难以理解,总之就是50%的几率返回0,25%的几率返回1,12.5%的几率返回2...最大返回31。

分析第三步骤

 

这个步骤理解起来最困难,是分析跳表实现的关键

图解2种情况。

当生成一个索引节点时,增加一层 所有的层都要插入这个节点,下图是没有插入新节点前的情况。JUC源码分析-容器-ConcurrentSkipListMap和ConcurrentSkipListSet

 

当加入一个节点16,并增加一层时,需要为所有层级创建节点,如下图 红框内的节点,先创建高层的head节点,设置索引节点为右节点,旧的最高层head为下层节点. 最后把红框内的节点(高层级节点除外)与相应的层级链接。JUC源码分析-容器-ConcurrentSkipListMap和ConcurrentSkipListSet

 

 

insertIndex

private void insertIndex(Node<K,V> z, int level) {

HeadIndex<K,V> h = head;//当前head指向最高层级的首节点

int max = h.level;//获取层级值

 

if (level <= max) {//插入一个索引节点

Index<K,V> idx = null;

for (int i = 1; i <= level; ++i)//有多少个

idx = new Index<K,V>(z, idx, null);

addIndex(idx, h, level);//向从1到level的所有层级插入索引节点 level 是最大的需要链接节点层级值,用于限制链接层级数。

 

} else { //加入新的层级

 

level = max + 1;//不管生成的层级多大 只能在当前最大级加1

Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];

Index<K,V> idx = null;

for (int i = 1; i <= level; ++i)//创建红框内的节点

idxs[i] = idx = new Index<K,V>(z, idx, null);

 

HeadIndex<K,V> oldh;

int k;

for (;;) {

oldh = head;

int oldLevel = oldh.level;

if (level <= oldLevel) { // lost race to add level

k = level;

break;

}

HeadIndex<K,V> newh = oldh;

Node<K,V> oldbase = oldh.node;

for (int j = oldLevel+1; j <= level; ++j)//正常情况下只能增加一层

newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);//将head节点和对应的索引节点链接起来

if (casHead(oldh, newh)) {//如果更新head成功,oldLevel将作为限制层级数

k = oldLevel;//创建head的时候已经将head和索引节点链接上了,后面只需要将还没链接的层级处理即可。

break;

}

}

addIndex(idxs[k], oldh, k);

}

}

addIndex(idxs[k], oldh, k)就不分析了,知道了要干的事,分析起来有也比较容易。

 

节点查询

public V get(Object key) {

return doGet(key);

}

private V doGet(Object okey) {

Comparable<? super K> key = comparable(okey);

 

for (;;) {//循环 直到 返回结果

Node<K,V> n = findNode(key);

if (n == null)// 如果没找到返回 null

return null;

Object v = n.value;

if (v != null)//找到了 值不是null 返回 value

return (V)v;

}

}

 

private Node<K,V> findNode(Comparable<? super K> key) {

for (;;) {

Node<K,V> b = findPredecessor(key);// 找到前置节点

Node<K,V> n = b.next;// 下个节点

for (;;) {// 从前置节点向后遍历,直到找到或发现不存在(链表是升序的)

if (n == null)

return null;

Node<K,V> f = n.next;

if (n != b.next) // 下个节点改变 出现不一致读,重试

break;

Object v = n.value;

if (v == null) { // n 被删除

n.helpDelete(b, f);

break;

}

if (v == n || b.value == null) // b 被删除 重试

break;

int c = key.compareTo(n.key); //比较

if (c == 0) //找到

return n;

if (c < 0)//不存在

return null;

b = n;//向后遍历

n = f;

}

}

}

 

逻辑总结:

  1. 找到前置节点
  2. 从前置节点向后遍历,直到找到或发现不存在

节点查询 主要依赖findPredecessor(key); 实现。

ConcurrentSkipListSet源码分析

 

public ConcurrentSkipListSet() {

m = new ConcurrentSkipListMap<E,Object>();

}

插入节点

public boolean add(E e) {

return m.putIfAbsent(e, Boolean.TRUE) == null;

}

 

public V putIfAbsent(K key, V value) {

if (value == null)

throw new NullPointerException();

return doPut(key, value, true);//注意 这里 onlyIfAbsent==true

}

ConcurrentSkipListSet 插入节点最终依赖ConcurrentSkipListMap doPut 实现。

前面已经讲过,当节点存在 onlyIfAbsent==true 不替换value ,否则替换。 这里 onlyIfAbsent==true, 实现了 只有不存在节点才替换的功能。

ConcurrentSkipListSet 的其他方法比较简单,留给 各位童鞋 自己分析。

 

总结: ConcurrentSkipListMap使用 SkipList(跳表)结构实现,理解SkipList(跳表)数据结构,掌握 插入和查询节点的步骤是本文的重点。