JAVA 并發(fā)容器和阻塞隊(duì)列

JAVA 并發(fā)容器和阻塞隊(duì)列

JAVA 并發(fā)容器

ConcurrentHashMapjdk7 vs jdk8 異同和優(yōu)缺點(diǎn)

  • 數(shù)據(jù)結(jié)構(gòu)

    JDK7 采用segment 分段鎖的思想 ,jdk8 中是使用 數(shù)組+鏈表+紅黑樹(shù) 實(shí)現(xiàn)。

    image.png

image.png
  • 并發(fā)度

    JDK7 每個(gè)segment 獨(dú)立加鎖, 最大并發(fā)個(gè)數(shù)是 segment 個(gè)數(shù),默認(rèn)是16。

    JDK8中 ,鎖的粒度更細(xì),理想情況下數(shù)組長(zhǎng)度就是就是支持并發(fā)的最大個(gè)數(shù),并發(fā)度提高很多。

  • 保證并發(fā)安全的原理

    JDK7 采用繼承自 ReentrantLock 的segment 分段鎖的思想,來(lái)保證線程安全。

    JDK8 放棄了Segment 的設(shè)計(jì), 采用 node+ CAS + synchronizad 保證線程安全。

  • 查詢(xún)時(shí)間復(fù)雜度

    JDK7 遍歷鏈表的事件復(fù)雜度是 O(n), 【n 為鏈表的長(zhǎng)度】。

    JDK8 如果變成遍歷紅黑樹(shù),那么時(shí)間復(fù)雜度為 O(logn) 【n 為紅黑樹(shù)節(jié)點(diǎn)個(gè)數(shù)】。

ConcurrentHashMap 與 HashTable 的區(qū)別

  • 出現(xiàn)的版本不同

    HashTable 是在 JDK1.0 的時(shí)候存在,在JDK1.2 的版本實(shí)現(xiàn)了Map 接口,成為了集合框架中的一員。

    ConcurrentHashMap 則是在JDK1.5 中出現(xiàn)的。出現(xiàn)的年代不同,所以在實(shí)現(xiàn)方式和性能上都有很大的不同。

  • 實(shí)現(xiàn)線程安全的方式不同

    HashTable 中的每個(gè)方法都被 synchronized 關(guān)鍵字修飾了, 這也就保證了線程的安全。但是這種方式在高并發(fā)的情況下性能并不是非常良好的。

    ConcurrentHashMap 是通過(guò) node+CAS+synchronized 的方式實(shí)現(xiàn)的

  • 性能不同

    當(dāng)線程數(shù)量急劇增多的時(shí)候,因?yàn)槊恳淮涡薷亩夹枰i住整個(gè)對(duì)象,其他線程再此時(shí)是不能操作的,導(dǎo)致性能會(huì)急劇下降,不僅如此還會(huì)帶來(lái)額外的上下文切換開(kāi)銷(xiāo), 所以它的吞吐量甚至沒(méi)有單線程的情況下要好。

    ConcurrentHashMap 上鎖也僅僅是**對(duì)一部分上鎖 ** ,多線程的性能帶來(lái)的是增幅的效果。

  • 迭代時(shí)元素內(nèi)容修改不同

    HashTable 在迭代期間不能修改內(nèi)容,否則會(huì)因?yàn)?strong>modCount != expectedModeCount 拋出 ConcurrentModificationException 異常, 而 ConcurrentHashMap 在迭代期間修改內(nèi)容**也不會(huì)拋出ConcurrentModificationException **異常。

有關(guān)Map 的一些問(wèn)題

為什么Map 桶中超過(guò)8 個(gè)才轉(zhuǎn)為紅黑樹(shù)?

  • 采用拉鏈法在相同的hash 位的值, 當(dāng)鏈表的長(zhǎng)度大于等于閾值 (TREEIFY_THRESHOLD=8) 同時(shí)還滿(mǎn)足容量大于等于( MIN_TREEIFY_CAPACITY = 64) ** 的要求, 就會(huì)把鏈表轉(zhuǎn)化為紅黑樹(shù), 后續(xù)如果鏈表的容量小于等于(UNTREEIFY_THRESHOLD = 6)時(shí),又會(huì)恢復(fù)為鏈表**的形式。

