7.并發(fā)工具類

1.概述

同步工具類可以是任何一個類,只要它根據(jù)其自身的狀態(tài)來協(xié)調(diào)線程的控制流。阻塞隊列可以作為同步工具類,其他類型的同步工具還包括信號量(Semaphore),柵欄(Latch),閉鎖(Latch).

2.閉鎖

閉鎖可以延遲線程的進度直到到達終止狀態(tài),閉鎖的作用相當于一扇門:在閉鎖到達結束狀態(tài)之前,這扇門一直是關閉的,并且沒有任何線程能通過,當?shù)竭_結束狀態(tài)時,這扇門會打開并允許所有的線程通過。
CountDownLatch
構造器:
CountDownLatch的構造函數(shù)接受一個int類型的參數(shù)作為計數(shù)器,如果你想要等待N個點完成,這里就傳入N。

    public CountDownLatch(int var1) {
        if (var1 < 0) {
            throw new IllegalArgumentException("count < 0");
        } else {
            this.sync = new CountDownLatch.Sync(var1);
        }
    }
方法 說明
countDown() 每次調(diào)用該方法N就會減一
await() 在計數(shù)到達零之前,該方法會阻塞當前線程。之后會釋放所有等待的線程,所有的后續(xù)調(diào)用都將立即返回。
await(long time,TimeUnit unit) 這個方法等待特定時間后,就不會阻塞當前線程。

CountDownLatch計數(shù)無法被重置。如果需要重置計數(shù),請考慮使用CyclicBarria。 由于countDown()方法可以用在任何地方,所以這里說的N個點可以是N個線程,也可以是1個線程里的N個執(zhí)行步驟。

簡單入門:

public class CountDownLatchTest {
    private static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            System.out.println(1);
            c.countDown();
            System.out.println(2);
            c.countDown();
        }).start();
        c.await();
        System.out.println(3);
    }
}

輸出

1
2
3

進階用法:
下面給出了兩個類,其中一組 worker 線程使用了兩個倒計數(shù)鎖存器:

  • 第一個類是一個啟動信號,在 driver 為繼續(xù)執(zhí)行 worker 做好準備之前,它會阻止所有的 worker 繼續(xù)執(zhí)行。
  • 第二個類是一個完成信號,它允許 driver 在完成所有 worker 之前一直等待。
public class Worker implements Runnable {
    private CountDownLatch startSignal;
    private CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            work();
            doneSignal.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void work() {
    }
}
public class Driver {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);//開始信號
        CountDownLatch doneSignal = new CountDownLatch(5);//結束信號
        for (int i = 0; i < 5; i++) {
            new Thread(
                    new Worker(startSignal, doneSignal)
            ).start();
        }
        doSomethingElse("before startSignal");
        startSignal.countDown();//開始所有工作
        doSomethingElse("before doneSignal");
        doneSignal.await();//等待所有的工作結束
        doSomethingElse("done!");
    }

    public static void doSomethingElse(String str) {
        System.out.println(str);
    }
}

輸出:
before startSignal
before doneSignal
done!

3.信號量

Semaphore

一個計數(shù)信號量。從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后再獲取該許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數(shù),并采取相應的行動。

計算信號量的一種簡化形式是二值信號量,即初始值為1的Semaphore,二值信號量可以用做互斥體,并具備不可重入的加鎖語義。

Semaphore 通常用于限制可以訪問某些資源(物理或邏輯的)的線程數(shù)目。例如:

public class SemaphoreTest {
    private static final int THREAD_COUNT = 30;
    private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            executorService.execute(() -> {
                try {
                    s.acquire();
                    System.out.println("save data thread=" + Thread.currentThread().getName());
                    s.release();
                } catch (InterruptedException e) {
                }
            });
        }
    }
}

代碼中,雖然有30個線程在執(zhí)行,但是只允許10個并發(fā)執(zhí)行。Semaphore的構造方法 Semaphore(int permits)接受一個整型的數(shù)字,表示可用的許可證數(shù)量。Semaphore(10)表示允 許10個線程獲取許可證,也就是最大并發(fā)數(shù)是10。Semaphore的用法也很簡單,首先線程使用 Semaphore的acquire()方法獲取一個許可證,使用完之后調(diào)用release()方法歸還許可證。還可以 用tryAcquire()方法嘗試獲取許可證。

