第5章-ProducerConsumer

[TOC]

5.1 模式簡(jiǎn)介

  1. Producer是 “生產(chǎn)者” 的意思,指的是生成數(shù)據(jù)的線(xiàn)程。Consumer則是 “消費(fèi)者” 的意思,指的是使用數(shù)據(jù)的線(xiàn)程。
  2. 生產(chǎn)者安全地將數(shù)據(jù)交給消費(fèi)者。雖然僅是這樣看似簡(jiǎn)單的操作,但當(dāng)生產(chǎn)者和消費(fèi)者以不同的線(xiàn)程運(yùn)行時(shí),兩者之間的處理速度差異便會(huì)引起問(wèn)題。例如,消費(fèi)者想要獲取數(shù)據(jù),可數(shù)據(jù)還沒(méi)生成,或者生產(chǎn)者想要交付數(shù)據(jù),而消費(fèi)者的狀態(tài)還無(wú)法接收數(shù)據(jù)等。
  3. Producer-Consumer模式在生產(chǎn)者和消費(fèi)者之間加入了一個(gè) “橋梁角色” 。該橋梁角色用于消除線(xiàn)程間處理速度的差異。
  4. 一般來(lái)說(shuō),在該模式中,生產(chǎn)者和消費(fèi)者都有多個(gè),當(dāng)然生產(chǎn)者和消費(fèi)者有時(shí)也會(huì)只有一個(gè)。當(dāng)兩者都只有一個(gè)時(shí),我們稱(chēng)之為Pipe模式。

5.3 Producer-Consumer模式中的角色

5.3.1 Data

  1. Data角色由Producer角色生成,供Consumer角色使用。

5.3.2 Producer

  1. Producer角色生成Data角色,并將其傳遞給Channel角色。

5.3.3 Consumer

  1. Consumer角色從Channel角色獲取Data角色并使用。

5.3.4 Channel

  1. Channel角色保管從Producer角色獲取的Data角色,還會(huì)響應(yīng)Consumer角色的請(qǐng)求,傳遞Data角色。為了確保安全性,Channel角色會(huì)對(duì)Producer角色和Consumer角色的訪(fǎng)問(wèn)執(zhí)行互斥處理。
  2. 當(dāng)Producer角色將Data角色傳遞給Channel角色時(shí),如果Channel角色的狀態(tài)不適合接收Data角色,那么Producer角色將一直等待,直至Channel角色的狀態(tài)變?yōu)榭梢越邮諡橹埂?/li>
  3. 當(dāng)Consumer角色從Channel角色獲取Data角色時(shí),如果Channel角色中沒(méi)有可以傳遞的Data角色,那么Consumer角色將一直等待,直至Channel角色的狀態(tài)變?yōu)榭梢詡鬟fData角色為止。
  4. 當(dāng)存在多個(gè)Producer角色和Consumer角色時(shí),為了避免各處理互相影響,Channel角色需要執(zhí)行互斥處理。這樣看來(lái),Channel角色位于Producer角色和Consumer角色之間,承擔(dān)用于傳遞Data角色的中轉(zhuǎn)站、通道的任務(wù)。

5.3.5 類(lèi)圖

類(lèi)圖

5.4 拓展思路的要點(diǎn)

5.4.1 守護(hù)安全性的Channel角色

  1. 在Producer-Consumer模式中,承擔(dān)安全守護(hù)責(zé)任的是Channel角色。Channel角色執(zhí)行線(xiàn)程間的互斥處理,確保Producer角色正確地將Data角色傳遞給Consumer角色。
  2. 在示例程序中,Table類(lèi)的put方法和take方法都使用了Guarded Suspension模式。但MakerThread類(lèi)和EaterThread類(lèi)并不依賴(lài)于Table類(lèi)的具體實(shí)現(xiàn)。也就是說(shuō),MakerThread不會(huì)顧慮其他線(xiàn)程如何,而是直接調(diào)用put方法,同樣地,EaterThread也是直接調(diào)用take方法。那些使用synchronized、wait和notifyAll等來(lái)控制多線(xiàn)程運(yùn)行的代碼,都隱藏在了Channel角色的Table類(lèi)中。

