Java并發(fā)(一)——線程安全的容器(上)

Java中線程安全的容器主要包括兩類(lèi):

  • VectorHashtable,以及封裝器類(lèi)Collections.synchronizedListCollections.synchronizedMap
  • Java 5.0引入的java.util.concurrent包,其中包含并發(fā)隊(duì)列、并發(fā)HashMap以及寫(xiě)入時(shí)復(fù)制容器。

依筆者看,早期使用的同步容器主要有兩方面的問(wèn)題:1)通過(guò)對(duì)方法添加synchronized關(guān)鍵字實(shí)現(xiàn)同步,這種粗粒度的加鎖操作在synchronized關(guān)鍵字本身未充分優(yōu)化之前,效率偏低;2)同步容器雖然是線程安全的,但在某些外部復(fù)合操作(例:若沒(méi)有則添加)時(shí),依然需要客戶(hù)端加鎖保證數(shù)據(jù)安全。因此,從Java 5.0以后,并發(fā)編程偏向于使用java.util.concurrent包(作者:Doug Lea)中的容器類(lèi),本文也將著重介紹該包中的容器類(lèi),主要包括:

  1. 阻塞隊(duì)列
  2. ConcurrentHashMap
  3. 寫(xiě)入時(shí)復(fù)制容器

一、阻塞隊(duì)列

在并發(fā)環(huán)境下,阻塞隊(duì)列是常用的數(shù)據(jù)結(jié)構(gòu),它能確保數(shù)據(jù)高效安全的傳輸,為快速搭建高質(zhì)量的多線程應(yīng)用帶來(lái)極大的便利,比如MQ的原理就是基于阻塞隊(duì)列的。java.util.concurrent中包含豐富的隊(duì)列實(shí)現(xiàn),它們之間的關(guān)系如下圖所示:

并發(fā)隊(duì)列關(guān)系圖
  • BlockingQueue、Deque(雙向隊(duì)列)繼承自Queue接口;
  • BlockingDeque同時(shí)繼承自BlockingQueue、Deque接口,提供阻塞的雙向隊(duì)列屬性;
  • LinkedBlockingQueue和LinkedBlockingDeque分別實(shí)現(xiàn)了BlockingQueue和BlockingDeque接口;
  • DelayQueue實(shí)現(xiàn)了BlockingQueue接口,提供任務(wù)延遲功能;
  • TransferQueue是Java 7引入的,用于替代BlockingQueue,LinkedTransferQueue是其實(shí)現(xiàn)類(lèi)。

下面對(duì)這些隊(duì)列進(jìn)行詳細(xì)的介紹:

1.1 BlockingQueue與BlockingDeque

阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:

  • 在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/li>
  • 當(dāng)隊(duì)列滿(mǎn)時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。

阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。

阻塞隊(duì)列提供了四種處理方法:

方法 拋出異常 返回特殊值 一直阻塞 超時(shí)退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 拋出異常:是指當(dāng)阻塞隊(duì)列滿(mǎn)時(shí)候,再往隊(duì)列里插入元素,會(huì)拋出IllegalStateException("Queue full")異常。當(dāng)隊(duì)列為空時(shí),從隊(duì)列里獲取元素時(shí)會(huì)拋出NoSuchElementException異常 。
  • 返回特殊值:插入方法會(huì)返回是否成功,成功則返回true。移除方法,則是從隊(duì)列里拿出一個(gè)元素,如果沒(méi)有則返回null
  • 一直阻塞:當(dāng)阻塞隊(duì)列滿(mǎn)時(shí),如果生產(chǎn)者線程往隊(duì)列里put元素,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù),或者響應(yīng)中斷退出。當(dāng)隊(duì)列空時(shí),消費(fèi)者線程試圖從隊(duì)列里take元素,隊(duì)列也會(huì)阻塞消費(fèi)者線程,直到隊(duì)列可用。
  • 超時(shí)退出:當(dāng)阻塞隊(duì)列滿(mǎn)時(shí),隊(duì)列會(huì)阻塞生產(chǎn)者線程一段時(shí)間,如果超過(guò)一定的時(shí)間,生產(chǎn)者線程就會(huì)退出。

