支持內(nèi)部晉升的無鎖并發(fā)優(yōu)先級線程池

支持內(nèi)部晉升的無鎖并發(fā)優(yōu)先級線程池

[TOC]

引言

在技術群討論到一個有意思的業(yè)務需求,可以描述為:

有一個內(nèi)部按照優(yōu)先級進行任務排序的線程池。線程池會優(yōu)先執(zhí)行高優(yōu)先級的任務。隨著時間的流逝,線程池內(nèi)部低優(yōu)先級的任務的優(yōu)先級會逐漸晉升變?yōu)楦邇?yōu)先級,以避免被不斷新增的高優(yōu)先級任務阻塞導致餓死。

考慮到 JDK 已經(jīng)為開發(fā)者提供了自定義線程池ThreadPoolExecutor以及優(yōu)先級隊列PriorityBlockingQueue,兩者相結合并且定期調(diào)整隊列中低優(yōu)先級任務的優(yōu)先級再進行resort將低優(yōu)先級的任務調(diào)整到隊列的前頭,也可以一定程度上避免被餓死。

這種方案的問題在于resort的消耗比較高,并且還需要重新計算每一個任務的優(yōu)先級。為此,引出我們下面的設計,希望使用無鎖并發(fā)的數(shù)據(jù)結構存儲任務,并且任務支持自動的優(yōu)先級晉升,保證低優(yōu)先級的任務最終能夠執(zhí)行而不會被不斷增加的高優(yōu)先級任務餓死。

歡迎加入技術交流群186233599討論交流,也歡迎關注筆者公眾號:風火說。

推導過程

如何實現(xiàn)優(yōu)先級晉升

聲明一個數(shù)組,按照循環(huán)隊列的方式使用。每一個數(shù)組槽位上都掛載一個任務列表。有一個當前指針指向數(shù)組中的某一個槽位,該槽位即為當前最高優(yōu)先級任務插入的槽位。指針數(shù)字遞增方向優(yōu)先級依次降低。指針以某種方式沿遞增方向移動,因為指針指向的槽位代表最高優(yōu)先級,因此指針的移動實際上意味著所有槽位的優(yōu)先級都晉升了。

那么這里的優(yōu)先級只能是離散化的整型數(shù)字,并且優(yōu)先級的范圍為 0 到 數(shù)組長度減 1 。最高優(yōu)先級為0。

用圖形化的方式表達就是如下的情況

image

圖中優(yōu)先級的范圍是[0,6],current指針指向的槽位即為最高優(yōu)先級,current左側槽位為最低優(yōu)先級,current右側槽位為次高優(yōu)先級。每一個槽位上都掛載一個隊列,隊列中的任務的優(yōu)先級都相同(后續(xù)算法中可以看到會有不同的優(yōu)先級混合)。

每次取任務時總是從current指針指向的槽位的隊列讀取任務。當一定時間流逝后,current指針沿著右側移動一位,此時意味著所有槽位的優(yōu)先級都被晉升了,除了原本的current指向的槽位,它變?yōu)榱俗畹蛢?yōu)先級槽位。

由于current指針總是在移動,因此最終會移動到之前低優(yōu)先級的槽位,此時該槽位下的任務就成了最高優(yōu)先級任務,被讀取執(zhí)行。這樣就避免了在運行過程不斷有高優(yōu)先級任務被加入導致原本的低優(yōu)先級餓死的情況發(fā)生。

數(shù)據(jù)結構設計

根據(jù)上面的優(yōu)先級晉升思路,顯然應該有一個數(shù)組,其不同的槽位代表著不同的優(yōu)先級。每一個槽位上掛載一個 MPMC 類型的隊列,用于該優(yōu)先級下任務的添加和讀取。

使用一個當前指針,該指針指向的槽位為最高優(yōu)先級槽位。

一個指針產(chǎn)生的問題

如果只有一個指針,意味著讀取任務時,從該指針指向的槽位讀取,因此此時指針指向的槽位是最高優(yōu)先級。而插入任務的時候,需要根據(jù)當前指針進行計算。這種模式在優(yōu)先級晉升時存在并發(fā)問題。

當指針從槽位1指向更新到槽位2。此時槽位1可能還存在部分剩余的任務,這部分任務的實際優(yōu)先級應該是高于槽位2當中的。而如果在這個時候插入最低優(yōu)先級的任務,可能就會插入到槽位1中。那么槽位1的任務隊列實際就混合了最高優(yōu)先級和最低優(yōu)先級的任務,無法區(qū)分。

