(六)手撕并發(fā)編程之基于Semaphore與CountDownLatch分析AQS共享模式實(shí)現(xiàn)

引言

在上篇文章深入剖析并發(fā)之AQS獨(dú)占鎖&重入鎖(ReetrantLock)及Condition實(shí)現(xiàn)原理中我們?cè)赗eetrantLock鎖分析了AQS獨(dú)占模式的實(shí)現(xiàn)原理,本章則準(zhǔn)備從Semaphore信號(hào)量的角度出發(fā)一探AQS共享模式的具體實(shí)現(xiàn)。共享模式與獨(dú)占模式區(qū)別在于:共享模式下允許多條線程同時(shí)獲取鎖資源,而在之前分析的獨(dú)占模式中,在同一時(shí)刻只允許一條線程持有鎖資源。

一、快速認(rèn)識(shí)Semaphore信號(hào)量及實(shí)戰(zhàn)

Semaphore信號(hào)量是java.util.concurrent(JUC)包下的一個(gè)并發(fā)工具類,可以用來控制同一時(shí)刻訪問臨界資源(共享資源)的線程數(shù),以確保訪問臨界資源的線程能夠正確、合理的使用公共資源。而其內(nèi)部則于ReetrantLock一樣,都是通過直接或間接的調(diào)用AQS框架的方法實(shí)現(xiàn)。在Semaphore中存在一個(gè)“許可”的概念:

在初始化Semaphore信號(hào)量需要為這個(gè)許可傳入一個(gè)數(shù)值,該數(shù)值表示表示同一時(shí)刻可以訪問臨界資源的最大線程數(shù),也被稱為許可集。一條線程想要訪問臨界資源則需要先執(zhí)行acquire()獲取一個(gè)許可,如果線程在獲取時(shí)許可集已經(jīng)被分配完了,那么該線程則會(huì)進(jìn)入阻塞等待狀態(tài),直至有其他持有許可的線程釋放后才有可能獲取到許可。當(dāng)線程訪問完成臨界資源后則需要執(zhí)行release()方法釋放已獲取的許可。

其實(shí)通過如上這段描述,我們不難發(fā)現(xiàn),Semaphore信號(hào)量里面的“許可”概念與前面我們文章中,分析的互斥鎖中的“同步狀態(tài)標(biāo)識(shí)”有著異曲同工之妙,其實(shí)也就是我們所談的“鎖資源”。下面我們可以簡(jiǎn)單看看Semaphore類中所提供的方法:

// 調(diào)用該方法后線程會(huì)從許可集中嘗試獲取一個(gè)許可
public void acquire()

// 線程調(diào)用該方法時(shí)會(huì)釋放已獲取的許可
public void release()

// Semaphore構(gòu)造方法:permits→許可集數(shù)量
Semaphore(int permits) 

// Semaphore構(gòu)造方法:permits→許可集數(shù)量,fair→公平與非公平
Semaphore(int permits, boolean fair) 

// 從信號(hào)量中獲取許可,該方法不響應(yīng)中斷
void acquireUninterruptibly() 

// 返回當(dāng)前信號(hào)量中未被獲取的許可數(shù)
int availablePermits() 

// 獲取并返回當(dāng)前信號(hào)量中立即未被獲取的所有許可
int drainPermits() 

// 返回等待獲取許可的所有線程Collection集合
protected  Collection<Thread> getQueuedThreads();

// 返回等待獲取許可的線程估計(jì)數(shù)量
int getQueueLength() 

// 查詢是否有線程正在等待獲取當(dāng)前信號(hào)量中的許可
boolean hasQueuedThreads() 

// 返回當(dāng)前信號(hào)量的公平類型,如為公平鎖返回true,非公平鎖為false
boolean isFair() 

// 獲取當(dāng)前信號(hào)量中一個(gè)許可,當(dāng)沒有許可可用時(shí)直接返回false不阻塞線程
boolean tryAcquire() 

// 在給定時(shí)間內(nèi)獲取當(dāng)前信號(hào)量中一個(gè)許可,超時(shí)還未獲取成功則返回false
boolean tryAcquire(long timeout, TimeUnit unit) 

如上便是Semaphore信號(hào)量提供的一些主要方法,下面我們可以上個(gè)簡(jiǎn)單小案例演示,需求如下:

現(xiàn)在項(xiàng)目中有個(gè)需求,每晚需要長(zhǎng)時(shí)間處理大量的Excel表數(shù)據(jù)與數(shù)據(jù)庫中數(shù)據(jù)對(duì)賬請(qǐng)求,由于文件讀取屬于IO密集型任務(wù),我們可以使用多線程的方式優(yōu)化,加速處理速度。但是在該項(xiàng)目中因?yàn)檫€有其他業(yè)務(wù)要處理,為了保證整體性能,所以對(duì)于該業(yè)務(wù)的實(shí)現(xiàn)最多只能使用三個(gè)數(shù)據(jù)庫連接對(duì)象。因?yàn)槿绻?dāng)前業(yè)務(wù)線程同一時(shí)刻獲取的數(shù)據(jù)庫連接數(shù)量過多,會(huì)導(dǎo)致其他業(yè)務(wù)線程需要操作數(shù)據(jù)庫時(shí)獲取不到連接對(duì)象阻塞(因?yàn)閿?shù)據(jù)庫連接對(duì)象與線程對(duì)象一樣數(shù)據(jù)珍惜資源/資源有限),從而引發(fā)整體程序堆積大量客戶端請(qǐng)求導(dǎo)致系統(tǒng)整體癱瘓。這時(shí)我們就需要控制同一時(shí)刻最多只有三條線程拿到數(shù)據(jù)庫連接進(jìn)行操作,此時(shí)就可以使用Semaphore做流量控制。

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 自定義線程池(后續(xù)文章會(huì)詳細(xì)分析到)
        // 環(huán)境:四核四線程CPU 任務(wù)阻塞系數(shù)0.9
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                4*2, 40,
                30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1024*10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        // 設(shè)置信號(hào)量同一時(shí)刻最大線程數(shù)為3
        final Semaphore semaphore = new Semaphore(3);
        // 模擬100個(gè)對(duì)賬請(qǐng)求
        for (int index = 0; index < 100; index++) {
            final int serial = index;
            threadPool.execute(()->{
                try {
                    // 使用acquire()獲取許可
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() +
                            "線程成功獲取許可!請(qǐng)求序號(hào): " + serial);
                    // 模擬數(shù)據(jù)庫IO
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }  finally {
                    // 臨界資源訪問結(jié)束后釋放許可
                    semaphore.release();
                }
            });
        }
        // 關(guān)閉線程池資源
        threadPool.shutdown();
    }
}

