并發(fā)編程之BlockingQueue(二)

前言:

咱們扒光系列的第一篇文章已經(jīng)對(duì)concurrent包中的三層結(jié)構(gòu)的第一層和第二層做了解析,并且對(duì)第三層中的lock做了詳細(xì)的代碼分析,本篇博文將針對(duì)BlockingQueue這個(gè)后續(xù)會(huì)在執(zhí)行器里使用的基本數(shù)據(jù)結(jié)構(gòu)做源碼分析,為后續(xù)的Executor源碼分析做準(zhǔn)備。

我們先來(lái)看Doug Lea的定義,BlockingQueue是一個(gè)隊(duì)列,該隊(duì)列支持兩種特殊操作即隊(duì)列為空的時(shí)候獲取元素的線(xiàn)程需要等待,隊(duì)列為滿(mǎn)的時(shí)候加入元素的線(xiàn)程需要等待。這是典型的生產(chǎn)者消費(fèi)者模式的應(yīng)用。

頂層接口設(shè)定了3種方法:

BlockingQueue三種方法


由于put()/take()方法是在并發(fā)中會(huì)發(fā)生阻塞,因此我們著重研究這兩種方法。

BlockingQueue知名家族成員:

ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue,我們以這三個(gè)類(lèi)為基礎(chǔ)對(duì)put()、take()方法進(jìn)行源碼解析。

1:ArrayBlockingQueue:

我們先看該類(lèi)的幾個(gè)重要成員變量

? ? final Object[] items; //基于數(shù)組

? ? int takeIndex;? ? ? ? //下一個(gè)獲取元素的位置

? ? int putIndex;? ? ? ? //下一個(gè)放置元素的位置

? ? int count;? ? ? ? ? ? //隊(duì)列中的元素個(gè)數(shù)

? ? final ReentrantLock lock;? ? //鎖,用于put和take的時(shí)候線(xiàn)程獨(dú)占

? ? private final Condition notEmpty;? ? //Condition等待隊(duì)列,在該隊(duì)列排隊(duì)的線(xiàn)程都在等待隊(duì)列中增加元素

? ? private final Condition notFull;? ? ? ? //Condition等待隊(duì)列,在該隊(duì)列中排隊(duì)的線(xiàn)程都在等待隊(duì)列元素消耗

生產(chǎn)者消費(fèi)者模式主要體現(xiàn)在notEmpty和notFull兩個(gè)隊(duì)列,前者是消費(fèi)者隊(duì)列,后者是生產(chǎn)者隊(duì)列,就像餐廳的服務(wù)員(消費(fèi)者)和廚師(生產(chǎn)者)在窗口,前者等待窗口里有菜,后者等待窗口有放菜的位置。

生產(chǎn)者方法put:

public void put(E e) throws InterruptedException {

? ? ? ? checkNotNull(e);

? ? ? ? final ReentrantLock lock = this.lock;? ? ? ? ? //獲取鎖,生產(chǎn)者消費(fèi)者用的同一把鎖,消費(fèi)者消費(fèi)的時(shí)候你不能生產(chǎn)

? ? ? ? lock.lockInterruptibly();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//關(guān)鎖,當(dāng)前生產(chǎn)者獨(dú)占

? ? ? ? try {

? ? ? ? ? ? while (count == items.length)? ? ? ? ? ? ? ? //當(dāng)隊(duì)列已滿(mǎn)的時(shí)候讓當(dāng)前生產(chǎn)者在生產(chǎn)者隊(duì)列等待槽位

? ? ? ? ? ? ? ? notFull.await();

? ? ? ? ? ? enqueue(e);? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//當(dāng)隊(duì)列未滿(mǎn)的時(shí)候往隊(duì)列里添加元素,此時(shí)是線(xiàn)程獨(dú)占哦

? ? ? ? } finally {

? ? ? ? ? ? lock.unlock();

? ? ? ? }

? ? }

private void enqueue(E x) {

? ? ? ? final Object[] items = this.items;? ? ? ??

? ? ? ? items[putIndex] = x;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //在可以放置元素的數(shù)組槽位放置x元素

? ? ? ? if (++putIndex == items.length)? ? ? ? ? ? ? ?//對(duì)放置元素位置加一操作

? ? ? ? ? ? putIndex = 0;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//如果沒(méi)地方放置元素了就把放置元素位置設(shè)置成0

? ? ? ? count++;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //元素個(gè)數(shù)+1

? ? ? ? notEmpty.signal();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //通知消費(fèi)者隊(duì)列現(xiàn)在有東西可以消費(fèi)了

? ? }

