線程的并發(fā)工具類

Java 下多線程的開發(fā)我們可以自己?jiǎn)⒂枚嗑€程,線程池,除此之外,Java還為我們提供了Fork-Join、CountDownLatch、CyclicBarrier等并發(fā)工具類。掌握并使用它們,有助于我們?cè)谶M(jìn)行并發(fā)線程開發(fā)的過程中,更加得心應(yīng)手。

Fork-Join

Fork-Join是一個(gè)使用多線程的并發(fā)工具類,可以讓我們不去了解諸如Thread,Runnable 等相關(guān)的知識(shí),只要遵循forkjoin的開發(fā)模式,就可以寫出很好的多線程并發(fā)程序,

分而治之

forkjoin 在處理某一類問題時(shí)非常的有用,哪一類問題?分而治之的問題。分治法的設(shè)計(jì)思想是:將一個(gè)難以直接解決的大問題,分割成一些規(guī)模較小的相同問題,以便各個(gè)擊破,分而治之。

分治策略是:對(duì)于一個(gè)規(guī)模為n 的問題,若該問題可以容易地解決(比如說規(guī)模n 較小)則直接解決,否則將其分解為k 個(gè)規(guī)模較小的子問題,這些子問題互相獨(dú)立且與原問題形式相同(子問題相互之間有聯(lián)系就會(huì)變?yōu)閯?dòng)態(tài)規(guī)范算法),遞歸地解這些子問題,然后將各子問題的解合并得到原問題的解。這種算法設(shè)計(jì)策略叫做分治法。

歸并排序

歸并排序是建立在歸并操作上的一種有效的排序算法。該算法是采用分治法的一個(gè)非常典型的應(yīng)用。將已有序的子序列合并,得到完全有序的序列;即先使每個(gè)子序列有序,再使子序列段間有序。若將兩個(gè)有序表合并成一個(gè)有序表,稱為2-路歸并,與之對(duì)應(yīng)的還有多路歸并。

對(duì)于給定的一組數(shù)據(jù),利用遞歸與分治技術(shù)將數(shù)據(jù)序列劃分成為越來越小的半子表,在對(duì)半子表排序后,再用遞歸方法將排好序的半子表合并成為越來越大的有序序列。為了提升性能,有時(shí)我們?cè)诎胱颖淼膫€(gè)數(shù)小于某個(gè)數(shù)(比如15)的情況下,對(duì)半子表的排序采用其他排序算法,比如插入排序。

// 歸并排序,遞歸實(shí)現(xiàn)
public static void sortMergeRecursion(int[] nums) {
    sortMergeRecursionHelper(nums, 0, nums.length - 1);
}

public static void sortMergeRecursionHelper(int[] nums,int left, int right) {
    if(left == right) return;  // 當(dāng)待排序的序列長(zhǎng)度為1時(shí),遞歸開始回溯,進(jìn)行merge
    int middle = left + (right - left) / 2;
    sortMergeRecursionHelper(nums, left, middle);
    sortMergeRecursionHelper(nums, middle + 1, right);
    mergeArr(nums, left, middle, right);
}

public static void mergeArr(int[] nums, int left, int middle, int right) {
    int[] tem = new int[right - left + 1];
    int i = left, j = middle + 1, k = 0;
    while(i <= middle && j <= right) {
        tem[k++] = nums[i] < nums[j]? nums[i++] : nums[j++];
    }
    while(i <= middle) {
        tem[k++] = nums[i++];
    }
    while(j <= right) {
        tem[k++] = nums[j++];
    }
    // 將輔助數(shù)組數(shù)據(jù)寫入原數(shù)組
    int index = 0;
    while(left <= right) {
        nums[left++] = tem[index++];
    }
}

Fork-Join 原理

Fork/Join框架,就是在必要的情況下,將一個(gè)大任務(wù),進(jìn)行拆分(fork)成若干個(gè)小任務(wù)(拆到不可以再拆時(shí)),再將一個(gè)個(gè)小任務(wù)運(yùn)算的結(jié)果進(jìn)行Join匯總。除了分而治之之外,F(xiàn)ork-Join還有一種重要的思想,那就是工作密取。

