(六) synchronized的源码分析

文章简介

前面我有文章介绍了synchronized的基本原理,这篇文章我会从jvm源码分析synchronized的实现逻辑,希望让大家有一个更加深度的认识

内容导航

  • 从synchronized的字节码说起

  • 什么是monitor

  • 分析synchronized的源码

从synchronized的字节码说起

由于synchronized的实现是在jvm层面,所以我们如果要看它的源码,需要从字节码入手。这段代码演示了synchronized作为实例锁的两种用法,我们观察一下这段代码生成的字节码

  1. public class App

  2. {

  3.    public synchronized void test1(){

  4.    }

  5.    public void test2(){

  6.        synchronized (this){

  7.  

  8.        }

  9.    }

  10.    public static void main( String[] args ){

  11.        System.out.println( "Hello World!" );

  12.    }

  13. }

进入classpath目录下找到App.class文件, 在cmd中输入 javap -v App.class查看字节码

  1. public synchronized void test1();

  2.    descriptor: ()V

  3.    flags: ACC_PUBLIC, ACC_SYNCHRONIZED

  4.    Code:

  5.      stack=0, locals=1, args_size=1

  6.         0: return

  7.      LineNumberTable:

  8.        line 10: 0

  9.      LocalVariableTable:

  10.        Start  Length  Slot  Name   Signature

  11.            0       1     0  this   Lcom/gupaoedu/openclass/App;

  12.  

  13.  public void test2();

  14.    descriptor: ()V

  15.    flags: ACC_PUBLIC

  16.    Code:

  17.      stack=2, locals=3, args_size=1

  18.         0: aload_0

  19.         1: dup

  20.         2: astore_1

  21.         3: monitorenter  //监视器进入,获取锁

  22.         4: aload_1

  23.         5: monitorexit  //监视器退出,释放锁

  24.         6: goto          14

  25.         9: astore_2

  26.        10: aload_1

  27.        11: monitorexit

  28.        12: aload_2

  29.        13: athrow

  30.        14: return

通过字节码我们可以发现,修饰在方法层面的同步关键字,会多一个 ACC_SYNCHRONIZED的flag;修饰在代码块层面的同步块会多一个 monitorentermonitorexit关键字。无论采用哪一种方式,本质上都是对一个对象的监视器(monitor)进行获取,而这个获取的过程是排他的,也就是同一个时刻只能有一个线程获得同步块对象的监视器。 在 synchronized的原理分析这篇文章中,有提到对象监视器。

synchronized关键字经过编译之后,会在同步块的前后分别形成monitorenter和monitorexit这两个字节码指令。当我们的JVM把字节码加载到内存的时候,会对这两个指令进行解析。这两个字节码都需要一个Object类型的参数来指明要锁定和解锁的对象。如果Java程序中的synchronized明确指定了对象参数,那么这个对象就是加锁和解锁的对象;如果没有明确指定,那就根据synchronized修饰的是实例方法还是类方法,获取对应的对象实例或Class对象来作为锁对象

什么是monitor

在分析源代码之前需要了解oop, oopDesc, markOop等相关概念,在Synchronized的原理分析这篇文章中,我们讲到了synchronized的同步锁实际上是存储在对象头中,这个对象头是一个Java对象在内存中的布局的一部分。Java中的每一个Object在JVM内部都会有一个native的C++对象oop/oopDesc与之对应。在hotspot源码 oop.hpp中oopDesc的定义如下

  1. class oopDesc {

  2.  friend class VMStructs;

  3. private:

  4.  volatile markOop  _mark;

  5.  union _metadata {

  6.    Klass*      _klass;

  7.    narrowKlass _compressed_klass;

  8.  } _metadata;

其中 markOop就是我们所说的Mark Word,用于存储锁的标识。 hotspot源码 markOop.hpp文件代码片段

  1. class markOopDesc: public oopDesc {

  2. private:

  3.  // Conversion

  4.  uintptr_t value() const { return (uintptr_t) this; }

  5.  

  6. public:

  7.  // Constants

  8.  enum { age_bits                 = 4,

  9.         lock_bits                = 2,

  10.         biased_lock_bits         = 1,

  11.         max_hash_bits            = BitsPerWord - age_bits - lock_bits - biased_lock_bits,

  12.         hash_bits                = max_hash_bits > 31 ? 31 : max_hash_bits,

  13.         cms_bits                 = LP64_ONLY(1) NOT_LP64(0),

  14.         epoch_bits               = 2

  15.  };

  16.  ...

  17. }

markOopDesc继承自oopDesc,并且扩展了自己的monitor方法,这个方法返回一个ObjectMonitor指针对象,在hotspot虚拟机中,采用ObjectMonitor类来实现monitor

  1.  bool has_monitor() const {

  2.    return ((value() & monitor_value) != 0);

  3.  }

  4.  ObjectMonitor* monitor() const {

  5.    assert(has_monitor(), "check");

  6.    // Use xor instead of &~ to provide one extra tag-bit check.

  7.    return (ObjectMonitor*) (value() ^ monitor_value);

  8.  }

ObjectMonitor.hpp中,可以看到ObjectMonitor的定义

  1. class ObjectMonitor {

  2. ...

  3.  ObjectMonitor() {

  4.    _header       = NULL; //markOop对象头

  5.    _count        = 0;    

  6.    _waiters      = 0,   //等待线程数

  7.    _recursions   = 0;   //重入次数

  8.    _object       = NULL;  

  9.    _owner        = NULL;  //获得ObjectMonitor对象的线程

  10.    _WaitSet      = NULL;  //处于wait状态的线程,会被加入到waitSet

  11.    _WaitSetLock  = 0 ;

  12.    _Responsible  = NULL ;

  13.    _succ         = NULL ;

  14.    _cxq          = NULL ;

  15.    FreeNext      = NULL ;

  16.    _EntryList    = NULL ; //处于等待锁BLOCKED状态的线程

  17.    _SpinFreq     = 0 ;  

  18.    _SpinClock    = 0 ;

  19.    OwnerIsThread = 0 ;

  20.    _previous_owner_tid = 0; //监视器前一个拥有线程的ID

  21.  }

  22. ...

简单总结一下,同步块的实现使用 monitorentermonitorexit指令,而同步方法是依靠方法修饰符上的flag ACC_SYNCHRONIZED来完成。其本质是对一个对象监视器(monitor)进行获取,这个获取过程是排他的,也就是同一个时刻只能有一个线程获得由synchronized所保护对象的监视器。所谓的监视器,实际上可以理解为一个同步工具,它是由Java对象进行描述的。在Hotspot中,是通过ObjectMonitor来实现,每个对象中都会内置一个ObjectMonitor对象

(六) synchronized的源码分析

简单分析synchronized的源码

monitorentermonitorexit这两个指令来开始阅读源码,JVM将字节码加载到内存以后,会对这两个指令进行解释执行, monitorenter, monitorexit的指令解析是通过 InterpreterRuntime.cpp中的两个方法实现

  1. InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem)

  2. InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem)

  3. //JavaThread 当前获取锁的线程

  4. //BasicObjectLock 基础对象锁