上面方法可以總結(jié)成,廚房里有多個(gè)廚師,當(dāng)一個(gè)廚師把菜做好之后就會(huì)獨(dú)占窗口把菜放到窗口里,此時(shí)其他廚師需要排隊(duì)等待這個(gè)廚師放完,而這個(gè)時(shí)候如果窗口已經(jīng)擺滿(mǎn)菜了,那么這個(gè)廚師就會(huì)在窗口的生產(chǎn)者隊(duì)列中等待服務(wù)員把菜端走窗口有空余位置。而如果在廚師占領(lǐng)窗口這個(gè)過(guò)程中有一隊(duì)程序員在消費(fèi)者隊(duì)列里等待的話(huà)就廚師把菜放到窗口里后就會(huì)通知這個(gè)消費(fèi)者隊(duì)列取菜。

消費(fèi)者方法take:

public E take() throws InterruptedException {

? ? ? ? final ReentrantLock lock = this.lock;? ? ? //這里的鎖和生產(chǎn)者的鎖是同一把鎖,所以在生產(chǎn)者生產(chǎn)的時(shí)候你不能去消費(fèi)

? ? ? ? lock.lockInterruptibly();

? ? ? ? try {

? ? ? ? ? ? while (count == 0)? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果隊(duì)列為空,就等待隊(duì)列變成非空

? ? ? ? ? ? ? ? notEmpty.await();? ? ? ? ? ? ? ? ? ? ? ? //等待隊(duì)列變?yōu)榉强?/p>

? ? ? ? ? ? return dequeue();? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果隊(duì)列有元素,那么就調(diào)用取元素的方法

? ? ? ? } finally {

? ? ? ? ? ? lock.unlock();

? ? ? ? }

? ? }

private E dequeue() {

? ? ? ? final Object[] items = this.items;

? ? ? ? E x = (E) items[takeIndex];? ? ? ? ? ? ? ? ? ? ? ? ? ?//獲取元素與放入元素過(guò)程恰好相反

? ? ? ? items[takeIndex] = null;

? ? ? ? if (++takeIndex == items.length)

? ? ? ? ? ? takeIndex = 0;

? ? ? ? count--;

? ? ? ? if (itrs != null)

? ? ? ? ? ? itrs.elementDequeued();

? ? ? ? notFull.signal();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //通知生產(chǎn)者隊(duì)列,這里沒(méi)有元素了,你們需要生產(chǎn)了

? ? ? ? return x;

? ? }

OK!代碼分析到這我們看ArrayBlockingQueue底層用的就是我們上一篇分析的Condition和Lock,道理非常簡(jiǎn)單,但是大家有沒(méi)有發(fā)現(xiàn)阻塞隊(duì)列不能同時(shí)生產(chǎn)和消費(fèi)?這也是BlockingQueue沒(méi)有廣泛應(yīng)用于第三方框架的原因吧!

2:LinkedBlockingQueue:

該類(lèi)的幾個(gè)成員變量:

private final int capacity;? ? ? ? ? //容量

AtomicInteger count = new AtomicInteger();? ? //隊(duì)列元素個(gè)數(shù)

private transient Node last;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //隊(duì)尾指針

private final ReentrantLock takeLock = new ReentrantLock();? //take和poll等方法調(diào)用的時(shí)候所占用的鎖

private final Condition notEmpty = takeLock.newCondition();? ? //消費(fèi)者等待隊(duì)列,等待隊(duì)列變?yōu)榉强?/p>

private final ReentrantLock putLock = new ReentrantLock();? ? ?//put,offer等方法調(diào)用的時(shí)候所占用的鎖

private final Condition notFull = putLock.newCondition();? ? ? ? ?//生產(chǎn)者等待隊(duì)列

生產(chǎn)者put方法:

public void put(E e) throws InterruptedException {

????????if (e == null)

????????????throw new NullPointerException();?

????????int c = -1;

????????Node node = new Node(e);? ? ? ? ? ? ? ? ? ? ? ? ? ? //以要放入的元素為基準(zhǔn)新建一個(gè)節(jié)點(diǎn)

? ? ? ? final ReentrantLock putLock = this.putLock;? ?//獲取生產(chǎn)者鎖

? ? ? ? final AtomicInteger count = this.count;? ? ? ? ? ??

? ? ? ? putLock.lockInterruptibly();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //當(dāng)前生產(chǎn)者獨(dú)占一下代碼

? ? ? ? try {

? ? ? ? ? ? while (count.get() == capacity) {? ? ? ? ? ? ? ? ? ? //一樣的套路,如果當(dāng)前隊(duì)列滿(mǎn)了就在生產(chǎn)者的Condition隊(duì)列里等待被signal

? ? ? ? ? ? ? ? notFull.await();

? ? ? ? ? ? }

? ? ? ? ? ? enqueue(node);? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //沒(méi)滿(mǎn)的話(huà)直接入隊(duì),這里不需要cas操作,因?yàn)橹挥挟?dāng)前線(xiàn)程調(diào)用該方法

? ? ? ? ? ? c = count.getAndIncrement();? ? ? ? ? ? ? ? ? ? ? ? //隊(duì)列元素個(gè)數(shù)+1

? ? ? ? ? ? if (c + 1 < capacity)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果隊(duì)列放入元素之后還沒(méi)滿(mǎn)那么通知其他生產(chǎn)者線(xiàn)程這里可以生產(chǎn)

? ? ? ? ? ? ? ? notFull.signal();

? ? ? ? } finally {

? ? ? ? ? ? putLock.unlock();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

? ? ? ? }

? ? ? ? if (c == 0)

? ? ? ? ? ? signalNotEmpty();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //最后c==0證明隊(duì)列里有元素,那么通知消費(fèi)者過(guò)來(lái)消費(fèi)

? ? }

消費(fèi)者take方法:

public E take() throws InterruptedException {

? ? ? ? E x;

? ? ? ? int c = -1;

? ? ? ? final AtomicInteger count = this.count;

? ? ? ? final ReentrantLock takeLock = this.takeLock;? ? ? ??

? ? ? ? takeLock.lockInterruptibly();? ? ? ? ? ? ? ? //獨(dú)占消費(fèi)者鎖,即獲取元素的時(shí)候只有一個(gè)線(xiàn)程

? ? ? ? try {

? ? ? ? ? ? while (count.get() == 0) {? ? ? ? ? ? ? ? ? ? //如果隊(duì)列為空,那么就在Condition的消費(fèi)者等待隊(duì)列里等待

? ? ? ? ? ? ? ? notEmpty.await();

? ? ? ? ? ? }

? ? ? ? ? ? x = dequeue();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果隊(duì)列不為空,那么從容的去拿元素,不用怕別人跟你搶?zhuān)驗(yàn)榫湍阋粋€(gè)人

? ? ? ? ? ? c = count.getAndDecrement();????????????????

? ? ? ? ? ? if (c > 1)

? ? ? ? ? ? ? ? notEmpty.signal();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//如果隊(duì)列中有兩個(gè)以上元素那么通知消費(fèi)者等待隊(duì)列里的其他消費(fèi)者過(guò)來(lái)拿

? ? ? ? } finally {

? ? ? ? ? ? takeLock.unlock();

? ? ? ? }

? ? ? ? if (c == capacity)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果c==capacity說(shuō)明隊(duì)列里有一個(gè)空位,這個(gè)時(shí)候通知生產(chǎn)者隊(duì)列線(xiàn)程生產(chǎn)

? ? ? ? ? ? signalNotFull();

? ? ? ? return x;

? ? }

LinkedBlockingQueue相比ArrayBlockingQueue的優(yōu)勢(shì)在于使用了兩個(gè)鎖,這樣可以保證一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者同時(shí)工作,算是一種進(jìn)步,但不能多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者同時(shí)工作。

3:PriorityBlockingQueue類(lèi):

該類(lèi)的幾個(gè)成員變量:

//默認(rèn)容量為11

private static final int DEFAULT_INITIAL_CAPACITY = 11;

//最大容量

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//用于存放元素的數(shù)組

private transient Object[] queue;

//實(shí)時(shí)大小

private transient int size;

//入隊(duì)順序規(guī)范

private transient Comparator comparator;

private final ReentrantLock lock;

private final Condition notEmpty;

