无锁原理详解(CAS,Compare and swap,比较和交换)、java无锁类的使用及无锁算法详细介绍

java中一些工具类大量使用了无锁工具,比如AtomicInteger、Unsafe、AtomicIntegerArray、AtomicReference,可见,无锁的应用是比较广泛的。

那么,什么是无锁呢?

无锁,首先是无障碍的运行,而无障碍是指所有的线程能够同时进入临界区,但是无锁在无障碍的基础上面增加了一条:每次竞争必须能够在竞争中决定出一个优胜者。因此相对于无障碍来来讲,无锁从理论上来讲是一个切实可行的方案。本篇文章将围绕无锁的原理、java无锁类的使用、无锁算法依次展开,见下图:

无锁原理详解(CAS,Compare and swap,比较和交换)、java无锁类的使用及无锁算法详细介绍

无锁的原理:CAS指令

无锁的原理是使用了CAS指令。

CAS思路如下:有个场景需要对临界区中的某个数据进行赋值,如果多个线程同时进入临界区,应该只有一个线程能够成功。那么怎么判断哪个线程可以成功?无锁要求线程在对数据进行操作的时候,需要先给出一个期望值,如果数据的实际值跟期望值相符,线程就可以把数据写入临界区,否则只能失败。因为实际的结果若与期望值不相符,就表示数据在修改的过程当中被其它的线程修改过,所以这次就无法继续修改,需要等到下一次重新来过。所以,每一次CAS(Compare and swap,比较和交换)操作时,线程会先去读一下当前值,然后进行一次CAS操作:把期望值(刚才读出来的数据)跟现在的数据作一个比较,如果比较成功就可以把新的值写入。

关于CAS的两个问题

问题1:CAS操作对比期望值与实际值失败后就重试的策略,会不会很浪费资源?

关于这个问题,CAS是抱着乐观的态度进行操作的。它认为自己有非常在的概率是能够成功完成当前操作的,所以在CAS看来,完不成便重试是一个小概率事件。

问题2:CAS操作,先读再比较,然后设置值,步骤这么多,会不会在步骤之间被其它线程干扰导致冲突,这里是不是一个bug?

这个担心纯属多余,因为CAS操作的整个过程是原子操作,它由一条cpu指令完成,并没有把取数据、比较、设置值写在3个cpu指令里。 这条cpu指令叫cmpxchg,它的逻辑如下:


  1. if(accumulator == Destination){
  2. ZF = 1;
  3. Destination = Source;
  4. } else {
  5. ZF = 0;
  6. accumulator = Destination;
  7. }

看目标值是不是跟计算值相等,如果相等就设置一个跳转标志,并且把原始数据设置到目标里面去,否则跳转标志就不设了。所以,CAS它从指令层面来保证操作的可靠性。

java无锁类的使用

java中无锁类底层是使用上面介绍的比较交换指令来实现的。相对于在进入临界区之前系统可能对其进行挂起的阻塞方式,无锁的性能会更好。一般来说,通过无锁的方式,线程不可能被挂起,它只会不断地作重试,除非人为将线程挂起。如果一个线程被操作系统挂起,它作一次上下文交换,可能需要8万个时钟周期,这看起来是一个非常庞大的数字,如果仅作一次重试呢?不太复杂循环体可能在10条cpu指令以内就执行完了,这估计也就只需要10个以内的cpu时钟周期。那么无锁就相当于拿几个时钟周期的成本去对赌8万个时间周期,除非运行非常差,重试了几万次都是失败的结果,否则对赌下来基本上都有得赚,亏也不会亏太多。所以无锁操作比阻塞操作的性能要好很多。

下面对java当中一些常用的无锁类作介绍。

java无锁类AtomicInteger

在无锁类当中,最有名的一个应该是AtomicInteger,无锁的整数。AtomicInteger在java.util.concurrent.atomic包中,继承自java.lang.Number类。

AtomicInteger内部有一个非常重要的字段“value”,用关键词volatile修饰,它是其封装的数据类型,AtomicInteger类所有的操作基本都是对成员变量“value”所做,"value"才是它内部真正的值,AtomicInteger类只是对它的一个包装而已。

