并發(fā)編程(五):LinkedBlockingQueue源碼解析

1.1 簡(jiǎn)介

LinkedBlockingQueue是一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列,此隊(duì)列是FIFO(先進(jìn)先出)的順序來(lái)訪問(wèn)的,它由隊(duì)尾插入后再?gòu)年?duì)頭取出或移除,其中隊(duì)列的頭部是在隊(duì)列中時(shí)間最長(zhǎng)的元素,隊(duì)列的尾部是在隊(duì)列中時(shí)間最短的元素。在LinkedBlockingQueue類中分別用2個(gè)不同的鎖takeLock、putLock來(lái)保護(hù)隊(duì)頭和隊(duì)尾操作。如下圖所示:

image

1.2 類圖

image

1.3 源碼分析

1.3.1 屬性與鏈表節(jié)點(diǎn)類

//鏈表節(jié)點(diǎn)類,next指向下一個(gè)節(jié)點(diǎn)。如果下一個(gè)節(jié)點(diǎn)時(shí)null表示沒(méi)有節(jié)點(diǎn)了。  
static class Node<E> {  
    E item;  
  
    Node<E> next;  
  
    Node(E x) { item = x; }  
}  
  
// 最大容量上限,默認(rèn)是 Integer.MAX_VALUE  
private final int capacity;  
  
// 當(dāng)前元素?cái)?shù)量,這是個(gè)原子類。  
private final AtomicInteger count = new AtomicInteger(0);  
  
// 頭結(jié)點(diǎn)  
private transient Node<E> head;  
  
// 尾結(jié)點(diǎn)  
private transient Node<E> last;  
  
// 隊(duì)頭訪問(wèn)鎖  
private final ReentrantLock takeLock = new ReentrantLock();  
  
// 隊(duì)頭訪問(wèn)等待條件、隊(duì)列  
private final Condition notEmpty = takeLock.newCondition();  
  
// 隊(duì)尾訪問(wèn)鎖  
private final ReentrantLock putLock = new ReentrantLock();  
  
// 隊(duì)尾訪問(wèn)等待條件、隊(duì)列  
private final Condition notFull = putLock.newCondition();  

使用原子類AtomicInteger是因?yàn)樽x寫分別使用了不同的鎖,但都會(huì)訪問(wèn)這個(gè)屬性來(lái)計(jì)算隊(duì)列中元素的數(shù)量,所以它需要是線程安全的。關(guān)AtomicInteger詳細(xì)請(qǐng)看我的這一篇文章:【Java并發(fā)編程】深入分析AtomicInteger(二)

1.3.2 offer操作

public boolean offer(E e) {  
    if (e == null) throw new NullPointerException();  
    final AtomicInteger count = this.count;  
    //當(dāng)隊(duì)列滿時(shí),直接返回了false,沒(méi)有被阻塞等待元素插入  
    if (count.get() == capacity)  
        return false;  
    int c = -1;  
    Node<E> node = new Node(e);  
    //開(kāi)啟隊(duì)尾保護(hù)鎖  
    final ReentrantLock putLock = this.putLock;  
    putLock.lock();  
    try {  
        if (count.get() < capacity) {  
            enqueue(node);  
            //原則計(jì)數(shù)類  
            c = count.getAndIncrement();  
            if (c + 1 < capacity)  
                notFull.signal();  
        }  
    } finally {  
        //釋放鎖  
        putLock.unlock();  
    }  
    if (c == 0)  
        signalNotEmpty();  
    return c >= 0;  
}  
  
//在持有鎖下指向下一個(gè)節(jié)點(diǎn)  
private void enqueue(Node<E> node) {  
    // assert putLock.isHeldByCurrentThread();  
    // assert last.next == null;  
    last = last.next = node;  
}  

1.3.3 put操作

