Java并發(fā)編程——LinkedBlockingDeque

一、阻塞隊列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數(shù)據(jù)的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。

1.1、BlockingQueue的基本原理

先來解釋一下阻塞隊列:


如上圖:

  • 1、生產(chǎn)線程1往阻塞隊列里面添加新的數(shù)據(jù),當阻塞隊列滿的時候(針對有界隊列),生產(chǎn)線程1將會處于阻塞狀態(tài),直到消費線程2從隊列中取走一個數(shù)據(jù);
  • 2、消費線程2從阻塞隊列取數(shù)據(jù),當阻塞隊列空的時候,消費線程2將會處于阻塞狀態(tài),直到生產(chǎn)線程把一個數(shù)據(jù)放進去。

阻塞隊列的基本原理就這樣,至于隊列是用什么數(shù)據(jù)結(jié)構(gòu)進行存儲的,這里并沒有規(guī)定,所以后面我們可以看到很多阻塞隊列的實現(xiàn)。

阻塞隊列的常用方法

查閱BlockingQueue總結(jié)了以下阻塞隊列的方法:

1、boolean add(E e)

  • 在不違反容量限制的情況下,可立即將指定元素插入此隊列,成功返回true,當無可用空間時候,返回IllegalStateException異常。

2、boolean offer(E e)

  • 在不違反容量限制的情況下,可立即將指定元素插入此隊列,成功返回true,當無可用空間時候,返回false。

3、void put(E e)

  • 直接在隊列中插入元素,當無可用空間時候,阻塞等待。

4、boolean offer(E e, long timeout, TimeUnit unit)

  • 將給定元素在給定的時間內(nèi)設(shè)置到隊列中,如果設(shè)置成功返回true, 否則返回false。

5、E take()

  • 獲取并移除隊列頭部的元素,無元素時候阻塞等待。

6、E poll( long time, timeunit unit)

  • 獲取并移除隊列頭部的元素,無元素時候阻塞等待指定時間。

7、boolean remove()

  • 獲取并移除隊列頭部的元素,無元素時候會拋出NoSuchElementException異常。

8、E element()

  • 不移除的情況下返回列頭部的元素,無元素時候會拋出NoSuchElementException異常。

9、E peek()

  • 不移除的情況下返回列頭部的元素,隊列為空無元素時返回null。

注意:
根據(jù)remove(Object o)方法簽名可知,這個方法可以移除隊列的特定對象,但是這個方法效率并不高。因為需要遍歷隊列匹配到特定的對象之后,再進行移除。
以上支持阻塞和超時的方法都是能夠響應(yīng)中斷的。

1.2、BlockingQueue的實現(xiàn)

BlockingQueue底層也是基于AQS實現(xiàn)的,隊列的阻塞使用ReentrantLock的Condition實現(xiàn)的。

下面我們來看看各個實現(xiàn)類的原理。以下分析我都會基于支持阻塞的put和take方法來分析。

二、LinkedBlockingDeque

  • LinkedBlockingDeque是一個由鏈表結(jié)構(gòu)(雙向鏈表)組成的雙向阻塞隊列,即可以從隊列的兩端插入和移除元素,支持FIFO和FILO。

  • 相比于其他阻塞隊列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法。

  • LinkedBlockingDeque是可選容量的,默認容量大小為Integer.MAX_VALUE。

  • LinkedBlockingDequeConcurrentLinkedDeque類似,都是一種雙端隊列的結(jié)構(gòu),只不過LinkedBlockingDeque同時也是一種阻塞隊列。

注意:LinkedBlockingDeque底層利用ReentrantLock實現(xiàn)同步,并不像ConcurrentLinkedDeque那樣采用無鎖算法。

如何使用 LinkedBlockingDeque

  • 1、并發(fā)場景下,需要作為雙端隊列使用時,如果只是作為 FIFO 隊列使用,則 LinkedBlockingQueue 的性能更高。
  • 2、指定隊列的容量,以避免生產(chǎn)速率遠高于消費速率時資源耗盡的問題。

使用 LinkedBlockingDeque 的風(fēng)險

  • 1、未指定容量的情況下,生產(chǎn)速率遠高于消費速率時,會導(dǎo)致內(nèi)存耗盡而 OOM。
  • 2、高并發(fā)場景下,性能遠低于 LinkedBlockingQueue。
  • 3、由于需要維持前后節(jié)點的鏈接,內(nèi)存消耗也高于 LinkedBlockingQueue。