该类主要包含如下方法:

  • ">get()取得当前整数值;

  • ">set(int newValue)设置当前整数值;

  • ">getAndSet(int newValue)设置一个新值并返回旧的值;

  • ">compareAndSet(int expect,int u)参数expect是期望值,参数u是要设置的值,如果期望值等于当前值,即设置成功返回true,否则失败返回false;

    
    
    1. public final boolean compareAndSet(int expect, int update) {
    2. return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    3. }
    ">compareAndSet方法内部用到了一个unsafe对象,从名字可以看到它是一个不安全的操作。我们知道,java相对于c++或者c更为安全是因为java把有关指针的一些内容给封装起来了或者说屏蔽掉了,而Unsafe类恰恰相反,它会提供一些相对于java底层类似于指针的一些操作,比如这里的compareAndSwapInt方法,表示针对this对象,偏移量为valueOffset的数据,看它的期望值是多少,这是一个非常有c感觉的一个操作。

  • ">getAndIncrement()
    取得旧值,在当前值上+1,这是一个线程安全的操作;

    
    
    1. public final int getAndIncrement(){
    2. for(;;){
    3. int current = get();
    4. int next = current + 1;
    5. if(compareAndSet(current,set))
    6. return current;
    7. }
    8. }
    ">我们可以从其实现中看到,先用get方法取出当前值current,对其加1并赋值给next(这里注意,这个加1一定是对当前值加1,不可能是其它线程改过的数据),然后对当前值与+1后的值next进行比较,如果成功,直接返回current。如果在执行完加1之后,有其它线程先一步修改了当前数据,会导致实际的current与线程取到的current不相符,所以compareAndSet方法执行失败返回false,程序无法执行到return语句,直接回到循环体的开始再重试一遍,直到设置成功为止。从getAndIncrement方法的实现代码中,我们可以看到无锁的算法的基本实现形式,即套在一个死循环当中一直作重试,直到自己成功为止。这是一个非常通用的思路,下面的getAndDecrement、incrementAndGet等方法也是这种实现思路。

  • ">getAndDecrement()这个方法与上一个方法类似,表示减1,这也是一个线程安全的操作;

  • ">getAndAdd(int delta)将当前值加上delta,并返回旧值,getAndIncrement与getAndDecrement这两个函数都可以拿它来实现,只不过是一个传1,一个传-1;

  • ">incrementAndGet()当前值+1,返回新值,与getAndIncrement函数的区别只是返回值新旧的问题;

  • ">decrementAndGet()递减,对应incrementAndGet;

  • ">addAndGet(int delta)类似于getAndAdd,只是返回的是增加后的值;

AtomicInteger用法举例:


  1. package com.javaguest.unlocked;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. public class AtomicIntegerDemo {
  4. static AtomicInteger atomicInteger = new AtomicInteger();
  5. static class IncrementRunnable implements Runnable {
  6. @Override
  7. public void run() {
  8. // 对atomicInteger包装的整数值加10000次
  9. for (int i = 0; i < 10000; i++) {
  10. atomicInteger.incrementAndGet();
  11. }
  12. }
  13. }
  14. public static void main(String[] args) throws InterruptedException {
  15. // 创建10个线程去修改atomicInteger封装的整数值
  16. Thread[] threads = new Thread[10];
  17. for (int i = 0; i < 10; i++) {
  18. threads[i] = new Thread(new IncrementRunnable());
  19. }
  20. for (int i = 0; i < 10; i++) {
  21. threads[i].start();
  22. }
  23. for (int i = 0; i < 10; i++) {
  24. threads[i].join();
  25. }
  26. System.out.println(atomicInteger);
  27. }
  28. }

这个实例的运行结果为:

无锁原理详解(CAS,Compare and swap,比较和交换)、java无锁类的使用及无锁算法详细介绍

我们可以看到在10个线程同时对其加加10000次后,其结果为100000,表示它是线程安全的。

java无锁类Unsafe

Unsafe提供了一些不太安全的操作,它主要是在jdk内部使用,并不对外提供。如果你想要拿到Unsafe的实例,是需要动一些手脚的。它提供了如下操作:

  • ">根据偏移量去设置值;
    什么叫根据偏移量去设置数据呢?

    
    
    1. valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
    ">上面的代码表示通过unsafe对象拿到AtomicInteger在成员变量value上的偏移量。如果说有写过c代码,这里就会比较好理解。无锁原理详解(CAS,Compare and swap,比较和交换)、java无锁类的使用及无锁算法详细介绍比如说,c里面有一个结构体,定义了两个int类型的成员a与b,如果a的偏移量是0,b的偏移量就是4,只要需要知道结构体所在的基地址(0x7fff0815c0e0),加上偏移量4就能得到b的地址(0x7fff0815c0e4=0x7fff0815c0e0+4),有了b的地址就能对b进行操作。回到java中,也是一样,只不过结构体换成了Class。Class会比c语言中结构体struct复杂一些,它还存有一些对象头部的信息,比如对象年龄等,然后才是字段,比如int value。这个objectFieldOffset方法它拿到的就是value字段在class中的偏移量,所以后面我们就可以根据这个偏移量去做一些设置,比如前方文中中的提到的unsafe.compareAndSwapInt(this,valueOffset,expect,update);就是对当前AtomicInteger对象在valueOffset偏移量的位置上做设置操作:我期望它是expect,更新成update。

  • ">park()方法,其作用是把线程停下来;

  • ">底层的CAS操作,比如AtomicInteger类里的compareAndSet方法,它其实是在Unsafe类里实现的;

  • ">Unsafe类是非公开的api,它在不同版本的jdk中,可能会出现比较大的差异,因为java官方不希望我们直接去使用Unsafe,所以不对外保证向前或向后兼容。