上述代碼中,在創(chuàng)建Semaphore信號(hào)量對(duì)象時(shí)為該對(duì)象初始化了三個(gè)許可,也就意味著在同一時(shí)刻允許三條線程同時(shí)訪問臨界資源。線程在訪問臨界資源之前,需要使用acquire()先成功獲取一個(gè)許可,才能訪問臨界資源。如果一條線程獲取許可,該信號(hào)量對(duì)象的許可集已經(jīng)被分配完時(shí),新來的線程需進(jìn)入等待狀態(tài)。之前獲取許可成功的線程在操作完成之后需執(zhí)行release()方法釋放已獲取的許可。我們執(zhí)行如上案例則可看到執(zhí)行結(jié)果幾乎每隔一千毫秒會(huì)出現(xiàn)三條線程同時(shí)訪問,如下:

/*
第一秒:程序運(yùn)行初
  pool-1-thread-1線程成功獲取許可!請(qǐng)求序號(hào): 0
  pool-1-thread-2線程成功獲取許可!請(qǐng)求序號(hào): 1
  pool-1-thread-3線程成功獲取許可!請(qǐng)求序號(hào): 2
第二秒:程序運(yùn)行1000ms后
  pool-1-thread-4線程成功獲取許可!請(qǐng)求序號(hào): 3
  pool-1-thread-5線程成功獲取許可!請(qǐng)求序號(hào): 4
  pool-1-thread-6線程成功獲取許可!請(qǐng)求序號(hào): 5
第三秒:程序運(yùn)行2000ms后
  pool-1-thread-7線程成功獲取許可!請(qǐng)求序號(hào): 6
  pool-1-thread-8線程成功獲取許可!請(qǐng)求序號(hào): 7
  pool-1-thread-2線程成功獲取許可!請(qǐng)求序號(hào): 8
第四秒:程序運(yùn)行3000ms后
  ........
*/

如上便是一個(gè)簡(jiǎn)單使用Demo,總體看來關(guān)于Semaphore信號(hào)量的用法還是比較簡(jiǎn)單的。不過我們也在前面提到過這么一句話:

Semaphore信號(hào)量里的“許可”概念與前面我們文章中分析的互斥鎖的“同步狀態(tài)標(biāo)識(shí)”有著異曲同工之妙。

那我們能否使用Semaphore信號(hào)量實(shí)現(xiàn)一把獨(dú)占鎖呢?答案也是肯定的,可以。我們只需要在創(chuàng)建信號(hào)量對(duì)象時(shí),只給許可集分配一個(gè)數(shù)量即可,如下:

final Semaphore semaphore = new Semaphore(1);

二、Semaphore信號(hào)量中AQS的共享模式實(shí)現(xiàn)

Semaphore信號(hào)量其實(shí)與我們上篇文章所分析的ReetrantLock類結(jié)構(gòu)大致相同,其內(nèi)部存在繼承自AbstractQueuedSynchronizer內(nèi)部Sync類以及它的兩個(gè)子類:FairSync公平鎖類和NofairSync非公平鎖類,從這我們也可以看出,Semaphore的內(nèi)部實(shí)現(xiàn)其實(shí)與ReetrantLock一樣都是基于AQS組件實(shí)現(xiàn)的。在上一篇文章中我們也曾提到,AQS設(shè)計(jì)的初衷并不打算直接作為調(diào)用類對(duì)外暴露服務(wù),而只是作為并發(fā)包基礎(chǔ)組件,為其他并發(fā)工具類提供基礎(chǔ)設(shè)施,如維護(hù)同步隊(duì)列、控制/修改同步狀態(tài)等。具體的獲取鎖和釋放鎖的邏輯則交給子類自己去實(shí)現(xiàn),從而也能最大程度的保留框架的靈活性。因此無論是Semaphore還是ReetrantLock都需要獨(dú)自實(shí)現(xiàn)tryAcquireShared(int arg)獲取鎖方法以及tryReleaseShared(int arg)釋放鎖方法。AQS總體類圖關(guān)系如下:

AQS DML類圖結(jié)構(gòu)

如上圖,Semaphore與ReetrantLock的結(jié)構(gòu)大致相同,而實(shí)現(xiàn)思路也大致相同,獲取鎖(許可)的方法tryAcquireShared(arg)分別由兩個(gè)子類FairSync和NofairSync實(shí)現(xiàn),因?yàn)楣芥i和非公平鎖的加鎖方式畢竟存在些許不同,而釋放鎖tryReleaseShared(arg)的邏輯則交由Sync實(shí)現(xiàn),因?yàn)獒尫挪僮鞫际窍嗤?,因此放在父類Sync中實(shí)現(xiàn)自然是最好的方式。下面我們就從Semaphore源碼的角度分析AQS共享模式的具體實(shí)現(xiàn)原理,我們先從非公平鎖的獲取鎖實(shí)現(xiàn)開始。

2.1、AQS共享模式之Semaphore的NofairSync非公平鎖實(shí)現(xiàn)

我們?cè)趧?chuàng)建Semaphore對(duì)象時(shí),也和ReetrantLock一樣手動(dòng)選擇公平鎖和非公平鎖:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

通過Semaphore的構(gòu)造函數(shù)我們不難發(fā)現(xiàn),如果在我們創(chuàng)建時(shí)不選擇聲明公平類型,Semaphore默認(rèn)創(chuàng)建的是非公平鎖類型,NonfairSync構(gòu)造如下:

static final class NonfairSync extends Sync {
    // 構(gòu)造函數(shù):將給定的許可數(shù)permits傳給父類同步狀態(tài)標(biāo)識(shí)state
    NonfairSync(int permits) {
          super(permits);
    }
   // 釋放鎖的方法實(shí)現(xiàn)則是直接調(diào)用父類Sync的釋放鎖方法
   protected int tryAcquireShared(int acquires) {
       return nonfairTryAcquireShared(acquires);
   }
}

從如上源碼中,我們可以得知Semaphore中的非公平鎖NonfairSync類的構(gòu)造函數(shù)是基于調(diào)用父類Sync構(gòu)造函數(shù)完成的,而在創(chuàng)建Semaphore對(duì)象時(shí)傳入的許可數(shù)permits最終則會(huì)傳遞給AQS同步器的同步狀態(tài)標(biāo)識(shí)state,如下:

