public class LockTest {

    private int sum1;
    private AtomicInteger sum2 = new AtomicInteger(0);
    private int sum3;
    private ReentrantLock lock;

     * 悲观锁,使用对象的 Monitor 锁
    public synchronized void increase1() {

     * 悲观锁,使用 ReentrantLock 提供的锁
    public void increase3() {

     * 乐观锁,下层通过 CAS 实现
    public void increase2() {

2.1 乐观锁实现

这里我们简单地说一下乐观锁使用到的CAS,关于悲观锁的内容我们后面再说,CAS全称Compare And Swap(比较与交换),是一种无锁算法。在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。java.util.concurrent包中的原子类就是通过CAS来实现了乐观锁。


  • 需要读写的内存值V
  • 进行比较的值A
  • 要写入的新值B


public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
        } catch (Exception ex) { throw new Error(ex); }

    private volatile int value;


    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;



public final int getAndAddInt(Object o, long offset, int delta) {
    int previousValue;
    do {
        previousValue = this.getIntVolatile(o, offset);
    } while(!this.compareAndSwapInt(o, offset, previousValue, previousValue + delta));

    return var5;



// unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, 
        jobject obj, jlong offset, jint e, jint x))
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;

代码中能看到cmpxchg有基于各个平台的实现,这里我选择Linux X86平台下的源码分析:

// atomic_linux_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;

// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "




asm ( assembler template
    : output operands                  /* optional */
    : input operands                   /* optional */
    : list of clobbered registers      /* optional */


  • template就是cmpxchgl %1, (%3)表示汇编模板
  • output operands表示输出操作数, =a对应eax寄存器
  • input operand表示输入参数,%1就是exchange_value, %3dest, %4就是mpr表示任意寄存器,a还是eax寄存器
  • list of clobbered registers就是些额外参数,cc表示编译器cmpxchgl的执行将影响到标志寄存器,memory告诉编译器要重新从内存中读取变量的最新值

Linux X86平台下,最终JDK通过CPUcmpxchgl指令的支持,实现AtomicIntegerCAS操作的原子性。虽然我们说CAS是无锁化的设计,但是在机器指令这一层面来看实际上也会使用到内存锁定,才能达到原子化的目标。

2.2 乐观锁的问题


  1. ABA问题。CAS需要在操作值的时候检查内存值是否发生变化,没有发生变化才会更新内存值。但是如果内存值原来是A,后来变成了B,然后又变成了A,那么CAS进行检查时会发现值没有发生变化,但是实际上是有变化的。ABA问题的解决思路就是在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从“A-B-A”变成了“1A-2B-3A”。
    • JDK1.5开始提供了AtomicStampedReference类来解决ABA问题,具体操作封装在compareAndSet()中。compareAndSet()首先检查当前引用和当前标志与预期引用和预期标志是否相等,如果都相等,则以原子方式将引用值和标志的值设置为给定的更新值。
  2. 循环时间长开销大。CAS操作如果长时间不成功,会导致其一直自旋,给CPU带来非常大的开销。
  3. 只能保证一个共享变量的原子操作。对一个共享变量执行操作时,CAS能够保证原子操作,但是对多个共享变量操作时,CAS是无法保证操作的原子性的。
    • JavaJDK 1.5开始提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。CAS操作比较的是对象的地址。
// java.util.concurrent.atomic.AtomicStampedReference#compareAndSet

* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.


* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful

public boolean compareAndSet(V expectedReference,
                            V newReference,
                            int expectedStamp,
                            int newStamp) {
   Pair<V> current = pair;
       expectedReference == current.reference &&
       expectedStamp == current.stamp &&
       ((newReference == current.reference &&
         newStamp == current.stamp) ||
        casPair(current, Pair.of(newReference, newStamp)));

private boolean casPair(Pair<V> cmp, Pair<V> val) {
   return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);


// Unsafe.h
virtual jboolean compareAndSwapObject(::java::lang::Object *, jlong, ::java::lang::Object *,
         ::java::lang::Object *);

// natUnsafe.cc
static inline bool
compareAndSwap (volatile jobject *addr, jobject old, jobject new_val)
    jboolean result = false;
    spinlock lock;

    // 如果字段的地址与期望的地址相等则将字段的地址更新
    if ((result = (*addr == old)))
        *addr = new_val;
    return result;

// natUnsafe.cc
sun::misc::Unsafe::compareAndSwapObject (jobject obj, jlong offset,
                     jobject expect, jobject update)
    // 获取字段地址并转换为字符串
    jobject *addr = (jobject*)((char *) obj + offset);
    // 调用 compareAndSwap 方法进行比较
    return compareAndSwap (addr, expect, update);




而为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。这很类似于前面AtomicInteger的实现方式,AtomicInteger CAS的期待值是previousValue,目标值可能是任意一个数,而一般在自旋锁中,用0表示未锁定状态,用1表示锁定状态,那么加锁过程CAS的期待值就是0,目标值就是1

public class SpinLock {

   private AtomicBoolean locked = new AtomicBoolean(false);

   public void lock() {
      while (!locked.compareAndSet(false, true));

   public void unlock() {


既然自旋锁的等待过程这么浪费CPU,那么我们有没有什么办法能够改善它呢?JDK 6中引入了自适应的自旋锁(适应性自旋锁)。自适应意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。





 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && // 先检查任务队列中是否有排在自己前面的线程等待
            compareAndSetState(0, acquires)) { // 没有的话,自己再试图获取锁
            return true;
    } else if (current == getExclusiveOwnerThread()) {
        // 如果已经是自己持有锁,则计数器加1
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        return true;
    return false;

 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { // 非公平锁直接试图加锁,不检查等待队列
            return true;
    } else if (current == getExclusiveOwnerThread()) {
        // 如果已经是自己持有锁,则计数器加1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        return true;
    return false;


 * Queries whether any threads have been waiting to acquire longer
 * than the current thread.
 * <p>An invocation of this method is equivalent to (but may be
 * more efficient than):
 *  <pre> {@code
 * getFirstQueuedThread() != Thread.currentThread() &&
 * hasQueuedThreads()}</pre>
 * <p>Note that because cancellations due to interrupts and
 * timeouts may occur at any time, a {@code true} return does not
 * guarantee that some other thread will acquire before the current
 * thread.  Likewise, it is possible for another thread to win a
 * race to enqueue after this method has returned {@code false},
 * due to the queue being empty.
 * <p>This method is designed to be used by a fair synchronizer to
 * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
 * Such a synchronizer's {@link #tryAcquire} method should return
 * {@code false}, and its {@link #tryAcquireShared} method should
 * return a negative value, if this method returns {@code true}
 * (unless this is a reentrant acquire).  For example, the {@code
 * tryAcquire} method for a fair, reentrant, exclusive mode
 * synchronizer might look like this:
 *  <pre> {@code
 * protected boolean tryAcquire(int arg) {
 *   if (isHeldExclusively()) {
 *     // A reentrant acquire; increment hold count
 *     return true;
 *   } else if (hasQueuedPredecessors()) {
 *     return false;
 *   } else {
 *     // try to acquire normally
 *   }
 * }}</pre>
 * @return {@code true} if there is a queued thread preceding the
 *         current thread, and {@code false} if the current thread
 *         is at the head of the queue or the queue is empty
 * @since 1.7
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // 队列中第一个节点是一个虚节点,如果 tail == head 说明只存在该虚节点,不存在实际数据
    // 当 tail != head 时判断 head.next 是不是等于 null,等于 null 也说明不存在实际数据,
    // 最后如果 head.next.thread == Thread.currentThread() 的话说明持有锁的是当前线程
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());




public class LockTest{

    public synchronized void doSomeThing() {

    public synchronized void doOtherThings() {
        // doOtherThings



 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            return true;
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires; // 通过一个 int 来保存重入次数
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        return true;
    return false;


protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 直到该值为0时,才真正的释放锁
        free = true;
    return free;


protected boolean tryAcquire(int acquires) {
    if (compareAndSetState(0, 1)) {
        owner = Thread.currentThread();
        return true;
    return false;

protected boolean tryRelease(int releases) {
    if (Thread.currentThread() != owner) {
        throw new IllegalMonitorStateException();
    owner = null;
    return true;






public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;

public interface ReadWriteLock {
     * Returns the lock used for reading.
     * @return the lock used for reading
    Lock readLock();

     * Returns the lock used for writing.
     * @return the lock used for writing
    Lock writeLock();


public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;





  1. 首先获取当前的状态state

  2. 取状态值的后16位作为writeState

  3. 如果当前状态值state不为0,则说明要么写锁被占用要么读锁被占用

    ①. 如果writeState0,说明有读锁占用,获取锁失败,也就是说如果一个线程先获取读锁,再获取写锁的话,会发生死锁

    ②. 如果writeState不为0,需要看一下当前线程和持有锁的线程是否是同一个线程,是的话就加锁成功(重入锁),否则加锁失败

  4. 走到第四步说明state为空,这时候,检查writerShouldBlock它实际上是为了实现公平锁而出现的,

    ①. 如果是公平锁writerShouldBlock的内容就是确认队列中有没有前驱节点

    ②. 非公平锁则直接是false

  5. 如果第四步返回了false,就尝试进行CAS加锁,如果CAS失败,则加锁失败,否则加锁成功,将当前持有锁的线程置为当前线程。

protected final boolean tryAcquire(int acquires) {
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // w == 0 说明有线程持有读锁,w != 0 需要检查当前持有排它锁的线程是不是当前线程,不是的话就加锁失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 当前线程重入,计数器增加
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    return true;

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

static final int SHARED_SHIFT   = 16;

static final class NonfairSync extends Sync {
    final boolean writerShouldBlock() {
        return false; // writers can always barge

static final class FairSync extends Sync {
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();




  1. 如果writeState不为0,并且持有该写锁的线程不是当前线程,则直接返回失败,否则尝试获得锁

    ①. 如果先获得了写锁,再尝试获取读锁则不会发生死锁,这和前面的先获得读锁,再获得写锁不同,之所以这么设计可以参考文档:

Additionally, a writer can acquire the read lock, but not vice-versa. Among other applications, reentrancy can be useful when write locks are held during calls or callbacks to methods that perform reads under read locks. If a reader tries to acquire the write lock it will never succeed.

  1. 接下来判断当前线程是否需要阻塞:

    ①. 对于公平锁来说,只要队列中有前序节点则阻塞,因为只有读锁时,是不需要入队的,队列不为空,说明队列中存在或者存在过一个排它锁,这里直接入队等待

    ②. 对于非公平锁来说,只要当前等待队列的第一个Waiter是排他锁,就阻塞,也就是下面的apparentlyFirstQueuedIsExclusive函数

  2. 如果不需要阻塞,则判断重入次数是否溢出,没溢出的话就通过CAS修改state的共享锁部分,即前16位,每次增加1<<16

  3. 如果上述过程成功获得了锁,这里还有一道工序,就是保存当前线程的读锁持有数,本质上说通过一个ThreadLocal来保存这个读锁持有数即可,但是出于性能的考虑(ThreadLocal性能不够好),这里另外增加了firstReaderfirstReaderHoldCount来保存第一个获得读锁的线程重入数,此外还使用cachedHoldCounter来保存上一次调用时的线程的重入数,来达到加速的目的

  4. 当上述过程仍然没有获得锁时,进入fullTryAcquireShared逻辑

protected final int tryAcquireShared(int unused) {
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
    Thread current = Thread.currentThread();
    int c = getState();
    // 如果排它锁不为0,并且持有排它锁的线程不是当前线程,就返回
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    // 根据是不是公平锁,做出阻塞决定,读锁数是否溢出,检查通过的话通过 cas 加锁
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
        return 1;
    return fullTryAcquireShared(current);

 * Returns {@code true} if the apparent first queued thread, if one
 * exists, is waiting in exclusive mode.  If this method returns
 * {@code true}, and the current thread is attempting to acquire in
 * shared mode (that is, this method is invoked from {@link
 * #tryAcquireShared}) then it is guaranteed that the current thread
 * is not the first queued thread.  Used only as a heuristic in
 * ReentrantReadWriteLock.
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;

static final int SHARED_SHIFT   = 16;

static final int SHARED_UNIT    = (1 << SHARED_SHIFT);

static final class NonfairSync extends Sync {
    final boolean readerShouldBlock() {
        /* As a heuristic to avoid indefinite writer starvation,
         * block if the thread that momentarily appears to be head
         * of queue, if one exists, is a waiting writer.  This is
         * only a probabilistic effect since a new reader will not
         * block if there is a waiting writer behind other enabled
         * readers that have not yet drained from the queue.
        return apparentlyFirstQueuedIsExclusive();

  * Fair version of Sync
static final class FairSync extends Sync {
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();


  1. 整个fullTryAcquireShared就是一个循环CAS的过程

  2. 如果发现写锁被占用,并且持有写锁的的进程不是当前线程,则获取锁失败

  3. 然后检查是否需要阻塞(这里有一个额外的判断:如果当前线程是读锁重入,也不会进行阻塞):

    ①. 对于公平锁来说,只要队列中有前序节点则阻塞(说明之前有人成功获取过写锁,获取正在获取写锁,才会出现队列)

    ②. 对于非公平锁来说,只要当前等待队列的第一个Waiter是排他锁,就阻塞,也就是上述的apparentlyFirstQueuedIsExclusive函数,这里大家可能会有一个疑问:为什么只判断队列头不是写锁即可,如果等待队列是Read->Read->Write不就有问题了吗(读锁插队写锁),确实会存在这个问题,apparentlyFirstQueuedIsExclusive只能在一定程度上缓解读锁插队写锁的情况,因为并不是公平锁,所以没有必要完全保证读锁不能插队写锁

  4. 处理外阻塞逻辑后,判断重入数是否溢出,溢出则抛出异常

  5. 最后通过CAS,进行state读锁段+ 1,修改成功的话,就维护当前线程的读锁持有数

 * Full version of acquire for reads, that handles CAS misses
 * and reentrant reads not dealt with in tryAcquireShared.
final int fullTryAcquireShared(Thread current) {
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        // 如果排它锁不为0,并且持有排它锁的线程不是当前线程,就返回
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // 如果当前线程获取过读锁,现在这次是读锁重入的话,就不进行阻塞,否则才会阻塞
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                if (rh.count == 0)
                    return -1;
        // 读锁数是否溢出
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 通过 cas 加锁, 然后维护读锁持有数
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                cachedHoldCounter = rh; // cache for release
            return 1;