//put 操作把指定元素添加到隊(duì)尾,如果沒(méi)有空間則一直等待。  
public void put(E e) throws InterruptedException {  
    if (e == null) throw new NullPointerException();  
    // Note: convention in all put/take/etc is to preset local var  
    // holding count negative to indicate failure unless set.  
    //注釋:在所有的 put/take/etc等操作中變量c為負(fù)數(shù)表示失敗,>=0表示成功。  
    int c = -1;  
    Node<E> node = new Node(e);  
    final ReentrantLock putLock = this.putLock;  
    final AtomicInteger count = this.count;  
    putLock.lockInterruptibly();  
    try {  
        /* 
         * Note that count is used in wait guard even though it is 
         * not protected by lock. This works because count can 
         * only decrease at this point (all other puts are shut 
         * out by lock), and we (or some other waiting put) are 
         * signalled if it ever changes from capacity. Similarly 
         * for all other uses of count in other wait guards. 
         */  
        /* 
         * 注意,count用于等待監(jiān)視,即使它沒(méi)有用鎖保護(hù)。這個(gè)可行是因?yàn)?
         * count 只能在此刻(持有putLock)減小(其他put線程都被鎖拒之門外), 
         * 當(dāng)count對(duì)capacity發(fā)生變化時(shí),當(dāng)前線程(或其他put等待線程)將被通知。 
         * 在其他等待監(jiān)視的使用中也類似。 
         */  
        while (count.get() == capacity) {  
            notFull.await();  
        }  
        enqueue(node);  
        c = count.getAndIncrement();  
        // 還有可添加空間則喚醒put等待線程。  
        if (c + 1 < capacity)  
            notFull.signal();  
    } finally {  
        putLock.unlock();  
    }  
    if (c == 0)  
        signalNotEmpty();  
}  

1.3.4 take操作

//彈出隊(duì)頭元素,如果沒(méi)有會(huì)被阻塞直到元素返回  
public E take() throws InterruptedException {  
    E x;  
    int c = -1;  
    final AtomicInteger count = this.count;  
  
    final ReentrantLock takeLock = this.takeLock;  
    takeLock.lockInterruptibly();  
    try {  
        while (count.get() == 0) {  
            notEmpty.await();//沒(méi)有元素一直阻塞  
        }  
        x = dequeue();  
        c = count.getAndDecrement();  
        if (c > 1)//如果還有可獲取元素,喚醒等待獲取的線程。  
          notEmpty.signal();  
    } finally {  
        //拿到元素后釋放鎖  
        takeLock.unlock();  
    }  
    if (c == capacity)  
        signalNotFull();  
    return x;  
}  
  
//在持有鎖下返回隊(duì)列隊(duì)頭第一個(gè)節(jié)點(diǎn)  
private E dequeue() {  
    // assert takeLock.isHeldByCurrentThread();  
    // assert head.item == null;  
    Node<E> h = head;  
    Node<E> first = h.next;  
    h.next = h; // help GC  
    //出隊(duì)后的節(jié)點(diǎn)作為頭節(jié)點(diǎn)并將元素置空  
    head = first;  
    E x = first.item;  
    first.item = null;  
    return x;  
}  

1.3.5 remove操作

image
//移除指定元素。  
public boolean remove(Object o) {  
    if (o == null) return false;  
    //對(duì)兩把鎖加鎖  
    fullyLock();  
    try {  
        for (Node<E> trail = head, p = trail.next;  
                p != null;  
                trail = p, p = p.next) {  
            if (o.equals(p.item)) {  
                unlink(p, trail);  
                return true;  
            }  
        }  
        return false;  
    } finally {  
        fullyUnlock();  
    }  
}  
  
//p是移除元素所在節(jié)點(diǎn),trail是移除元素的上一個(gè)節(jié)點(diǎn)  
void unlink(Node<E> p, Node<E> trail) {  
    // assert isFullyLocked();  
    // p.next is not changed, to allow iterators that are  
    // traversing p to maintain their weak-consistency guarantee.  
    p.item = null;  
    //將trail下一個(gè)節(jié)點(diǎn)指向p的下一個(gè)節(jié)點(diǎn)  
    trail.next = p.next;  
    if (last == p)  
        last = trail;  
    if (count.getAndDecrement() == capacity)  
        notFull.signal();  
}  
  
void fullyLock() {  
    putLock.lock();  
    takeLock.lock();  
}  
  
//釋放鎖時(shí)確保和加鎖順序一致  
void fullyUnlock() {  
    takeLock.unlock();  
    putLock.unlock();  
}  

注意,鎖的釋放順序與加鎖順序是相反的。

作者:小毛驢,一個(gè)Java游戲服務(wù)器開(kāi)發(fā)者 原文地址:https://liulongling.github.io/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容