// 父類 - Sync類構(gòu)造函數(shù)
Sync(int permits) {
    setState(permits); // 調(diào)用AQS內(nèi)部的set方法
}

// AQS(AbstractQueuedSynchronizer)同步器
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer {
    // 同步狀態(tài)標(biāo)識(shí)
    private volatile int state;
    
    protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    // 對(duì)state變量進(jìn)行CAS操作
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}

從上述分析中可知,Semaphore對(duì)象創(chuàng)建時(shí)傳入的許可數(shù)permits,實(shí)則其實(shí)最終是在對(duì)AQS內(nèi)部的state進(jìn)行初始化。初始化完成后,state代表著當(dāng)前信號(hào)量對(duì)象的可用許可數(shù)。

2.1.1、信號(hào)量中非公平鎖NonfairSync獲取許可/鎖實(shí)現(xiàn)

我們?cè)谑褂肧emaphore時(shí)獲取鎖是調(diào)用Semaphore.acquire()方法,,調(diào)用該方法的線程會(huì)開始獲取鎖/許可,嘗試對(duì)permits/state進(jìn)行CAS加一,CAS成功則代表獲取成功。下面我們來分析一下Semaphore獲取許可的方法acquire()的具體實(shí)現(xiàn),源碼如下:

// Semaphore類 → acquire()方法
public void acquire() throws InterruptedException {
      // Sync類繼承AQS,此處直接調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法
      sync.acquireSharedInterruptibly(1);
  }

// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 判斷是否出現(xiàn)線程中斷信號(hào)(標(biāo)志)
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果tryAcquireShared(arg)執(zhí)行結(jié)果不小于0,則線程獲取同步狀態(tài)成功
    if (tryAcquireShared(arg) < 0)
        // 未獲取成功加入同步隊(duì)列阻塞等待
        doAcquireSharedInterruptibly(arg);
}

信號(hào)量獲取許可的方法acquire()最終是通過Sync對(duì)象調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法完成的,而acquireSharedInterruptibly()在獲取同步狀態(tài)標(biāo)識(shí)的過程中是可以響應(yīng)線程中斷操作的,如果該操作沒有沒中斷,則首先調(diào)用tryAcquireShared(arg)嘗試獲取一個(gè)許可數(shù),獲取成功則返回執(zhí)行業(yè)務(wù),方法結(jié)束。如果獲取失敗,則調(diào)用doAcquireSharedInterruptibly(arg)將當(dāng)前線程加入同步隊(duì)列阻塞等待。不過值得我們注意的是:tryAcquireShared(arg)方法是AQS提供的模板方法,并沒有提供具體實(shí)現(xiàn),而是把具體實(shí)現(xiàn)的邏輯交由子類完成,我們先看看信號(hào)量中非公平鎖NonfairSync類的實(shí)現(xiàn):

    // Semaphore類 → NofairSync內(nèi)部類 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
    // 調(diào)用了父類Sync中的實(shí)現(xiàn)方法
    return nonfairTryAcquireShared(acquires);
}

// Syn類 → nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {
    final int nonfairTryAcquireShared(int acquires) {
         // 開啟自旋死循環(huán)
         for (;;) {
             int available = getState();
             int remaining = available - acquires;
             // 判斷信號(hào)量中可用許可數(shù)是否已<0或者CAS執(zhí)行是否成功
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }
}

nonfairTryAcquireShared(acquires)方法首先獲取到state值后,減去一得到remaining值,如果不小于0則代表著當(dāng)前信號(hào)量中還存在可用許可,當(dāng)前線程開始嘗試cas更新state值,cas成功則代表獲取同步狀態(tài)成功,返回remaining值。反之,如果remaining值小于0則代表著信號(hào)量中的許可數(shù)已被其他線程獲取,目前不存在可用許可數(shù),直接返回小于0的remaining值,nonfairTryAcquireShared(acquires)方法執(zhí)行結(jié)束,回到AQS的acquireSharedInterruptibly()方法。當(dāng)返回的remaining值小于0時(shí),if(tryAcquireShared(arg)<0)條件成立,進(jìn)入if執(zhí)行doAcquireSharedInterruptibly(arg)方法將當(dāng)前線程加入同步隊(duì)列阻塞,等待其他線程釋放同步狀態(tài)。線程入列方法如下:

// AbstractQueuedSynchronizer類 → doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 創(chuàng)建節(jié)點(diǎn)狀態(tài)為Node.SHARED共享模式的節(jié)點(diǎn)并將其加入同步隊(duì)列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
     // 開啟自旋操作
     for (;;) {
         final Node p = node.predecessor();
         // 判斷前驅(qū)節(jié)點(diǎn)是否為head
         if (p == head) {
             // 嘗試獲取同步狀態(tài)state
             int r = tryAcquireShared(arg);
             // 如果r不小于0說明獲取同步狀態(tài)成功
             if (r >= 0) {
                 // 將當(dāng)前線程結(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)并喚醒后繼節(jié)點(diǎn)線程             
                 setHeadAndPropagate(node, r);
                 p.next = null; // 置空方便GC
                 failed = false;
                 return;
             }
         }
       // 調(diào)整同步隊(duì)列中node節(jié)點(diǎn)的狀態(tài)并判斷是否應(yīng)該被掛起 
       // 并判斷是否存在中斷信號(hào),如果需要中斷直接拋出異常結(jié)束執(zhí)行
         if (shouldParkAfterFailedAcquire(p, node) &&
             parkAndCheckInterrupt())
             throw new InterruptedException();
     }
    } finally {
     if (failed)
         // 結(jié)束該節(jié)點(diǎn)線程的請(qǐng)求
         cancelAcquire(node);
    }
}

