阻塞隊(duì)列(BlockingQueue)接口繼承了Queue接口,其有兩個(gè)實(shí)現(xiàn)阻塞的方法:1. 移除阻塞:當(dāng)隊(duì)列為空時(shí),獲取隊(duì)列元素的線程即隊(duì)列的彈出操作會(huì)被阻塞,直到有元素被插入才被喚醒。2. 插入阻塞:當(dāng)隊(duì)列已滿時(shí),對(duì)這個(gè)隊(duì)列的插入操作就會(huì)被阻塞,直到有元素被彈出后才會(huì)被喚醒。
阻塞隊(duì)列除了在線程池工作隊(duì)列里作為實(shí)現(xiàn)的接口外,還常用于實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者模型生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里取元素的線程,而阻塞隊(duì)列則用來(lái)存放“消費(fèi)商品”的容器。
阻塞隊(duì)列運(yùn)用例子:
package ThreadPractice;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) {
final BlockingQueue queue = new ArrayBlockingQueue(3); // 隊(duì)列的容量,超過(guò)時(shí)則阻塞
for(int i=0;i<2;i++){
new Thread(){
public void run(){
while(true){
try {
Thread.sleep((long)(Math.random()*1000));
System.out.println(Thread.currentThread().getName() + "準(zhǔn)備放數(shù)據(jù)");
queue.put(1); // 向隊(duì)列加元素
System.out.println(Thread.currentThread().getName() + "已經(jīng)放了數(shù)據(jù)," +
"隊(duì)列目前有" + queue.size() + "個(gè)數(shù)據(jù)");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
new Thread(){
public void run(){
while(true){
try {
// sleep時(shí)間不同,會(huì)取得快取得慢,線程阻塞情況不一樣
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "準(zhǔn)備取數(shù)據(jù)");
queue.take(); // 取走數(shù)據(jù)
System.out.println(Thread.currentThread().getName() + "已經(jīng)取走數(shù)據(jù)," +
"隊(duì)列目前有" + queue.size() + "個(gè)數(shù)據(jù)");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
======console=======
Thread-1準(zhǔn)備放數(shù)據(jù)
Thread-1已經(jīng)放了數(shù)據(jù),隊(duì)列目前有1個(gè)數(shù)據(jù)
Thread-0準(zhǔn)備放數(shù)據(jù)
Thread-0已經(jīng)放了數(shù)據(jù),隊(duì)列目前有2個(gè)數(shù)據(jù)
Thread-0準(zhǔn)備放數(shù)據(jù)
Thread-0已經(jīng)放了數(shù)據(jù),隊(duì)列目前有3個(gè)數(shù)據(jù)
Thread-1準(zhǔn)備放數(shù)據(jù)
Thread-2準(zhǔn)備取數(shù)據(jù)
Thread-2已經(jīng)取走數(shù)據(jù),隊(duì)列目前有2個(gè)數(shù)據(jù)
Thread-1已經(jīng)放了數(shù)據(jù),隊(duì)列目前有3個(gè)數(shù)據(jù)
Thread-1準(zhǔn)備放數(shù)據(jù)
問(wèn):怎么通過(guò)阻塞隊(duì)列實(shí)現(xiàn)之前的子主兩條線程運(yùn)行的程序?
答:完成代碼如下:
package ThreadPractice;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockingQueueCommunication {
public static void main(String[] args) {
final Business business = new Business();
new Thread(
new Runnable() {
@Override
public void run() {
for(int i=1;i<=50;i++){
business.sub(i);
}
}
}
).start();
for(int i=1;i<=50;i++){
business.main(i);
}
}
// 加了static后相當(dāng)于內(nèi)部的外部類 因?yàn)閟tatic會(huì)在類加載時(shí)就先加載
static class Business {
BlockingQueue blockingQueue1 = new ArrayBlockingQueue(1);
BlockingQueue blockingQueue2 = new ArrayBlockingQueue(1);
//(a)成員變量創(chuàng)建類的實(shí)例對(duì)象后才分配空間才會(huì)有值,所以通過(guò)構(gòu)造方法對(duì)blockingQueue2賦值
//這里不能用static代碼塊,static會(huì)先加載就找不到成員變量的構(gòu)造,故用這種 匿名構(gòu)造方法
{ // 匿名構(gòu)造方法在所有構(gòu)造方法之前執(zhí)行
try {
System.out.println("代碼開(kāi)始執(zhí)行");
blockingQueue2.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void sub(int i) {
try {
blockingQueue1.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 1; j <= 10; j++) {
System.out.println("子線程循環(huán):" + j + " 循環(huán)了:" + i);
}
try {
blockingQueue2.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void main(int i) {
try {
blockingQueue2.put(1); // 由于(a)需要阻塞所以要先放值故需要在構(gòu)造器添加默認(rèn)值
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 1; j <= 100; j++) {
System.out.println("主線程循環(huán):" + j + " 循環(huán)了:" + i);
}
try {
blockingQueue1.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
注意:如果加了synchronized就會(huì)出現(xiàn)死鎖,當(dāng)線程2進(jìn)入時(shí)往blockingQueue2添加數(shù)據(jù)時(shí)由于此時(shí)隊(duì)列已經(jīng)加了數(shù)據(jù)滿了,所以線程2會(huì)阻塞,而synchronized會(huì)讓線程加鎖不受其他線程干擾,所以將一直處于阻塞狀態(tài)。

死鎖

所以阻塞隊(duì)列主要利用的就是:1.隊(duì)列為空取的時(shí)候阻塞 2.隊(duì)列滿時(shí)添加的時(shí)候會(huì)阻塞 3.隊(duì)列先進(jìn)先出
可以將數(shù)據(jù)存儲(chǔ)在阻塞隊(duì)列里 — 比如:生產(chǎn)者消費(fèi)者模式
生產(chǎn)者與消費(fèi)者模式就是一個(gè)經(jīng)典的例子:
生產(chǎn)者消費(fèi)者模式,將產(chǎn)生數(shù)據(jù)的線程叫做生產(chǎn)者,而將使用數(shù)據(jù)的線程叫做消費(fèi)者。當(dāng)出現(xiàn)生產(chǎn)者處理速度很快,而消費(fèi)者處理速度慢的時(shí)候,那么生產(chǎn)者線程就要等待消費(fèi)者全部處理完,同理如果消費(fèi)者比生產(chǎn)者快就會(huì)產(chǎn)生等待消費(fèi)者,也就是說(shuō)的生產(chǎn)消費(fèi)效率不同,因此有了生產(chǎn)者消費(fèi)者模式。(這有點(diǎn)類似于工業(yè)工程上面的生產(chǎn)線緩存也就是我們所說(shuō)的產(chǎn)線“超市”)
解決問(wèn)題的思路就是解耦,在后面馬上提到的volatile與內(nèi)存空間中的關(guān)于線程的讀寫(xiě)的高速緩存區(qū)類似的working memory工作區(qū),即通過(guò)第三方緩存來(lái)實(shí)現(xiàn)對(duì)效率不對(duì)等問(wèn)題的解耦操作,工廠模式也有此類的思路。線程池也有。所以通過(guò)阻塞隊(duì)列實(shí)現(xiàn)線程間的通信,其本質(zhì)就相當(dāng)于一個(gè)生產(chǎn)者消費(fèi)者的容器,當(dāng)生產(chǎn)者生產(chǎn)完直接扔給阻塞隊(duì)列,而不用等待消費(fèi)者進(jìn)行消費(fèi),消費(fèi)者也不用管生產(chǎn)者生產(chǎn)而是直接從阻塞隊(duì)列里面取,當(dāng)阻塞隊(duì)列滿了再開(kāi)始生產(chǎn)者線程阻塞,當(dāng)阻塞隊(duì)列取完了那么消費(fèi)者線程進(jìn)入阻塞直到隊(duì)列又有新的資源進(jìn)入非空。