JAVA并發(fā)實(shí)戰(zhàn)-對(duì)象的組合+基礎(chǔ)構(gòu)建模塊

一、設(shè)計(jì)線程安全的類

在設(shè)計(jì)線程安全類的過(guò)程中,需要包含以下三個(gè)基本要素:

  • 找出構(gòu)成對(duì)象狀態(tài)的所有變量
  • 找出約束狀態(tài)變量的不變性條件
  • 建立對(duì)象狀態(tài)的并發(fā)訪問(wèn)管理策略

1.1 收集同步需求

要確保類的線程安全性,就需要確保它的不變性條件不會(huì)再并發(fā)訪問(wèn)的情況下被破壞,這就需要對(duì)其狀態(tài)進(jìn)行推斷。對(duì)象和變量都有一個(gè)狀態(tài)空間,即所有可能的取值。狀態(tài)空間越小,就越容易判斷線程的狀態(tài)。final類型的域使用的越多,就越能簡(jiǎn)化對(duì)象可能狀態(tài)的分析過(guò)程。(在極端的情況下,不可變對(duì)象只有唯一的狀態(tài))

如果不了解對(duì)象的不變性條件與后驗(yàn)條件,那么就不能確保線程安全性。要滿足在狀態(tài)變量的有效值或狀態(tài)轉(zhuǎn)換上的各種約束條件,就需要借助于原子性和封裝性。

二、實(shí)例封閉

如果某對(duì)象不是線程安全的,那么可以通過(guò)多種技術(shù)使其在多線程程序中安全的使用。你可以確保該對(duì)象只能由單個(gè)線程訪問(wèn)(線程封閉),或者通過(guò)一個(gè)鎖來(lái)保護(hù)該對(duì)象的所有訪問(wèn)。

封裝簡(jiǎn)化了線程安全類的實(shí)現(xiàn)過(guò)程,他提供了一種實(shí)例封閉機(jī)制(Instance Confinement),通常頁(yè)簡(jiǎn)稱為“封閉”。當(dāng)一個(gè)對(duì)象被封裝到另一個(gè)對(duì)象中時(shí),能夠訪問(wèn)被封裝對(duì)象的所有代碼路徑都是已知的。與對(duì)象可以由整個(gè)程序訪問(wèn)的情況相比,更易于對(duì)代碼進(jìn)行分析。通過(guò)將封閉機(jī)制與合適的加鎖策略結(jié)合起來(lái),可以確保線程安全的方式來(lái)使用非線程安全的對(duì)象。

將數(shù)據(jù)封裝在對(duì)象內(nèi)部,可以將數(shù)據(jù)的訪問(wèn)限制在對(duì)象的方法上,從而更容易確保線程在訪問(wèn)數(shù)據(jù)時(shí)總能持有正確的鎖。

被封閉對(duì)象一定不能超出它們既定的作用域。對(duì)象可以封閉在類的一個(gè)實(shí)例中,或者封閉在某個(gè)作用域內(nèi),再或者封閉在線程內(nèi)。當(dāng)然,對(duì)象本身不會(huì)逸出--出現(xiàn)逸出情況的原因通常是由于開發(fā)人員在發(fā)布對(duì)象時(shí)超出了對(duì)象既定的作用域。

三、同步容器類

同步容器類包括Vector和HashTable,二者是早期JDK的一部分,此外還包括在JDK 1.2中添加的一些功能相似的類,這些的同步封裝器類是由Collections.synchronizedXxx等工廠方法創(chuàng)建的。這些類實(shí)現(xiàn)線程安全的方式是:將他們的狀態(tài)封裝起來(lái),并對(duì)每個(gè)共有方法進(jìn)行同步,使得每次只有一個(gè)線程能訪問(wèn)容器的狀態(tài)。

3.1 同步容器類的問(wèn)題

同步容器類都是線程安全的,但在某些情況下可能需要額外的客戶端加鎖來(lái)保護(hù)符合操作。容器上常見的復(fù)合操作包括:迭代(反復(fù)訪問(wèn)元素,直到遍歷完容器中所有元素)、跳轉(zhuǎn)(根據(jù)指定順序找到當(dāng)前元素的下一個(gè)元素)yi'ji以及條件運(yùn)算。在同步容器類中,這些復(fù)合操作在沒有客戶端加鎖的情況下,仍然是線程安全的,但當(dāng)其他線程并發(fā)的修改容器時(shí),他們可能會(huì)表現(xiàn)出意料之外的行為。

四、并發(fā)容器