2.1、內(nèi)部結(jié)構(gòu)

LinkedBlockingDeque內(nèi)部是雙鏈表的結(jié)構(gòu),結(jié)點Node的定義如下:

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /** 雙向鏈表節(jié)點 */
    static final class Node<E> {
        /**
         *  節(jié)點元素,如果節(jié)點已經(jīng)被移除,則為 null
         */
        E item;

        /**
         * 前驅(qū)結(jié)點指針.
         */
        Node<E> prev;

        /**
         * 后驅(qū)結(jié)點指針.
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }
}   

2.2、成員屬性

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     * 頭結(jié)點
     */
    transient Node<E> first;

    /**
     * 尾結(jié)點
     */
    transient Node<E> last;

    /** 隊列中個數(shù) */
    private transient int count;

    /** 隊列長度,可以使用構(gòu)造注入,如未設(shè)定,默認為無界隊列 */
    private final int capacity;

    /** 顯示鎖 */
    final ReentrantLock lock = new ReentrantLock();

    /** 消費隊列(隊列為空時,無法消費,線程阻塞) */
    private final Condition notEmpty = lock.newCondition();

    /** 生產(chǎn)隊列(隊列滿時,無法入隊,線程阻塞) */
    private final Condition notFull = lock.newCondition();
}   

2.3、構(gòu)造函數(shù)

LinkedBlockingDeque一共三種構(gòu)造器,不指定容量時,默認為Integer.MAX_VALUE:

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     * 默認構(gòu)造器.
     */ 
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
    
    /**
     * 指定容量的構(gòu)造器.
     */ 
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    /**
     * 從已有集合構(gòu)造隊列.
     */
    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Never contended, but necessary for visibility
        try {
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (!linkLast(new Node<E>(e)))
                    throw new IllegalStateException("Deque full");
            }
        } finally {
            lock.unlock();
        }
    }   
}   

2.4、隊首入隊

初始:


