CyclicBarrier源碼分析

1. CyclicBarrier 定義

CyclicBarrier 內(nèi)部是通過 ReeantrantLock, Condition 以及計數(shù)器count, 來控制線程的執(zhí)行; 當所有線程都到達統(tǒng)一的地方再執(zhí)行接下來的代碼.

特點:

1. CyclicBarrier 區(qū)別于 CountDownLatch 是可以重用

2. 用于CyclicBarrier的線程其中有一個被中斷或等待超時, 都會造成, barrier broken, 并且重置初始的值 generation

先看一個簡單的 demo


import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Created by wzx on 2018/1/21.
 */
public class TestCyclicBarrier {

    private static final Logger logger = Logger.getLogger(TestCyclicBarrier.class);

    private static final int THREAD_NUM = 5;

    public static void main(String[] args) {
        CyclicBarrier cb = new CyclicBarrier(THREAD_NUM, new Runnable() {
            public void run() {
                logger.info("Inside Barrier");
            }
        });

        List<Thread> threads = new ArrayList<>();
        for(int i = 0; i < THREAD_NUM; i++){
            Thread thread = new Thread(new WorkerThread(cb));
            threads.add(thread);
            thread.start();
        }

        // wait until done
        for(Thread thread : threads){
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.info("All Thread done()");
    }

    public static class WorkerThread implements Runnable{

        CyclicBarrier barrier;

        public WorkerThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        public void run() {
            try {
                logger.info("Working's waiting");
                // 線程在這里等待, 直到所有線程都到達barrier
                barrier.await();
                logger.info("Thread ID:" + Thread.currentThread().getId() + " Working");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }
}

執(zhí)行結果:

[2017-02-15 14:12:39,506] INFO  Thread-0 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO  Thread-3 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO  Thread-1 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO  Thread-2 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,506] INFO  Thread-4 (TestCyclicBarrier.java:57) - Working's waiting
[2017-02-15 14:12:39,509] INFO  Thread-4 (TestCyclicBarrier.java:23) - Inside Barrier
[2017-02-15 14:12:39,510] INFO  Thread-4 (TestCyclicBarrier.java:60) - Thread ID:15 Working
[2017-02-15 14:12:39,510] INFO  Thread-0 (TestCyclicBarrier.java:60) - Thread ID:11 Working
[2017-02-15 14:12:39,510] INFO  Thread-3 (TestCyclicBarrier.java:60) - Thread ID:14 Working
[2017-02-15 14:12:39,511] INFO  Thread-2 (TestCyclicBarrier.java:60) - Thread ID:13 Working
[2017-02-15 14:12:39,510] INFO  Thread-1 (TestCyclicBarrier.java:60) - Thread ID:12 Working
[2017-02-15 14:12:39,512] INFO  main (TestCyclicBarrier.java:42) - All Thread done()

執(zhí)行步驟:

(1) 一共有5個線程要求它們都到達 barrier.await() 才能繼續(xù)向下執(zhí)行

(2) 前4個線程調(diào)用 barrier.await() 時其實時在內(nèi)部統(tǒng)一調(diào)用 Reeantrant.lock()獲取 lock, 然后再調(diào)用 Condition.await() 將lock釋放, 等待喚醒

(3) 第五個線程到達 barrier.await() 處, 先調(diào)用 Reeantrant.lock() 然后發(fā)現(xiàn)自己是最后一個線程, 則直接調(diào)用 Condition.signalAll() 喚醒其他線程, 最后自己釋放 lock

(4) 其他4個線程被 signal 了都爭相獲取 lock, 最后又釋放

2. CyclicBarrier 構造函數(shù)

下面兩個構造函數(shù)的主要區(qū)別在于 command, 用于當所有線程都到達 barrier 時執(zhí)行的

/**
 * 指定 barrierCommand 的構造 CyclicBarrier
 */
public CyclicBarrier(int parties, Runnable barrierCommand) {
    if(parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierCommand;
}

/**
 * 構造 CyclicBarrier
 */
public CyclicBarrier(int parties){
    this(parties, null);
}

3. CyclicBarrier 主要屬性
private static class Generation{
    boolean broken = false;
}

/** The lock for guarding barrier entry */
/** 全局的重入 lock */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
/** 控制線程等待  */
private final Condition trip = lock.newCondition();
/** The number of parties */
/** 參與到這次 barrier 的參與者個數(shù) */
private final int parties;
/** The command to run when tripped */
/** 到達 barrier 時執(zhí)行的command */
private final Runnable barrierCommand;
/** The current generation */
/** 初始化 generation */
private Generation generation = new Generation();

4. CyclicBarrier 生成 generation 方法

這是在 一個 barrier 完成后, 重新初始化值

/**
 * Updates state on barrier trip and wakes up everyone.
 * Called only while holding lock.
 */
/** 生成下一個 generation */
private void nextGeneration(){
    // signal completion of last generation
    // 喚醒所有等待的線程來獲取 AQS 的state的值
    trip.signalAll();
    // set up next generation
    // 重新賦值計算器
    count = parties;
    // 重新初始化 generation
    generation = new Generation();
}

5. CyclicBarrier breakBarrier 方法

breakBarrier 主要用于等待的線程當被中斷, 或等待超時執(zhí)行

/**
 * Sets current barrier generation as broken and wakes up everyone
 * Called only while holding lock
 */
/** 當某個線程被中斷 / 等待超時 則將 broken = true, 并且喚醒所有等待中的線程 */
private void breakBarrier(){
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

6. CyclicBarrier 主方法 awaitXX

await 方法主要用于等待獲取, 具體看下面的 comment

/**
 * 進行等待所有線程到達 barrier
 * 除非: 其中一個線程被 inetrrupt
 */
public int await() throws InterruptedException, BrokenBarrierException{
    try{
        return dowait(false, 0L);
    }catch (TimeoutException toe){
        throw new Error(toe); // cannot happen
    }
}

/**
 * 進行等待所有線程到達 barrier
 * 除非: 等待超時
 */
public int await(long timeout, TimeUnit unit) throws Exception{
    return dowait(true, unit.toNanos(timeout));
}

/**
 * Main barrier code, covering the various policies
 */
/**
 * CyclicBarrier 的核心方法, 主要是所有線程都獲取一個 ReeantrantLock 來控制
 */
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException{
    final ReentrantLock lock = this.lock;
    lock.lock();                            // 1\. 獲取 ReentrantLock
    try{
        final Generation g = generation;

        if(g.broken){                       // 2\. 判斷 generation 是否已經(jīng) broken
            throw new BrokenBarrierException();
        }

        if(Thread.interrupted()){           // 3\. 判斷線程是否中斷, 中斷后就 breakBarrier
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;                // 4\. 更新已經(jīng)到達 barrier 的線程數(shù)
        if(index == 0){ // triped           // 5\. index == 0 說明所有線程到達了 barrier
            boolean ranAction = false;
            try{
                final Runnable command = barrierCommand;
                if(command != null){        // 6\. 最后一個線程到達 barrier, 執(zhí)行 command
                    command.run();
                }
                ranAction = true;
                nextGeneration();           // 7\. 更新 generation
                return 0;
            }finally {
                if(!ranAction){
                    breakBarrier();
                }
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for(;;){
            try{
                if(!timed){
                    trip.await();           // 8\. 沒有進行 timeout 的 await
                }else if(nanos > 0L){
                    nanos = trip.awaitNanos(nanos); // 9\. 進行 timeout 方式的等待
                }
            }catch (InterruptedException e){
                if(g == generation && !g.broken){ // 10\. 等待的過程中線程被中斷, 則直接喚醒所有等待的 線程, 重置 broken 的值
                    breakBarrier();
                    throw e;
                }else{
                    /**
                     * We're about to finish waiting even if we had not
                     * been interrupted, so this interrupt is deemed to
                     * "belong" to subsequent execution
                     */
                    /**
                     * 情況
                     *  1\. await 拋 InterruptedException && g != generation
                     *      所有線程都到達 barrier(這是會更新 generation), 并且進行喚醒所有的線程; 但這時 當前線程被中斷了
                     *      沒關系, 當前線程還是能獲取 lock, 但是為了讓外面的程序知道自己被中斷過, 所以自己中斷一下
                     *  2\. await 拋 InterruptedException && g == generation && g.broken = true
                     *      其他線程觸發(fā)了 barrier broken, 導致 g.broken = true, 并且進行 signalALL(), 但就在這時
                     *      當前的線程也被 中斷, 但是為了讓外面的程序知道自己被中斷過, 所以自己中斷一下
                     *
                     */
                    Thread.currentThread().interrupt();
                }
            }

            if(g.broken){                       // 11\. barrier broken 直接拋異常
                throw new BrokenBarrierException();
            }

            if(g != generation){                 // 12\. 所有線程到達 barrier 直接返回
                return index;
            }

            if(timed && nanos <= 0L){           // 13\. 等待超時直接拋異常, 重置 generation
                breakBarrier();
                throw new TimeoutException();
            }
        }
    }finally {
        lock.unlock();                          // 14\. 調(diào)用 awaitXX 獲取lock后進行釋放lock
    }
}

7. CyclicBarrier 一般方法
/**
 * 判斷 barrier 是否 broken = true
 */
public boolean isBroken(){
    final ReentrantLock lock = this.lock;
    lock.lock();
    try{
        return generation.broken;
    }finally {
        lock.unlock();
    }
}

// 重置 barrier
public void reset(){
    final ReentrantLock lock = this.lock;
    lock.lock();
    try{
        breakBarrier();  // break the current generation
        nextGeneration(); // start a new generation
    }finally {
        lock.unlock();
    }
}

/**
 * 獲取等待中的線程
 */
public int getNumberWaiting(){
    final ReentrantLock lock = this.lock;
    lock.lock();
    try{
        return parties - count;
    }finally {
        lock.unlock();
    }
}

8. 總結

CyclicBarrier 主要用 ReeantrantLock 與 Condition 來控制線程資源的獲取, 在理解 CyclicBarrier時, 首先需要理解 ReentrantLock, Condition.

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容