Java 并发编程(十七):CAS的实现原理
文章目录
1、简介
CAS 全称是 compare-and-swap ,在计算机科学中,比较和交换(CAS)是在多线程中用于实现同步的原子指令。
CAS 操作包含三个操作数, 内存位置、预期数值和新值,它将内存位置的值与给定值进行比较,如果相等,才将该内存位置的值修改为新值。原子性保证了新值是根据最新信息计算出来的; 如果在此期间该值已被另一个线程更新,则写入将失败。如果CAS指示尝试失败,则必须从头开始重复:重新读取位置,重新计算新值并再次尝试CAS。
在 Java 中,Java 并没有直接实现 CAS,CAS 相关的实现是通过 C++ 内联汇编的形式实现的。Java 代码需通过 JNI 才能调用。
2、背景说明
2.1、CPU 与 内存的交互方式
我们都知道,CPU 是通过总线和内存进行数据传输的。在多核心时代下,多个核心通过同一条总线和内存以及其他硬件进行通信。
2.2、示例说明
在内存指定位置处,存放了一个为1的数值。现在 CPU 两个核心同时执行该条指令。两个核心交替执行的流程如下:
- 核心1 从内存指定位置出读取数值1,并加载到寄存器中
- 核心2 从内存指定位置出读取数值1,并加载到寄存器中
- 核心1 将寄存器中值递减1
- 核心2 将寄存器中值递减1
- 核心1 将修改后的值写回内存
- 核心2 将修改后的值写回内存
经过执行上述流程,内存中的最终值是2,而我们期待的是3。要处理这个问题,就要避免两个或多个核心同时操作同一片内存区域
3、CAS 实现原理
利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法,其它原子操作都是利用类似的特性完成的。 在 java.util.concurrent 下面的源码中,Atomic, ReentrantLock 都使用了Unsafe类中的方法来保证并发的安全性。
CAS操作是原子性的,所以多线程并发使用CAS更新数据时,可以不使用锁,JDK中大量使用了CAS来更新数据而防止加锁来保持原子更新。
4、源码分析
4.1、Java 原子操作类中CAS的应用
以 java.util.concurrent.atomic 包下的原子类 AtomicInteger 为例,可以看到它内部使用的sun.misc.Unsafe类来完成CAS操作,Unsafe类是更接近底层的操作类,可以在内存级别获取操作对象内存中的数据。value使用volatile修饰,保证修改对其他线程的可见性。
public class AtomicInteger extends Number implements java.io.Serializable {
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
//计算变量 value 在类对象中的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//volatile变量value
private volatile int value;
//原子更新为新值并返回旧值
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
//最终会设置成新值
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
public final boolean compareAndSet(int expect, int update) {
/*
* compareAndSet 实际上只是一个包装,核心实现逻辑封装在 Unsafe 的
* compareAndSwapInt 方法中
*/
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// 原子增操作,相当于原子性的 ++i
public final int incrementAndGet() {
//三个参数分别是,当前的实例 、value实例变量的偏移量 、递增的值。
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
//内部使用自旋的方式进行CAS更新(while循环进行CAS更新,如果更新失败,则循环再次重试)
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//获取对象内存地址偏移量上的数值v
var5 = this.getIntVolatile(var1, var2);
//如果现在还是v,设置为 v + delta,否则返回false,继续循环再次重试.
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
...........................省略...........................
}
public final class Unsafe {
// compareAndSwapInt 是 native 类型的方法
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
...........................省略...........................
}
valueOffset 字段表示"value" 内存位置,是 compareAndSwap 方法的第二个参数需要的。
compareAndSwapInt 四个参数:
1、当前的实例 2、实例变量的内存地址偏移量 3、预期的旧值 4、要更新的值
4.2、openjdk 中 compareAndSwapInt 的实现
// openjdk/hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
// openjdk/hotspot/src/share/vm/runtime/atomic.cpp
jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
assert(sizeof(jbyte) == 1, "assumption.");
uintptr_t dest_addr = (uintptr_t)dest;
uintptr_t offset = dest_addr % sizeof(jint);
volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
jint cur = *dest_int;
jbyte* cur_as_bytes = (jbyte*)(&cur);
jint new_val = cur;
jbyte* new_val_as_bytes = (jbyte*)(&new_val);
new_val_as_bytes[offset] = exchange_value;
while (cur_as_bytes[offset] == compare_value) {
jint res = cmpxchg(new_val, dest_int, cur);
if (res == cur) break;
cur = res;
new_val = cur;
new_val_as_bytes[offset] = exchange_value;
}
return cur_as_bytes[offset];
}
以linux x86为例,MP表示multiprocessor,即多处理器。最终根据具体的处理器转换成汇编指令来实现CAS。多处理器时需要在前面加上lock指令。
这里的cmpxchgl是x86和Intel架构中的compare and exchange指令。
在实际执行时,CPU可以通过锁总线或锁缓存来实现
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// 判断是否是多核 CPU
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
5、CAS 存在的问题
详见:https://blog.****.net/u010647035/article/details/79795950
6、参考资料
https://en.wikipedia.org/wiki/Compare-and-swap
《深入理解计算机系统》