隊首插入結(jié)點node:


  • 1、將目標元素 e 添加到隊列頭部,如果隊列已滿,則阻塞等待有可用空間后重試
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  將目標元素 e 添加到隊列頭部,如果隊列已滿,則阻塞等待有可用空間后重試
     */ 
    public void putFirst(E e) throws InterruptedException {
        // 如果存入的值為null,直接拋出空指針異常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加鎖
        lock.lock();
        try {
            // 嘗試在頭部添加元素
            while (!linkFirst(node))
                // 當前線程在非滿條件上等待
                notFull.await();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }
    
    private boolean linkFirst(Node<E> node) {
        // 隊列已滿,則直接返回 false
        if (count >= capacity)
            return false;
        // 讀取頭節(jié)點    
        Node<E> f = first;
        // 將舊頭結(jié)點鏈接到目標節(jié)點之后
        node.next = f;
        // 寫入新頭節(jié)點
        first = node;
        // 1)當前元素為第一個添加到隊列中的元素
        if (last == null)
            // 寫入尾節(jié)點
            last = node;
        else
            // 將舊頭節(jié)點的前置節(jié)點設(shè)置為新頭結(jié)點
            f.prev = node;
        // 遞增計數(shù) 
        ++count;
        // 喚醒在非空條件上阻塞等待的線程來讀取元素
        notEmpty.signal();
        return true;
    }   
}   
  • 2、如果隊列已滿,則直接返回 false,否則將目標元素 e 添加到隊列頭部
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  如果隊列已滿,則直接返回 false,否則將目標元素 e 添加到隊列頭部
     */ 
    public boolean offerFirst(E e) {
        // 如果存入的值為null,直接拋出空指針異常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加鎖
        lock.lock();
        try {
            // 嘗試在頭部添加元素
            return linkFirst(node);
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }
}   
  • 3、在指定的超時時間內(nèi)嘗試將目標元素 e 添加到隊列頭部,成功則返回 true
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  在指定的超時時間內(nèi)嘗試將目標元素 e 添加到隊列頭部,成功則返回 true
     */ 
    public boolean offerFirst(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 如果存入的值為null,直接拋出空指針異常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 頭結(jié)點添加失敗
            while (!linkFirst(node)) {
                // 已經(jīng)超時則直接返回
                if (nanos <= 0)
                    return false;
                // 當前線程在非滿條件上阻塞等待,喚醒后再次嘗試添加 
                nanos = notFull.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}   

2.4、隊尾入隊

初始:


隊尾插入結(jié)點node:


  • 1、將目標元素 e 添加到隊列尾部,如果隊列已滿,則阻塞等待有可用空間后重試
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public void put(E e) throws InterruptedException {
        putLast(e);
    }
    
    /**
     *  將目標元素 e 添加到隊列尾部,如果隊列已滿,則阻塞等待有可用空間后重試    
     */ 
    public void putLast(E e) throws InterruptedException {
        // 如果存入的值為null,直接拋出空指針異常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加鎖
        lock.lock();
        try {
            // 嘗試將節(jié)點鏈接到隊列尾部
            while (!linkLast(node))
                // 隊列已滿,當前線程在非滿條件上阻塞等待,被喚醒后再次嘗試
                notFull.await();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }   
    
    private boolean linkLast(Node<E> node) {
        // 隊列已滿,則直接返回 false
        if (count >= capacity)
            return false;
        // 讀取尾節(jié)點    
        Node<E> l = last;
        // 將目標節(jié)點鏈接到尾節(jié)點之后
        node.prev = l;
        // 寫入尾節(jié)點為新增節(jié)點
        last = node;
        // 1)當前元素是第一個加入隊列的元素
        if (first == null)
            // 寫入頭結(jié)點
            first = node;
        else
            // 將舊尾節(jié)點的后置節(jié)點更新為新增節(jié)點
            l.next = node;
        // 遞增總數(shù) 
        ++count;
        // 喚醒在非空條件上等待的線程
        notEmpty.signal();
        return true;
    }   
}   
  • 2、如果隊列已滿,則直接返回 false,否則將目標元素 e 添加到隊列尾部
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public boolean offer(E e) {
        return offerLast(e);
    }
    
    /**
     *  如果隊列已滿,則直接返回 false,否則將目標元素 e 添加到隊列尾部
     */ 
    public boolean offerLast(E e) {
        // 如果存入的值為null,直接拋出空指針異常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加鎖
        lock.lock();
        try {
            // 嘗試將節(jié)點鏈接到隊列尾部
            return linkLast(node);
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }   
}   
  • 3、在指定的超時時間內(nèi)嘗試將目標元素 e 添加到隊列尾部,成功則返回 true
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        return offerLast(e, timeout, unit);
    }

    /**
     *  在指定的超時時間內(nèi)嘗試將目標元素 e 添加到隊列尾部,成功則返回 true
     */
    public boolean offerLast(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 如果存入的值為null,直接拋出空指針異常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //加鎖
        lock.lockInterruptibly();
        try {
            // 嘗試將目標元素 e 添加到隊列尾部
            while (!linkLast(node)) {
                // 已經(jīng)超時則直接返回 false
                if (nanos <= 0)
                    return false;
                // 當前線程在非滿條件上阻塞等待,被喚醒后再次嘗試  
                nanos = notFull.awaitNanos(nanos);
            }
            return true;
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }   
}   

2.5、隊首出隊

初始:


