前言
上一篇文章介紹了RingBuffer的基本信息,本文將對Disruptor的消費(fèi)者進(jìn)行進(jìn)一步的解析,并對其中可能存在的坑點(diǎn)進(jìn)行分析;
消費(fèi)者繼承體系

從接口體系上來看,消費(fèi)者主要分為Work和Event兩種類型,這兩種類型的差別如下:
- 同一層次的WorkProcessor只有一個(gè)可以處理成功RingBuffer的事件,類似消息體系中的點(diǎn)對點(diǎn)模式;
- 同一層次的BatchEventProcessor并行處理(每一個(gè)消費(fèi)者)成功RingBuffer事件,類似消息體系中的Topic模式
這兩種實(shí)現(xiàn)上差別到底在哪里呢,我們進(jìn)一步進(jìn)行分析。從接口上,我們可以看到它們都繼承了Runnable,所以聯(lián)想到Disruptor在使用時(shí)生成的Executor,我們可以猜測不同的消費(fèi)者都是在不同的線程中進(jìn)行處理的,我們直接對其進(jìn)行分析。
WorkProcessor
主要屬性:
// Processor是否已經(jīng)開始運(yùn)行
private final AtomicBoolean running = new AtomicBoolean(false);
// 序列,初始時(shí)為-1
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 持有的RingBuffer引用
private final RingBuffer<T> ringBuffer;
// Processor上游的序列屏障
private final SequenceBarrier sequenceBarrier;
// 用戶自定義的業(yè)務(wù)邏輯處理器
private final WorkHandler<? super T> workHandler;
// 異常處理器
private final ExceptionHandler<? super T> exceptionHandler;
// 消費(fèi)的sequence
private final Sequence workSequence;
private final EventReleaser eventReleaser = new EventReleaser() {
@Override
public void release() {
sequence.set(Long.MAX_VALUE);
}
};
// 超時(shí)處理器
private final TimeoutHandler timeoutHandler;
上述關(guān)鍵屬性分析下來,主要有sequence和workSequence兩個(gè)屬性有一定的迷惑性,探究下它們是如何來的:
// 構(gòu)造方法
public WorkProcessor(
final RingBuffer<T> ringBuffer,
final SequenceBarrier sequenceBarrier,
final WorkHandler<? super T> workHandler,
final ExceptionHandler<? super T> exceptionHandler,
final Sequence workSequence) {
...
// 上游傳遞進(jìn)來的
this.workSequence = workSequence;
...
}
繼續(xù)往上翻:
public WorkerPool(
final RingBuffer<T> ringBuffer,
final SequenceBarrier sequenceBarrier,
final ExceptionHandler<? super T> exceptionHandler,
final WorkHandler<? super T>... workHandlers) {
this.ringBuffer = ringBuffer;
final int numWorkers = workHandlers.length;
workProcessors = new WorkProcessor[numWorkers];
// 循環(huán)構(gòu)造processor,共享workSequence
for (int i = 0; i < numWorkers; i++) {
workProcessors[i] = new WorkProcessor<>(
ringBuffer,
sequenceBarrier,
workHandlers[i],
exceptionHandler,
workSequence);
}
}
可以發(fā)現(xiàn),同一WorkPool的processor共享同一個(gè)sequence,因此其實(shí)所謂的只有一個(gè)能夠消費(fèi)成功本質(zhì)上依靠的就是同一個(gè)sequence(volatile語義)。
在Disruptor的使用中,我們一般需要手動(dòng)調(diào)用下Disruptor#start()方法來啟動(dòng)整個(gè)框架:
public RingBuffer<T> start(final Executor executor) {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
}
// 生產(chǎn)者cursor,初始時(shí)為-1
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors) {
processor.getSequence().set(cursor);
executor.execute(processor);
}
return ringBuffer;
}
所以由此判斷,框架啟動(dòng)時(shí)workSequence和sequence的值都與生產(chǎn)者保持一致即-1,此時(shí)我們回到WorkProcessor的run()方法。
public void run() {
// 判斷是否重復(fù)啟動(dòng)
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("Thread is already running");
}
// 清除警告狀態(tài)
sequenceBarrier.clearAlert();
// 如果有實(shí)現(xiàn)LifecycleAware接口,回調(diào)其onStart()邏輯
notifyStart();
// 上一個(gè)sequence的slot是否處理成功,剛開始為-1,所以默認(rèn)處理成功
boolean processedSequence = true;
// 緩存的可用sequence的下標(biāo)
long cachedAvailableSequence = Long.MIN_VALUE;
// 下一個(gè)待處理的sequence
long nextSequence = sequence.get();
T event = null;
while (true) {
try {
// 如果上一個(gè)處理成功,則更新workSequence的值
if (processedSequence) {
processedSequence = false;
do {
// 從當(dāng)前workSequence的后移1位,為新的需要處理的序列
nextSequence = workSequence.get() + 1L;
// sequence存儲該processor已經(jīng)處理成功的序列最大值,初始時(shí)為-1;
sequence.set(nextSequence - 1L);
}
// workSequence所有processor線程共享同一變量
// 假設(shè)此時(shí)發(fā)生并發(fā)問題,由于其它線程已經(jīng)處理成功,那么此處更新失敗,則下次nextSequence可能獲取到跳躍的值
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
// 邏輯走到這里,可以確定一個(gè)事情:sequence的值=workSequence的值-1,所以這里workSequence
// 本質(zhì)上代表了該P(yáng)rocessor成功搶占成功可以處理的sequence數(shù)據(jù)
// cachedAvailableSequence為等待之后下一個(gè)可用的序列
if (cachedAvailableSequence >= nextSequence) {
// 獲取workSequence對應(yīng)的slot數(shù)據(jù)
event = ringBuffer.get(nextSequence);
// 調(diào)用用戶邏輯進(jìn)行處理
workHandler.onEvent(event);
// 處理成功,更新標(biāo)記為true
processedSequence = true;
} else {
// 說明當(dāng)前processor搶占到的sequence可用數(shù)據(jù)超前,需要判斷該sequence數(shù)據(jù)是否可用
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
} catch (final TimeoutException e) {
// 調(diào)用對應(yīng)的TimeoutHandler進(jìn)行處理
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
// 當(dāng)發(fā)生警告通知時(shí),如果該線程狀態(tài)已經(jīng)被暫停,則直接中斷
if (!running.get()) {
break;
}
} catch (final Throwable ex) {
// 處理異常,將該sequence標(biāo)識為處理成功 !important
exceptionHandler.handleEventException(ex, nextSequence, event);
processedSequence = true;
}
}
notifyShutdown();
running.set(false);
}
從run()的代碼邏輯可以看出,不同的work消費(fèi)者都是通過CAS來搶占需要處理的slot數(shù)據(jù),每個(gè)processor維護(hù)了自身已經(jīng)處理成功的sequence以及大家公共持有的workSequence,同時(shí)該processor是否可以處理sequence是由barrier來維護(hù)的。有一點(diǎn)需要注意,在exceptionHandler.handleEventException(ex, nextSequence, event);中,默認(rèn)的異常處理器為FatalExceptionHandler,其打印日志結(jié)束后會拋出RuntimeException,從而導(dǎo)致消費(fèi)者線程中斷,所以在實(shí)際使用中一定要實(shí)現(xiàn)業(yè)務(wù)自己的ExceptionHandler或者在WorkHandler中自己處理異常;
那么這里每個(gè)processor自身的sequence有什么作用?
思考下如下場景,假設(shè)現(xiàn)在有10個(gè)消費(fèi)者,現(xiàn)在有10個(gè)slot需要處理,那么極端情況下可能workSequence被更新到了10,但是可能最小的sequence此時(shí)為0,說明第一個(gè)申請sequence成功的線程還未處理完畢,那么這整批消費(fèi)者(一個(gè)WorkPool)最慢的下標(biāo)其實(shí)就是0,一旦其處理成功,則sequence就有可能被更新為10;所以這里sequence的集合其實(shí)就是用來標(biāo)識整個(gè)消費(fèi)者中最慢的進(jìn)度;
關(guān)于waitFor
上面提到了消費(fèi)者能否處理sequence是由SequenceBarrier#waitFor來決定的,下面探究下該方法的實(shí)現(xiàn)機(jī)制。
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException {
checkAlert();
// 轉(zhuǎn)移職責(zé),交由waitStrategy處理
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence) {
return availableSequence;
}
// 該方法在上一期已經(jīng)分析過,此處不再分析
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
...
// 以BlockingWaitStrategy為例
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException {
long availableSequence;
// 要消費(fèi)的sequence超過了生產(chǎn)者的sequence,則消費(fèi)者等待;
if (cursorSequence.get() < sequence) {
synchronized (mutex) {
while (cursorSequence.get() < sequence) {
barrier.checkAlert();
mutex.wait();
}
}
}
// 該processor要消費(fèi)的sequence超過了上游processor的最小值,自旋等待
while ((availableSequence = dependentSequence.get()) < sequence) {
barrier.checkAlert();
// 嘗試尋找實(shí)現(xiàn)了onSpinWait的靜態(tài)方法Thread進(jìn)行調(diào)用
ThreadHints.onSpinWait();
}
// 返回上游最小sequence
return availableSequence;
}
BatchEventProcessor
主要屬性
// 空閑狀態(tài)
private static final int IDLE = 0;
// 暫停狀態(tài)
private static final int HALTED = IDLE + 1;
// 運(yùn)行狀態(tài)
private static final int RUNNING = HALTED + 1;
// 實(shí)際運(yùn)行狀態(tài)
private final AtomicInteger running = new AtomicInteger(IDLE);
// 異常處理器
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
// RingBuffer
private final DataProvider<T> dataProvider;
// 屏障
private final SequenceBarrier sequenceBarrier;
// 用戶業(yè)務(wù)邏輯處理器
private final EventHandler<? super T> eventHandler;
// 消費(fèi)者sequence
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 超時(shí)處理器
private final TimeoutHandler timeoutHandler;
private final BatchStartAware batchStartAware;
主要屬性與WorkProcessor基本類似,查看其run()方法邏輯:
private void processEvents() {
T event = null;
// 從當(dāng)前sequence后移一個(gè)進(jìn)行消費(fèi)
long nextSequence = sequence.get() + 1L;
while (true) {
try {
// 獲取可用的sequence長度
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null && availableSequence >= nextSequence) {
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// 如果當(dāng)前處理的sequence落后,就循環(huán)挨個(gè)處理
while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 處理完畢后進(jìn)行更新
sequence.set(availableSequence);
} catch (final TimeoutException e) {
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
if (running.get() != RUNNING) {
break;
}
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, event);
// 業(yè)務(wù)邏輯出現(xiàn)異常時(shí),若無新的異常拋出,則更新sequence
sequence.set(nextSequence);
nextSequence++;
}
}
}
從上面的代碼可以看出,假設(shè)Disruptor調(diào)用halt之后,則該批次數(shù)據(jù)仍會處理完畢,在新的一輪waitFor判斷中拋出AlertException異常;
關(guān)于消費(fèi)者線程池
此處把線程池單獨(dú)拿出來看是有一定原因的,現(xiàn)在Disruptor把顯示傳入線程池的構(gòu)造方法置為了@Deprecated,那么我們在使用時(shí)應(yīng)該注意什么呢?
從前文我們已經(jīng)大致分析過生產(chǎn)者和消費(fèi)者是如何協(xié)同的,回顧下結(jié)論:
- sequencer會顯示維護(hù)消費(fèi)最慢slot的下標(biāo);
- 生產(chǎn)者在發(fā)布事件時(shí)需要先調(diào)用next()進(jìn)行顯示申請或者占用;
- 消費(fèi)者線程在消費(fèi)時(shí)會主動(dòng)調(diào)用barrier#waitFor進(jìn)行判斷;
- 每個(gè)消費(fèi)者占用一個(gè)線程;
這里有一個(gè)死鎖問題,假設(shè)說消費(fèi)者個(gè)數(shù)為N,線程個(gè)數(shù)為M,其中N > M,從前面的代碼我們已經(jīng)分析過,消費(fèi)者線程的邏輯都是死循環(huán),那么很有可能出現(xiàn)饑餓消費(fèi)者,即無法被線程池調(diào)用,那么從生產(chǎn)者端來看,最慢slot一直未改變,從而導(dǎo)致生產(chǎn)者等待,而生產(chǎn)者等待又會促使消費(fèi)者waitFor方法無法通過,從而出現(xiàn)互相等待死鎖問題;那么Disruptor是如何進(jìn)行解決的呢,代碼如下:
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) {
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
...
// BaseExecutor
private final ThreadFactory factory;
private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();
public BasicExecutor(ThreadFactory factory) {
this.factory = factory;
}
@Override
public void execute(Runnable command) {
final Thread thread = factory.newThread(command);
if (null == thread) {
throw new RuntimeException("Failed to create thread to run: " + command);
}
thread.start();
threads.add(thread);
}
代碼非常簡單,即創(chuàng)建足夠消費(fèi)者使用的線程數(shù)量進(jìn)行消費(fèi);
本文先寫到這里,下一篇文章對消費(fèi)者的等待策略進(jìn)行具體分析。