工作密取
即當(dāng)前線程的Task已經(jīng)全被執(zhí)行完畢,則自動(dòng)取到其他線程的Task池中取出Task 繼續(xù)執(zhí)行。ForkJoinPool 中維護(hù)著多個(gè)線程(一般為CPU 核數(shù))在不斷地執(zhí)行Task,每個(gè)線程除了執(zhí)行自己職務(wù)內(nèi)的Task之外,還會(huì)根據(jù)自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task,如此一來就能能夠減少線程阻塞或是閑置的時(shí)間,提高CPU利用率。

Fork/Join 實(shí)戰(zhàn)

Fork/Join 使用的標(biāo)準(zhǔn)范式

我們要使用ForkJoin 框架,必須首先創(chuàng)建一個(gè)ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行fork 和join 的操作機(jī)制,通常我們不直接繼承ForkjoinTask 類,只需要直接繼承其子類。

  1. RecursiveAction,用于沒有返回結(jié)果的任務(wù)
  2. RecursiveTask,用于有返回值的任務(wù)

task 要通過ForkJoinPool 來執(zhí)行,使用submit()invoke() 提交,兩者的區(qū)別是:invoke() 是同步執(zhí)行,調(diào)用之后需要等待任務(wù)完成,才能執(zhí)行后面的代碼;submit()是異步執(zhí)行。join()get()方法當(dāng)任務(wù)完成的時(shí)候返回計(jì)算結(jié)果。

在我們自己實(shí)現(xiàn)的compute()方法里,首先需要判斷任務(wù)是否足夠小,如果足夠小就直接執(zhí)行任務(wù)。如果不足夠小,就必須分割成兩個(gè)子任務(wù),每個(gè)子任務(wù)在調(diào)用invokeAll()方法時(shí),又會(huì)進(jìn)入compute()方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù),如果不需要繼續(xù)分割,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。使用join方法會(huì)等待子任務(wù)執(zhí)行完并得到其結(jié)果。

同步用法(有返回值)

在這里我們使用Fork/Join的同步用法統(tǒng)計(jì)整型數(shù)組中所有元素的和。

MakeArray是一個(gè)用于生成隨機(jī)數(shù)組的工具類。
先來看單線程計(jì)算:

public class SumNormal {

    public static void main(String[] args) {
        int count = 0;
        int[] src = MakeArray.makeArray();

        long start = System.currentTimeMillis();
        for(int i= 0;i<src.length;i++){
            // SleepTools.ms(1);
            count = count + src[i];
        }
        System.out.println("The count is "+count
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");
    }
}

運(yùn)行打印結(jié)果:The count is 23995323 spend time:0ms。

我們?cè)賮砜词褂肍ork/Join來計(jì)算:

public class SumArray {
    private static class SumTask extends RecursiveTask<Integer> {
        //拆分的閾值
        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
        private int[] src;
        private int fromIndex;
        private int toIndex;

        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

        @Override
        protected Integer compute() {
            //任務(wù)的大小是否合適
            if (toIndex - fromIndex < THRESHOLD) {
                System.out.println(" from index = " + fromIndex + " toIndex=" + toIndex);
                int count = 0;
                for (int i = fromIndex; i <= toIndex; i++) {
                    // SleepTools.ms(1);
                    count = count + src[i];
                }
                return count;
            } else {
                //fromIndex....mid.....toIndex
                int mid = (fromIndex + toIndex) / 2;
                SumTask left = new SumTask(src, fromIndex, mid);
                SumTask right = new SumTask(src, mid + 1, toIndex);
                invokeAll(left, right);
                return left.join() + right.join();
            }
        }
    }


    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool();
        int[] src = MakeArray.makeArray();

        SumTask innerFind = new SumTask(src, 0, src.length - 1);

        long start = System.currentTimeMillis();

        pool.invoke(innerFind);
        //System.out.println("Task is Running.....");