4.柵欄(屏障)

CyclicBarrier
字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫做同步點)時阻塞,直到最后一個線程到達屏障時,屏障才會打開,所有被屏障攔截的線程才會繼續(xù)執(zhí)行。同時,Barrier在釋放等待線程之后可以重用。

CyclicBarrier支持一個可選的Runnable命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作 很有用。

柵欄與閉鎖的關鍵區(qū)別:

  • 閉鎖是一次性對象,柵欄可以重用,也可以reset()重置。
  • 所有線程必須都到達柵欄位置,才能繼續(xù)執(zhí)行。閉鎖用于等待事件,而柵欄用于等待其他線程。

示例用法:
用戶多線程計算數(shù)據(jù),最后合并計算結果的場景。

一個Excel保存了用戶所有的銀行流水,每個Sheet保存了一個賬戶近一年的每筆銀行流水,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水,都執(zhí)行完了之后,得到每個sheet的日均銀行流水,最后,再用barrierAction用這些線程的計算結果,計算整個Excel的日均銀行流水。

import java.util.Map;
import java.util.concurrent.*;

/**
 * Created by dachao on 16-7-28.
 */
public class BankWaterService implements Runnable {
    /*
    創(chuàng)建4個屏障,處理完之后執(zhí)行當前類的run方法
     */
    private CyclicBarrier c = new CyclicBarrier(4, this);
    /*
    假設只有4個sheet,所以只啟動4個線程
     */
    private Executor executor = Executors.newFixedThreadPool(4);
    /*
    保存每個sheet計算出的銀流結果
     */
    private ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<>();

    private void count() {
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> {
                //計算當前sheet的銀行數(shù)據(jù),代碼省略
                count.put(Thread.currentThread().getName(), 1);
                try {
                    c.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });

        }
    }

    @Override
    public void run() {
        int result = 0;
        //匯總每個sheet計算出的結果
        for (Map.Entry<String, Integer> sheet : count.entrySet()) {
            result += sheet.getValue();
        }
        //將結果輸出
        count.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        BankWaterService service = new BankWaterService();
        service.count();
    }
}

輸出結果:
4

5.線程間交換數(shù)據(jù)的Exchanger

Exchanger(交換者)
是一個用于線程間協(xié)作的工具類。Exchanger用于進行線程間的數(shù)據(jù)交 換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過 exchange方法交換數(shù)據(jù),如果第一個線程先執(zhí)行exchange()方法,它會一直等待第二個線程也 執(zhí)行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數(shù)據(jù),將本線程生產(chǎn) 出來的數(shù)據(jù)傳遞給對方。
Exchanger可以用于遺傳算法,遺傳算法里需要選出兩個人作為交配對象,這時候會交換 兩人的數(shù)據(jù),并使用交叉規(guī)則得出2個交配結果。Exchanger也可以用于校對工作,比如我們需 要將紙制銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行 錄入,錄入到Excel之后,系統(tǒng)需要加載這兩個Excel,并對兩個Excel數(shù)據(jù)進行校對,看看是否 錄入一致。

import java.util.Map;
import java.util.concurrent.*;

/**
 * Created by dachao on 16-7-28.
 */
public class BankWaterService implements Runnable {
    /*
    創(chuàng)建4個屏障,處理完之后執(zhí)行當前類的run方法
     */
    private CyclicBarrier c = new CyclicBarrier(4, this);
    /*
    假設只有4個sheet,所以只啟動4個線程
     */
    private ExecutorService executor = Executors.newFixedThreadPool(4);
    /*
    保存每個sheet計算出的銀流結果
     */
    private ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<>();

    private void count() {
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> {
                //計算當前sheet的銀行數(shù)據(jù),代碼省略
                count.put(Thread.currentThread().getName(), 1);
                try {
                    c.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });

        }
        executor.shutdown();
    }

    @Override
    public void run() {
        int result = 0;
        //匯總每個sheet計算出的結果
        for (Map.Entry<String, Integer> sheet : count.entrySet()) {
            result += sheet.getValue();
        }
        //將結果輸出
        count.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        BankWaterService service = new BankWaterService();
        service.count();

    }
}

輸出:
4

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容