20-阻塞隊列之LinkedBlockingQueue

阻塞隊列之LinkedBlockingQueue

在前面的文章中,已經對JDK中的BlockingQueue做了一個回顧,同時對ArrayBlockingQueue中的核心方法作了說明,而LinkedBlockingQueue作為JDK中BlockingQueue家族系列中一員,由于其作為固定大小線程池(Executors.newFixedThreadPool())底層所使用的阻塞隊列,分析它的目的主要在于2點:

(1) 與ArrayBlockingQueue進行類比學習,加深各種數據結構的理解;

(2) 了解底層實現(xiàn),能夠更好地理解每一種阻塞隊列對線程池性能的影響,做到真正的知其然,且知其所以然。

源碼分析LinkedBlockingQueue的實現(xiàn)

LinkedBlockingQueue:顧名思義,它是一個基于鏈表的阻塞隊列,首先看一下它的核心組成。

// 所有的元素都通過Node這個靜態(tài)內部類來進行存儲,這與LinkedList的處理方式完全一樣
static class Node<E> {
    //使用item來保存元素本身
    E item;
    //保存當前節(jié)點的后繼節(jié)點
    Node<E> next;
    Node(E x) { item = x; }
}

/**
    阻塞隊列所能存儲的最大容量
    用戶可以在創(chuàng)建時手動指定最大容量,如果用戶沒有指定最大容量
    那么最默認的最大容量為Integer.MAX_VALUE.
*/
private final int capacity;

/** 
    當前阻塞隊列中的元素數量
    PS:如果你看過ArrayBlockingQueue的源碼,你會發(fā)現(xiàn)
    ArrayBlockingQueue底層保存元素數量使用的是一個
    普通的int類型變量。其原因是在ArrayBlockingQueue底層
    對于元素的入隊列和出隊列使用的是同一個lock對象。而數
    量的修改都是在處于線程獲取鎖的情況下進行操作,因此不
    會有線程安全問題。
    而LinkedBlockingQueue卻不是,它的入隊列和出隊列使用的是兩個    
    不同的lock對象,因此無論是在入隊列還是出隊列,都會涉及對元素數
    量的并發(fā)修改,(之后通過源碼可以更加清楚地看到)因此這里使用了一個原子操作類
    來解決對同一個變量進行并發(fā)修改的線程安全問題。
*/
private final AtomicInteger count = new AtomicInteger(0);

/**
 * 鏈表的頭部
 * LinkedBlockingQueue的頭部具有一個不變性:
 * 頭部的元素總是為null,head.item==null   
 */
private transient Node<E> head;

/**
 * 鏈表的尾部
 * LinkedBlockingQueue的尾部也具有一個不變性:
 * 即last.next==null
 */
private transient Node<E> last;

/**
 元素出隊列時線程所獲取的鎖
 當執(zhí)行take、poll等操作時線程需要獲取的鎖
*/
private final ReentrantLock takeLock = new ReentrantLock();

/**
當隊列為空時,通過該Condition讓從隊列中獲取元素的線程處于等待狀態(tài)
*/
private final Condition notEmpty = takeLock.newCondition();

/** 
  元素入隊列時線程所獲取的鎖
  當執(zhí)行add、put、offer等操作時線程需要獲取鎖
*/
private final ReentrantLock putLock = new ReentrantLock();

/** 
 當隊列的元素已經達到capactiy,通過該Condition讓入隊列的線程處于等待狀態(tài)
*/
private final Condition notFull = putLock.newCondition();

通過上面的分析,我們可以發(fā)現(xiàn)LinkedBlockingQueue在入隊列和出隊列時使用的不是同一個Lock,這也意味著它們之間的操作不會存在互斥操作。在多個CPU的情況下,它們可以做到真正的在同一時刻既消費、又生產,能夠做到并行處理。

下面讓我們看下LinkedBlockingQueue的構造方法:

/**
 * 如果用戶沒有顯示指定capacity的值,默認使用int的最大值
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

/**
 可以看到,當隊列中沒有任何元素的時候,此時隊列的頭部就等于隊列的尾部,
 指向的是同一個節(jié)點,并且元素的內容為null
*/
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

