Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)

前言

線程并發(fā)系列文章:

Java 線程基礎(chǔ)
Java 線程狀態(tài)
Java “優(yōu)雅”地中斷線程-實踐篇
Java “優(yōu)雅”地中斷線程-原理篇
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有誤
Java Unsafe/CAS/LockSupport 應(yīng)用與原理
Java 并發(fā)"鎖"的本質(zhì)(一步步實現(xiàn)鎖)
Java Synchronized實現(xiàn)互斥之應(yīng)用與源碼初探
Java 對象頭分析與使用(Synchronized相關(guān))
Java Synchronized 偏向鎖/輕量級鎖/重量級鎖的演變過程
Java Synchronized 重量級鎖原理深入剖析上(互斥篇)
Java Synchronized 重量級鎖原理深入剖析下(同步篇)
Java并發(fā)之 AQS 深入解析(上)
Java并發(fā)之 AQS 深入解析(下)
Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 詳解
Java 并發(fā)之 ReentrantLock 深入分析(與Synchronized區(qū)別)
Java 并發(fā)之 ReentrantReadWriteLock 深入分析
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(應(yīng)用篇)
最詳細(xì)的圖文解析Java各種鎖(終極篇)
線程池必懂系列

前面分析了基于AQS的獨占鎖ReentrantLock、共享鎖/獨占鎖ReentrantReadWriteLock,它們內(nèi)部都實現(xiàn)了Lock 接口。而AQS還有其它常用的子類封裝器,它們雖然沒有實現(xiàn)Lock接口,但可以用來做線程間的同步,接下來將要來深入了解它們。
通過本篇文章,你將了解到:

1、Semaphore 原理分析
2、CountDownLatch 原理分析
3、CyclicBarrier 原理分析

1、Semaphore 原理分析

場景引入

ReentrantReadWriteLock 里有讀鎖和寫鎖,其中讀鎖是共享鎖,其核心是對AQS里的"state"變量進(jìn)行操作,每獲取一次鎖,將state加1,釋放鎖將state減1。從這里可以看出,將state作為共享資源能夠?qū)崿F(xiàn)線程間的協(xié)作。
現(xiàn)在有個需求:資源是共享的,但是數(shù)量有限,因此沒拿到資源的需要等待別人釋放資源。

將state作為標(biāo)記共享資源的數(shù)量,那么就有:

1、線程占有資源后將state減1,線程釋放資源后將state加1。
2、若線程沒拿到資源(資源都被其它線程占有了),那么掛起等待。
3、線程釋放資源后,喚醒其它等待該資源的線程。

image.png

這樣子,不用synchronized+wait/notify與ReentrantLock+await/signal,也依然能夠?qū)崿F(xiàn)線程間同步。
具體到現(xiàn)實場景:

如停車場只能容納一定數(shù)量的車子,當(dāng)停車場停滿了車(入場許可發(fā)放完了),其它想進(jìn)來的車子必須等待有其它車從停車場開出(釋放入場許可)。

Semaphore 構(gòu)造

指定初始的許可個數(shù):

#Semaphore.java
    public Semaphore(int permits) {
        //默認(rèn)是非公平
        sync = new NonfairSync(permits);
    }

    Sync(int permits) {
            setState(permits);
        }

可以看出許可的個數(shù)就是state的值。

Semaphore 占有許可

通過調(diào)用acquire(xx)占有許可:

#Semaphore.java
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        //交給AQS處理,可中斷
        sync.acquireSharedInterruptibly(permits);
    }

#AbstractQueuedSynchronizer.java
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //發(fā)生了中斷,直接返回
        if (Thread.interrupted())
            throw new InterruptedException();
        //嘗試修改state(減)
        if (tryAcquireShared(arg) < 0)
            //修改失敗,則掛起等待
            doAcquireSharedInterruptibly(arg);
    }

每次可以占有多個許可,若占有成功則直接返回,否則掛起等待。
具體的操作state在tryAcquireShared(xx)里實現(xiàn),此處以非公平模式說明:

#Semaphore.java
        final int nonfairTryAcquireShared(int acquires) {
            //死循環(huán)確保修改state成功,或者state已經(jīng)獲取完了
            for (;;) {
                //獲取state
                int available = getState();
                //減少state
                int remaining = available - acquires;
                if (remaining < 0 ||
                    //CAS 操作
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

Semaphore 釋放許可

占有許可做了相應(yīng)的任務(wù)后,就可以釋放許可了。
通過調(diào)用release(xx)釋放許可:

#Semaphore.java
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        //AQS 實現(xiàn)
        sync.releaseShared(permits);
    }

#AbstractQueuedSynchronizer.java
    public final boolean releaseShared(int arg) {
        //嘗試修改state(加)
        if (tryReleaseShared(arg)) {
            //成功修改state,喚醒后繼節(jié)點
            doReleaseShared();
            return true;
        }
        //修改失敗
        return false;
    }

具體的操作state在tryReleaseShared(xx)里實現(xiàn):

#Semaphore.java
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //獲取state
                int current = getState();
                //增加
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //修改
                if (compareAndSetState(current, next))
                    return true;
            }
        }

