Disruptor簡單使用

??Disruptor從功能上來說,可以實(shí)現(xiàn)隊(duì)列的功能,也可以把它當(dāng)成單機(jī)版的JMS來看待。從性能上來說,它比ArrayBlockingQueue有更好的性能表現(xiàn),對于生產(chǎn)者消費(fèi)者模型的業(yè)務(wù),Disruptor是一個(gè)更好的選擇可以很好的實(shí)現(xiàn)業(yè)務(wù)的分離。

簡單入門

  • 定義消息類,這里的消息在Disruptor里稱為Event,也就是我們系統(tǒng)里生產(chǎn)消費(fèi)的業(yè)務(wù)對象,示例代碼如下:
package com.example.disruptor;

/**
 * 產(chǎn)品
 */
public class Product {

    private int id;

    private String name;

    private double weight;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public double getWeight() {
        return weight;
    }

    public void setWeight(double weight) {
        this.weight = weight;
    }
}

  • 定義生產(chǎn)者,也就是事件的來源。
package com.example.disruptor.singleton;

import com.example.disruptor.Product;
import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {

    public static final int NUMBER = 10000000;
    private final CountDownLatch latch;
    private AtomicInteger idCount = new AtomicInteger(0);
    private RingBuffer<Product> ringBuffer;

    public Producer(RingBuffer<Product> ringBuffer, CountDownLatch latch) {
        this.ringBuffer = ringBuffer;
        this.latch = latch;
    }


    private void createData() {
        //1.可以把ringBuffer看做一個(gè)事件隊(duì)列,那么next就是得到下面一個(gè)事件槽
        long sequence = ringBuffer.next();
        try {
            //2.用上面的索引取出一個(gè)空的事件用于填充(獲取該序號對應(yīng)的事件對象)
            Product product = ringBuffer.get(sequence);
            //3.獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)
            product.setId(idCount.incrementAndGet());
        } finally {
            //4.發(fā)布事件
            //注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調(diào)用;
            // 如果某個(gè)請求的 sequence 未被提交,將會(huì)堵塞后續(xù)的發(fā)布操作或者其它的 producer。
            ringBuffer.publish(sequence);
        }
    }

    public void run() {
        for (int i = 0; i < NUMBER; i++) {
            createData();
        }
        //通過latch告訴主線程,完成了產(chǎn)品的生產(chǎn)
        latch.countDown();
    }
}

生產(chǎn)者在生成消息的過程中需要得到Disruptor里的ringBuffer,將生產(chǎn)的消息加入到ringBuffer里。Disruptor 的事件發(fā)布過程是一個(gè)兩階段提交的過程:
  第一步:先從 RingBuffer 獲取下一個(gè)可以寫入的事件的序號;
  第二步:獲取對應(yīng)的事件對象,將數(shù)據(jù)寫入事件對象;
  第三部:將事件提交到 RingBuffer;
事件只有在提交之后才會(huì)通知消息消費(fèi)者進(jìn)行處理;

  • 定義消息的消費(fèi)者,在Disruptor里是EventHandler類型的實(shí)例。
package com.example.disruptor.singleton;

import com.example.disruptor.Product;
import com.lmax.disruptor.EventHandler;

import java.util.concurrent.CountDownLatch;

public class Consumer implements EventHandler<Product> {

   private int count = 0;

   private CountDownLatch latch;
    public Consumer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void onEvent(Product event, long sequence, boolean endOfBatch) throws Exception {
       ;count++;
       //通過latch告訴主線程,完成了產(chǎn)品的消費(fèi)
       if(count == Producer.NUMBER){
           latch.countDown();
       }
    }

    public int getCount() {
        return count;
    }
}

  • 通過Disruptor類,將生產(chǎn)者與消費(fèi)者進(jìn)行整合。具體的代碼如下:
package com.example.disruptor.singleton;

import com.example.disruptor.Product;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    //定義ringBuffer的大小
    private static final int RING_BUFFER_SIZE = 1024 * 8;

    public static void main(String[] args) {
        //構(gòu)造消費(fèi)者一個(gè)線程池, 實(shí)際項(xiàng)目中最好不要用Executors來構(gòu)建
        ExecutorService consumerExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
        //構(gòu)造生產(chǎn)者線程池
        ExecutorService produceerExecutor = Executors.newFixedThreadPool(1);
        //創(chuàng)建disruptor
        Disruptor<Product> disruptor =
                new Disruptor<Product>(new EventFactory<Product>() {
                    public Product newInstance() {
                        return new Product();
                    }
                }, RING_BUFFER_SIZE, consumerExecutor, ProducerType.SINGLE, new YieldingWaitStrategy());

        CountDownLatch latch = new CountDownLatch(2);
        // 連接消費(fèi)事件方法
        Consumer consumer = new Consumer(latch);
        disruptor.handleEventsWith(consumer);
        // 啟動(dòng)
        disruptor.start();
        //生產(chǎn)者開始生產(chǎn)數(shù)據(jù)
        Producer producer = new Producer(disruptor.getRingBuffer(), latch);
        produceerExecutor.submit(producer);

        try {
            latch.await();
        } catch (InterruptedException e) {
        }

        System.out.println(consumer.getCount());
        //關(guān)閉打開的資源
        disruptor.shutdown();
        consumerExecutor.shutdown();
        produceerExecutor.shutdown();
    }
}