// AbstractQueuedSynchronizer類 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 獲取同步隊(duì)列中原本的head頭節(jié)點(diǎn)
    setHead(node); // 將傳入的node節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
    /*
     * propagate=剩余可用許可數(shù),h=舊的head節(jié)點(diǎn)
     * h==null,(h=head)==null:
     *      非空判斷的標(biāo)準(zhǔn)寫法,避免原本head以及新的頭節(jié)點(diǎn)node為空
     * 如果當(dāng)前信號(hào)量對(duì)象中剩余可用許可數(shù)大于0或者
     * 原本頭節(jié)點(diǎn)h或者新的頭節(jié)點(diǎn)node不是結(jié)束狀態(tài)則喚醒后繼節(jié)點(diǎn)線程
     * 
     * 寫兩個(gè)if的原因在于避免造成不必要的喚醒,因?yàn)楹苡锌赡軉拘蚜撕罄m(xù)
     * 節(jié)點(diǎn)的線程之后,還沒有線程釋放許可/鎖,從而導(dǎo)致再次陷入阻塞
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn),
        // 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn),那么后驅(qū)節(jié)點(diǎn)s必然為空
        if (s == null || s.isShared())
            doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
    }
}

doAcquireSharedInterruptibly(arg)方法中總共做了三件事:

  • 一、創(chuàng)建一個(gè)狀態(tài)為Node.SHARED共享模式的節(jié)點(diǎn),并通過addWaiter()加入隊(duì)列
  • 二、加入成功后開啟自旋,判斷前驅(qū)節(jié)點(diǎn)是否為head,是則嘗試獲取同步狀態(tài)標(biāo)識(shí),獲取成功后,將自己設(shè)置為head節(jié)點(diǎn),如果可用許可數(shù)大于0則喚醒后繼節(jié)點(diǎn)的線程
  • 三、如果前驅(qū)節(jié)點(diǎn)不為head的節(jié)點(diǎn)以及前驅(qū)節(jié)點(diǎn)為head節(jié)點(diǎn)但獲取同步狀態(tài)失敗的節(jié)點(diǎn),則調(diào)用shouldParkAfterFailedAcquire(p,node)判斷前驅(qū)節(jié)點(diǎn)的狀態(tài)是否為SIGNAL狀態(tài)(一般shouldParkAfterFailedAcquire(p,node)中的for循環(huán)至少需要執(zhí)行兩次以上才會(huì)返回ture,第一次把前驅(qū)節(jié)點(diǎn)設(shè)置為SIGNAL狀態(tài),第二次檢測(cè)到SIGNAL狀態(tài)),如果是則調(diào)用parkAndCheckInterrupt()掛起當(dāng)前線程并返回線程中斷狀態(tài)

如上便是doAcquireSharedInterruptibly(arg)方法的大概工作,接下來我們可以看看shouldParkAfterFailedAcquire()以及parkAndCheckInterrupt()方法:

// AbstractQueuedSynchronizer類 → shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取當(dāng)前節(jié)點(diǎn)的等待狀態(tài)
    int ws = pred.waitStatus;
    // 如果為等待喚醒(SIGNAL)狀態(tài)則返回true
    if (ws == Node.SIGNAL)
        return true;
    // 如果當(dāng)前節(jié)點(diǎn)等待狀態(tài)大于0則說明是結(jié)束狀態(tài),
    // 遍歷前驅(qū)節(jié)點(diǎn)直到找到?jīng)]有結(jié)束狀態(tài)的節(jié)點(diǎn)
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果當(dāng)前節(jié)點(diǎn)等待狀態(tài)小于0又不是SIGNAL狀態(tài),
        // 則將其設(shè)置為SIGNAL狀態(tài),代表該節(jié)點(diǎn)的線程正在等待喚醒
        // 也就是代表節(jié)點(diǎn)是剛從Condition的條件等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列,
        // 節(jié)點(diǎn)狀態(tài)為CONDITION狀態(tài)(Semaphore中不存在condition的概念,
        // 所以同步隊(duì)列不會(huì)出現(xiàn)這個(gè)狀態(tài)的節(jié)點(diǎn),此處代碼不會(huì)執(zhí)行)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// AbstractQueuedSynchronizer類 → parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
    // 將當(dāng)前線程掛起
    LockSupport.park(this);
    // 獲取線程中斷狀態(tài),interrupted()是判斷當(dāng)前中斷狀態(tài),
    // 而并不是中斷線程,線程需要中斷返回true,反之false
    return Thread.interrupted();
}

LockSupport → park()方法
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    // 設(shè)置當(dāng)前線程的監(jiān)視器blocker
    setBlocker(t, blocker);
    // 調(diào)用了native方法到JVM級(jí)別的阻塞機(jī)制阻塞當(dāng)前線程
    UNSAFE.park(false, 0L);
    // 阻塞結(jié)束后把blocker置空
    setBlocker(t, null);
}

shouldParkAfterFailedAcquire()方法的作用是判斷節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否為等待喚醒狀態(tài)(SIGNAL狀態(tài)),如果是則返回true。如果前驅(qū)節(jié)點(diǎn)的waitStatus大于0(只有CANCELLED結(jié)束狀態(tài)=1>0),既代表該前驅(qū)節(jié)點(diǎn)已沒有用了,應(yīng)該從同步隊(duì)列移除,執(zhí)行do/while循環(huán)遍歷所有前前驅(qū)節(jié)點(diǎn),直到尋找到非CANCELLED結(jié)束狀態(tài)的節(jié)點(diǎn)。如果節(jié)點(diǎn)狀態(tài)為SIGNAL等待喚醒狀態(tài)則直接調(diào)用parkAndCheckInterrupt()掛起當(dāng)前線程。至此整個(gè)Semaphore.acquire()獲取許可的方法流程結(jié)束。如下圖:

AQS之圖解共享式獲取鎖過程

如上圖,在AQS同步器中存在一個(gè)變量state,Semaphore信號(hào)量對(duì)象在初始化時(shí)傳遞的permits許可數(shù)會(huì)間接的賦值給AQS中的state同步標(biāo)識(shí),而permits/state則代表著同一時(shí)刻可同時(shí)訪問臨界/共享資源的最大線程數(shù)。當(dāng)一條線程調(diào)用Semaphore.acquire()獲取許可時(shí),會(huì)首先判斷state是否大于0,如果大于則代表還有可用許可數(shù),state減1,線程獲取成功并返回執(zhí)行。直到state為零時(shí),代表著當(dāng)前信號(hào)量已經(jīng)不存在可用許可數(shù)了,后續(xù)請(qǐng)求的線程則需要封裝成Node節(jié)點(diǎn)并將其加入同步隊(duì)列開啟自旋操作直至有線程釋放許可(state加一)。

至此,AQS共享模式中非公平鎖的獲取鎖原理分析完畢。但是我們?nèi)缟戏治龅氖强身憫?yīng)線程中斷請(qǐng)求的獲取許可方式,而Semaphore中也實(shí)現(xiàn)了一套不可中斷式的獲取方法,如下:

// Semaphore類 → acquireUninterruptibly()方法
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

// AbstractQueuedSynchronizer類 → acquireShared()方法
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// AbstractQueuedSynchronizer類 → doAcquireShared()方法
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 在前面的可中斷式獲取鎖方法中此處是直接拋出異常強(qiáng)制中斷線程的
                // 而在不可中斷式的獲取方法中,這里是沒有拋出異常中斷線程的
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

觀察如上源碼不難發(fā)現(xiàn),可響應(yīng)線程中斷的方法與不可響應(yīng)線程中斷的方法區(qū)別在于:

可響應(yīng)線程中斷的方法在每次操作之前會(huì)先檢測(cè)線程中斷信號(hào),如果線程需要中斷操作,則直接拋出異常強(qiáng)制中斷線程的執(zhí)行。反之,不可響應(yīng)線程中斷的方法不會(huì)檢測(cè)線程中斷信號(hào),而且不會(huì)拋出異常強(qiáng)制中斷。

2.1.2、信號(hào)量中非公平鎖NonfairSync釋放許可/鎖實(shí)現(xiàn)

使用Semaphore時(shí)釋放鎖則調(diào)用的是Semaphore.release()方法,調(diào)用該方法之后線程持有的許可會(huì)被釋放,同時(shí)permits/state加一,接下來Semaphore獲取許可的方法release()的具體實(shí)現(xiàn),源碼如下:

// Semaphore類 → release()方法
public void release() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer類 → releaseShared(arg)方法
public final boolean releaseShared(int arg) {
    // 調(diào)用子類Semaphore中tryReleaseShared()方法實(shí)現(xiàn)
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

與之前獲取許可的方法一樣,Semaphore釋放許可的方法release()也是通過間接調(diào)用AQS內(nèi)部的releaseShared(arg)完成。因?yàn)锳QS的releaseShared(arg)是魔法方法,所以最終的邏輯實(shí)現(xiàn)由Semaphore的子類Sync完成,如下:

// Semaphore類 → Sync子類 → tryReleaseShared()方法
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 獲取AQS中當(dāng)前同步狀態(tài)state值
        int current = getState();
        // 對(duì)當(dāng)前的state值進(jìn)行增加操作
        int next = current + releases;
        // 不可能出現(xiàn),除非傳入的releases為負(fù)數(shù)
        if (next < current) 
            throw new Error("Maximum permit count exceeded");
        // CAS更新state值為增加之后的next值
        if (compareAndSetState(current, next))
            return true;
    }
}

釋放鎖/許可的方法邏輯相對(duì)來說比較簡(jiǎn)單,對(duì)AQS中的state加一釋放獲取的同步狀態(tài)。不過值得注意的是:在我們上篇文章分享的AQS獨(dú)占模式實(shí)現(xiàn)中,釋放鎖的邏輯中是沒有保證線程安全的,因?yàn)楠?dú)占模式的釋放鎖邏輯永遠(yuǎn)只會(huì)存在一條線程同時(shí)操作。而在共享模式中,可能會(huì)存在多條線程同時(shí)釋放許可/鎖資源,所以在此處使用了CAS+自旋的方式保證線程安全問題。

如果此處tryReleaseShared(releases)CAS更新成功,那么則會(huì)進(jìn)入if(tryReleaseShared(arg))中執(zhí)行doReleaseShared();喚醒后繼節(jié)點(diǎn)線程。

// AbstractQueuedSynchronizer類 → doReleaseShared()方法
private void doReleaseShared() {
    /*
     * 為了防止釋放過程中有其他線程進(jìn)入隊(duì)列,這里必須開啟自旋
     * 如果頭節(jié)點(diǎn)設(shè)置失敗則重新檢測(cè)繼續(xù)循環(huán)
     */
    for (;;) {
        // 獲取隊(duì)列head頭節(jié)點(diǎn)
        Node h = head; 
        // 如果頭節(jié)點(diǎn)不為空并且隊(duì)列中還存在其他節(jié)點(diǎn)
        if (h != null && h != tail) { 
            // 獲取頭節(jié)點(diǎn)的節(jié)點(diǎn)狀態(tài)
            int ws = h.waitStatus; 
            // 如果節(jié)點(diǎn)狀態(tài)為SIGNAL等待喚醒狀態(tài)則代表
            if (ws == Node.SIGNAL) { 
                // 嘗試cas修改節(jié)點(diǎn)狀態(tài)值為0
                // 失敗則繼續(xù)下次循環(huán)
                // 成功則喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;         
                unparkSuccessor(h);  // 喚醒后繼節(jié)點(diǎn)線程
            }
            // 節(jié)點(diǎn)狀態(tài)為0時(shí)嘗試將節(jié)點(diǎn)狀態(tài)修改為PROPAGATE傳播狀態(tài)
            // 失敗則跳出循環(huán)繼續(xù)下次循環(huán)
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        // 如果當(dāng)前隊(duì)列頭節(jié)點(diǎn)發(fā)生變化繼續(xù)循環(huán),反之則終止自旋
        if (h == head)
            break;
    }
}
// AbstractQueuedSynchronizer類 → unparkSuccessor()方法
// 參數(shù):傳入需要喚醒后繼節(jié)點(diǎn)的節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
    // 獲取node節(jié)點(diǎn)的線程狀態(tài)
    int ws = node.waitStatus;
    if (ws < 0)
        // 設(shè)置head節(jié)點(diǎn)為0
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取后繼節(jié)點(diǎn)
    Node s = node.next;
    // 如果后繼節(jié)點(diǎn)為空或線程狀態(tài)已經(jīng)結(jié)束
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 遍歷整個(gè)隊(duì)列拿到可喚醒的節(jié)點(diǎn)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒后繼節(jié)點(diǎn)線程
        LockSupport.unpark(s.thread);
}

