并发编程--CAS深入理解,以及ABA问题的处理!
hello大家好,好久不见我是小卡。好久没用更新博客了,今天有点时间,就来个大家谈谈我们在代码中经常使用到的一些容器的底层的相关算法,写得不对的希望大家评论留言、一键三连。
在日常的高并发、多线程的开发中,通常使用的hashMap就无法满足我们的需求了,因为hashMap中的所有操作的是没有加锁的,所以在高并发的情况下可能会出现数据安全性问题。
有朋友会问,为啥不用hashTable呢?hashTable不是线程安全的吗?
有这样想法朋友你还在第一层,其实呢我已经在第五层了,起飞!!!为什么我会这样讲,首先hashTable的确是线程安全的,我们来看一看他的几个方法:
public synchronized V put(K key, V value){......} public synchronized V get(Object key){......}
hashTable里面方法都是见了synchronized关键字的,synchronized关键字是一个独占锁(是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁)非常的笨重,消耗大量的cpu资源。基本是不会使用滴。所以小卡说你是第一层。
我们实际开发中高并发使用的安全容器叫做concurrentHashMap。concurrentHashMap是一个非常牛逼、也是一个使用非常普遍的线程安全容器。首先我们来看看他究竟是个啥:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable
我们再来看看他的核心方法:
public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
} ......
其中最重要的就是这个casTabAt(),首先各位铁汁,如果你看到casTabAt()这里,恭喜你铁汁,你已经到了第三层了。 我们一步一步往下看,看看和这个casTabAt()究竟是何方神圣:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
这就到咱们今天要说的关键:CAS算法(compareAndSwap),这里点进去搞清楚,那么恭喜你,起飞了!!!
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
铁汁们看见和这个native了吗?这底层是c语言写的,在idea里面这里就点不进去了!那怎么办呢?
Oracle的jdk的源码就到sun.misc.Usafe.java这里就结束了,native方法不能忘下面再走了。这时候我使用百度搜索,发现openJdk里面的这段代码可以查看。
于是我这边就去gitHub上面下载openJdk的源码,希望通过这些代码能够彻底理解cas方法的核心。
去github下载是真的慢,所以弟弟我在这里给各位有兴趣看的规格准备好了免费的提取链接。
openJdk源码下载:链接: https://pan.baidu.com/s/1E9MHL8JcYSlbSImU1_U2WQ 提取码: xpmt
首先点开Usafe.java方法:
public final class Unsafe { ......
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*/
public final native boolean compareAndSwapObject(Object o, long offset,Object expected,Object x);
......
}
能理解到这一层的铁汁萌,已经是第四层了,为什么说不是第五层呢?因为这还不是最底层的,那么就我们一起进入第五层,起飞!
先来张图片:
我尼玛炸了呀!.cpp这是什么文件啊?我不到啊?马上百度一下:
原来是c++文件,.cpp = c plus plus,涨知识了!但是俺还是看不懂,在已我java的逻辑查看一遍之后又去请教c++组的同事将这段cas的逻辑理解。
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jobject e_h, jobject x_h)) UnsafeWrapper("Unsafe_CompareAndSwapObject"); // 新值 oop x = JNIHandles::resolve(x_h); // 预期值 oop e = JNIHandles::resolve(e_h); // 内存值 oop p = JNIHandles::resolve(obj); // 计算p在堆内存中的具体位置 HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset); // 调用另一个原子性操作方法,并返回结果 oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true); // 如果返回的res等于e,则判定满足compare条件(说明res应该为内存中的当前值),但实际上会有ABA的问题 jboolean success = (res == e); if (success) // success为true时,说明此时已经内存值和预期值一致,这时候将x的值放在p的堆内存位置 // 调用的是最底层的cmpxchg指令(计算机指令!!!) update_barrier_set((void*)addr, x); return success; UNSAFE_END
不难发现,这里的最关键的方法还是 atomic_compare_exchange_oop(x, addr, e, true),因为最重要的步骤就是这一步,只有在将预期值e 和 内存值p 比较得到相同的结果之后,才能够进行下一步的替换操作,那么我们顺藤摸瓜,继续往下走。
全局搜索atomic_compare_exchange_oop方法,找到的第一个是oop.hpp的类:
static oop atomic_compare_exchange_oop(oop exchange_value, volatile HeapWord *dest, oop compare_value, bool prebarrier = false);
// 然后在点开他的实现类
inline oop oopDesc::atomic_compare_exchange_oop(oop exchange_value, volatile HeapWord *dest, oop compare_value, bool prebarrier) { // 如果使用了压缩普通对象指针(CompressedOops),有一个重新编解码的过程 if (UseCompressedOops) { if (prebarrier) { update_barrier_set_pre((narrowOop*)dest, exchange_value); } // encode exchange and compare value from oop to T narrowOop val = encode_heap_oop(exchange_value); // 新值 narrowOop cmp = encode_heap_oop(compare_value); // 预期值 narrowOop old = (narrowOop) Atomic::cmpxchg(val, (narrowOop*)dest, cmp); // decode old from T to oop return decode_heap_oop(old); } else { if (prebarrier) { update_barrier_set_pre((oop*)dest, exchange_value); } // 可以看到这里继续调用了其他方法 return (oop)Atomic::cmpxchg_ptr(exchange_value, (oop*)dest, compare_value); } } 现在我们来看最后一层: inline jlong Atomic::cmpxchg (jlong exchange_value, volatile jlong* dest, jlong compare_value) { int mp = os::is_MP(); jint ex_lo = (jint)exchange_value; jint ex_hi = *( ((jint*)&exchange_value) + 1 ); jint cmp_lo = (jint)compare_value; jint cmp_hi = *( ((jint*)&compare_value) + 1 ); __asm { push ebx push edi mov eax, cmp_lo mov edx, cmp_hi mov edi, dest mov ebx, ex_lo mov ecx, ex_hi LOCK_IF_MP(mp) cmpxchg8b qword ptr [edi] pop edi pop ebx } }
说实话看到这里,小卡我看不懂了,这个cmpxchg方法里面的方法好像都是些计算机指令了,大概的意思就是如何比较两个node的值吧。有兴趣的铁汁可以执行研究一哈。
再来说一下刚刚提到的ABA的问题。什么是ABA,为什么会出现ABA?
ABA问题指的是,在使用CAS算法多预期值和内存值作比较的时候存在一种情况,预期值为A,内存值也为A,但是内存值的这个A实在被其他线程操作过后的A(old : A, Thread1 -> B , Thread2 -> A),这时候虽然在做 atomic_compare_exchange_oop 比较时的结果是success,但是确没内味儿了。
为啥会出现这种情况?在多cpu的服务器中可能会出现多线程操作这个容器,并同时执行CAS,因为哥哥cpu之前的任务调度排序不同,执行的速度也可能会不同,就可能会出现A还在执行compare方法的时候,B线程已经执行完swap操作,同时将内存值修改成了A线程的预期值,这时候计算机以为操作是对的。但是这确实有个错误的操作。
如何避免ABA这种情况发生?
Java中提供了AtomicStampedReference和AtomicMarkableReference来解决ABA问题 。
AtomicStampedReference可以原子更新两个值:引用和版本号,通过版本号来区别节点的循环使用。
这就是目前小卡对cas已经ABA的理解,希望大家支持,下一期我们就来看看AtomicStampedReference和AtomicMarkableReference这两个东东。
铁汁们,一键三连!!!