/*
在初始化LinkedBlockingQueue的時候,還可以直接將一個集合
中的元素全部入隊列,此時隊列最大容量依然是int的最大值。
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    //獲取鎖
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        //迭代集合中的每一個元素,讓其入隊列,并且更新一下當前隊列中的元素數量
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //參考下面的enqueue分析        
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        //釋放鎖
        putLock.unlock();
    }
}

/**
 * 我去,這代碼其實可讀性不怎么樣啊。
 * 其實下面的代碼等價于如下內容:
 * last.next=node;
 * last = node;
 * 其實也沒有什么花樣:
   就是讓新入隊列的元素成為原來的last的next,讓進入的元素成為last
 *
 */
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

在分析完LinkedBlockingQueue的核心組成之后,下面讓我們再看下核心的幾個操作方法,首先分析一下元素入隊列的過程:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    
    /*注意上面這句話,約定所有的put/take操作都會預先設置本地變量,
    可以看到下面有一個將putLock賦值給了一個局部變量的操作
    */
    int c = -1;
    Node<E> node = new Node(e);
    /* 
     在這里首先獲取到putLock,以及當前隊列的元素數量
     即上面所描述的預設置本地變量操作
    */
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    /*
        執(zhí)行可中斷的鎖獲取操作,即意味著如果線程由于獲取
        鎖而處于Blocked狀態(tài)時,線程是可以被中斷而不再繼
        續(xù)等待,這也是一種避免死鎖的一種方式,不會因為
        發(fā)現(xiàn)到死鎖之后而由于無法中斷線程最終只能重啟應用。
    */
    putLock.lockInterruptibly();
    try {
        /*
        當隊列的容量到底最大容量時,此時線程將處于等待狀
        態(tài),直到隊列有空閑的位置才繼續(xù)執(zhí)行。使用while判
        斷依舊是為了防止線程被"偽喚醒”而出現(xiàn)的情況,即當
        線程被喚醒時而隊列的大小依舊等于capacity時,線
        程應該繼續(xù)等待。
        */
        while (count.get() == capacity) {
            notFull.await();
        }
        //讓元素進行隊列的末尾,enqueue代碼在上面分析過了
        enqueue(node);
        //首先獲取原先隊列中的元素個數,然后再對隊列中的元素個數+1.
        c = count.getAndIncrement();
        /*注:c+1得到的結果是新元素入隊列之后隊列元素的總和。
        當前隊列中的總元素個數小于最大容量時,此時喚醒其他執(zhí)行入隊列的線程
        讓它們可以放入元素,如果新加入元素之后,隊列的大小等于capacity,
        那么就意味著此時隊列已經滿了,也就沒有必須要喚醒其他正在等待入隊列的線程,因為喚醒它們之后,它們也還是繼續(xù)等待。
        */
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //完成對鎖的釋放
        putLock.unlock();
    }
    /*當c=0時,即意味著之前的隊列是空隊列,出隊列的線程都處于等待狀態(tài),
    現(xiàn)在新添加了一個新的元素,即隊列不再為空,因此它會喚醒正在等待獲取元素的線程。
    */
    if (c == 0)
        signalNotEmpty();
}

/*
喚醒正在等待獲取元素的線程,告訴它們現(xiàn)在隊列中有元素了
*/
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //通過notEmpty喚醒獲取元素的線程
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

在分析完入隊列的過程之后,我們接下來看看LinkedBlockingQueue出隊列的過程;由于BlockingQueue的方法都具有對稱性,此處就只分析take方法的實現(xiàn),其余方法的實現(xiàn)都如出一轍:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //通過takeLock獲取鎖,并且支持線程中斷
    takeLock.lockInterruptibly();
    try {
        //當隊列為空時,則讓當前線程處于等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        //完成元素的出隊列
        x = dequeue();
        /*            
           隊列元素個數完成原子化操作-1,可以看到count元素會
           在插入元素的線程和獲取元素的線程進行并發(fā)修改操作。
        */
        c = count.getAndDecrement();
        /*
          當一個元素出隊列之后,隊列的大小依舊大于1時
          當前線程會喚醒其他執(zhí)行元素出隊列的線程,讓它們也
          可以執(zhí)行元素的獲取
        */
        if (c > 1)
            notEmpty.signal();
    } finally {
        //完成鎖的釋放
        takeLock.unlock();
    }
    /*
        當c==capaitcy時,即在獲取當前元素之前,
        隊列已經滿了,而此時獲取元素之后,隊列就會
        空出一個位置,故當前線程會喚醒執(zhí)行插入操作的線
        程通知其中的一個可以進行插入操作。
    */
    if (c == capacity)
        signalNotFull();
    return x;
}