為什么不一開(kāi)始就采用紅黑樹(shù)的結(jié)構(gòu)呢?

  • 單個(gè)TreeNode 節(jié)點(diǎn)占用的空間大約是普通node 的兩倍, 當(dāng)node 足夠多時(shí)才轉(zhuǎn)化為 TreeNode,
  • 如果hashCode 分布良好,hash 計(jì)算離散性比較好的情況下,那么紅黑樹(shù)這種數(shù)據(jù)結(jié)構(gòu)是很少會(huì)用到的,在理想情況下,鏈表的長(zhǎng)度符合泊松分布,各個(gè)鏈表長(zhǎng)度的命中概率依次遞減,當(dāng)長(zhǎng)度為8 時(shí) 概率僅僅為 0.00000006是一個(gè)非常小的概率,,通常情況下并不會(huì)發(fā)生鏈表向紅黑樹(shù)的轉(zhuǎn)換。
  • 如果發(fā)現(xiàn) hashMap 或 ConcurrrentHashMap 內(nèi)部出現(xiàn)了 紅黑樹(shù)的結(jié)構(gòu),這個(gè)時(shí)候說(shuō)明我們的hash算法出現(xiàn)了問(wèn)題,(hashCode 的實(shí)現(xiàn)邏輯)

CopyOnWriteList 有什么特點(diǎn)?

適用場(chǎng)景

  • 讀操作可以盡可能的快,而寫(xiě)操作即使慢一下也沒(méi)有關(guān)系。
  • 讀多寫(xiě)少

讀寫(xiě)規(guī)則

  • 讀寫(xiě)鎖的思想

    讀寫(xiě)鎖的思想是 讀讀共享 ,其他都是互斥的(讀寫(xiě),寫(xiě)寫(xiě),寫(xiě)讀),并發(fā)讀并不會(huì)修改數(shù)據(jù),不會(huì)帶來(lái)線程安全問(wèn)題,寫(xiě)操作會(huì)修改原來(lái)的數(shù)據(jù),因此寫(xiě)操作的時(shí)候,不允許讀線程和寫(xiě)線程的加入。

  • 對(duì)讀寫(xiě)鎖規(guī)則的升級(jí)

    copyOnWriteArrayList 讀取是不用加鎖的,,寫(xiě)入也不會(huì)阻塞讀取操作,即 寫(xiě)讀不互斥。但是寫(xiě)寫(xiě)是不能線程共享的,寫(xiě)寫(xiě)互斥

特點(diǎn)

  • 當(dāng)容器內(nèi)的內(nèi)容被個(gè)修改時(shí),copy 出一個(gè)新的容器, 然后在新的容器中進(jìn)行操作,修改完成 再將原來(lái)容器的引用指向新的容器
  • 迭代期間允許修改集合內(nèi)容

缺點(diǎn)

  • 內(nèi)存占用問(wèn)題
  • 在元素較多或者復(fù)雜的情況下。復(fù)制的開(kāi)銷(xiāo)很大
  • 數(shù)據(jù)一致性問(wèn)題

阻塞隊(duì)列與非阻塞隊(duì)列

常見(jiàn)的阻塞隊(duì)列

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue
  • PriorityBlockingQueue
  • DelayQueue

ArrayBlockingQueue

  • 是典型的 有界隊(duì)列 ,其內(nèi)部是用 數(shù)組 結(jié)構(gòu)存儲(chǔ)元素的, 利用 ReentrantLock 實(shí)現(xiàn)線程安全。內(nèi)部排序方式是 先進(jìn)先出(FIFO)的。

    /** The queued items */
    final Object[] items;
    
    
    /** Main lock guarding all access */
    final ReentrantLock lock;
    
        /**
         * Inserts the specified element at the tail of this queue if it is
         * possible to do so immediately without exceeding the queue's capacity,
         * returning {@code true} upon success and {@code false} if this queue
         * is full.  This method is generally preferable to method {@link #add},
         * which can fail to insert an element only by throwing an exception.
         *
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
  • 創(chuàng)建時(shí) 指定容量和排隊(duì)任務(wù)是否公平 , (通常非公平的吞吐量要優(yōu)于公平的場(chǎng)景)

        /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed)
         * capacity and the specified access policy.
         *
         * @param capacity the capacity of this queue
         * @param fair if {@code true} then queue accesses for threads blocked
         *        on insertion or removal, are processed in FIFO order;
         *        if {@code false} the access order is unspecified.
         * @throws IllegalArgumentException if {@code capacity < 1}
         */
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    

LinkedBlockingQueue

  • 是一個(gè)內(nèi)部用 鏈表 實(shí)現(xiàn)的BlockingQueue ,如果不指定 初始容量,那么他的默認(rèn)容量為 Integer.MAX_VALUE。 會(huì)成為一個(gè)無(wú)界隊(duì)列, 內(nèi)部排序方式是 先進(jìn)先出(FIFO)的。

     public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
  • 同樣內(nèi)部也是通過(guò)ReentrantLock 來(lái)保證線程安全。

     public E peek() {
            if (count.get() == 0)
                return null;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                Node<E> first = head.next;
                if (first == null)
                    return null;
                else
                    return first.item;
            } finally {
                takeLock.unlock();
            }
        }
    

