以LinkedBlockingQueue為例淺談阻塞隊列的實現(xiàn)

目錄

最近在閱讀Spark源碼的過程中,又重新接觸到了一些Java并發(fā)方面的知識,于是就見縫插針地將它們記錄下來,當做復習與備忘。

阻塞隊列簡介

阻塞隊列的定義

根據(jù)Doug Lea在JavaDoc中的解釋,所謂阻塞隊列,就是在普通隊列的基礎之上,支持以下兩種操作的隊列:

  • 當某線程從隊列獲取元素時,如果隊列為空,就等待(阻塞)直至隊列中有元素;
  • 當某線程向隊列插入元素時,如果隊列已滿,就等待(阻塞)直至隊列中有空間。

也就是說,阻塞隊列是自帶同步機制的隊列。它最常用來解決線程同步中經典的生產者-消費者問題,前面講過的Spark Core異步事件總線中,就采用阻塞隊列作為事件存儲。

Java中的阻塞隊列

Java中阻塞隊列的基類是j.u.c.BlockingQueue接口,它繼承自Queue接口,并且定義了額外的方法實現(xiàn)同步:

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

上述put()與offer()方法用于向隊列插入元素,take()與poll()方法則是從隊列獲取元素。不同的是,put()與take()方法在插入/獲取時,如果必須等待,就會一直阻塞下去;而offer()與poll()方法可以指定阻塞的時間長度。

以BlockingQueue接口為中心的繼承關系如下圖所示。


平時開發(fā)中比較常用的阻塞隊列是基于數(shù)組實現(xiàn)的ArrayBlockingQueue,與基于單鏈表實現(xiàn)的LinkedBlockingQueue。本文選擇后者來深入看一下阻塞隊列的實現(xiàn)細節(jié),因為它的性能在多數(shù)情況下更優(yōu),可以自行寫benchmark程序來測測。

LinkedBlockingQueue

LinkedBlockingQueue(以下簡稱LBQ)是基于單鏈表實現(xiàn)的,先進先出(FIFO)的有界阻塞隊列。

單鏈表定義

LBQ的單鏈表結點數(shù)據(jù)結構定義在靜態(tài)內部類Node中。

    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

在類的內部還定義了單鏈表的頭結點與尾結點。

    transient Node<E> head;
    private transient Node<E> last;

head始終指向鏈表的第一個結點,該結點是哨兵結點,不存儲數(shù)據(jù),只標記鏈表的開始,即head.item == null。這樣可以避免只有一個結點時造成混亂。
tail始終指向鏈表的最后一個結點,該結點是有數(shù)據(jù)的,并滿足last.next == null

LBQ在隊頭獲取及彈出元素,在隊尾插入元素。

鎖和等待隊列

LBQ采用雙鎖機制保證入隊和出隊可以同時進行,互不干擾。

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

可見定義有兩個ReentrantLock,takeLock用于控制出隊,putLock用于控制入隊。另外,還有這兩個鎖分別對應的條件變量notEmpty和notFull,分別維護出隊和入隊線程的等待隊列。ReentrantLock和Condition都是Java AQS機制的重要組成部分,之后也會細說。

值得注意的是,在某些方法中需要同時對takeLock與putLock加鎖與解鎖,所以LBQ內部也提供了這樣的方法。

    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

這兩個方法總會成對調用,保證所有需要同時加鎖和解鎖的地方,其順序都一致并且不可中斷,也防止了前一個鎖操作成功執(zhí)行,后一個鎖操作被打斷導致死鎖的風險。

另外,LBQ也對條件變量的Condition.signal()方法進行了簡單封裝,分別用來喚醒阻塞的出隊操作線程和入隊操作線程。

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

容量和計數(shù)

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();

capacity是LBQ的最大容量,可以在構造方法中隨同名參數(shù)傳入,默認值是Integer.MAX_VALUE。

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

count則是LBQ內當前元素的計數(shù),由于入隊和出隊動作可以并發(fā)執(zhí)行,所以要用原子類型AtomicInteger保證線程安全。

