Javaer必須要知道的并發(fā)容器


常用容器

前幾天和同事xhf、zm走查代碼,功能是為了減少頻繁你創(chuàng)建FTP開銷用線程notify和wait實(shí)現(xiàn)了一個(gè)FTP池子,當(dāng)時(shí)提的建議就是用java自帶的線程集合實(shí)現(xiàn)可能更高效,本文整理下JDK自帶線程安全的集合,不考慮多線程并發(fā)的情況下,容器類一般使用 ArrayList、HashMap 等線程不安全的類,效率更高。在并發(fā)場(chǎng)景下,常會(huì)用到ConcurrentHashMap、ArrayBlockingQueue 等線程安全的容器類,雖然犧牲了一些效率,但卻得到了安全。

什么是線程安全:

線程安全一般指的就是線程同步的意思,就是當(dāng)一個(gè)程序?qū)σ粋€(gè)線程安全的方法或者語(yǔ)句進(jìn)行訪問的時(shí)候,其他的不能再對(duì)他進(jìn)行操作了,必須等到這次訪問結(jié)束以后才能對(duì)這個(gè)線程安全的方法進(jìn)行訪問。

線程非安全用hashmap舉例試下:

public class TestThreadSafe {
    private Map<String, Integer> persons = new HashMap<>();
    private AtomicInteger count = new AtomicInteger(0);

    @Test
    public void test() throws Exception {
        for (int i = 0; i < 10000; i++) {
            int age = i;
            new Thread(()->addName("steven"+ age, age)).start();
        }
        TimeUnit.SECONDS.sleep(10);
        System.out.println("count is:"+count.get()+",persons:"+persons.size());
    }

    private void addName(String name, int age){
        persons.put(name, age);
        count.addAndGet(1);
    }
}
輸出:
count is:10000,persons:9996

可以看到addName方法執(zhí)行了10000次但是真正添加成功的有9996次,這就是由于多線程并發(fā)put時(shí)會(huì)因?yàn)閟ize++問題導(dǎo)致覆蓋問題(jdk8,jdk7時(shí)當(dāng)并發(fā)執(zhí)行擴(kuò)容操作時(shí)會(huì)造成環(huán)形鏈和數(shù)據(jù)丟失的情況)使用concurrenthashmap時(shí)就不會(huì)出現(xiàn)此線程安全問題。

1.ConcurrentHashMap 并發(fā)版 HashMap

最常見的并發(fā)容器之一,可以用作并發(fā)場(chǎng)景下的緩存。底層依然是哈希表,但在 JAVA 8 中有了不小的改變,而 JAVA 7 和 JAVA 8 都是用的比較多的版本,因此經(jīng)常會(huì)將這兩個(gè)版本的實(shí)現(xiàn)方式做一些比較(比如面試中)。

一個(gè)比較大的差異就是,JAVA 7 中采用分段鎖來減少鎖的競(jìng)爭(zhēng),JAVA 8 中放棄了分段鎖,采用 CAS(一種樂觀鎖),同時(shí)為了防止哈希沖突嚴(yán)重時(shí)退化成鏈表(沖突時(shí)會(huì)在該位置生成一個(gè)鏈表,哈希值相同的對(duì)象就鏈在一起),會(huì)在鏈表長(zhǎng)度達(dá)到閾值(8)后轉(zhuǎn)換成紅黑樹(比起鏈表,樹的查詢效率更穩(wěn)定)。

除了key和value不能為null外,其余方法和hashMap幾乎一樣

常用方法

@Test
public void test_function() throws Exception {
    ConcurrentHashMap<String, String> data = new ConcurrentHashMap<>();

    data.put("Steven","18");

    System.out.println(data.get("Steven"));
}

2.CopyOnWriteArrayList 并發(fā)版 ArrayList

