Java中Queue、BlockingQueue和隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式

1. Queue接口 - 隊(duì)列

public interface Queue<E> 
    extends Collection<E>
  • Collection的子接口,表示隊(duì)列FIFO(First In First Out)
    常用方法:
    (1)拋出異常
    boolean add(E e) // 順序添加1個(gè)元素(到達(dá)上限后,再添加則會(huì)拋出異常)
    E remove() // 獲得第1個(gè)元素并移除(如果隊(duì)列沒有元素時(shí),則拋異常)
    E element() // 獲得第1個(gè)元素但不移除(如果隊(duì)列沒有元素時(shí),則拋異常)
    (2)返回特殊值【推薦】
    boolean offer(E e) // 順序添加1個(gè)元素(到達(dá)上限后,再添加則會(huì)返回false)
    E poll() // 獲得第1個(gè)元素并移除(如果隊(duì)列沒有元素時(shí),則返回null)
    E keep() // 獲得第1個(gè)元素但不移除(如果隊(duì)列沒有元素時(shí),則返回null)

1.1 ConcurrentLinkedQueue類(線程安全)

public class ConcurrentLinkedQueue<E> 
    extends AbstractQueue<E> 
    implements Queue<E>, Serializable

說明:

  • 線程安全、可高效讀寫的隊(duì)列,高并發(fā)下性能最好的隊(duì)列;
  • 無鎖、CAS比較交換算法,修改的方法包含3個(gè)核心參數(shù)(V,E,N);
  • V:要更新的變量、E:預(yù)期值、N:新值
  • 只有當(dāng)V==E時(shí),V=N;否則表示已被更新過,則取消當(dāng)前操作。

使用示例:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TestQueue {
      public static void main(String[] args) {
            // 列表:尾部追加 - add...
            // 鏈表:頭尾添加 - addFirst/addLast
            // 隊(duì)列:先進(jìn)先出(FIFO) - offer...
            // >>> 以上三種的對(duì)應(yīng)成員方法,切記不能混用!會(huì)打亂已知規(guī)則。
            LinkedList<String> link = new LinkedList<String>();
            //Queue<String> link = new LinkedList<String>(); // 強(qiáng)制LinkedList遵循隊(duì)列的規(guī)則
            link.offer("A"); // offer用的是FIFO隊(duì)列方式
            link.offer("B");
            link.offer("C");
            // 用列表的方式打亂了FIFO隊(duì)列的規(guī)則
            link.add(0, "D");
            System.out.println(link.peek()); // D
            
            // 線程安全的隊(duì)列Queue
            // 嚴(yán)格遵循隊(duì)列規(guī)則,線程安全,采用CAS交換算法
            Queue<String> q = new ConcurrentLinkedQueue<String>();
            // 1.拋出異常的 2.返回結(jié)果的
            q.offer("A");
            q.offer("B");
            q.offer("C");
            
            q.poll(); // 刪除表頭,表頭更新為B
            
            System.out.println(q.peek()); // 獲取表頭,此時(shí)為B
      }
}

2. BlockingQueue接口 - 阻塞隊(duì)列

public interface BlockingQueue<E> 
    extends Queue<E>

常用方法:
void put(E e) // 將指定元素插入此隊(duì)列中,如果沒有可用空間,則死等
E take() // 獲取并移除此隊(duì)列頭部元素,如果沒有可用元素,則死等
說明:

  • Queue的子接口,阻塞的隊(duì)列,增加了兩個(gè)線程狀態(tài)為無限期等待的方法
  • 可用于解決生產(chǎn)者、消費(fèi)者問題

2.1 ArrayBlockingQueue類(有界阻塞隊(duì)列)

  • 數(shù)組結(jié)構(gòu)實(shí)現(xiàn),有界隊(duì)列。手工固定上限
BlockingQueue<String> abq = new ArrayBlockingQueue<String>(3);

2.2 LinkedBlockingQueue類(無界阻塞隊(duì)列)

  • 鏈表結(jié)構(gòu)實(shí)現(xiàn),無界隊(duì)列。默認(rèn)上限Integer.MAX_VALUE
BlockingQueue<String> lbq = new LinkedBlockingQueue<String>();

3. 源碼:BlockingQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式

BlockingQueue是JDK5.0的新增內(nèi)容,它是一個(gè)已經(jīng)在內(nèi)部實(shí)現(xiàn)了同步的隊(duì)列,實(shí)現(xiàn)方式采用的是await()/signal()方法。它可以在生成對(duì)象時(shí)指定容量大小,用于阻塞操作的是put()和take()方法。

  • put()方法:類似于我們上面的生產(chǎn)者線程,容量達(dá)到最大時(shí),自動(dòng)阻塞。
  • take()方法:類似于我們上面的消費(fèi)者線程,容量為0時(shí),自動(dòng)阻塞。
import java.util.concurrent.LinkedBlockingQueue;
public class TestProduceAndCustomer2 {
    public static void main(String[] args) {
        StorageQ s = new StorageQ();
        Thread p1 = new Thread(new ProducerQ(s), "A廠");
        Thread p2 = new Thread(new ProducerQ(s), "B廠");
        Thread p3 = new Thread(new ProducerQ(s), "C廠");

        Thread c1 = new Thread(new CustomerQ(s), "a人");
        Thread c2 = new Thread(new CustomerQ(s), "b人");
        Thread c3 = new Thread(new CustomerQ(s), "c人");
        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

// 倉庫 - 共享資源對(duì)象
class StorageQ {
    // 倉庫存儲(chǔ)的載體 - 使用無界阻塞隊(duì)列,也可指定容量大小。
    private LinkedBlockingQueue<Object> lbq = new LinkedBlockingQueue<>(10);
    public StorageQ() {
        super();
    }
    public StorageQ(LinkedBlockingQueue<Object> lbq) {
        super();
        this.lbq = lbq;
    }
    public LinkedBlockingQueue<Object> getLbq() {
        return lbq;
    }
    public void setLbq(LinkedBlockingQueue<Object> lbq) {
        this.lbq = lbq;
    }

    // 生產(chǎn)
    public void produce() {
        try{
            lbq.put(new Object());
            System.out.println("【生產(chǎn)者" + Thread.currentThread().getName()
                    + "】生產(chǎn)一個(gè)產(chǎn)品,現(xiàn)庫存" + lbq.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    
    // 消費(fèi)
    public void custome() {
        try{
            lbq.take();
            System.out.println("【消費(fèi)者" + Thread.currentThread().getName()
                    + "】消費(fèi)了一個(gè)產(chǎn)品,現(xiàn)庫存" + lbq.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

// 生產(chǎn)者
class ProducerQ implements Runnable {
    private StorageQ s;
    public ProducerQ() {}
    public ProducerQ(StorageQ s) {
        this.s = s;
    }
    public void run() {
        while (true) {
            try {
                Thread.sleep((int) (Math.random() * 2000));
                this.s.produce();  // 沒滿 + 可鎖 = 生產(chǎn)+1
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消費(fèi)者
class CustomerQ implements Runnable {
    private StorageQ s;
    public CustomerQ() {}
    public CustomerQ(StorageQ s) {
        this.s = s;
    }
    public void run() {
        while (true) {
            try {
                Thread.sleep((int) (Math.random() * 2000));
                this.s.custome(); // 不空 + 可鎖 = 消費(fèi)-1
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

運(yùn)行結(jié)果:


Java隊(duì)列實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容