可以看出:

釋放許可,增加state,占有許可,減少state。

另外,Semaphore 占有許可可分為公平與非公平模式,占有許可過程可中斷/不可中斷。


image.png

Semaphore 與Lock 區(qū)別

與ReentrantLock、ReentrantReadWriteLock 區(qū)別在于從不同的角度看待state:

1、ReentrantLock、ReentrantReadWriteLock 獲取鎖的過程是將state值增大,而Semaphore 占有許可是將state值減小。
2、ReentrantLock、ReentrantReadWriteLock 釋放鎖的過程是將state值減小,而Semaphore 釋放許可是將state值增大。
3、這也是AQS的靈活之處,將具體的"state"鎖代表的意義由子類實現(xiàn),可實現(xiàn)不同場景的應(yīng)用。

2、CountDownLatch 原理分析

場景引入

A、B、C三個線程協(xié)作:

A 等待B、C完成任務(wù)后再進(jìn)行下一步操作。

這場景我們可能會想到用Thread.join(),A調(diào)用B.join(),C.join(),A阻塞等待,當(dāng)B、C線程執(zhí)行結(jié)束后喚醒A。這種方式雖然能夠解決問題,但是有些不盡人意的地方:比如說A不一定要等待B、C執(zhí)行完成,而是B、C中途完成某個任務(wù)后通知A;又比如,B、C線程不止執(zhí)行一次任務(wù),而是一定的次數(shù)后才會喚醒A,這個時候使用Thread.join() 就無法解決問題了。
而CountDownLatch 可以很好地解決這問題。

CountDownLatch 構(gòu)造

#CountDownLatch.java
    //初始化次數(shù)
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    Sync(int count) {
        //設(shè)置state
            setState(count);
        }

可以看出,count的值最終反饋到state上。

CountDownLatch 等待

通過await(xx)等待state變?yōu)?:

#CountDownLatch.java
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        //超時返回
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

#AbstractQueuedSynchronizer.java
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //該方法響應(yīng)中斷
        if (Thread.interrupted())
            throw new InterruptedException();
        //主要工作在tryAcquireShared(xx)里
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

又是AQS的套路,具體的操作state在tryAcquireShared(xx)里實現(xiàn):

#CountDownLatch.java
    protected int tryAcquireShared(int acquires) {
        //若state == 0,則返回1,否則-1
        //外層判斷>=0,說明當(dāng)前state還有數(shù)量,則需要阻塞等待,否則不阻塞
            return (getState() == 0) ? 1 : -1;
        }

與其它子類實現(xiàn)的tryAcquireShared(xx)方法不同的是,CountDownLatch里的Sync并沒有修改state的值,僅僅只是判斷state?=0進(jìn)而做具體的操作而已。
由此可知:CountDownLatch 是基于AQS的共享模式。

CountDownLatch 倒數(shù)計數(shù)

既然調(diào)用await(xx)可能會使得線程阻塞等待,那么勢必有其它線程喚醒它,調(diào)用的方法即是countDown():

#CountDownLatch.java
    public void countDown() {
        sync.releaseShared(1);
    }

#AbstractQueuedSynchronizer.java
    public final boolean releaseShared(int arg) {
        //子類實現(xiàn)
        if (tryReleaseShared(arg)) {
            //AQS里實現(xiàn),喚醒阻塞的線程
            doReleaseShared();
            return true;
        }
        return false;
    }

同樣的,具體的操作state在tryReleaseShared(xx)里實現(xiàn):

#CountDownLatch.java
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                //獲取state
                int c = getState();
                //若當(dāng)前state==0,說明已經(jīng)沒有可以釋放的了
                if (c == 0)
                    return false;
                int nextc = c-1;
                //CAS修改
                if (compareAndSetState(c, nextc))
                    //說明可以喚醒其它線程了
                    return nextc == 0;
            }
        }

也即是說,當(dāng)線程調(diào)用await(xx)阻塞后,其它線程通過countDown()修改state值,若是發(fā)現(xiàn)state最終變?yōu)?了,那么喚醒阻塞的線程。
用圖表示CountDownLatch主要結(jié)構(gòu)如下:


image.png

CountDownLatch 與其它AQS子類封裝器的區(qū)別

前面已經(jīng)分析了基于AQS的封裝器:ReentrantLock、ReentrantReadWriteLock、Semaphore,它們對state值的修改包括增加與減少,而CountDownLatch 只是減小state的值,用以實現(xiàn)倒數(shù)計數(shù)的功能。
可類比場景如下:

1、田徑運動場開始百米賽跑。
2、運動員在跑道上各就各位(多個線程調(diào)用await 阻塞等待)。
3、裁判喊倒數(shù)3、2、1(線程調(diào)用countDown)。
4、等待倒數(shù)結(jié)束,發(fā)令槍響,運動員就開始跑(線程被喚醒,繼續(xù)做事)。

