ReentrantLock源碼分析

前言

ReentrantLock這個類,相信大家多多少少在項目里都會去用到這個類,但我相信大部分人都沒去研究過源碼。我在這里把我學習這個類的一些經驗和心得分享出來,希望對大家有所幫助,水平有限,文章中有錯誤的地方也請不吝指正,共同進步。

學習基礎

這個類依舊是根據(jù)aqs框架去實現(xiàn)的,如果不知道什么是aqs的,可以去看看我寫的aqs的學習筆記aqs獨占鎖aqs共享鎖的源碼,aqs是基礎,關于aqs的部分,本文將不再贅述。

引導

關于ReentrantLock的源碼解析,百度上沒有一萬個人寫,也至少有八千個人寫,大家隨便百度一篇原創(chuàng),我相信大家都會有很大收獲。所以這篇文章,我想從產品設計的角度去還原Doug Lea是如何開發(fā)出來ReentrantLock這個類的。還原僅靠猜測,不對的地方還原指正。

功能規(guī)劃

我們假設自己就是Doug Lea,當前jdk是1.4版本。我們需要在源碼里提供一個非常方便的加鎖的工具類,名字已經想好了,就叫ReentrantLock,目的是開發(fā)者在使用的時候可以很方便的進行加鎖,解鎖。而此時java中只有一個synchronized關鍵字。既然我們要開發(fā)一個新的工具類,我們肯定是希望比synchronized更好用,覆蓋的場景更廣泛。所以我們首先分析synchronized的缺點(1.4版本的缺點):

  1. synchronized加鎖的代價太大。
  2. synchronized不支持操作部分線程,釋放鎖的時候等同于喚醒全部。
  3. synchronized毫無公平性可言。
  4. 鎖的粒度太粗,一個實例的多個synchronized方法不能同時被多個線程訪問。
  5. synchronized在阻塞的過程中不可被中斷。
  6. synchronized超時時會一直鎖住,甚至會造成死鎖。

設計

第一點:
早期synchronized獲取不到鎖就會被阻塞,而阻塞線程需要cpu從用戶態(tài)轉換到內核態(tài),這是一個很消耗資源的操作,所以我們需要開發(fā)一個api級別的框架,這個框架就是aqs,aqs利用cas自旋和阻塞,多次cas獲取不到資源的時候再去阻塞,被喚醒的線程再使用cas去獲取資源,不是單純的cas自旋或者阻塞,這樣就同時兼顧了性能和效率。這樣,我們開發(fā)了一個aqs框架作為ReentrantLock的基礎,來解決synchronized加鎖代價大的問題。同時也提供了一個可擴展的aqs框架。
(synchronized以前確實是很笨重, 但是在各個版本優(yōu)化之后,現(xiàn)在的性能和ReentrantLock是相差無幾的,官方建議是如果synchronized能實現(xiàn)你的需求,那么盡量使用synchronized,具體的優(yōu)化大家可以百度一下。)

第二點:
我們想自定義規(guī)則讓部分線程去爭搶資源,讓部分線程不去爭搶資源。我們需要使用到aqs的node節(jié)點鏈表。但是我們知道,node隊列是輪詢喚醒全部,沒有對線程做區(qū)分,我們只想要操作部分線程。于是我們想到一種方法:
我們根據(jù)我們的定義,把線程歸類,然后把歸好類的線程放到一個個單獨的隊列里,我們需要操作哪部分線程, 就把這部分線程所在的隊列的所有線程,放到aqs隊列里,剩下的事情我們交給aqs,這樣我們就做到了操作部分線程,而不影響其他線程。

第三點
我們需要設計一個公平鎖,但是非公平鎖也肯定要有,因為非公平鎖性能更好,而且大部分人可能并不關心公平性。所以我們需要在代碼里對公平和非公平分開實現(xiàn)。