Java 5.0提供了多種并發(fā)容器來(lái)改進(jìn)同步容器的性能。同步容器將所有對(duì)容器狀態(tài)的訪問(wèn)都串行化,以實(shí)現(xiàn)他們的線程安全性。這種方法的代價(jià)是嚴(yán)重降低并發(fā)性,當(dāng)多個(gè)線程競(jìng)爭(zhēng)容器的鎖時(shí),吞吐量將嚴(yán)重降低。

另一方面,并發(fā)容器是針對(duì)多個(gè)線程并發(fā)訪問(wèn)設(shè)計(jì)的。在Java 5.0中增加了ConcurrentHashMap,用來(lái)替代同步且基于散列的Map,以及CopyOnWriteArrayList,用于在遍歷操作為主要操作的情況下代替同步的List。在新的ConcurrentMap接口中增加了對(duì)一些常見符合操作的支持,例如“若沒有則添加”、替換以及有條件刪除等。

通過(guò)并發(fā)容器來(lái)代替同步容器,可以極大地提供伸縮性并降低風(fēng)險(xiǎn)。

4.1 CocurrentHashMap

同步容器類在執(zhí)行每個(gè)操作期間都持有一個(gè)鎖。在一些操作中,例如HashMashMap.get或List.contains,可能包含大量的工作:當(dāng)遍歷散列桶或鏈表來(lái)查找某個(gè)特定的對(duì)象時(shí),必須在許多元素上調(diào)用equals。在基于散列的容器中,如果hashCode不能很均勻的分布散列值,那么容器中的元素就不會(huì)均勻的分布在整個(gè)容器中。某些情況下,某個(gè)糟糕的散列函數(shù)還會(huì)把一個(gè)散列表變成線性鏈表。當(dāng)遍歷很長(zhǎng)的鏈表并且在某些或者全部元素上調(diào)用equals方法時(shí),會(huì)花費(fèi)很長(zhǎng)時(shí)間,而其他線程在這段時(shí)間內(nèi)都不能訪問(wèn)容器。

ConcurrentHashMap使用一種粒度更細(xì)的加鎖機(jī)制來(lái)實(shí)現(xiàn)更大程度的共享,這種機(jī)制被稱為分段鎖(Lock Striping)。在這種機(jī)制中,任意數(shù)量的讀取線程可以并發(fā)的訪問(wèn)Map,執(zhí)行讀取操作的線程和執(zhí)行寫入操作的線程可以并發(fā)的訪問(wèn)Map,并且一定數(shù)量的寫入線程可以并發(fā)的修改Map。

4.2 CopyOnWriteArrayList

CopyOnWriteArrayList用于替代同步List,在某些情況下它提供了更好的并發(fā)性能,并且在迭代期間不需要對(duì)容器進(jìn)行加鎖或復(fù)制。

“寫入時(shí)賦值(Copy-On-Write)”容器的線程安全性在于,只要正確的發(fā)布一個(gè)事實(shí)不可變的對(duì)象,那么在訪問(wèn)該對(duì)象時(shí)就不再需要進(jìn)一步的同步。每次修改時(shí),都會(huì)創(chuàng)建并重新發(fā)布一個(gè)新的容器副本,從而實(shí)現(xiàn)可變性。“寫入時(shí)復(fù)制”容器的迭代器保留一個(gè)指向底層基礎(chǔ)數(shù)組的引用,這個(gè)數(shù)組當(dāng)前位于迭代器的起始位置,由于它不會(huì)被修改,因此在對(duì)其進(jìn)行同步時(shí)只需確保數(shù)組內(nèi)容的可見性。因此,多個(gè)線程可以同時(shí)對(duì)這個(gè)容器進(jìn)行迭代,而不會(huì)彼此干擾或者與修改容器的線程相互干擾?!?/p>

4.3 阻塞隊(duì)列和生產(chǎn)者-消費(fèi)者模式

阻塞隊(duì)列提供了可阻塞的put和take方法,以及支持定時(shí)的offer和poll方法。如果隊(duì)列已經(jīng)滿了,那么put方法將阻塞直到有空間可用;如果隊(duì)列為空,那么take方法將會(huì)阻塞直到有元素可用。隊(duì)列可以是有界的也可以是無(wú)界的,無(wú)界隊(duì)列永遠(yuǎn)都不會(huì)充滿,因此無(wú)界隊(duì)列的put方法也永遠(yuǎn)不會(huì)阻塞。