private transient volatile int allocationSpinLock;

//優(yōu)先隊(duì)列

private PriorityQueue q;

兩個(gè)比較重要的構(gòu)造方法:

public PriorityBlockingQueue(int initialCapacity,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Comparator comparator) {

? ? ? ? if (initialCapacity < 1)

? ? ? ? ? ? throw new IllegalArgumentException();

? ? ? ? this.lock = new ReentrantLock();

? ? ? ? this.notEmpty = lock.newCondition();

? ? ? ? this.comparator = comparator;

? ? ? ? this.queue = new Object[initialCapacity];

? ? }

這個(gè)方法我們可以自己實(shí)現(xiàn)一個(gè)comparator作為參數(shù)傳進(jìn)來(lái),比如我們以學(xué)生對(duì)象的學(xué)號(hào)屬性作為排序依據(jù),那么入隊(duì)之后隊(duì)列會(huì)以學(xué)號(hào)的大小進(jìn)行排序。如果不指定就會(huì)以入隊(duì)順序?yàn)轫樞颉?/p>

public PriorityBlockingQueue(Collection c) {

? ? ? ? this.lock = new ReentrantLock();

? ? ? ? this.notEmpty = lock.newCondition();

? ? ? ? boolean heapify = true; // true if not known to be in heap order

? ? ? ? boolean screen = true;? // true if must screen for nulls

? ? ? ? if (c instanceof SortedSet) {

? ? ? ? ? ? SortedSet ss = (SortedSet) c;

? ? ? ? ? ? this.comparator = (Comparator) ss.comparator();

? ? ? ? ? ? heapify = false;

? ? ? ? }

? ? ? ? else if (c instanceof PriorityBlockingQueue) {

? ? ? ? ? ? PriorityBlockingQueue pq =

? ? ? ? ? ? ? ? (PriorityBlockingQueue) c;

? ? ? ? ? ? this.comparator = (Comparator) pq.comparator();

? ? ? ? ? ? screen = false;

? ? ? ? ? ? if (pq.getClass() == PriorityBlockingQueue.class) // exact match

? ? ? ? ? ? ? ? heapify = false;

? ? ? ? }

? ? ? ? Object[] a = c.toArray();

? ? ? ? int n = a.length;

? ? ? ? // If c.toArray incorrectly doesn't return Object[], copy it.

? ? ? ? if (a.getClass() != Object[].class)

? ? ? ? ? ? a = Arrays.copyOf(a, n, Object[].class);

? ? ? ? if (screen && (n == 1 || this.comparator != null)) {

? ? ? ? ? ? for (int i = 0; i < n; ++i)

? ? ? ? ? ? ? ? if (a[i] == null)

? ? ? ? ? ? ? ? ? ? throw new NullPointerException();

? ? ? ? }

? ? ? ? this.queue = a;

? ? ? ? this.size = n;

? ? ? ? if (heapify)

? ? ? ? ? ? heapify();

? ? }

該構(gòu)造方法會(huì)接收一個(gè)容器類(lèi)并按照容器類(lèi)定義的順序排序,如果沒(méi)有順序那么就按照自然順序排序。

該類(lèi)的put/offer(一樣)方法:

public boolean offer(E e) {

? ? ? ? if (e == null)

? ? ? ? ? ? throw new NullPointerException();

? ? ? ? final ReentrantLock lock = this.lock;

? ? ? ? lock.lock();

? ? ? ? int n, cap;

? ? ? ? Object[] array;

? ? ? ? //如果內(nèi)部數(shù)組容納不下元素了就擴(kuò)容

? ? ? ? while ((n = size) >= (cap = (array = queue).length))

? ? ? ? ? ? tryGrow(array, cap);

? ? ? ? try {

? ? ? ? ? ? Comparator cmp = comparator;

? ? ? ? ? ? if (cmp == null)

? ? ? ? ? ? ? ? //沒(méi)有給comparator賦值情況下

? ? ? ? ? ? ? ? siftUpComparable(n, e, array);

? ? ? ? ? ? else

? ? ? ? ? ? ? ? //給comparator賦值的情況下

? ? ? ? ? ? ? ? siftUpUsingComparator(n, e, array, cmp);

? ? ? ? ? ? size = n + 1;

? ? ? ? ? ? notEmpty.signal();

? ? ? ? } finally {

? ? ? ? ? ? lock.unlock();

? ? ? ? }

? ? ? ? return true;

? ? }