5.4.2 不可以直接傳遞嗎

  • Producer-Consumer模式為了從Producer角色向Consumer角色傳遞Data角色,在中間設(shè)置了一個(gè)Channel角色。那么Producer角色不可以直接調(diào)用Consumer角色的方法嗎?
  1. 直接調(diào)用方法
  • Consumer角色想要獲取Data角色,通常都是因?yàn)橄胧褂眠@些Data角色來(lái)執(zhí)行某些處理。如果Producer角色直接調(diào)用Consumer角色的方法,那么執(zhí)行處理的就不是Consumer角色的線(xiàn)程,而是Producer角色的線(xiàn)程了。
  • 這樣一來(lái),執(zhí)行處理花費(fèi)的時(shí)間就必須由Producer角色的線(xiàn)程來(lái)承擔(dān),準(zhǔn)備下一個(gè)數(shù)據(jù)的處理也會(huì)相應(yīng)發(fā)生延遲。這樣會(huì)使程序的響應(yīng)性變得很差。
  • 直接調(diào)用方法就好比糕點(diǎn)師做好蛋糕,直接交給客人,在客人吃完后再做下一個(gè)蛋糕一樣。
  1. 插入Channel角色
  • 我們?cè)賮?lái)思考一下插入Channel角色這種方法。Producer角色將Data角色傳遞給Channel角色之后,無(wú)需等待Consumer角色對(duì)Data角色進(jìn)行處理,可以立即開(kāi)始準(zhǔn)備下一個(gè)Data角色。也就是說(shuō),Producer角色可以持續(xù)不斷地創(chuàng)建Data角色。Producer角色不會(huì)受到Consumer角色的處理進(jìn)展?fàn)顩r的影響。
  • 當(dāng)然,雖然可以持續(xù)不斷地創(chuàng)建Data角色,但也只能是在Channel角色能夠儲(chǔ)存的范圍之內(nèi)。如果Channel角色中沒(méi)有剩余空間,那么就無(wú)法再添加Data角色了。

5.4.3 存在中間角色的意義

  1. 線(xiàn)程的協(xié)調(diào)運(yùn)行要考慮 “放在中間的東西” 。線(xiàn)程的互斥處理要考慮 “應(yīng)該保護(hù)的東西” 。
  2. 協(xié)調(diào)運(yùn)行和互斥處理其實(shí)是內(nèi)外統(tǒng)一的。為了讓線(xiàn)程協(xié)調(diào)運(yùn)行,必須執(zhí)行互斥處理,以防止共享的內(nèi)容被破壞。而線(xiàn)程的互斥處理是為了線(xiàn)程的協(xié)調(diào)運(yùn)行才執(zhí)行的。因此,協(xié)調(diào)運(yùn)行和互斥處理之間有著很深的關(guān)系。

5.6 理解InterruptedException異常

5.6.1 可能會(huì)花費(fèi)時(shí)間,但可以取消

  1. 如果方法后面加了throws InterruptedException,則表明該方法中(或者該方法進(jìn)一步調(diào)用的方法中)可能會(huì)拋出InterruptedException異常。
  2. 這包含下面兩層含義:
  • 是 “花費(fèi)時(shí)間” 的方法
  • 是 “可以取消” 的方法
  1. 用一句話(huà)來(lái)說(shuō)就是,加了throws InterruptedException的方法可能會(huì)花費(fèi)時(shí)間,但可以取消。

5.6.2 加了throws InterruptedException的方法

  1. 在Java的標(biāo)準(zhǔn)類(lèi)庫(kù)中,加了throws InterruptedException的典型方法有如下三個(gè):
  • java.lang.object類(lèi)的 wait 方法
  • java.lang.Thread類(lèi)的 sleep 方法
  • java.lang.Thread類(lèi)的 join 方法
  1. 花費(fèi)時(shí)間的方法
  • 線(xiàn)程執(zhí)行wait方法后,會(huì)進(jìn)入等待隊(duì)列,等待被notify/notifyAll。在等待期間,線(xiàn)程是不運(yùn)行的,但需要花費(fèi)時(shí)間來(lái)等待被notify/notifyAll。
  • 線(xiàn)程執(zhí)行sleep方法后,會(huì)暫停執(zhí)行(暫停多長(zhǎng)時(shí)間由參數(shù)指定)。這也是花費(fèi)時(shí)間的方法。
  • 線(xiàn)程執(zhí)行join方法后,會(huì)等待指定線(xiàn)程終止。該方法需要花費(fèi)時(shí)間,來(lái)等待指定線(xiàn)程終止。
    如上所述,上面這三個(gè)方法需要等待 “被notify/notifyAll、指定時(shí)間、指定線(xiàn)程終止”,確實(shí)是 “花費(fèi)時(shí)間” 的方法。
  1. 可以取消的方法
  • 花費(fèi)時(shí)間的處理會(huì)降低程序的響應(yīng)性,所以如果存在像下面這樣可以中途停止執(zhí)行(取消)的方法,就非常方便了。
  • 取消“wait方法等待notify/notifyAll”的處理
  • 取消“在sleep方法指定的時(shí)間內(nèi)停止執(zhí)行”的處理
  • 取消“join方法等待其他線(xiàn)程終止”的處理