我们基于monitorenter为入口,沿着偏向锁->轻量级锁->重量级锁的路径来分析synchronized的实现过程

  1. IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))

  2. #ifdef ASSERT

  3.  thread->last_frame().interpreter_frame_verify_monitor(elem);

  4. #endif

  5.  ...

  6.  if (UseBiasedLocking) {

  7.    // Retry fast entry if bias is revoked to avoid unnecessary inflation

  8.    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);

  9.  } else {

  10.    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);

  11.  }

  12.  ...

  13. #ifdef ASSERT

  14.  thread->last_frame().interpreter_frame_verify_monitor(elem);

  15. #endif

  16. IRT_END

UseBiasedLocking是在JVM启动的时候,是否启动偏向锁的标识

  • 如果支持偏向锁,则执行 ObjectSynchronizer::fast_enter的逻辑

  • 如果不支持偏向锁,则执行 ObjectSynchronizer::slow_enter逻辑,绕过偏向锁,直接进入轻量级锁

ObjectSynchronizer::fast_enter的实现在 synchronizer.cpp文件中,代码如下

  1. void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {

  2. if (UseBiasedLocking) { //判断是否开启了偏向锁

  3.    if (!SafepointSynchronize::is_at_safepoint()) { //如果不处于全局安全点

  4.      //通过`revoke_and_rebias`这个函数尝试获取偏向锁

  5.      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);

  6.      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {//如果是撤销与重偏向直接返回

  7.        return;

  8.      }

  9.    } else {//如果在安全点,撤销偏向锁

  10.      assert(!attempt_rebias, "can not rebias toward VM thread");

  11.      BiasedLocking::revoke_at_safepoint(obj);

  12.    }

  13.    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");

  14. }

  15.  

  16. slow_enter (obj, lock, THREAD) ;

  17. }

fast_enter方法的主要流程做一个简单的解释

  • 再次检查偏向锁是否开启

  • 当处于不安全点时,通过 revoke_and_rebias尝试获取偏向锁,如果成功则直接返回,如果失败则进入轻量级锁获取过程

  • revoke_and_rebias这个偏向锁的获取逻辑在 biasedLocking.cpp

  • 如果偏向锁未开启,则进入 slow_enter获取轻量级锁的流程

偏向锁的获取逻辑