在構(gòu)造Disruptor對象,有幾個(gè)核心的概念:
1:事件工廠(Event Factory)定義了如何實(shí)例化事件(Event),Disruptor 通過 EventFactory 在 RingBuffer 中預(yù)創(chuàng)建 Event 的實(shí)例。
2:ringBuffer這個(gè)數(shù)組的大小,一般根據(jù)業(yè)務(wù)指定成2的指數(shù)倍。
3:消費(fèi)者線程池,事件的處理是在構(gòu)造的線程池里來進(jìn)行處理的。
4:指定等待策略,Disruptor 定義了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待Event事件。Disruptor 提供了多個(gè) WaitStrategy 的實(shí)現(xiàn),每種策略都具有不同性能和優(yōu)缺點(diǎn),根據(jù)實(shí)際運(yùn)行環(huán)境的 CPU 的硬件特點(diǎn)選擇恰當(dāng)?shù)牟呗?,并配合特定?JVM 的配置參數(shù),能夠?qū)崿F(xiàn)不同的性能提升。
  BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn);
  SleepingWaitStrategy 的性能表現(xiàn)跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產(chǎn)者線程的影響最小,適合用于異步日志類似的場景;
  YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統(tǒng)。在要求極高性能且事件處理線數(shù)小于 CPU 邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。

多消費(fèi)者模型

??在生產(chǎn)者消費(fèi)者模型中,為了防止生產(chǎn)者生產(chǎn)的數(shù)據(jù)覆蓋掉還未消費(fèi)的數(shù)據(jù),Disruptor中每個(gè)消費(fèi)者都各自有個(gè)Sequence,而消費(fèi)者的Sequence狀態(tài)需要通過SequenceBarrier同步到ringBuffer中。生產(chǎn)者產(chǎn)生數(shù)據(jù)的Sequence是通過ringBuffer進(jìn)行生成的。下面是具體的代碼:

  • 定義生產(chǎn)者
package com.example.disruptor.mult;

import com.example.disruptor.Product;
import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {

    public static final int NUMBER = 10000000;
    public static AtomicInteger idCount = new AtomicInteger(0);
    private RingBuffer<Product> ringBuffer;

    public Producer(RingBuffer<Product> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }


    private void createData() {
        //1.可以把ringBuffer看做一個(gè)事件隊(duì)列,那么next就是得到下面一個(gè)事件槽
        long sequence = ringBuffer.next();
        try {
            //2.用上面的索引取出一個(gè)空的事件用于填充(獲取該序號對應(yīng)的事件對象)
            Product product = ringBuffer.get(sequence);
            //3.獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)
            product.setId(idCount.incrementAndGet());
        } finally {
            //4.發(fā)布事件
            //注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調(diào)用;
            // 如果某個(gè)請求的 sequence 未被提交,將會(huì)堵塞后續(xù)的發(fā)布操作或者其它的 producer。
            ringBuffer.publish(sequence);
        }
    }

    public void run() {
        for (int i = 0; i < NUMBER; i++) {
            createData();
        }
    }
}

  • 定義消費(fèi)者
package com.example.disruptor.mult;

import com.example.disruptor.Product;
import com.lmax.disruptor.WorkHandler;

/**
 * 多消費(fèi)者需要繼承自WorkHandler
 */
public class Consumer implements WorkHandler<Product> {


    private int count = 0;

    public Consumer() {
    }


    public void onEvent(Product event) throws Exception {
        count++;
    }

    public int getCount() {
        return count;
    }

}

  • 啟動(dòng)類:
package com.example.disruptor.mult;

