Disruptor提供了一種線程之間信息交換的方式。
鎖的缺點(diǎn)
并發(fā)的問題

想象有兩個(gè)線程嘗試修改同一個(gè)變量value:
- 情況一:線程1先到達(dá)
變量value的值變?yōu)椤眀lah”。
然后當(dāng)線程2到達(dá)時(shí),變量value的值變?yōu)椤眀lahy”。
- 情況二:線程2先到達(dá)
變量value的值變?yōu)椤眆luffy”。
然后當(dāng)線程1到達(dá)時(shí),值變?yōu)椤眀lah”。
- 情況三:線程1與線程2交互
線程2得到值"fluff"然后賦給本地變量myValue。
線程1改變value的值為”blah”。
然后線程2醒來并把變量value的值改為”fluffy”
解決辦法
悲觀鎖

樂觀鎖(CAS)

在這種情況,當(dāng)線程2需要去寫Entry時(shí)才會(huì)去鎖定它.它需要檢查Entry自從上次讀過后是否已經(jīng)被改過了。如果線程1在線程2讀完后到達(dá)并把值改為”blah”,線程2讀到了這個(gè)新值,線程2不會(huì)把"fluffy"寫到Entry里并把線程1所寫的數(shù)據(jù)覆蓋.線程2會(huì)重試(重新讀新的值,與舊值比較,如果相等則在變量的值后面附上’y’),這里在線程2不會(huì)關(guān)心新的值是什么的情況.或者線程2會(huì)拋出一個(gè)異常,或者會(huì)返回一個(gè)某些字段已更新的標(biāo)志,這是在期望把”fluff”改為”fluffy”的情況.舉一個(gè)第二種情況的例子,如果你和另外一個(gè)用戶同時(shí)更新一個(gè)Wiki的頁面,你會(huì)告訴另外一個(gè)用戶的線程 Thread 2,它們需要重新加載從Thread1來新的變化,然后再提交它們的內(nèi)容。
死鎖

死鎖的原因和解決方法?
鎖技術(shù)是慢的
Disruptor論文中講述了我們所做的一個(gè)實(shí)驗(yàn)。這個(gè)測(cè)試程序調(diào)用了一個(gè)函數(shù),該函數(shù)會(huì)對(duì)一個(gè)64位的計(jì)數(shù)器循環(huán)自增5億次。當(dāng)單線程無鎖時(shí),程序耗時(shí)300ms。如果增加一個(gè)鎖(仍是單線程、沒有競(jìng)爭(zhēng)、僅僅增加鎖),程序需要耗時(shí)10000ms,慢了兩個(gè)數(shù)量級(jí)。更令人吃驚的是,如果增加一個(gè)線程(簡(jiǎn)單從邏輯上想,應(yīng)該比單線程加鎖快一倍),耗時(shí)224000ms。使用兩個(gè)線程對(duì)計(jì)數(shù)器自增5億次比使用無鎖單線程慢1000倍。并發(fā)很難而鎖的性能糟糕。
Disruptor如何解決這些問題。
- 首先,Disruptor根本就不用鎖。
取而代之的是,在需要確保操作是線程安全的(特別是,在多生產(chǎn)者的環(huán)境下,更新下一個(gè)可用的序列號(hào))地方,我們使用CAS(Compare And Swap/Set)操作。這是一個(gè)CPU級(jí)別的指令,在我的意識(shí)中,它的工作方式有點(diǎn)像樂觀鎖——CPU去更新一個(gè)值,但如果想改的值不再是原來的值,操作就失敗,因?yàn)楹苊黠@,有其它操作先改變了這個(gè)值。

CAS操作比鎖消耗資源少的多,因?yàn)樗鼈儾粻可娌僮飨到y(tǒng),它們直接在CPU上操作。但它們并非沒有代價(jià)——在上面的試驗(yàn)中,單線程無鎖耗時(shí)300ms,單線程有鎖耗時(shí)10000ms,單線程使用CAS耗時(shí)5700ms。所以它比使用鎖耗時(shí)少,但比不需要考慮競(jìng)爭(zhēng)的單線程耗時(shí)多。
回到Disruptor,在我講生產(chǎn)者時(shí)講過ClaimStrategy。在這些代碼中,你可以看見兩個(gè)策略,一個(gè)是SingleThreadedStrategy(單線程策略)另一個(gè)是MultiThreadedStrategy(多線程策略)。你可能會(huì)有疑問,為什么在只有單個(gè)生產(chǎn)者時(shí)不用多線程的那個(gè)策略?它是否能夠處理這種場(chǎng)景?當(dāng)然可以。但多線程的那個(gè)使用了AtomicLong(Java提供的CAS操作),而單線程的使用long,沒有鎖也沒有CAS。這意味著單線程版本會(huì)非常快,因?yàn)樗挥幸粋€(gè)生產(chǎn)者,不會(huì)產(chǎn)生序號(hào)上的沖突。
- 當(dāng)然把一個(gè)數(shù)字轉(zhuǎn)成AtomicLong不可能是Disruptor速度快的唯一秘密
但這是非常重要的一點(diǎn)——在整個(gè)復(fù)雜的框架中,只有這一個(gè)地方出現(xiàn)多線程競(jìng)爭(zhēng)修改同一個(gè)變量值。這就是秘密。還記得所有的訪問對(duì)象都擁有序號(hào)嗎?如果只有一個(gè)生產(chǎn)者,那么系統(tǒng)中的每一個(gè)序列號(hào)只會(huì)由一個(gè)線程寫入。這意味著沒有競(jìng)爭(zhēng)、不需要鎖、甚至不需要CAS。
為什么隊(duì)列不能勝任這個(gè)工作
為什么隊(duì)列底層用RingBuffer來實(shí)現(xiàn),仍然在性能上無法與 Disruptor 相比。隊(duì)列和最簡(jiǎn)單的ring buffer只有兩個(gè)指針——一個(gè)指向隊(duì)列的頭,一個(gè)指向隊(duì)尾:

如果有超過一個(gè)生產(chǎn)者想要往隊(duì)列里放東西,尾指針就將成為一個(gè)沖突點(diǎn),因?yàn)橛卸鄠€(gè)線程要更新它。如果有多個(gè)消費(fèi)者,那么頭指針就會(huì)產(chǎn)生競(jìng)爭(zhēng),因?yàn)樵乇幌M(fèi)之后,需要更新指針,所以不僅有讀操作還有寫操作了。
一個(gè)空的隊(duì)列:

一個(gè)滿的隊(duì)列:

隊(duì)列需要保存一個(gè)關(guān)于大小的變量,以便區(qū)分隊(duì)列是空還是滿。否則,它需要根據(jù)隊(duì)列中的元素的內(nèi)容來判斷。
基于以上,這三個(gè)變量常常在一個(gè)cache line里面,有可能導(dǎo)致false sharing。因此,不僅要擔(dān)心生產(chǎn)者和消費(fèi)者同時(shí)寫size變量(或者元素),還要注意由于頭指針尾指針在同一位置,當(dāng)頭指針更新時(shí),更新尾指針會(huì)導(dǎo)致緩存不命中。
神奇的緩存行填充

緩存行,偽共享


神奇的緩存行填充
Disruptor消除這個(gè)問題,至少對(duì)于緩存行大小是64字節(jié)或更少的處理器架構(gòu)來說是這樣的(譯注:有可能處理器的緩存行是128字節(jié),那么使用64字節(jié)填充還是會(huì)存在偽共享問題),通過增加補(bǔ)全來確保ring buffer的序列號(hào)不會(huì)和其他東西同時(shí)存在于一個(gè)緩存行中。
public long p1, p2, p3, p4, p5, p6, p7; // cache line padding
private volatile long cursor = INITIAL_CURSOR_VALUE;
public long p8, p9, p10, p11, p12, p13, p14; // cache line padding
因此沒有偽共享,就沒有和其它任何變量的意外沖突,沒有不必要的緩存未命中。
內(nèi)存屏障
- 什么是內(nèi)存屏障?
它是一個(gè)CPU指令。
a)確保一些特定操作執(zhí)行的順序;
b)影響一些數(shù)據(jù)的可見性(可能是某些指令執(zhí)行后的結(jié)果)。
如果你的字段是volatile,Java內(nèi)存模型將在寫操作后插入一個(gè)寫屏障指令,在讀操作前插入一個(gè)讀屏障指令。
1、一旦你完成寫入,任何訪問這個(gè)字段的線程將會(huì)得到最新的值。
2、在你寫入前,會(huì)保證所有之前發(fā)生的事已經(jīng)發(fā)生,并且任何更新過的數(shù)據(jù)值也是可見的,因?yàn)閮?nèi)存屏障會(huì)把之前的寫入值都刷新到緩存。
RingBuffer cursor
RingBuffer的指針(cursor)(譯注:指向隊(duì)尾元素)屬于一個(gè)神奇的volatile變量,同時(shí)也是我們能夠不用鎖操作就能實(shí)現(xiàn)Disruptor的原因之一。

生產(chǎn)者將會(huì)取得下一個(gè)[Entry](或者是一批),并可對(duì)它(們)作任意改動(dòng), 把它(們)更新為任何想要的值。在所有改動(dòng)都完成后,生產(chǎn)者對(duì)ring buffer調(diào)用commit方法來更新序列號(hào)(譯注:把cursor更新為該Entry的序列號(hào))。對(duì)volatile字段(cursor)的寫操作創(chuàng)建了一個(gè)內(nèi)存屏障,這個(gè)屏障將刷新所有緩存里的值(或者至少相應(yīng)地使得緩存失效)。
這時(shí)候,消費(fèi)者們能獲得最新的序列號(hào)碼(8),并且因?yàn)閮?nèi)存屏障保證了它之前執(zhí)行的指令的順序,消費(fèi)者們可以確信生產(chǎn)者對(duì)7號(hào)Entry所作的改動(dòng)已經(jīng)可用。
- 消費(fèi)者那邊會(huì)發(fā)生什么?
消費(fèi)者中的序列號(hào)是volatile類型的,會(huì)被若干個(gè)外部對(duì)象讀取——其他的[下游消費(fèi)者]可能在跟蹤這個(gè)消費(fèi)者。[ProducerBarrier]/RingBuffer跟蹤它以確保環(huán)沒有出現(xiàn)重疊(wrap)的情況(譯注:為了防止下游的消費(fèi)者和上游的消費(fèi)者對(duì)同一個(gè)Entry競(jìng)爭(zhēng)消費(fèi),導(dǎo)致在環(huán)形隊(duì)列中互相覆蓋數(shù)據(jù),下游消費(fèi)者要對(duì)上游消費(fèi)者的消費(fèi)情況進(jìn)行跟蹤)。