BiasedLocking::revoke_and_rebias 是用来获取当前偏向锁的状态(可能是偏向锁撤销后重新偏向)。这个方法的逻辑在 biasedLocking.cpp

  1. BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {

  2.  assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");

  3.  markOop mark = obj->mark(); //获取锁对象的对象头

  4.  //判断mark是否为可偏向状态,即mark的偏向锁标志位为1,锁标志位为 01,线程id为null

  5.  if (mark->is_biased_anonymously() && !attempt_rebias) {

  6.    //这个分支是进行对象的hashCode计算时会进入,在一个非全局安全点进行偏向锁撤销

  7.    markOop biased_value       = mark;

  8.    //创建一个非偏向的markword

  9.    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());

  10.    //Atomic:cmpxchg_ptr是CAS操作,通过cas重新设置偏向锁状态

  11.    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);

  12.    if (res_mark == biased_value) {//如果CAS成功,返回偏向锁撤销状态

  13.      return BIAS_REVOKED;

  14.    }

  15.  } else if (mark->has_bias_pattern()) {//如果锁对象为可偏向状态(biased_lock:1, lock:01,不管线程id是否为空),尝试重新偏向

  16.    Klass* k = obj->klass();

  17.    markOop prototype_header = k->prototype_header();

  18.    //如果已经有线程对锁对象进行了全局锁定,则取消偏向锁操作

  19.    if (!prototype_header->has_bias_pattern()) {

  20.      markOop biased_value       = mark;

  21.      //CAS 更新对象头markword为非偏向锁

  22.      markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);

  23.      assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");

  24.      return BIAS_REVOKED; //返回偏向锁撤销状态

  25.    } else if (prototype_header->bias_epoch() != mark->bias_epoch()) {

  26.      //如果偏向锁过期,则进入当前分支

  27.      if (attempt_rebias) {//如果允许尝试获取偏向锁

  28.        assert(THREAD->is_Java_thread(), "");

  29.        markOop biased_value       = mark;

  30.        markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());

  31.        //通过CAS 操作, 将本线程的 ThreadID 、时间错、分代年龄尝试写入对象头中

  32.        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);

  33.        if (res_mark == biased_value) { //CAS成功,则返回撤销和重新偏向状态

  34.          return BIAS_REVOKED_AND_REBIASED;

  35.        }

  36.      } else {//不尝试获取偏向锁,则取消偏向锁

  37.        //通过CAS操作更新分代年龄

  38.        markOop biased_value       = mark;

  39.        markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());

  40.        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);

  41.        if (res_mark == biased_value) { //如果CAS操作成功,返回偏向锁撤销状态

  42.          return BIAS_REVOKED;

  43.        }

  44.      }

  45.    }

  46.  }

  47.  ...//省略

  48. }

偏向锁的撤销

当到达一个全局安全点时,这时会根据偏向锁的状态来判断是否需要撤销偏向锁,调用 revoke_at_safepoint方法,这个方法也是在 biasedLocking.cpp中定义的

  1. void BiasedLocking::revoke_at_safepoint(Handle h_obj) {

  2.  assert(SafepointSynchronize::is_at_safepoint(), "must only be called while at safepoint");

  3.  oop obj = h_obj();

  4.  //更新撤销偏向锁计数,并返回偏向锁撤销次数和偏向次数

  5.  HeuristicsResult heuristics = update_heuristics(obj, false);

  6.  if (heuristics == HR_SINGLE_REVOKE) {//可偏向且未达到批量处理的阈值(下面会单独解释)

  7.    revoke_bias(obj, false, false, NULL); //撤销偏向锁

  8.  } else if ((heuristics == HR_BULK_REBIAS) ||

  9.             (heuristics == HR_BULK_REVOKE)) {//如果是多次撤销或者多次偏向

  10.    //批量撤销

  11.    bulk_revoke_or_rebias_at_safepoint(obj, (heuristics == HR_BULK_REBIAS), false, NULL);

  12.  }

  13.  clean_up_cached_monitor_info();

  14. }

偏向锁的释放,需要等待全局安全点(在这个时间点上没有正在执行的字节码),首先暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否还活着,如果线程不处于活动状态,则将对象头设置成无锁状态。如果线程仍然活着,则会升级为轻量级锁,遍历偏向对象的所记录。栈帧中的锁记录和对象头的Mark Word要么重新偏向其他线程,要么恢复到无锁,或者标记对象不适合作为偏向锁。最后唤醒暂停的线程。

JVM内部为每个类维护了一个偏向锁revoke计数器,对偏向锁撤销进行计数,当这个值达到指定阈值时,JVM会认为这个类的偏向锁有问题,需要重新偏向(rebias),对所有属于这个类的对象进行重偏向的操作成为 批量重偏向(bulk rebias)。在做bulk rebias时,会对这个类的epoch的值做递增,这个epoch会存储在对象头中的epoch字段。在判断这个对象是否获得偏向锁的条件是:markword的 biased_lock:1、lock:01、threadid和当前线程id相等、epoch字段和所属类的epoch值相同,如果epoch的值不一样,要么就是撤销偏向锁、要么就是rebias; 如果这个类的revoke计数器的值继续增加到一个阈值,那么jvm会认为这个类不适合偏向锁,就需要进行bulk revoke操作

轻量级锁的获取逻辑

