LinkedBlockingQueue源碼分析

LinkedBlockingQueue是用鏈表實現(xiàn)的FIFO隊列,該隊列大小默認是Integer.MAX_VALUE,可以認為是無界的,也可指定大小,使其有界?;阪湵淼年犃型ǔ1然跀到M的隊列有更高的吞吐量,但大多數并發(fā)場景下,實際的性能優(yōu)劣不易測量。
LinkedBlockingQueue通過一個啞鈴節(jié)點、takeLock和putLock、(AtomicInteger類型的)count,以及級聯(lián)通知實現(xiàn)了雙鎖隊列算法(的變體),提高了性能。
java.util包下的迭代器是fail-fast的,即當迭代器創(chuàng)建后,若外部通過非迭代器自身的方法修改集合的內容,會拋出ConcurrentModificationException異常;而java.util.concurrent包下的迭代器是弱一致性的,即當迭代器創(chuàng)建后,其他線程通過非迭代器自身的方法修改集合的內容并不會拋出異常。

1. LinkedBlockingQueue繼承關系圖

2. LinkedBlockingQueue源碼分析

2.1 內部類
    // 底層單向鏈表的Node
    static class Node<E> {
        E item;
        // 為了LinkedBlockingQueue.Itr的弱一致性,可能出現(xiàn)next指向當前節(jié)點自身的情況
        Node<E> next;
        Node(E x) { item = x; }
    }
2.2 字段
    // 隊列容量(未指定時默認是Integer.MAX_VALUE)
    private final int capacity;

    // 隊列中元素的個數
    // 因為添加、獲取(刪除)操作使用各自的鎖,且兩種操作都會修改count
    // 的值,因此使用AtomicInteger類型的count來保證原子性(和可見性)
    private final AtomicInteger count = new AtomicInteger();
    
    // 底層鏈表的頭節(jié)點
    transient Node<E> head;
    
    // 底層鏈表的尾節(jié)點
    private transient Node<E> last;
    
    // take、poll等方法使用的ReentrantLock和Condition
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    
    // put、offer等方法使用的ReentrantLock和Condition
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
2.3 三個構造方法

(1)LinkedBlockingQueue(int)

    public LinkedBlockingQueue(int capacity) {
        // 檢查capacity
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // last和head初始時都指向該啞鈴節(jié)點,正是由于該節(jié)點,實現(xiàn)了添加元素和獲取(刪除)元素
        // 兩種操作的解耦,使得LinkedBlockingQueue這個基于鏈表的阻塞隊列可以采用雙鎖隊列算法
        last = head = new Node<E>(null);
    }

(2)LinkedBlockingQueue()

    public LinkedBlockingQueue() {
        // 調用LinkedBlockingQueue(int)
        this(Integer.MAX_VALUE);
    }

(3)LinkedBlockingQueue(Collection)

    public LinkedBlockingQueue(Collection<? extends E> c) {
        // 調用LinkedBlockingQueue(int)
        this(Integer.MAX_VALUE);
        // 要(在隊尾)添加元素,所以使用putLock鎖
        final ReentrantLock putLock = this.putLock;
        // 實例化時不存在多線程競爭,這里上鎖是為了可見性
        putLock.lock();
        try {
            int n = 0;
            for (E e : c) {
                // e不能為null
                if (e == null)
                    throw new NullPointerException();
                // 注意n從0開始
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e)); // 入隊
                ++n;
            }
            // 更新count.value的值
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
2.4 輔助方法

(1)enqueue

    // 將node添加到隊列末尾
    private void enqueue(Node<E> node) {
        // 該語句等價轉化為:
        //       last.next = node;
        //       last = last.next;
        last = last.next = node;
    }

(2)dequeue

    // 從頭部刪除一個節(jié)點
    private E dequeue() {
        // 注意:head指向的是啞鈴節(jié)點
        Node<E> h = head;
        Node<E> first = h.next;
        // 不直接將h.next置為null是為了LinkedBlockingQueue.Itr的弱一致性
        h.next = h; // help GC
        head = first;
        // 獲取first.item作為返回值
        E x = first.item;
        // 將first.item置為null,作為新的啞鈴節(jié)點
        first.item = null;
        return x;
    }