入隊操作

由于put()和offer()方法的邏輯基本相同,所以只看offer()方法就好了。

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

在入隊時,首先將putLock加鎖,然后用衛(wèi)語句count.get() == capacity判斷隊列是否已滿,若已滿,則進入等待循環(huán)。當阻塞的時間超時后,判定入隊操作失敗,并返回false。
如果隊列未滿,或者在超時時間未到時有了空間,就調用enqueue()方法在隊尾插入元素,并將計數(shù)器自增。入隊后若還有更多的剩余空間,則喚醒其他等待的入隊線程。
最后將putLock解鎖,并檢查由count.getAndIncrement()返回的值是否為0。如果為0,表示隊列剛剛由空變?yōu)榉强諣顟B(tài),因此也要喚醒等待的出隊線程。

出隊操作

同理,只看poll()方法。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

將講解入隊方法時的描述反著說一遍就行了:
在出隊時,首先將takeLock加鎖,然后用衛(wèi)語句count.get() == 0判斷隊列是否為空,若為空,則進入等待循環(huán)。當阻塞的時間超時后,判定出隊操作失敗,并返回false。
如果隊列不為空,或者在超時時間未到時進了新元素,就調用dequeue()方法彈出隊頭元素,并將計數(shù)器自減。出隊后若還有更多的剩余元素,則喚醒其他等待的出隊線程。
最后將takeLock解鎖,并檢查由count.getAndDecrement()返回的值是否為capacity。如果為capacity,表示隊列剛剛由滿變?yōu)椴粷M狀態(tài),因此也要喚醒等待的入隊線程。

需要操作雙鎖的情況

以remove()方法為例。

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

由于單鏈表刪除結點涉及到對鏈表的遍歷,以及對前驅和后繼結點的斷鏈和補鏈,因此必須將兩個鎖都加上,禁止一切修改。待刪除成功后才能解鎖,繼續(xù)正常的入隊和出隊操作。

生產者-消費者問題示例

生產者-消費者問題的解決方法用操作系統(tǒng)理論中的信號量PV(wait-signal)原語描述如下:

semaphore filled = 0;
semaphore empty = BUF_CAPACITY;
mutex_semaphore mutex = 1;

procedure producer() {
  while (true) {
    item = produce();
    wait(empty);
    wait(mutex);
    buffer.put(item);
    signal(mutex);
    signal(filled);
  }
}

procedure consumer() {
  while (true) {
    wait(filled);
    wait(mutex);
    item = buffer.get();
    signal(mutex);
    signal(empty);
    consume(item);
  }
}

利用阻塞隊列可以免去自己實現(xiàn)同步機制的麻煩,從而非常方便地實現(xiàn)。一個極簡的示例如下:

public class ProducerConsumerExample {
    private static final int BUF_CAPACITY = 16;

    public static void main(String[] args) {
        BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<>(BUF_CAPACITY);

        Thread producerThread = new Thread(() -> {
            try {
                while (true) {
                    long value = System.currentTimeMillis() % 1000;
                    blockingQueue.put(value);
                    Thread.sleep(value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "producer");

        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    System.out.println(blockingQueue.take());
                    Thread.sleep(System.currentTimeMillis() % 1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "consumer");

        producerThread.start();
        consumerThread.start();
    }
}

一個?。ǎ浚﹩栴}

在上面的代碼(以及j.u.c包中很多類的代碼)的方法體中,經常能看到類似以下的語句:

        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;

也就是有些類中定義的字段,在方法中使用時會先賦值給一個局部變量。這樣做到底是為了什么?以目前我所了解到的而言,還沒有特別確切的答案,但可以確定是一個非常微小的優(yōu)化,與JVM及緩存有關。

以下是reference傳送門:

順便,StackOverflow最近(不知道是哪一天)改版成了1998年的樣式,滿滿的懷舊感。上面concurrency-interest郵件列表中關于這個問題也是眾說紛紜,如果仔細爬樓還會發(fā)現(xiàn)Doug Lea本人的回復,不過有些令人費解。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容