八、并發(fā)隊(duì)列之LinkedBlockingDeque源碼解析

為什么先介紹LinkedBlockingDeque雙向同步隊(duì)列呢,主要是它比較簡(jiǎn)單,對(duì)于后面的隊(duì)列的理解是一個(gè)好的鋪墊


結(jié)構(gòu)

  • Node<E> first 頭節(jié)點(diǎn)
  • Node<E> last 尾節(jié)點(diǎn)
  • int count 節(jié)點(diǎn)數(shù)量
  • int capacity 容量
  • ReentrantLock lock = new ReentrantLock(); 鎖
  • Condition notEmpty = lock.newCondition(); 非空條件變量
  • Condition notFull = lock.newCondition(); 未滿條件變量

由上可以看出,LinkedBlockingDeque有一個(gè)獨(dú)占鎖,且包括它的兩個(gè)條件變量,其實(shí)這就是一個(gè)生產(chǎn)者-消費(fèi)者模型。
再來(lái)看其內(nèi)部Node類

static final class Node<E> {
    E item;
    Node<E> prev;
    Node<E> next;
    Node(E x) {
        item = x;
    }
}

由上可以看出與ConcurrentLinkedQueue不同,其內(nèi)部是一個(gè)雙端隊(duì)列。
到此我們值了解這么多,重要的操作原理我們往下看源碼

重要方法

因?yàn)槭顷?duì)列,離不開offer、poll、peek這幾個(gè)重要方法,不過由于LinkedBlockingDeque是一個(gè)雙向隊(duì)列所以還有對(duì)應(yīng)相反的方法。

入隊(duì)操作

入隊(duì)方法有如下:

```
public boolean add(E e) {
    addLast(e);
    return true;
}
public void addFirst(E e) {
    if (!offerFirst(e))
        throw new IllegalStateException("Deque full");
}
public void addLast(E e) {
    if (!offerLast(e))
        throw new IllegalStateException("Deque full");
}
public boolean offer(E e) {
    return offerLast(e);
}
public boolean offerFirst(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();       // 加鎖
    try {
        return linkFirst(node);  // 直接返回linkFirst
    } finally {
        lock.unlock();
    }
}
public boolean offerLast(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return linkLast(node);
    } finally {
        lock.unlock();
    }
}
public void put(E e) throws InterruptedException {
    putLast(e);
}
public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();        // 加鎖
    try {
        while (!linkFirst(node))    // 如果失敗,則調(diào)用notFull.await,也就是,等待未滿條件
            notFull.await();
    } finally {
        lock.unlock();
    }
}
public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}
```

從代碼中我們可以看到,一共有三類:add、offer、put,然后分別有對(duì)應(yīng)last和first的方法。只有三點(diǎn)不同:

  1. put中加個(gè)while循環(huán),如果失敗,則進(jìn)行等待,只有l(wèi)ink成功的條件才往下執(zhí)行,而offer直接返回link的結(jié)果
  2. 它們的返回參數(shù)不一樣,offer的是boolean,而put則是void
  3. add函數(shù)在失敗后會(huì)拋"Deque full"的異常
    于是我們知道,offer不保證成功,put保證一定能插入元素且是阻塞的方法。
    接著我們繼續(xù)看他們的核心函數(shù)link:
private boolean linkFirst(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity)
        return false;       // 數(shù)量達(dá)到限制,則返回false
    Node<E> f = first;      // 頭節(jié)點(diǎn)
    node.next = f;          // 當(dāng)前節(jié)點(diǎn)放到頭節(jié)點(diǎn),成為新的頭節(jié)點(diǎn)
    first = node;
    if (last == null)
        last = node;        // 如果沒有尾節(jié)點(diǎn),則也置為尾節(jié)點(diǎn)
    else
        f.prev = node;
    ++count;
    notEmpty.signal();      // 喚醒出隊(duì)操作,代表現(xiàn)在隊(duì)列不是空
    return true;
}

上面的函數(shù)比較簡(jiǎn)單,就做了兩件事:

  1. 重置頭結(jié)點(diǎn)
  2. 喚醒出隊(duì)線程,因?yàn)槭且粋€(gè)入隊(duì)操作,那么至少隊(duì)列的數(shù)量有一個(gè),也就是不可能為空了,表示現(xiàn)在可以出隊(duì)了。

linkLast也是一樣,其實(shí)由于前面已經(jīng)加鎖了,不會(huì)產(chǎn)生并發(fā),所以邏輯非常簡(jiǎn)單,就是雙向隊(duì)列的入隊(duì)操作。

出隊(duì)操作

與入隊(duì)對(duì)應(yīng),出隊(duì)操作也有三種,remove、poll、take。。
看一下三者的區(qū)別:

public E remove() {
        return removeFirst();
    }
public E removeLast() {
    E x = pollLast();       // 其實(shí)就是調(diào)用的poll
    if (x == null) throw new NoSuchElementException(); 只不過為空的時(shí)候會(huì)拋錯(cuò)
    return x;
}
public E pollLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();        // 采用獨(dú)占鎖進(jìn)行同步
    try {
        return unlinkLast();
    } finally {
        lock.unlock();
    }
}
public E takeLast() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkLast()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

可以看出,與入隊(duì)幾乎一樣,兩點(diǎn)不同:

  1. take中加個(gè)while循環(huán),如果失敗,則進(jìn)行等待,只有unlink成功的條件才往下執(zhí)行,而poll直接返回unlink的結(jié)果
  2. remove函數(shù)在失敗后會(huì)拋錯(cuò)NoSuchElementException的異常

其核心函數(shù)還是unlink :

private E unlinkLast() {
    // assert lock.isHeldByCurrentThread();
    Node<E> l = last;
    if (l == null)
        return null;
    Node<E> p = l.prev;
    E item = l.item;  // 保存值用來(lái)返回
    l.item = null;    
    l.prev = l; // 指向自己延后釋放
    last = p;      // 重新指定隊(duì)尾
    if (p == null)
        first = null;  // 如果隊(duì)列為空就沒必要再指定下一個(gè)節(jié)點(diǎn)了
    else
        p.next = null;
    --count;
    notFull.signal();  // 至少有一個(gè)空余,喚醒某個(gè)入隊(duì)線程,使其加入同步隊(duì)列
    return item;
}

上面的函數(shù)比較簡(jiǎn)單,就做了兩件事:

  1. 釋放尾節(jié)點(diǎn)、并充值last節(jié)點(diǎn)
  2. 喚醒入隊(duì)線程,因?yàn)槭且粋€(gè)出隊(duì)操作,那么至少隊(duì)列的數(shù)量減一,表示現(xiàn)在可以入隊(duì)了。

獲取元素

該操作與出列非常相似,只是沒有移除的步驟,獲取操作有兩種:get、和peek

public E getFirst() {
    E x = peekFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}
public E peekFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (first == null) ? null : first.item;
    } finally {
        lock.unlock();
    }
}

可以看出唯一的區(qū)別就是為空時(shí)拋不拋異常。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容