        System.out.println("The count is " + innerFind.join()
                + " spend time:" + (System.currentTimeMillis() - start) + "ms");

    }
}

運(yùn)行打印結(jié)果:The count is 23704295 spend time:2ms。

從結(jié)果來看,我們使用Fork/Join,還不如使用單線程。為什么會(huì)這樣,F(xiàn)ork/Join的Task是繼承自RecursiveTask的,Recursive是遞歸的意思,既然是遞歸,那么就涉及到方法的出棧和壓棧。另一方面由于是多線程任務(wù),那么就會(huì)有線程的上下文切換,由于這是CPU計(jì)算密集型任務(wù),自然而然就會(huì)比單線程的慢。

既然這樣,我們?cè)诖a中加上一句SleepTools.ms(1);,再來看結(jié)果如何呢。

單線程:The count is 23973773 spend time:6912ms
Fork/Join:The count is 24007429 spend time:920ms

這個(gè)時(shí)間比單線程快了將近7倍吧,對(duì)于單線程來說,每次相加休眠1ms,那么4000個(gè)數(shù)的累加就休眠將近4000ms。而對(duì)于Fork/Join來說,由于分而治之的思想,線程休眠1ms影響微小。當(dāng)單線程任務(wù)耗時(shí)變長(zhǎng)的情況下,F(xiàn)ork/Join的性能優(yōu)勢(shì)就體現(xiàn)出來了。

因此我們?cè)谑褂枚嗑€程開發(fā)的時(shí)候,要考慮清楚線程任務(wù)的性質(zhì),再做選擇。

異步用法(無返回值)

在這里我們使用Fork/Join的異步用法遍歷指定目錄,尋找指定類型文件

/**
 * 類說明:遍歷指定目錄(含子目錄)找尋指定類型文件
 */
public class FindDirsFiles extends RecursiveAction {

    private File path;

    public FindDirsFiles(File path) {
        this.path = path;
    }