所以,如果你的下游消費(fèi)者(C2)看見前一個(gè)消費(fèi)者(C1)在消費(fèi)號(hào)碼為12的Entry,當(dāng)C2的讀取也到了12,它在更新序列號(hào)前將可以獲得C1對(duì)該Entry的所作的更新。
基本來說就是,C1更新序列號(hào)前對(duì)ring buffer的所有操作(如上圖黑色所示),必須先發(fā)生,待C2拿到C1更新過的序列號(hào)之后,C2才可以為所欲為(如上圖藍(lán)色所示)。
- 內(nèi)存屏障對(duì)性能的影響
內(nèi)存屏障作為另一個(gè)CPU級(jí)的指令,沒有[鎖那樣大的開銷]。內(nèi)核并沒有在多個(gè)線程間干涉和調(diào)度。但凡事都是有代價(jià)的。內(nèi)存屏障的確是有開銷的——編譯器/cpu不能重排序指令,導(dǎo)致不可以盡可能地高效利用CPU,另外刷新緩存亦會(huì)有開銷。所以不要以為用volatile代替鎖操作就一點(diǎn)事都沒。
你會(huì)注意到Disruptor的實(shí)現(xiàn)對(duì)序列號(hào)的讀寫頻率盡量降到最低。對(duì)volatile字段的每次讀或?qū)懚际窍鄬?duì)高成本的操作。但是,也應(yīng)該認(rèn)識(shí)到在批量的情況下可以獲得很好的表現(xiàn)。如果你知道不應(yīng)對(duì)序列號(hào)頻繁讀寫,那么很合理的想到,先獲得一整批Entries,并在更新序列號(hào)前處理它們。這個(gè)技巧對(duì)生產(chǎn)者和消費(fèi)者都適用。以下的例子來自[BatchConsumer]
long nextSequence = sequence + 1;
while (running)
{
try
{
final long availableSequence = consumerBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
entry = consumerBarrier.getEntry(nextSequence);
handler.onAvailable(entry);
nextSequence++;
}
handler.onEndOfBatch();
sequence = entry.getSequence();
}
…
catch (final Exception ex)
{
exceptionHandler.handle(ex, entry);
sequence = entry.getSequence();
nextSequence = entry.getSequence() + 1;
}
}
在上面的代碼中,我們?cè)谙M(fèi)者處理entries的循環(huán)中用一個(gè)局部變量(nextSequence)來遞增。這表明我們想盡可能地減少對(duì)volatile類型的序列號(hào)的進(jìn)行讀寫。
牛逼的RingBuffer
基本來說,ringbuffer擁有一個(gè)序號(hào),這個(gè)序號(hào)指向數(shù)組中下一個(gè)可用的元素。(校對(duì)注:如下圖右邊的圖片表示序號(hào),這個(gè)序號(hào)指向數(shù)組的索引4的位置。
隨著你不停地填充這個(gè)buffer(可能也會(huì)有相應(yīng)的讀?。?,這個(gè)序號(hào)會(huì)一直增長(zhǎng),直到繞過這個(gè)環(huán)。
要找到數(shù)組中當(dāng)前序號(hào)指向的元素,可以通過mod操作:
以上面的ringbuffer為例(java的mod語法):12 % 10 = 2。很簡(jiǎn)單吧。
事實(shí)上,上圖中的ringbuffer只有10個(gè)槽完全是個(gè)意外。如果槽的個(gè)數(shù)是2的N次方更有利于基于二進(jìn)制的計(jì)算機(jī)進(jìn)行計(jì)算。
(校對(duì)注:2的N次方換成二進(jìn)制就是1000,100,10,1這樣的數(shù)字, sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用這個(gè)方式來定位數(shù)組元素的,這種方式比取模的速度更快。)
- 沒有尾指針。只維護(hù)了一個(gè)指向下一個(gè)可用位置的序號(hào)。這種實(shí)現(xiàn)是經(jīng)過深思熟慮的—我們選擇用環(huán)形buffer的最初原因就是想要竟可能多地保存消息(若有了尾指針,則尾指針經(jīng)過的空間又浪費(fèi)了)。
- 不刪除buffer中的數(shù)據(jù)
也就是說這些數(shù)據(jù)一直存放在buffer中,直到新的數(shù)據(jù)覆蓋他們。這也是不需要尾指針的原因。 - 它是數(shù)組
這是對(duì)CPU緩存友好的—也就是說,在硬件級(jí)別,數(shù)組中的元素是會(huì)被預(yù)加載的,因此在ringbuffer當(dāng)中,cpu無需時(shí)不時(shí)去主存加載數(shù)組中的下一個(gè)元素。 - 為數(shù)組預(yù)先分配內(nèi)存
這使得數(shù)組對(duì)象一直存在(除非程序終止)。這就意味著不需要花大量的時(shí)間用于垃圾回收。
從Ringbuffer中讀取數(shù)據(jù)消費(fèi)

消費(fèi)者(Consumer)是一個(gè)想從Ring Buffer里讀取數(shù)據(jù)的線程,它可以訪問ConsumerBarrier對(duì)象——這個(gè)對(duì)象由RingBuffer創(chuàng)建并且代表消費(fèi)者與RingBuffer進(jìn)行交互。就像Ring Buffer顯然需要一個(gè)序號(hào)才能找到下一個(gè)可用節(jié)點(diǎn)一樣,消費(fèi)者也需要知道它將要處理的序號(hào)——每個(gè)消費(fèi)者都需要找到下一個(gè)它要訪問的序號(hào)。在上面的例子中,消費(fèi)者處理完了Ring Buffer里序號(hào)8之前(包括8)的所有數(shù)據(jù),那么它期待訪問的下一個(gè)序號(hào)是9。
消費(fèi)者可以調(diào)用ConsumerBarrier對(duì)象的waitFor()方法,傳遞它所需要的下一個(gè)序號(hào).
final long availableSeq = consumerBarrier.waitFor(nextSequence);
ConsumerBarrier返回RingBuffer的最大可訪問序號(hào)——在上面的例子中是12。ConsumerBarrier有一個(gè)WaitStrategy方法來決定它如何等待這個(gè)序號(hào)。
接下來
接下來,消費(fèi)者會(huì)一直原地停留,等待更多數(shù)據(jù)被寫入Ring Buffer。并且,一旦數(shù)據(jù)寫入后消費(fèi)者會(huì)收到通知——節(jié)點(diǎn)9,10,11和12 已寫入?,F(xiàn)在序號(hào)12到了,消費(fèi)者可以讓ConsumerBarrier去拿這些序號(hào)節(jié)點(diǎn)里的數(shù)據(jù)了。

拿到了數(shù)據(jù)后,消費(fèi)者(Consumer)會(huì)更新自己的標(biāo)識(shí)(cursor)。
你應(yīng)該已經(jīng)感覺得到,這樣做是怎樣有助于平緩延遲的峰值了——以前需要逐個(gè)節(jié)點(diǎn)地詢問“我可以拿下一個(gè)數(shù)據(jù)嗎?現(xiàn)在可以了么?現(xiàn)在呢?”,消費(fèi)者(Consumer)現(xiàn)在只需要簡(jiǎn)單的說“當(dāng)你拿到的數(shù)字比我這個(gè)要大的時(shí)候請(qǐng)告訴我”,函數(shù)返回值會(huì)告訴它有多少個(gè)新的節(jié)點(diǎn)可以讀取數(shù)據(jù)了。因?yàn)檫@些新的節(jié)點(diǎn)的確已經(jīng)寫入了數(shù)據(jù)(Ring Buffer本身的序號(hào)已經(jīng)更新),而且消費(fèi)者對(duì)這些節(jié)點(diǎn)的唯一操作是讀而不是寫,因此訪問不用加鎖。這太好了,不僅代碼實(shí)現(xiàn)起來可以更加安全和簡(jiǎn)單,而且不用加鎖使得速度更快。
另一個(gè)好處是——你可以用多個(gè)消費(fèi)者(Consumer)去讀同一個(gè)RingBuffer ,不需要加鎖,也不需要用另外的隊(duì)列來協(xié)調(diào)不同的線程(消費(fèi)者)。這樣你可以在Disruptor的協(xié)調(diào)下實(shí)現(xiàn)真正的并發(fā)數(shù)據(jù)處理。
寫入 Ringbuffer
ProducerBarriers(AbstractSequencer)
寫入 Ring Buffer 的過程涉及到兩階段提交 (two-phase commit)。首先,你的生產(chǎn)者需要申請(qǐng) buffer 里的下一個(gè)節(jié)點(diǎn)。然后,當(dāng)生產(chǎn)者向節(jié)點(diǎn)寫完數(shù)據(jù),它將會(huì)調(diào)用 ProducerBarrier 的 commit 方法。
那么讓我們首先來看看第一步。 “給我 Ring Buffer 里的下一個(gè)節(jié)點(diǎn)”,這句話聽起來很簡(jiǎn)單。的確,從生產(chǎn)者角度來看它很簡(jiǎn)單:簡(jiǎn)單地調(diào)用 ProducerBarrier 的 nextEntry() 方法,這樣會(huì)返回給你一個(gè) Entry 對(duì)象,這個(gè)對(duì)象就是 Ring Buffer 的下一個(gè)節(jié)點(diǎn)。
如何防止 Ring Buffer 重疊
在后臺(tái),由 ProducerBarrier 負(fù)責(zé)所有的交互細(xì)節(jié)來從 Ring Buffer 中找到下一個(gè)節(jié)點(diǎn),然后才允許生產(chǎn)者向它寫入數(shù)據(jù)。

在這幅圖中,我們假設(shè)只有一個(gè)生產(chǎn)者寫入 Ring Buffer。過一會(huì)兒我們?cè)偬幚矶鄠€(gè)生產(chǎn)者的復(fù)雜問題。
ConsumerTrackingProducerBarrier 對(duì)象擁有所有正在訪問 Ring Buffer 的 消費(fèi)者 列表。這看起來有點(diǎn)兒奇怪-我從沒有期望 ProducerBarrier 了解任何有關(guān)消費(fèi)端那邊的事情。但是等等,這是有原因的。因?yàn)槲覀儾幌肱c隊(duì)列“混為一談”(隊(duì)列需要追蹤隊(duì)列的頭和尾,它們有時(shí)候會(huì)指向相同的位置),Disruptor 由消費(fèi)者負(fù)責(zé)通知它們處理到了哪個(gè)序列號(hào),而不是 Ring Buffer。所以,如果我們想確定我們沒有讓 Ring Buffer 重疊,需要檢查所有的消費(fèi)者們都讀到了哪里。
在上圖中,有一個(gè) 消費(fèi)者 順利的讀到了最大序號(hào) 12(用紅色/粉色高亮)。第二個(gè)消費(fèi)者 有點(diǎn)兒落后——可能它在做 I/O 操作之類的——它停在序號(hào) 3。因此消費(fèi)者 2 在趕上消費(fèi)者 1 之前要跑完整個(gè) Ring Buffer 一圈的距離。
現(xiàn)在生產(chǎn)者想要寫入 Ring Buffer 中序號(hào) 3 占據(jù)的節(jié)點(diǎn),因?yàn)樗?Ring Buffer 當(dāng)前游標(biāo)的下一個(gè)節(jié)點(diǎn)。但是 ProducerBarrier 明白現(xiàn)在不能寫入,因?yàn)橛幸粋€(gè)消費(fèi)者正在占用它。所以,ProducerBarrier 停下來自旋 (spins),等待,直到那個(gè)消費(fèi)者離開。
申請(qǐng)下一個(gè)節(jié)點(diǎn)
現(xiàn)在可以想像消費(fèi)者 2 已經(jīng)處理完了一批節(jié)點(diǎn),并且向前移動(dòng)了它的序號(hào)??赡芩驳搅诵蛱?hào) 9(因?yàn)橄M(fèi)端的批處理方式,現(xiàn)實(shí)中我會(huì)預(yù)計(jì)它到達(dá) 12,但那樣的話這個(gè)例子就不夠有趣了)。

