Disruptor核心源碼分析

Disruptor核心源碼分析

說來慚愧,Log4j2的異步日志已經(jīng)用了將近2年時間了。但是每次想看Disruptor源碼的時候,總是沒能堅持下去。這次通過一次生產(chǎn)環(huán)境的故障,堅定了看源碼的決心。

從何說起

在閱讀這篇文章之前,需要你具備一些對Disruptor的基本了解。如果你對它
還一無所知,希望你先通過下面的文章來入個門。

http://ifeve.com/dissecting-disruptor-whats-so-special/
http://ifeve.com/dissecting_the_disruptor_how_doi_read_from_the_ring_buffer/
http://ifeve.com/disruptor-writing-ringbuffer/

上面幾篇文章對Disruptor總體流程的講解還是比較清楚的,如果你看完仍然不是特別理解,沒關(guān)系,也可以繼續(xù)往下看,畢竟talk is cheap,下面會借助code把Disruptor的工作原理闡述清楚。本篇文章不會涉及”為什么Disruptor這么快“這個主題,而把重點放在理解它的工作流程以及熟悉源代碼上。

最簡單的Demo

我們先通過一個最簡單的Demo來感受一下Disruptor的工作流程

// 事件數(shù)據(jù)結(jié)構(gòu) StringEvent
@Data
@NoArgsConstructor
public class StringEvent {
    private String value;
}
    public static void main(String[] args) throws InterruptedException {
        Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024,
                DaemonThreadFactory.INSTANCE);

        disruptor.handleEventsWith(
                (EventHandler<StringEvent>) (event, sequence, endOfBatch) -> System.out
                        .println(event));

        disruptor.start();

        disruptor.publishEvent((event, sequence) -> event.setValue("changed"));
        
        // sleep一下 讓消費者可以執(zhí)行到 因為消費線程是守護線程
        Thread.sleep(1000);
    }

寥寥幾行代碼,就展示了一個完整的過程。我們來看看每一步具體做了什么:

第一步——創(chuàng)建Disruptor

創(chuàng)建Disruptor,Demo中采用的是參數(shù)較少的構(gòu)造方法,實際上完整的參數(shù)列表還包括producerTypewaitStrategy

    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
    {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }
    
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
    {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
  • eventFactory表示事件的構(gòu)造器
  • ringBufferSize表示RingBuffer的長度(容量)
  • threadFactory表示消費線程的創(chuàng)建工廠
  • producerType表示是單生產(chǎn)者模式還是多生產(chǎn)者模式(默認是MULTI
  • waitStrategy表示當RingBuffer中沒有可消費的Event時消費者的等待策略(默認是BlockingWaitStrategy

可以看到,通過上面5個參數(shù)構(gòu)造出了一個RingBuffer和一個Executor,而這兩個組件構(gòu)成了一個Disruptor。這里的RingBuffer除了存儲事件的職能(DataProvider)還承擔著申請sequence和publish event的職能。Executor作為消費者線程池,主要是運行消費邏輯的。因此可以說,Disruptor串聯(lián)起了生產(chǎn)者、消費者以及RingBuffer

創(chuàng)建RingBuffer

RingBuffer是個重點,因為它不止是存儲,還干了很多活。所謂能者多勞,也更值得我們研究。我們先看下創(chuàng)建它的靜態(tài)方法:

    public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType)
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

根據(jù)生產(chǎn)者類型的不同,存在兩種類型的RingBuffer:單生產(chǎn)者類型和多生產(chǎn)者類型。為了更容易理解,我們這里先看Single類型的,也就是單生產(chǎn)者類型的RingBuffer

    // class RingBuffer
    public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }

可以看到,這里通過一個EventFactory和一個SingleProducerSequencer構(gòu)造了一個RingBuffer。前者是用來創(chuàng)建事件對象的,而后者可以理解成RingBuffer的"幫手",RingBuffer委托Sequencer來處理一些非存儲類的工作(比如申請sequence,維護sequence進度,發(fā)布事件等)。

我們接著跟進去看看RingBuffer的構(gòu)造函數(shù):

    // class RingBuffer
    RingBuffer(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        super(eventFactory, sequencer);
    }
    
    // class RingBufferFields
    RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        // “幫手”,主要用來處理sequence申請、維護以及發(fā)布等工作
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        // indexMask主要是為了使用位運算取模的,很多源碼里都能看到這類優(yōu)化
        this.indexMask = bufferSize - 1;
        // 可以看到這個數(shù)組除了正常的size之外還有填充的元素,這個是為了解決false sharing的,本篇文章暫不展開
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        // 預先填充數(shù)組元素,這對垃圾回收很優(yōu)化,后續(xù)發(fā)布事件等操作都不需要創(chuàng)建對象,而只需要即可
        fill(eventFactory);
    }
    
    // class RingBufferFields
    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