并發(fā)版 ArrayList,底層結(jié)構(gòu)也是數(shù)組,和 ArrayList 不同之處在于:當(dāng)新增和刪除元素時(shí)會(huì)創(chuàng)建一個(gè)新的數(shù)組,在新的數(shù)組中增加或者排除指定對(duì)象,最后用新增數(shù)組替換原來的數(shù)組。

適用場(chǎng)景:由于讀操作不加鎖,寫(增、刪、改)操作加鎖,因此適用于讀多寫少的場(chǎng)景。

局限:由于讀的時(shí)候不會(huì)加鎖(讀的效率高,就和普通 ArrayList 一樣),讀取的當(dāng)前副本,因此可能讀取到臟數(shù)據(jù)。

核心方法可以看出add元素時(shí)加鎖同時(shí)復(fù)制了一個(gè)數(shù)組:

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

常用方法:

@Test
public void test_function() throws Exception {
    CopyOnWriteArrayList<String> data = new CopyOnWriteArrayList<>();

    data.add("Steven");

    System.out.println(data.get(0));
}

3.CopyOnWriteArraySet 并發(fā) Set

基于 CopyOnWriteArrayList 實(shí)現(xiàn)(內(nèi)含一個(gè) CopyOnWriteArrayList 成員變量),也就是說底層是一個(gè)數(shù)組,意味著每次 add 都要遍歷整個(gè)集合才能知道是否存在,不存在時(shí)需要插入(加鎖)。

適用場(chǎng)景:在 CopyOnWriteArrayList 適用場(chǎng)景讀多寫少且集合元素不是太多的場(chǎng)景。

核心方法可以看出內(nèi)部維護(hù)一個(gè)CopyOnWriteArrayList添加時(shí)判斷是否存在,不存在時(shí)調(diào)用CopyOnWriteArrayList的add方法

public class CopyOnWriteArraySet<E> extends AbstractSet<E>
  
private final CopyOnWriteArrayList<E> al;

/**
 * Creates an empty set.
 */
public CopyOnWriteArraySet() {
    al = new CopyOnWriteArrayList<E>();
}
   public boolean addIfAbsent(E e) {
        Object[] snapshot = getArray();
        return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
            addIfAbsent(e, snapshot);
    }
}

常用方法:

@Test
public void test_function() throws Exception {
    CopyOnWriteArraySet<String> data = new CopyOnWriteArraySet<>();

    data.add("Steven");

    System.out.println(data.stream().findFirst().get());
}

4.ConcurrentLinkedQueue 并發(fā)隊(duì)列 (基于鏈表)

一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列。此隊(duì)列按照 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。隊(duì)列的頭部是隊(duì)列中時(shí)間最長(zhǎng)的元素。隊(duì)列的尾部 是隊(duì)列中時(shí)間最短的元素。新的元素插入到隊(duì)列的尾部,隊(duì)列獲取操作從隊(duì)列頭部獲得元素。當(dāng)多個(gè)線程共享訪問一個(gè)公共 collection 時(shí),ConcurrentLinkedQueue 是一個(gè)恰當(dāng)?shù)倪x擇。此隊(duì)列不允許使用 null 元素。因?yàn)閿?shù)據(jù)結(jié)構(gòu)是鏈表,所以理論上是沒有隊(duì)列大小限制的,也就是說添加數(shù)據(jù)一定能成功。隊(duì)列用的相對(duì)少一點(diǎn),所以把方法都列舉一下:

  • boolean add(E e) 將指定元素插入此隊(duì)列的尾部和offer方法完全相同

  • boolean contains(Object o) 如果此隊(duì)列包含指定元素,則返回 true。

  • boolean isEmpty() 如果此隊(duì)列不包含任何元素,則返回 true。

  • Iterator<E> iterator() 返回在此隊(duì)列元素上以恰當(dāng)順序進(jìn)行迭代的迭代器。

  • boolean offer(E e) 將指定元素插入此隊(duì)列的尾部。

  • E peek() 獲取但不移除此隊(duì)列的頭;如果此隊(duì)列為空,則返回 null。

  • E poll() 獲取并移除此隊(duì)列的頭,如果此隊(duì)列為空,則返回 null。

  • boolean remove(Object o) 從隊(duì)列中移除指定元素的單個(gè)實(shí)例(如果存在)。

  • int size() 返回此隊(duì)列中的元素?cái)?shù)量。

  • Object[] toArray() 返回以恰當(dāng)順序包含此隊(duì)列所有元素的數(shù)組。

  • <T> T[]

  • toArray(T[] a)返回以恰當(dāng)順序包含此隊(duì)列所有元素的數(shù)組;返回?cái)?shù)組的運(yùn)行時(shí)類型是指定數(shù)組的運(yùn)行時(shí)類型。

    @Test
    public void test_function() throws Exception {
    ConcurrentLinkedQueue<String> data = new ConcurrentLinkedQueue<>();

      data.add("Steven");
      data.offer("Steven2");
      System.out.println(data.peek() + ",size:" + data.size());
      System.out.println(data.poll() + ",size:" + data.size());
    

    }
    輸出:
    Steven,size:2
    Steven,size:1

