序
本文主要講一下SynchronousQueue。
定義
SynchronousQueue,實際上它不是一個真正的隊列,因為它不會為隊列中元素維護(hù)存儲空間。與其他隊列不同的是,它維護(hù)一組線程,這些線程在等待著把元素加入或移出隊列。
如果以洗盤子的比喻為例,那么這就相當(dāng)于沒有盤架,而是將洗好的盤子直接放入下一個空閑的烘干機(jī)中。這種實現(xiàn)隊列的方式看似很奇怪,但由于可以直接交付工作,從而降低了將數(shù)據(jù)從生產(chǎn)者移動到消費(fèi)者的延遲。(在傳統(tǒng)的隊列中,在一個工作單元可以交付之前,必須通過串行方式首先完成入列[Enqueue]或者出列[Dequeue]等操作。)
直接交付方式還會將更多關(guān)于任務(wù)狀態(tài)的信息反饋給生產(chǎn)者。當(dāng)交付被接受時,它就知道消費(fèi)者已經(jīng)得到了任務(wù),而不是簡單地把任務(wù)放入一個隊列——這種區(qū)別就好比將文件直接交給同事,還是將文件放到她的郵箱中并希望她能盡快拿到文件。
因為SynchronousQueue沒有存儲功能,因此put和take會一直阻塞,直到有另一個線程已經(jīng)準(zhǔn)備好參與到交付過程中。僅當(dāng)有足夠多的消費(fèi)者,并且總是有一個消費(fèi)者準(zhǔn)備好獲取交付的工作時,才適合使用同步隊列。
實例
public class SynchronousQueueExample {
static class SynchronousQueueProducer implements Runnable {
protected BlockingQueue<String> blockingQueue;
final Random random = new Random();
public SynchronousQueueProducer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = UUID.randomUUID().toString();
System.out.println("Put: " + data);
blockingQueue.put(data);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class SynchronousQueueConsumer implements Runnable {
protected BlockingQueue<String> blockingQueue;
public SynchronousQueueConsumer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = blockingQueue.take();
System.out.println(Thread.currentThread().getName()
+ " take(): " + data);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
synchronousQueue);
new Thread(queueProducer).start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
synchronousQueue);
new Thread(queueConsumer1).start();
SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
synchronousQueue);
new Thread(queueConsumer2).start();
}
}
插入數(shù)據(jù)的線程和獲取數(shù)據(jù)的線程,交替執(zhí)行
應(yīng)用場景
Executors.newCachedThreadPool()
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
由于ThreadPoolExecutor內(nèi)部實現(xiàn)任務(wù)提交的時候調(diào)用的是工作隊列(BlockingQueue接口的實現(xiàn)類)的非阻塞式入隊列方法(offer方法),因此,在使用SynchronousQueue作為工作隊列的前提下,客戶端代碼向線程池提交任務(wù)時,而線程池中又沒有空閑的線程能夠從SynchronousQueue隊列實例中取一個任務(wù),那么相應(yīng)的offer方法調(diào)用就會失?。慈蝿?wù)沒有被存入工作隊列)。此時,ThreadPoolExecutor會新建一個新的工作者線程用于對這個入隊列失敗的任務(wù)進(jìn)行處理(假設(shè)此時線程池的大小還未達(dá)到其最大線程池大小)。
所以,使用SynchronousQueue作為工作隊列,工作隊列本身并不限制待執(zhí)行的任務(wù)的數(shù)量。但此時需要限定線程池的最大大小為一個合理的有限值,而不是Integer.MAX_VALUE,否則可能導(dǎo)致線程池中的工作者線程的數(shù)量一直增加到系統(tǒng)資源所無法承受為止。
如果應(yīng)用程序確實需要比較大的工作隊列容量,而又想避免無界工作隊列可能導(dǎo)致的問題,不妨考慮SynchronousQueue。SynchronousQueue實現(xiàn)上并不使用緩存空間。
使用SynchronousQueue的目的就是保證“對于提交的任務(wù),如果有空閑線程,則使用空閑線程來處理;否則新建一個線程來處理任務(wù)”。