Java并發(fā)(六):并發(fā)容器和框架

一. ConcurrentHashMap

ConcurrentHashMap是線程安全且高效的HashMap。

  • HashMap線程不安全;
  • HashTable使用synchronized保證線程安全,但是效率非常低下;
  • ConcurrentHashMap使用鎖分段技術(shù)提升并發(fā)訪問率。

注意
ConcurrentHashMap的線程安全指的是其內(nèi)部的方法線程安全(如get、put等),即單獨調(diào)用某個方法線程安全的。
當(dāng)代碼塊包含多個操作時,此代碼塊并不是線程安全的,因為不能保證整個代碼塊是互斥執(zhí)行的,因此仍然需要同步處理(加鎖、double check等)。

1. ConcurrentHashMap的結(jié)構(gòu)

JDK1.7版本

  • HashMap由HashEntry數(shù)組組成,HashEntry是鏈表結(jié)構(gòu)。

  • ConcurrentHashMap由Segment數(shù)組組成,Segment由HashEntry數(shù)組組成,HashEntry是鏈表結(jié)構(gòu)。

2. ConcurrentHashMap的初始化

ConcurrentHashMap初始化方法是通過initialCapacity、loadFactor和concurrencyLevel等幾個參數(shù)來初始化segment數(shù)組、段偏移量segmentShift、段掩碼segmentMask和每個segment里的HashEntry數(shù)組來實現(xiàn)的。

3. 定位Segment

ConcurrentHashMap使用分段鎖Segment來保護(hù)不同段的數(shù)據(jù),在插入和獲取元素的時候,必須先通過散列算法定位到Segment。

散列算法的目的:是元素盡可能均勻的分布在不同的Segment上,從而提高容器的存取效率。

4. ConcurrentHashMap的操作

(1) get操作
  1. 對key的hashCode進(jìn)行一次再散列得到一個新的散列值;
  2. 根據(jù)新的散列值,利用散列運算定位到Segment;
  3. 再通過散列算法定位到元素;

整個get過程不需要加鎖,除非讀到的值為空才會加鎖重讀。

(2) put操作
  1. 定位到Segment;
  2. 判斷是否需要對Segment
(3) size操作

嘗試兩次不鎖住Segment直接統(tǒng)計各Segment大小的方法,如果兩次結(jié)果不相同,則采用加鎖的方式來統(tǒng)計所有Segment的大小。

二. ConcurrentLinkedQueue

ConcurrentLinkedQueue是一個基于鏈接節(jié)點的無界線程安全隊列,采用FIFO的規(guī)則對節(jié)點進(jìn)行排序。

隊列前后分別用head節(jié)點和tail節(jié)點來定位,中間的節(jié)點通過next關(guān)聯(lián)起來。

1. 入隊列

入隊列就是將入隊節(jié)點添加到隊列的尾部。

  1. 將入隊節(jié)點設(shè)置成當(dāng)前隊列尾節(jié)點的下一個節(jié)點;
  2. 更新tail節(jié)點,如果tail節(jié)點的next節(jié)點不為空,則將入隊節(jié)點設(shè)置為tail節(jié)點,如果tail節(jié)點的next節(jié)點為空,則將入隊節(jié)點設(shè)置為tail節(jié)點的next節(jié)點。

因此:tail節(jié)點并不一定就是尾節(jié)點,尾節(jié)點可能是tail節(jié)點,也可能是tail節(jié)點的next節(jié)點。

2. 出隊列

出隊列就是從隊列中返回一個節(jié)點元素,并清空該節(jié)點對元素的引用。

  1. 獲取head節(jié)點的元素,判斷是否為空
  2. 如果為空,表示另一個線程已經(jīng)進(jìn)行了一次出隊列操作將該節(jié)點的元素取走,如果不為空,則使用CAS將head節(jié)點設(shè)置為null并返回head節(jié)點的元素。

三. 阻塞隊列

阻塞隊列的主要使用場景:生產(chǎn)者和消費者。

四種處理方式:

處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 拋出異常:隊列滿時,繼續(xù)插入數(shù)據(jù)則拋出IllegalStateException;隊列空時,從隊列取數(shù)據(jù)則拋出NoSuchElementException。

  • 返回特殊值:插入元素時,成功返回true;移除元素時,如果沒有則返回null。

  • 一直阻塞:隊列滿時,繼續(xù)插入數(shù)據(jù)會一直阻塞生產(chǎn)者線程直到隊列可用或響應(yīng)中斷退出;隊列空時,從隊列取數(shù)據(jù)會阻塞消費者線程直到隊列不空。

  • 超時退出:隊列滿時,繼續(xù)插入數(shù)據(jù)會阻塞生產(chǎn)者線程一段時間,超過指定時間則退出。

1. Java里的阻塞隊列

(1) ArrayBlockingQueue

用數(shù)組實現(xiàn)的有界阻塞隊列,F(xiàn)IFO。

(2) LinkedBlockingQueue

用鏈表實現(xiàn)的有界阻塞隊列,默認(rèn)和最大長度為Integer.MAX_VALUE,F(xiàn)IFO。

(3) PriorityBlockingQueue

支持優(yōu)先級的無界阻塞隊列。

(4) DelayQueue

支持延時獲取元素的無界阻塞隊列,在創(chuàng)建元素時可指定延時,只有延遲期滿才能從隊列中提取元素。

(5) SynchronousQueue

不存儲元素的阻塞隊列,每一個put操作必須等待一個take操作,否則不能繼續(xù)添加元素。

(6) LinkedTransferQueue

由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊列,主要多了tryTransfer和transfer方法。

  • transfer方法:如果當(dāng)前有消費者正在等待接收元素,transfer方法可以把生產(chǎn)者傳入的元素立刻傳輸給消費者;如果沒有消費者在等待接收元素,transfer方法將元素存放在隊列tail節(jié)點,并等到該元素被消費者消費了才返回。

  • tryTransfer方法:如果當(dāng)前有消費者正在等待接收元素,tryTransfer方法可以把生產(chǎn)者傳入的元素立刻傳輸給消費者;如果沒有消費者在等待接收元素,tryTransfer方法將元素存放在隊列tail節(jié)點,并返回false。

(7) LinkedBlockingDeque

由鏈表結(jié)構(gòu)組成的雙向阻塞隊列,可以從隊列的兩端插入和移除元素。

2. 阻塞隊列的實現(xiàn)原理

使用等待 / 通知模式實現(xiàn)。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private final Condition notEmpty;
    private final Condition notFull;

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
}
  • 當(dāng)生產(chǎn)者往滿的隊列中添加元素時會阻塞生產(chǎn)者(await),當(dāng)消費者消費了一個隊列中的元素后,會通知生產(chǎn)者當(dāng)前隊列可用(signal)。

  • 當(dāng)消費者從空的隊列中取元素時會阻塞消費者(await),當(dāng)生產(chǎn)者添加了一個元素后,會通知消費者當(dāng)前隊列可用(signal)。

四. Fork/Join 框架

Fork/Join 框架是Java 7 提供的一個用于并行執(zhí)行任務(wù)的框架,把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果。

  1. 分割任務(wù);
  2. 執(zhí)行任務(wù)并合并結(jié)果。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容