java并發(fā)包消息隊列


BlockingQueue

  • BlockingQueue也是java.util.concurrent下的主要用來控制線程同步的工具。

  • 主要的方法是:put、take一對阻塞存?。籥dd、poll一對非阻塞存取。

    • 插入:
      • add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則拋出異常,不好
        • offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.
        • put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續(xù), 有阻塞, 放不進去就等待
    • 讀取:
      - poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數(shù)規(guī)定的時間,取不到時返回null; 取不到返回null
      - take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到Blocking有新的對象被加入為止; 阻塞, 取不到就一直等
    • 其他
      - int remainingCapacity();返回隊列剩余的容量,在隊列插入和獲取的時候,數(shù)據(jù)可能不準(zhǔn), 不能保證數(shù)據(jù)的準(zhǔn)確性
      - boolean remove(Object o); 從隊列移除元素,如果存在,即移除一個或者更多,隊列改 變了返回true
      - public boolean contains(Object o); 查看隊列是否存在這個元素,存在返回true
      - int drainTo(Collection<? super E> c); //移除此隊列中所有可用的元素,并將它們添加到給定 collection 中。取出放到集合中
      - int drainTo(Collection<? super E> c, int maxElements); 和上面方法的區(qū)別在于,指定了移 動的數(shù)量; 取出指定個數(shù)放到集合
  • BlockingQueue有四個具體的實現(xiàn)類,常用的兩種實現(xiàn)類為:

    • ArrayBlockingQueue:

      • 一個由數(shù)組支持的有界阻塞隊列,規(guī)定大小的BlockingQueue,其構(gòu)造函數(shù)必須帶一個int參數(shù)來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。
    • LinkedBlockingQueue:

      • 大小不定的BlockingQueue,若其構(gòu)造函數(shù)帶一個規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。
      • LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,默認(rèn)最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
    • LinkedBlockingQueue和ArrayBlockingQueue區(qū)別:

      • LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背后所用的數(shù)據(jù)結(jié)構(gòu)不一樣,導(dǎo)致LinkedBlockingQueue的數(shù)據(jù)吞吐量要大于ArrayBlockingQueue,但在線程數(shù)量很大時其性能的可預(yù)見性低于ArrayBlockingQueue.

生產(chǎn)者代碼

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class TestBlockingQueueProducer implements Runnable {
    BlockingQueue<String> queue;
    Random random = new Random();

    public TestBlockingQueueProducer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(random.nextInt(10));
                String task = Thread.currentThread().getName() + " made a product " + i;
                System.out.println(task);
                queue.put(task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消費者代碼

mport java.util.Random;
import java.util.concurrent.BlockingQueue;

public class TestBlockingQueueConsumer implements Runnable{  
    BlockingQueue<String> queue; 
    Random random = new Random();
    
    public TestBlockingQueueConsumer(BlockingQueue<String> queue){  
        this.queue = queue;  
    }        
    @Override  
    public void run() {  
        try {  
            Thread.sleep(random.nextInt(10));
            System.out.println(Thread.currentThread().getName()+ "trying...");
            String temp = queue.take();//如果隊列為空,會阻塞當(dāng)前線程  
            System.out.println(Thread.currentThread().getName() + " get a job " +temp);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

測試代碼

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TestBlockingQueue {
    
    public static void main(String[] args) {
        

        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
        // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        // 不設(shè)置的話,LinkedBlockingQueue默認(rèn)大小為Integer.MAX_VALUE
        // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);
        TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);
        for (int i = 0; i < 3; i++) {
            new Thread(producer, "Producer" + (i + 1)).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(consumer, "Consumer" + (i + 1)).start();
        }
        
        new Thread(producer, "Producer" + (5)).start();
    }
}

?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容