/*
    喚醒等待插入元素的線程,告訴它們現(xiàn)在隊列中有空間了
*/
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}


/**
 * 讓頭部元素出隊列的過程
 * 其最終的目的是讓原來的head被GC回收,讓其的next成為head
 * 并且新的head的item為null.
 * 因為LinkedBlockingQueue的頭部具有一致性:即元素為null。
 */
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
image.png

對于LinkedBlockingQueue的源碼分析就到這里,下面讓我們將LinkedBlockingQueue與ArrayBlockingQueue進行一個比較。

LinkedBlockingQueue與ArrayBlockingQueue的比較

ArrayBlockingQueue由于其底層基于數組,并且在創(chuàng)建時指定存儲的大小,在完成后就會立即在內存分配固定大小容量的數組元素,因此其存儲通常有限,故其是一個“有界“的阻塞隊列;而LinkedBlockingQueue可以由用戶指定最大存儲容量,也可以無需指定,如果不指定則最大存儲容量將是Integer.MAX_VALUE,即可以看作是一個“無界”的阻塞隊列,由于其節(jié)點的創(chuàng)建都是動態(tài)創(chuàng)建,并且在節(jié)點出隊列后可以被GC所回收,因此其具有靈活的伸縮性。但是由于ArrayBlockingQueue的有界性,因此其能夠更好的對于性能進行預測,而LinkedBlockingQueue由于沒有限制大小,當任務非常多的時候,不停地向隊列中存儲,就有可能導致內存溢出的情況發(fā)生。

其次,ArrayBlockingQueue中在入隊列和出隊列操作過程中,使用的是同一個lock,所以即使在多核CPU的情況下,其入隊和出隊的操作都無法做到并行,而LinkedBlockingQueue的讀取和插入操作所使用的鎖是兩個不同的lock,它們之間的操作互相不受干擾,因此兩種操作可以并行完成,故LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。

選擇LinkedBlockingQueue的理由

/**
    下面的代碼是Executors創(chuàng)建固定大小線程池的代碼,其使用了
    LinkedBlockingQueue來作為任務隊列。
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

JDK中選用LinkedBlockingQueue作為阻塞隊列的原因就在于其無界性。因為線程大小固定的線程池,其線程的數量是不具備伸縮性的,當任務非常繁忙的時候,就勢必會導致所有的線程都處于工作狀態(tài),如果使用一個有界的阻塞隊列來進行處理,那么就非常有可能很快導致隊列滿的情況發(fā)生,從而導致任務無法提交而拋出RejectedExecutionException,而使用無界隊列由于其良好的存儲容量的伸縮性,可以很好的去緩沖任務繁忙情況下的場景,即使任務非常多,也可以進行動態(tài)擴容,當任務被處理完成之后,隊列中的節(jié)點也會被隨之被GC回收,非常靈活。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 相關文章Java并發(fā)編程(一)線程定義、狀態(tài)和屬性 Java并發(fā)編程(二)同步Java并發(fā)編程(三)volatil...
    劉望舒閱讀 5,288評論 1 31
  • 一、多線程 說明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)。 NEW:這種情況指的是,通過 New 關鍵字創(chuàng)...
    Java旅行者閱讀 4,865評論 0 44
  • 1.阻塞隊列定義阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。...
    SDY_0656閱讀 480評論 0 1
  • 每次生病就會擔心,擔心自己是不是時間不多了,雖然可笑。但是我卻是發(fā)自內心的害怕。 因為自己以前的某些事 這種時候我...
    一窗天閱讀 376評論 1 2
  • 央Ya閱讀 449評論 0 0

友情鏈接更多精彩內容