模式
1.發(fā)布訂閱模式,同一事件會被多個消費者并行消費
2.點對點模式,同一事件會被一組消費者其中之一消費
3.順序消費;
使用場景
低延遲,高吞吐量,有界的緩存隊列
提高吞吐量,減少并發(fā)執(zhí)行上下文之間的延遲并確保可預(yù)測延遲
為什么RingBuffer這么快?
1.首先是CPU false sharing的解決,Disruptor通過將基本對象填充冗余基本類型變量來填充滿整個緩存行,減少false sharing的概率,這部分沒怎么看懂,Disruptor通過填充失效這個效果。
(就是一個緩存行8個變量,預(yù)設(shè)7個變量,然后再保存一個唯一變量,這樣就不會出現(xiàn)相同的變量)
2.無鎖隊列的實現(xiàn),對于傳統(tǒng)并發(fā)隊列,至少要維護兩個指針,一個頭指針和一個尾指針。在并發(fā)訪問修改時,頭指針和尾指針的維護不可避免的應(yīng)用了鎖。Disruptor由于是環(huán)狀隊列,對于Producer而言只有頭指針而且鎖是樂觀鎖,在標準Disruptor應(yīng)用中,只有一個生產(chǎn)者,避免了頭指針鎖的爭用。所以我們可以理解Disruptor為無鎖隊列。
為什么要用Disruptor?
鎖的成本: 傳統(tǒng)阻塞隊列使用鎖保證線程安全。而鎖通過操作系統(tǒng)內(nèi)核的上下文切換實現(xiàn),會暫停線程去等待鎖直到釋放。執(zhí)行這樣的上下文切換,會丟失之前保存的數(shù)據(jù)和指令。由于消費者和生產(chǎn)者之間的速度差異,隊列總是接近滿或者空的狀態(tài)。這種狀態(tài)會導(dǎo)致高水平的寫入爭用。
偽共享問題導(dǎo)致的性能低下。
隊列是垃圾的重要來源,隊列中的元素和用于存儲元素的節(jié)點對象需要進行頻繁的重新分配。
代碼demo
public class MessageEvent<T> {
private T message;
public T getMessage() {
return message;
}
public void setMessage(T message) {
this.message = message;
}
}
public class MessageEventFactory implements EventFactory<MessageEvent> {
@Override
public MessageEvent newInstance() {
return new MessageEvent();
}
}
public class MessageEvenHandler3 implements EventHandler<MessageEvent> {
@Override
public void onEvent(MessageEvent messageEvent, long l, boolean b) throws Exception {
System.out.println("----------------"+messageEvent.getMessage());
}
}
public class MessageEventProducer {
private RingBuffer<MessageEvent> ringBuffer;
public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String message) {
EventTranslatorOneArg<MessageEvent, String> translator = new MessageEventTranslator();
ringBuffer.publishEvent(translator, message);
}
}
public class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent,String> {
@Override
public void translateTo(MessageEvent messageEvent, long l, String o2) {
messageEvent.setMessage(o2);
}
}
public class MessageExceptionHandler implements ExceptionHandler {
@Override
public void handleEventException(Throwable throwable, long l, Object o) {
throwable.printStackTrace();
}
@Override
public void handleOnStartException(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void handleOnShutdownException(Throwable throwable) {
throwable.printStackTrace();
}
}
public class MessageThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"Simple Disruptor Test Thread");
}
}
public class MessageConsumer {
public static void main(String[] args) {
String message = "Hello Disruptor!";
int ringBufferSize = 1024;//必須是2的N次方
Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(),ringBufferSize,new MessageThreadFactory(), ProducerType.SINGLE,new BlockingWaitStrategy());
//這里用的是單一生成者,如果是多生成者的話是另一種模式,自己的類實現(xiàn)WorkHandler接口,
//然后這邊調(diào)用 disruptor.handleEventsWithWorkerPool(new MessageEventHandler());
disruptor.handleEventsWith(new MessageEvenHandler3());
disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
RingBuffer<MessageEvent> ringBuffer = disruptor.start();
MessageEventProducer producer = new MessageEventProducer(ringBuffer);
IntStream.range(0,20).forEach(x->{
producer.onData(x+message);
});
}
}
下面是實現(xiàn)WorkHandler接口的類
public class MessageEventHandler implements WorkHandler<MessageEvent> {
@Override
public void onEvent(MessageEvent messageEvent) throws Exception {
System.out.println(System.currentTimeMillis()+"------我是1號消費者----------"+messageEvent.getMessage());
}
}