SynchronousQueue

  • 最大的不同之處在于 它的容量是 0 ,因此每次取數(shù)據(jù)都要先阻塞,直到有數(shù)據(jù)被放入,每次放入數(shù)據(jù)也會(huì)阻塞,直到有消費(fèi)者來(lái)取出。

        /**
         * Inserts the specified element into this queue, waiting if necessary
         * up to the specified wait time for another thread to receive it.
         *
         * @return {@code true} if successful, or {@code false} if the
         *         specified waiting time elapses before a consumer appears
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (e == null) throw new NullPointerException();
            if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
                return true;
            if (!Thread.interrupted())
                return false;
            throw new InterruptedException();
        }
    
  • 他會(huì)把數(shù)據(jù)直接從生產(chǎn)者傳給消費(fèi)者(direct handoff), 不需要存儲(chǔ)。

PriorityBlockingQueue

  • 是一個(gè)支持 優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列 ,可以通過(guò)自定義類(lèi)實(shí)現(xiàn) compareTo() 方法,或初始化是通過(guò)構(gòu)造函數(shù)傳入Comparator 來(lái)指定元素排序規(guī)則。

    public PriorityBlockingQueue(int initialCapacity,
                                     Comparator<? super E> comparator) {
            if (initialCapacity < 1)
                throw new IllegalArgumentException();
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            this.comparator = comparator;
            this.queue = new Object[initialCapacity];
    }
    
  • 他的默認(rèn)初始化容量為 11,最大容量是 Integer.MAX_VALUE - 8 內(nèi)部是 用 **數(shù)組 **的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)元素,(transient : 阻止屬性被序列化,盡在內(nèi)存中,不會(huì)序列化到磁盤(pán))

    /**
    * Default array capacity.
    */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    /**
    * The maximum size of array to allocate.
    * Some VMs reserve some header words in an array.
    * Attempts to allocate larger arrays may result in
    * OutOfMemoryError: Requested array size exceeds VM limit
    */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    
    private transient Object[] queue;
    
  • 插入隊(duì)列的對(duì)象必須是可以比較大小的, 否則會(huì)拋出 ClassCastException。

  • 消費(fèi)數(shù)據(jù)在隊(duì)列為空的是時(shí)候會(huì)阻塞 (take, pool),放入數(shù)據(jù)的時(shí)候不會(huì)被阻塞(因?yàn)殛?duì)列是無(wú)界的)。

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            E result;
            try {
                while ( (result = dequeue()) == null)
                    notEmpty.await();
            } finally {
                lock.unlock();
            }
            return result;
      }
    

DelayQueue

  • 是一個(gè)具有延遲功能的隊(duì)列, 可以設(shè)定讓隊(duì)列中的任務(wù)延遲多久后再執(zhí)行。
  • 它是無(wú)界隊(duì)列,放入的元素必須實(shí)現(xiàn)了 Delayed 接口。(Delayed 繼承了 Comparable 接口, 擁有了比較和排序)
  • 元素會(huì)根據(jù)延遲時(shí)間的長(zhǎng)短被放到隊(duì)列的不同位置, 越靠近隊(duì)列頭則早過(guò)期。
  • DelayQueue 內(nèi)部使用了 PriorityQueue 的能力來(lái)排序。

阻塞隊(duì)列常用的方法

操作失敗拋出異常

方法 含義 特點(diǎn)
add 添加一個(gè)元素 如果隊(duì)列滿(mǎn)了,拋出IllegalStateException
remove 返回并刪除隊(duì)列的頭元素 如果隊(duì)列為空,拋出NoSuchElementException
element 返回隊(duì)列的頭元素 如果隊(duì)列為空,拋出NoSuchElementException

返回操作結(jié)果,不拋出異常

方法 含義 特點(diǎn)
offer 添加一個(gè)元素 隊(duì)列滿(mǎn),返回false ,添加成功 返回true
poll 返回并刪除隊(duì)列的頭元素 隊(duì)列空,刪除失敗,返回null
peeK 返回隊(duì)列的頭元素 隊(duì)列空,刪除失敗,返回null

陷入阻塞

方法 含義 特點(diǎn)
put 添加一個(gè)元素 如果隊(duì)列滿(mǎn),則阻塞,直至隊(duì)列有空閑
take 返回并刪除隊(duì)列的頭元素 如果隊(duì)列空,則阻塞。直至隊(duì)列有元素

非阻塞隊(duì)列

ConCurrentLinkedQueue

  • 使用CAS 非阻塞算法 + 重試 來(lái)實(shí)現(xiàn)線程安全。
boolean casNext(Node<E> cmp, Node<E> val) {
   return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容