//加入隊(duì)列算法

private static void siftUpUsingComparator(int k, T x, Object[] array,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Comparator cmp) {

? ? ? ? while (k > 0) {

? ? ? ? ? ? int parent = (k - 1) >>> 1;

? ? ? ? ? ? Object e = array[parent];

? ? ? ? ? ? if (cmp.compare(x, (T) e) >= 0)

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? array[k] = e;

? ? ? ? ? ? k = parent;

? ? ? ? }

? ? ? ? array[k] = x;

? ? }

這個(gè)加入算法比較特別,本質(zhì)上使用的是數(shù)組,但實(shí)際上這個(gè)數(shù)組維護(hù)的是一個(gè)堆(二叉排序),新加入的元素會(huì)被移動(dòng)到合適的位置上(根據(jù)comparator)。

我們看下擴(kuò)容方法:

private void tryGrow(Object[] array, int oldCap) {

? ? ? ? lock.unlock(); // must release and then re-acquire main lock

? ? ? ? Object[] newArray = null;

? ? ? ? if (allocationSpinLock == 0 &&

? ? ? ? ? ? UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0, 1)) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? int newCap = oldCap + ((oldCap < 64) ?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (oldCap + 2) :?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (oldCap >> 1));

? ? ? ? ? ? ? ? if (newCap - MAX_ARRAY_SIZE > 0) {? ? // possible overflow

? ? ? ? ? ? ? ? ? ? int minCap = oldCap + 1;

? ? ? ? ? ? ? ? ? ? if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

? ? ? ? ? ? ? ? ? ? ? ? throw new OutOfMemoryError();

? ? ? ? ? ? ? ? ? ? newCap = MAX_ARRAY_SIZE;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? if (newCap > oldCap && queue == array)

? ? ? ? ? ? ? ? ? ? newArray = new Object[newCap];

? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? allocationSpinLock = 0;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? if (newArray == null) // back off if another thread is allocating

? ? ? ? ? ? Thread.yield();

? ? ? ? lock.lock();

? ? ? ? if (newArray != null && queue == array) {

? ? ? ? ? ? queue = newArray;

? ? ? ? ? ? System.arraycopy(array, 0, newArray, 0, oldCap);

? ? ? ? }

? ? }

擴(kuò)容過(guò)程是這樣的,當(dāng)數(shù)組長(zhǎng)度小于64的時(shí)候2個(gè)2個(gè)往上加,如果數(shù)組長(zhǎng)度大于64,那么2倍2倍的往上加。而如果其他線(xiàn)程在添加元素(自己沒(méi)有搶到自旋鎖),那么線(xiàn)程禮讓讓其他線(xiàn)程擴(kuò)容。最后內(nèi)部新建了一個(gè)數(shù)組并對(duì)數(shù)組進(jìn)行了復(fù)制。

該類(lèi)的take()方法:

方法外層加鎖,里層調(diào)用了出隊(duì)方法。

private E dequeue() {

? ? ? ? int n = size - 1;

? ? ? ? if (n < 0)

? ? ? ? ? ? return null;

? ? ? ? else {

? ? ? ? ? ? Object[] array = queue;

? ? ? ? ? ? E result = (E) array[0];

? ? ? ? ? ? E x = (E) array[n];

? ? ? ? ? ? array[n] = null;

? ? ? ? ? ? Comparator cmp = comparator;

? ? ? ? ? ? if (cmp == null)

? ? ? ? ? ? ? ? siftDownComparable(0, x, array, n);

? ? ? ? ? ? else

? ? ? ? ? ? ? ? siftDownUsingComparator(0, x, array, n, cmp);

? ? ? ? ? ? size = n;

? ? ? ? ? ? return result;

? ? ? ? }

? ? }

去的元素是堆頂?shù)脑?,取完之后把?shù)組最后一個(gè)元素暫存,并調(diào)用下沉元素方法保持堆的平衡態(tài)(二叉樹(shù)的順序)。

4:DelayedQueue類(lèi):

該類(lèi)內(nèi)部根據(jù)延時(shí)時(shí)間排序,隊(duì)頭是離到期時(shí)間最近的。以領(lǐng)導(dǎo)-隨從模式維護(hù)線(xiàn)程,領(lǐng)導(dǎo)在take和put之前要通知相應(yīng)線(xiàn)程,這樣可以保證不必要的線(xiàn)程執(zhí)行。