BlockingDeque在BlockingQueue的基礎(chǔ)上,增加了支持雙向隊(duì)列的屬性。如下圖所示,相比于BlockingQueue的插入和移除方法,變?yōu)?code>XxxFirst,XxxLast方法,分別對(duì)應(yīng)隊(duì)列的兩端,既可以在頭部添加或移除,也可以在尾部添加或移除。

BlockingDeque插入/移除方法

1.2 LinkedBlockingQueue與LinkedBlockingDeque

LinkedBlockingQueue是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE,按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序。

首先看下LinkedBlockingQueue中核心的域:

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

private final int capacity;
private final AtomicInteger count = new AtomicInteger();

transient Node<E> head;
private transient Node<E> last;

private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
  • LinkedBlockingQueueLinkedList類(lèi)似,通過(guò)靜態(tài)內(nèi)部類(lèi)Node<E>進(jìn)行元素的存儲(chǔ);
  • capacity表示阻塞隊(duì)列所能存儲(chǔ)的最大容量,在創(chuàng)建時(shí)可以手動(dòng)指定最大容量,默認(rèn)的最大容量為Integer.MAX_VALUE;
  • count表示當(dāng)前隊(duì)列中的元素?cái)?shù)量,LinkedBlockingQueue的入隊(duì)列和出隊(duì)列使用了兩個(gè)不同的lock對(duì)象,因此無(wú)論是在入隊(duì)列還是出隊(duì)列,都會(huì)涉及對(duì)元素?cái)?shù)量的并發(fā)修改,因此這里使用了一個(gè)原子操作類(lèi)來(lái)解決對(duì)同一個(gè)變量進(jìn)行并發(fā)修改的線程安全問(wèn)題。
  • headlast分別表示鏈表的頭部和尾部;
  • takeLock表示元素出隊(duì)列時(shí)線程所獲取的鎖,當(dāng)執(zhí)行take、poll等操作時(shí)線程獲??;notEmpty當(dāng)隊(duì)列為空時(shí),通過(guò)該Condition讓獲取元素的線程處于等待狀態(tài);
  • putLock表示元素入隊(duì)列時(shí)線程所獲取的鎖,當(dāng)執(zhí)行putoffer等操作時(shí)獲??;notFull當(dāng)隊(duì)列容量達(dá)到capacity時(shí),通過(guò)該Condition讓加入元素的線程處于等待狀態(tài)。

其次,LinkedBlockingQueue有三個(gè)構(gòu)造方法,分別如下:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

默認(rèn)構(gòu)造函數(shù)直接調(diào)用LinkedBlockingQueue(int capacity),LinkedBlockingQueue(int capacity)會(huì)初始化首尾節(jié)點(diǎn),并置位null。LinkedBlockingQueue(Collection<? extends E> c)在初始化隊(duì)列的同時(shí),將一個(gè)集合的全部元素加入隊(duì)列。

最后分析下puttake的過(guò)程,這里重點(diǎn)關(guān)注:LinkedBlockingQueue如何實(shí)現(xiàn)添加/移除并行的?

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

之所以把puttake放在一起,是因?yàn)樗鼈兪且粚?duì)互逆的過(guò)程:

  • put在插入元素前首先獲得putLock和當(dāng)前隊(duì)列的元素?cái)?shù)量,take在去除元素前首先獲得takeLock和當(dāng)前隊(duì)列的元素?cái)?shù)量;
  • put時(shí)需要判斷當(dāng)前隊(duì)列是否已滿(mǎn),已滿(mǎn)時(shí)當(dāng)前線程進(jìn)行等待,take時(shí)需要判斷隊(duì)列是否已空,隊(duì)列為空時(shí)當(dāng)前線程進(jìn)行等待;
  • put調(diào)用enqueue在隊(duì)尾插入元素,并修改尾指針,take調(diào)用dequeuehead指向原來(lái)first的位置,并將first的數(shù)據(jù)域置位null,實(shí)現(xiàn)刪除原first指針,并產(chǎn)生新的head,同時(shí),切斷原head節(jié)點(diǎn)的引用,便于垃圾回收。
private void enqueue(Node<E> node) {
    last = last.next = node;
}
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
  • 最后,put根據(jù)count決定是否觸發(fā)隊(duì)列未滿(mǎn)和隊(duì)列空;take根據(jù)count決定是否觸發(fā)隊(duì)列未空和隊(duì)列滿(mǎn)。