    @Override
    protected void compute() {
        List<FindDirsFiles> subTasks = new ArrayList<>();

        File[] files = path.listFiles();
        if (files != null) {
            for (File file : files) {
                if (file.isDirectory()) {
                    // 對(duì)每個(gè)子目錄都新建一個(gè)子任務(wù)。
                    subTasks.add(new FindDirsFiles(file));
                } else {
                    // 遇到文件,檢查。
                    if (file.getAbsolutePath().endsWith("txt")) {
                        System.out.println("文件:" + file.getAbsolutePath());
                    }
                }
            }
            if (!subTasks.isEmpty()) {
                // 在當(dāng)前的 ForkJoinPool 上調(diào)度所有的子任務(wù)。
                for (FindDirsFiles subTask : invokeAll(subTasks)) {
                    subTask.join();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            // 用一個(gè) ForkJoinPool 實(shí)例調(diào)度總?cè)蝿?wù)
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("F:/"));

            //異步提交
            pool.execute(task);

            System.out.println("Task is Running......");

            Thread.sleep(1);
            //主線程做自己的工作
            int otherWork = 0;
            for (int i = 0; i < 100; i++) {
                otherWork = otherWork + i;
            }
            System.out.println("Main Thread done sth......,otherWork=" + otherWork);
            //阻塞方法
            task.join();
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

CountDownLatch

CountDownLatch,也稱之為閉鎖,CountDownLatch 這個(gè)類能夠使一個(gè)線程等待其他線程完成各自的工作后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有的框架服務(wù)之后再執(zhí)行(初始化)。

CountDownLatch 是通過一個(gè)計(jì)數(shù)器來實(shí)現(xiàn)的,計(jì)數(shù)器的初始值為初始任務(wù)的數(shù)量。每當(dāng)完成了一個(gè)任務(wù)后,計(jì)數(shù)器的值就會(huì)減1(CountDownLatch.countDown()方法)。當(dāng)計(jì)數(shù)器值到達(dá)0 時(shí),它表示所有的已經(jīng)完成了任務(wù),然后在閉鎖上等待CountDownLatch.await()方法的線程就可以恢復(fù)執(zhí)行任務(wù)。

基本使用

構(gòu)造方法

  • CountDownLatch(int count) 構(gòu)造一個(gè)以給定計(jì)數(shù) CountDownLatch CountDownLatch。

方法:

  • await() 當(dāng)前線程等到鎖存器計(jì)數(shù)到零,除非線程是 interrupted 。
  • await(long timeout, TimeUnit unit) 使當(dāng)前線程等待直到鎖存器計(jì)數(shù)到零為止,除非線程為 interrupted或指定的等待時(shí)間過去。
  • countDown() 減少鎖存器的計(jì)數(shù),如果計(jì)數(shù)達(dá)到零,釋放所有等待的線程。
  • getCount() 返回當(dāng)前計(jì)數(shù)。

實(shí)例演示:

/**
 *類說明:演示CountDownLatch用法,
 * 共5個(gè)初始化子線程,6個(gè)閉鎖扣除點(diǎn),扣除完畢后,主線程和業(yè)務(wù)線程才能繼續(xù)執(zhí)行
 */
public class UseCountDownLatch {
    static CountDownLatch latch = new CountDownLatch(6);

    /**
    * 初始化線程
    */
    private static class InitThread implements Runnable{

        @Override
        public void run() {
            System.out.println("Thread_"+Thread.currentThread().getId()
                    +" ready init work......");
            latch.countDown();
            for(int i =0;i<2;i++) {
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ........continue do its work");
            }
        }
    }

    /**
    * 業(yè)務(wù)線程等待latch的計(jì)數(shù)器為0 完成
    */
    private static class BusiThread implements Runnable{

        @Override
        public void run() {
          try{
            latch.await();
            for(int i =0;i<3;i++) {
                System.out.println("BusiThread_"+Thread.currentThread().getId()
                        +" do business-----");
            }
          }catch (Exception e) {

          }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ready init work step 1st......");
                latch.countDown();
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ready init work step 2nd......");
                latch.countDown();
            }
        }).start();
        new Thread(new BusiThread()).start();
        for(int i=0;i<=3;i++){
            Thread thread = new Thread(new InitThread());
            thread.start();
        }
        latch.await();
        System.out.println("Main do ites work........");
    }
}

從代碼來看,等待的線程可以是多個(gè),且可以在主線程和異步線程中使用latch.await();,而且同一個(gè)線程里面,也可以多次調(diào)用latch.countDown();。在使用的時(shí)候,我們也要注意,如果閉鎖扣除點(diǎn)大于實(shí)際扣減數(shù),那么等待線程就會(huì)一直等下去,所以一定要計(jì)算好閉鎖初始化值和扣減次數(shù)。

CountDownLatch的不足
CountDownLatch是一次性的,計(jì)數(shù)器的值只能在構(gòu)造方法中初始化一次,之后沒有任何機(jī)制再次對(duì)其設(shè)置值,當(dāng)CountDownLatch使用完畢后,它不能再次被使用。

CyclicBarrier

CyclicBarrier,也稱為循環(huán)阻塞,允許一組線程全部等待彼此達(dá)到共同屏障點(diǎn)的同步輔助。循環(huán)阻塞在涉及固定大小的線程的程序中很有用,這些線程必須偶爾等待彼此。阻塞之所以被稱為循環(huán),因?yàn)樗梢栽诘却木€程被釋放之后重新使用。

基本使用

構(gòu)造方法

  • CyclicBarrier(int parties)
    創(chuàng)建一個(gè)新的 CyclicBarrier ,當(dāng)給定數(shù)量的線程(線程)等待它時(shí),它將跳閘,并且當(dāng)屏障跳閘時(shí)不執(zhí)行預(yù)定義的動(dòng)作。
  • CyclicBarrier(int parties, Runnable barrierAction)
    創(chuàng)建一個(gè)新的 CyclicBarrier ,當(dāng)給定數(shù)量的線程(線程)等待時(shí),它將跳閘,當(dāng)屏障跳閘時(shí)執(zhí)行給定的屏障動(dòng)作,由最后一個(gè)進(jìn)入屏障的線程執(zhí)行。

方法