為了解決不同優(yōu)先級任務在同一個隊列中混合的問題,我們可以在指針移動時,將之前槽位的剩余移動到當前槽位的隊列頭。這實際上就意味著要求隊列是出于雙端隊列模式。但是因為指針移動和任務移動無法原子化進行,還是會造成槽位1的隊列中最高優(yōu)先級任務和最低優(yōu)先級任務混在一起的情況。

從實現(xiàn)效果而言,我們需要的是在指針移動的時候,保證槽位1中剩余的原本高優(yōu)先級的任務執(zhí)行完畢后才能去執(zhí)行槽位2這個“原本的次高優(yōu)先級,現(xiàn)在的最高優(yōu)先級“的任務。從效果來看,并不需要一定移動任務,可以通過一種手段,保證槽位1中原本高優(yōu)先級任務執(zhí)行完畢后再去執(zhí)行槽位2的任務即可。

基于這種考量,我們將一個指針拆分為兩個:任務插入指針和任務讀取指針。

任務插入指針和任務讀取指針

基于并發(fā)讀寫的考慮,兩個指針都是AtomicInteger類型。兩個指針的作用分別為:

  • 任務插入指針:該指針指向的槽位為當前最高優(yōu)先級槽位(后續(xù)會引入輪次這個概念,因此這里對當前加粗)。
  • 任務讀取指針:從結構體中獲取任務時使用該指針指向的槽位上獲取任務隊列進行任務讀取。

任務插入指針和任務讀取指針分離的好處在于,任務插入指針的移動意味著不同槽位優(yōu)先級的實際晉升。而讀取可以依照讀取指針指向的槽位上的隊列讀取任務,直到對應優(yōu)先級的任務讀取完畢后再移動讀取指針到下一個槽位。這樣一來,保證了按照入列的順序被公平的處理,也保證了同一個時間單位高優(yōu)先級的任務優(yōu)于低優(yōu)先級任務被處理,也避免了單一指針移動需要的任務拷貝帶來的不同優(yōu)先級任務污染問題。

任務插入指針如何移動

插入指針可以按照兩種策略移動:

  • 自然時間流逝移動,一定時間后移動。
  • 以讀取次數(shù)為單位,一定次數(shù)后移動。

如果選擇策略一,需要后臺配置一個線程,按照固定時間移動插入指針;如果選擇策略二,需要一個全局的AtomicInteger對象,用于次數(shù)判定。

如果選擇方案一,可能會存在一種場景,往線程池中投入了大量的同一個優(yōu)先級的任務,使得某個槽位上的隊列長度很長。如果任務處理相對緩存,則任務插入指針可能會被移動多次。這種移動會使得槽位上隊列有了很多不同優(yōu)先級的任務。而讀取任務時按照優(yōu)先級逐步去處理,這使得產(chǎn)生了這么多不同的優(yōu)先級實際上意義是不大的。

因此采用策略二會更加合適一些。

由于讀取任務時是多線程的,因此策略二實現(xiàn)上需要注意的點包括:

  • AtomicInteger#incrementAndGet實現(xiàn)任務讀取次數(shù)累加。如果返回的數(shù)字是閾值的倍數(shù),則意味著可以移動任務插入指針。
  • 使用AtomicInteger#incrementAndGet來移動插入指針。

在這里對插入指針移動的并發(fā)考量在于,由于讀取線程對讀取計數(shù)使用AtomicInteger#incrementAndGet方式累加是必然成功,而返回數(shù)值是晉升閾值的倍數(shù)時必然需要實現(xiàn)插入指針的遞增。因為遞增的必然性,因此同樣使用AtomicInteger#incrementAndGet方式來實現(xiàn)。

任務插入指針移動到同一位置導致的優(yōu)先級任務混合問題

假定系統(tǒng)初始狀態(tài),插入和讀取指針都指向了槽位1,在槽位1上插入了大量的任務。隨著任務的讀取,插入指針移動到了槽位2,此時該槽位上插入了一些任務。隨著任務的讀取,插入指針繼續(xù)移動,移動過數(shù)組的長度后,再次指向了槽位2。假定此時讀取指針仍然在槽位1,而如果這個時候插入插入任務。那么實際上槽位2隊列中任務應該分為兩種:前半部分是上一個輪次插入的任務,后半部分是當前剛插入的任務。

