(一)初識Disruptor

通過這篇文章來記錄和分享對Disruptor的初步了解認(rèn)識Disruptor框架。

Disruptor是什么?

Disruptor是一個高性能的異步處理框架,或者可以認(rèn)為是線程間通信的高效低延時的內(nèi)存消息組件,它最大特點是高性能,其LMAX架構(gòu)可以獲得每秒6百萬訂單,用1微秒的延遲獲得吞吐量為100K+。
它是如何實現(xiàn)高性能的呢?它由于JDK內(nèi)置的隊列有什么區(qū)別呢?

JDK內(nèi)置內(nèi)存隊列?

我們知道,Java內(nèi)置了幾種內(nèi)存消息隊列,如下所示:

隊列 加鎖方式 是否有界 數(shù)據(jù)結(jié)構(gòu)
ArrayBlockingQueue 加鎖 有界 ArrayList
LinkedBlockingQueue 加鎖 無界 LinkedList
ConcurrentLinkedQueue CAS 無界 LinkedList
LinkedTransferQueue CAS 無界 LinkedList

我們知道CAS算法比通過加鎖實現(xiàn)同步性能高很多,而上表可以看出基于CAS實現(xiàn)的隊列都是無界的,而有界隊列是通過同步實現(xiàn)的。在系統(tǒng)穩(wěn)定性要求比較高的場景下,為了防止生產(chǎn)者速度過快,如果采用無界隊列會最終導(dǎo)致內(nèi)存溢出,只能選擇有界隊列。
而有界隊列只有ArrayBlockingQueue,該隊列是通過加鎖實現(xiàn)的,在請求鎖和釋放鎖時對性能開銷很大,這時候基于有界隊列的高性能的Disruptor就應(yīng)運而生。

Disruptor如何實現(xiàn)高性能?

Disruptor實現(xiàn)高性能主要體現(xiàn)了去掉了鎖,采用CAS算法,同時內(nèi)部通過環(huán)形隊列實現(xiàn)有界隊列。

  • 環(huán)形數(shù)據(jù)結(jié)構(gòu)
    為了避免垃圾回收,采用數(shù)組而非鏈表。同時,數(shù)組對處理器的緩存機制更加友好。
  • 元素位置定位
    數(shù)組長度2^n,通過位運算,加快定位的速度。下標(biāo)采取遞增的形式。不用擔(dān)心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
  • 無鎖設(shè)計
    每個生產(chǎn)者或者消費者線程,會先申請可以操作的元素在數(shù)組中的位置,申請到之后,直接在該位置寫入或者讀取數(shù)據(jù)。整個過程通過原子變量CAS,保證操作的線程安全。

Disruptor可以用來做什么?

當(dāng)前業(yè)界開源組件使用Disruptor的包括Log4j2、Apache Storm等,它可以用來作為高性能的有界內(nèi)存隊列,基于生產(chǎn)者消費者模式,實現(xiàn)一個/多個生產(chǎn)者對應(yīng)多個消費者。它也可以認(rèn)為是觀察者模式的一種實現(xiàn),或者發(fā)布訂閱模式。


生產(chǎn)者消費者.png

同時,Disruptor還允許開發(fā)者使用多線程技術(shù)去創(chuàng)建基于任務(wù)的工作流。Disruptor能用來并行創(chuàng)建任務(wù),同時保證多個處理過程的有序性,并且它是沒有鎖的。

工作流.png

為什么要使用Disruptor?

使用Disruptor,主要用于對性能要求高、延遲低的場景,它通過“榨干”機器的性能來換取處理的高性能。如果你的項目有對性能要求高,對延遲要求低的需求,并且需要一個無鎖的有界隊列,來實現(xiàn)生產(chǎn)者/消費者模式,那么Disruptor是你的不二選擇。

怎么用Disruptor?

要學(xué)會基于Disruptor進(jìn)行編程,我們先了解下大概流程示意圖,其中綠色部分是表示我們需要編寫和實現(xiàn)的類。


Disruptor執(zhí)行簡圖(2).png

下面我們實現(xiàn)一個簡單的用例,生產(chǎn)者負(fù)責(zé)將輸入的字符串輸出到隊列,消費者負(fù)責(zé)打印出來。

public class DisruptorTest {
    /**
     * 消息事件類
     */
    public static class MessageEvent{
        /**
         * 原始消息
         */
        private String message;

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }
    }

    /**
     * 消息事件工廠類
     */
    public static class MessageEventFactory implements EventFactory<MessageEvent>{
        @Override
        public MessageEvent newInstance() {
            return new MessageEvent();
        }
    }

    /**
     * 消息轉(zhuǎn)換類,負(fù)責(zé)將消息轉(zhuǎn)換為事件
     */
    public static class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent,String> {
        @Override
        public void translateTo(MessageEvent messageEvent, long l, String s) {
            messageEvent.setMessage(s);
        }
    }

    /**
     * 消費者線程工廠類
     */
    public static class MessageThreadFactory implements ThreadFactory{
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"Simple Disruptor Test Thread");
        }
    }

    /**
     * 消息事件處理類,這里只打印消息
     */
    public static class MessageEventHandler implements EventHandler<MessageEvent>{
        @Override
        public void onEvent(MessageEvent messageEvent, long l, boolean b) throws Exception {
            System.out.println(messageEvent.getMessage());
        }
    }

    /**
     * 異常處理類
     */
    public static class MessageExceptionHandler implements ExceptionHandler<MessageEvent>{
        @Override
        public void handleEventException(Throwable ex, long sequence, MessageEvent event) {
            ex.printStackTrace();
        }

        @Override
        public void handleOnStartException(Throwable ex) {
            ex.printStackTrace();

        }

        @Override
        public void handleOnShutdownException(Throwable ex) {
            ex.printStackTrace();

        }
    }

    /**
     * 消息生產(chǎn)者類
     */
    public static class MessageEventProducer{
        private RingBuffer<MessageEvent> ringBuffer;

        public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }

        /**
         * 將接收到的消息輸出到ringBuffer
         * @param message
         */
        public void onData(String message){
            EventTranslatorOneArg<MessageEvent,String> translator = new MessageEventTranslator();
            ringBuffer.publishEvent(translator,message);
        }
    }

    public static void main(String[] args) {
        String message = "Hello Disruptor!";
        int ringBufferSize = 1024;//必須是2的N次方
        Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(),ringBufferSize,new MessageThreadFactory(),ProducerType.SINGLE,new BlockingWaitStrategy());
        disruptor.handleEventsWith(new MessageEventHandler());
        disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
        RingBuffer<MessageEvent> ringBuffer = disruptor.start();
        MessageEventProducer producer = new MessageEventProducer(ringBuffer);
        producer.onData(message);
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容