第四點
synchronized鎖粒度太粗,這跟synchronized的實現(xiàn)機制有關,因為synchronized是jvm去實現(xiàn)的,是對對象和類的操控。所以synchronized方法獲取的是對象鎖或者類鎖,當synchronized鎖住實例或者類對象的時候,其他線程再去獲取會被阻塞。而我們設計了ReentrantLock,這樣我們需要幾把鎖,在我們的類里定義幾個成員變量就可以了,各個ReentrantLock對象之間互不影響,這樣這個問題也就不解自解了。

第五點
synchronized在阻塞的過程中是忽略中斷的,只有在獲取到鎖的時候才會去響應中斷。為了解決這個問題我們需要在ReentrantLock設計一個可以檢測中斷的lock方法, 我們暫且命名為lockInterruptibly()。aqs中分為cas和阻塞喚醒兩個過程,我們可以在自旋cas中加入檢測中斷的邏輯,這樣我們在獲取鎖的部分過程中就做到了檢測中斷。

第六點
synchronized代碼塊中,如果碰到一段代碼偶爾需要執(zhí)行1分鐘或者兩分鐘,那其他的線程獲取鎖等待的時間就太長了,那顯然不是我們能接受的,所以我們在ReentrantLock設計一個根據(jù)我們設定時間去獲取鎖的操作,超過我們期望的時間就不再去獲取鎖, 直接返回結果。

設計工作已經做完了,我們接下來就是把我們的需求翻譯成代碼。

實現(xiàn)公平非公平(copy)

公平鎖和非公平鎖,區(qū)別只有在tryAcquire()方法上,所有會有很多可以共用的代碼。為了代碼美觀,我們先寫一個公平鎖和非公平鎖都可以繼承的基類,這個類就是Sync,Sync的功能依賴aqs,所以我們要繼承aqs。

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
           * Performs {@link Lock#lock}. The main reason for subclassing
           * is to allow fast path for nonfair version.
           */
    abstract void lock();

    /**
           * Performs non-fair tryLock.  tryAcquire is implemented in
           * subclasses, but both need nonfair try for trylock method.
           */
    // 非公平鎖獲取資源
    final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
        if (compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current);
          return true;
        }
      }
      else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false;
    }
    // 釋放資源
    protected final boolean tryRelease(int releases) {
      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
      }
      setState(c);
      return free;
    }
    // 判斷當前占用線程是否是當前線程
    protected final boolean isHeldExclusively() {
      // While we must in general read state before owner,
      // we don't need to do so to check if current thread is owner
      return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
      return new ConditionObject();
    }

    // Methods relayed from outer class
    // 獲取當前占用線程
    final Thread getOwner() {
      return getState() == 0 ? null : getExclusiveOwnerThread();
    }
    // 判斷當前線程占用了多少資源
    final int getHoldCount() {
      return isHeldExclusively() ? getState() : 0;
    }
    //  判斷是否已加鎖
    final boolean isLocked() {
      return getState() != 0;
    }

    /**
           * Reconstitutes the instance from a stream (that is, deserializes it).
           */
    private void readObject(java.io.ObjectInputStream s)
      throws java.io.IOException, ClassNotFoundException {
      s.defaultReadObject();
      setState(0); // reset to unlocked state
    }
}

然后我們定義一個公平鎖類和一個非公平鎖類,繼承這個Sync

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
    final void lock() {
      if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
      else
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
    }
}
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
      acquire(1);
    }

    /**
           * Fair version of tryAcquire.  Don't grant access unless
           * recursive call or no waiters or is first.
           */
    protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current);
          return true;
        }
      }
      else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false;
    }
}

這樣,公平鎖和非公平鎖就實現(xiàn)了,他們倆唯一的區(qū)別就是公平鎖在獲取資源的時候,會調用hasQueuedPredecessors()方法判斷一下是否存在等待隊列,如果有就去乖乖排隊,這樣就保證了公平性,先來后到。

public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

我們用構造方法區(qū)控制使用者想要什么類型的鎖,至此,公平和非公平我們已經實現(xiàn)。

實現(xiàn)操作部分線程

我們已經有了思路,定義一個個隊列來分別存放線程,那么我們可以把這一個個線程定義為對象的一個屬性,所以我們定義一個接口,來操作這些線程。這個類就是Condition