doReleaseShared()方法直接在執(zhí)行中獲取head節(jié)點(diǎn),通過調(diào)用unparkSuccessor()方法喚醒head后繼節(jié)點(diǎn)中的線程。而因?yàn)樵摲椒w的邏輯是一個(gè)for(;;){}死循環(huán),退出的條件為:只當(dāng)隊(duì)頭不發(fā)生改變時(shí)才退出,如果發(fā)生改變了則代表著一定有其他線程在當(dāng)前線程釋放鎖/許可的過程中獲取到了鎖,所以當(dāng)前循環(huán)會(huì)繼續(xù),在第二次循環(huán)過程中,在滿足條件h.waitStauts==0的情況下,這里會(huì)把head節(jié)點(diǎn)的waitStauts設(shè)置為Node.PROPAGATE傳播狀態(tài)是為了保證喚醒傳遞。因?yàn)锳QS共享模式下是會(huì)出現(xiàn)多個(gè)線程同時(shí)對(duì)同步狀態(tài)標(biāo)識(shí)state進(jìn)行操作,如線程T1在執(zhí)行release()→doReleaseShared()釋放許可操作,剛喚醒后繼線程準(zhǔn)備替換為head頭節(jié)點(diǎn)(準(zhǔn)備替換但是還沒替換),此時(shí)另外一條線程T2正好在同一時(shí)刻執(zhí)行acquire()→doAcquireShared()→setHeadAndPropagate()獲取鎖操作,假設(shè)T2線程獲取的是最后一個(gè)可用許可,在執(zhí)行到setHeadAndPropagate()方法(這個(gè)方法中存在一個(gè)超長(zhǎng)判斷),傳入的propagate=0