  • int await() 等待所有 parties已經(jīng)在這個(gè)障礙上調(diào)用了 await 。
  • int await(long timeout, TimeUnit unit) 等待所有 parties已經(jīng)在此屏障上調(diào)用 await ,或指定的等待時(shí)間過去。
  • int getNumberWaiting() 返回目前正在等待障礙的各方的數(shù)量。
  • int getParties() 返回旅行這個(gè)障礙所需的parties數(shù)量。
  • boolean isBroken() 查詢這個(gè)障礙是否處于破碎狀態(tài)。
  • void reset() 將屏障重置為初始狀態(tài)。

實(shí)例演示

/**
 * 類說明:演示CyclicBarrier用法,共5個(gè)子線程,他們?nèi)客瓿晒ぷ骱螅怀鲎约航Y(jié)果,
 * 再被統(tǒng)一釋放去做自己的事情,而交出的結(jié)果被另外的線程拿來拼接字符串
 */
public class UseCyclicBarrier {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new CollectRunnable());
    private static ConcurrentHashMap<String, Long> resultMap
            = new ConcurrentHashMap<>();//存放子線程工作結(jié)果的容器

    public static void main(String[] args) {
        for (int i = 0; i <= 4; i++) {
            Thread thread = new Thread(new SubRunnable());
            thread.start();
        }
    }

    private static class CollectRunnable implements Runnable {

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                result.append("[" + workResult.getValue() + "]");
            }
            System.out.println(" the result = " + result);
            System.out.println("do other business........");
        }
    }

    private static class SubRunnable implements Runnable {

        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            try {
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " ....do something ");
                resultMap.put(Thread.currentThread().getId() + "", id);
                cyclicBarrier.await();
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " ....do its business ");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

我們定義了兩個(gè)Runnable,SubRunnableCollectRunnable,然后定義了CyclicBarrier對(duì)象,阻塞計(jì)數(shù)初始值為5,并設(shè)定屏障動(dòng)作為CollectRunnable。在SubRunnable中調(diào)用cyclicBarrier.await();使屏障點(diǎn)加1,在主線程中新啟5個(gè)線程,分別傳入SubRunnable任務(wù)。

運(yùn)行代碼,可以觀察到執(zhí)行SubRunnable的線程會(huì)在調(diào)用完await()后,進(jìn)入阻塞等待狀態(tài),直到所有線程都到達(dá)屏障點(diǎn),即到達(dá)屏障點(diǎn)的線程數(shù)等于5了,達(dá)到初始化設(shè)定的值,最后一個(gè)到達(dá)屏障點(diǎn)的線程就會(huì)執(zhí)行去執(zhí)行CollectRunnable中的邏輯,然后各個(gè)線程才會(huì)去執(zhí)行各自任務(wù)接下來的代碼。

Thread_9 ....do something
Thread_10 ....do something
Thread_11 ....do something
Thread_12 ....do something
Thread_13 ....do something
Thread_13 the result = [11][12][13][9][10]
Thread_13 do other business........
Thread_9 ....do its business
Thread_10 ....do its business
Thread_11 ....do its business
Thread_12 ....do its business
Thread_13 ....do its business

Process finished with exit code 0

CountDownLatch和CyclicBarrier的比較

  • CountDownLatch是線程組之間的等待,即一個(gè)(或多個(gè))線程等待N個(gè)線程完成某件事情之后再執(zhí)行;而CyclicBarrier則是線程組內(nèi)的等待,即每個(gè)線程相互等待,即N個(gè)線程都被攔截之后,然后依次執(zhí)行。
  • CountDownLatch是減計(jì)數(shù)方式,而CyclicBarrier是加計(jì)數(shù)方式。
  • CountDownLatch計(jì)數(shù)為0無法重置,而CyclicBarrier計(jì)數(shù)達(dá)到初始值,則可以重置。
  • CountDownLatch不可以復(fù)用,而CyclicBarrier可以復(fù)用。