如果讀取指針移動到槽位2,應該將前半部分任務執(zhí)行完畢后就去執(zhí)行槽位3上的任務,而不是將所有的任務都執(zhí)行完。因此槽位3上的任務實際優(yōu)先級應該高于槽位2隊列中后半部分的任務。

基于上述情況,問題可以轉化為依靠讀取指針在讀取任務時,如何識別當前隊列中不是本輪次要處理的任務進而移動讀取指針?

考慮到任務插入指針和任務讀取指針本身是有值的,這個值單調(diào)遞增,實際上可以看成是一種“順序”概念的表達。因此任務的準備添加時,可以將插入指針的值加上任務的優(yōu)先級,聲明為任務的插入優(yōu)先級。讀取指針在讀取任務時,只有當前任務的插入優(yōu)先級等于讀取指針的值,意味著該任務時本輪次讀取指針應該要處理的任務。如果讀取的任務的插入優(yōu)先級與讀取指針不等時,意味著當前隊列不能再讀取任務,應該移動讀取指針。

通過任務本身的插入優(yōu)先級避免了不同輪次的任務在一個隊列中被混合導致的優(yōu)先級混亂。

任務讀取指針如何移動

上個章節(jié)提出任務的插入優(yōu)先級,解決了不同輪次的任務在同一個隊列可能會混合的問題。這個問題的解決引出了讀取指針的移動策略:在讀取到的任務的插入優(yōu)先級與讀取指針的值不等時意味著需要移動。

但是這里又產(chǎn)生了新的問題:并發(fā)移動讀取指針的問題。在讀取并發(fā)的情況下,會遇到一個問題:讀取出來的任務的優(yōu)先級不符合指針,此時要重新放回隊列,但是重新放入,就可能和任務的插入混合,造成數(shù)據(jù)混亂。

有幾種可能的解決方式:

  • 任務的讀取采用Sync關鍵字修飾,如果讀取任務不符合,則放回,并且移動指針。由于沒有讀取并發(fā),但仍然可能因為讀取的放回和新任務的添加造成數(shù)據(jù)混亂。
  • 采用分段機制,每一個分段是一個隊列,分段和分段構成一個隊列。一個分段內(nèi)的優(yōu)先級是固定的,因此當分段耗盡時,就是切換讀取指針的時候。

策略一并不能徹底解決問題,在這里我們采用策略二的方案。

策略二的引入實際上改變了上面的一個數(shù)據(jù)結構,也即是數(shù)組存儲的元素不再是一個任務隊列,而是一個分段隊列。而每一個分段內(nèi)部又存儲了任務隊列,并且分段的隊列的任務的插入優(yōu)先級均是相同的。這意味著分段在創(chuàng)建的時候就具備了插入優(yōu)先級這個值。分段和分段的插入優(yōu)先級必然不同,這個結構就天然的支持了輪次的概念。

分段結構的引入導致了數(shù)據(jù)結構的變化,這實際上會改變?nèi)蝿詹迦牒腿蝿兆x取的流程。下文會再來細說具體的實現(xiàn)。分析到這里,讀取指針的移動時機就很明白了,在分段內(nèi)數(shù)據(jù)耗盡,就意味著某個具體插入優(yōu)先級的任務都被讀取完畢了。

當然,考慮到讀寫并發(fā)的原因,讀取線程發(fā)現(xiàn)分段內(nèi)數(shù)據(jù)耗盡并不意味著該插入優(yōu)先級的任務全被讀取了,后文會針對并發(fā)場景在處理流程上解決。

插入和讀取并發(fā)

插入和讀取可能在同一個槽位同一個分段上并發(fā)。分段的隊列本身是支持MPMC的,這并沒有問題。

可能會出現(xiàn)一種并發(fā)異常就是插入線程讀取了插入指針的值,并且準備插入數(shù)據(jù),但是因為線程調(diào)度的原因,失去了CPU資源,尚未完成數(shù)據(jù)插入。此時讀取線程將槽位內(nèi)的任務讀取完畢后認為沒有數(shù)據(jù),則移動了讀取指針到下一個槽位。在讀取指針移動后,插入線程才完成數(shù)據(jù)的插入。這樣導致本來應該是高優(yōu)先級的任務變成最低優(yōu)先級槽位上的任務。而當下一輪次讀取指針再次指向該槽位時,讀取指針獲取的到任務的任務優(yōu)先級又會和讀取指針本身的數(shù)值沖突。