上圖顯示了當(dāng)消費(fèi)者 2 挪動(dòng)到序號(hào) 9 時(shí)發(fā)生的情況。在這張圖中我已經(jīng)忽略了ConsumerBarrier,因?yàn)樗鼪]有參與這個(gè)場(chǎng)景。
ProducerBarier 會(huì)看到下一個(gè)節(jié)點(diǎn)——序號(hào) 3 那個(gè)已經(jīng)可以用了。它會(huì)搶占這個(gè)節(jié)點(diǎn)上的 Entry(我還沒有特別介紹 Entry 對(duì)象,基本上它是一個(gè)放寫入到某個(gè)序號(hào)的 Ring Buffer 數(shù)據(jù)的桶),把下一個(gè)序號(hào)(13)更新成 Entry 的序號(hào),然后把 Entry 返回給生產(chǎn)者。生產(chǎn)者可以接著往 Entry 里寫入數(shù)據(jù)。
提交新的數(shù)據(jù)

當(dāng)生產(chǎn)者結(jié)束向 Entry 寫入數(shù)據(jù)后,它會(huì)要求 ProducerBarrier 提交。
ProducerBarrier 先等待 Ring Buffer 的游標(biāo)追上當(dāng)前的位置(對(duì)于單生產(chǎn)者這毫無意義-比如,我們已經(jīng)知道游標(biāo)到了 12 ,而且沒有其他人正在寫入 Ring Buffer)。然后 ProducerBarrier 更新 Ring Buffer 的游標(biāo)到剛才寫入的 Entry 序號(hào)-在我們這兒是 13。接下來,ProducerBarrier 會(huì)讓消費(fèi)者知道 buffer 中有新東西了。它戳一下 ConsumerBarrier 上的 WaitStrategy 對(duì)象說-“喂,醒醒!有事情發(fā)生了!”
ProducerBarrier 上的批處理
有趣的是 Disruptor 可以同時(shí)在生產(chǎn)者和 [消費(fèi)者]兩端實(shí)現(xiàn)批處理。還記得伴隨著程序運(yùn)行,消費(fèi)者 2 最后達(dá)到了序號(hào) 9 嗎?ProducerBarrier 可以在這里做一件很狡猾的事-它知道 Ring Buffer 的大小,也知道最慢的消費(fèi)者位置。因此它能夠發(fā)現(xiàn)當(dāng)前有哪些節(jié)點(diǎn)是可用的。

