Java自帶的線程安全隊(duì)列
介紹Disruptor之前,我們先來(lái)看一看常用的線程安全的內(nèi)置隊(duì)列有什么問(wèn)題。Java的內(nèi)置隊(duì)列如下表所示。
| 隊(duì)列 | 有界性 | 鎖 | 數(shù)據(jù)結(jié)構(gòu) |
|---|---|---|---|
| ArrayBlockingQueue | bounded | 加鎖 | arraylist |
| LinkedBlockingQueue | optionally-bounded | 加鎖 | linkedlist |
| ConcurrentLinkedQueue | unbounded | 無(wú)鎖 | linkedlist |
| LinkedTransferQueue | unbounded | 無(wú)鎖 | linkedlist |
| PriorityBlockingQueue | unbounded | 加鎖 | heap |
| DelayQueue | unbounded | 加鎖 | heap |
狹義上真正的鎖,或者說(shuō)所謂的重量級(jí)鎖,是需要操作系統(tǒng)配合的,某一個(gè)線程占用一個(gè)CPU在執(zhí)行,運(yùn)行到Lock之后,底層調(diào)用操作系統(tǒng)test and set指令對(duì)鎖對(duì)應(yīng)的一個(gè)內(nèi)存中的整型數(shù)進(jìn)行原子性設(shè)置,設(shè)置成功代表獲取鎖,線程繼續(xù)執(zhí)行,否則代表未競(jìng)爭(zhēng)到鎖,線程掛起,等待鎖釋放后自己被喚醒。另外,由于多核的原因,在操作系統(tǒng)對(duì)上述整型數(shù)進(jìn)行設(shè)置的時(shí)候還要能夠鎖總線,確保同時(shí)只有一個(gè)CPU核能夠通過(guò)總線來(lái)寫內(nèi)存,這是通過(guò)一個(gè)硬件HLOCK Pin高低電位來(lái)實(shí)現(xiàn)的,低電位代表總線此刻被某個(gè)CPU核心獨(dú)占。
不加鎖的隊(duì)列,使用cas操作原子變量status,各線程cas(status, 0, 1),返回true說(shuō)明獲取到“鎖”、可以操作隊(duì)列、比如入隊(duì)和出隊(duì),返回0則說(shuō)明獲取“鎖”失敗。然后我們發(fā)現(xiàn)java提供的兩個(gè)無(wú)鎖隊(duì)列都是無(wú)界的。穩(wěn)定性要求高的系統(tǒng),使用無(wú)界隊(duì)列有一定風(fēng)險(xiǎn),如果生產(chǎn)方速度過(guò)快,會(huì)導(dǎo)致隊(duì)列里積壓大量任務(wù),由此可能會(huì)導(dǎo)致內(nèi)存溢出,或者消費(fèi)端疲于處理。
所以我們要找個(gè)有界隊(duì)列,然后我們可能為了減少gc對(duì)系統(tǒng)性能的影響,會(huì)盡可能選擇array/heap格式的數(shù)據(jù)結(jié)構(gòu),這樣算下來(lái)也只有ArrayBlockingQueue了。
但是,ArrayBlockingQueue整體1把鎖,恰恰是性能最差的那個(gè)。
鎖帶來(lái)的開(kāi)銷是什么?
- 線程競(jìng)爭(zhēng)不到鎖被掛起,鎖釋放時(shí)線程又被喚醒,這個(gè)過(guò)程中帶來(lái)上下文切換,帶來(lái)比較大的開(kāi)銷
- 線程在等待鎖的過(guò)程中,會(huì)被中斷,不能做任何事情。
- 持有鎖的線程也可能會(huì)遇到缺頁(yè)錯(cuò)誤、調(diào)度延遲或者其他類似的系統(tǒng)級(jí)的情況,這個(gè)線程無(wú)法執(zhí)行下去,變相的相當(dāng)于所有需要這個(gè)鎖都無(wú)法執(zhí)行。
一般的,不加鎖的性能 > CAS的性能 > 加鎖的性能
在高度競(jìng)爭(zhēng)的情況下,鎖的性能可能會(huì)超過(guò)不斷循環(huán)CAS的性能,所以會(huì)有所謂的鎖升級(jí): 無(wú)鎖,偏向所,輕量鎖,重量鎖。
偽共享問(wèn)題
CPU操作L1\L2\L3高速緩存的時(shí)候的最小單位是緩存行cache line,64字節(jié),現(xiàn)在假設(shè)一個(gè)場(chǎng)景:一個(gè)cache line已經(jīng)在兩個(gè)CPU核心的L1緩存里了,這個(gè)cache line里邊有兩個(gè)變量,X和Y,分別由兩個(gè)線程去內(nèi)存里修改這兩個(gè)變量的本體。X被線程1修改之后,根據(jù)MESI協(xié)議,這個(gè)緩存行變?yōu)槭В秋@然同時(shí)波及了變量Y,CPU2上執(zhí)行的線程2也會(huì)發(fā)現(xiàn)自己的L1中的Y緩存也失效了,只能去內(nèi)存中讀取。這就是偽共享問(wèn)題。偽共享問(wèn)題的危害是導(dǎo)致CPU高速緩存失效,而轉(zhuǎn)為讀取更慢的內(nèi)存。
關(guān)于RingBuffer
核心數(shù)據(jù)結(jié)構(gòu)RingBuffer是個(gè)環(huán)形數(shù)組,容量自定義、超過(guò)之后會(huì)從頭覆寫,但下標(biāo)用的是long,幾乎是無(wú)盡的,所以不用擔(dān)心用完。
byte的取值范圍為-128~127,占用1個(gè)字節(jié)(-2的7次方到2的7次方-1)
short的取值范圍為-32768~32767,占用2個(gè)字節(jié)(-2的15次方到2的15次方-1)
int的取值范圍為(-2147483648~2147483647),占用4個(gè)字節(jié)(-2的31次方到2的31次方-1)
long的取值范圍為(-9223372036854774808~9223372036854774807),占用8個(gè)字節(jié)(-2的63次方到2的63次方-1)
2^63-1個(gè)slot,假如每秒使用100萬(wàn)個(gè)slot、即100萬(wàn)QPS的處理速度,也可以用9223372036854秒,約為292,471年...
所以,用就是了!
多生產(chǎn)者情況需要解決的問(wèn)題
寫數(shù)據(jù),計(jì)算n個(gè)生產(chǎn)者本次一共要寫入多少個(gè)slot,然后從ring buffer上申請(qǐng)這個(gè)slot區(qū)間。在這個(gè)slot區(qū)間上分成n個(gè)段,每個(gè)段對(duì)應(yīng)1個(gè)生產(chǎn)者、每個(gè)段的長(zhǎng)度就是這個(gè)生成者自己要寫入的slot數(shù)。這樣就解決了“多個(gè)生產(chǎn)者線程重復(fù)的寫了同一個(gè)slot”的問(wèn)題。
-
讀數(shù)據(jù),消費(fèi)者線程申請(qǐng)讀取到n下標(biāo),首先write cursor一定是要大于n的,因?yàn)橐WC讀取的是已經(jīng)可以允許寫入的slot,其次,即便如此仍還不能確定消費(fèi)線程本次可以直接讀取到n,因?yàn)榇藭r(shí)生產(chǎn)者正在并行的寫入,write cursor只是“當(dāng)前可以寫入到這里為止”的意思,要防止消費(fèi)者線程讀到還未真正寫入的slot,disruptor采用的是另一個(gè)與RingBuffer一樣大小的AvailableBuffer,生產(chǎn)者每次寫入相應(yīng)的RingBuffer的slot之后,就去AvailableBuffer上對(duì)應(yīng)的位置置寫入標(biāo)識(shí)為已寫入。
例如:消費(fèi)者讀線程申請(qǐng)讀取到下標(biāo)從3到11的slot,判斷writer cursor>=11。然后開(kāi)始讀取AvailableBuffer,從3開(kāi)始,往后讀取,發(fā)現(xiàn)下標(biāo)為7的slot沒(méi)有生產(chǎn)成功,于是WaitFor(11)返回6,表明RingBuffer現(xiàn)在只能讀取到slot下標(biāo)6。這樣本次消費(fèi)者最終讀取從3到6共計(jì)4個(gè)slot里邊的元素。
Disruptor在Log4j2中的應(yīng)用
筆者曾經(jīng)主持優(yōu)化的一個(gè)springboot應(yīng)用,核心功能是要接收客戶端上報(bào)的行為日志,將其輸出到本地日志文件(當(dāng)然可以直接輸出到kafka這樣的分布式隊(duì)列式文件系統(tǒng),但當(dāng)時(shí)確實(shí)限于團(tuán)隊(duì)條件先直接輸出到本地日志里了),使用的是logback,在壓測(cè)與優(yōu)化過(guò)程中,將log appender配置為async appender之后,性能有了極大的提升,工作線程將日志輸出任務(wù)丟進(jìn)async appender的隊(duì)列之后馬上返回,壓測(cè)tps提升。而logback的async appender所使用的生產(chǎn)者消費(fèi)者模式中的隊(duì)列用的是ArrayBlockingQueue,Log4j2在吸收了log4j與logback的優(yōu)點(diǎn)基礎(chǔ)之上,進(jìn)一步優(yōu)化了性能,以異步日志為例,其使用了Disruptor代替了ABQ,根據(jù)美團(tuán)公司對(duì)Log4j2的實(shí)踐,其異步日志的性能較logback提升了1個(gè)數(shù)量級(jí),較同步日志提升了2個(gè)數(shù)量級(jí)。
https://tech.meituan.com/2016/11/18/disruptor.html 《高性能隊(duì)列——Disruptor》美團(tuán)技術(shù)團(tuán)隊(duì)