針對并發(fā)的異常場景,有一種常見的解決思路就是二次檢查。也就是讀取線程在移動任務讀取指針后,再次檢查下當前分段內(nèi)是否出現(xiàn)了新的任務,如果有,則協(xié)助遷移到下一個槽位上;寫入線程在放入任務后,檢查是否讀取指針移動過,如果有,則協(xié)助遷移到下一個槽位上。

然而,讀取線程檢查分段內(nèi)的隊列是否剩余,寫入線程檢查讀取指針是否移動,這些狀態(tài)都是在動態(tài)變化的,仍然會產(chǎn)生一些其他問題。雙重檢查一般會引入一個終止狀態(tài)來來減少可能的變化場景。在這里,我們?yōu)榉侄我霠顟B(tài):使用中和終止。一個分段初始化時是使用中狀態(tài),當讀取線程認為該分段內(nèi)的任務都被消耗后,則應該更新為終止狀態(tài)。一旦分段進入終止狀態(tài),則被拋棄,不應該再有任務數(shù)據(jù)添加到該分段中。

通過分段狀態(tài),我們可以將任務區(qū)分為終止前添加到分段和終止后添加到分段兩類。前者需要被正常讀取,后者則需要遷移到其它合適的分段中再被處理。

到這里為止,我們針對數(shù)據(jù)結構和其元素屬性的變化就完成了。

將數(shù)組通過循環(huán)隊列的方式來表達不同的優(yōu)先級。通過任務寫指針的移動來實現(xiàn)內(nèi)部任務優(yōu)先級的晉升。通過讀指針來實現(xiàn)任務嚴格按照優(yōu)先級順序被處理,且避免低優(yōu)先級任務被高優(yōu)先級任務餓死。數(shù)組的元素指向一個該槽位上插入優(yōu)先級最低的分段。一同散列到同一個槽位上的分段按照插入優(yōu)先級的順序形成隊列。

代碼實現(xiàn)

整個代碼當中,最為復雜的就是任務的插入和讀取,下面分別來設計流程。

任務插入

上面推導過程分析了插入和讀取并發(fā)可能導致的沖突場景。這里我們細化其解決流程。對于插入線程而言,要處理的情況包括有:

  • 元素對應槽位上沒有分段。
  • 元素對應槽位上的分段的插入優(yōu)先級和插入指針的值不相等。
  • 元素對應槽位上分段列表中插入優(yōu)先級與插入指針相符的分段處于終止狀態(tài)
  • 元素對應槽位上的分段插入優(yōu)先級與插入指針相等,且處于使用狀態(tài)。

可以看到,只有第四種情況任務可以在當前分段插入成功,且插入完畢后還需要再次檢查分段的狀態(tài)。基于這些考量,我們將插入流程設計為

image

可以看到,這個流程中沒有處理槽位上沒有分段的情況,這個在下一個章節(jié)我們會分析。

任務的讀取

有了分段的存在,讀取指針的移動判定更加復雜,讀取線程可能碰到的場景有:

  • 讀取指針散列的槽位上沒有分段。
  • 讀取指針散列的槽位上有分段且狀態(tài)為使用,分段內(nèi)沒有任務。
  • 讀取指針散列的槽位上有分段且狀態(tài)為使用,分段內(nèi)有任務。
  • 讀取指針散列的槽位上有分段且狀態(tài)為關閉,分段內(nèi)沒有任務。
  • 讀取指針散列的槽位上有分段且狀態(tài)為關閉,分段內(nèi)有任務。

只有第三種情況可以讀取任務并且進行處理。有了輪次這個概念,讀取指針永遠只會讀取槽位上的第一個分段。如果槽位上沒有分段,或者分段的插入優(yōu)先級與讀取指針不同,或者分段內(nèi)沒有任務,則可以考慮移動讀取指針。注意,分段狀態(tài)為關閉并不是讀取指針移動的條件,原因下面會分析。

但是移動讀取指針的時候首先需要考慮當前讀取指針是否已經(jīng)處于(寫入指針的值+最低優(yōu)先級數(shù)字),如果是的話,意味著已經(jīng)處于邊界,不應該在移動。