如果 ProducerBarrier 知道 Ring Buffer 的游標(biāo)指向 12,而最慢的消費(fèi)者在 9 的位置,它就可以讓生產(chǎn)者寫入節(jié)點(diǎn) 3,4,5,6,7 和 8,中間不需要再次檢查消費(fèi)者的位置。
多個(gè)生產(chǎn)者的場(chǎng)景
在多個(gè)生產(chǎn)者的場(chǎng)景下,你還需要其他東西來追蹤序號(hào)。這個(gè)序號(hào)是指當(dāng)前可寫入的序號(hào)。注意這和“向 Ring Buffer 的游標(biāo)加 1”不一樣-如果你有一個(gè)以上的生產(chǎn)者同時(shí)在向 Ring Buffer 寫入,就有可能出現(xiàn)某些 Entry 正在被生產(chǎn)者寫入但還沒有提交的情況。

讓我們復(fù)習(xí)一下如何申請(qǐng)寫入節(jié)點(diǎn)。每個(gè)生產(chǎn)者都向 ClaimStrategy 申請(qǐng)下一個(gè)可用的節(jié)點(diǎn)。生產(chǎn)者 1 拿到序號(hào) 13,這和上面單個(gè)生產(chǎn)者的情況一樣。生產(chǎn)者 2 拿到序號(hào) 14,盡管 Ring Buffer的當(dāng)前游標(biāo)僅僅指向 12。這是因?yàn)?ClaimSequence 不但負(fù)責(zé)分發(fā)序號(hào),而且負(fù)責(zé)跟蹤哪些序號(hào)已經(jīng)被分配。
我把生產(chǎn)者 1 和它的寫入節(jié)點(diǎn)涂上綠色,把生產(chǎn)者 2 和它的寫入節(jié)點(diǎn)涂上可疑的粉色-看起來像紫色。

現(xiàn)在假設(shè)生產(chǎn)者 1 還生活在童話里,因?yàn)槟承┰驔]有來得及提交數(shù)據(jù)。生產(chǎn)者 2 已經(jīng)準(zhǔn)備好提交了,并且向 ProducerBarrier 發(fā)出了請(qǐng)求。
就像我們先前在 commit 示意圖中看到的一樣,ProducerBarrier 只有在 Ring Buffer 游標(biāo)到達(dá)準(zhǔn)備提交的節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)時(shí)它才會(huì)提交。在當(dāng)前情況下,游標(biāo)必須先到達(dá)序號(hào) 13 我們才能提交節(jié)點(diǎn) 14 的數(shù)據(jù)。但是我們不能這樣做,因?yàn)樯a(chǎn)者 1 正盯著一些閃閃發(fā)光的東西,還沒來得及提交。因此 ClaimStrategy 就停在那兒自旋 (spins), 直到 Ring Buffer 游標(biāo)到達(dá)它應(yīng)該在的位置。