在基于阻塞隊(duì)列構(gòu)建的生產(chǎn)者-消費(fèi)者設(shè)計(jì)中,當(dāng)數(shù)據(jù)生成時(shí),生產(chǎn)者把數(shù)據(jù)放入隊(duì)列,而當(dāng)消費(fèi)者準(zhǔn)備處理數(shù)據(jù)時(shí),將從隊(duì)列中獲取數(shù)據(jù)。生產(chǎn)者不需要知道消費(fèi)者的標(biāo)識(shí)或數(shù)量,或者他們是否是唯一生產(chǎn)者,只需將數(shù)據(jù)放入隊(duì)列即可。同樣,消費(fèi)者也不需要知道生產(chǎn)者是誰(shuí),或者工作來(lái)自何處。BlockingQueue簡(jiǎn)化了生產(chǎn)者-消費(fèi)者設(shè)計(jì)的實(shí)現(xiàn)過(guò)程,它支持任意數(shù)量的生產(chǎn)者和消費(fèi)者。一種常見的生產(chǎn)者-消費(fèi)者設(shè)計(jì)模式就是線程池與工作隊(duì)列的組合,在Executor任務(wù)執(zhí)行框架中就體現(xiàn)了這種模式。

在構(gòu)建高可靠的應(yīng)用程序時(shí),有界隊(duì)列是一種強(qiáng)大的資源管理工具:他們能抑制并防止產(chǎn)生過(guò)多的工作項(xiàng),使應(yīng)用程序在負(fù)荷過(guò)載的情況下變得更加健壯。

public class BlockingQueueDemo {

    public static class Basket {
        BlockingQueue<String> basket = new ArrayBlockingQueue(3);

        public void product() throws InterruptedException {
            basket.put("An apple");
        }

        public String consume() throws InterruptedException {
            String apple = basket.take();
            return apple;
        }

        public int getAppleNum() {
            return basket.size();
        }
    }