回到剛才的問(wèn)題:LinkedBlockingQueue如何實(shí)現(xiàn)添加/移除并行的?
LinkedBlockingQueue在入隊(duì)列和出隊(duì)列時(shí)使用的是不同的Lock,這也意味著它們之間的操作不會(huì)存在互斥。在多個(gè)CPU的情況下,可以做到在同一時(shí)刻既消費(fèi)、又生產(chǎn),做到并行處理。

同樣的,LinkedBlockingDequeLinkedBlockingQueue的基礎(chǔ)上,增加了雙向操作的屬性。繼續(xù)以puttake為例,LinkedBlockingDeque增加了putFirst/putLast、takeFirst/takeLast方法,分別用于在隊(duì)列頭、尾進(jìn)行添加和刪除。與LinkedBlockingQueue不同的是,LinkedBlockingDeque的入隊(duì)列和出隊(duì)列不再使用不同的Lock。

final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

其中,lock表示讀寫(xiě)的主鎖,notEmpty和notFull依然表示相應(yīng)的控制線程狀態(tài)條件量。以putFirsttakeFirst為例:

public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkFirst(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

putFirst不支持插入null元素,首先新建一個(gè)Node對(duì)象,然后調(diào)用ReentrantLocklock方法獲取鎖,插入操作通過(guò)boolean linkFirst(Node<E> node)實(shí)現(xiàn),如果當(dāng)前隊(duì)列頭已滿(mǎn),那么該線程等待(linkFirst方法在寫(xiě)入元素成功后會(huì)釋放該鎖信號(hào)),最后,在finally塊中釋放鎖(ReentrantLock的使用)。

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

putFirst類(lèi)似,takeFirst首先獲取鎖,然后在try中解除尾元素對(duì)象的引用,如果unlinkFirst為空,表示隊(duì)列為空,沒(méi)有元素可刪,那么該線程等待。同樣,最后在finally塊中釋放鎖。

那么問(wèn)題來(lái)了,LinkedBlockingDeque為什么不使用LinkedBlockingQueue讀寫(xiě)鎖分離的方式呢?LinkedBlockingDequeLinkedBlockingQueue的使用場(chǎng)景有什么區(qū)別呢?

1.3 DelayQueue

DelayQueue主要用于實(shí)現(xiàn)延時(shí)任務(wù),比如:等待一段時(shí)間之后關(guān)閉連接,緩存對(duì)象過(guò)期刪除,任務(wù)超時(shí)處理等等,這些任務(wù)的共同特點(diǎn)是等待一段時(shí)間之后執(zhí)行(類(lèi)似于TimerTask)。DelayQueue的實(shí)現(xiàn)包括三個(gè)核心特征:

  • 延時(shí)任務(wù):DelayQueue的泛型類(lèi)需要繼承自Delayed接口,而Delayed接口繼承自Comparable<Delayed>,用于隊(duì)列中優(yōu)先排序的比較;
  • 優(yōu)先隊(duì)列:DelayQueue的實(shí)現(xiàn)采用了優(yōu)先隊(duì)列PriorityQueue,即延遲時(shí)間越短的任務(wù)越優(yōu)先(回憶下優(yōu)先隊(duì)列中二叉堆的實(shí)現(xiàn))。
  • 阻塞隊(duì)列:支持并發(fā)讀寫(xiě),采用ReentrantLock來(lái)實(shí)現(xiàn)讀寫(xiě)的鎖操作。

因此,DelayQueue = Delayed + PriorityQueue + BlockingQueue

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    private final Condition available = lock.newCondition();
}

接下來(lái)看下DelayQueue的讀寫(xiě)操作如何實(shí)現(xiàn)延時(shí)任務(wù)的?

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