現(xiàn)在生產(chǎn)者 1 從迷糊中清醒過來并且申請(qǐng)?zhí)峤还?jié)點(diǎn) 13 的數(shù)據(jù)(生產(chǎn)者 1 發(fā)出的綠色箭頭代表這個(gè)請(qǐng)求)。ProducerBarrier 讓 ClaimStrategy 先等待 Ring Buffer 的游標(biāo)到達(dá)序號(hào) 12,當(dāng)然現(xiàn)在已經(jīng)到了。因此 Ring Buffer 移動(dòng)游標(biāo)到 13,讓 ProducerBarrier 戳一下 WaitStrategy 告訴所有人都知道 Ring Buffer 有更新了?,F(xiàn)在 ProducerBarrier 可以完成生產(chǎn)者 2 的請(qǐng)求,讓 Ring Buffer 移動(dòng)游標(biāo)到 14,并且通知所有人都知道。
你會(huì)看到,盡管生產(chǎn)者在不同的時(shí)間完成數(shù)據(jù)寫入,但是 Ring Buffer 的內(nèi)容順序總是會(huì)遵循 nextEntry() 的初始調(diào)用順序。也就是說,如果一個(gè)生產(chǎn)者在寫入 Ring Buffer 的時(shí)候暫停了,只有當(dāng)它解除暫停后,其他等待中的提交才會(huì)立即執(zhí)行。
更新:最近的 RingBuffer? 版本去掉了 Producer Barrier。如果在你看的代碼里找不到 ProducerBarrier,那就假設(shè)當(dāng)我講“Producer Barrier”時(shí),我的意思是“Ring Buffer”。
更新2:注意 Disruptor 2.0 版使用了與本文不一樣的命名。如果你對(duì)類名感到困惑,請(qǐng)閱讀我寫的Disruptor 2.0更新摘要?。
Disruptor的依賴關(guān)系
菱形結(jié)構(gòu)

- 用隊(duì)列實(shí)現(xiàn)菱形結(jié)構(gòu)

- 用 Disruptor 實(shí)現(xiàn)菱形結(jié)構(gòu)

生產(chǎn)者這邊比較簡(jiǎn)單,它是我在 上文? 中描述過的單生產(chǎn)者模型。有趣的是,生產(chǎn)者并不需要關(guān)心所有的消費(fèi)者。它只關(guān)心消費(fèi)者 C3,如果消費(fèi)者 C3 處理完了 Ring Buffer 的某一個(gè)節(jié)點(diǎn),那么另外兩個(gè)消費(fèi)者肯定也處理完了。因此,只要 C3 的位置向前移動(dòng),Ring Buffer 的后續(xù)節(jié)點(diǎn)就會(huì)空閑出來。
管理消費(fèi)者的依賴關(guān)系需要兩個(gè) ConsumerBarrier 對(duì)象。第一個(gè)僅僅與 Ring Buffer 交互,C1 和 C2 消費(fèi)者向它申請(qǐng)下一個(gè)可訪問節(jié)點(diǎn)。第二個(gè) ConsumerBarrier 只知道消費(fèi)者 C1 和 C2,它返回兩個(gè)消費(fèi)者訪問過的消息序號(hào)中較小的那個(gè)。
Disruptor 怎樣實(shí)現(xiàn)消費(fèi)者等待(依賴)

我們從這個(gè)故事發(fā)生到一半的時(shí)候來看:生產(chǎn)者 P1 已經(jīng)在 Ring Buffer 里寫到序號(hào) 22 了,消費(fèi)者 C1 已經(jīng)訪問和處理完了序號(hào) 21 之前的所有數(shù)據(jù)。消費(fèi)者 C2 處理到了序號(hào) 18。消費(fèi)者 C3,就是依賴其他消費(fèi)者的那個(gè),才處理到序號(hào) 15。
生產(chǎn)者 P1 不能繼續(xù)向 RingBuffer 寫入數(shù)據(jù)了,因?yàn)樾蛱?hào) 15 占據(jù)了我們想要寫入序號(hào) 23 的數(shù)據(jù)節(jié)點(diǎn) (Slot)。

第一個(gè) ConsumerBarrier(CB1)告訴 C1 和 C2 消費(fèi)者可以去訪問序號(hào) 22 前面的所有數(shù)據(jù),這是 Ring Buffer 中的最大序號(hào)。第二個(gè) ConsumerBarrier (CB2) 不但會(huì)檢查 RingBuffer 的序號(hào),也會(huì)檢查另外兩個(gè)消費(fèi)者的序號(hào)并且返回它們之間的最小值。因此,三號(hào)消費(fèi)者被告知可以訪問 Ring Buffer 里序號(hào) 18 前面的數(shù)據(jù)。
注意這些消費(fèi)者還是直接從 Ring Buffer 拿數(shù)據(jù)節(jié)點(diǎn)——并不是由 C1 和 C2 消費(fèi)者把數(shù)據(jù)節(jié)點(diǎn)從 Ring Buffer 里取出再傳遞給 C3 消費(fèi)者的。作為替代的是,由第二個(gè) ConsumerBarrier 告訴 C3 消費(fèi)者,在 RingBuffer 里的哪些節(jié)點(diǎn)可以安全的處理。
這產(chǎn)生了一個(gè)問題——如果任何數(shù)據(jù)都來自于 Ring Buffer,那么 C3 消費(fèi)者如何讀到前面兩個(gè)消費(fèi)者處理完成的數(shù)據(jù)呢?如果 C3 消費(fèi)者關(guān)心的只是先前的消費(fèi)者是否已經(jīng)完成它們的工作(例如,把數(shù)據(jù)復(fù)制到別的地方),那么這一切都沒有問題—— C3 消費(fèi)者知道工作已完成就放心了。但是,如果 C3 消費(fèi)者需要訪問先前的消費(fèi)者的處理結(jié)果,它又從哪里去獲取呢?
秘密在于把處理結(jié)果寫入 Ring Buffer 數(shù)據(jù)節(jié)點(diǎn) (Entry) 本身。這樣,當(dāng) C3 消費(fèi)者從 Ring Buffer 取出節(jié)點(diǎn)時(shí),它已經(jīng)填充好了 C3 消費(fèi)者工作需要的所有信息。這里 真正 重要的地方是節(jié)點(diǎn) (Entry) 對(duì)象的每一個(gè)字段應(yīng)該只允許一個(gè)消費(fèi)者寫入。這可以避免產(chǎn)生并發(fā)寫入沖突 (write-contention) 減慢了整個(gè)處理過程。

