
1.官方文檔
An unbounded blocking queue of Delayed elements, in which an
element can only be taken when its delay has expired. The head of
the queue is that Delayed element whose delay expired furthest in
the past. If no delay has expired there is no head and poll will return
null. Expiration occurs when an element's
getDelay(TimeUnit.NANOSECONDS) method returns a value less
than or equal to zero. Even though unexpired elements cannot be
removed using take or poll, they are otherwise treated as normal
elements. For example, the size method returns the count of both
expired and unexpired elements. This queue does not permit null
elements.
存放Delayed元素的無(wú)界阻塞隊(duì)列,只有元素過(guò)期才能將其取走。隊(duì)頭head是過(guò)期最長(zhǎng)時(shí)間的元素,如果沒(méi)有元素過(guò)期則沒(méi)有head并且poll會(huì)返回null。當(dāng)元素的getDelay()方法返回小于等于0的值,則過(guò)期。不允許null元素。
2.Delayed
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
元素必須實(shí)現(xiàn)Delayed接口,方法getDelay返回元素還剩多少時(shí)間才過(guò)時(shí)。
由于Delayed繼承自 Comparable<Delayed>,所以也要實(shí)現(xiàn)compareTo方法。
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
3.優(yōu)先隊(duì)列
將元素入隊(duì)時(shí),會(huì)調(diào)用Delayed元素的compareTo方法進(jìn)行排序。因此,元素除了提供getDelay,還需要提供compareTo方法。
private final PriorityQueue<E> q = new PriorityQueue<E>();
4.入隊(duì)和出隊(duì)
4.1 阻塞版本put和take
由于是無(wú)界隊(duì)列,入隊(duì)不會(huì)阻塞。
/**
* Inserts the specified element into this delay queue. As the queue is
* unbounded this method will never block.
*
* @param e the element to add
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
為什么q.peek() == e才signal?
當(dāng)隊(duì)列頭部元素被更早到期的元素替換是,leader被置為null,offer里面q.peek() == e時(shí),會(huì)將leader=null,此時(shí)當(dāng)然會(huì)signal,重新競(jìng)選leader。所以定時(shí)等待線程必須要處理失去leader時(shí)情況。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
leader的作用是什么?
聲明 private Thread leader = null;源碼注釋:
Thread designated to wait for the element at the head of
the queue. This variant of the Leader-Follower pattern
(http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
minimize unnecessary timed waiting. When a thread becomes
the leader, it waits only for the next delay to elapse, but
other threads await indefinitely. The leader thread must
signal some other thread before returning from take() or
poll(...), unless some other thread becomes leader in the
interim. Whenever the head of the queue is replaced with
an element with an earlier expiration time, the leader
field is invalidated by being reset to null, and some
waiting thread, but not necessarily the current leader, is
signalled. So waiting threads must be prepared to acquire
and lose leadership while waiting.
leader是等待隊(duì)列頭部元素的指定線程。Leader-Follower模式的這種變體用于最小化不必要的定時(shí)等待。
- 當(dāng)一個(gè)線程稱為leader時(shí),其會(huì)定時(shí)等待下一個(gè)delay元素過(guò)期,但是其他線程會(huì)無(wú)限期等待。
- 當(dāng)從take/poll返回之前,leader線程必須signal其他等待線程,除非在此期間有線程稱為了新的leader。
- 每當(dāng)隊(duì)列頭部元素被更早到期的元素替換時(shí),leader被置為null,offer里面q.peek() == e時(shí),會(huì)將leader=null,此時(shí)當(dāng)然會(huì)signal,重新競(jìng)選leader。所以定時(shí)等待線程必須要處理失去leader時(shí)情況。
4.2 返回特殊值的offer和poll(還有超時(shí)版本)
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
4.3 拋出異常的add和remove
public boolean add(E e) {
return offer(e);
}
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
5.Leader-Follower模式
參考Concurrency Pattern
參考Leader-Follower線程模型簡(jiǎn)介

- 1)線程有3種狀態(tài):領(lǐng)導(dǎo)leading,處理processing,追隨following
- 2)假設(shè)共N個(gè)線程,其中只有1個(gè)leading線程(等待任務(wù)),x個(gè)processing線程(處理),余下有N-1-x個(gè)following線程(空閑)
- 3)有一把鎖,誰(shuí)搶到就是leading
- 4)事件/任務(wù)來(lái)到時(shí),leading線程會(huì)對(duì)其進(jìn)行處理,從而轉(zhuǎn)化為processing狀態(tài),處理完成之后,又轉(zhuǎn)變?yōu)閒ollowing
- 5)丟失leading后,following會(huì)嘗試搶鎖,搶到則變?yōu)閘eading,否則保持following
- 6)following不干事,就是搶鎖,力圖成為leading