首先執(zhí)行加鎖操作,然后往優(yōu)先隊(duì)列中插入元素e,優(yōu)先隊(duì)列會(huì)調(diào)用泛型E的compareTo方法進(jìn)行比較(具體關(guān)于二叉堆的操作,這里不再贅述,請(qǐng)參考數(shù)據(jù)結(jié)構(gòu)部分相關(guān)分析),將延遲時(shí)間最短的任務(wù)添加到隊(duì)頭。最后檢查下元素是否為隊(duì)頭,如果是隊(duì)頭的話,設(shè)置leader為空,喚醒所有等待的隊(duì)列,釋放鎖。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}
  • 首先執(zhí)行加鎖操作,然后取出優(yōu)先隊(duì)列的隊(duì)頭,如果對(duì)頭為空,則該線程阻塞;
  • 獲得對(duì)頭元素的延遲時(shí)間,如果延遲時(shí)間小于等于0,說(shuō)明該元素已經(jīng)到了可以使用的時(shí)間,調(diào)用poll方法彈出該元素;
  • 在延遲時(shí)間大于0時(shí),首先釋放元素first的引用(避免內(nèi)存泄露),其次判斷如果leader線程不為空,則該線程阻塞(表示已有線程在等待)。否則,把當(dāng)前線程賦值給leader元素,然后阻塞delay的時(shí)間,即等待隊(duì)頭到達(dá)延遲時(shí)間,在finally塊中釋放leader元素的引用。循環(huán)后,取出對(duì)頭元素,退出for循環(huán)。
  • 最后,如果leader為空并且優(yōu)先級(jí)隊(duì)列不為空的情況下(判斷還有沒(méi)有其他后續(xù)節(jié)點(diǎn)),調(diào)用signal通知其他的線程,并執(zhí)行解鎖操作。

1.4 TransferQueue與LinkedTransferQueue

TransferQueue是一個(gè)繼承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueueTransferQueue接口的實(shí)現(xiàn)類(lèi),其定義為一個(gè)無(wú)界的隊(duì)列,具有先進(jìn)先出(FIFO)的特性。

TransferQueue接口主要包含以下方法:

public interface TransferQueue<E> extends BlockingQueue<E> {
    boolean tryTransfer(E e);
    void transfer(E e) throws InterruptedException;
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    boolean hasWaitingConsumer();
    int getWaitingConsumerCount();
}
  • transfer(E e):若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,即立刻移交之;否則,會(huì)插入當(dāng)前元素e到隊(duì)列尾部,并且等待進(jìn)入阻塞狀態(tài),到有消費(fèi)者線程取走該元素。
  • tryTransfer(E e):若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程(使用take()或者poll()函數(shù)),使用該方法會(huì)即刻轉(zhuǎn)移/傳輸對(duì)象元素e;若不存在,則返回false,并且不進(jìn)入隊(duì)列。這是一個(gè)不阻塞的操作。
  • tryTransfer(E e, long timeout, TimeUnit unit):若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,會(huì)立即傳輸給它;否則將插入元素e到隊(duì)列尾部,并且等待被消費(fèi)者線程獲取消費(fèi)掉;若在指定的時(shí)間內(nèi)元素e無(wú)法被消費(fèi)者線程獲取,則返回false,同時(shí)該元素被移除。
  • hasWaitingConsumer():判斷是否存在消費(fèi)者線程。
  • getWaitingConsumerCount():獲取所有等待獲取元素的消費(fèi)線程數(shù)量。

LinkedTransferQueue實(shí)現(xiàn)了上述方法,較之于LinkedBlockingQueue在隊(duì)列滿(mǎn)時(shí),入隊(duì)操作會(huì)被阻塞的特性,LinkedTransferQueue在隊(duì)列不滿(mǎn)時(shí)也可以阻塞,只要沒(méi)有消費(fèi)者使用元素。下面來(lái)看下LinkedTransferQueue的入隊(duì)和和出隊(duì)操作:transfertake方法。

public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}
public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

LinkedTransferQueue入隊(duì)和和出隊(duì)都使用了一個(gè)關(guān)鍵方法:

private E xfer(E e, boolean haveData, int how, long nanos) {}

其中,E表示被操作的元素,haveDatatrue表示添加數(shù)據(jù),false表示移除數(shù)據(jù);how有四種取值:NOW, ASYNC, SYNC, 或者TIMED,分別表示執(zhí)行的時(shí)機(jī);nanos表示howTIMED時(shí)的時(shí)間限制。
xfer方法具體流程較為復(fù)雜,這里不再展開(kāi)。另外,LinkedTransferQueue采用了CAS非阻塞同步機(jī)制,后面會(huì)具體講到)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容