??Disruptor從功能上來說,可以實(shí)現(xiàn)隊(duì)列的功能,也可以把它當(dāng)成單機(jī)版的JMS來看待。從性能上來說,它比ArrayBlockingQueue有更好的性能表現(xiàn),對于生產(chǎn)者消費(fèi)者模型的業(yè)務(wù),Disruptor是一個(gè)更好的選擇可以很好的實(shí)現(xiàn)業(yè)務(wù)的分離。
簡單入門
- 定義消息類,這里的消息在Disruptor里稱為Event,也就是我們系統(tǒng)里生產(chǎn)消費(fèi)的業(yè)務(wù)對象,示例代碼如下:
package com.example.disruptor;
/**
* 產(chǎn)品
*/
public class Product {
private int id;
private String name;
private double weight;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getWeight() {
return weight;
}
public void setWeight(double weight) {
this.weight = weight;
}
}
- 定義生產(chǎn)者,也就是事件的來源。
package com.example.disruptor.singleton;
import com.example.disruptor.Product;
import com.lmax.disruptor.RingBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
public static final int NUMBER = 10000000;
private final CountDownLatch latch;
private AtomicInteger idCount = new AtomicInteger(0);
private RingBuffer<Product> ringBuffer;
public Producer(RingBuffer<Product> ringBuffer, CountDownLatch latch) {
this.ringBuffer = ringBuffer;
this.latch = latch;
}
private void createData() {
//1.可以把ringBuffer看做一個(gè)事件隊(duì)列,那么next就是得到下面一個(gè)事件槽
long sequence = ringBuffer.next();
try {
//2.用上面的索引取出一個(gè)空的事件用于填充(獲取該序號對應(yīng)的事件對象)
Product product = ringBuffer.get(sequence);
//3.獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)
product.setId(idCount.incrementAndGet());
} finally {
//4.發(fā)布事件
//注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調(diào)用;
// 如果某個(gè)請求的 sequence 未被提交,將會(huì)堵塞后續(xù)的發(fā)布操作或者其它的 producer。
ringBuffer.publish(sequence);
}
}
public void run() {
for (int i = 0; i < NUMBER; i++) {
createData();
}
//通過latch告訴主線程,完成了產(chǎn)品的生產(chǎn)
latch.countDown();
}
}
生產(chǎn)者在生成消息的過程中需要得到Disruptor里的ringBuffer,將生產(chǎn)的消息加入到ringBuffer里。Disruptor 的事件發(fā)布過程是一個(gè)兩階段提交的過程:
第一步:先從 RingBuffer 獲取下一個(gè)可以寫入的事件的序號;
第二步:獲取對應(yīng)的事件對象,將數(shù)據(jù)寫入事件對象;
第三部:將事件提交到 RingBuffer;
事件只有在提交之后才會(huì)通知消息消費(fèi)者進(jìn)行處理;
- 定義消息的消費(fèi)者,在Disruptor里是EventHandler類型的實(shí)例。
package com.example.disruptor.singleton;
import com.example.disruptor.Product;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.CountDownLatch;
public class Consumer implements EventHandler<Product> {
private int count = 0;
private CountDownLatch latch;
public Consumer(CountDownLatch latch) {
this.latch = latch;
}
public void onEvent(Product event, long sequence, boolean endOfBatch) throws Exception {
;count++;
//通過latch告訴主線程,完成了產(chǎn)品的消費(fèi)
if(count == Producer.NUMBER){
latch.countDown();
}
}
public int getCount() {
return count;
}
}
- 通過Disruptor類,將生產(chǎn)者與消費(fèi)者進(jìn)行整合。具體的代碼如下:
package com.example.disruptor.singleton;
import com.example.disruptor.Product;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
//定義ringBuffer的大小
private static final int RING_BUFFER_SIZE = 1024 * 8;
public static void main(String[] args) {
//構(gòu)造消費(fèi)者一個(gè)線程池, 實(shí)際項(xiàng)目中最好不要用Executors來構(gòu)建
ExecutorService consumerExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
//構(gòu)造生產(chǎn)者線程池
ExecutorService produceerExecutor = Executors.newFixedThreadPool(1);
//創(chuàng)建disruptor
Disruptor<Product> disruptor =
new Disruptor<Product>(new EventFactory<Product>() {
public Product newInstance() {
return new Product();
}
}, RING_BUFFER_SIZE, consumerExecutor, ProducerType.SINGLE, new YieldingWaitStrategy());
CountDownLatch latch = new CountDownLatch(2);
// 連接消費(fèi)事件方法
Consumer consumer = new Consumer(latch);
disruptor.handleEventsWith(consumer);
// 啟動(dòng)
disruptor.start();
//生產(chǎn)者開始生產(chǎn)數(shù)據(jù)
Producer producer = new Producer(disruptor.getRingBuffer(), latch);
produceerExecutor.submit(producer);
try {
latch.await();
} catch (InterruptedException e) {
}
System.out.println(consumer.getCount());
//關(guān)閉打開的資源
disruptor.shutdown();
consumerExecutor.shutdown();
produceerExecutor.shutdown();
}
}
在構(gòu)造Disruptor對象,有幾個(gè)核心的概念:
1:事件工廠(Event Factory)定義了如何實(shí)例化事件(Event),Disruptor 通過 EventFactory 在 RingBuffer 中預(yù)創(chuàng)建 Event 的實(shí)例。
2:ringBuffer這個(gè)數(shù)組的大小,一般根據(jù)業(yè)務(wù)指定成2的指數(shù)倍。
3:消費(fèi)者線程池,事件的處理是在構(gòu)造的線程池里來進(jìn)行處理的。
4:指定等待策略,Disruptor 定義了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待Event事件。Disruptor 提供了多個(gè) WaitStrategy 的實(shí)現(xiàn),每種策略都具有不同性能和優(yōu)缺點(diǎn),根據(jù)實(shí)際運(yùn)行環(huán)境的 CPU 的硬件特點(diǎn)選擇恰當(dāng)?shù)牟呗?,并配合特定?JVM 的配置參數(shù),能夠?qū)崿F(xiàn)不同的性能提升。
BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn);
SleepingWaitStrategy 的性能表現(xiàn)跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產(chǎn)者線程的影響最小,適合用于異步日志類似的場景;
YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統(tǒng)。在要求極高性能且事件處理線數(shù)小于 CPU 邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。
多消費(fèi)者模型
??在生產(chǎn)者消費(fèi)者模型中,為了防止生產(chǎn)者生產(chǎn)的數(shù)據(jù)覆蓋掉還未消費(fèi)的數(shù)據(jù),Disruptor中每個(gè)消費(fèi)者都各自有個(gè)Sequence,而消費(fèi)者的Sequence狀態(tài)需要通過SequenceBarrier同步到ringBuffer中。生產(chǎn)者產(chǎn)生數(shù)據(jù)的Sequence是通過ringBuffer進(jìn)行生成的。下面是具體的代碼:
- 定義生產(chǎn)者
package com.example.disruptor.mult;
import com.example.disruptor.Product;
import com.lmax.disruptor.RingBuffer;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
public static final int NUMBER = 10000000;
public static AtomicInteger idCount = new AtomicInteger(0);
private RingBuffer<Product> ringBuffer;
public Producer(RingBuffer<Product> ringBuffer) {
this.ringBuffer = ringBuffer;
}
private void createData() {
//1.可以把ringBuffer看做一個(gè)事件隊(duì)列,那么next就是得到下面一個(gè)事件槽
long sequence = ringBuffer.next();
try {
//2.用上面的索引取出一個(gè)空的事件用于填充(獲取該序號對應(yīng)的事件對象)
Product product = ringBuffer.get(sequence);
//3.獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)
product.setId(idCount.incrementAndGet());
} finally {
//4.發(fā)布事件
//注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調(diào)用;
// 如果某個(gè)請求的 sequence 未被提交,將會(huì)堵塞后續(xù)的發(fā)布操作或者其它的 producer。
ringBuffer.publish(sequence);
}
}
public void run() {
for (int i = 0; i < NUMBER; i++) {
createData();
}
}
}
- 定義消費(fèi)者
package com.example.disruptor.mult;
import com.example.disruptor.Product;
import com.lmax.disruptor.WorkHandler;
/**
* 多消費(fèi)者需要繼承自WorkHandler
*/
public class Consumer implements WorkHandler<Product> {
private int count = 0;
public Consumer() {
}
public void onEvent(Product event) throws Exception {
count++;
}
public int getCount() {
return count;
}
}
- 啟動(dòng)類:
package com.example.disruptor.mult;
import com.example.disruptor.Product;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
//定義ringBuffer的大小
private static final int RING_BUFFER_SIZE = 1024 * 8;
public static void main(String[] args) {
//線程數(shù)
int processor = Runtime.getRuntime().availableProcessors() * 2;
//構(gòu)造消費(fèi)者一個(gè)線程池, 實(shí)際項(xiàng)目中最好不要用Executors來構(gòu)建
ExecutorService consumerExecutor = Executors.newFixedThreadPool(processor);
//構(gòu)造生產(chǎn)者線程池
ExecutorService produceerExecutor = Executors.newFixedThreadPool(processor);
//定義一個(gè)ringBuffer,也就是相當(dāng)于一個(gè)隊(duì)列
RingBuffer ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<Product>() {
public Product newInstance() {
return new Product();
}
}, RING_BUFFER_SIZE, new YieldingWaitStrategy());
//定義一個(gè)消費(fèi)者池,
Consumer[] consumers = new Consumer[processor];
for (int i = 0; i < processor; i++) {
consumers[i] = new Consumer();
}
WorkerPool workerPool = new WorkerPool<Product>(ringBuffer,
ringBuffer.newBarrier(), new IgnoreExceptionHandler(), consumers);
//每個(gè)消費(fèi)者,也就是 workProcessor都有一個(gè)sequence,表示上一個(gè)消費(fèi)的位置,這個(gè)在初始化時(shí)都是-1
Sequence[] sequences = workerPool.getWorkerSequences();
//將其保存在ringBuffer中的 sequencer 中,在為生產(chǎn)申請slot時(shí)要用到,也就是在為生產(chǎn)者申請slot時(shí)不能大于此數(shù)組中的最小值,否則產(chǎn)生覆蓋
ringBuffer.addGatingSequences(sequences);
//用executor 來啟動(dòng) workProcessor 線程
workerPool.start(consumerExecutor);
//生產(chǎn)者開始生產(chǎn)數(shù)據(jù)
for (int i = 0; i < processor; i++) {
Producer producer = new Producer(ringBuffer);
produceerExecutor.submit(producer);
}
while (true) {
int count = 0;
for (Consumer consumer : consumers) {
count += consumer.getCount();
}
System.out.println("生產(chǎn)了多少數(shù)據(jù)" + Producer.idCount.get());
System.out.println("消費(fèi)了多少數(shù)據(jù)" + count);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
ringBuffer需要將WorkPool里所有消費(fèi)者的Sequence加到ringBuffer中,以防止出現(xiàn)數(shù)據(jù)覆蓋的問題。
將disruptor當(dāng)成JMS,處理消息流
可以將disruptor當(dāng)成單機(jī)版的JMS,用來處理數(shù)據(jù)流,disruptor提供了消費(fèi)者處理消息的先后順序,能很好的實(shí)現(xiàn)根據(jù)指定規(guī)則來實(shí)現(xiàn)消息的處理。比如可以將消息形成如下圖的數(shù)據(jù)流:
- 定義上面四個(gè)handler的處理邏輯, 我這里只貼出一個(gè)類的實(shí)現(xiàn)
package com.example.disruptor.complex;
import com.example.disruptor.Product;
import com.lmax.disruptor.EventHandler;
public class StartHandler implements EventHandler<Product> {
public void onEvent(Product product, long l, boolean b) throws Exception {
System.out.println("start set name");
product.setName("start");
}
}
- 定義生產(chǎn)都,用于生產(chǎn)消息
package com.example.disruptor.complex;
import com.example.disruptor.Product;
import com.lmax.disruptor.RingBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
public static final int NUMBER = 2;
private AtomicInteger idCount = new AtomicInteger(0);
private RingBuffer<Product> ringBuffer;
public Producer(RingBuffer<Product> ringBuffer) {
this.ringBuffer = ringBuffer;
}
private void createData() {
//1.可以把ringBuffer看做一個(gè)事件隊(duì)列,那么next就是得到下面一個(gè)事件槽
long sequence = ringBuffer.next();
try {
//2.用上面的索引取出一個(gè)空的事件用于填充(獲取該序號對應(yīng)的事件對象)
Product product = ringBuffer.get(sequence);
//3.獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù)
product.setId(idCount.incrementAndGet());
} finally {
//4.發(fā)布事件
//注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調(diào)用;
// 如果某個(gè)請求的 sequence 未被提交,將會(huì)堵塞后續(xù)的發(fā)布操作或者其它的 producer。
ringBuffer.publish(sequence);
}
}
public void run() {
for (int i = 0; i < NUMBER; i++) {
createData();
}
}
}
- 將disruptor與上面四個(gè)handler進(jìn)行關(guān)聯(lián)
package com.example.disruptor.complex;
import com.example.disruptor.Product;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
//定義ringBuffer的大小
private static final int RING_BUFFER_SIZE = 1024 * 8;
public static void main(String[] args) {
//構(gòu)造消費(fèi)者一個(gè)線程池, 實(shí)際項(xiàng)目中最好不要用Executors來構(gòu)建
ExecutorService consumerExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
//構(gòu)造生產(chǎn)者線程池
ExecutorService produceerExecutor = Executors.newFixedThreadPool(1);
//創(chuàng)建disruptor
Disruptor<Product> disruptor =
new Disruptor<Product>(new EventFactory<Product>() {
public Product newInstance() {
return new Product();
}
}, RING_BUFFER_SIZE, consumerExecutor, ProducerType.SINGLE, new BlockingWaitStrategy());
//定義處理消息的handler
StartHandler start = new StartHandler();
LeftHandler left = new LeftHandler();
RightHandler right = new RightHandler();
EndHandler end = new EndHandler();
//定義處理消息的順序
disruptor.handleEventsWith(start).then(left, right).then(end);
// 啟動(dòng)
disruptor.start();
// //生產(chǎn)者開始生產(chǎn)數(shù)據(jù)
Producer producer = new Producer(disruptor.getRingBuffer());
produceerExecutor.submit(producer);
//關(guān)閉打開的資源
/* disruptor.shutdown();
consumerExecutor.shutdown();
produceerExecutor.shutdown();*/
}
}
可以看到,disruptor通過提供了then方法來實(shí)現(xiàn)消息的先后順序語義。