// AbstractQueuedSynchronizer類 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 獲取同步隊(duì)列中原本的head頭節(jié)點(diǎn)
    setHead(node); // 將傳入的node節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
    /*
     * propagate=剩余可用許可數(shù),h=舊的head節(jié)點(diǎn)
     * h==null,(h=head)==null:
     *      非空判斷的標(biāo)準(zhǔn)寫法,避免原本head以及新的頭節(jié)點(diǎn)node為空
     * 如果當(dāng)前信號(hào)量對(duì)象中剩余可用許可數(shù)大于0或者
     * 原本頭節(jié)點(diǎn)h或者新的頭節(jié)點(diǎn)node不是結(jié)束狀態(tài)則喚醒后繼節(jié)點(diǎn)線程
     * 
     * 寫兩個(gè)if的原因在于避免造成不必要的喚醒,因?yàn)楹苡锌赡軉拘蚜撕罄m(xù)
     * 節(jié)點(diǎn)的線程之后,還沒有線程釋放許可/鎖,從而導(dǎo)致再次陷入阻塞
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn),
        // 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn),那么后驅(qū)節(jié)點(diǎn)s必然為空
        if (s == null || s.isShared())
            doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
    }
}

// 超長(zhǎng)判斷:該判斷的作用在于面對(duì)各種特殊情況能夠時(shí)保證及時(shí)獲取鎖
if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    // 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn),
    // 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn),那么后驅(qū)節(jié)點(diǎn)s必然為空
    if (s == null || s.isShared())
        doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
}

根據(jù)這個(gè)超長(zhǎng)判斷的邏輯,因?yàn)閭魅氲?code>propagate=0代表著當(dāng)前已經(jīng)沒有可用許可數(shù)了,不滿足超長(zhǎng)判斷中的第一個(gè)條件propagate>0,所以T2線程獲取鎖之后理論上是不需要再喚醒其他線程獲取鎖/許可,但是因?yàn)門1線程已經(jīng)訪問完成臨界資源了正在釋放持有的許可,那么就會(huì)造成一種情況:隊(duì)列中head節(jié)點(diǎn)的后繼節(jié)點(diǎn)如果此時(shí)嘗試獲取鎖/許可,那么有很大幾率獲取到T1線程釋放的許可。所以在釋放鎖時(shí),將head節(jié)點(diǎn)的waitStauts設(shè)置為Node.PROPAGATE傳播狀態(tài)值為-3,滿足這個(gè)超長(zhǎng)判斷中的第三個(gè)條件h.waitStatus < 0,所以此時(shí)T2也會(huì)喚醒head后繼節(jié)點(diǎn)中等待獲取鎖/許可資源的線程。這樣去實(shí)現(xiàn)的好處在于:能夠充分照顧到head的后繼節(jié)點(diǎn)同時(shí)也能保證喚醒的傳遞。

至于這里為什么獲取到鎖/許可的線程需要繼續(xù)喚醒后繼節(jié)點(diǎn)線程?因?yàn)檫@里是共享鎖,而不是獨(dú)占鎖。一個(gè)線程剛獲得了共享鎖/許可,那么很有可能還有剩余的共享鎖可供排隊(duì)在后面的線程獲得,所以需要喚醒后面的線程。

至此,釋放許可邏輯結(jié)束,對(duì)比獲取許可的邏輯相對(duì)來說要簡(jiǎn)單許多,只需要更新state值后調(diào)用doReleaseShared()方法喚醒后繼節(jié)點(diǎn)線程即可。但是在理解doReleaseShared()方法時(shí)需要額外注意:調(diào)用doReleaseShared()方法的線程會(huì)存在兩種:

  • 一是釋放共享鎖/許可數(shù)的線程。調(diào)用release()方法釋放許可時(shí)必然調(diào)用它喚醒后繼線程
  • 二是剛獲取到共享鎖/許可數(shù)的線程。一定情況下,在滿足“超長(zhǎng)判斷”的任意條件時(shí)也會(huì)調(diào)用它喚醒后繼線程

2.2、AQS共享模式之Semaphore的FairSync公平鎖實(shí)現(xiàn)

AQS共享模式中的公平鎖實(shí)現(xiàn)除開在獲取鎖的邏輯上與非公平鎖的有些許不同外,其他的實(shí)現(xiàn)大致相同。

2.2.1、信號(hào)量中公平鎖FairSync獲取許可/鎖實(shí)現(xiàn)

公平鎖的概念是指先請(qǐng)求鎖的線程一定比后請(qǐng)求鎖的線程要先執(zhí)行,先獲取到鎖資源,從時(shí)間上需要保證執(zhí)行的先后順序。

公平鎖獲取許可執(zhí)行邏輯:Semaphore.acquire()獲取許可方法 → AQS.acquireSharedInterruptibly()方法 → AQS.tryAcquireShared()獲取共享鎖模板方法 → FairSync.tryAcquireShared()方法

// Semaphore類 → 構(gòu)造方法
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// Semaphore類 → acquire()獲取鎖方法
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 調(diào)用AQS定義的獲取共享鎖的模板方法
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// AbstractQueuedSynchronizer類 → tryAcquireShared()模板方法
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// Semaphore類 → FairSync公平鎖類
static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }
    
    // Semaphore類 → FairSync內(nèi)部類 → tryAcquireShared()子類實(shí)現(xiàn)
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 不同點(diǎn):先判斷隊(duì)列中是否存在節(jié)點(diǎn)后再執(zhí)行獲取鎖操作
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

從獲取鎖/許可的代碼中可以非常明顯的看出,公平鎖的實(shí)現(xiàn)相對(duì)來說,與非公平鎖的唯一不同點(diǎn)在于:公平鎖的模式下獲取鎖,會(huì)先調(diào)用hasQueuedPredecessors()方法判斷同步隊(duì)列中是否存在節(jié)點(diǎn),如果存在則直接返回-1回到acquireSharedInterruptibly()方法if(tryAcquireShared(arg)<0)判斷,調(diào)用doAcquireSharedInterruptibly(arg)方法將當(dāng)前線程封裝成Node.SHARED共享節(jié)點(diǎn)加入同步隊(duì)列等待。反之,如果隊(duì)列中不存在節(jié)點(diǎn)則嘗試直接獲取鎖/許可。

2.2.2、信號(hào)量中公平鎖FairSync釋放許可/鎖實(shí)現(xiàn)

公平鎖釋放許可的邏輯與非公平鎖的實(shí)現(xiàn)是一致的,因?yàn)槎际荢ync類的子類,而釋放鎖的邏輯都是對(duì)state減一更新后,喚醒后繼節(jié)點(diǎn)的線程。所以關(guān)于釋放鎖的具體實(shí)現(xiàn)則是交由Sync類實(shí)現(xiàn),這里不再重復(fù)贅述。

三、ReetrantLock與Semaphore的區(qū)別

對(duì)比項(xiàng) ReetrantLock Semaphore
實(shí)現(xiàn)模式 獨(dú)占模式 共享模式
獲取鎖方法 tryAcquire() tryAcquireShared()
釋放鎖方法 tryRelease() tryAcquireShared()
是否支持重入 支持 不支持
線程中斷 支持 支持
Condition 支持 不支持
隊(duì)列數(shù)量 一個(gè)同步+多個(gè)等待 單個(gè)同步
節(jié)點(diǎn)類型 Node.EXCLUSIVE Node.SHARED

四、共享模式的其他實(shí)現(xiàn)者

除開Semaphore信號(hào)量的實(shí)現(xiàn)是基于AQS的共享模式之外,在JUC并發(fā)包中CountDownLatch、ReetrantReadWriteLock讀寫鎖的Read讀鎖等都是基于AQS的共享模式實(shí)現(xiàn)的,下面我們也可以簡(jiǎn)單的看看關(guān)于CountDownLatch的用法。

4.1、CountDownLatch應(yīng)用場(chǎng)景實(shí)戰(zhàn)

在CountDownLatch初始化時(shí)和Semaphore一樣,我們需要傳入一個(gè)數(shù)字count作為最大線程數(shù)

CountDownLatch countDownLatch = new CountDownLatch(3);

這個(gè)參數(shù)同樣會(huì)間接的賦值給AQS內(nèi)部的state同步狀態(tài)標(biāo)識(shí)。一般我們會(huì)調(diào)用它的兩個(gè)方法:await()與countDown()

  • await():調(diào)用await()方法的線程會(huì)被封裝成共享節(jié)點(diǎn)加入同步隊(duì)列阻塞等待,直至state=0時(shí)才會(huì)喚醒同步隊(duì)列中所有的線程
  • countDown():調(diào)用countDown()方法的線程會(huì)對(duì)state減一

而關(guān)于CountDownLatch有兩種用法:

  • 一、多等一:初始化count=1,多條線程await()阻塞,一條線程調(diào)用countDown()喚醒所有阻塞線程
  • 二、一等多:初始化count=x,多線程countDown()對(duì)count進(jìn)行減一,一條線程await()阻塞,當(dāng)count=0時(shí)阻塞的線程開始執(zhí)行

如上兩種用法在我們的項(xiàng)目中也可以有很多的應(yīng)用場(chǎng)景,多等一的用法我們可以用來在一定程度上模擬并發(fā)測(cè)試接口并發(fā)安全問題、死鎖問題等,如:

final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 1; i <= 3; i++) {
    new Thread(() -> {
        try {
            System.out.println("線程:" + Thread.currentThread().getName()
            + "....阻塞等待!");
            countDownLatch.await();
            // 可以在此處調(diào)用需要并發(fā)測(cè)試的方法或接口
            System.out.println("線程:" + Thread.currentThread().getName()
            + "....開始執(zhí)行!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "T" + i).start();
}
Thread.sleep(1000);
countDownLatch.countDown();

/*
   程序開始運(yùn)行:
    線程:T2....阻塞等待!
    線程:T1....阻塞等待!
    線程:T3....阻塞等待!
   程序運(yùn)行一秒后(三條線程幾乎同時(shí)執(zhí)行):
    線程:T2....開始執(zhí)行!
    線程:T1....開始執(zhí)行!
    線程:T3....開始執(zhí)行!
*/