分段狀態(tài)的更新只能由讀取線程來進行。當讀取線程發(fā)現(xiàn)該分段已經(jīng)沒有任務了,首先應該通過CAS的方式更新分段狀態(tài)。CAS競爭成功的線程再次檢查分段內(nèi)是否出現(xiàn)了新的任務,如果出現(xiàn)的話,則提取任務,完成任務讀取。為何不將任務移動到下一個槽位。因為下一個槽位上可能還沒有分段,此時讀取線程可能和寫入線程競爭槽位上的分段寫入。如果寫入線程競爭成功,讀取線程移動過去的任務數(shù)據(jù)的優(yōu)先級就放到了錯誤的分段中;如果讀取線程競爭成功,則讀取線程創(chuàng)建的分段必須是第一個分段,否則任務還是移動到錯誤的地方。

解決這個問題最好的辦法就是不解決。不移動任務,仍然在該分段上讀取任務直到任務耗盡。然后再嘗試移動讀取指針。而對于寫入線程而言,當其發(fā)現(xiàn)分段的狀態(tài)變?yōu)榻K止后,是提取出任務重新執(zhí)行完整的放入流程,不會有并發(fā)的問題。

再次梳理下沒有任務情況下的流程,應該是通過CAS修改分段的狀態(tài)。無論成功或失敗,都可以繼續(xù)檢查隊列是否有任務,如果有的話,則返回讀取到的任務。如果沒有的話,則CAS將讀取指針+1。競爭成功的線程將當前分段的下一個分段設置給槽位,并且重新執(zhí)行讀取流程。競爭失敗的線程則反復檢查讀取指針的值,發(fā)現(xiàn)變化后,重新執(zhí)行讀取流程。

這里有一個并發(fā)沖突需要考慮,當讀取線程嘗試將當前分段的下一個分段設置為槽位的值時,可能此時當前分段的下一個分段是null,而寫入線程正在嘗試為當前分段設置下一個分段。這種情況下可能導致下一個分段丟失。特別的,如果當前分段的下一個分段已經(jīng)被設置,并且有任務被放入其中,丟失這個分段就意味著數(shù)據(jù)丟失。

為了避免這個情況,在當前分段的下一個分段為null時,就不能將下一個分段(屬性值)設置給槽位。這使得在讀取到分段時,需要首先檢查分段的優(yōu)先級,確認是否本輪次。如果是的話,再執(zhí)行后續(xù)的流程。否則要么移動(該分段沒有下一個分段),要么將該分段的下一個分段設置給槽位后,在移動。

從這個角度出發(fā),我們可以在初始化的時候,將數(shù)組中的元素都填充一個分段。這樣寫入線程就不需要處理槽位上可能為空的場景了。

基于此,我們將讀取任務的變化為:

  • 槽位上的分段優(yōu)先級小于讀取指針,且分段狀態(tài)為終止。
  • 槽位上的分段優(yōu)先級等于讀取指針。
  • 槽位上的分段優(yōu)先級大于讀取指針。

第一種情況,如果該分段有下一個分段,CAS更新到槽位上;如果沒有,則CAS移動讀取指針。

第二種情況,按照上面分析的流程進行處理即可。

第三種情況,CAS移動讀取指針。

綜上,我們可以將讀取流程設計為

image

包裝為BlockQueue

在JDK提供的ThreadPoolExecutor類的構造方法中,需要傳入BlockingQueue作為隊列的接口。顯然,上述的存儲結構并不能支持BlockQueue,需要考慮包裝。

顯然,上面的存儲結果在寫入的時候并不會阻塞,因此只需要考慮如何包裝讀取數(shù)據(jù)不存在時的阻塞等待即可。

簡單的方式就是在讀取失敗的獲取鎖,并且在隊列空的condition對象執(zhí)行等待;插入任務的時候執(zhí)行喚醒。

效果展現(xiàn)

測試代碼如下

image

首先添加一定量的高優(yōu)先級任務,隨后添加5個低優(yōu)先級,最后通過CountLatch模擬在運行過程中添加高優(yōu)先級任務。

如果單純按照優(yōu)先級排序,則需要所有高優(yōu)先級任務輸出完畢后才會輸出低優(yōu)先級任務,顯然這是錯誤的。正確的實現(xiàn)應該是先輸出第一批高優(yōu)先級任務,再輸出低優(yōu)先級任務,最后輸出第三批高優(yōu)先級任務。運行代碼,看到結果如下

image

與我們的預期相吻合。

代碼托管地址

Gitee:https://gitee.com/eric_ds/eric_article/blob/master/優(yōu)先級自動晉升線程池/AutoPromotePriorityQueue.java

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容