文章同步更新在個人公眾號“梓莘”,歡迎大家關注,相互交流。
阻塞隊列
ArrayBlockingQueue 是一個基于數組的有界阻塞隊列,此隊列基按FIFO原則對元素進行排序
LinkedBlockQueue:一個基于鏈表結構的阻塞隊列,次隊列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue
SynchromousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockQueue
阻塞隊列:首先它是一個隊列,而一個阻塞隊列在數據結構中所其的作用大致如圖

當阻塞隊列是空時,從隊列中獲取元素的操作將會被阻塞
當阻塞隊列是滿時,往隊列中添加元素的操作將會被阻塞
試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其他的線程王空的隊列中插入新的元素。
試圖往滿的阻塞隊列中添加新元素的線程通用也會被阻塞,直到其他的線程從隊列中移除一個或者多個元素或者完全清空隊列后使隊列重新變得空閑起來并后續(xù)新增。
為什么用 好處是什么?
在多線程領域中:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒。
為什么需要BlockingQueue?
好處是我們不需要關系什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都實現了。
在concurrent包發(fā)布之前,在多線程環(huán)境下,我們每個線程都必須自己去控制這些細節(jié),尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不少的復雜度。
BlockingQueue種類
- ArrayBlockingQueue:由數組結果組成的有界阻塞隊列
- LinkedBlockingQueue:由鏈表結構組成的有界阻塞隊列,默認大小為Integer.MAX_VALUE
- PriorityBlockingQueue :支持優(yōu)先級排序的無界阻塞隊列
- DelayQueue: 使用優(yōu)先級隊列實現的延遲無界阻塞隊列
- SynchronousQueue:不存儲元素的阻塞隊列,也就是單個元素的隊列
- LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列
- LinkedBlockingQueue:由鏈表結構組成的雙向阻塞隊列
阻塞隊列的核心方法

- 拋出異常:當阻塞隊列滿時,再往隊列里add插入元素會拋出java.lang.IllegalStateException: Queue full
當阻塞隊列空時,再從隊列里remove移除元素會拋NoSuchElementException
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.element());
//java.lang.IllegalStateException: Queue full
//System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//java.util.NoSuchElementException
//System.out.println(blockingQueue.remove());
}
- 特殊值:插入方法,成功true失敗false
移除方法,成功返回出隊列的元素,隊列里面沒有就返回null
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//false
System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//null
System.out.println(blockingQueue.poll());
}
- 一直阻塞:當阻塞隊列滿時,生產者線程繼續(xù)往隊列中put元素,隊列會一直阻塞生產線程直到put數據或者響應中斷請求
當隊列空時,消費者線程視圖從隊列里take元素,隊列會一直阻塞消費者線程直到隊列可用
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//blockingQueue.put("d");
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
//blockingQueue.take();
}
- 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一定時間,超過限時后生產者線程會退出
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a",2L, TimeUnit.SECONDS);
blockingQueue.offer("b",2L, TimeUnit.SECONDS);
blockingQueue.offer("c",2L, TimeUnit.SECONDS);
blockingQueue.offer("d",2L, TimeUnit.SECONDS);
}
SynchronousQueue
SynchronousQueue與其他BlockingQueue不同,SynchronousQueue是一個不存儲元素的BlockingQueue,每一個put操作必須要等待一個take操作,否則不能繼續(xù)添加元素,反之亦然。
package com.zixin;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
/**
* @ClassName SynchronousQueueDemo
* @Description
* @Author zishen
* @Date 2019/12/31 9:10
* @Version 1.0
* AA put 1
* BB take 1
* AA put 2
* BB take 2
* AA put 3
* BB take 3
**/
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"AA").start();
new Thread(()->{
try {
Thread.sleep(5);
System.out.println(Thread.currentThread().getName()+" take 1");
blockingQueue.take();
Thread.sleep(5);
System.out.println(Thread.currentThread().getName()+" take 2");
blockingQueue.take();
Thread.sleep(5);
System.out.println(Thread.currentThread().getName()+" take 3");
blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"BB").start();
}
}
線程消費之生產者消費者
package com.zixin;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 資源類
*/
class ShareData{
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment()throws Exception{
lock.lock();
try{
//1、判斷
while(number !=0){
//等待 不能生產
condition.await();
}
//2、干活
number++;
System.out.println(Thread.currentThread().getName()+" "+number);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void decrement()throws Exception{
lock.lock();
try{
//1、判斷 要使用while 如果用if可能會產生虛假喚起
while(number ==0){
//等待 不能生產
condition.await();
}
//2、干活
number--;
System.out.println(Thread.currentThread().getName()+" "+number);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
/**
* @ClassName ProduceConsumerTraditionDemo
* @Description 初始值為0的變量。兩個線程對其交替操作,一個加1一個減1 來5輪
* 1、線程 操作 資源類
* 2、判斷 干活 通知
* 3、防止虛假喚醒操作
* @Author zixin
* @Date 2019/12/31 10:35
* @Version 1.0
**/
public class ProduceConsumerTraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(()->{
for (int i=0;i<5;i++){
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for (int i=0;i<5;i++){
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
},"BB").start();
}
}
synchronized和lock的區(qū)別
原始構成:
synchronized是關鍵字屬于JVM層面
monitorenter(底層是通過monitor對象來完成,其實wait/notify等方法也依賴于monitor,對象只有在同步塊或方法中才能調wait/notify方法)
monitorexit
lock是具體類(java.util.concurrent.locks.Lock)是api層面的鎖使用方法:
snchronized不需要用戶去手動釋放鎖,當synchronized代碼執(zhí)行完成后系統會自動讓線程釋放對鎖的占用
ReentrantLock則需要用戶去手動釋放鎖若么有主動釋放鎖,就有可能導致出現死鎖現象
需要lock()和unlock()方法配合try/finally語句塊來完成等待是否可中斷:
synchronized不可中斷,除非拋出異?;蛘_\行完成。
ReentrantLock可中斷:
1、設置超時方法tryLock(long timeout,TimeUnit unit)
2、LockInterruptibly()放代碼塊中,調用interrupt()方法可中斷加鎖是否公平:
synchronized非公平鎖
ReentrantLock兩者都可以,默認公平鎖,構造方法可以傳入boolean值true為公平鎖,false為非公平鎖鎖綁定多個條件Condition
synchronized沒有
ReentrantLock用來實現介紹喚醒需要喚醒的線程組,可以精確喚醒,而不是像synchronized要么喚醒一個線程要么喚醒全部線程