Disruptor深度解析-消費(fèi)者Consumer

前言

上一篇文章介紹了RingBuffer的基本信息,本文將對Disruptor的消費(fèi)者進(jìn)行進(jìn)一步的解析,并對其中可能存在的坑點(diǎn)進(jìn)行分析;

消費(fèi)者繼承體系


從接口體系上來看,消費(fèi)者主要分為Work和Event兩種類型,這兩種類型的差別如下:

  • 同一層次的WorkProcessor只有一個(gè)可以處理成功RingBuffer的事件,類似消息體系中的點(diǎn)對點(diǎn)模式;
  • 同一層次的BatchEventProcessor并行處理(每一個(gè)消費(fèi)者)成功RingBuffer事件,類似消息體系中的Topic模式

這兩種實(shí)現(xiàn)上差別到底在哪里呢,我們進(jìn)一步進(jìn)行分析。從接口上,我們可以看到它們都繼承了Runnable,所以聯(lián)想到Disruptor在使用時(shí)生成的Executor,我們可以猜測不同的消費(fèi)者都是在不同的線程中進(jìn)行處理的,我們直接對其進(jìn)行分析。

WorkProcessor

主要屬性:

    // Processor是否已經(jīng)開始運(yùn)行
    private final AtomicBoolean running = new AtomicBoolean(false);
    // 序列,初始時(shí)為-1
    private final Sequence        sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // 持有的RingBuffer引用
    private final RingBuffer<T>   ringBuffer;
    // Processor上游的序列屏障
    private final SequenceBarrier sequenceBarrier;
    // 用戶自定義的業(yè)務(wù)邏輯處理器
    private final WorkHandler<? super T>      workHandler;
    // 異常處理器
    private final ExceptionHandler<? super T> exceptionHandler;
    // 消費(fèi)的sequence
    private final Sequence                    workSequence;
    
    private final EventReleaser eventReleaser = new EventReleaser() {
        @Override
        public void release() {
            sequence.set(Long.MAX_VALUE);
        }
    };
    // 超時(shí)處理器
    private final TimeoutHandler timeoutHandler;

上述關(guān)鍵屬性分析下來,主要有sequence和workSequence兩個(gè)屬性有一定的迷惑性,探究下它們是如何來的:

// 構(gòu)造方法
public WorkProcessor(
            final RingBuffer<T> ringBuffer,
            final SequenceBarrier sequenceBarrier,
            final WorkHandler<? super T> workHandler,
            final ExceptionHandler<? super T> exceptionHandler,
            final Sequence workSequence) {
        ...
        // 上游傳遞進(jìn)來的
        this.workSequence = workSequence;
        ...
    }

繼續(xù)往上翻:

public WorkerPool(
            final RingBuffer<T> ringBuffer,
            final SequenceBarrier sequenceBarrier,
            final ExceptionHandler<? super T> exceptionHandler,
            final WorkHandler<? super T>... workHandlers) {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        workProcessors = new WorkProcessor[numWorkers];
        // 循環(huán)構(gòu)造processor,共享workSequence
        for (int i = 0; i < numWorkers; i++) {
            workProcessors[i] = new WorkProcessor<>(
                    ringBuffer,
                    sequenceBarrier,
                    workHandlers[i],
                    exceptionHandler,
                    workSequence);
        }
    }

可以發(fā)現(xiàn),同一WorkPool的processor共享同一個(gè)sequence,因此其實(shí)所謂的只有一個(gè)能夠消費(fèi)成功本質(zhì)上依靠的就是同一個(gè)sequence(volatile語義)。
在Disruptor的使用中,我們一般需要手動(dòng)調(diào)用下Disruptor#start()方法來啟動(dòng)整個(gè)框架:

  public RingBuffer<T> start(final Executor executor) {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
        }
        // 生產(chǎn)者cursor,初始時(shí)為-1
        final long cursor = ringBuffer.getCursor();
        workSequence.set(cursor);

        for (WorkProcessor<?> processor : workProcessors) {
            processor.getSequence().set(cursor);
            executor.execute(processor);
        }

        return ringBuffer;
    }

所以由此判斷,框架啟動(dòng)時(shí)workSequence和sequence的值都與生產(chǎn)者保持一致即-1,此時(shí)我們回到WorkProcessor的run()方法。

