9. DelayQueue

DelayQueue類實(shí)現(xiàn)BlockingQueue接口。閱讀BlockingQueue文本以獲取有關(guān)的更多信息。

DelayQueue內(nèi)部阻止元素直到某個(gè)延遲到期,元素必須實(shí)現(xiàn)接口java.util.concurrent.Delayed。以下是java.util.concurrent.Delayed接口:

public interface Delayed extends Comparable<Delayed< {

    public long getDelay(TimeUnit timeUnit);

}

getDelay()方法返回的值應(yīng)該是在釋放此元素之前剩余的延遲。如果返回0或負(fù)值,則延遲將被視為已過期,并且在DelayQueue調(diào)用下一個(gè)take()等操作時(shí)釋放。

傳遞給getDelay()方法的TimeUnit實(shí)例是一個(gè)Enum,它說明了延遲的時(shí)間單位。TimeUnit枚舉有以下值:

DAYS
HOURS
MINUTES
SECONDS
MILLISECONDS
MICROSECONDS
NANOSECONDS

Delayed接口繼承了java.lang.Comparable接口,這意味著Delayed對(duì)象可以被相互比較。這可能是在DelayQueue內(nèi)部用于排序隊(duì)列中的元素,因此它們能夠按到期時(shí)間排序釋放。

以下是使用DelayQueue的示例:

public class DelayQueueExample {

    public static void main(String[] args) {
        DelayQueue queue = new DelayQueue();
        Delayed element1 = new DelayedElement();
        queue.put(element1);
        Delayed element2 = queue.take();
    }
}

DelayedElement是我創(chuàng)建的Delayed接口的實(shí)現(xiàn)。它不是java.util.concurrent包的一部分。你必須創(chuàng)建自己的Delayed接口實(shí)現(xiàn)才能使用DelayQueue類。

源碼

DelayQueue類的泛型定義中可以看出,此類只能儲(chǔ)存繼承自Delayed接口的元素,內(nèi)部使用一個(gè)優(yōu)先級(jí)隊(duì)列對(duì)元素進(jìn)行排序。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    // 等待隊(duì)列的頭節(jié)點(diǎn),可以視作一個(gè)緩存
    // 當(dāng)一個(gè)線程成為leader,它只會(huì)等待指定延遲的時(shí)間,但
    // 其他線程會(huì)一直等到。所以leader線程在獲取到元素后
    // 一定要釋放其他線程,除非其他線程臨時(shí)成為leader
    private Thread leader;

    /**
     * 當(dāng)隊(duì)列頭部的一個(gè)新元素可獲得(即超時(shí)到期)或者一個(gè)新線程成為leader,喚醒此等待條件上的線程
     */
    private final Condition available = lock.newCondition();

構(gòu)造函數(shù)

只有兩個(gè)構(gòu)造方法,一個(gè)是默認(rèn)構(gòu)造方法,一個(gè)是給定一個(gè)集合,并將其中元素增加到等待隊(duì)列中。

public DelayQueue() {}

/**
 * Creates a {@code DelayQueue} initially containing the elements of the
 * given collection of {@link Delayed} instances.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

增加操作

public boolean add(E e) {
    // 重用offer方法
    return offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 將元素增加到優(yōu)先級(jí)隊(duì)列中
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

public void put(E e) {
    // 因?yàn)槭菬o界隊(duì)列,所以插入不會(huì)被阻塞。超時(shí)方法同理
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

刪除操作

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

// 提取并刪除第一個(gè)元素,如果隊(duì)列為空返回null
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 獲取第一個(gè)元素
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0)
            ? null
            : q.poll();
    } finally {
        lock.unlock();
    }
}

/**
 * 提取并刪除隊(duì)列的第一個(gè)元素,如果隊(duì)列為空則等待 
 * 直到有可獲得的元素
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 如果隊(duì)列為空,阻塞
            if (first == null)
                available.await();
            else {
                // 獲取頭元素的等待延遲
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                first = null; // don't retain ref while waiting
                // 如果已經(jīng)有線程在等待獲取頭元素,那么阻塞自己
                if (leader != null)
                    available.await();
                // 否則,自己就是leader,等待給定延遲
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果成功獲取到元素并且隊(duì)列不為空,喚醒其他線程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue,
 * or the specified wait time expires.
 *
 * @return the head of this queue, or {@code null} if the
 *         specified waiting time elapses before an element with
 *         an expired delay becomes available
 * @throws InterruptedException {@inheritDoc}
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 如果隊(duì)列為空,超時(shí)等待
            if (first == null) {
                if (nanos <= 0L)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                // 如果延遲還未到期,而指定的超時(shí)已到期,那么返回null
                if (nanos <= 0L)
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

訪問操作

public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 委托給優(yōu)先級(jí)隊(duì)列獲取
        return q.peek();
    } finally {
        lock.unlock();
    }
}

其他操作

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = 0;
        for (E first;
             n < maxElements
                 && (first = q.peek()) != null
                 && first.getDelay(NANOSECONDS) <= 0;) {
            // 增加到集合中
            c.add(first);   // In this order, in case add() throws.
            // 從隊(duì)列中刪除此元素
            q.poll();
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}

迭代器

迭代器使用數(shù)組保存隊(duì)列中的元素,當(dāng)創(chuàng)建一個(gè)迭代器時(shí),使用toArray()方法將當(dāng)前隊(duì)列轉(zhuǎn)換為數(shù)組,所以此迭代器不一定會(huì)和內(nèi)部的優(yōu)先級(jí)隊(duì)列保持一致。迭代器除了提供訪問操作外,只提供了一個(gè)刪除操作,這個(gè)刪除操作保證不會(huì)出現(xiàn)不一致的情況。

public Iterator<E> iterator() {
    return new Itr(toArray());
}

/**
 * Snapshot iterator that works off copy of underlying q array.
 */