轻量级锁的获取,是调用 ::slow_enter方法,该方法同样位于 synchronizer.cpp文件中

  1. void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {

  2.  markOop mark = obj->mark();

  3.  assert(!mark->has_bias_pattern(), "should not see bias pattern here");

  4.  

  5.  if (mark->is_neutral()) { //如果当前是无锁状态, markword的biase_lock:0,lock:01

  6.    //直接把mark保存到BasicLock对象的_displaced_header字段

  7.    lock->set_displaced_header(mark);

  8.    //通过CAS将mark word更新为指向BasicLock对象的指针,更新成功表示获得了轻量级锁

  9.    if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {

  10.      TEVENT (slow_enter: release stacklock) ;

  11.      return ;

  12.    }

  13.    // Fall through to inflate() ...

  14.  }

  15.  //如果markword处于加锁状态、且markword中的ptr指针指向当前线程的栈帧,表示为重入操作,不需要争抢锁

  16.  else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {

  17.    assert(lock != mark->locker(), "must not re-lock the same lock");

  18.    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");

  19.    lock->set_displaced_header(NULL);

  20.    return;

  21.  }

  22.  

  23. #if 0

  24.  // The following optimization isn't particularly useful.

  25.  if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) {

  26.    lock->set_displaced_header (NULL) ;

  27.    return ;

  28.  }

  29. #endif

  30.  //代码执行到这里,说明有多个线程竞争轻量级锁,轻量级锁通过`inflate`进行膨胀升级为重量级锁

  31.  lock->set_displaced_header(markOopDesc::unused_mark());

  32.  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);

  33. }

轻量级锁的获取逻辑简单再整理一下

  1. mark->is_neutral()方法, is_neutral这个方法是在 markOop.hpp中定义,如果 biased_lock:0且lock:01表示无锁状态

  2. 如果mark处于无锁状态,则进入步骤(3),否则执行步骤(5)

  3. 把mark保存到BasicLock对象的displacedheader字段

  4. 通过CAS尝试将markword更新为指向BasicLock对象的指针,如果更新成功,表示竞争到锁,则执行同步代码,否则执行步骤(5)

  5. 如果当前mark处于加锁状态,且mark中的ptr指针指向当前线程的栈帧,则执行同步代码,否则说明有多个线程竞争轻量级锁,轻量级锁需要膨胀升级为重量级锁

轻量级锁的释放逻辑

轻量级锁的释放是通过 monitorexit调用

  1. IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))

  2. #ifdef ASSERT

  3.  thread->last_frame().interpreter_frame_verify_monitor(elem);

  4. #endif

  5.  Handle h_obj(thread, elem->obj());

  6.  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),

  7.         "must be NULL or an object");

  8.  if (elem == NULL || h_obj()->is_unlocked()) {

  9.    THROW(vmSymbols::java_lang_IllegalMonitorStateException());

  10.  }

  11.  ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);

  12.  // Free entry. This must be done here, since a pending exception might be installed on

  13.  // exit. If it is not cleared, the exception handling code will try to unlock the monitor again.

  14.  elem->set_obj(NULL);

  15. #ifdef ASSERT

  16.  thread->last_frame().interpreter_frame_verify_monitor(elem);

  17. #endif

  18. IRT_END

这段代码中主要是通过 ObjectSynchronizer::slow_exit来执行

  1. void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {

  2.  fast_exit (object, lock, THREAD) ;

  3. }

ObjectSynchronizer::fast_exit的代码如下

  1. void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {

  2.  assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");

  3.  // if displaced header is null, the previous enter is recursive enter, no-op

  4.  markOop dhw = lock->displaced_header(); //获取锁对象中的对象头

  5.  markOop mark ;

  6.  if (dhw == NULL) {

  7.     // Recursive stack-lock.

  8.     // Diagnostics -- Could be: stack-locked, inflating, inflated.

  9.     mark = object->mark() ;

  10.     assert (!mark->is_neutral(), "invariant") ;

  11.     if (mark->has_locker() && mark != markOopDesc::INFLATING()) {

  12.        assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;

  13.     }

  14.     if (mark->has_monitor()) {

  15.        ObjectMonitor * m = mark->monitor() ;

  16.        assert(((oop)(m->object()))->mark() == mark, "invariant") ;

  17.        assert(m->is_entered(THREAD), "invariant") ;

  18.     }

  19.     return ;

  20.  }

  21.  

  22.  mark = object->mark() ; //获取线程栈帧中锁记录(LockRecord)中的markword

  23.  

  24.  // If the object is stack-locked by the current thread, try to

  25.  // swing the displaced header from the box back to the mark.

  26.  if (mark == (markOop) lock) {

  27.     assert (dhw->is_neutral(), "invariant") ;

  28.     //通过CAS尝试将Displaced Mark Word替换回对象头,如果成功,表示锁释放成功。

  29.     if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {

  30.        TEVENT (fast_exit: release stacklock) ;

  31.        return;

  32.     }

  33.  }

  34.  //锁膨胀,调用重量级锁的释放锁方法

  35.  ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;

  36. }

轻量级锁的释放也比较简单,就是将当前线程栈帧中锁记录空间中的Mark Word替换到锁对象的对象头中,如果成功表示锁释放成功。否则,锁膨胀成重量级锁,实现重量级锁的释放锁逻辑

锁膨胀的过程分析