(3)signalNotEmpty

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        // 先獲取takeLock鎖
        // notEmpty.signal中會判斷當前線程是否持有鎖,所以要調用
        // notEmpty的signal方法,必須持有takeLock鎖,否則會拋出異常      
        takeLock.lock();
        try {
            // 調用notEmpty.signal后,會將條件隊列中第一個調用了notEmpty.await或awaitNanos(nanos)
            // 的線程對應的Node轉移到同步隊列中,前驅節(jié)點調用unpark后才算真正喚醒該線程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

(4)signalNotFull

    // 與signalNotEmpty類似
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        // 先獲取putLock鎖
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

(5)fullyLock

    // 上鎖
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }   

(6)fullyUnlock

    // 解鎖
    // 以一種順序上鎖,以相反的順序釋放鎖,可避免死鎖
    // (這里上鎖順序或解鎖順序顛倒也不會出現(xiàn)死鎖)
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
2.5 添加元素

(1)put-無限期阻塞版

    public void put(E e) throws InterruptedException {
        // e不能為null
        if (e == null) throw new NullPointerException();
        // 約定put、take等方法中將c初始化為-1,表示入隊失敗
        int c = -1;
        // 將e封裝成Node
        Node<E> node = new Node<E>(e);
        // 要(在隊尾)添加元素,所以使用putLock鎖
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try { 
            // 獲取count.value的值,若為capacity則阻塞當前線程
            // 可能出現(xiàn)當前線程被喚醒后隊列大小仍是capacity的情況,當前線程會再次阻塞
            while (count.get() == capacity) {
                notFull.await();
            }
            // 入隊
            enqueue(node);
            // 注意:getAndIncrement中會以CAS的方式將count.value加1并返回count.value原來的值
            c = count.getAndIncrement();
            // 注意c是本次入隊前count.value的值
            // 這兩行語句算是級聯(lián)通知結構的一部分,若不采用級聯(lián)通知,即將這兩行語句替換為
            // signalNotEmpty();則每次入隊都要獲取takeLock,調用notEmpty.signal,極大降低了性能
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c為0表示隊列中有一個元素
        if (c == 0)
            // 為了降低添加元素相關操作對takeLock的依賴,采用了級聯(lián)通知
            // 級聯(lián)通知:
            //     只要隊列中有一個元素,就通過signalNotEmpty獲取takeLock鎖,并調用notEmpty.signal
            //     將一個消費者線程(若存在的話)加入同步隊列(此時可能沒有消費者線程,這種情況下,后
            //     面過來的消費者線程是不會阻塞的,因為隊列中已經有元素了、也可能有多個阻塞著的消費
            //     者線程),后面過來的線程和同步隊列中的線程競爭鎖,競爭到鎖的線程會執(zhí)行出隊操作,
            //     若隊列不為空,該線程會調用notEmpty.signal,如此往復......若隊列為空了,消費者線程
            //     會阻塞,又會通過這里的signalNotEmpty進行處理。
            signalNotEmpty();
    }   

(2)offer(E)-非阻塞版

    public boolean offer(E e) {
        // e不能為null(略)
        final AtomicInteger count = this.count;
        // 先獲取count.value的值,嘗試判斷一下
        if (count.get() == capacity)
            return false;
        int c = -1;
        // 與put中相同的代碼不再貼出
            // 當其他線程在上面if之后又添加元素時再次獲取的
            // count.value就為capacity,條件為false,c為-1
            if (count.get() < capacity) {
                // 入隊
                enqueue(node);
                // 再次強調:c是count.value入隊前的值
                c = count.getAndIncrement();
                // 級聯(lián)通知相關
                if (c + 1 < capacity)
                    notFull.signal();
            }
        // 若c仍為-1,表示入隊失敗,返回false,否則返回true
        return c >= 0;
    }   

(3)offer(E,long,TimeUnit)-限期阻塞版

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 與put中相同的代碼不再貼出
        // 轉化為納秒數
        long nanos = unit.toNanos(timeout);
        while (count.get() == capacity) {
                // 隊列仍是滿的,但阻塞時長已到,在這里跳出
                if (nanos <= 0)
                    return false;
                // 阻塞指定時長,也可能被提前喚醒
                nanos = notFull.awaitNanos(nanos);
            }
        return true;
    }