RingBuffer的構(gòu)造函數(shù)還是比較清晰的,跟著上面的注釋應該就可以理解。除了Buffer_PAD,這個我們留到后續(xù)文章中再去詳細的講解。

創(chuàng)建BasicExecutor

BasicExecutor實現(xiàn)了java.util.concurrent.Executor接口,通過單參數(shù)ThreadFactory函數(shù)構(gòu)造:

public class BasicExecutor implements Executor{

    public BasicExecutor(ThreadFactory factory)
    {
        this.factory = factory;
    }
}

具體在哪里使用,我們后面會看到

第二步——注冊事件處理邏輯

disruptor.handleEventsWith(
                (EventHandler<StringEvent>) (event, sequence, endOfBatch) -> System.out
                        .println(event));

代碼非常容易理解,注冊了一個事件處理的回調(diào),并且可以注冊多個,其中回調(diào)里有三個參數(shù):

  • event表示消費到的本次事件的主體,在例子里也就是StringEvent
  • sequence表示消費到的本次事件對應的sequence
  • endOfBatch表示消費到的本次事件是否是這個批次中的最后一個

由于默認的消費處理器(BatchEventProcessor)是批量來處理事件的,所以會有批次的概念。怎么樣算一個批次呢,這個后面講BatchEveentProessor的時候會講到。DEMO里的消費邏輯很簡單,打印一下event就完事。下面看看handleEventsWith的源代碼:

    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        // 注意,第一個參數(shù)恒為一個空數(shù)組
        return createEventProcessors(new Sequence[0], handlers);
    }

createEventProcessors方法的第一個入?yún)⒔凶?code>barrierSequences,是給存在依賴關(guān)系的消費者用的。由于走Disruptor實例調(diào)用handleEventsWith都是像上面一樣傳的是空數(shù)組,為了便于理解,可以先將它當成恒為空數(shù)組。

    EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        // 用來保存每個消費者的消費進度
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        // SequenceBarrier主要是用來設(shè)置消費依賴的[詳解1]
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            // 可以看到每個eventHandler會被封裝成BatchEventProcessor,看名字就知道是批量處理的了吧
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

            // 設(shè)置異常處理器
            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            // 注冊到consumerRepository[詳解2]
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            // 每一個BatchEventProcessor的消費進度
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        // 更新一些重要的東西[詳解3]
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        // 返回一個EventHandlerGroup,這個主要是為了DSL服務的,可以先不關(guān)心,可以看到DEMO中我們也沒有用到這個返回值
        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
    }

上面采用了注釋來解釋代碼含義,我覺得這種形式可能更有助于在看代碼的過程中理解。不過注釋也有局限性,比如上面寫了詳解的幾處,這里我會詳細再跟進下源代碼:

詳解1——SequenceBarrier

SequenceBarrier主要是設(shè)置消費依賴的。比如某個消費者必須等它依賴的消費者消費完某個消息之后才可以消費該消息。當然此處是從Disruptor上直接創(chuàng)建消費組,sequencesToTrack都為空數(shù)組,所以只依賴于RingBuffer上的cursorSequence(也就是只要RingBuffer上寫(publish)到哪了,那么我就能消費到哪)

下面的代碼展示了通過RingBuffer創(chuàng)建SequenceBarrier的鏈路,發(fā)現(xiàn)最終創(chuàng)建的是ProcessingSequenceBarrier。并且在這條鏈路上,我們前面假定的sequencesToTrack(也就是dependentSequences)為空數(shù)組。那么根據(jù)上面的構(gòu)造函數(shù)得出dependentSequence = cursorSequence = cursor

    // class RingBuffer
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return sequencer.newBarrier(sequencesToTrack);
    }
    
    // class AbstractSequencer
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }
    
    // class ProcessingSequenceBarrier
    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }

這個cursor是什么呢?首先它是Sequencer的成員變量。而Sequencer有兩種:SingleProducerSequencerMultiProducerSequencer。對于SingleProducerSequencer來說,cursor表示的是RingBuffer上當前已發(fā)布的最大sequence,而對于MultiProducerSequencer來說,cursor表示的是RingBuffer上當前已申請的最大sequence。此處先有個概念即可,下面講完生產(chǎn)邏輯之后會詳細描述

詳解2——ConsumerRepository

    // class ConsumerRepository
    public void add(
        final EventProcessor eventprocessor,
        final EventHandler<? super T> handler,
        final SequenceBarrier barrier)
    {
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }

無論從類名還是方法體,都可以看出,這個對象主要是用來存儲消費者信息的,有兩個維度的Map。具體是哪里用,我們用到的時候再說好了~

詳解3——一些更新

    private void updateGatingSequencesForNextInChain(Sequence[] barrierSequences, Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences)
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

首先,我們要搞清楚這兩個入?yún)⒌囊饬x:

  • barrierSequences:依賴的消費進度
  • processorSequences:新進消費者的進度

其次,還要弄明白一個問題:在向RingBuffer寫入數(shù)據(jù)的時候,如何判定RingBuffer已滿(這個應該在前面入門那幾篇文章里要掌握的)?通過看最慢的消費者的消費進度是不是已經(jīng)被生產(chǎn)者拉了一圈了(類似1000米跑步的套圈)。

理解了上面兩個問題之后,再來看代碼總共做的三個事情:

  1. 把新進消費者的消費進度加入到【所有消費者的消費進度數(shù)組】中
  2. 如果說這個新進消費者是依賴了其他的消費者的,那么把其他的消費者從【所有消費者的消費進度數(shù)組】中移除。這里為什么要移除呢?因為【所有消費者的消費進度數(shù)組】主要是用來獲取最慢的進度的。那么被依賴的可以不用考慮,因為它不可能比依賴它的慢。并且讓這個數(shù)組足夠小,可以提升計算最慢進度的性能。
  3. 把被依賴的消費者的endOfChain屬性設(shè)置成false。這個endOfChain是用來干嘛的呢?其實主要是Disruptor在shutdown的時候需要判定是否所有消費者都已經(jīng)消費完了(如果依賴了別人的消費者都消費完了,那么整條鏈路上一定都消費完了)。

第三步——啟動Disruptor

    public RingBuffer<T> start()
    {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository)
        {
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }

這個consumerRepository是不是很熟悉?這是第二步詳解二里ConsumerInfo注冊的地方??梢钥吹絾?code>Disruptor其實就是在啟動消費線程:

    // class EventProcessorInfo
    public void start(final Executor executor)
    {
        // 這里對應的是BatchEventProcessor
        executor.execute(eventprocessor);
    }
    
    // class BasicExecutor
    public void execute(Runnable command)
    {
        final Thread thread = factory.newThread(command);
        if (null == thread)
        {
            throw new RuntimeException("Failed to create thread to run: " + command);
        }

        thread.start();

        threads.add(thread);
    }

那么消費線程的具體邏輯是?看看BatchEventProcessorrun()方法:

    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        // 成員變量sequence維護該Processor的消費進度
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    // 以nextSequence作為底線,去獲取最大的可用sequence(也就是已經(jīng)被publish的sequence)
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);

                    // 如果獲取到的sequence大于等于nextSequence,說明有可以消費的event,從nextSequence(包含)到availableSequence(包含)這一段的事件就作為同一個批次
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        // 調(diào)用了前面注冊的回調(diào)函數(shù)
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    // 消費完一批之后 一次性更新消費進度
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    // waitFor超時的場景
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                
                    // 消費過程中如果拋出異常,表面上看會更新消費進度,也就是說沒有補償機制。但實際上默認的策略是會拋異常的,消費線程會直接結(jié)束掉
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }

sequenceBarrier.waitFor里的邏輯值得好好看一看,不過我覺得等看完發(fā)布事件的流程之后會更容易理解。

第四步——發(fā)布事件