如上案例中,創(chuàng)建了一個(gè)CountDownLatch對(duì)象,初始化時(shí)傳遞count=1,循環(huán)創(chuàng)建三條線程T1,T2,T3阻塞等待,主線程在一秒后調(diào)用countDown()喚醒了同步隊(duì)列中的三條線程繼續(xù)執(zhí)行,原理如下:

CountDownLatch多等一原理

我們?cè)趯?shí)際開發(fā)過程中,往往很多并發(fā)任務(wù)都存在前后依賴關(guān)系,如詳情頁需要調(diào)用多個(gè)接口完成數(shù)據(jù)聚合、并行執(zhí)行獲取到數(shù)據(jù)后需要進(jìn)行結(jié)果合并、多個(gè)操作完成后需要進(jìn)行數(shù)據(jù)檢查等等,而這些場(chǎng)景下我們可以使用一等多的用法:

final CountDownLatch countDownLatch = new CountDownLatch(3);
Map data = new HashMap();
for (int i = 1; i <= 3; i++) {
    final int page = i;
    new Thread(() -> {
        System.out.println("線程:" + Thread.currentThread().getName() +
                "....讀取分段數(shù)據(jù):"+(page-1)*200+"-"+page*200+"行");
        // 數(shù)據(jù)加入結(jié)果集:data.put();
        countDownLatch.countDown();
    }, "T" + i).start();
}
countDownLatch.await();
System.out.println("線程:" + Thread.currentThread().getName() 
        + "....對(duì)數(shù)據(jù)集:data進(jìn)行處理");
        
/*
運(yùn)行結(jié)果:
    線程:T1....讀取分段數(shù)據(jù):0-200行
    線程:T2....讀取分段數(shù)據(jù):200-400行
    線程:T3....讀取分段數(shù)據(jù):400-600行

    線程main....對(duì)數(shù)據(jù)集:data進(jìn)行處理
*/

如上一等多的案例中,for循環(huán)開啟三個(gè)線程T1,T2,T3并行執(zhí)行同時(shí)讀取數(shù)據(jù)增快處理效率,讀取完成之后將數(shù)據(jù)加入data結(jié)果集中匯總,主線程等待三條線程讀取完成后對(duì)數(shù)據(jù)集data進(jìn)行處理,如下:

CountDownLatch一等多原理

4.2、CountDownLatch實(shí)現(xiàn)原理

前面我們?cè)岬竭^,CountDownLatch也是基于AQS共享模式實(shí)現(xiàn)的,與Semaphore一樣,會(huì)將傳入的count間接的賦值給AQS內(nèi)部的state同步狀態(tài)標(biāo)識(shí)。

private final Sync sync;

// CountDownLatch構(gòu)造方法
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 對(duì)其內(nèi)部Sync對(duì)象進(jìn)行初始化
    this.sync = new Sync(count);
}

// CountDownLatch類 → Sync內(nèi)部類
private static final class Sync extends AbstractQueuedSynchronizer {
    // Sync構(gòu)造函數(shù):對(duì)AQS內(nèi)部的state進(jìn)行賦值
    Sync(int count) {setState(count);}
    
    // 調(diào)用await()方法最終會(huì)調(diào)用到這里
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
}

// CountDownLatch類 → await()方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

在CountDownLatch中,存在一個(gè)Sync內(nèi)部類,當(dāng)我們創(chuàng)建一個(gè)CountDownLatch對(duì)象時(shí),實(shí)則其內(nèi)部的構(gòu)造函數(shù)是在對(duì)其sync對(duì)象進(jìn)行初始化,與我們前面所說的一樣

CountDownLatch countDownLatch = new CountDownLatch(count);

初始化時(shí)傳遞的count數(shù)字最終會(huì)通過調(diào)用setState(state)方法賦值給AQS內(nèi)部的同步狀態(tài)標(biāo)識(shí)state變量,而當(dāng)線程調(diào)用await()方法時(shí),會(huì)調(diào)用AQS的acquireSharedInterruptibly()方法:

// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 最終調(diào)用到CountDownLatch內(nèi)部Sync類的tryAcquireShared()方法
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// CountDownLatch類 → Sync內(nèi)部類 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

因?yàn)锳QS中tryAcquireShared(arg)方法僅是模板方法的原因,所以線程在執(zhí)行acquireSharedInterruptibly()方法時(shí)最終會(huì)調(diào)用到CountDownLatch內(nèi)部Sync類的tryAcquireShared()方法。當(dāng)count/state為0時(shí)返回true,反之,count不為0時(shí)返回false,最終回到if(tryAcquireShared(arg)<0)執(zhí)行時(shí),如果count不為0則執(zhí)行doAcquireSharedInterruptibly(arg)方法將當(dāng)前線程信息封裝成Node.SHARED共享節(jié)點(diǎn)加入同步隊(duì)列阻塞等待。原理如下:

CountDownLatch.await原理

如上便是CountDownLatch.await()的實(shí)現(xiàn)原理,大體來說還是比較簡(jiǎn)單。下面我們接著分析一下CountDownLatch.countDown()方法的實(shí)現(xiàn)。

// CountDownLatch類 → countDown()方法
public void countDown() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer類 → releaseShared()方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// AbstractQueuedSynchronizer類 → 模板方法:tryReleaseShared()
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

// CountDownLatch類 → Sync內(nèi)部類 → tryReleaseShared()方法
// 調(diào)用countDown()方法最終會(huì)調(diào)用到這里
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

調(diào)用CountDownLatch.countDown()方法后會(huì)調(diào)用AQS的tryReleaseShared()模板方法,最終調(diào)用CountDownLatchSync內(nèi)部類tryReleaseShared()方法。在該方法中首先會(huì)先對(duì)于state/count進(jìn)行一次為0判斷,如果不為0則對(duì)count/state減一,然后再次對(duì)更新之后的state/count進(jìn)行為0判斷,如果減一后state等于0返回true,回到releaseShared()if(tryReleaseShared(arg))執(zhí)行doReleaseShared()喚醒同步隊(duì)列中的阻塞線程。反之,如果減一后不為0,當(dāng)前線程則直接返回,方法結(jié)束。如下:

CountDownLatch.countDown原理

4.3、CountDownLatch與CyclicBarrier的區(qū)別

在JUC包中還存在另一個(gè)和CountDownLatch作用相同的工具類:CyclicBarrier。與CountDownLatch不同的是:CyclicBarrier是基于AQS的獨(dú)占模式實(shí)現(xiàn)的,其內(nèi)部通過ReetrantLock與Condition實(shí)現(xiàn)線程阻塞與喚醒。對(duì)比如下:

對(duì)比項(xiàng) CountDownLatch CyclicBarrier
實(shí)現(xiàn)模式 共享模式 獨(dú)占模式
計(jì)數(shù)方式 減法 減法
復(fù)用支持 不可復(fù)用 計(jì)數(shù)可置0
重置支持 不可重置 可重置
設(shè)計(jì)重點(diǎn) 一等多 多等多

五、總結(jié)

通過Semaphore與CountDownLatch原理進(jìn)行分析后,不難得知,在初始化時(shí)傳遞的許可數(shù)/計(jì)數(shù)器最終都會(huì)間接的傳遞給AQS的同步狀態(tài)標(biāo)識(shí)state。當(dāng)一條線程嘗試獲取共享鎖時(shí),會(huì)對(duì)state減一,當(dāng)state為0時(shí)代表沒有可用共享鎖了,其他后續(xù)請(qǐng)求的線程會(huì)被封裝成共享節(jié)點(diǎn)加入同步隊(duì)列等待,直至其他持有共享鎖的線程釋放(state加一)。不過與獨(dú)占模式不同的是:共享模式中,除開釋放鎖時(shí)會(huì)喚醒后繼節(jié)點(diǎn)的線程外,獲取共享鎖成功的線程也會(huì)在滿足一定條件下喚醒后繼節(jié)點(diǎn)。至于共享模式中的公平鎖與非公平鎖則與之前的獨(dú)占模式的公平鎖與非公平鎖相同,公平鎖情況下,先判斷隊(duì)列是否存在Node再獲取鎖,從而保證每條線程獲取共享鎖時(shí)都是先到先得的順序執(zhí)行的。而非公平鎖情況下,通過線程競(jìng)爭(zhēng)的方式獲取,不管隊(duì)列中是否已經(jīng)存在Node節(jié)點(diǎn),請(qǐng)求的線程都會(huì)先執(zhí)行一遍獲取鎖的邏輯,只要執(zhí)行成功就能獲取到共享鎖,獲得線程執(zhí)行權(quán)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容