多線程(15) — 阻塞隊(duì)列

阻塞隊(duì)列(BlockingQueue)接口繼承了Queue接口,其有兩個(gè)實(shí)現(xiàn)阻塞的方法:1. 移除阻塞:當(dāng)隊(duì)列為空時(shí),獲取隊(duì)列元素的線程即隊(duì)列的彈出操作會(huì)被阻塞,直到有元素被插入才被喚醒。2. 插入阻塞:當(dāng)隊(duì)列已滿時(shí),對(duì)這個(gè)隊(duì)列的插入操作就會(huì)被阻塞,直到有元素被彈出后才會(huì)被喚醒。

阻塞隊(duì)列除了在線程池工作隊(duì)列里作為實(shí)現(xiàn)的接口外,還常用于實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者模型生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里取元素的線程,而阻塞隊(duì)列則用來(lái)存放“消費(fèi)商品”的容器。

阻塞隊(duì)列運(yùn)用例子:

package ThreadPractice;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {
    public static void main(String[] args) {
        final BlockingQueue queue = new ArrayBlockingQueue(3);  // 隊(duì)列的容量,超過(guò)時(shí)則阻塞
        for(int i=0;i<2;i++){
            new Thread(){
                public void run(){
                    while(true){
                        try {
                            Thread.sleep((long)(Math.random()*1000));
                            System.out.println(Thread.currentThread().getName() + "準(zhǔn)備放數(shù)據(jù)");                         
                            queue.put(1);       // 向隊(duì)列加元素
                            System.out.println(Thread.currentThread().getName() + "已經(jīng)放了數(shù)據(jù)," +
                                        "隊(duì)列目前有" + queue.size() + "個(gè)數(shù)據(jù)");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                    }
                }
                
            }.start();
        }
        
        new Thread(){
            public void run(){
                while(true){
                    try {
                        // sleep時(shí)間不同,會(huì)取得快取得慢,線程阻塞情況不一樣
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + "準(zhǔn)備取數(shù)據(jù)");
                        queue.take();   // 取走數(shù)據(jù)
                        System.out.println(Thread.currentThread().getName() + "已經(jīng)取走數(shù)據(jù)," +
                                "隊(duì)列目前有" + queue.size() + "個(gè)數(shù)據(jù)");                    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            
        }.start();          
    }
}
======console=======
Thread-1準(zhǔn)備放數(shù)據(jù)
Thread-1已經(jīng)放了數(shù)據(jù),隊(duì)列目前有1個(gè)數(shù)據(jù)
Thread-0準(zhǔn)備放數(shù)據(jù)
Thread-0已經(jīng)放了數(shù)據(jù),隊(duì)列目前有2個(gè)數(shù)據(jù)
Thread-0準(zhǔn)備放數(shù)據(jù)
Thread-0已經(jīng)放了數(shù)據(jù),隊(duì)列目前有3個(gè)數(shù)據(jù)
Thread-1準(zhǔn)備放數(shù)據(jù)
Thread-2準(zhǔn)備取數(shù)據(jù)
Thread-2已經(jīng)取走數(shù)據(jù),隊(duì)列目前有2個(gè)數(shù)據(jù)
Thread-1已經(jīng)放了數(shù)據(jù),隊(duì)列目前有3個(gè)數(shù)據(jù)
Thread-1準(zhǔn)備放數(shù)據(jù)

問(wèn):怎么通過(guò)阻塞隊(duì)列實(shí)現(xiàn)之前的子主兩條線程運(yùn)行的程序?
答:完成代碼如下:

package ThreadPractice;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingQueueCommunication {

    public static void main(String[] args) {
        final Business business = new Business();
        new Thread(
                new Runnable() {
                    
                    @Override
                    public void run() {
                    for(int i=1;i<=50;i++){
                        business.sub(i);
                    }
                    }
                }
        ).start();
        for(int i=1;i<=50;i++){
            business.main(i);
        }
    }

    // 加了static后相當(dāng)于內(nèi)部的外部類 因?yàn)閟tatic會(huì)在類加載時(shí)就先加載
    static class Business {
        BlockingQueue blockingQueue1 = new ArrayBlockingQueue(1);
        BlockingQueue blockingQueue2 = new ArrayBlockingQueue(1);
        //(a)成員變量創(chuàng)建類的實(shí)例對(duì)象后才分配空間才會(huì)有值,所以通過(guò)構(gòu)造方法對(duì)blockingQueue2賦值
        //這里不能用static代碼塊,static會(huì)先加載就找不到成員變量的構(gòu)造,故用這種 匿名構(gòu)造方法
        {   // 匿名構(gòu)造方法在所有構(gòu)造方法之前執(zhí)行
            try {
                System.out.println("代碼開(kāi)始執(zhí)行");
                blockingQueue2.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void sub(int i) {
            try {
                blockingQueue1.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("子線程循環(huán):" + j + "  循環(huán)了:" + i);
            }
            try {
                blockingQueue2.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void main(int i) {
            try {
                blockingQueue2.put(1);  // 由于(a)需要阻塞所以要先放值故需要在構(gòu)造器添加默認(rèn)值
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int j = 1; j <= 100; j++) {
                System.out.println("主線程循環(huán):" + j + " 循環(huán)了:" + i);
            }
            try {
                blockingQueue1.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

注意:如果加了synchronized就會(huì)出現(xiàn)死鎖,當(dāng)線程2進(jìn)入時(shí)往blockingQueue2添加數(shù)據(jù)時(shí)由于此時(shí)隊(duì)列已經(jīng)加了數(shù)據(jù)滿了,所以線程2會(huì)阻塞,而synchronized會(huì)讓線程加鎖不受其他線程干擾,所以將一直處于阻塞狀態(tài)。


image.png

死鎖


image.png

所以阻塞隊(duì)列主要利用的就是:1.隊(duì)列為空取的時(shí)候阻塞 2.隊(duì)列滿時(shí)添加的時(shí)候會(huì)阻塞 3.隊(duì)列先進(jìn)先出

可以將數(shù)據(jù)存儲(chǔ)在阻塞隊(duì)列里 — 比如:生產(chǎn)者消費(fèi)者模式

生產(chǎn)者與消費(fèi)者模式就是一個(gè)經(jīng)典的例子:
生產(chǎn)者消費(fèi)者模式,將產(chǎn)生數(shù)據(jù)的線程叫做生產(chǎn)者,而將使用數(shù)據(jù)的線程叫做消費(fèi)者。當(dāng)出現(xiàn)生產(chǎn)者處理速度很快,而消費(fèi)者處理速度慢的時(shí)候,那么生產(chǎn)者線程就要等待消費(fèi)者全部處理完,同理如果消費(fèi)者比生產(chǎn)者快就會(huì)產(chǎn)生等待消費(fèi)者,也就是說(shuō)的生產(chǎn)消費(fèi)效率不同,因此有了生產(chǎn)者消費(fèi)者模式。(這有點(diǎn)類似于工業(yè)工程上面的生產(chǎn)線緩存也就是我們所說(shuō)的產(chǎn)線“超市”)

解決問(wèn)題的思路就是解耦,在后面馬上提到的volatile與內(nèi)存空間中的關(guān)于線程的讀寫(xiě)的高速緩存區(qū)類似的working memory工作區(qū),即通過(guò)第三方緩存來(lái)實(shí)現(xiàn)對(duì)效率不對(duì)等問(wèn)題的解耦操作,工廠模式也有此類的思路。線程池也有。所以通過(guò)阻塞隊(duì)列實(shí)現(xiàn)線程間的通信,其本質(zhì)就相當(dāng)于一個(gè)生產(chǎn)者消費(fèi)者的容器,當(dāng)生產(chǎn)者生產(chǎn)完直接扔給阻塞隊(duì)列,而不用等待消費(fèi)者進(jìn)行消費(fèi),消費(fèi)者也不用管生產(chǎn)者生產(chǎn)而是直接從阻塞隊(duì)列里面取,當(dāng)阻塞隊(duì)列滿了再開(kāi)始生產(chǎn)者線程阻塞,當(dāng)阻塞隊(duì)列取完了那么消費(fèi)者線程進(jìn)入阻塞直到隊(duì)列又有新的資源進(jìn)入非空。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容