5.6.3 sleep方法和interrupt方法

  1. 假設(shè)線(xiàn)程A因?yàn)閳?zhí)行 sleep 正處于暫停狀態(tài),想要取消,只能由其他線(xiàn)程來(lái)執(zhí)行該操作,假設(shè)為線(xiàn)程B;
  2. 線(xiàn)程B可以執(zhí)行 A.interrupt() 來(lái)中途停止線(xiàn)程A的暫停操作,變量A里保存著與線(xiàn)程A對(duì)應(yīng)的Thread實(shí)例。
  3. 這里使用的interrupt方法是Thread類(lèi)的實(shí)例方法。當(dāng)執(zhí)行interrupt時(shí),線(xiàn)程并不需要獲取 Thread實(shí)例的鎖。無(wú)論何時(shí),任何線(xiàn)程都可以調(diào)用其他線(xiàn)程的interrupt方法。
  4. interrupt 方法被調(diào)用后,正在sleep的線(xiàn)程會(huì)終止暫停狀態(tài),拋出InterruptedException異常。此處拋出異常的是線(xiàn)程A。

5.6.4 wait方法和interrupt方法

  1. 在線(xiàn)程A使用wait進(jìn)行等待時(shí),也可以使用 A.interrupt() 來(lái)中途停止線(xiàn)程A的等待操作。

  2. 但在wait的情況下,我們需要注意鎖的問(wèn)題。線(xiàn)程在進(jìn)入等待隊(duì)列時(shí),已經(jīng)釋放了鎖。當(dāng)正在wait的線(xiàn)程被調(diào)用interrupt方法時(shí)(即線(xiàn)程被取消執(zhí)行時(shí)),該線(xiàn)程會(huì)在重新獲取鎖之后,拋出InterruptedException異常。在獲取鎖之前,線(xiàn)程不會(huì)拋出InterruptedException異常

  3. 從讓正在wait的線(xiàn)程重新運(yùn)行這一點(diǎn)來(lái)說(shuō),notify方法和interrupt方法的作用有些類(lèi)似,但仍有以下不同:

    方法 方法來(lái)源 喚醒對(duì)象 是否需要獲取實(shí)例的鎖
    notify/notifyAll java.lang.Object 實(shí)例等待隊(duì)列中的線(xiàn)程
    interrupt java.lang.Thread 指定線(xiàn)程

5.6.5 join方法和interrupt方法

  1. 當(dāng)線(xiàn)程使用join方法等待其他線(xiàn)程終止時(shí),也可以使用interrupt方法進(jìn)行取消。由于調(diào)用join方法時(shí)無(wú)需獲取鎖,所以與使用sleep暫停運(yùn)行時(shí)一樣,線(xiàn)程的控制權(quán)也會(huì)立即跳到catch語(yǔ)句塊中。

5.6.6 interrupt 方法只是改變中斷狀態(tài)

  1. 有人也許會(huì)認(rèn)為“當(dāng)調(diào)用interrupt方法時(shí),調(diào)用對(duì)象的線(xiàn)程就會(huì)拋出InterruptedException異?!?,其實(shí)這是一種誤解。實(shí)際上,interrupt方法只是改變了線(xiàn)程的中斷狀態(tài)而已。
  2. 所謂中斷狀態(tài)(interrupted status),是一種用于表示線(xiàn)程是否被中斷的狀態(tài)。
  3. 假設(shè)當(dāng)線(xiàn)程Alice執(zhí)行了sleep、wait、join而停止運(yùn)行時(shí),線(xiàn)程Bobby調(diào)用了Alice的interrupt 方法。這時(shí),線(xiàn)程Alice的確會(huì)拋出InterruptedException異常。但這其實(shí)是因?yàn)?strong>sleep、wait、join方法內(nèi)部對(duì)線(xiàn)程的中斷狀態(tài)進(jìn)行了檢查,進(jìn)而拋出了InterruptedException異常。
  4. 假設(shè)線(xiàn)程Alice執(zhí)行了1+2之類(lèi)的計(jì)算或a=123之類(lèi)的賦值操作。這時(shí),即使Bobby 調(diào)用Alice的interrupt 方法,Alice也不會(huì)拋出InterruptedException異常,而是繼續(xù)執(zhí)行處理。不僅僅是計(jì)算和賦值,for語(yǔ)句、while語(yǔ)句、if語(yǔ)句及方法調(diào)用都不會(huì)檢查中斷狀態(tài)
  5. 如果沒(méi)有調(diào)用sleep、wait、join等方法,或者沒(méi)有編寫(xiě)檢查線(xiàn)程的中斷狀態(tài)并拋出InterruptedException 異常的代碼,那么InterruptedException異常就不會(huì)被拋出。