刪除隊首結(jié)點:


  • 1、移除并返回頭部節(jié)點,如果隊列為空,則阻塞等待有可用元素之后重試
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E take() throws InterruptedException {
        return takeFirst();
    }
    
    /**
     *  移除并返回頭部節(jié)點,如果隊列為空,則阻塞等待有可用元素之后重試
     */ 
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 嘗試移除并返回頭部節(jié)點
            while ( (x = unlinkFirst()) == null)
                // 隊列為空,則阻塞等待有可用元素之后重試
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }   
}   
  • 2、如果隊列為空,則立即返回 null,否則移除并返回頭部元素
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E poll() {
        return pollFirst();
    }
    
    /**
     *  如果隊列為空,則立即返回 null,否則移除并返回頭部元素
     */ 
    public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }   
    
    /**
     * 從隊首刪除一個元素, 失敗則返回null.
     */ 
    private E unlinkFirst() {    
        // 獲取首節(jié)點    
        Node<E> f = first;    
        // 首節(jié)點為null,則返回null    
        if (f == null)        
            return null;    
        // 獲取首節(jié)點的后繼節(jié)點    
        Node<E> n = f.next;    
        // 移除first,將首節(jié)點更新為n
        E item = f.item;    
        f.item = null;    
        f.next = f; // help GC    
        first = n;    
        // 移除首節(jié)點后,為空隊列    
        if (n == null)        
            last = null;    
        else        
            // 將新的首節(jié)點的前驅(qū)節(jié)點設(shè)置為null        
            n.prev = null;    
        --count;    
        // 喚醒阻塞在notFull上的線程    
        notFull.signal();    
        return item;
    }
}   
  • 3、在指定的超時時間內(nèi)嘗試移除并返回頭部元素,如果已經(jīng)超時,則返回 null
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return pollFirst(timeout, unit);
    }
    
    /**
     *  在指定的超時時間內(nèi)嘗試移除并返回頭部元素,如果已經(jīng)超時,則返回 null
     */ 
    public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            E x;
            // 嘗試移除并返回頭部元素
            while ( (x = unlinkFirst()) == null) {
                // 已經(jīng)超時則返回 null
                if (nanos <= 0)
                    return null;
                // 當前線程在非空條件上阻塞等待,被喚醒后進行重試  
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 移除成功則直接返回頭部元素
            return x;
        } finally {
            lock.unlock();
        }
    }   
}   

2.6、隊尾出隊

初始:


刪除隊尾結(jié)點:


  • 1、移除并返回尾部節(jié)點,如果隊列為空,則阻塞等待有可用元素之后重試
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  移除并返回尾部節(jié)點,如果隊列為空,則阻塞等待有可用元素之后重試
     */ 
    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 嘗試移除并返回尾部節(jié)點
            while ( (x = unlinkLast()) == null)
                // 隊列為空,則阻塞等待有可用元素之后重試
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 從隊尾刪除一個元素, 失敗則返回null.
     */     
    private E unlinkLast() {       
        // 獲取尾節(jié)點    
        Node<E> l = last;    
        // 尾節(jié)點為null,則返回null    
        if (l == null)        
            return null;    
        // 獲取尾節(jié)點的前驅(qū)節(jié)點    
        Node<E> p = l.prev;    
        // 移除尾節(jié)點,將尾節(jié)點更新為p    
        E item = l.item;    
        l.item = null;    
        l.prev = l; // help GC    
        last = p;    
        // 移除尾節(jié)點后,為空隊列    
        if (p == null)        
            first = null;    
        else        
            // 將新的尾節(jié)點的后繼節(jié)點設(shè)置為null        
            p.next = null;    
        --count;    
        // 喚醒阻塞在notFull上的線程    
        notFull.signal();    
        return item;
    }   
}   
  • 2、如果隊列為空,則立即返回 null,否則移除并返回尾部元素
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  如果隊列為空,則拋出異常,否則移除并返回尾部元素
     */ 
    public E removeLast() {
        E x = pollLast();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    
    /**
     *  如果隊列為空,則立即返回 null,否則移除并返回尾部元素
     */ 
    public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //從隊尾刪除一個元素, 失敗則返回null. 
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }   
}
  • 3、在指定的超時時間內(nèi)嘗試移除并返回尾部元素,如果已經(jīng)超時,則返回 null
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E pollLast(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            E x;
            // 嘗試移除并返回尾部元素
            while ( (x = unlinkLast()) == null) {
                // 已經(jīng)超時則返回 null
                if (nanos <= 0)
                    return null;
                // 當前線程在非空條件上阻塞等待,被喚醒后進行重試
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 移除成功則直接返回尾部元素
            return x;
        } finally {
            lock.unlock();
        }
    }
}

三、LinkedBlockingQueue與LinkedBlockingDeque對比

LinkedBlockingQueue

  • FIFO;
  • 讀寫分開兩個ReentrantLock;

LinkedBlockingDeque

  • FIFO & FILO;
  • 全局一把ReentrantLock;

參考:
https://www.itzhai.com/articles/graphical-blocking-queue.html

https://www.cnblogs.com/zhuxudong/p/10079511.html

https://segmentfault.com/a/1190000016398508

https://www.cnblogs.com/snake107/p/12035580.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容