Unsafe类里的一些主要函数:

  • ">getInt(Objcet object,long offset) //获取对象object在指定偏移量offset上的整数值;

  • ">putInt(Objcet object,long offset,int i) //设置对象object在指定偏移量offset上的整数值i;

  • ">objectFieldOffset(Field field) //获得字段field在对象上的偏移量;

  • ">putIntVolatile(Object obj,long offset,int i) //使用volatile语义设置对象obj在指定偏移量offset上的整数值为i,如果使用该方法去设置值,其它线程能够马上看到当前线程的改动,如果用putInt方法它仅是普通的设置;

  • ">putOrderedInt(Object obj,long offset,int i) //putOrderedInt方法与putIntVolatile方法的作用一样,但是它要求被操作的字段本身就是被volatile修饰的;

  • ">getIntVolatile(Object obj,long offset) //使用volatile语义获得给定对象的int值。

以上这些Unsafe类的操作在jdk内部是被大量使用的,包括java的一些第三方高性能框架,它们内部也会使用Unsafe类。

java无锁类AtomicReference

和封装整数的AtomicInteger相比,AtomicReference封装的是一个对象,它是对封装对象的引用,使用这个对象的引用来对对象进行修改,就能够保证对象的线程安全。

AtomicReference是一个模板,可以用来封装任意类型的数据,它里面的实现与AtomicInteger非常类似,它有成员变量value,也有一个偏移量valueOffset,有get、set方法,也有compareAndSet方法。在这里仅列举一个例子来说明AtomicReference的用法:


  1. package com.javaguest.unlocked;
  2. import java.util.concurrent.atomic.AtomicReference;
  3. public class AtomicReferenceDemo {
  4. // 包装一个字符串,初始化其值为"Hyes"
  5. public static final AtomicReference<String> atomicReference = new AtomicReference<String>("Hyes");
  6. public static void main(String[] args) throws InterruptedException {
  7. // 使用10个线程来修改它,以验证通过AtomicReference类来修改字符串对象是否线程安全
  8. for (int i = 0; i < 10; i++) {
  9. new Thread() {
  10. public void run() {
  11. try {
  12. Thread.sleep(Math.abs((int) (Math.random() * 100)));
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. if (atomicReference.compareAndSet("Hyes", "www.hao124.net")) {
  17. System.out.println(
  18. "Thread " + Thread.currentThread().getId() + " change value to www.hao124.net");
  19. } else {
  20. System.out.println("Thread " + Thread.currentThread().getId() + " failed");
  21. }
  22. };
  23. }.start();
  24. }
  25. }
  26. }

代码中开启了10个线程,每个线程都会尝试把"hyes"修改成"www.hao124.net"。 很显然在这个修改过程当中,仅有一个线程能够修改成功,而其它线程由于数据已经被修改成“www.hao124.net”, 就不能再执行修改操作,因为在比较时,期望值与实际值不相符。

无锁原理详解(CAS,Compare and swap,比较和交换)、java无锁类的使用及无锁算法详细介绍

从执行结果的打印中可以看到,仅有一个线程修改成功。由于对线程的休眠时间使用了随机数,所以线程完成的顺序也是随机的,我们看到的打印结果中线程id也就是乱序的。

因此,如果你有一个对象,希望在修改时保证该对象是线程安全的,就可以使用AtomicReference来包装它。

无锁算法

Vector实现的简单介绍

首先以Vector的add方法为例简单介绍一下Vector的实现,为下面介绍无锁的vector作一个铺垫,因为它们都是vector,所以在实现上有一些相似之处。

无锁原理详解(CAS,Compare and swap,比较和交换)、java无锁类的使用及无锁算法详细介绍

Vector是一个向量,表示一个数组,add方法是一个同步方法,所以它保证了每次操作的时候只有一个线程对vector中的数组进行操作,成员变量protected Object[] elementData是Vector中最核心的一个元素,所有的元素都保存在这个Object数组当中,当add一个元素进去时,它首先是记录vector元素被修改的次数modCount++,这跟业务没有太大的关系,然后它会做一个容量检查:

无锁原理详解(CAS,Compare and swap,比较和交换)、java无锁类的使用及无锁算法详细介绍

无锁原理详解(CAS,Compare and swap,比较和交换)、java无锁类的使用及无锁算法详细介绍

因为vector内部的实现是一个数组,容量是固定的,它不像链表一样会做动态的增加,所以初始化vector的时候它的元素个数就已经确定了。每次向Vextor中增加元素时,它会先检查会不会越界,如果不会越界就会将当前元素加到数组的尾部。若发现越界就做扩展,这里如果在初始化Vector时指定了每次拓展的数量,将直接扩展多capacityIncrement个元素的空间,否则将其容量翻倍,这里建议在使用vector的过程中最好是指定其扩展容量,不然每次成倍的增长,扩展的次数越多,只会带来更大空间的浪费。从代码中我们可以看到,扩容的方法是新建一个更大容量的数组,然后把老数组的元素copy到新数组中。

对Vector实现的介绍就这些了。


=================

add add add update update update update(更新)

select 其它动作, select 更新,cas, 乐观锁(没有那么多了,机率很少) update xxx set where (select count () table where id =xxx)