可以看出,運動員不會去干涉裁判的倒數(shù)(修改state值)。

3、CyclicBarrier 原理分析

場景引入

在CountDownLatch 場景里說到運動員需要裁判,想想可以不需要裁判嗎?運動員之間自發(fā)倒數(shù),倒數(shù)結(jié)束就一起跑。
更普遍的場景是:

1、幾個驢友想去某個景點旅游,約定了在某個地方集合后再一起出發(fā)。
2、每個驢友到達(dá)集合點時打卡并看人都到齊了沒,沒到齊則等待。
3、若最后一個參與者過來后發(fā)現(xiàn)人到齊了,于是告訴大家不用等了,出發(fā)吧。

CyclicBarrier 可滿足該場景的需求。

CyclicBarrier 構(gòu)造

#CyclicBarrier.java
    public CyclicBarrier(int parties, Runnable barrierAction) {
        //必須要有參與者
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        //臨時變量count
        this.count = parties;
        //參與者都到達(dá)了后執(zhí)行的動作
        this.barrierCommand = barrierAction;
    }

可以看出,此處并沒有AQS介入,也就是沒有直接修改state。
CyclicBarrier是通過ReentrantLock + Condition 來實現(xiàn)線程間同步的:

#CyclicBarrier.java
    //獨占鎖,為了互斥修改count
    private final ReentrantLock lock = new ReentrantLock();
    //線程等待條件
    private final Condition trip = lock.newCondition();
    //修改的共享變量
    private int count;

CyclicBarrier 等待參與者

接著來分析,如何實現(xiàn)線程間的同步的。

#CyclicBarrier.java
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            //實際調(diào)用doWait(),此處是不限時等待
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //先鎖住
        lock.lock();
        try {
            final Generation g = generation;
            //等待過程被中斷
            if (g.broken)
                throw new BrokenBarrierException();
            //中斷了線程
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //等待個數(shù)-1
            int index = --count;
            if (index == 0) {
                //都到齊了,無需等待了
                boolean ranAction = false;
                try {
                    //執(zhí)行既定的方法
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //開始下一輪
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //走到這,說明還需要等待
            for (;;) {
                try {
                    if (!timed)
                        //不限時等待
                        trip.await();
                    else if (nanos > 0L)
                        //限時等待,時間到了就返回
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //等待過程被中斷,則拋出異常
                    if (g == generation && ! g.broken) {
                        //不等了,喚醒其它線程
                        breakBarrier();
                        throw ie;
                    } else {
                        ...
                        Thread.currentThread().interrupt();
                    }
                }

                //醒來后發(fā)現(xiàn)已經(jīng)被中斷了,則直接拋出異常
                if (g.broken)
                    throw new BrokenBarrierException();

                //已經(jīng)開啟了下一輪,說明前面一輪都到齊了結(jié)束了
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    //超時了還是沒到齊,不等了,喚醒其它線程
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

線程先獲取獨占鎖,然后修改count值,若發(fā)現(xiàn)修改后count !=0,那么還需要等待,等待借助的是Condition.await(xx)方法。
有等待,自然有喚醒的地方:

#CyclicBarrier.java
    private void breakBarrier() {
        //置為true,表示已經(jīng)結(jié)束等待了
        generation.broken = true;
        //重置count,復(fù)用的關(guān)鍵
        count = parties;
        //喚醒其它在等待的線程
        trip.signalAll();
    }

用圖表示,等待/喚醒過程如下:


image.png

來看看CyclicBarrier 主要方法:


image.png

CyclicBarrier與CountDownLatch 區(qū)別

看到這,你也許已經(jīng)發(fā)現(xiàn)了CyclicBarrier 和CountDownLatch 實現(xiàn)的功能很相似,都是等待某個條件滿足后再進(jìn)行下一步的動作,兩者不同之處在于:

1、CountDownLatch 參與的線程分為兩類:一個是等待者,另一個是計數(shù)者;CyclicBarrier 參與的線程既是等待者,也是計數(shù)者。
2、CountDownLatch 完成一次完整的協(xié)作過程后不能再復(fù)用,CountDownLatch 可以復(fù)用(不用重新新建CountDownLatch 對象)。
3、CountDownLatch 的計數(shù)值與線程個數(shù)沒有必然聯(lián)系,CyclicBarrier 的初始計數(shù)值與線程個數(shù)一致。
4、CountDownLatch 基于AQS實現(xiàn),CyclicBarrier 基于ReentrantLock&Condition實現(xiàn)(內(nèi)部也是基于AQS)。

你可能還有疑惑,在下一篇應(yīng)用篇將會重點體現(xiàn)兩者區(qū)別。
下篇文章重點分析:Semaphore/CountDownLatch/CyclicBarrier 實際應(yīng)用。

本文基于jdk1.8。

您若喜歡,請點贊、關(guān)注,您的鼓勵是我前進(jìn)的動力

持續(xù)更新中,和我一起步步為營系統(tǒng)、深入學(xué)習(xí)Android/Java

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容