- 一些實(shí)際的 Java 代碼
ConsumerBarrier consumerBarrier1 =
ringBuffer.createConsumerBarrier();
BatchConsumer consumer1 =
new BatchConsumer(consumerBarrier1, handler1);
BatchConsumer consumer2 =
new BatchConsumer(consumerBarrier1, handler2);
ConsumerBarrier consumerBarrier2 =
ringBuffer.createConsumerBarrier(consumer1, consumer2);
BatchConsumer consumer3 =
new BatchConsumer(consumerBarrier2, handler3);
ProducerBarrier producerBarrier =
ringBuffer.createProducerBarrier(consumer3);
- 使用DSL
Executor executor = Executors.newCachedThreadPool();
BatchHandler handler1 = new MyBatchHandler1();
BatchHandler handler2 = new MyBatchHandler2();
BatchHandler handler3 = new MyBatchHandler3();
DisruptorWizard dw = new DisruptorWizard(ENTRY_FACTORY,
RING_BUFFER_SIZE, executor);
dw.consumeWith(handler1, handler2).then(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();

dw.consumeWith(handler1a, handler2a);
dw.after(handler1a).consumeWith(handler1b);
dw.after(handler2a).consumeWith(handler2b);
dw.after(handler1b, handler2b).consumeWith(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();
現(xiàn)在你知道了——如何關(guān)聯(lián) Disruptor 與相互依賴(等待)的多個(gè)消費(fèi)者。關(guān)鍵點(diǎn)是:
- 使用多個(gè) ConsumerBarrier 來管理消費(fèi)者之間的依賴(等待)關(guān)系。
- 使用 ProducerBarrier 監(jiān)視結(jié)構(gòu)圖中最后一個(gè)消費(fèi)者。
- 只允許一個(gè)消費(fèi)者更新數(shù)據(jù)節(jié)點(diǎn) (Entry) 的每一個(gè)獨(dú)立字段。
Disruptor 2.0更新摘要
對(duì)于2.0版的主要變化有3點(diǎn):
- 更貼切的命名;
- 把producer barrier(生產(chǎn)者屏障)整合進(jìn)了ring buffer;
- 將Disruptor Wizard加入了主代碼庫。
新版本:

你可以看到基本原理還是類似的。新版本更加簡(jiǎn)單,因?yàn)镻roducerBarrier本身不再作為一個(gè)單獨(dú)的實(shí)體存在,它的替代者是PublishPort 接口,且RingBuffer自身就實(shí)現(xiàn)了這個(gè)接口。 類似地,DependencyBarrier替代ConsumerBarrier ,厘清了此對(duì)象的職責(zé)。另外,Publisher (Producer的替代者)和EventProcessor(替代了Consumer)也更能精確地體現(xiàn)出它們的行為。Consumer這個(gè)名字總是會(huì)帶來一些混淆,因?yàn)橐驗(yàn)槠鋵?shí)消費(fèi)者從來不從ring buffer消費(fèi)任何東西。Consumer之前僅僅是用于隊(duì)列實(shí)現(xiàn)的一個(gè)術(shù)語。
圖上沒有表現(xiàn)出來的變動(dòng)是以前存在ring buffer里的東西叫entry(輸入條目),而現(xiàn)在改名叫Event(事件)了,相應(yīng)的就是EventProcessor。
概念
RingBuffer: 被看作Disruptor最主要的組件,然而從3.0開始RingBuffer僅僅負(fù)責(zé)存儲(chǔ)和更新在Disruptor中流通的數(shù)據(jù)。對(duì)一些特殊的使用場(chǎng)景能夠被用戶(使用其他數(shù)據(jù)結(jié)構(gòu))完全替代。
Sequence:Disruptor使用Sequence來表示一個(gè)特殊組件處理的序號(hào)。和Disruptor一樣,每個(gè)消費(fèi)者(EventProcessor)都維持著一個(gè)Sequence。大部分的并發(fā)代碼依賴這些Sequence值的運(yùn)轉(zhuǎn),因此Sequence支持多種當(dāng)前為AtomicLong類的特性。
Sequencer:這是Disruptor真正的核心。實(shí)現(xiàn)了這個(gè)接口的兩種生產(chǎn)者(單生產(chǎn)者和多生產(chǎn)者)均實(shí)現(xiàn)了所有的并發(fā)算法,為了在生產(chǎn)者和消費(fèi)者之間進(jìn)行準(zhǔn)確快速的數(shù)據(jù)傳遞。
SequenceBarrier: 由Sequencer生成,并且包含了已經(jīng)發(fā)布的Sequence的引用,這些的Sequence源于Sequencer和一些獨(dú)立的消費(fèi)者的Sequence。它包含了決定是否有供消費(fèi)者來消費(fèi)的Event的邏輯。
WaitStrategy:決定一個(gè)消費(fèi)者將如何等待生產(chǎn)者將Event置入Disruptor。
Event:從生產(chǎn)者到消費(fèi)者過程中所處理的數(shù)據(jù)單元。Disruptor中沒有代碼表示Event,因?yàn)樗耆怯捎脩舳x的。
EventProcessor:主要事件循環(huán),處理Disruptor中的Event,并且擁有消費(fèi)者的Sequence。它有一個(gè)實(shí)現(xiàn)類是BatchEventProcessor,包含了event loop有效的實(shí)現(xiàn),并且將回調(diào)到一個(gè)EventHandler接口的實(shí)現(xiàn)對(duì)象。
EventHandler:由用戶實(shí)現(xiàn)并且代表了Disruptor中的一個(gè)消費(fèi)者的接口。
Producer:由用戶實(shí)現(xiàn),它調(diào)用RingBuffer來插入事件(Event),在Disruptor中沒有相應(yīng)的實(shí)現(xiàn)代碼,由用戶實(shí)現(xiàn)。
WorkProcessor:確保每個(gè)sequence只被一個(gè)processor消費(fèi),在同一個(gè)WorkPool中的處理多個(gè)WorkProcessor不會(huì)消費(fèi)同樣的sequence。
WorkerPool:一個(gè)WorkProcessor池,其中WorkProcessor將消費(fèi)Sequence,所以任務(wù)可以在實(shí)現(xiàn)WorkHandler接口的worker吃間移交
LifecycleAware:當(dāng)BatchEventProcessor啟動(dòng)和停止時(shí),于實(shí)現(xiàn)這個(gè)接口用于接收通知。
應(yīng)用舉例
Disruptor系統(tǒng)的最初設(shè)計(jì)是為了支持需要按照特定的順序發(fā)生的階段性類似流水線事件,這種需求在企業(yè)應(yīng)用系統(tǒng)開發(fā)中并不少見。圖8顯示了標(biāo)準(zhǔn)的3級(jí)流水線。