5.ConcurrentLinkedDeque 并發(fā)隊(duì)列 (基于雙向鏈表)

非阻塞隊(duì)列,基于雙向鏈表實(shí)現(xiàn)的并發(fā)隊(duì)列,可以分別對(duì)頭尾進(jìn)行操作,因此除了先進(jìn)先出 (FIFO),也可以先進(jìn)后出(FILO),當(dāng)然先進(jìn)后出的話應(yīng)該叫它棧了?,F(xiàn)對(duì)于單向列表方法的添加,取出都增加了相應(yīng)的XXFirst()和XXLast()方法:

@Test
public void test_function() throws Exception {
    ConcurrentLinkedDeque<String> data = new ConcurrentLinkedDeque<>();

    data.addLast("Steven");
    data.offerFirst("Steven2");
    System.out.println(data.getLast() + ",size:" + data.size());
    System.out.println(data.pollLast() + ",size:" + data.size());
}

輸出:
Steven,size:2
Steven,size:1

6.ConcurrentSkipListMap 基于跳表的并發(fā) Map

SkipList 即跳表,跳表是一種空間換時(shí)間的數(shù)據(jù)結(jié)構(gòu),通過冗余數(shù)據(jù),將鏈表一層一層索引,達(dá)到類似二分查找的效果,ConcurrentSkipListMap在JDK并發(fā)工具類使用范圍不是很廣,它是針對(duì)某一特殊需求而設(shè)計(jì)的——支持排序,同時(shí)支持搜索目標(biāo)返回最接近匹配項(xiàng)的導(dǎo)航方法。一般情況下開發(fā)者很少會(huì)使用到該類,但是如果你有如上的特殊需求,那么ConcurrentSkipListMap將是一個(gè)很好地解決方案。
原理比較復(fù)雜以后再分析。

7.ConcurrentSkipListSet 基于跳表的并發(fā) Set

類似 HashSet 和 HashMap 的關(guān)系,ConcurrentSkipListSet 里面就是一個(gè) ConcurrentSkipListMap,就不細(xì)說了。

8.ArrayBlockingQueue 阻塞隊(duì)列 (基于數(shù)組)

基于數(shù)組實(shí)現(xiàn)的可阻塞隊(duì)列,構(gòu)造時(shí)必須指定數(shù)組大小,往里面放東西時(shí)如果數(shù)組滿了便會(huì)阻塞直到有位置(也支持直接返回和超時(shí)等待),通過一個(gè)鎖 ReentrantLock 保證線程安全。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

通過put和take存取數(shù)據(jù),讀和寫都是同一個(gè)鎖,那要是空的時(shí)候正好一個(gè)讀線程來了不會(huì)一直阻塞嗎?答案就在 notEmpty、notFull 里,這兩個(gè)出自 lock 的小東西讓鎖有了類似 synchronized + wait + notify 的功能。