這里使用的是EventTranslator的方式來發(fā)布事件的:

    // class Disruptor
    public void publishEvent(final EventTranslator<T> eventTranslator)
    {
        ringBuffer.publishEvent(eventTranslator);
    }
    
    // class RingBuffer
    public void publishEvent(EventTranslator<E> translator)
    {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence);
    }
    
    private void translateAndPublish(EventTranslator<E> translator, long sequence)
    {
        try
        {
            translator.translateTo(get(sequence), sequence);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

從上面的代碼結(jié)構(gòu)可以看出來,發(fā)布事件總共分為三個步驟:

  1. 申請sequence
  2. 填充事件內(nèi)容
  3. 提交發(fā)布

有點類似數(shù)據(jù)庫事務的味道。為了便于理解,我們還是以SingleProducerSequencer來分析下上面三個步驟:

申請sequence

    // class SingleProducerSequencer
    public long next()
    {
        return next(1);
    }
    
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        // nextValue這個變量名有點詭異,實際上表示已經(jīng)申請到的那個sequence
        long nextValue = this.nextValue;

        // nextSequence表示本次需要申請的最大sequence
        long nextSequence = nextValue + n;
        // 計算出nextSequence在上一圈的點位
        long wrapPoint = nextSequence - bufferSize;
        // 最慢消費進度的緩存
        long cachedGatingSequence = this.cachedValue;
        // 下面這個條件表達式以及其代碼塊解釋起來可能需要比較大的篇幅,所以在下面[核心代碼詳解]里說明
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                // 通知下消費者
                waitStrategy.signalAllWhenBlocking();
                // 生產(chǎn)者如果沒有空間寫數(shù)據(jù)了,只能無限park
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }
            
            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }
核心代碼詳解

首先,我們看下if表達式里的兩個條件:

  1. wrapPoint > cachedGatingSequence,結(jié)合我們對Disruptor的了解,此處判斷的應該是此次要申請的sequence是否已經(jīng)領(lǐng)先最慢消費進度一圈了(類似1000米跑步的套圈)

  2. cachedGatingSequence > nextValue判斷的是最慢消費進度超過了我們即將要申請的sequence,乍一看這應該是不可能的吧,都還沒申請到該sequence怎么可能消費到呢?找了些資料,發(fā)現(xiàn)確實是存在該場景的:RingBuffer提供了一個叫resetTo的方法,可以重置當前已申請sequence為一個指定值并publish出去:

    @Deprecated
    public void resetTo(long sequence)
    {
        sequencer.claim(sequence);
        sequencer.publish(sequence);
    }
    

    具體資料可參考:

    不過該代碼已經(jīng)標注為@Deprecated,按照作者的意思,后續(xù)是要刪掉的。那么在此處分析的時候,我們就將當它恒為false。

對于第一個條件表達式,也有個值得注意的地方:因為里面的【最慢消費進度】取的是緩存值(cached)。而這個緩存值是什么時候更新的呢?答案是只有在“套圈”了以后才會更新。這個邏輯你品,你細品,那么你會發(fā)現(xiàn),每申請RingBuffer.size()個sequence之后都會走進上面的“套圈”邏輯來更新cachedGatingSequence。這樣就極大的減少了Util.getMinimumSequence(gatingSequences, nextValue)的運算量

再來看看“套圈”時需要執(zhí)行的邏輯:

  1. 插入一個StoreLoad屏障,防止是因為內(nèi)存可見性導致的消費者消費不了數(shù)據(jù)(應該極少存在這樣的情況吧)
  2. 實時計算一下最慢消費進度Util.getMinimumSequence(gatingSequences, nextValue)
  3. 如果真的套圈了,那么就一直死循環(huán)直到RingBuffer上有空間可以申請
  4. 更新【最慢消費進度緩存】

注意,當消費者消費過慢時,可能會導致生產(chǎn)者無限park,這個在編程的時候要特別留意。

提交發(fā)布