重量级锁是通过对象内部的监视器(monitor)来实现,而monitor的本质是依赖操作系统底层的MutexLock实现的。我们先来看锁的膨胀过程,从前面的分析中已经知道了所膨胀的过程是通过 ObjectSynchronizer::inflate方法实现的,代码如下

  1. ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {

  2.  // Inflate mutates the heap ...

  3.  // Relaxing assertion for bug 6320749.

  4.  assert (Universe::verify_in_progress() ||

  5.          !SafepointSynchronize::is_at_safepoint(), "invariant") ;

  6.  

  7.  for (;;) { //通过无意义的循环实现自旋操作

  8.      const markOop mark = object->mark() ;

  9.      assert (!mark->has_bias_pattern(), "invariant") ;

  10.  

  11.      if (mark->has_monitor()) {//has_monitor是markOop.hpp中的方法,如果为true表示当前锁已经是重量级锁了

  12.          ObjectMonitor * inf = mark->monitor() ;//获得重量级锁的对象监视器直接返回

  13.          assert (inf->header()->is_neutral(), "invariant");

  14.          assert (inf->object() == object, "invariant") ;

  15.          assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");

  16.          return inf ;

  17.      }

  18.  

  19.      if (mark == markOopDesc::INFLATING()) {//膨胀等待,表示存在线程正在膨胀,通过continue进行下一轮的膨胀

  20.         TEVENT (Inflate: spin while INFLATING) ;

  21.         ReadStableMark(object) ;

  22.         continue ;

  23.      }

  24.  

  25.      if (mark->has_locker()) {//表示当前锁为轻量级锁,以下是轻量级锁的膨胀逻辑

  26.          ObjectMonitor * m = omAlloc (Self) ;//获取一个可用的ObjectMonitor

  27.          // Optimistically prepare the objectmonitor - anticipate successful CAS

  28.          // We do this before the CAS in order to minimize the length of time

  29.          // in which INFLATING appears in the mark.

  30.          m->Recycle();

  31.          m->_Responsible  = NULL ;

  32.          m->OwnerIsThread = 0 ;

  33.          m->_recursions   = 0 ;

  34.          m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;   // Consider: maintain by type/class

  35.          /**将object->mark_addr()和mark比较,如果这两个值相等,则将object->mark_addr()

  36.          改成markOopDesc::INFLATING(),相等返回是mark,不相等返回的是object->mark_addr()**/

  37.                     markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;

  38.          if (cmp != mark) {//CAS失败

  39.             omRelease (Self, m, true) ;//释放监视器

  40.             continue ;       // 重试

  41.          }

  42.  

  43.          markOop dmw = mark->displaced_mark_helper() ;

  44.          assert (dmw->is_neutral(), "invariant") ;

  45.  

  46.          //CAS成功以后,设置ObjectMonitor相关属性

  47.          m->set_header(dmw) ;

  48.  

  49.  

  50.          m->set_owner(mark->locker());

  51.          m->set_object(object);

  52.          // TODO-FIXME: assert BasicLock->dhw != 0.

  53.  

  54.  

  55.          guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;

  56.          object->release_set_mark(markOopDesc::encode(m));

  57.  

  58.  

  59.          if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;

  60.          TEVENT(Inflate: overwrite stacklock) ;

  61.          if (TraceMonitorInflation) {

  62.            if (object->is_instance()) {

  63.              ResourceMark rm;

  64.              tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",

  65.                (void *) object, (intptr_t) object->mark(),

  66.                object->klass()->external_name());

  67.            }

  68.          }

  69.          return m ; //返回ObjectMonitor

  70.      }

  71.      //如果是无锁状态

  72.      assert (mark->is_neutral(), "invariant");

  73.      ObjectMonitor * m = omAlloc (Self) ; ////获取一个可用的ObjectMonitor

  74.      //设置ObjectMonitor相关属性

  75.      m->Recycle();

  76.      m->set_header(mark);

  77.      m->set_owner(NULL);

  78.      m->set_object(object);

  79.      m->OwnerIsThread = 1 ;

  80.      m->_recursions   = 0 ;

  81.      m->_Responsible  = NULL ;

  82.      m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;       // consider: keep metastats by type/class

  83.      /**将object->mark_addr()和mark比较,如果这两个值相等,则将object->mark_addr()

  84.          改成markOopDesc::encode(m),相等返回是mark,不相等返回的是object->mark_addr()**/

  85.      if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {

  86.          //CAS失败,说明出现了锁竞争,则释放监视器重行竞争锁

  87.          m->set_object (NULL) ;

  88.          m->set_owner  (NULL) ;

  89.          m->OwnerIsThread = 0 ;

  90.          m->Recycle() ;

  91.          omRelease (Self, m, true) ;

  92.          m = NULL ;

  93.          continue ;

  94.          // interference - the markword changed - just retry.

  95.          // The state-transitions are one-way, so there's no chance of

  96.          // live-lock -- "Inflated" is an absorbing state.

  97.      }

  98.  

  99.      if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;

  100.      TEVENT(Inflate: overwrite neutral) ;

  101.      if (TraceMonitorInflation) {

  102.        if (object->is_instance()) {

  103.          ResourceMark rm;

  104.          tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",

  105.            (void *) object, (intptr_t) object->mark(),

  106.            object->klass()->external_name());

  107.        }

  108.      }

  109.      return m ; //返回ObjectMonitor对象

  110.  }

  111. }

