SynchronousQueue使用實例

本文主要講一下SynchronousQueue。

定義

SynchronousQueue,實際上它不是一個真正的隊列,因為它不會為隊列中元素維護(hù)存儲空間。與其他隊列不同的是,它維護(hù)一組線程,這些線程在等待著把元素加入或移出隊列。

如果以洗盤子的比喻為例,那么這就相當(dāng)于沒有盤架,而是將洗好的盤子直接放入下一個空閑的烘干機(jī)中。這種實現(xiàn)隊列的方式看似很奇怪,但由于可以直接交付工作,從而降低了將數(shù)據(jù)從生產(chǎn)者移動到消費(fèi)者的延遲。(在傳統(tǒng)的隊列中,在一個工作單元可以交付之前,必須通過串行方式首先完成入列[Enqueue]或者出列[Dequeue]等操作。)

直接交付方式還會將更多關(guān)于任務(wù)狀態(tài)的信息反饋給生產(chǎn)者。當(dāng)交付被接受時,它就知道消費(fèi)者已經(jīng)得到了任務(wù),而不是簡單地把任務(wù)放入一個隊列——這種區(qū)別就好比將文件直接交給同事,還是將文件放到她的郵箱中并希望她能盡快拿到文件。

因為SynchronousQueue沒有存儲功能,因此put和take會一直阻塞,直到有另一個線程已經(jīng)準(zhǔn)備好參與到交付過程中。僅當(dāng)有足夠多的消費(fèi)者,并且總是有一個消費(fèi)者準(zhǔn)備好獲取交付的工作時,才適合使用同步隊列。

實例

public class SynchronousQueueExample {

    static class SynchronousQueueProducer implements Runnable {

        protected BlockingQueue<String> blockingQueue;
        final Random random = new Random();

        public SynchronousQueueProducer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println("Put: " + data);
                    blockingQueue.put(data);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    static class SynchronousQueueConsumer implements Runnable {

        protected BlockingQueue<String> blockingQueue;

        public SynchronousQueueConsumer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()
                            + " take(): " + data);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) {
        final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();

        SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
                synchronousQueue);
        new Thread(queueProducer).start();

        SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer1).start();

        SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer2).start();

    }
}

插入數(shù)據(jù)的線程和獲取數(shù)據(jù)的線程,交替執(zhí)行

應(yīng)用場景

Executors.newCachedThreadPool()

 /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

由于ThreadPoolExecutor內(nèi)部實現(xiàn)任務(wù)提交的時候調(diào)用的是工作隊列(BlockingQueue接口的實現(xiàn)類)的非阻塞式入隊列方法(offer方法),因此,在使用SynchronousQueue作為工作隊列的前提下,客戶端代碼向線程池提交任務(wù)時,而線程池中又沒有空閑的線程能夠從SynchronousQueue隊列實例中取一個任務(wù),那么相應(yīng)的offer方法調(diào)用就會失?。慈蝿?wù)沒有被存入工作隊列)。此時,ThreadPoolExecutor會新建一個新的工作者線程用于對這個入隊列失敗的任務(wù)進(jìn)行處理(假設(shè)此時線程池的大小還未達(dá)到其最大線程池大小)。

所以,使用SynchronousQueue作為工作隊列,工作隊列本身并不限制待執(zhí)行的任務(wù)的數(shù)量。但此時需要限定線程池的最大大小為一個合理的有限值,而不是Integer.MAX_VALUE,否則可能導(dǎo)致線程池中的工作者線程的數(shù)量一直增加到系統(tǒng)資源所無法承受為止。

如果應(yīng)用程序確實需要比較大的工作隊列容量,而又想避免無界工作隊列可能導(dǎo)致的問題,不妨考慮SynchronousQueue。SynchronousQueue實現(xiàn)上并不使用緩存空間。

使用SynchronousQueue的目的就是保證“對于提交的任務(wù),如果有空閑線程,則使用空閑線程來處理;否則新建一個線程來處理任務(wù)”。

doc

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容