Disruptor入門

需求

生產(chǎn)者傳遞一個long類型的值給消費者,而消費者消費這個數(shù)據(jù)的方式僅僅是把它打印出來。

Event

聲明一個Event來包含需要傳遞的數(shù)據(jù)

public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 

還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數(shù)據(jù)打印到終端:

public class LongEventHandler implements EventHandler<LongEvent> { 
    @Override 
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { 
        System.out.println(longEvent.getValue()); 
    } 
} 

事件生產(chǎn)者

public class LongEventProducer { 
    private final RingBuffer<LongEvent> ringBuffer;
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { 
        this.ringBuffer = ringBuffer; 
    } 
 
    /** 
     * onData用來發(fā)布事件,每調(diào)用一次就發(fā)布一次事件 
     * 它的參數(shù)會通過事件傳遞給消費者 
     * 
     * @param bb 
     */public void onData(ByteBuffer bb) { 
            //可以把ringBuffer看做一個事件隊列,那么next就是得到下面一個事件槽
            long sequence = ringBuffer.next();try { 
            //用上面的索引取出一個空的事件用于填充 
            LongEvent event = ringBuffer.get(sequence);// for the sequence 
            event.setValue(bb.getLong(0)); 
        } finally { 
            //發(fā)布事件 
            ringBuffer.publish(sequence); 
        } 
    } 
} 

發(fā)布事件最少需要兩步:獲取下一個事件槽并發(fā)布事件(發(fā)布事件的時候要使用try/finnally保證事件一定會被發(fā)布)。如果我們使用RingBuffer.next()獲取一個事件槽,那么一定要發(fā)布對應的事件。如果不能發(fā)布事件,那么就會引起Disruptor狀態(tài)的混亂。尤其是在多個事件生產(chǎn)者的情況下會導致事件消費者失速,從而不得不重啟應用才能會恢復。

事件處理系統(tǒng)

public class LongEventMain { 
    public static void main(String[] args) throws InterruptedException { 
        // Executor that will be used to construct new threads for consumers 
        Executor executor = Executors.newCachedThreadPool();
        // The factory for the event 
        LongEventFactory factory = new LongEventFactory();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        // Construct the Disruptor 
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);
        // Connect the handler 
        disruptor.handleEventsWith(new LongEventHandler());
        // Start the Disruptor, starts all threads running 
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing. 
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
 
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
 
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) { 
            bb.putLong(0, l); 
            producer.onData(bb); 
            Thread.sleep(1000); 
        } 
    } 
} 

Disruptor 3.0寫法

事件生產(chǎn)者

public class LongEventProducerWithTranslator { 
    //一個translator可以看做一個事件初始化器,publicEvent方法會調(diào)用它
    //填充Event
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = 
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() { 
                public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { 
                    event.setValue(bb.getLong(0)); 
                } 
            };
    private final RingBuffer<LongEvent> ringBuffer;
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { 
        this.ringBuffer = ringBuffer; 
    } 
 
    public void onData(ByteBuffer bb) { 
        ringBuffer.publishEvent(TRANSLATOR, bb); 
    } 
} 

Disruptor提供了不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, 等等)去產(chǎn)生一個Translator對象。很明顯,Translator中方法的參數(shù)是通過RingBuffer來傳遞的。

使用Java 8

Disruptor在自己的接口里面添加了對于Java 8 Lambda的支持。大部分Disruptor中的接口都符合Functional Interface的要求(也就是在接口中僅僅有一個方法)。所以在Disruptor中,可以廣泛使用Lambda來代替自定義類。

public class LongEventMainJava8 { 
    /** 
     * 用lambda表達式來注冊EventHandler和EventProductor 
     * @param args 
     * @throws InterruptedException 
     */public static void main(String[] args) throws InterruptedException { 
        // Executor that will be used to construct new threads for consumers 
        Executor executor = Executors.newCachedThreadPool();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;// Construct the Disruptor 
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
        // 可以使用lambda來注冊一個EventHandler 
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue()));
        // Start the Disruptor, starts all threads running 
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing. 
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
 
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
 
        ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++) { 
            bb.putLong(0, l); 
            ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb); 
            Thread.sleep(1000); 
        } 
    } 
} 

單或多 事件生產(chǎn)者

Disruptor默認情況下是多生產(chǎn)者
在并發(fā)系統(tǒng)中提高性能最好的方式之一就是單一寫者原則,對Disruptor也是適用的。如果在你的代碼中僅僅有一個事件生產(chǎn)者,那么可以設置為單一生產(chǎn)者模式來提高系統(tǒng)的性能。

public class singleProductorLongEventMain { 
    public static void main(String[] args) throws Exception { 
        //.....// Construct the Disruptor with a SingleProducerSequencer 
 
        Disruptor<LongEvent> disruptor = new Disruptor(factory, 
                bufferSize, 
                // Single producernew 
                ProducerType.SINGLE, BlockingWaitStrategy(), 
                executor);//..... 
    } 
} 

可選的等待策略

Disruptor默認的等待策略是BlockingWaitStrategy。這個策略的內(nèi)部適用一個鎖和條件變量來控制線程的執(zhí)行和等待(Java基本的同步方法)。BlockingWaitStrategy是最慢的等待策略,但也是CPU使用率最低和最穩(wěn)定的選項。然而,可以根據(jù)不同的部署環(huán)境調(diào)整選項以提高性能。

  • SleepingWaitStrategy

和BlockingWaitStrategy一樣,SpleepingWaitStrategy的CPU使用率也比較低。它的方式是循環(huán)等待并且在循環(huán)中間調(diào)用LockSupport.parkNanos(1)來睡眠,(在Linux系統(tǒng)上面睡眠時間60μs).然而,它的優(yōu)點在于生產(chǎn)線程只需要計數(shù),而不執(zhí)行任何指令。并且沒有條件變量的消耗。但是,事件對象從生產(chǎn)者到消費者傳遞的延遲變大了。SleepingWaitStrategy最好用在不需要低延遲,而且事件發(fā)布對于生產(chǎn)者的影響比較小的情況下。比如異步日志功能。

  • YieldingWaitStrategy

YieldingWaitStrategy是可以被用在低延遲系統(tǒng)中的兩個策略之一,這種策略在減低系統(tǒng)延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會循環(huán)等待sequence增加到合適的值。循環(huán)中調(diào)用Thread.yield()允許其他準備好的線程執(zhí)行。如果需要高性能而且事件消費者線程比邏輯內(nèi)核少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超線程的時候。

  • BusySpinWaitStrategy

BusySpinWaitStrategy是性能最高的等待策略,同時也是對部署環(huán)境要求最高的策略。這個性能最好用在事件處理線程比物理內(nèi)核數(shù)目還要小的時候。例如:在禁用超線程技術的時候。

參考

Disruptor入門

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容