筆者所有文章第一時(shí)間發(fā)布于:
hhbbz的個(gè)人博客
背景介紹
最近在作一個(gè)用于調(diào)用第三方服務(wù)以及充當(dāng)數(shù)據(jù)倉(cāng)儲(chǔ)的服務(wù),對(duì)于數(shù)據(jù)的存儲(chǔ)和獲取都要有很好的響應(yīng)。
整體架構(gòu)使用的是spring flux作接口層的異步調(diào)用,可以高效率的進(jìn)行數(shù)據(jù)獲?。皇褂肈isruptor作為內(nèi)存隊(duì)列,將存儲(chǔ)數(shù)據(jù)批量寫(xiě)入。
為什么選擇Disruptor
Disruptor是用于一個(gè)JVM中多個(gè)線(xiàn)程之間的消息隊(duì)列,作用與ArrayBlockingQueue有相似之處,但是Disruptor通過(guò)精巧的無(wú)鎖設(shè)計(jì)實(shí)現(xiàn)了在高并發(fā)情形下的高性能。從功能、性能都遠(yuǎn)好于ArrayBlockingQueue,當(dāng)多個(gè)線(xiàn)程之間傳遞大量數(shù)據(jù)或?qū)π阅芤筝^高時(shí),可以考慮使用Disruptor作為ArrayBlockingQueue的替代者。
為什么Disruptor會(huì)那么快
Disruptor通過(guò)以下設(shè)計(jì)來(lái)解決隊(duì)列速度慢的問(wèn)題: - 環(huán)形數(shù)組結(jié)構(gòu)
為了避免垃圾回收,采用數(shù)組而非鏈表。同時(shí),數(shù)組對(duì)處理器的緩存機(jī)制更加友好。 - 元素位置定位
數(shù)組長(zhǎng)度2^n,通過(guò)位運(yùn)算,加快定位的速度。下標(biāo)采取遞增的形式。不用擔(dān)心index溢出的問(wèn)題。index是long類(lèi)型,即使100萬(wàn)QPS的處理速度,也需要30萬(wàn)年才能用完。 - 無(wú)鎖設(shè)計(jì)
每個(gè)生產(chǎn)者或者消費(fèi)者線(xiàn)程,會(huì)先申請(qǐng)可以操作的元素在數(shù)組中的位置,申請(qǐng)到之后,直接在該位置寫(xiě)入或者讀取數(shù)據(jù)。
Disruptor的使用
代碼實(shí)現(xiàn)的功能:每10ms向disruptor中插入一個(gè)元素,消費(fèi)者讀取數(shù)據(jù),并打印到終端。詳細(xì)邏輯請(qǐng)細(xì)讀代碼。
/**
* @description disruptor代碼樣例。每10ms向disruptor中插入一個(gè)元素,消費(fèi)者讀取數(shù)據(jù),并打印到終端
*/
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
public class DisruptorMain
{
public static void main(String[] args) throws Exception
{
// 隊(duì)列中的元素
class Element {
private int value;
public int get(){
return value;
}
public void set(int value){
this.value= value;
}
}
// 生產(chǎn)者的線(xiàn)程工廠
ThreadFactory threadFactory = new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};
// RingBuffer生產(chǎn)工廠,初始化RingBuffer的時(shí)候使用
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
// 處理Event的handler
EventHandler<Element> handler = new EventHandler<Element>(){
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch)
{
System.out.println("Element: " + element.get());
}
};
// 阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
// 指定RingBuffer的大小
int bufferSize = 16;
// 創(chuàng)建disruptor,采用單生產(chǎn)者模式
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
// 設(shè)置EventHandler
disruptor.handleEventsWith(handler);
// 啟動(dòng)disruptor的線(xiàn)程
disruptor.start();
RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
for (int l = 0; true; l++)
{
// 獲取下一個(gè)可用位置的下標(biāo)
long sequence = ringBuffer.next();
try
{
// 返回可用位置的元素
Element event = ringBuffer.get(sequence);
// 設(shè)置該位置元素的值
event.set(l);
}
finally
{
ringBuffer.publish(sequence);
}
Thread.sleep(10);
}
}
}
總結(jié)
Disruptor通過(guò)精巧的無(wú)鎖設(shè)計(jì)實(shí)現(xiàn)了在高并發(fā)情形下的高性能。
在美團(tuán)內(nèi)部,很多高并發(fā)場(chǎng)景借鑒了Disruptor的設(shè)計(jì),減少競(jìng)爭(zhēng)的強(qiáng)度。其設(shè)計(jì)思想可以擴(kuò)展到分布式場(chǎng)景,通過(guò)無(wú)鎖設(shè)計(jì),來(lái)提升服務(wù)性能。
使用Disruptor比使用ArrayBlockingQueue略微復(fù)雜,為方便讀者上手,增加代碼樣例。