填充事件內(nèi)容沒什么好說的,無非就是設(shè)值的過程。設(shè)值完成之后,就可以發(fā)布了:

    public void publish(long sequence)
    {
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

這里會把cursor的值設(shè)置為當前申請的sequence,代表序號為sequence的事件發(fā)布成功。這里的cursor表示已經(jīng)publish的最大事件序號(在多生產(chǎn)者模式中并不是),所以我們在使用過程中需要依次申請,依次發(fā)布,不能直接上來就publish(100),這樣會導致消費者會認為100以前的序號也都就緒了。另外,由于我們現(xiàn)在看的是單生產(chǎn)者模式,也不需要考慮并發(fā)場景。

sequenceBarrier.waitFor

看完了生產(chǎn)者的流程,我們來回顧下在消費者里這一句關(guān)鍵的代碼。前面有提到過ProcessingSequenceBarrier,帶著這一絲絲的印象我們來看看下面這段獲取availableSequence的邏輯:

    // class ProcessingSequenceBarrier
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        // waitStrategy派上用場了,這是我們在構(gòu)造Disruptor的時候的入?yún)ⅲㄒ彩菢?gòu)造RingBuffer的入?yún)ⅲ?        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        // 理論上沒有可能為true,因為當前每種waitStrategy內(nèi)都保證了availableSequence一定大于等于sequence
        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        // 返回最大的已發(fā)布的sequence,在單生產(chǎn)者模式下這個函數(shù)返回值就等于availableSequence
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

跟著上面的注釋,相信應該沒有什么理解上的難點,上面的代碼核心就兩步:

  1. 通過WaitStrategy.waitFor()獲取availableSequence,下面會分析具體的邏輯
  2. 通過sequencer來得到最大的已發(fā)布的sequence(HighestPublishedSequence)

WaitStrategy.waitFor

先看看第一步中的WaitStrategy.waitFor()方法,這里以BlockingWaitStrategy為例:

    // class BlockingWaitStrategy
    // 這里的四個入?yún)⑽覀冝垡晦?    // sequence:消費者想要消費的最小sequence(底線)
    // cursorSequence:Sequencer的cursor,也就是當前RingBuffer上已經(jīng)被申請的最大sequence(在講生產(chǎn)者邏輯的時候提到了)
    // dependentSequence:在我們當前鏈路為cursorSequence,不存在消費依賴(如果存在依賴的話,則為依賴消費者消費進度)
    // barrier:這個主要是用了其中一些中斷方法,不用太care
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)
        {
            lock.lock();
            try
            {
                while (cursorSequence.get() < sequence)
                {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally
            {
                lock.unlock();
            }
        }

        // 看到了吧,這里已經(jīng)保證了availableSequence必然大于等于sequence
        // 并且在存在依賴的場景中,被依賴消費者存在慢消費的話,會直接導致下游進入死循環(huán)
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
        }

        return availableSequence;
    }

從上面的代碼能看到,WaitStrategy.waitFor()獲取的是依賴消費者的消費進度sequence(默認依賴RingBuffer上已申請進度的sequence)。需要注意的一點是,當消費者獲取可消費事件的過程中,存在兩種場景需要等待:

  1. RingBuffer上沒有事件可以消費
  2. RingBuffer上有可消費事件,但是依賴的消費者還未消費完該事件

如果是第一種場景,那么消費者會采用WaitStrategy的策略進行等待。而如果是第二種場景的話,只能如上所示一樣進入死循環(huán)(此時可能造成cpu升高)。

sequencer.getHighestPublishedSequence

WaitStrategy.waitFor()返回后,得到的是RingBuffer上已申請進度sequence或者是依賴消費者消費進度sequence(當然如果把cursorSequence也看成一種依賴的話,理解起來就統(tǒng)一了)。注意一個形容詞——“已申請”,而不是“已發(fā)布”,“已申請”意味著還不一定“已發(fā)布”,也就是還不能消費。所以,SequenceBarrier.waitFor最后還有一步sequencer.getHighestPublishedSequence(sequence, availableSequence)。

當然如果你很仔細的看到這里并且對于前面的內(nèi)容都理解了,你可能會產(chǎn)生疑問:對于單生產(chǎn)者來說,本來就是在publish的時候才更新cursor的啊?那上一步從WaitStrategy.waitFor()獲取到的不就是“已發(fā)布”的進度sequence嗎?是的,你說得很正確。對于單生產(chǎn)者確實如此,所以但生產(chǎn)者對應的實現(xiàn)為:

    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        return availableSequence;
    }

而對于多生產(chǎn)者的話,邏輯就會相對復雜一點,這個我們下一篇文章再分析

總結(jié)

到這里,Disruptor核心的邏輯我們基本上看完了。我們介紹了Disruptor中的單生產(chǎn)者模式的生產(chǎn)邏輯以及默認的單線程批量消費邏輯。當然這只是最基本的模式,為了讓我們對Disruptor的邏輯和源代碼有一個整體的了解。后面的文章我們會涉及更多的場景,比如

  • 多生產(chǎn)者模式是如何工作的
  • 如何實現(xiàn)消費依賴
  • Log4j2是如何使用Disruptor的
  • Disruptor性能高的原因以及使用過程中的一些心得等

如文中有描述錯誤,還望指出,以便改正,多謝~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容