锁膨胀的过程稍微有点复杂,整个锁膨胀的过程是通过自旋来完成的,具体的实现逻辑简答总结以下几点

  • 1. mark->has_monitor() 判断如果当前锁对象为重量级锁,也就是lock:10,则执行(2),否则执行(3)

  • 2.通过 mark->monitor获得重量级锁的对象监视器ObjectMonitor并返回,锁膨胀过程结束

  • 3.如果当前锁处于 INFLATING,说明有其他线程在执行锁膨胀,那么当前线程通过自旋等待其他线程锁膨胀完成

  • 4.如果当前是轻量级锁状态 mark->has_locker(),则进行锁膨胀。首先,通过omAlloc方法获得一个可用的ObjectMonitor,并设置初始数据;然后通过CAS将对象头设置为`markOopDesc:INFLATING,表示当前锁正在膨胀,如果CAS失败,继续自旋

  • 5.如果是无锁状态,逻辑类似第4步骤

锁膨胀的过程实际上是获得一个ObjectMonitor对象监视器,而真正抢占锁的逻辑,在 ObjectMonitor::enter方法里面

重量级锁的竞争逻辑

重量级锁的竞争,在 ObjectMonitor::enter方法中,代码文件在 objectMonitor.cpp重量级锁的代码就不一一分析了,简单说一下下面这段代码主要做的几件事

  • 通过CAS将monitor的 _owner字段设置为当前线程,如果设置成功,则直接返回

  • 如果之前的 _owner指向的是当前的线程,说明是重入,执行 _recursions++增加重入次数

  • 如果当前线程获取监视器锁成功,将 _recursions设置为1, _owner设置为当前线程

  • 如果获取锁失败,则等待锁释放

  1. void ATTR ObjectMonitor::enter(TRAPS) {

  2.  // The following code is ordered to check the most common cases first

  3.  // and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.

  4.  Thread * const Self = THREAD ;

  5.  void * cur ;

  6.  

  7.  cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;

  8.  if (cur == NULL) {//CAS成功

  9.     // Either ASSERT _recursions == 0 or explicitly set _recursions = 0.

  10.     assert (_recursions == 0   , "invariant") ;

  11.     assert (_owner      == Self, "invariant") ;

  12.     // CONSIDER: set or assert OwnerIsThread == 1

  13.     return ;

  14.  }

  15.  

  16.  if (cur == Self) {

  17.     // TODO-FIXME: check for integer overflow!  BUGID 6557169.

  18.     _recursions ++ ;

  19.     return ;

  20.  }

  21.  

  22.  if (Self->is_lock_owned ((address)cur)) {

  23.    assert (_recursions == 0, "internal state error");

  24.    _recursions = 1 ;

  25.    // Commute owner from a thread-specific on-stack BasicLockObject address to

  26.    // a full-fledged "Thread *".

  27.    _owner = Self ;

  28.    OwnerIsThread = 1 ;

  29.    return ;

  30.  }

  31.  

  32.  // We've encountered genuine contention.

  33.  assert (Self->_Stalled == 0, "invariant") ;

  34.  Self->_Stalled = intptr_t(this) ;

  35.  

  36.  // Try one round of spinning *before* enqueueing Self

  37.  // and before going through the awkward and expensive state

  38.  // transitions.  The following spin is strictly optional ...

  39.  // Note that if we acquire the monitor from an initial spin

  40.  // we forgo posting JVMTI events and firing DTRACE probes.

  41.  if (Knob_SpinEarly && TrySpin (Self) > 0) {

  42.     assert (_owner == Self      , "invariant") ;

  43.     assert (_recursions == 0    , "invariant") ;

  44.     assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;

  45.     Self->_Stalled = 0 ;

  46.     return ;

  47.  }

  48.  

  49.  assert (_owner != Self          , "invariant") ;

  50.  assert (_succ  != Self          , "invariant") ;

  51.  assert (Self->is_Java_thread()  , "invariant") ;

  52.  JavaThread * jt = (JavaThread *) Self ;

  53.  assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ;

  54.  assert (jt->thread_state() != _thread_blocked   , "invariant") ;

  55.  assert (this->object() != NULL  , "invariant") ;

  56.  assert (_count >= 0, "invariant") ;

  57.  

  58.  // Prevent deflation at STW-time.  See deflate_idle_monitors() and is_busy().

  59.  // Ensure the object-monitor relationship remains stable while there's contention.

  60.  Atomic::inc_ptr(&_count);

  61.  

  62.  EventJavaMonitorEnter event;

  63.  

  64.  { // Change java thread status to indicate blocked on monitor enter.

  65.    JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);

  66.  

  67.    DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);

  68.    if (JvmtiExport::should_post_monitor_contended_enter()) {

  69.      JvmtiExport::post_monitor_contended_enter(jt, this);

  70.    }

  71.  

  72.    OSThreadContendState osts(Self->osthread());

  73.    ThreadBlockInVM tbivm(jt);

  74.  

  75.    Self->set_current_pending_monitor(this);

  76.  

  77.    // TODO-FIXME: change the following for(;;) loop to straight-line code.

  78.    for (;;) {

  79.      jt->set_suspend_equivalent();

  80.      // cleared by handle_special_suspend_equivalent_condition()

  81.      // or java_suspend_self()

  82.  

  83.      EnterI (THREAD) ;

  84.  

  85.      if (!ExitSuspendEquivalent(jt)) break ;

  86.  

  87.      //

  88.      // We have acquired the contended monitor, but while we were

  89.      // waiting another thread suspended us. We don't want to enter

  90.      // the monitor while suspended because that would surprise the

  91.      // thread that suspended us.

  92.      //

  93.          _recursions = 0 ;

  94.      _succ = NULL ;

  95.      exit (false, Self) ;

  96.  

  97.      jt->java_suspend_self();

  98.    }

  99.    Self->set_current_pending_monitor(NULL);

  100.  }

  101. ...//此处省略无数行代码

如果获取锁失败,则需要通过自旋的方式等待锁释放,自旋执行的方法是 ObjectMonitor::EnterI,部分代码如下

  • 将当前线程封装成ObjectWaiter对象node,状态设置成TS_CXQ

  • 通过自旋操作将node节点push到_cxq队列

  • node节点添加到_cxq队列之后,继续通过自旋尝试获取锁,如果在指定的阈值范围内没有获得锁,则通过park将当前线程挂起,等待被唤醒

  1. void ATTR ObjectMonitor::EnterI (TRAPS) {

  2.    Thread * Self = THREAD ;

  3.    ...//省略很多代码

  4.    ObjectWaiter node(Self) ;

  5.    Self->_ParkEvent->reset() ;

  6.    node._prev   = (ObjectWaiter *) 0xBAD ;

  7.    node.TState  = ObjectWaiter::TS_CXQ ;

  8.  

  9.    // Push "Self" onto the front of the _cxq.

  10.    // Once on cxq/EntryList, Self stays on-queue until it acquires the lock.

  11.    // Note that spinning tends to reduce the rate at which threads

  12.    // enqueue and dequeue on EntryList|cxq.

  13.    ObjectWaiter * nxt ;

  14.    for (;;) { //自旋,讲node添加到_cxq队列

  15.        node._next = nxt = _cxq ;

  16.        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

  17.  

  18.        // Interference - the CAS failed because _cxq changed.  Just retry.

  19.        // As an optional optimization we retry the lock.

  20.        if (TryLock (Self) > 0) {

  21.            assert (_succ != Self         , "invariant") ;

  22.            assert (_owner == Self        , "invariant") ;

  23.            assert (_Responsible != Self  , "invariant") ;

  24.            return ;

  25.        }

  26.    }

  27.    ...//省略很多代码

  28.    //node节点添加到_cxq队列之后,继续通过自旋尝试获取锁,如果在指定的阈值范围内没有获得锁,则通过park将当前线程挂起,等待被唤醒

  29.    for (;;) {

  30.        if (TryLock (Self) > 0) break ;

  31.        assert (_owner != Self, "invariant") ;

  32.  

  33.        if ((SyncFlags & 2) && _Responsible == NULL) {

  34.           Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;

  35.        }

  36.  

  37.        // park self //通过park挂起当前线程

  38.        if (_Responsible == Self || (SyncFlags & 1)) {

  39.            TEVENT (Inflated enter - park TIMED) ;

  40.            Self->_ParkEvent->park ((jlong) RecheckInterval) ;

  41.            // Increase the RecheckInterval, but clamp the value.

  42.            RecheckInterval *= 8 ;

  43.            if (RecheckInterval > 1000) RecheckInterval = 1000 ;

  44.        } else {

  45.            TEVENT (Inflated enter - park UNTIMED) ;

  46.            Self->_ParkEvent->park() ;//当前线程挂起

  47.        }

  48.  

  49.        if (TryLock(Self) > 0) break ; //当线程被唤醒时,会从这里继续执行

  50.  

  51.  

  52.        TEVENT (Inflated enter - Futile wakeup) ;

  53.        if (ObjectMonitor::_sync_FutileWakeups != NULL) {

  54.           ObjectMonitor::_sync_FutileWakeups->inc() ;

  55.        }

  56.        ++ nWakeups ;

  57.  

  58.        if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;

  59.  

  60.        if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {

  61.           Self->_ParkEvent->reset() ;

  62.           OrderAccess::fence() ;

  63.        }

  64.        if (_succ == Self) _succ = NULL ;

  65.  

  66.        

  67.        OrderAccess::fence() ;

  68.    }

  69.    ...//省略很多代码

  70. }

TryLock(self)的代码是在 ObjectMonitor::TryLock定义的,代码的实现如下

代码的实现原理很简单,通过自旋,CAS设置monitor的_owner字段为当前线程,如果成功,表示获取到了锁,如果失败,则继续被挂起

  1. int ObjectMonitor::TryLock (Thread * Self) {

  2.   for (;;) {

  3.      void * own = _owner ;

  4.      if (own != NULL) return 0 ;

  5.      if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {

  6.         // Either guarantee _recursions == 0 or set _recursions = 0.

  7.         assert (_recursions == 0, "invariant") ;

  8.         assert (_owner == Self, "invariant") ;

  9.         // CONSIDER: set or assert that OwnerIsThread == 1

  10.         return 1 ;

  11.      }

  12.      // The lock had been free momentarily, but we lost the race to the lock.

  13.      // Interference -- the CAS failed.

  14.      // We can either return -1 or retry.

  15.      // Retry doesn't make as much sense because the lock was just acquired.

  16.      if (true) return -1 ;

  17.   }

  18. }

重量级锁的释放

重量级锁的释放是通过 ObjectMonitor::exit来实现的,释放以后会通知被阻塞的线程去竞争锁

  • 判断当前锁对象中的owner没有指向当前线程,如果owner指向的BasicLock在当前线程栈上,那么将_owner指向当前线程

  • 如果当前锁对象中的_owner指向当前线程,则判断当前线程重入锁的次数,如果不为0,继续执行ObjectMonitor::exit(),直到重入锁次数为0为止

  • 释放当前锁,并根据QMode的模式判断,是否将_cxq中挂起的线程唤醒。还是其他操作

  1. void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {

  2.   Thread * Self = THREAD ;

  3.   if (THREAD != _owner) {//如果当前锁对象中的_owner没有指向当前线程

  4.     //如果_owner指向的BasicLock在当前线程栈上,那么将_owner指向当前线程

  5.     if (THREAD->is_lock_owned((address) _owner)) {

  6.       // Transmute _owner from a BasicLock pointer to a Thread address.

  7.       // We don't need to hold _mutex for this transition.

  8.       // Non-null to Non-null is safe as long as all readers can

  9.       // tolerate either flavor.

  10.       assert (_recursions == 0, "invariant") ;

  11.       _owner = THREAD ;

  12.       _recursions = 0 ;

  13.       OwnerIsThread = 1 ;

  14.     } else {

  15.       // NOTE: we need to handle unbalanced monitor enter/exit

  16.       // in native code by throwing an exception.

  17.       // TODO: Throw an IllegalMonitorStateException ?

  18.       TEVENT (Exit - Throw IMSX) ;

  19.       assert(false, "Non-balanced monitor enter/exit!");

  20.       if (false) {

  21.          THROW(vmSymbols::java_lang_IllegalMonitorStateException());

  22.       }

  23.       return;

  24.     }

  25.   }

  26.   //如果当前,线程重入锁的次数,不为0,那么就重新走ObjectMonitor::exit,直到重入锁次数为0为止

  27.   if (_recursions != 0) {

  28.     _recursions--;        // this is simple recursive enter

  29.     TEVENT (Inflated exit - recursive) ;

  30.     return ;

  31.   }

  32.  ...//此处省略很多代码

  33.  for (;;) {

  34.    if (Knob_ExitPolicy == 0) {

  35.      OrderAccess::release_store(&_owner, (void*)NULL);   //释放锁

  36.      OrderAccess::storeload();                        // See if we need to wake a successor

  37.      if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {

  38.        TEVENT(Inflated exit - simple egress);

  39.        return;

  40.      }

  41.      TEVENT(Inflated exit - complex egress);

  42.      //省略部分代码...

  43.    }

  44.    //省略部分代码...

  45.    ObjectWaiter * w = NULL;

  46.    int QMode = Knob_QMode;

  47.    //根据QMode的模式判断,

  48.    //如果QMode == 2则直接从_cxq挂起的线程中唤醒    

  49.    if (QMode == 2 && _cxq != NULL) {

  50.      w = _cxq;

  51.      ExitEpilog(Self, w);

  52.      return;

  53.    }

  54.     //省略部分代码... 省略的代码为根据QMode的不同,不同的唤醒机制

  55.  }

  56. }

根据不同的策略(由QMode指定),从cxq或EntryList中获取头节点,通过ObjectMonitor::ExitEpilog方法唤醒该节点封装的线程,唤醒操作最终由unpark完成

  1. void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {

  2.   assert (_owner == Self, "invariant") ;

  3.  

  4.   // Exit protocol:

  5.   // 1. ST _succ = wakee

  6.   // 2. membar #loadstore|#storestore;

  7.   // 2. ST _owner = NULL

  8.   // 3. unpark(wakee)

  9.  

  10.   _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;

  11.   ParkEvent * Trigger = Wakee->_event ;

  12.  

  13.   // Hygiene -- once we've set _owner = NULL we can't safely dereference Wakee again.

  14.   // The thread associated with Wakee may have grabbed the lock and "Wakee" may be

  15.   // out-of-scope (non-extant).

  16.   Wakee  = NULL ;

  17.  

  18.   // Drop the lock

  19.   OrderAccess::release_store_ptr (&_owner, NULL) ;

  20.   OrderAccess::fence() ;                               // ST _owner vs LD in unpark()

  21.  

  22.   if (SafepointSynchronize::do_call_back()) {

  23.      TEVENT (unpark before SAFEPOINT) ;

  24.   }

  25.  

  26.   DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);

  27.   Trigger->unpark() ; //unpark唤醒线程

  28.  

  29.   // Maintain stats and report events to JVMTI

  30.   if (ObjectMonitor::_sync_Parks != NULL) {

  31.      ObjectMonitor::_sync_Parks->inc() ;

  32.   }

  33. }

分析源码,需要很大的耐心,希望大家能有耐心看下去。

(六) synchronized的源码分析