转自:http://blog.****.net/liulongling/article/details/50717706
本章是提高教程可能对于刚入门同学来说会有些难度,读懂本章你需要了解以下知识点:
一、 【Java基础提高】深入分析final关键字(一)
二、 【Java并发编程】深入分析volatile(四)
三、 【Java基础提高】HashTable源码分析(六)
一、Concurrent源码分析
ConcurrentHashMap是由Segment(桶)、HashEntry(节点)2大数据结构组成。如下图所示:

1.1 Segment类和属性
-
//Segment内部维护了一个链表数组
-
static final class Segment<K,V> extends ReentrantLock implements Serializable {
-
-
//链表数组,数组中的每一个元素代表了一个链表的头部
-
transient volatile HashEntry<K,V>[] table;
-
-
//Segment中元素的数量
-
transient int count;
-
-
//对table的大小造成影响的操作的数量(比如put或者remove操作)
-
transient int modCount;
-
-
//阈值,Segment里面元素的数量超过这个值会对Segment进行扩容,扩容后大小=old*2*负载因子
-
transient int threshold;
-
-
//负载因子,用于确定threshold
-
final float loadFactor;
-
}
Segment继承了ReentrantLock,这意味着每个segment都可以当做一个锁,每一把锁只锁住整个容器中的部分数据,这样不影响线程访问其它数据,当然如果是对全局改变时会锁定所有的segment段。比如:size()和containsValue(),注意的是要按顺序锁定所有段,操作完毕后,再按顺序释放所有段的锁。如果不按顺序的话,有可能会出现死锁。
1.2 HashEntry类和属性
-
//HashEntry是一个单向链表
-
static final class HashEntry<K,V> {
-
//哈希值
-
final int hash;
-
//存储的key和值value
-
final K key;
-
volatile V value;
-
//指向的下一个HashEntry,即链表的下一个节点
-
volatile HashEntry<K,V> next;
-
}
类似与HashMap节点Entry,HashEntry也是一个单向链表,它包含了key、hash、value和下一个节点信息。HashEntry和Entry的不同点:
不同点一:使用了多个final关键字(final class 、final hash) ,这意味着不能从链表的中间或尾部添加或删除节点,后面删除操作时会讲到。
不同点二:使用volatile,是为了更新值后能立即对其它线程可见。这里没有使用锁,效率更高。
1.3 类的初始化
-
/**
-
*
-
* @param initialCapacity 初始容量
-
* @param loadFactor 负载因子
-
* @param concurrencyLevel 代表ConcurrentHashMap内部的Segment的数量,
-
* ConcurrentLevel 并发级别
-
*/
-
public ConcurrentHashMap(int initialCapacity,
-
float loadFactor, int concurrencyLevel) {
-
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
-
throw new IllegalArgumentException();
-
if (concurrencyLevel > MAX_SEGMENTS)
-
concurrencyLevel = MAX_SEGMENTS;
-
// Find power-of-two sizes best matching arguments
-
int sshift = 0;
-
int ssize = 1;
-
while (ssize < concurrencyLevel) {
-
++sshift;
-
ssize <<= 1;//ssize左移一位也就是每次ssize=2*ssize。
-
}
-
//主要使用于put()和segmentForHash()方法,结合hash计算出元素在哪一个Segment中。
-
//假如concurrencyLevel是16,那么sshift=4、segmentShift=28、segmentMask=15;
-
this.segmentShift = 32 - sshift;
-
this.segmentMask = ssize - 1;
-
if (initialCapacity > MAXIMUM_CAPACITY)
-
initialCapacity = MAXIMUM_CAPACITY;
-
int c = initialCapacity / ssize;
-
if (c * ssize < initialCapacity)
-
++c;
-
int cap = MIN_SEGMENT_TABLE_CAPACITY;
-
while (cap < c)
-
cap <<= 1;
-
// create segments and segments[0]
-
Segment<K,V> s0 =
-
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
-
(HashEntry<K,V>[])new HashEntry[cap]);
-
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
-
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
-
this.segments = ss;
-
}
ConcurrencyLevel默认情况下内部按并发级别为16来创建。对于每个segment的容量,默认情况也是16。其中concurrentLevel和segment的初始容量都是可以通过构造函数设定的。要注意的是ConcurrencyLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。
1.4 ensureSegment()方法
该方法返回给定索引位置的Segment,如果Segment不存在,则参考Segment表中的第一个Segment的参数创建一个Segment并通过CAS操作将它记录到Segment表中去。
-
private Segment<K,V> ensureSegment(int k) {
-
final Segment<K,V>[] ss = this.segments;
-
long u = (k << SSHIFT) + SBASE; // raw offset
-
Segment<K,V> seg;
-
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
-
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
-
int cap = proto.table.length;
-
float lf = proto.loadFactor;
-
int threshold = (int)(cap * lf);
-
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
-
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
-
== null) { // recheck
-
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
-
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
-
== null) {
-
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
-
break;
-
}
-
}
-
}
-
return seg;
-
}
1.5 entryAt()方法
entryAt()方法是从链表中查找节点。在方法参数里注意到有传入tab链表数组和index索引,那为什么还要调用entryAt()方法获取数组项的值而不是通过tab[index]方式直接获取?那我们从源头(put)开始分析,见1.6put()操作。
-
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
-
return (tab == null) ? null :
-
(HashEntry<K,V>) UNSAFE.getObjectVolatile
-
(tab, ((long)i << TSHIFT) + TBASE);
-
}
1.6 put()操作
1.6.1 锁分离技术
大家知道HashTable是使用了synchronized来保证线程安全,但是其效率非常差。它效率非常差的原因是多个线程访问HashTable时需要竞争同一把锁,如果我们有多把锁,每一把锁只锁住一部分数据,那么多线程在访问不同的数据时也就不会存在竞争,能提高访问效率。这种做法我们称为锁分离技术。在《Java并发编程实战》一书中作者提到过分拆锁和分离锁技术:
分拆锁(lock spliting)就是若原先的程序中多处逻辑都采用同一个锁,但各个逻辑之间又相互独立,就可以拆(Spliting)为使用多个锁,每个锁守护不同的逻辑。
分拆锁有时候可以被扩展,分成可大可小加锁块的集合,并且它们归属于相互独立的对象,这样的情况就是分离锁(lock striping)。
而ConcurrentHashMap就是使用了分离锁技术,对每个Segment配置一把锁,如下图所示:
1.6.2 源码分析
Segment的put操作原理如下图所示,图中展示的不是很详细,其中关于加锁的步骤没有加上去,原因是加了几次觉得加锁后看着很复杂。用图片展示是为了更加简单和明了,如果看着复杂也就没有意义了,我尽量用文字说清楚它的步骤。

