前言:
咱們扒光系列的第一篇文章已經(jīng)對(duì)concurrent包中的三層結(jié)構(gòu)的第一層和第二層做了解析,并且對(duì)第三層中的lock做了詳細(xì)的代碼分析,本篇博文將針對(duì)BlockingQueue這個(gè)后續(xù)會(huì)在執(zhí)行器里使用的基本數(shù)據(jù)結(jié)構(gòu)做源碼分析,為后續(xù)的Executor源碼分析做準(zhǔn)備。
我們先來(lái)看Doug Lea的定義,BlockingQueue是一個(gè)隊(duì)列,該隊(duì)列支持兩種特殊操作即隊(duì)列為空的時(shí)候獲取元素的線(xiàn)程需要等待,隊(duì)列為滿(mǎn)的時(shí)候加入元素的線(xiàn)程需要等待。這是典型的生產(chǎn)者消費(fèi)者模式的應(yīng)用。
頂層接口設(shè)定了3種方法:

由于put()/take()方法是在并發(fā)中會(huì)發(fā)生阻塞,因此我們著重研究這兩種方法。
BlockingQueue知名家族成員:
ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue,我們以這三個(gè)類(lèi)為基礎(chǔ)對(duì)put()、take()方法進(jìn)行源碼解析。
1:ArrayBlockingQueue:
我們先看該類(lèi)的幾個(gè)重要成員變量
? ? final Object[] items; //基于數(shù)組
? ? int takeIndex;? ? ? ? //下一個(gè)獲取元素的位置
? ? int putIndex;? ? ? ? //下一個(gè)放置元素的位置
? ? int count;? ? ? ? ? ? //隊(duì)列中的元素個(gè)數(shù)
? ? final ReentrantLock lock;? ? //鎖,用于put和take的時(shí)候線(xiàn)程獨(dú)占
? ? private final Condition notEmpty;? ? //Condition等待隊(duì)列,在該隊(duì)列排隊(duì)的線(xiàn)程都在等待隊(duì)列中增加元素
? ? private final Condition notFull;? ? ? ? //Condition等待隊(duì)列,在該隊(duì)列中排隊(duì)的線(xiàn)程都在等待隊(duì)列元素消耗
生產(chǎn)者消費(fèi)者模式主要體現(xiàn)在notEmpty和notFull兩個(gè)隊(duì)列,前者是消費(fèi)者隊(duì)列,后者是生產(chǎn)者隊(duì)列,就像餐廳的服務(wù)員(消費(fèi)者)和廚師(生產(chǎn)者)在窗口,前者等待窗口里有菜,后者等待窗口有放菜的位置。
生產(chǎn)者方法put:
public void put(E e) throws InterruptedException {
? ? ? ? checkNotNull(e);
? ? ? ? final ReentrantLock lock = this.lock;? ? ? ? ? //獲取鎖,生產(chǎn)者消費(fèi)者用的同一把鎖,消費(fèi)者消費(fèi)的時(shí)候你不能生產(chǎn)
? ? ? ? lock.lockInterruptibly();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//關(guān)鎖,當(dāng)前生產(chǎn)者獨(dú)占
? ? ? ? try {
? ? ? ? ? ? while (count == items.length)? ? ? ? ? ? ? ? //當(dāng)隊(duì)列已滿(mǎn)的時(shí)候讓當(dāng)前生產(chǎn)者在生產(chǎn)者隊(duì)列等待槽位
? ? ? ? ? ? ? ? notFull.await();
? ? ? ? ? ? enqueue(e);? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//當(dāng)隊(duì)列未滿(mǎn)的時(shí)候往隊(duì)列里添加元素,此時(shí)是線(xiàn)程獨(dú)占哦
? ? ? ? } finally {
? ? ? ? ? ? lock.unlock();
? ? ? ? }
? ? }
private void enqueue(E x) {
? ? ? ? final Object[] items = this.items;? ? ? ??
? ? ? ? items[putIndex] = x;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //在可以放置元素的數(shù)組槽位放置x元素
? ? ? ? if (++putIndex == items.length)? ? ? ? ? ? ? ?//對(duì)放置元素位置加一操作
? ? ? ? ? ? putIndex = 0;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//如果沒(méi)地方放置元素了就把放置元素位置設(shè)置成0
? ? ? ? count++;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //元素個(gè)數(shù)+1
? ? ? ? notEmpty.signal();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //通知消費(fèi)者隊(duì)列現(xiàn)在有東西可以消費(fèi)了
? ? }
上面方法可以總結(jié)成,廚房里有多個(gè)廚師,當(dāng)一個(gè)廚師把菜做好之后就會(huì)獨(dú)占窗口把菜放到窗口里,此時(shí)其他廚師需要排隊(duì)等待這個(gè)廚師放完,而這個(gè)時(shí)候如果窗口已經(jīng)擺滿(mǎn)菜了,那么這個(gè)廚師就會(huì)在窗口的生產(chǎn)者隊(duì)列中等待服務(wù)員把菜端走窗口有空余位置。而如果在廚師占領(lǐng)窗口這個(gè)過(guò)程中有一隊(duì)程序員在消費(fèi)者隊(duì)列里等待的話(huà)就廚師把菜放到窗口里后就會(huì)通知這個(gè)消費(fèi)者隊(duì)列取菜。
消費(fèi)者方法take:
public E take() throws InterruptedException {
? ? ? ? final ReentrantLock lock = this.lock;? ? ? //這里的鎖和生產(chǎn)者的鎖是同一把鎖,所以在生產(chǎn)者生產(chǎn)的時(shí)候你不能去消費(fèi)
? ? ? ? lock.lockInterruptibly();
? ? ? ? try {
? ? ? ? ? ? while (count == 0)? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果隊(duì)列為空,就等待隊(duì)列變成非空
? ? ? ? ? ? ? ? notEmpty.await();? ? ? ? ? ? ? ? ? ? ? ? //等待隊(duì)列變?yōu)榉强?/p>
? ? ? ? ? ? return dequeue();? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果隊(duì)列有元素,那么就調(diào)用取元素的方法
? ? ? ? } finally {
? ? ? ? ? ? lock.unlock();
? ? ? ? }
? ? }
private E dequeue() {
? ? ? ? final Object[] items = this.items;
? ? ? ? E x = (E) items[takeIndex];? ? ? ? ? ? ? ? ? ? ? ? ? ?//獲取元素與放入元素過(guò)程恰好相反
? ? ? ? items[takeIndex] = null;
? ? ? ? if (++takeIndex == items.length)
? ? ? ? ? ? takeIndex = 0;
? ? ? ? count--;
? ? ? ? if (itrs != null)
? ? ? ? ? ? itrs.elementDequeued();
? ? ? ? notFull.signal();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //通知生產(chǎn)者隊(duì)列,這里沒(méi)有元素了,你們需要生產(chǎn)了
? ? ? ? return x;
? ? }
OK!代碼分析到這我們看ArrayBlockingQueue底層用的就是我們上一篇分析的Condition和Lock,道理非常簡(jiǎn)單,但是大家有沒(méi)有發(fā)現(xiàn)阻塞隊(duì)列不能同時(shí)生產(chǎn)和消費(fèi)?這也是BlockingQueue沒(méi)有廣泛應(yīng)用于第三方框架的原因吧!
2:LinkedBlockingQueue:
該類(lèi)的幾個(gè)成員變量:
private final int capacity;? ? ? ? ? //容量
AtomicInteger count = new AtomicInteger();? ? //隊(duì)列元素個(gè)數(shù)
private transient Node last;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //隊(duì)尾指針
private final ReentrantLock takeLock = new ReentrantLock();? //take和poll等方法調(diào)用的時(shí)候所占用的鎖
private final Condition notEmpty = takeLock.newCondition();? ? //消費(fèi)者等待隊(duì)列,等待隊(duì)列變?yōu)榉强?/p>
private final ReentrantLock putLock = new ReentrantLock();? ? ?//put,offer等方法調(diào)用的時(shí)候所占用的鎖
private final Condition notFull = putLock.newCondition();? ? ? ? ?//生產(chǎn)者等待隊(duì)列
生產(chǎn)者put方法:
public void put(E e) throws InterruptedException {
????????if (e == null)
????????????throw new NullPointerException();?
????????int c = -1;
????????Node node = new Node(e);? ? ? ? ? ? ? ? ? ? ? ? ? ? //以要放入的元素為基準(zhǔn)新建一個(gè)節(jié)點(diǎn)
? ? ? ? final ReentrantLock putLock = this.putLock;? ?//獲取生產(chǎn)者鎖
? ? ? ? final AtomicInteger count = this.count;? ? ? ? ? ??
? ? ? ? putLock.lockInterruptibly();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //當(dāng)前生產(chǎn)者獨(dú)占一下代碼
? ? ? ? try {
? ? ? ? ? ? while (count.get() == capacity) {? ? ? ? ? ? ? ? ? ? //一樣的套路,如果當(dāng)前隊(duì)列滿(mǎn)了就在生產(chǎn)者的Condition隊(duì)列里等待被signal
? ? ? ? ? ? ? ? notFull.await();
? ? ? ? ? ? }
? ? ? ? ? ? enqueue(node);? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //沒(méi)滿(mǎn)的話(huà)直接入隊(duì),這里不需要cas操作,因?yàn)橹挥挟?dāng)前線(xiàn)程調(diào)用該方法
? ? ? ? ? ? c = count.getAndIncrement();? ? ? ? ? ? ? ? ? ? ? ? //隊(duì)列元素個(gè)數(shù)+1
? ? ? ? ? ? if (c + 1 < capacity)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果隊(duì)列放入元素之后還沒(méi)滿(mǎn)那么通知其他生產(chǎn)者線(xiàn)程這里可以生產(chǎn)
? ? ? ? ? ? ? ? notFull.signal();
? ? ? ? } finally {
? ? ? ? ? ? putLock.unlock();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
? ? ? ? }
? ? ? ? if (c == 0)
? ? ? ? ? ? signalNotEmpty();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //最后c==0證明隊(duì)列里有元素,那么通知消費(fèi)者過(guò)來(lái)消費(fèi)
? ? }
消費(fèi)者take方法:
public E take() throws InterruptedException {
? ? ? ? E x;
? ? ? ? int c = -1;
? ? ? ? final AtomicInteger count = this.count;
? ? ? ? final ReentrantLock takeLock = this.takeLock;? ? ? ??
? ? ? ? takeLock.lockInterruptibly();? ? ? ? ? ? ? ? //獨(dú)占消費(fèi)者鎖,即獲取元素的時(shí)候只有一個(gè)線(xiàn)程
? ? ? ? try {
? ? ? ? ? ? while (count.get() == 0) {? ? ? ? ? ? ? ? ? ? //如果隊(duì)列為空,那么就在Condition的消費(fèi)者等待隊(duì)列里等待
? ? ? ? ? ? ? ? notEmpty.await();
? ? ? ? ? ? }
? ? ? ? ? ? x = dequeue();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果隊(duì)列不為空,那么從容的去拿元素,不用怕別人跟你搶?zhuān)驗(yàn)榫湍阋粋€(gè)人
? ? ? ? ? ? c = count.getAndDecrement();????????????????
? ? ? ? ? ? if (c > 1)
? ? ? ? ? ? ? ? notEmpty.signal();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//如果隊(duì)列中有兩個(gè)以上元素那么通知消費(fèi)者等待隊(duì)列里的其他消費(fèi)者過(guò)來(lái)拿
? ? ? ? } finally {
? ? ? ? ? ? takeLock.unlock();
? ? ? ? }
? ? ? ? if (c == capacity)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果c==capacity說(shuō)明隊(duì)列里有一個(gè)空位,這個(gè)時(shí)候通知生產(chǎn)者隊(duì)列線(xiàn)程生產(chǎn)
? ? ? ? ? ? signalNotFull();
? ? ? ? return x;
? ? }
LinkedBlockingQueue相比ArrayBlockingQueue的優(yōu)勢(shì)在于使用了兩個(gè)鎖,這樣可以保證一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者同時(shí)工作,算是一種進(jìn)步,但不能多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者同時(shí)工作。
3:PriorityBlockingQueue類(lèi):
該類(lèi)的幾個(gè)成員變量:
//默認(rèn)容量為11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//用于存放元素的數(shù)組
private transient Object[] queue;
//實(shí)時(shí)大小
private transient int size;
//入隊(duì)順序規(guī)范
private transient Comparator comparator;
private final ReentrantLock lock;
private final Condition notEmpty;
private transient volatile int allocationSpinLock;
//優(yōu)先隊(duì)列
private PriorityQueue q;
兩個(gè)比較重要的構(gòu)造方法:
public PriorityBlockingQueue(int initialCapacity,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Comparator comparator) {
? ? ? ? if (initialCapacity < 1)
? ? ? ? ? ? throw new IllegalArgumentException();
? ? ? ? this.lock = new ReentrantLock();
? ? ? ? this.notEmpty = lock.newCondition();
? ? ? ? this.comparator = comparator;
? ? ? ? this.queue = new Object[initialCapacity];
? ? }
這個(gè)方法我們可以自己實(shí)現(xiàn)一個(gè)comparator作為參數(shù)傳進(jìn)來(lái),比如我們以學(xué)生對(duì)象的學(xué)號(hào)屬性作為排序依據(jù),那么入隊(duì)之后隊(duì)列會(huì)以學(xué)號(hào)的大小進(jìn)行排序。如果不指定就會(huì)以入隊(duì)順序?yàn)轫樞颉?/p>
public PriorityBlockingQueue(Collection c) {
? ? ? ? this.lock = new ReentrantLock();
? ? ? ? this.notEmpty = lock.newCondition();
? ? ? ? boolean heapify = true; // true if not known to be in heap order
? ? ? ? boolean screen = true;? // true if must screen for nulls
? ? ? ? if (c instanceof SortedSet) {
? ? ? ? ? ? SortedSet ss = (SortedSet) c;
? ? ? ? ? ? this.comparator = (Comparator) ss.comparator();
? ? ? ? ? ? heapify = false;
? ? ? ? }
? ? ? ? else if (c instanceof PriorityBlockingQueue) {
? ? ? ? ? ? PriorityBlockingQueue pq =
? ? ? ? ? ? ? ? (PriorityBlockingQueue) c;
? ? ? ? ? ? this.comparator = (Comparator) pq.comparator();
? ? ? ? ? ? screen = false;
? ? ? ? ? ? if (pq.getClass() == PriorityBlockingQueue.class) // exact match
? ? ? ? ? ? ? ? heapify = false;
? ? ? ? }
? ? ? ? Object[] a = c.toArray();
? ? ? ? int n = a.length;
? ? ? ? // If c.toArray incorrectly doesn't return Object[], copy it.
? ? ? ? if (a.getClass() != Object[].class)
? ? ? ? ? ? a = Arrays.copyOf(a, n, Object[].class);
? ? ? ? if (screen && (n == 1 || this.comparator != null)) {
? ? ? ? ? ? for (int i = 0; i < n; ++i)
? ? ? ? ? ? ? ? if (a[i] == null)
? ? ? ? ? ? ? ? ? ? throw new NullPointerException();
? ? ? ? }
? ? ? ? this.queue = a;
? ? ? ? this.size = n;
? ? ? ? if (heapify)
? ? ? ? ? ? heapify();
? ? }
該構(gòu)造方法會(huì)接收一個(gè)容器類(lèi)并按照容器類(lèi)定義的順序排序,如果沒(méi)有順序那么就按照自然順序排序。
該類(lèi)的put/offer(一樣)方法:
public boolean offer(E e) {
? ? ? ? if (e == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? final ReentrantLock lock = this.lock;
? ? ? ? lock.lock();
? ? ? ? int n, cap;
? ? ? ? Object[] array;
? ? ? ? //如果內(nèi)部數(shù)組容納不下元素了就擴(kuò)容
? ? ? ? while ((n = size) >= (cap = (array = queue).length))
? ? ? ? ? ? tryGrow(array, cap);
? ? ? ? try {
? ? ? ? ? ? Comparator cmp = comparator;
? ? ? ? ? ? if (cmp == null)
? ? ? ? ? ? ? ? //沒(méi)有給comparator賦值情況下
? ? ? ? ? ? ? ? siftUpComparable(n, e, array);
? ? ? ? ? ? else
? ? ? ? ? ? ? ? //給comparator賦值的情況下
? ? ? ? ? ? ? ? siftUpUsingComparator(n, e, array, cmp);
? ? ? ? ? ? size = n + 1;
? ? ? ? ? ? notEmpty.signal();
? ? ? ? } finally {
? ? ? ? ? ? lock.unlock();
? ? ? ? }
? ? ? ? return true;
? ? }
//加入隊(duì)列算法
private static void siftUpUsingComparator(int k, T x, Object[] array,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Comparator cmp) {
? ? ? ? while (k > 0) {
? ? ? ? ? ? int parent = (k - 1) >>> 1;
? ? ? ? ? ? Object e = array[parent];
? ? ? ? ? ? if (cmp.compare(x, (T) e) >= 0)
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? array[k] = e;
? ? ? ? ? ? k = parent;
? ? ? ? }
? ? ? ? array[k] = x;
? ? }
這個(gè)加入算法比較特別,本質(zhì)上使用的是數(shù)組,但實(shí)際上這個(gè)數(shù)組維護(hù)的是一個(gè)堆(二叉排序),新加入的元素會(huì)被移動(dòng)到合適的位置上(根據(jù)comparator)。
我們看下擴(kuò)容方法:
private void tryGrow(Object[] array, int oldCap) {
? ? ? ? lock.unlock(); // must release and then re-acquire main lock
? ? ? ? Object[] newArray = null;
? ? ? ? if (allocationSpinLock == 0 &&
? ? ? ? ? ? UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0, 1)) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? int newCap = oldCap + ((oldCap < 64) ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (oldCap + 2) :?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (oldCap >> 1));
? ? ? ? ? ? ? ? if (newCap - MAX_ARRAY_SIZE > 0) {? ? // possible overflow
? ? ? ? ? ? ? ? ? ? int minCap = oldCap + 1;
? ? ? ? ? ? ? ? ? ? if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
? ? ? ? ? ? ? ? ? ? ? ? throw new OutOfMemoryError();
? ? ? ? ? ? ? ? ? ? newCap = MAX_ARRAY_SIZE;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (newCap > oldCap && queue == array)
? ? ? ? ? ? ? ? ? ? newArray = new Object[newCap];
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? allocationSpinLock = 0;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? if (newArray == null) // back off if another thread is allocating
? ? ? ? ? ? Thread.yield();
? ? ? ? lock.lock();
? ? ? ? if (newArray != null && queue == array) {
? ? ? ? ? ? queue = newArray;
? ? ? ? ? ? System.arraycopy(array, 0, newArray, 0, oldCap);
? ? ? ? }
? ? }
擴(kuò)容過(guò)程是這樣的,當(dāng)數(shù)組長(zhǎng)度小于64的時(shí)候2個(gè)2個(gè)往上加,如果數(shù)組長(zhǎng)度大于64,那么2倍2倍的往上加。而如果其他線(xiàn)程在添加元素(自己沒(méi)有搶到自旋鎖),那么線(xiàn)程禮讓讓其他線(xiàn)程擴(kuò)容。最后內(nèi)部新建了一個(gè)數(shù)組并對(duì)數(shù)組進(jìn)行了復(fù)制。
該類(lèi)的take()方法:
方法外層加鎖,里層調(diào)用了出隊(duì)方法。
private E dequeue() {
? ? ? ? int n = size - 1;
? ? ? ? if (n < 0)
? ? ? ? ? ? return null;
? ? ? ? else {
? ? ? ? ? ? Object[] array = queue;
? ? ? ? ? ? E result = (E) array[0];
? ? ? ? ? ? E x = (E) array[n];
? ? ? ? ? ? array[n] = null;
? ? ? ? ? ? Comparator cmp = comparator;
? ? ? ? ? ? if (cmp == null)
? ? ? ? ? ? ? ? siftDownComparable(0, x, array, n);
? ? ? ? ? ? else
? ? ? ? ? ? ? ? siftDownUsingComparator(0, x, array, n, cmp);
? ? ? ? ? ? size = n;
? ? ? ? ? ? return result;
? ? ? ? }
? ? }
去的元素是堆頂?shù)脑?,取完之后把?shù)組最后一個(gè)元素暫存,并調(diào)用下沉元素方法保持堆的平衡態(tài)(二叉樹(shù)的順序)。
4:DelayedQueue類(lèi):
該類(lèi)內(nèi)部根據(jù)延時(shí)時(shí)間排序,隊(duì)頭是離到期時(shí)間最近的。以領(lǐng)導(dǎo)-隨從模式維護(hù)線(xiàn)程,領(lǐng)導(dǎo)在take和put之前要通知相應(yīng)線(xiàn)程,這樣可以保證不必要的線(xiàn)程執(zhí)行。
成員變量:
private final transient ReentrantLock lock = new ReentrantLock();?
private final PriorityQueueq = new PriorityQueue();
private Thread leader = null;
//隊(duì)頭的線(xiàn)程馬上到期可取的時(shí)候或者新的線(xiàn)程要稱(chēng)為領(lǐng)導(dǎo)的時(shí)候會(huì)通知該等待隊(duì)列
private final Condition available = lock.newCondition();
put方法:
public boolean offer(E e) {
? ? ? ? final ReentrantLock lock = this.lock;
? ? ? ? lock.lock();
? ? ? ? try {
? ? ? ? ? ? q.offer(e);
? ? ? ? ? ? if (q.peek() == e) {
? ? ? ? ? ? ? ? leader = null;
? ? ? ? ? ? ? ? available.signal();
? ? ? ? ? ? }
? ? ? ? ? ? return true;
? ? ? ? } finally {
? ? ? ? ? ? lock.unlock();
? ? ? ? }
? ? }
這個(gè)方法沒(méi)啥好說(shuō)的,加鎖--根據(jù)e的自然大小入隊(duì)--通知隨從線(xiàn)程堆頂元素可取--釋放鎖
take方法:
public E take() throws InterruptedException {
? ? ? ? final ReentrantLock lock = this.lock;
? ? ? ? lock.lockInterruptibly();
? ? ? ? try {
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? E first = q.peek();
? ? ? ? ? ? ? ? if (first == null)
? ? ? ? ? ? ? ? ? ? available.await();
? ? ? ? ? ? ? ? else {
? ? ? ? ? ? ? ? ? ? //堆頂元素等待時(shí)間
? ? ? ? ? ? ? ? ? ? long delay = first.getDelay(NANOSECONDS);
? ? ? ? ? ? ? ? ? ? //如果堆頂元素刑滿(mǎn)釋放,那么取出
????????????????????if (delay <= 0)
? ? ? ? ? ? ? ? ? ? ? ? return q.poll();
? ? ? ? ? ? ? ? ? ? first = null; // don't retain ref while waiting
? ? ? ? ? ? ? ? ? ? if (leader != null)
? ? ? ? ? ? ? ? ? ? ? ? available.await();
? ? ? ? ? ? ? ? ? ? else {
? ? ? ? ? ? ? ? ? ? ? ? Thread thisThread = Thread.currentThread();
? ? ? ? ? ? ? ? ? ? ? ? leader = thisThread;
? ? ? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? ? ? available.awaitNanos(delay);
? ? ? ? ? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? ? ? ? ? if (leader == thisThread)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? leader = null;
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (leader == null && q.peek() != null)
? ? ? ? ? ? ? ? available.signal();
? ? ? ? ? ? lock.unlock();
? ? ? ? }
? ? }
這個(gè)方法雖然很長(zhǎng),但是很簡(jiǎn)單,由于元素類(lèi)型E是Delayed類(lèi)型或者其子類(lèi)型,所以可以計(jì)算它的刑滿(mǎn)釋放時(shí)間,如果到時(shí)間了就可以取到了,如果沒(méi)到時(shí)間就等待到期時(shí)常的時(shí)間。這個(gè)過(guò)程中只有一個(gè)領(lǐng)導(dǎo)者在操作,所以領(lǐng)導(dǎo)者不為空的條件下其他隨從都會(huì)等待,在領(lǐng)導(dǎo)等待的時(shí)候它也是領(lǐng)導(dǎo)哦。這樣生產(chǎn)者和消費(fèi)者共用同一個(gè)鎖就成為現(xiàn)實(shí)。
5:SynchronousQueue類(lèi):
該類(lèi)源碼比較復(fù)雜,本著知之為知之的原則這里不做討論,只知道內(nèi)部沒(méi)有緩沖區(qū)間,生產(chǎn)者和消費(fèi)者手對(duì)手完成生產(chǎn)和消費(fèi),該類(lèi)用于創(chuàng)建不限大小的線(xiàn)程池。