首先,每個(gè)事件都被寫入硬盤(日志)作為日后恢復(fù)用。其次,這些事件被復(fù)制到備份服務(wù)器。只有在這兩個(gè)階段后,系統(tǒng)開始業(yè)務(wù)邏輯處理。
按順序執(zhí)行上次操作是一個(gè)合乎邏輯的方法,但是并不是最有效的方法。日志和復(fù)制操作可以同步執(zhí)行,因?yàn)樗麄兓ハ嗒?dú)立。但是業(yè)務(wù)邏輯必須在他們都執(zhí)行完后才能執(zhí)行。

LMAX 一種新的零售的金融交易平臺(tái)
通過低延遲處理大量交易,取得低延遲和高吞吐量,而且沒有并發(fā)代碼的復(fù)雜性,他們是怎么做到呢?現(xiàn)在LMAX已經(jīng)產(chǎn)品化一段時(shí)間了,現(xiàn)在應(yīng)該可以揭開其神秘而迷人的面紗了。
整體結(jié)構(gòu)

從最高層次看,架構(gòu)有三個(gè)部分:
- 業(yè)務(wù)邏輯處理器business logic processor[5]
- 輸入input disruptor
- 輸出output disruptors
業(yè)務(wù)邏輯處理器處理所有的應(yīng)用程序的業(yè)務(wù)邏輯,這是一個(gè)單線程的Java程序,純粹的方法調(diào)用,并返回輸出。不需要任何平臺(tái)框架,運(yùn)行在JVM里,這就保證其很容易運(yùn)行測(cè)試環(huán)境。
全部駐留在內(nèi)存中
業(yè)務(wù)邏輯處理器有次序地取出消息,然后運(yùn)行其中的業(yè)務(wù)邏輯,然后產(chǎn)生輸出事件,整個(gè)操作都是在內(nèi)存中,沒有數(shù)據(jù)庫或其他持久存儲(chǔ)。將所有數(shù)據(jù)駐留在內(nèi)存中有兩個(gè)重要好處:首先是快,沒有IO,也沒有事務(wù),其次是簡(jiǎn)化編程,沒有對(duì)象/關(guān)系數(shù)據(jù)庫的映射,所有代碼都是使用Java對(duì)象模型
使用基于內(nèi)存的模型有一個(gè)重要問題:萬一崩潰怎么辦?電源掉電也是可能發(fā)生的,“事件”(Event Sourcing )概念是問題解決的核心,業(yè)務(wù)邏輯處理器的狀態(tài)是由輸入事件驅(qū)動(dòng)的,只要這些輸入事件被持久化保存起來,你就總是能夠在崩潰情況下,根據(jù)事件重演重新獲得當(dāng)前狀態(tài)。(NOSQL存儲(chǔ)的基于事件的事務(wù)實(shí)現(xiàn))
LMAX提供業(yè)務(wù)邏輯處理的快照,從快照還原,每天晚上系統(tǒng)不繁忙時(shí)構(gòu)建快照,重新啟動(dòng)商業(yè)邏輯處理器的速度很快,一個(gè)完整的重新啟動(dòng) – 包括重新啟動(dòng)JVM加載最近的快照,和重放一天事件 – 不到一分鐘。
快照雖然使啟動(dòng)一個(gè)新的業(yè)務(wù)邏輯處理器的速度,但速度還不夠快,業(yè)務(wù)邏輯處理器在下午2時(shí)就非常繁忙甚至崩潰,LMAX就保持多個(gè)業(yè)務(wù)邏輯處理器同時(shí)運(yùn)行,每個(gè)輸入事件由多個(gè)處理器處理,只有一個(gè)處理器輸出有效,其他忽略,如果一個(gè)處理器失敗,切換到另外一個(gè),這種故障轉(zhuǎn)移失敗恢復(fù)是事件源驅(qū)動(dòng)(Event Sourcing)的另外一個(gè)好處。