2.6 獲取(刪除)元素

(1)take-阻塞版

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 要(在隊t頭)刪除元素,所以使用takeLock鎖
        takeLock.lockInterruptibly();
        try {
            // 隊列為空則阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 出隊
            x = dequeue();
            // 強調:c是count.value入隊前的值
            c = count.getAndDecrement();
            // c為1說明經過上面的出隊操作隊列已空
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // c為capacity說明當前隊列中的元素個數是capacity-1
        if (c == capacity)
            // 為了降低添加元素相關操作對putLock的依賴,采用了級聯(lián)通知
            // 與put的級聯(lián)通知是相對應的,不再介紹
            signalNotFull();
        return x;
    }   

poll()-非阻塞版和poll(long,TimeUnit)-限期阻塞版與offer(E)-非阻塞版和offer(E,long,TimeUnit)-限期阻塞版是相對應的,不再介紹。
(2)remove-直接刪除元素

    // 注意take、poll等方法是獲取元素,remove是直接刪除隊列中第一個滿足o.equals(p.item)的元素
    public boolean remove(Object o) {
        // 隊列中的元素不允許為null,所以這里o也不能為null
        if (o == null) return false;
        // 因為要操作的是整個隊列,所以使用雙鎖
        fullyLock();
        try {
            // 遍歷鏈表
            // trail是p的前一個節(jié)點,trail初始為啞鈴節(jié)點
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    // 找到滿足o.equals(p.item)的節(jié)點,斷開該節(jié)點
                    unlink(p, trail);
                    //  已移除一個,返回true
                    return true;
                }
            }
            // 未找到匹配的節(jié)點,返回false
            return false;
        } finally {
            // 釋放雙鎖
            fullyUnlock();
        }
    }
    void unlink(Node<E> p, Node<E> trail) {
        // 為了LinkedBlockingQueue.Itr的弱一致性,不將p.next置為null
        p.item = null;
        // 進行連接
        trail.next = p.next;
        // p是否是最后一個節(jié)點
        if (last == p)
            last = trail;
        // 若隊列中元素個數為capacity-1,則進行級聯(lián)通知
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
2.7 查看元素
    public E peek() {
        // 條件成立說明隊列為空
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        // 要查看對頭元素的值,因此使用takeLock
        takeLock.lock();
        try {
            // 獲取頭節(jié)點的下一個節(jié)點
            // 注意頭節(jié)點是啞鈴節(jié)點
            Node<E> first = head.next;
            // 當在上面if之后,其他線程將隊列中的元素取完時,這里first就為null
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }   
2.8 弱一致性迭代器

下面通過兩個例子對LinkedBlockingQueue.Itr.nextNode中的條件進行復現(xiàn),留作參考。
(1)nextNode中"s == p"為true的例子

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
        lbq.put("a");
        lbq.put("b");
        lbq.put("c");
        Iterator<String> iterator = lbq.iterator();
        lbq.take();
        lbq.take();
        // 此處設斷點,進入nextNode方法,會發(fā)現(xiàn)s == p成立
        iterator.next();
    }

(2)nextNode中"s != null && s.item == null"為true的例子

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
        lbq.put("a");
        lbq.put("b");
        lbq.put("c");
        Iterator<String> iterator = lbq.iterator();
        lbq.remove("a");
        lbq.take();
        // 此處設斷點調試,進入nextNode方法,會
        // 發(fā)現(xiàn)s != null && s.item == null成立
        iterator.next();
    }
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容