并發(fā)容器BlockingQueue - DelayQueue及Leader-Follower模式

1.官方文檔

An unbounded blocking queue of Delayed elements, in which an 
element can only be taken when its delay has expired. The head of 
the queue is that Delayed element whose delay expired furthest in 
the past. If no delay has expired there is no head and poll will return 
null. Expiration occurs when an element's 
getDelay(TimeUnit.NANOSECONDS) method returns a value less 
than or equal to zero. Even though unexpired elements cannot be 
removed using take or poll, they are otherwise treated as normal 
elements. For example, the size method returns the count of both 
expired and unexpired elements. This queue does not permit null 
elements.

存放Delayed元素的無(wú)界阻塞隊(duì)列,只有元素過(guò)期才能將其取走。隊(duì)頭head是過(guò)期最長(zhǎng)時(shí)間的元素,如果沒(méi)有元素過(guò)期則沒(méi)有head并且poll會(huì)返回null。當(dāng)元素的getDelay()方法返回小于等于0的值,則過(guò)期。不允許null元素。

2.Delayed

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

元素必須實(shí)現(xiàn)Delayed接口,方法getDelay返回元素還剩多少時(shí)間才過(guò)時(shí)。

由于Delayed繼承自 Comparable<Delayed>,所以也要實(shí)現(xiàn)compareTo方法。

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

3.優(yōu)先隊(duì)列

將元素入隊(duì)時(shí),會(huì)調(diào)用Delayed元素的compareTo方法進(jìn)行排序。因此,元素除了提供getDelay,還需要提供compareTo方法。

private final PriorityQueue<E> q = new PriorityQueue<E>();

4.入隊(duì)和出隊(duì)

4.1 阻塞版本put和take

由于是無(wú)界隊(duì)列,入隊(duì)不會(huì)阻塞。

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

為什么q.peek() == e才signal?

當(dāng)隊(duì)列頭部元素被更早到期的元素替換是,leader被置為null,offer里面q.peek() == e時(shí),會(huì)將leader=null,此時(shí)當(dāng)然會(huì)signal,重新競(jìng)選leader。所以定時(shí)等待線程必須要處理失去leader時(shí)情況。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

leader的作用是什么?

聲明 private Thread leader = null;源碼注釋:

Thread designated to wait for the element at the head of
the queue.  This variant of the Leader-Follower pattern
(http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
minimize unnecessary timed waiting.  When a thread becomes
the leader, it waits only for the next delay to elapse, but
other threads await indefinitely.  The leader thread must
signal some other thread before returning from take() or
poll(...), unless some other thread becomes leader in the
interim.  Whenever the head of the queue is replaced with
an element with an earlier expiration time, the leader
field is invalidated by being reset to null, and some
waiting thread, but not necessarily the current leader, is
signalled.  So waiting threads must be prepared to acquire
and lose leadership while waiting.

leader是等待隊(duì)列頭部元素的指定線程。Leader-Follower模式的這種變體用于最小化不必要的定時(shí)等待。

  • 當(dāng)一個(gè)線程稱為leader時(shí),其會(huì)定時(shí)等待下一個(gè)delay元素過(guò)期,但是其他線程會(huì)無(wú)限期等待。
  • 當(dāng)從take/poll返回之前,leader線程必須signal其他等待線程,除非在此期間有線程稱為了新的leader。
  • 每當(dāng)隊(duì)列頭部元素被更早到期的元素替換時(shí),leader被置為null,offer里面q.peek() == e時(shí),會(huì)將leader=null,此時(shí)當(dāng)然會(huì)signal,重新競(jìng)選leader。所以定時(shí)等待線程必須要處理失去leader時(shí)情況。

4.2 返回特殊值的offer和poll(還有超時(shí)版本)

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

4.3 拋出異常的add和remove

    public boolean add(E e) {
        return offer(e);
    }
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

5.Leader-Follower模式

參考Concurrency Pattern
參考Leader-Follower線程模型簡(jiǎn)介

  • 1)線程有3種狀態(tài):領(lǐng)導(dǎo)leading,處理processing,追隨following
  • 2)假設(shè)共N個(gè)線程,其中只有1個(gè)leading線程(等待任務(wù)),x個(gè)processing線程(處理),余下有N-1-x個(gè)following線程(空閑)
  • 3)有一把鎖,誰(shuí)搶到就是leading
  • 4)事件/任務(wù)來(lái)到時(shí),leading線程會(huì)對(duì)其進(jìn)行處理,從而轉(zhuǎn)化為processing狀態(tài),處理完成之后,又轉(zhuǎn)變?yōu)閒ollowing
  • 5)丟失leading后,following會(huì)嘗試搶鎖,搶到則變?yōu)閘eading,否則保持following
  • 6)following不干事,就是搶鎖,力圖成為leading
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容