5.6.7 isInterrupt方法和Thread.interrupted方法

  1. isInterrupted是Thread類(lèi)的實(shí)例方法,用于檢查指定線(xiàn)程的中斷狀態(tài)。該方法不會(huì)改變中斷狀態(tài)。
  2. Thread.interrupted是Thread類(lèi)的靜態(tài)方法,用于檢查并清除當(dāng)前線(xiàn)程的中斷狀態(tài)。只有這個(gè)方法才可以清除中斷狀態(tài)。Thread.interrupted的操作對(duì)象是當(dāng)前線(xiàn)程(即線(xiàn)程本身),所以該方法并不能用于清除其他線(xiàn)程的中斷狀態(tài)。

5.7 java.util.concurrent包和Producer-Consumer模式

5.7.1 java.util.concurrent包中的隊(duì)列

類(lèi)圖
  1. BlockingQueue 接口——阻塞隊(duì)列

    • BlockingQueue接口表示的是在達(dá)到合適的狀態(tài)之前線(xiàn)程一直阻塞(wait)的隊(duì)列。
    • BlockingQueue是java.util.Queue接口的子接口,擁有offer方法和poll方法等。但實(shí)際上,實(shí)現(xiàn) “阻塞” 功能的方法是BlockingQueue接口固有的put方法和take方法。
    • 由于BlockingQueue是一個(gè)接口,所以在實(shí)際使用時(shí),需要使用BlockingQueue的實(shí)現(xiàn)類(lèi)。下面列舉的就是BlockingQueue的實(shí)現(xiàn)類(lèi)。
  2. ArrayBlockingQueue 類(lèi)——基于數(shù)組的BlockingQueue

    • ArrayBlockingQueue類(lèi)表示的是元素個(gè)數(shù)有最大限制的BlockingQueue。當(dāng)數(shù)組滿(mǎn)了但仍要put數(shù)據(jù)時(shí),或者數(shù)組為空但仍要take數(shù)據(jù)時(shí),線(xiàn)程就會(huì)阻塞。
  3. LinkedBlockingQueue 類(lèi)——基于鏈表的BlockingQueue

    • LinkedBlockingQueue 類(lèi)表示的是元素個(gè)數(shù)沒(méi)有最大限制的BlockingQueue。該類(lèi)基于鏈表,如果沒(méi)有特別指定,元素個(gè)數(shù)將沒(méi)有最大限制。只要還有內(nèi)存,就可以put數(shù)據(jù)。
  4. PriorityBlockingQueue 類(lèi)——帶有優(yōu)先級(jí)的BlockingQueue

    • PriorityBlockingQueue類(lèi)表示的是帶有優(yōu)先級(jí)的BlockingQueue。數(shù)據(jù)的 “優(yōu)先級(jí)”是依據(jù)Comparable接口的自然排序,或者構(gòu)造函數(shù)的Comparator接口決定的順序指定的。
  5. DelayQueue類(lèi)—在一定時(shí)間之后才可以 take 的BlockingQueue

    • DelayQueue類(lèi)表示的是用于儲(chǔ)存 java.util.concurrent.Delayed對(duì)象的隊(duì)列。當(dāng)從該隊(duì)列take時(shí),只有在各元素指定的時(shí)間到期后才可以take。另外,到期時(shí)間最長(zhǎng)的元素將先被take。
  6. SynchronousQueue 類(lèi)——直接傳遞的BlockingQueue

    • SynchronousQueue類(lèi)表示的是BlockingQueue,該BlockingQueue用于執(zhí)行由Producer角色到Consumer角色的 “直接傳遞”。如果Producer角色先put,在Consumer角色take之前,Producer角色的線(xiàn)程將一直阻塞。相反,如果Consumer角色先take,在Producer角色put之前,Consumer角色的線(xiàn)程將一直阻塞。
  7. ConcurrentLinkedQueue類(lèi)——元素個(gè)數(shù)沒(méi)有最大限制的線(xiàn)程安全隊(duì)

    • ConcurrentLinkedQueue類(lèi)并不是BlockingQueue的實(shí)現(xiàn)類(lèi),它表示的是元素個(gè)數(shù)沒(méi)有最大限制的線(xiàn)程安全隊(duì)列。在ConcurrentLinkedQueue中,內(nèi)部的數(shù)據(jù)結(jié)構(gòu)是分開(kāi)的,線(xiàn)程之間互不影響,所以也就無(wú)需執(zhí)行互斥處理。根據(jù)線(xiàn)程情況的不同,有時(shí)程序的性能也會(huì)有所提高。

5.7.2 使用java.util.concurrent.Exchanger類(lèi)交換緩沖區(qū)

Timethreads圖
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容