/** Condition for waiting puts */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

BlockingQueue 方法以四種形式出現(xiàn),對(duì)于不能立即滿足但可能在將來某一時(shí)刻可以滿足的操作,這四種形式的處理方式不同:第一種是拋出一個(gè)異常,第二種是返回一個(gè)特殊值(null 或 false,具體取決于操作),第三種是在操作可以成功前,無限期地阻塞當(dāng)前線程,第四種是在放棄前只在給定的最大時(shí)間限制內(nèi)阻塞。下表中總結(jié)了這些方法:

||拋出異常|特殊值 |阻塞|超時(shí)|
|--|--|--|
|插入| add(e) |offer(e)(false)|put(e)|offer(e, time, unit)|
|移除 |remove()| poll()(null)|take()| poll(time, unit)|
|檢查 |element() |peek()(null)|\ ||

9.LinkedBlockingQueue 阻塞隊(duì)列 (基于鏈表)

基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列,相對(duì)于不阻塞的 ConcurrentLinkedQueue,它多了一個(gè)容量限制,如果不設(shè)置默認(rèn)為 int 最大值。LinkedBlockingQueue保存元素的是一個(gè)鏈表。其內(nèi)部有一個(gè)Node的內(nèi)部類,其中有一個(gè)成員變量 Node next。就這樣形成了一個(gè)鏈表的結(jié)構(gòu),要獲取下一個(gè)元素,只要調(diào)用next就可以了。而ArrayBlockingQueue則是一個(gè)數(shù)組。

LinkedBlockingQueue內(nèi)部讀寫(插入獲取)各有一個(gè)鎖,而ArrayBlockingQueue則讀寫共享一個(gè)鎖,常用方法和ArrayBlockingQueue完全一樣

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

10.LinkedBlockingDeque 阻塞隊(duì)列 (基于雙向鏈表)

類似 LinkedBlockingQueue,但提供了雙向鏈表特有的操作。

11.PriorityBlockingQueue 線程安全的優(yōu)先隊(duì)列

構(gòu)造時(shí)可以傳入一個(gè)比較器,可以看做放進(jìn)去的元素會(huì)被排序,然后讀取的時(shí)候按順序消費(fèi)。某些低優(yōu)先級(jí)的元素可能長(zhǎng)期無法被消費(fèi),因?yàn)椴粩嘤懈邇?yōu)先級(jí)的元素進(jìn)來。priorityBlockingQueue是一個(gè)無界隊(duì)列,它沒有限制,在內(nèi)存允許的情況下可以無限添加元素;它又是具有優(yōu)先級(jí)的隊(duì)列,是通過構(gòu)造函數(shù)傳入的對(duì)象來判斷,傳入的對(duì)象必須實(shí)現(xiàn)comparable接口。

@Test
public void test_function() throws Exception {
    PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

    queue.add(3);
    queue.add(2);
    queue.add(1);
    System.out.println(queue);
    System.out.println(queue.poll());
    System.out.println(queue);
}
輸出
[1, 3, 2]
1
[2, 3]

對(duì)結(jié)果分析,每次添加一個(gè)元素,PriorityBlockingQueue中的元素都會(huì)執(zhí)行compareTo方法進(jìn)行排序,但是只是把第一個(gè)元素排在首位,其他元素按照隊(duì)列的一系列復(fù)雜算法排序。這就保障了每次獲取到的元素都是經(jīng)過排序的第一個(gè)元素。

12.SynchronousQueue 數(shù)據(jù)同步交換的隊(duì)列

一個(gè)虛假的隊(duì)列,因?yàn)樗鼘?shí)際上沒有真正用于存儲(chǔ)元素的空間,每個(gè)插入操作都必須有對(duì)應(yīng)的取出操作,沒取出時(shí)無法繼續(xù)放入。