public interface Condition {

    void await() throws InterruptedException;
    
    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}

同樣,定義一個類來保存這個隊列。

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}
public Condition newCondition() {
    return sync.newCondition();
}
final ConditionObject newCondition() {
    return new ConditionObject();
}

我們只需要調用newCondition()方法就可以獲得一個保存一類線程的對象。ConditionObject對象中還有很多方法,我想單獨寫一篇文章來說明。
至此,操作部分線程的功能我們也實現(xiàn)了,這其實就是ReentrantLock的Condition場景的應用。

實現(xiàn)可中斷

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    // 獲取鎖之前校驗一下線程的中斷狀態(tài)
    if (Thread.interrupted())
      throw new InterruptedException();
    if (!tryAcquire(arg))
      doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
      for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
          setHead(node);
          p.next = null; // help GC
          failed = false;
          return;
        }
        // 自旋里這個if判斷至少會走兩邊,每走一遍都去判斷一下線程的中斷狀態(tài)
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
          throw new InterruptedException();
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
}

中斷操作已經在代碼里注釋說明。

獲取鎖設定超時時間

public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  throws InterruptedException {
    if (Thread.interrupted())
      throw new InterruptedException();
    return tryAcquire(arg) ||
      doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
  throws InterruptedException {
    if (nanosTimeout <= 0L)
      return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
      for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
          setHead(node);
          p.next = null; // help GC
          failed = false;
          return true;
        }
        nanosTimeout = deadline - System.nanoTime();
        if (nanosTimeout <= 0L)
          return false;
        if (shouldParkAfterFailedAcquire(p, node) &&
            nanosTimeout > spinForTimeoutThreshold)
          LockSupport.parkNanos(this, nanosTimeout);
        if (Thread.interrupted())
          throw new InterruptedException();
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
}

代碼很簡單,就是獲取鎖和每次自旋cas過程前,判斷一下是否超時,如果超時,直接返回false

總結

至此,ReentrantLock的大部分功能我們已經實現(xiàn)了,當你去翻閱源碼的時候。你會發(fā)現(xiàn),ReentrantLock里也就這么點東西,剩下的方法無非是開發(fā)者在開發(fā)時封裝了一些公用的方法。大家對比一下我們平時開發(fā)的過程,流程是不是非常的相似:

  • 確定需求
  • 確定需求實現(xiàn)的方式
  • 書寫代碼
  • 對代碼重構,封裝公共方法

由此可見,一個類,我們只要知道了設計這個類的初衷是為了解決什么問題,然后我們再去看源碼里作者是如何解決這個問題的,用什么方式去解決這個問題。
再深入一點我們可以研究一下,用這種方式去解決這個問題,但是代碼為什么要這么寫,這么寫有什么好處?如果是我們來實現(xiàn)這個功能,我們的代碼跟作者的代碼孰優(yōu)孰劣?一番比較下來,我們可以學習源碼里對方法的優(yōu)雅封裝,對變量的合理使用以及一些對異常情況的處理方式,源碼里真的是行行都是精髓。

寫后感

自從開始寫博客到現(xiàn)在,我發(fā)現(xiàn)我對于一件事情的梳理更快速和規(guī)范了,我覺得這有利于我思考,這得益于我每寫一篇博客做的總結,這也是一種我認為的提升自己的方式。
比如這篇文章所講的ReentrantLock和synchronized,在寫這篇文章之前,如果你問我他們兩個有什么區(qū)別,我的回答就是不知道,因為我不能條理清晰的去告訴你這兩個的區(qū)別和優(yōu)劣。也就是說原理我懂一點,但是我不知道如何組織語言。現(xiàn)在如果你們再問我,那么我可以很自信的跟你說個一二三來。這篇文章中有我以前就懂的知識,也有我在寫的時候一知半解而去學習的知識。我把這些知識用我的語言梳理脈絡去講給你們聽,我自身也在一個高度學習的狀態(tài)。贈人玫瑰,手有余香。如果大家到了一個學習的瓶頸期,不妨也試試寫博客這種方式。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容