步骤一:进入Segment的put操作时先进行加锁保护。如果加锁没有成功,调用scanAndLockForPut方法(详细步骤见下面scanAndLockForPut()源码分析)进入自旋状态,该方法持续查找key对应的节点链中是已存在该机节点,如果没有找到,则预创建一个新节点,并且尝试n次,直到尝试次数操作限制,才真正进入加锁等待状态,自旋结束并返回节点(如果返回了一个非空节点,则表示在链表中没有找到相应的节点)。对最大尝试次数,目前的实现单核次数为1,多核为64。
步骤二:使用(tab.length - 1) & hash计算第一个节点位置,再通过entryAt()方法去查找第一个节点。如果节点存在,遍历链表找到key值所在的节点,如果找到了这个节点则直接更新旧value,结束循环。其中value使用了volatile,它更新后的值立马对其它线程可见。如果节点不存在,将步骤一预创建的新节点(如果没有则重新创建)添加到链表中,添加前先检查添加后节点数量是否超过容器大小,如果超过了,则rehash操作。没有的话调用setNext或setEntryAt方法添加新节点;
要注意的是在更新链表时使用了Unsafe.putOrderedObject()方法,这个方法能够实现非堵塞的写入,这些写入不会被Java的JIT重新排序指令(instruction reordering),使得它能更加快速的存储。
解决1.5问题:为什么还要调用entryAt()方法获取数组项的值而不是通过tab[index]方式直接获取?
虽然在开始时volatile table将引用赋值给了变量tab,但是多线程下table里的值可能发生改变,使用tab[index]并不能获得最新的值。。为了保证接下来的put操作能够读取到上一次的更新结果,需要使用volatile的语法去读取节点链的链头.
-
public V put(K key, V value) {
-
Segment<K,V> s;
-
if (value == null)
-
throw new NullPointerException();
-
int hash = hash(key);
-
//计算Segment的位置,在初始化的时候对segmentShift和segmentMask做了解释
-
int j = (hash >>> segmentShift) & segmentMask;
-
//从Segment数组中获取segment元素的位置
-
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
-
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
-
s = ensureSegment(j);
-
//
-
return s.put(key, hash, value, false);
-
}
-
-
//往Segment的HashEntry中添加元素,使用了分锁机制
-
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
-
//tryLock 仅在调用时锁为空闲状态才获取该锁。如果锁可用,则获取锁,并立即返回值 true。否则是false
-
//scanAndLockForPut 下面单独说scanAndLockForPut
-
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
-
V oldValue;
-
try {
-
HashEntry<K,V>[] tab = table;
-
int index = (tab.length - 1) & hash;
-
HashEntry<K,V> first = entryAt(tab, index);
-
for (HashEntry<K,V> e = first;;) {
-
if (e != null) {
-
K k;
-
if ((k = e.key) == key ||
-
(e.hash == hash && key.equals(k))) {
-
oldValue = e.value;
-
if (!onlyIfAbsent) {
-
e.value = value;
-
++modCount;
-
}
-
break;
-
}
-
e = e.next;
-
}
-
else {
-
if (node != null)
-
node.setNext(first);
-
else
-
node = new HashEntry<K,V>(hash, key, value, first);
-
int c = count + 1;
-
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
-
rehash(node);
-
else
-
setEntryAt(tab, index, node);
-
++modCount;
-
count = c;
-
oldValue = null;
-
break;
-
}
-
}
-
} finally {
-
unlock();
-
}
-
return oldValue;
-
}
1.5 segmentForHash()方法
-
/**
-
* 查找Segment对象,这里Unsafe的主要作用是提供原子操作。
-
*/
-
@SuppressWarnings("unchecked")
-
private Segment<K,V> segmentForHash(int h) {
-
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
-
return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
-
}
1.6 scanAndLockForPut()方法
在下面代码中,它先获取key对应的头节点,进入链表循环。如果链表中不存在要插入的节点,则预创建一个新节点,否则retries值递增,直到操作最大尝试次数而进入等待状态。这个方法要注意的是:当在自旋过程中发现链表链头发生了变化,则更新节点链的链头,并重置retries值为-1,重新为尝试获取锁而自旋遍历。
-
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
-
//第一步:先找到HashEntry链表中的头节点
-
HashEntry<K,V> first = entryForHash(this, hash);
-
HashEntry<K,V> e = first;
-
HashEntry<K,V> node = null;
-
int retries = -1; // negative while locating node
-
while (!tryLock()) {
-
HashEntry<K,V> f; // to recheck first below
-
//第一次一定执行该条件内代码
-
if (retries < 0) {
-
//第二步:分三种情景
-
//情景一:没有找到,创建一个新的节点。
-
if (e == null) {
-
if (node == null) // speculatively create node
-
node = new HashEntry<K,V>(hash, key, value, null);
-
retries = 0;
-
}
-
//情景二:找到相同key的节点
-
else if (key.equals(e.key))
-
retries = 0;
-
else
-
//情景三:没找到key值对应的节点,指向下一个节点继续
-
e = e.next;
-
}
-
//尝试次数达到限制进入加锁等待状态。 对最大尝试次数,目前的实现单核次数为1,多核为64:
-
else if (++retries > MAX_SCAN_RETRIES) {
-
lock();
-
break;
-
}
-
//retries是偶数并且不是头节点。在自旋中链头可能会发生变化
-
else if ((retries & 1) == 0 &&
-
(f = entryForHash(this, hash)) != first) {
-
e = first = f; // re-traverse if entry changed
-
retries = -1;
-
}
-
}
-
return node;
-
}
1.7 get()操作
-
public V get(Object key) {
-
Segment<K,V> s; // manually integrate access methods to reduce overhead
-
HashEntry<K,V>[] tab;
-
int h = hash(key);
-
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
-
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
-
(tab = s.table) != null) {
-
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
-
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
-
e != null; e = e.next) {
-
K k;
-
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
-
return e.value;
-
}
-
}
-
return null;
-
}
从代码可以看出get方法并没有调用锁,它使用了volatile的可见性来实现线程安全的。
参考资料