@Test
public void test_function() throws Exception {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    new Thread(() -> {
        try {
            int i = 0;
            while (true) {
                String name = "steven" + i++;
                System.out.println("增加:" + name);
                queue.put(name);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(() -> {
        while (true) {
            try {
                System.out.println("取出:" + queue.take());
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
    TimeUnit.MINUTES.sleep(10);
}


輸出:
增加:steven0
取出:steven0
增加:steven1
取出:steven1
增加:steven2
取出:steven2
增加:steven3
取出:steven3

也就是說SynchronousQueue的隊(duì)列大小是1

13.LinkedTransferQueue 基于鏈表的數(shù)據(jù)交換隊(duì)列

實(shí)現(xiàn)了接口 TransferQueue,通過 transfer 方法放入元素時(shí),如果發(fā)現(xiàn)有線程在阻塞在取元素,會(huì)直接把這個(gè)元素給等待線程。如果沒有人等著消費(fèi),那么會(huì)把這個(gè)元素放到隊(duì)列尾部,并且此方法阻塞直到讀取這個(gè)元素。和 SynchronousQueue 有點(diǎn)像,但比它更強(qiáng)大。調(diào)用add添加時(shí)不會(huì)再等待取出。

14.DelayQueue 延時(shí)隊(duì)列

可以使放入隊(duì)列的元素在指定的延時(shí)后才被消費(fèi)者取出,元素需要實(shí)現(xiàn) Delayed 接口。延遲隊(duì)列提供了在指定時(shí)間才能獲取隊(duì)列元素的功能,隊(duì)列頭元素是最接近過期的元素。沒有過期元素的話,使用poll()方法會(huì)返回null值,超時(shí)判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0來判斷。延時(shí)隊(duì)列不能存放空元素。

DelayedQuene的元素存儲(chǔ)交由優(yōu)先級(jí)隊(duì)列存放。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();//元素存放

DelayedQuene的優(yōu)先級(jí)隊(duì)列使用的排序方式是隊(duì)列元素的compareTo方法,優(yōu)先級(jí)隊(duì)列存放順序是從小到大的,所以隊(duì)列元素的compareTo方法影響了隊(duì)列的出隊(duì)順序。

若compareTo方法定義不當(dāng),會(huì)造成延時(shí)高的元素在隊(duì)頭,延時(shí)低的元素?zé)o法出隊(duì)。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)//沒有元素,讓出線程,等待java.lang.Thread.State#WAITING
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)// 已到期,元素出隊(duì)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();// 其它線程在leader線程TIMED_WAITING期間,會(huì)進(jìn)入等待狀態(tài),這樣可以只有一個(gè)線程去等待到時(shí)喚醒,避免大量喚醒操作
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);// 等待剩余時(shí)間后,再嘗試獲取元素,他在等待期間,由于leader是當(dāng)前線程,所以其它線程會(huì)等待
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

常用場(chǎng)景:緩存系統(tǒng)的設(shè)計(jì),緩存中的對(duì)象,超過了空閑時(shí)間,需要從緩存中移出;任務(wù)調(diào)度系統(tǒng),能夠準(zhǔn)確的把握任務(wù)的執(zhí)行時(shí)間??赡苄枰ㄟ^線程處理很多時(shí)間上要求很嚴(yán)格的數(shù)據(jù),如果使用普通的線程,我們就需要遍歷所有的對(duì)象,一個(gè)一個(gè)的檢 查看數(shù)據(jù)是否過期等,首先這樣在執(zhí)行上的效率不會(huì)太高,其次就是這種設(shè)計(jì)的風(fēng)格也大大的影響了數(shù)據(jù)的精度。一個(gè)需要12:00點(diǎn)執(zhí)行的任務(wù)可能12:01 才執(zhí)行,這樣對(duì)數(shù)據(jù)要求很高的系統(tǒng)有更大的弊端。此時(shí)可以使用DelayQueue。

總結(jié)

這些并發(fā)容器能很好的解決日常大部分需求,可以學(xué)習(xí)原理,但不要重復(fù)造輪子況且輪子可能還不如這些。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容