    public static void testBasket() {
        final Basket basket = new Basket();

        class Producer implements Runnable {
            public void run() {
                try {
                    while (true) {
                        System.out.println("準(zhǔn)備生產(chǎn)蘋果:" + System.currentTimeMillis());
                        basket.product();
                        System.out.println("生產(chǎn)完畢:" + System.currentTimeMillis());
                        System.out.println("生產(chǎn)完的蘋果有" + basket.getAppleNum() + "個(gè)");
                        Thread.sleep(300);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        class Consumer implements Runnable {
            public void run() {
                try {
                    while (true) {
                        System.out.println("消費(fèi)者準(zhǔn)備消費(fèi)蘋果:" + System.currentTimeMillis());
                        basket.consume();
                        System.out.println("消費(fèi)者消費(fèi)蘋果完畢:" + System.currentTimeMillis());
                        System.out.println("消費(fèi)完后有蘋果:" + basket.getAppleNum() + "個(gè)");
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        ExecutorService service = Executors.newCachedThreadPool();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        service.submit(producer);
        service.submit(consumer);
        // 程序運(yùn)行10s后,所有任務(wù)停止
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
        }
        service.shutdownNow();
    }

    public static void main(String[] args) {
        BlockingQueueDemo.testBasket();
    }
}

4.4 阻塞方法和中斷方法

線程可能會(huì)阻塞或暫停執(zhí)行,原因有多種:等待I/O操作結(jié)束,等待獲取一個(gè)鎖,等待從Thread.sleep方法中醒來(lái),或是等待另一個(gè)線程的計(jì)算結(jié)果。當(dāng)線程阻塞時(shí),它通常被掛起,并處于某種阻塞狀態(tài)(BLOCKED、WAITING或TIMED_WAITING)。阻塞操作與執(zhí)行時(shí)間很長(zhǎng)的普通操作的差別在于,被阻塞的線程必須等待某個(gè)不受他控制的事件發(fā)生后才能繼續(xù)執(zhí)行,例如等待I/O操作完成,等待某個(gè)鎖變成可用,或者等待外部計(jì)算的結(jié)束。當(dāng)某個(gè)外部事件發(fā)生時(shí),線程被置hui回RUNNABLE狀態(tài),并可以再次被調(diào)度執(zhí)行。

Thread提供了interrupt方法,用于中斷線程或者查詢線程是否已經(jīng)被中斷。每個(gè)線程都有一個(gè)布爾類型的屬性,表示線程的中斷狀態(tài),當(dāng)中斷線程時(shí)將設(shè)置這個(gè)狀態(tài)。

中斷時(shí)一種協(xié)作機(jī)制。一個(gè)線程不能強(qiáng)制其他線程停止正在執(zhí)行的操作而去執(zhí)行其他的操作。當(dāng)在代碼中調(diào)用了一個(gè)將拋出InterruptedException異常方法時(shí),你自己的方法也就變成了一個(gè)阻塞方法,并且必須要處理對(duì)中斷的響應(yīng)。對(duì)于庫(kù)代碼來(lái)說(shuō),有兩種基本選擇:

傳遞InterruptedException。避開這個(gè)異常通常是最明智的策略-只需把InterruptedException傳遞給方法的調(diào)用者。傳遞的方法包括,根本不捕獲該異常,或者捕獲該異常,然后在執(zhí)行某種簡(jiǎn)單的清理工作后再次拋出這個(gè)異常。

恢復(fù)中斷。有時(shí)候不能拋出InterruptedException,例如當(dāng)代碼是Runnable的一部分時(shí)。在這些狀態(tài)下,必須捕獲InterruptedException,并通過(guò)調(diào)用當(dāng)前線程上的interrupt方法恢復(fù)中斷狀態(tài),這樣在調(diào)用棧中更高層的代碼將看到引發(fā)了一個(gè)中斷。

public class TaskRunnable implements Runnable {
    BlockingQueue<Task> queue;
    ...
    public void run() {
        try {
            processTask(queue.take());
        } catch (InterruptedException e) {
            Thread.currentThread.interrupt();
        }
    }
}

五、同步工具類

同步工具類可以是任何一個(gè)對(duì)象,只要它根據(jù)其自身的狀態(tài)來(lái)協(xié)調(diào)線程的控制流。阻塞隊(duì)列可以作為同步工具類,其他類型的同步工具類還包括信號(hào)量(Semaphore)、柵欄(Barrier)以及閉鎖(Latch)。所有的同步工具類都包含一些特定的結(jié)構(gòu)化屬性:他們封裝了一些狀態(tài),這些狀態(tài)將決定執(zhí)行同步工具類的線程是繼續(xù)執(zhí)行還是等待,此外還提供了一些方法對(duì)狀態(tài)進(jìn)行操作,以及另一些方法用于高效的等待同步工具類進(jìn)入到預(yù)期狀態(tài)。

5.1 閉鎖

閉鎖是一種同步工具類,可以延遲線程的進(jìn)度直到到達(dá)終止?fàn)顟B(tài)。閉鎖的作用相當(dāng)于一扇門:在閉鎖到達(dá)結(jié)束狀態(tài)之前,這扇門是一直關(guān)閉的,并且沒有任何線程能通過(guò),當(dāng)?shù)竭_(dá)結(jié)束狀態(tài)時(shí),這扇門會(huì)打開并允許所有的線程通過(guò)。當(dāng)閉鎖到達(dá)結(jié)束狀態(tài)后,將不會(huì)再改變狀態(tài),因此這扇門將永遠(yuǎn)保持打開狀態(tài)。

5.2 FutureTask

FutureTask表示的計(jì)算是通過(guò)Callable來(lái)實(shí)現(xiàn)的,相當(dāng)于一種可以生成結(jié)果的Runnable,并可以處于以下三種狀態(tài):等待運(yùn)行(Waiting to run),正在運(yùn)行(Running)和運(yùn)行完成(Completed)。

Future.get的行為取決于任務(wù)的狀態(tài)。如果任務(wù)已經(jīng)完成,那么get會(huì)立即返回結(jié)果,否則get將阻塞直到任務(wù)進(jìn)入完成狀態(tài),然后返回結(jié)果或者拋出異常。FutureTask將計(jì)算結(jié)果從執(zhí)行計(jì)算的線程傳遞到獲取這個(gè)結(jié)果的線程,而FutureTask的規(guī)范確保了這種傳遞能實(shí)現(xiàn)結(jié)果的安全發(fā)布。

5.3 信號(hào)量

計(jì)數(shù)信號(hào)量(Counting Semaphore)用來(lái)控制同時(shí)訪問(wèn)某個(gè)特定資源的操作數(shù)量,或者同時(shí)執(zhí)行某個(gè)指定操作的數(shù)量。計(jì)數(shù)信號(hào)量還可以用來(lái)實(shí)現(xiàn)某種資源池,或者對(duì)容器施加邊界。

Semaphore中管理者一組虛擬的許可(permit),許可的初始數(shù)量可通過(guò)構(gòu)造函數(shù)來(lái)指定。在執(zhí)行操作時(shí)可以首先獲得許可(只要還有剩余的許可),并在使用以后釋放許可。如果沒有許可,那么acquire將阻塞直到有許可(或者直到被中斷或者操作超時(shí))。release方法將返回一個(gè)許可給信號(hào)量。計(jì)算信號(hào)量的一種簡(jiǎn)化形式是二zhi二值信號(hào)量,即初始值為1的Semaphore。二值信號(hào)量可以用作互斥體(mutex),并具備不可重入的加鎖語(yǔ)義:誰(shuí)擁有這個(gè)唯一的許可,誰(shuí)就擁有了互斥鎖。

Semaphore可以用于實(shí)現(xiàn)資源池,例如數(shù)據(jù)庫(kù)連接池。我們可以構(gòu)造一個(gè)固定長(zhǎng)度的資源池,當(dāng)池為空時(shí),請(qǐng)求資源將會(huì)失敗,但你真正希望看到的行為是阻塞而不是失敗,并且當(dāng)池非空時(shí)解除阻塞。如果將Semaphore的計(jì)數(shù)值初始化為池的大小,并在從池中獲取一個(gè)資源之前首先調(diào)用acquire方法獲取一個(gè)許可,在將資源返回給池之后調(diào)用release釋放許可,那么acquire將一直阻塞直到資源池不為空。

同樣,你可以使用Semaphore將任何一種容器變成有界阻塞容器。信號(hào)量的計(jì)數(shù)值會(huì)初始化為容器容量的最大值。add操作在向底層容器添加一個(gè)元素之前,首先要獲取一個(gè)許可。如果add操作沒有添加任何元素,那么會(huì)立刻釋放許可。同樣,remove操作釋放一個(gè)許可,使更多的元素能夠添加到容器中。

5.4 柵欄

柵欄(Barrier)類似于閉鎖,它能阻塞一組線程直到某個(gè)事件發(fā)生。柵欄與閉鎖的關(guān)鍵區(qū)別在于,所有線程必須同時(shí)到達(dá)柵欄位置,才能繼續(xù)執(zhí)行。閉鎖用于等待事件,而柵欄用于等待其他線程。

CyclicBarrier可以是一定數(shù)量的參與方反復(fù)的在柵欄位置匯集,它在并行迭代算法中非常有用:這種算法將一個(gè)問(wèn)題拆分成一系列相互獨(dú)立的子問(wèn)題。當(dāng)線程到達(dá)柵欄位置時(shí)將調(diào)研await方法,這個(gè)方法將阻塞直到所有線程都到達(dá)柵欄位置。如果所有線程都到達(dá)了柵欄位置,那么柵欄將打開,此時(shí)所有線程都被釋放,而柵欄將被重置以便下次使用。如果對(duì)await的調(diào)用超時(shí),或者await阻塞的線程被中斷,那么柵欄就被認(rèn)為是打破了,所有阻塞的await調(diào)用都將終止并拋出BrokenBarrierException。如果成功通過(guò)柵欄,那么await將為每個(gè)線程返回一個(gè)唯一的到達(dá)索引號(hào),我們可以利用這些索引來(lái)“選舉”產(chǎn)生一個(gè)領(lǐng)導(dǎo)線程,并在下一次迭代中由該領(lǐng)導(dǎo)線程執(zhí)行一些特殊的工作。CyclicBarrier還可以使你將一個(gè)柵欄操作傳遞給構(gòu)造函數(shù),這是一個(gè)Runnable,當(dāng)城管通過(guò)柵欄時(shí)會(huì)(在一個(gè)子任務(wù)線程中)執(zhí)行它,但在阻塞線程被釋放之前是不能執(zhí)行的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一.線程安全性 線程安全是建立在對(duì)于對(duì)象狀態(tài)訪問(wèn)操作進(jìn)行管理,特別是對(duì)共享的與可變的狀態(tài)的訪問(wèn) 解釋下上面的話: ...
    黃大大吃不胖閱讀 959評(píng)論 0 3
  • 從三月份找實(shí)習(xí)到現(xiàn)在,面了一些公司,掛了不少,但最終還是拿到小米、百度、阿里、京東、新浪、CVTE、樂(lè)視家的研發(fā)崗...
    時(shí)芥藍(lán)閱讀 42,798評(píng)論 11 349
  • 5.1 同步容器類 同步容器類包括Vector和HashTable以及Collection.synchronize...
    大海孤了島閱讀 418評(píng)論 1 0
  • 并發(fā)與多線程是每個(gè)人程序員都頭疼的內(nèi)容,幸好Java庫(kù)所提供了豐富并發(fā)基礎(chǔ)模塊,這些多線程安全的模塊作為并發(fā)工具類...
    登高且賦閱讀 1,310評(píng)論 0 8
  • 小學(xué),初中,高中,我都乖乖的上完了。沒有初戀等著你去發(fā)現(xiàn)。你說(shuō),要好好學(xué)習(xí),樓下那誰(shuí)誰(shuí)誰(shuí)在學(xué)校談了戀愛,成績(jī)一落...
    玖汣閱讀 378評(píng)論 2 1

友情鏈接更多精彩內(nèi)容