disruptor初步使用

筆者所有文章第一時(shí)間發(fā)布于:
hhbbz的個(gè)人博客

背景介紹

最近在作一個(gè)用于調(diào)用第三方服務(wù)以及充當(dāng)數(shù)據(jù)倉(cāng)儲(chǔ)的服務(wù),對(duì)于數(shù)據(jù)的存儲(chǔ)和獲取都要有很好的響應(yīng)。
整體架構(gòu)使用的是spring flux作接口層的異步調(diào)用,可以高效率的進(jìn)行數(shù)據(jù)獲?。皇褂肈isruptor作為內(nèi)存隊(duì)列,將存儲(chǔ)數(shù)據(jù)批量寫(xiě)入。

為什么選擇Disruptor

Disruptor是用于一個(gè)JVM中多個(gè)線(xiàn)程之間的消息隊(duì)列,作用與ArrayBlockingQueue有相似之處,但是Disruptor通過(guò)精巧的無(wú)鎖設(shè)計(jì)實(shí)現(xiàn)了在高并發(fā)情形下的高性能。從功能、性能都遠(yuǎn)好于ArrayBlockingQueue,當(dāng)多個(gè)線(xiàn)程之間傳遞大量數(shù)據(jù)或?qū)π阅芤筝^高時(shí),可以考慮使用Disruptor作為ArrayBlockingQueue的替代者。

為什么Disruptor會(huì)那么快

Disruptor通過(guò)以下設(shè)計(jì)來(lái)解決隊(duì)列速度慢的問(wèn)題: - 環(huán)形數(shù)組結(jié)構(gòu)

為了避免垃圾回收,采用數(shù)組而非鏈表。同時(shí),數(shù)組對(duì)處理器的緩存機(jī)制更加友好。 - 元素位置定位

數(shù)組長(zhǎng)度2^n,通過(guò)位運(yùn)算,加快定位的速度。下標(biāo)采取遞增的形式。不用擔(dān)心index溢出的問(wèn)題。index是long類(lèi)型,即使100萬(wàn)QPS的處理速度,也需要30萬(wàn)年才能用完。 - 無(wú)鎖設(shè)計(jì)

每個(gè)生產(chǎn)者或者消費(fèi)者線(xiàn)程,會(huì)先申請(qǐng)可以操作的元素在數(shù)組中的位置,申請(qǐng)到之后,直接在該位置寫(xiě)入或者讀取數(shù)據(jù)。

Disruptor的使用

代碼實(shí)現(xiàn)的功能:每10ms向disruptor中插入一個(gè)元素,消費(fèi)者讀取數(shù)據(jù),并打印到終端。詳細(xì)邏輯請(qǐng)細(xì)讀代碼。

/**
 * @description disruptor代碼樣例。每10ms向disruptor中插入一個(gè)元素,消費(fèi)者讀取數(shù)據(jù),并打印到終端
 */
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;


public class DisruptorMain
{
    public static void main(String[] args) throws Exception
    {
        // 隊(duì)列中的元素
        class Element {

            private int value;

            public int get(){
                return value;
            }

            public void set(int value){
                this.value= value;
            }

        }

        // 生產(chǎn)者的線(xiàn)程工廠
        ThreadFactory threadFactory = new ThreadFactory(){
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        // RingBuffer生產(chǎn)工廠,初始化RingBuffer的時(shí)候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // 處理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>(){
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch)
            {
                System.out.println("Element: " + element.get());
            }
        };

        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 16;

        // 創(chuàng)建disruptor,采用單生產(chǎn)者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 設(shè)置EventHandler
        disruptor.handleEventsWith(handler);

        // 啟動(dòng)disruptor的線(xiàn)程
        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; true; l++)
        {
            // 獲取下一個(gè)可用位置的下標(biāo)
            long sequence = ringBuffer.next();  
            try
            {
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence); 
                // 設(shè)置該位置元素的值
                event.set(l); 
            }
            finally
            {
                ringBuffer.publish(sequence);
            }
            Thread.sleep(10);
        }
    }
}

總結(jié)

Disruptor通過(guò)精巧的無(wú)鎖設(shè)計(jì)實(shí)現(xiàn)了在高并發(fā)情形下的高性能。

在美團(tuán)內(nèi)部,很多高并發(fā)場(chǎng)景借鑒了Disruptor的設(shè)計(jì),減少競(jìng)爭(zhēng)的強(qiáng)度。其設(shè)計(jì)思想可以擴(kuò)展到分布式場(chǎng)景,通過(guò)無(wú)鎖設(shè)計(jì),來(lái)提升服務(wù)性能。

使用Disruptor比使用ArrayBlockingQueue略微復(fù)雜,為方便讀者上手,增加代碼樣例。

參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容