import com.example.disruptor.Product;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    //定義ringBuffer的大小
    private static final int RING_BUFFER_SIZE = 1024 * 8;

    public static void main(String[] args) {
        //線程數(shù)
        int processor = Runtime.getRuntime().availableProcessors() * 2;
        //構(gòu)造消費(fèi)者一個(gè)線程池, 實(shí)際項(xiàng)目中最好不要用Executors來構(gòu)建
        ExecutorService consumerExecutor = Executors.newFixedThreadPool(processor);
        //構(gòu)造生產(chǎn)者線程池
        ExecutorService produceerExecutor = Executors.newFixedThreadPool(processor);
        //定義一個(gè)ringBuffer,也就是相當(dāng)于一個(gè)隊(duì)列
        RingBuffer ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<Product>() {
            public Product newInstance() {
                return new Product();
            }
        }, RING_BUFFER_SIZE, new YieldingWaitStrategy());
        //定義一個(gè)消費(fèi)者池,
        Consumer[] consumers = new Consumer[processor];
        for (int i = 0; i < processor; i++) {
            consumers[i] = new Consumer();
        }
        WorkerPool workerPool = new WorkerPool<Product>(ringBuffer,
                ringBuffer.newBarrier(), new IgnoreExceptionHandler(), consumers);
        //每個(gè)消費(fèi)者,也就是 workProcessor都有一個(gè)sequence,表示上一個(gè)消費(fèi)的位置,這個(gè)在初始化時(shí)都是-1
        Sequence[] sequences = workerPool.getWorkerSequences();
        //將其保存在ringBuffer中的 sequencer 中,在為生產(chǎn)申請slot時(shí)要用到,也就是在為生產(chǎn)者申請slot時(shí)不能大于此數(shù)組中的最小值,否則產(chǎn)生覆蓋
        ringBuffer.addGatingSequences(sequences);
        //用executor 來啟動(dòng) workProcessor 線程
        workerPool.start(consumerExecutor);

        //生產(chǎn)者開始生產(chǎn)數(shù)據(jù)
        for (int i = 0; i < processor; i++) {
            Producer producer = new Producer(ringBuffer);
            produceerExecutor.submit(producer);
        }

        while (true) {
            int count = 0;
            for (Consumer consumer : consumers) {
                count += consumer.getCount();
            }
            System.out.println("生產(chǎn)了多少數(shù)據(jù)" + Producer.idCount.get());
            System.out.println("消費(fèi)了多少數(shù)據(jù)" + count);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

ringBuffer需要將WorkPool里所有消費(fèi)者的Sequence加到ringBuffer中,以防止出現(xiàn)數(shù)據(jù)覆蓋的問題。

將disruptor當(dāng)成JMS,處理消息流

可以將disruptor當(dāng)成單機(jī)版的JMS,用來處理數(shù)據(jù)流,disruptor提供了消費(fèi)者處理消息的先后順序,能很好的實(shí)現(xiàn)根據(jù)指定規(guī)則來實(shí)現(xiàn)消息的處理。比如可以將消息形成如下圖的數(shù)據(jù)流:
  • 定義上面四個(gè)handler的處理邏輯, 我這里只貼出一個(gè)類的實(shí)現(xiàn)
package com.example.disruptor.complex;


import com.example.disruptor.Product;
import com.lmax.disruptor.EventHandler;

public class StartHandler implements EventHandler<Product> {

    public void onEvent(Product product, long l, boolean b) throws Exception {
        System.out.println("start set name");
        product.setName("start");
    }

}

  • 定義生產(chǎn)都,用于生產(chǎn)消息
package com.example.disruptor.complex;

import com.example.disruptor.Product;
import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {

    public static final int NUMBER = 2;
    private AtomicInteger idCount = new AtomicInteger(0);
    private RingBuffer<Product> ringBuffer;

    public Producer(RingBuffer<Product> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }


    private void createData() {
        //1.可以把ringBuffer看做一個(gè)事件隊(duì)列,那么next就是得到下面一個(gè)事件槽
        long sequence = ringBuffer.next();
        try {
            //2.用上面的索引取出一個(gè)空的事件用于填充(獲取該序號對應(yīng)的事件對象)
            Product product = ringBuffer.get(sequence);
            //3.獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)
            product.setId(idCount.incrementAndGet());
        } finally {
            //4.發(fā)布事件
            //注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調(diào)用;
            // 如果某個(gè)請求的 sequence 未被提交,將會(huì)堵塞后續(xù)的發(fā)布操作或者其它的 producer。
            ringBuffer.publish(sequence);
        }
    }

    public void run() {
        for (int i = 0; i < NUMBER; i++) {
            createData();
        }
    }
}

  • 將disruptor與上面四個(gè)handler進(jìn)行關(guān)聯(lián)
package com.example.disruptor.complex;

import com.example.disruptor.Product;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    //定義ringBuffer的大小
    private static final int RING_BUFFER_SIZE = 1024 * 8;

    public static void main(String[] args) {
        //構(gòu)造消費(fèi)者一個(gè)線程池, 實(shí)際項(xiàng)目中最好不要用Executors來構(gòu)建
        ExecutorService consumerExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
        //構(gòu)造生產(chǎn)者線程池
        ExecutorService produceerExecutor = Executors.newFixedThreadPool(1);
        //創(chuàng)建disruptor
        Disruptor<Product> disruptor =
                new Disruptor<Product>(new EventFactory<Product>() {
                    public Product newInstance() {
                        return new Product();
                    }
                }, RING_BUFFER_SIZE, consumerExecutor, ProducerType.SINGLE, new BlockingWaitStrategy());

        //定義處理消息的handler
        StartHandler start = new StartHandler();
        LeftHandler left = new LeftHandler();
        RightHandler right = new RightHandler();
        EndHandler end = new EndHandler();
        //定義處理消息的順序
        disruptor.handleEventsWith(start).then(left, right).then(end);

        // 啟動(dòng)
        disruptor.start();
//        //生產(chǎn)者開始生產(chǎn)數(shù)據(jù)
        Producer producer = new Producer(disruptor.getRingBuffer());
        produceerExecutor.submit(producer);


        //關(guān)閉打開的資源
/*        disruptor.shutdown();
        consumerExecutor.shutdown();
        produceerExecutor.shutdown();*/
    }
}

可以看到,disruptor通過提供了then方法來實(shí)現(xiàn)消息的先后順序語義。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容