private class Itr implements Iterator<E> {
    final Object[] array; // Array of all elements
    int cursor;           // index of next element to return
    int lastRet;          // index of last element, or -1 if no such

    Itr(Object[] array) {
        lastRet = -1;
        this.array = array;
    }

    public boolean hasNext() {
        return cursor < array.length;
    }

    @SuppressWarnings("unchecked")
    public E next() {
        if (cursor >= array.length)
            throw new NoSuchElementException();
        return (E)array[lastRet = cursor++];
    }

    public void remove() {
        if (lastRet < 0)
            throw new IllegalStateException();
        removeEQ(array[lastRet]);
        lastRet = -1;
    }
}

void removeEQ(Object o) {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lock();
    try {
        // 獲取優(yōu)先級(jí)隊(duì)列的迭代器,然后執(zhí)行刪除操作
        for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
            if (o == it.next()) {
                it.remove();
                break;
            }
        }
    } finally {
        lock.unlock();
    }
}

示例:

import org.junit.Assert;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueTest {
    private DelayQueue<DelayElement> queue;
    private int count;

    // 測(cè)試迭代器和內(nèi)部組件的不一致性

    @Test
    public void test() {
        List<DelayElement> list = new ArrayList<>();
        for(int i = 1; i < 6; ++i) {
            list.add(new DelayElement(i, TimeUnit.SECONDS));
        }
        queue = new DelayQueue<>(list);

        Iterator<DelayElement> iterator = queue.iterator();
        // 增加一個(gè)元素
        queue.add(new DelayElement(6, TimeUnit.SECONDS));

        iterator.forEachRemaining((e) -> ++count);
        Assert.assertEquals(count, queue.size());

        iterator.next();
        iterator.remove();
        System.out.println(queue.size());
    }

    // 測(cè)試reomove方法的一致性
    @Test
    public void testRemoveInItr() {
        List<DelayElement> list = new ArrayList<>();
        for(int i = 1; i < 6; ++i) {
            list.add(new DelayElement(i, TimeUnit.SECONDS));
        }
        queue = new DelayQueue<>(list);

        Iterator<DelayElement> iterator = queue.iterator();
        // 增加一個(gè)元素
        queue.add(new DelayElement(6, TimeUnit.SECONDS));

        System.out.println(queue.size());
        iterator.next();
        iterator.remove();
        System.out.println(queue.size());
    }


    private static class DelayElement implements Delayed {
        private long deadline;

        DelayElement(long delay) {
            this.deadline = System.nanoTime() + delay;
        }

        DelayElement(long delay, TimeUnit unit) {
            this.deadline = System.nanoTime() + unit.toNanos(delay);
        }

        DelayElement(Date date) {
            this.deadline = TimeUnit.MILLISECONDS.toNanos(date.getTime());
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.toNanos(deadline - System.nanoTime());
        }

        @Override
        public int compareTo(Delayed o) {
            Objects.requireNonNull(o);
            return (int) (deadline - o.getDelay(TimeUnit.NANOSECONDS));
        }
    }
}

輸出:

java.lang.AssertionError:
Expected :5
Actual :6
<Click to see difference>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at p6.DelayQueueTest.test(DelayQueueTest.java:28)

6
5

核心要點(diǎn)

  1. 使用此隊(duì)列時(shí),元素必須要實(shí)現(xiàn)Delayed接口
  2. 當(dāng)已經(jīng)有一個(gè)線程等待獲取隊(duì)列頭元素時(shí),其他也想要獲取元素的線程就會(huì)進(jìn)行等待阻塞狀態(tài)
  3. 迭代器不和內(nèi)部的優(yōu)先級(jí)隊(duì)列保持一致性
  4. 迭代器的remove()方法與內(nèi)部的優(yōu)先級(jí)隊(duì)列保持一致性
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容