Semaphore

基本使用

Semaphore,也稱之為信號(hào)量,信號(hào)量就相當(dāng)于一個(gè)計(jì)數(shù)器,通常用來限制線程的數(shù)量。它也是一個(gè)線程同步的輔助類,可以維護(hù)當(dāng)前訪問自身的線程個(gè)數(shù),并提供了同步機(jī)制。使用Semaphore可以控制同時(shí)訪問資源的線程個(gè)數(shù),例如,實(shí)現(xiàn)一個(gè)文件允許的并發(fā)訪問數(shù)。

構(gòu)造方法

  • public Semaphore(int permits)
  • public Semaphore(int permits, boolean fair) 可以提供了公平和非公平兩種策略

主要方法

  • void acquire() 從此信號(hào)量獲取一個(gè)許可,在提供一個(gè)許可前一直將線程阻塞,否則線程被中斷。
  • void release() 釋放一個(gè)許可,將其返回給信號(hào)量。
  • int availablePermits() 返回此信號(hào)量中當(dāng)前可用的許可數(shù)。
  • boolean hasQueuedThreads() 查詢是否有線程正在等待獲取。

實(shí)例演示

我們用一個(gè)連接池獲取連接的例子來演示Semaphore的使用:
DBPoolSemaphore.java

/**
 * 類說明:演示Semaphore用法,一個(gè)數(shù)據(jù)庫(kù)連接池的實(shí)現(xiàn)
 */
public class DBPoolSemaphore {
    private final static int POOL_SIZE = 10;
    private final Semaphore sp;

    //存放數(shù)據(jù)庫(kù)連接的容器
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    //初始化池
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }
    public DBPoolSemaphore() {
        sp = new Semaphore(10);
    }

    /*歸還連接*/
    public void returnConnect(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("當(dāng)前有" + sp.getQueueLength() + "個(gè)線程等待數(shù)據(jù)庫(kù)連接!!"
                    + "可用連接數(shù):" + sp.availablePermits());
            synchronized (pool) {
                pool.addLast(connection);
            }
            sp.release();
        }
    }

    /*從池子拿連接*/
    public Connection takeConnect() throws InterruptedException {
        sp.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        return connection;
    }

}

SemaphoreTest.java

/**
 * 類說明:測(cè)試數(shù)據(jù)庫(kù)連接池
 */
public class AppTest {

    private static DBPoolSemaphore dbPool = new DBPoolSemaphore();

    private static class BusinessThread extends Thread {
        @Override
        public void run() {
            Random r = new Random();//讓每個(gè)線程持有連接的時(shí)間不一樣
            long start = System.currentTimeMillis();
            try {
                Connection connect = dbPool.takeConnect();
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + "_獲取數(shù)據(jù)庫(kù)連接共耗時(shí)【" + (System.currentTimeMillis() - start) + "】ms.");
                SleepTools.ms(100 + r.nextInt(100));//模擬業(yè)務(wù)操作,線程持有連接查詢數(shù)據(jù)
                System.out.println("查詢數(shù)據(jù)完成,歸還連接!");
                dbPool.returnConnect(connect);
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusinessThread();
            thread.start();
        }
    }

在這個(gè)例子,在DBPoolSemaphore類中,我們初始化了一個(gè)有10個(gè)連接的連接池,并new了一個(gè)Semaphore實(shí)例,在主線程中,啟動(dòng)了50個(gè)線程去獲取數(shù)據(jù)庫(kù)連接,獲取的過程需要通過Semaphore.acquire()來獲取,如果獲取不到,則阻塞,直到分配到信號(hào)量許可為止。當(dāng)使用完連接后,則釋放連接,并通過Semaphore.release()歸還信號(hào)量許可。

Semaphore注意事項(xiàng)

在使用Semaphore的過程中,有一點(diǎn)非常需要注意,即使創(chuàng)建信號(hào)量的時(shí)候,指定了信號(hào)量的大小,但是不正當(dāng)使用release()操作釋放信號(hào)量會(huì)使得信號(hào)量超過配置的大小,也就有可能同時(shí)執(zhí)行的線程數(shù)量比最開始設(shè)置的要大。因?yàn)闆]有任何線程獲取信號(hào)量的時(shí)候,依然能夠釋放并且釋放的有效。例如:

private void testRelease(){
  Semaphore semaphore = new Semaphore(3);
  Runnable runnable = () -> {
      try {
          System.out.println(Thread.currentThread().getName() + "try acquire");
          semaphore.acquire();
          System.out.println(Thread.currentThread().getName() + "acquired semaphore");
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  };

  semaphore.release();
  // 4個(gè)線程都能夠獲取到信號(hào)量
  for (int i = 0; i < 4; i++) {
      new Thread(runnable).start();
  }
}

因此,推薦的做法是,保證在使用時(shí),一個(gè)線程先acquire(),然后release()。如果釋放線程和獲取線程不是同一個(gè),那么最好保證這種對(duì)應(yīng)關(guān)系。不要釋放過多的許可證。

Exchange

Exchanger類可用于兩個(gè)線程之間交換信息??珊?jiǎn)單地將Exchanger對(duì)象理解為一個(gè)包含兩個(gè)格子的容器,通過exchanger方法可以向兩個(gè)格子中填充信息。當(dāng)兩個(gè)格子中的均被填充時(shí),該對(duì)象會(huì)自動(dòng)將兩個(gè)格子的信息交換,然后返回給線程,從而實(shí)現(xiàn)兩個(gè)線程的信息交換。

簡(jiǎn)單說就是一個(gè)線程在完成一定的事務(wù)后想與另一個(gè)線程交換數(shù)據(jù),則第一個(gè)先拿出數(shù)據(jù)的線程會(huì)一直等待第二個(gè)線程,直到第二個(gè)線程拿著數(shù)據(jù)到來時(shí)才能彼此交換對(duì)應(yīng)數(shù)據(jù)。其定義為 Exchanger<V> 泛型類型,其中 V 表示可交換的數(shù)據(jù)類型,

基本使用

構(gòu)造方法

  • Exchanger():無參構(gòu)造方法。

方法

  • V exchange(V v):等待另一個(gè)線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷),然后將給定的對(duì)象傳送給該線程,并接收該線程的對(duì)象。
  • V exchange(V v, long timeout, TimeUnit unit):等待另一個(gè)線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷或超出了指定的等待時(shí)間),然后將給定的對(duì)象傳送給該線程,并接收該線程的對(duì)象。

實(shí)例演示:

/**
 * 類說明:演示CyclicExchange用法
 */
public class UseExchange {
    private static final Exchanger<String> exchange = new Exchanger<String>();

    public static void main(String[] args) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                long id = Thread.currentThread().getId();
                String data = "A";
                try {
                    System.out.println("Thread [" + id + "] 交換前 : " + data);
                    data = exchange.exchange(data);
                    System.out.println("Thread [" + id + "] 交換后 : " + data);

                } catch (InterruptedException e) {
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                long id = Thread.currentThread().getId();
                String data = "B";
                try {
                    System.out.println("Thread [" + id + "] 交換前 : " + data);
                    Thread.sleep(2000);
                    data = exchange.exchange(data);
                    System.out.println("Thread [" + id + "] 交換后 : " + data);

                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
}

運(yùn)行結(jié)果如下:

Thread [9] 交換前 : A
Thread [10] 交換前 : B
Thread [10] 交換后 : A
Thread [9] 交換后 : B

可以看出,當(dāng)一個(gè)線程到達(dá) exchange 調(diào)用點(diǎn)時(shí),如果其他線程此前已經(jīng)調(diào)用了此方法,則其他線程會(huì)被調(diào)度喚醒并與之進(jìn)行對(duì)象交換,然后各自返回;如果其他線程還沒到達(dá)交換點(diǎn),則當(dāng)前線程會(huì)被掛起,直至其他線程到達(dá)才會(huì)完成交換并正常返回,或者當(dāng)前線程被中斷或超時(shí)返回。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容