public void run() {
        // 判斷是否重復(fù)啟動(dòng)
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        // 清除警告狀態(tài)
        sequenceBarrier.clearAlert();
        // 如果有實(shí)現(xiàn)LifecycleAware接口,回調(diào)其onStart()邏輯
        notifyStart();  

        // 上一個(gè)sequence的slot是否處理成功,剛開始為-1,所以默認(rèn)處理成功
        boolean processedSequence = true;
        // 緩存的可用sequence的下標(biāo)
        long cachedAvailableSequence = Long.MIN_VALUE;
        // 下一個(gè)待處理的sequence
        long nextSequence = sequence.get();
        T event = null;
        while (true) {
            try {
                // 如果上一個(gè)處理成功,則更新workSequence的值
                if (processedSequence) {
                    processedSequence = false;
                    do {
                        // 從當(dāng)前workSequence的后移1位,為新的需要處理的序列
                        nextSequence = workSequence.get() + 1L;
                        // sequence存儲該processor已經(jīng)處理成功的序列最大值,初始時(shí)為-1;
                        sequence.set(nextSequence - 1L);
                    }
                    // workSequence所有processor線程共享同一變量
                    // 假設(shè)此時(shí)發(fā)生并發(fā)問題,由于其它線程已經(jīng)處理成功,那么此處更新失敗,則下次nextSequence可能獲取到跳躍的值
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                // 邏輯走到這里,可以確定一個(gè)事情:sequence的值=workSequence的值-1,所以這里workSequence
                // 本質(zhì)上代表了該P(yáng)rocessor成功搶占成功可以處理的sequence數(shù)據(jù)
                // cachedAvailableSequence為等待之后下一個(gè)可用的序列
                if (cachedAvailableSequence >= nextSequence) {
                    // 獲取workSequence對應(yīng)的slot數(shù)據(jù)
                    event = ringBuffer.get(nextSequence);
                    // 調(diào)用用戶邏輯進(jìn)行處理
                    workHandler.onEvent(event);
                    // 處理成功,更新標(biāo)記為true
                    processedSequence = true;
                } else {
                    // 說明當(dāng)前processor搶占到的sequence可用數(shù)據(jù)超前,需要判斷該sequence數(shù)據(jù)是否可用
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            } catch (final TimeoutException e) {
                // 調(diào)用對應(yīng)的TimeoutHandler進(jìn)行處理
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                // 當(dāng)發(fā)生警告通知時(shí),如果該線程狀態(tài)已經(jīng)被暫停,則直接中斷
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                // 處理異常,將該sequence標(biāo)識為處理成功  !important
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

從run()的代碼邏輯可以看出,不同的work消費(fèi)者都是通過CAS來搶占需要處理的slot數(shù)據(jù),每個(gè)processor維護(hù)了自身已經(jīng)處理成功的sequence以及大家公共持有的workSequence,同時(shí)該processor是否可以處理sequence是由barrier來維護(hù)的。有一點(diǎn)需要注意,在exceptionHandler.handleEventException(ex, nextSequence, event);中,默認(rèn)的異常處理器為FatalExceptionHandler,其打印日志結(jié)束后會拋出RuntimeException,從而導(dǎo)致消費(fèi)者線程中斷,所以在實(shí)際使用中一定要實(shí)現(xiàn)業(yè)務(wù)自己的ExceptionHandler或者在WorkHandler中自己處理異常;

那么這里每個(gè)processor自身的sequence有什么作用?
思考下如下場景,假設(shè)現(xiàn)在有10個(gè)消費(fèi)者,現(xiàn)在有10個(gè)slot需要處理,那么極端情況下可能workSequence被更新到了10,但是可能最小的sequence此時(shí)為0,說明第一個(gè)申請sequence成功的線程還未處理完畢,那么這整批消費(fèi)者(一個(gè)WorkPool)最慢的下標(biāo)其實(shí)就是0,一旦其處理成功,則sequence就有可能被更新為10;所以這里sequence的集合其實(shí)就是用來標(biāo)識整個(gè)消費(fèi)者中最慢的進(jìn)度;

關(guān)于waitFor

上面提到了消費(fèi)者能否處理sequence是由SequenceBarrier#waitFor來決定的,下面探究下該方法的實(shí)現(xiàn)機(jī)制。

public long waitFor(final long sequence)
            throws AlertException, InterruptedException, TimeoutException {
        checkAlert();
        // 轉(zhuǎn)移職責(zé),交由waitStrategy處理
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence) {
            return availableSequence;
        }
        // 該方法在上一期已經(jīng)分析過,此處不再分析
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
...
// 以BlockingWaitStrategy為例
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
            throws AlertException, InterruptedException {
        long availableSequence;
        // 要消費(fèi)的sequence超過了生產(chǎn)者的sequence,則消費(fèi)者等待;
        if (cursorSequence.get() < sequence) {
            synchronized (mutex) {
                while (cursorSequence.get() < sequence) {
                    barrier.checkAlert();
                    mutex.wait();
                }
            }
        }
        // 該processor要消費(fèi)的sequence超過了上游processor的最小值,自旋等待
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
            // 嘗試尋找實(shí)現(xiàn)了onSpinWait的靜態(tài)方法Thread進(jìn)行調(diào)用
            ThreadHints.onSpinWait();
        }
        // 返回上游最小sequence
        return availableSequence;
}

BatchEventProcessor

主要屬性

    // 空閑狀態(tài)
    private static final int IDLE    = 0;
    // 暫停狀態(tài)
    private static final int HALTED  = IDLE + 1;
    // 運(yùn)行狀態(tài)
    private static final int RUNNING = HALTED + 1;
    // 實(shí)際運(yùn)行狀態(tài)
    private final AtomicInteger               running          = new AtomicInteger(IDLE);
    // 異常處理器
    private       ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    // RingBuffer
    private final DataProvider<T>             dataProvider;
    // 屏障
    private final SequenceBarrier             sequenceBarrier;
    // 用戶業(yè)務(wù)邏輯處理器
    private final EventHandler<? super T>     eventHandler;
    // 消費(fèi)者sequence
    private final Sequence                    sequence         = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // 超時(shí)處理器
    private final TimeoutHandler              timeoutHandler;
    private final BatchStartAware             batchStartAware;

主要屬性與WorkProcessor基本類似,查看其run()方法邏輯:

private void processEvents() {
        T event = null;
        // 從當(dāng)前sequence后移一個(gè)進(jìn)行消費(fèi)
        long nextSequence = sequence.get() + 1L;

        while (true) {
            try {
                // 獲取可用的sequence長度
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null && availableSequence >= nextSequence) {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
                // 如果當(dāng)前處理的sequence落后,就循環(huán)挨個(gè)處理
                while (nextSequence <= availableSequence) {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                // 處理完畢后進(jìn)行更新
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (running.get() != RUNNING) {
                    break;
                }
            } catch (final Throwable ex) {
                exceptionHandler.handleEventException(ex, nextSequence, event);  
                // 業(yè)務(wù)邏輯出現(xiàn)異常時(shí),若無新的異常拋出,則更新sequence
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
}

從上面的代碼可以看出,假設(shè)Disruptor調(diào)用halt之后,則該批次數(shù)據(jù)仍會處理完畢,在新的一輪waitFor判斷中拋出AlertException異常;

關(guān)于消費(fèi)者線程池

此處把線程池單獨(dú)拿出來看是有一定原因的,現(xiàn)在Disruptor把顯示傳入線程池的構(gòu)造方法置為了@Deprecated,那么我們在使用時(shí)應(yīng)該注意什么呢?
從前文我們已經(jīng)大致分析過生產(chǎn)者和消費(fèi)者是如何協(xié)同的,回顧下結(jié)論:

  • sequencer會顯示維護(hù)消費(fèi)最慢slot的下標(biāo);
  • 生產(chǎn)者在發(fā)布事件時(shí)需要先調(diào)用next()進(jìn)行顯示申請或者占用;
  • 消費(fèi)者線程在消費(fèi)時(shí)會主動(dòng)調(diào)用barrier#waitFor進(jìn)行判斷;
  • 每個(gè)消費(fèi)者占用一個(gè)線程;

這里有一個(gè)死鎖問題,假設(shè)說消費(fèi)者個(gè)數(shù)為N,線程個(gè)數(shù)為M,其中N > M,從前面的代碼我們已經(jīng)分析過,消費(fèi)者線程的邏輯都是死循環(huán),那么很有可能出現(xiàn)饑餓消費(fèi)者,即無法被線程池調(diào)用,那么從生產(chǎn)者端來看,最慢slot一直未改變,從而導(dǎo)致生產(chǎn)者等待,而生產(chǎn)者等待又會促使消費(fèi)者waitFor方法無法通過,從而出現(xiàn)互相等待死鎖問題;那么Disruptor是如何進(jìn)行解決的呢,代碼如下:

public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) {
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
...
// BaseExecutor
private final ThreadFactory factory;
private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();

public BasicExecutor(ThreadFactory factory) {
  this.factory = factory;
}

@Override
public void execute(Runnable command) {
  final Thread thread = factory.newThread(command);
  if (null == thread) {
    throw new RuntimeException("Failed to create thread to run: " + command);
  }

  thread.start();

  threads.add(thread);
}

代碼非常簡單,即創(chuàng)建足夠消費(fèi)者使用的線程數(shù)量進(jìn)行消費(fèi);
本文先寫到這里,下一篇文章對消費(fèi)者的等待策略進(jìn)行具體分析。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容