成員變量:

private final transient ReentrantLock lock = new ReentrantLock();?

private final PriorityQueueq = new PriorityQueue();

private Thread leader = null;

//隊(duì)頭的線(xiàn)程馬上到期可取的時(shí)候或者新的線(xiàn)程要稱(chēng)為領(lǐng)導(dǎo)的時(shí)候會(huì)通知該等待隊(duì)列

private final Condition available = lock.newCondition();

put方法:

public boolean offer(E e) {

? ? ? ? final ReentrantLock lock = this.lock;

? ? ? ? lock.lock();

? ? ? ? try {

? ? ? ? ? ? q.offer(e);

? ? ? ? ? ? if (q.peek() == e) {

? ? ? ? ? ? ? ? leader = null;

? ? ? ? ? ? ? ? available.signal();

? ? ? ? ? ? }

? ? ? ? ? ? return true;

? ? ? ? } finally {

? ? ? ? ? ? lock.unlock();

? ? ? ? }

? ? }

這個(gè)方法沒(méi)啥好說(shuō)的,加鎖--根據(jù)e的自然大小入隊(duì)--通知隨從線(xiàn)程堆頂元素可取--釋放鎖

take方法:

public E take() throws InterruptedException {

? ? ? ? final ReentrantLock lock = this.lock;

? ? ? ? lock.lockInterruptibly();

? ? ? ? try {

? ? ? ? ? ? for (;;) {

? ? ? ? ? ? ? ? E first = q.peek();

? ? ? ? ? ? ? ? if (first == null)

? ? ? ? ? ? ? ? ? ? available.await();

? ? ? ? ? ? ? ? else {

? ? ? ? ? ? ? ? ? ? //堆頂元素等待時(shí)間

? ? ? ? ? ? ? ? ? ? long delay = first.getDelay(NANOSECONDS);

? ? ? ? ? ? ? ? ? ? //如果堆頂元素刑滿(mǎn)釋放,那么取出

????????????????????if (delay <= 0)

? ? ? ? ? ? ? ? ? ? ? ? return q.poll();

? ? ? ? ? ? ? ? ? ? first = null; // don't retain ref while waiting

? ? ? ? ? ? ? ? ? ? if (leader != null)

? ? ? ? ? ? ? ? ? ? ? ? available.await();

? ? ? ? ? ? ? ? ? ? else {

? ? ? ? ? ? ? ? ? ? ? ? Thread thisThread = Thread.currentThread();

? ? ? ? ? ? ? ? ? ? ? ? leader = thisThread;

? ? ? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? ? ? available.awaitNanos(delay);

? ? ? ? ? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? ? ? ? ? if (leader == thisThread)

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? leader = null;

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? } finally {

? ? ? ? ? ? if (leader == null && q.peek() != null)

? ? ? ? ? ? ? ? available.signal();

? ? ? ? ? ? lock.unlock();

? ? ? ? }

? ? }

這個(gè)方法雖然很長(zhǎng),但是很簡(jiǎn)單,由于元素類(lèi)型E是Delayed類(lèi)型或者其子類(lèi)型,所以可以計(jì)算它的刑滿(mǎn)釋放時(shí)間,如果到時(shí)間了就可以取到了,如果沒(méi)到時(shí)間就等待到期時(shí)常的時(shí)間。這個(gè)過(guò)程中只有一個(gè)領(lǐng)導(dǎo)者在操作,所以領(lǐng)導(dǎo)者不為空的條件下其他隨從都會(huì)等待,在領(lǐng)導(dǎo)等待的時(shí)候它也是領(lǐng)導(dǎo)哦。這樣生產(chǎn)者和消費(fèi)者共用同一個(gè)鎖就成為現(xiàn)實(shí)。

5:SynchronousQueue類(lèi):

該類(lèi)源碼比較復(fù)雜,本著知之為知之的原則這里不做討論,只知道內(nèi)部沒(méi)有緩沖區(qū)間,生產(chǎn)者和消費(fèi)者手對(duì)手完成生產(chǎn)和消費(fèi),該類(lèi)用于創(chuàng)建不限大小的線(xiàn)程池。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容