淺談消息隊(duì)列(MQ)

一. 認(rèn)識(shí)消息隊(duì)列

1. 隊(duì)列

隊(duì)列(queue)是只允許在一端進(jìn)行插入操作,而在另一端進(jìn)行刪除操作的線性表(數(shù)據(jù)結(jié)構(gòu))。
隊(duì)列是一種先進(jìn)先出(First in First Out)的線性表,簡(jiǎn)稱FIFO。允許插入的一端稱為隊(duì)尾,允許刪除的一端稱為隊(duì)頭。假設(shè)隊(duì)列是 q=(a1,a2,…,an),那么a1就是隊(duì)頭元素,而an是隊(duì)尾元素。這樣我們就可以刪除時(shí),總是從a1開始,而插入時(shí),列在最后。

隊(duì)列數(shù)據(jù)結(jié)構(gòu)

2. jdk中的實(shí)現(xiàn)

隊(duì)列實(shí)現(xiàn)
常見隊(duì)列:
  1. ArrayBlockingQueue :由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
  2. LinkedBlockingQueue :由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。
  3. PriorityBlockingQueue :支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。
  4. DelayQueue:使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列。
  5. SynchronousQueue:不存儲(chǔ)元素的阻塞隊(duì)列。
  6. LinkedTransferQueue:由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。
  7. LinkedBlockingDeque:由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。

ArrayBlockingQueue(公平、非公平):
數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序。默認(rèn)情況下
不保證訪問者公平的訪問隊(duì)列,所謂公平訪問隊(duì)列是指阻塞的所有生產(chǎn)者線程或消費(fèi)者線程,當(dāng)
隊(duì)列可用時(shí),可以按照阻塞的先后順序訪問隊(duì)列,即先阻塞的生產(chǎn)者線程,可以先往隊(duì)列里插入
元素,先阻塞的消費(fèi)者線程,可以先從隊(duì)列里獲取元素。通常情況下為了保證公平性會(huì)降低吞吐
量。我們可以使用以下代碼創(chuàng)建一個(gè)公平的阻塞隊(duì)列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

LinkedBlockingQueue(兩個(gè)獨(dú)立鎖提高并發(fā))
基于鏈表的阻塞隊(duì)列,同 ArrayListBlockingQueue 類似,此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)
元素進(jìn)行排序。而 LinkedBlockingQueue 之所以能夠高效的處理并發(fā)數(shù)據(jù),還因?yàn)槠鋵?duì)于生產(chǎn)者
端和消費(fèi)者端分別采用了獨(dú)立的鎖來(lái)控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)
者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來(lái)提高整個(gè)隊(duì)列的并發(fā)性能。
LinkedBlockingQueue 會(huì)默認(rèn)一個(gè)類似無(wú)限大小的容量(Integer.MAX_VALUE)。

PriorityBlockingQueue(compareTo 排序?qū)崿F(xiàn)優(yōu)先)
是一個(gè)支持優(yōu)先級(jí)的無(wú)界隊(duì)列。默認(rèn)情況下元素采取自然順序升序排列??梢?code>自定義實(shí)現(xiàn) compareTo()方法來(lái)指定元素進(jìn)行排序規(guī)則,或者初始化 PriorityBlockingQueue 時(shí),指定構(gòu)造 參數(shù) Comparator來(lái)對(duì)元素進(jìn)行排序。需要注意的是不能保證同優(yōu)先級(jí)元素的順序。

SynchronousQueue(不存儲(chǔ)數(shù)據(jù)、可用于傳遞數(shù)據(jù))
是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每一個(gè) put 操作必須等待一個(gè) take 操作,否則不能繼續(xù)添加元素。
SynchronousQueue 可以看成是一個(gè)傳球手,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線
程。隊(duì)列本身并不存儲(chǔ)任何元素,非常適合于傳遞性場(chǎng)景,比如在一個(gè)線程中使用的數(shù)據(jù),傳遞給
另 外 一 個(gè) 線 程 使 用 , SynchronousQueue 的 吞 吐 量 高 于 LinkedBlockingQueue 和
ArrayBlockingQueue。

LinkedBlockingDeque
是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列所謂雙向隊(duì)列指的你可以從隊(duì)列的兩端插入和移出元素。
雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)。相比其
他的阻塞隊(duì)列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,
peekFirst,peekLast 等方法,以 First 單詞結(jié)尾的方法,表示插入,獲?。╬eek)或移除雙端隊(duì)
列的第一個(gè)元素。以 Last 單詞結(jié)尾的方法,表示插入,獲取或移除雙端隊(duì)列的最后一個(gè)元素。另
外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法卻等同
于 takeFirst,不知道是不是 Jdk 的 bug,使用時(shí)還是用帶有 First 和 Last 后綴的方法更清楚。
在初始化 LinkedBlockingDeque 時(shí)可以設(shè)置容量防止其過渡膨脹。另外雙向阻塞隊(duì)列可以運(yùn)用在
“工作竊取”模式中。

LinkedTransferQueue
是 一 個(gè) 由 鏈 表 結(jié) 構(gòu) 組 成 的 無(wú) 界 阻 塞 TransferQueue 隊(duì) 列 。 相 對(duì) 于 其 他 阻 塞 隊(duì) 列 ,
LinkedTransferQueue 多了 tryTransfertransfer 方法。

  1. transfer 方法:如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用 take()方法或帶時(shí)間限制的
    poll()方法時(shí)),transfer 方法可以把生產(chǎn)者傳入的元素立刻 transfer(傳輸)給消費(fèi)者。如
    果沒有消費(fèi)者在等待接收元素,transfer 方法會(huì)將元素存放在隊(duì)列的 tail 節(jié)點(diǎn),并等到該元素 被消費(fèi)者消費(fèi)了才返回。
  2. tryTransfer 方法。則是用來(lái)試探下生產(chǎn)者傳入的元素是否能直接傳給消費(fèi)者。如果沒有消費(fèi)
    者等待接收元素,則返回 false。和 transfer 方法的區(qū)別是 tryTransfer 方法無(wú)論消費(fèi)者是否
    接收,方法立即返回。而 transfer 方法是必須等到消費(fèi)者消費(fèi)了才返回。
    對(duì)于帶有時(shí)間限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產(chǎn)者傳
    入的元素直接傳給消費(fèi)者,但是如果沒有消費(fèi)者消費(fèi)該元素則等待指定的時(shí)間再返回,如果超時(shí)
    還沒消費(fèi)元素,則返回 false,如果在超時(shí)時(shí)間內(nèi)消費(fèi)了元素,則返回 true。

DelayQueue(緩存失效、定時(shí)任務(wù) )
是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列。隊(duì)列使用 PriorityQueue 來(lái)實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)
現(xiàn) Delayed 接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿時(shí)才
能從隊(duì)列中提取元素。我們可以將 DelayQueue 運(yùn)用在以下應(yīng)用場(chǎng)景:

  1. 緩存系統(tǒng)的設(shè)計(jì):可以用 DelayQueue 保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢
    DelayQueue,一旦能從 DelayQueue 中獲取元素時(shí),表示緩存有效期到了。

  2. 定時(shí)任務(wù)調(diào)度:使用 DelayQueue 保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從
    DelayQueue 中獲取到任務(wù)就開始執(zhí)行,從比如 TimerQueue 就是使用 DelayQueue 實(shí)現(xiàn)的。

3. 消息隊(duì)列(Message Queue):

維基百科:


維基百科消息隊(duì)列解釋

在計(jì)算機(jī)科學(xué)領(lǐng)域,消息隊(duì)列和郵箱都是軟件工程組件,通常用于進(jìn)程間或同一進(jìn)程內(nèi)的線程通信。它們通過隊(duì)列來(lái)傳遞消息-傳遞控制信息或內(nèi)容,群組通信系統(tǒng)提供類似的功能。

簡(jiǎn)單的概括下上面的定義:消息隊(duì)列就是一個(gè)使用隊(duì)列來(lái)通信的組件。

上面的定義沒有錯(cuò),但就現(xiàn)在而言我們?nèi)粘Kf的消息隊(duì)列常常指代的是消息中間件,它的存在不僅僅只是為了通信這個(gè)問題。

可以認(rèn)為是在消息的傳輸過程中保存消息的容器。


消息隊(duì)列

二. 消息隊(duì)列的兩種模式

1. 點(diǎn)對(duì)點(diǎn)模式(一對(duì)一,消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后消息清除)

消息生產(chǎn)者生產(chǎn)消息發(fā)送到 Queue 中,消費(fèi)者從 Queue 中取出消費(fèi)消息,消息被消費(fèi)以后,Queue中不在存儲(chǔ)該消息,所以消費(fèi)者不可能消費(fèi)已經(jīng)消費(fèi)過的消息。Queue支持有多個(gè)消費(fèi)者,但是對(duì)于一條消息而言,只會(huì)被一個(gè)消費(fèi)者消費(fèi)。

生產(chǎn)者往某個(gè)隊(duì)列里面發(fā)送消息,一個(gè)隊(duì)列可以存儲(chǔ)多個(gè)生產(chǎn)者的消息,一個(gè)隊(duì)列也可以有多個(gè)消費(fèi)者, 但是消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系,即每條消息只能被一個(gè)消費(fèi)者消費(fèi)。

點(diǎn)對(duì)點(diǎn)模式
2)發(fā)布/訂閱模式(一對(duì)多,消費(fèi)者消費(fèi)數(shù)據(jù)后不會(huì)清除數(shù)據(jù))

消息生產(chǎn)者(發(fā)布)生產(chǎn)消息到Topic中,多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。發(fā)布到Topic的消息可以同時(shí)被多個(gè)訂閱者消費(fèi)。

其實(shí)可以這么理解,發(fā)布/訂閱模型等于我們都加入了一個(gè)群聊中,我發(fā)一條消息,加入了這個(gè)群聊的人都能收到這條消息。那么一對(duì)一模式就是一對(duì)一聊天,我發(fā)給你的消息,只能在你的聊天窗口彈出,是不可能彈出到別人的聊天窗口中的。

講到這有人說,那我一對(duì)一聊天對(duì)每個(gè)人都發(fā)同樣的消息不就也實(shí)現(xiàn)了一條消息被多個(gè)人消費(fèi)了嘛。

是的,通過多隊(duì)列全量存儲(chǔ)相同的消息,即數(shù)據(jù)的冗余可以實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)。RabbitMQ 就是采用隊(duì)列模型,通過 Exchange 模塊來(lái)將消息發(fā)送至多個(gè)隊(duì)列,解決一條消息需要被多個(gè)消費(fèi)者消費(fèi)問題。

發(fā)布/訂閱模式

總結(jié):點(diǎn)對(duì)點(diǎn)模式每條消息只能被一個(gè)消費(fèi)者消費(fèi),而發(fā)布/訂閱模型就是為讓一條消息可以被多個(gè)消費(fèi)者消費(fèi)而生的,當(dāng)然點(diǎn)對(duì)點(diǎn)模式也可以通過消息全量存儲(chǔ)至多個(gè)隊(duì)列來(lái)解決一條消息被多個(gè)消費(fèi)者消費(fèi)問題,但是會(huì)有數(shù)據(jù)的冗余。

三. 為什么要使用消息隊(duì)列?

從本質(zhì)上來(lái)說是因?yàn)榛ヂ?lián)網(wǎng)的快速發(fā)展,業(yè)務(wù)不斷擴(kuò)張,促使技術(shù)架構(gòu)需要不斷的演進(jìn)。

從以前的單體架構(gòu)到現(xiàn)在的微服務(wù)架構(gòu),成百上千的服務(wù)之間相互調(diào)用和依賴。從互聯(lián)網(wǎng)初期一個(gè)服務(wù)器上有 100 個(gè)在線用戶已經(jīng)很了不得,到現(xiàn)在坐擁10億日活躍量的微信。我們需要有一個(gè)「東西」來(lái)解耦服務(wù)之間的關(guān)系、控制資源合理合時(shí)的使用以及緩沖流量洪峰等等。

消息隊(duì)列就應(yīng)運(yùn)而生了。它常用來(lái)實(shí)現(xiàn):異步處理、服務(wù)解耦、流量控制。

1. 異步

很多時(shí)候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。

隨著業(yè)務(wù)復(fù)雜度、架構(gòu)的演變,微服務(wù)已經(jīng)廣泛應(yīng)用在各個(gè)系統(tǒng)中。
比如一個(gè)購(gòu)物網(wǎng)站,涉及到很多系統(tǒng):訂單系統(tǒng)、支付系統(tǒng)、庫(kù)存系統(tǒng)、積分系統(tǒng)、短信系統(tǒng)....
傳統(tǒng)的業(yè)務(wù)流程:


傳統(tǒng)業(yè)務(wù)流程

如果按照傳統(tǒng)的流程,真正的電商系統(tǒng)在下單支付后會(huì)涉及到更多的系統(tǒng),這個(gè)鏈路這樣下去,耗時(shí)是非常巨大的,不可能買個(gè)東西花費(fèi)十幾秒(用戶體驗(yàn)感極差)
其實(shí)從上邊的流程中可以看出支付后,積分增減的同時(shí)也可以發(fā)短信,這時(shí)候可以考慮異步

異步流程

異步處理,為什么不用多線程去做?這就涉及到下面第二點(diǎn)解耦

如果一個(gè)訂單流程,增減積分,發(fā)短信,扣庫(kù)存都在支付系統(tǒng)中進(jìn)行調(diào)用,如果再增加一個(gè)優(yōu)惠券系統(tǒng),又需要調(diào)用扣優(yōu)惠券的接口,如果再增加,又需要增加或修改代碼....每次增加業(yè)務(wù)時(shí)支付系統(tǒng)都需要相應(yīng)的修改和重新部署,耦合性非常高(一個(gè)友好系統(tǒng)的設(shè)計(jì)應(yīng)該滿足低耦合、高內(nèi)聚的特點(diǎn)),而且所有流程都寫在一起,排查問題的復(fù)雜度也會(huì)上升。

所以,使用消息隊(duì)列是有必要的,下單后把支付成功的消息發(fā)送到MQ,其他系統(tǒng)通過監(jiān)聽消息去處理各自的業(yè)務(wù),面要再接入系統(tǒng),直接訂閱發(fā)送的支付成功消息即可。


使用MQ流程

2. 解耦

允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

3. 削峰(靈活性 & 峰值處理能力)

在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來(lái)投入資源隨時(shí)待命無(wú)疑是巨大的浪費(fèi)。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰。

比如:秒殺活動(dòng)、雙十一.....流量突然暴增,而平時(shí)都是低流量,如果承受能力不夠,可能就會(huì)突然崩掉
所以:可以把請(qǐng)求放到隊(duì)列里面,然后至于每秒消費(fèi)多少請(qǐng)求,就看自己的服務(wù)器處理能力,能處理多多少Q(mào)PS就消費(fèi)這么多少,可能會(huì)比正常的慢一點(diǎn),但是不至于打掛服務(wù)器,等流量高峰下去了,服務(wù)也就沒壓力了。
阿里雙十一12:00的時(shí)候這么多流量瞬間涌進(jìn)去,他有時(shí)候是會(huì)慢一點(diǎn),但是不會(huì)直接掛掉,或者降級(jí)給你個(gè)友好的提示頁(yè)面。

2. 可恢復(fù)性

系統(tǒng)的一部分組件失效時(shí),不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。

3. 緩沖

有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。

四. 使用消息隊(duì)列有什么問題?

1. 系統(tǒng)復(fù)雜性

本來(lái)只有一個(gè)系統(tǒng),現(xiàn)在接入一個(gè)中間件,需要去維護(hù)他,而且使用的過程中是不是要考慮各種問題,比如消息重復(fù)消費(fèi)、消息丟失、消息的順序消費(fèi)等等。

(1) 重復(fù)消費(fèi)
消息重復(fù)消費(fèi)是使用消息隊(duì)列之后,必須考慮的一個(gè)問題,也是比較嚴(yán)重和常見的問題。
特別是對(duì)于交易支付的場(chǎng)景,重復(fù)的操作和行為會(huì)導(dǎo)致重復(fù)扣款,重復(fù)支付等資金風(fēng)險(xiǎn)
就比如有這樣的一個(gè)場(chǎng)景,用戶下單成功后我需要去一個(gè)活動(dòng)頁(yè)面給他加GMV(銷售總額),最后根據(jù)他的GMV去給他發(fā)獎(jiǎng)勵(lì),這是電商活動(dòng)很常見的玩法。

一般消息隊(duì)列的使用,我們都是有重試機(jī)制的,就是說我下游的業(yè)務(wù)發(fā)生異常了,我會(huì)拋出異常并且要求你重新發(fā)一次。

但是可能下游有多個(gè)系統(tǒng),某個(gè)系統(tǒng)業(yè)務(wù)處理失敗要求重發(fā),但是其他系統(tǒng)處理成功了,會(huì)再次消費(fèi),導(dǎo)致不冪等

冪等(idempotent、idempotence)是一個(gè)數(shù)學(xué)與計(jì)算機(jī)學(xué)概念,常見于抽象代數(shù)中。
在編程中一個(gè)冪等操作的特點(diǎn)是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
冪等函數(shù),或冪等方法,是指可以使用相同參數(shù)重復(fù)執(zhí)行,并能獲得相同結(jié)果的函數(shù)。這些函數(shù)不會(huì)影響系統(tǒng)狀態(tài),也不用擔(dān)心重復(fù)執(zhí)行會(huì)對(duì)系統(tǒng)造成改變。
例如,“setTrue()”函數(shù)就是一個(gè)冪等函數(shù),無(wú)論多次執(zhí)行,其結(jié)果都是一樣的.更復(fù)雜的操作冪等保證是利用唯一交易號(hào)(流水號(hào))實(shí)現(xiàn).

怎么保證冪等?
一般冪等,分場(chǎng)景去考慮,看是強(qiáng)校驗(yàn)還是弱校驗(yàn),比如跟金錢相關(guān)的場(chǎng)景那就很關(guān)鍵,就做強(qiáng)校驗(yàn),其他不是很重要的場(chǎng)景做弱校驗(yàn)。

  • 強(qiáng)校驗(yàn):

比如你監(jiān)聽到用戶支付成功的消息,你監(jiān)聽到了去加GMV是不是要調(diào)用加錢的接口,那加錢接口下面再調(diào)用一個(gè)加流水的接口,兩個(gè)放在一個(gè)事務(wù),成功一起成功失敗一起失敗。

每次消息過來(lái)都要拿著訂單號(hào)+業(yè)務(wù)場(chǎng)景這樣的唯一標(biāo)識(shí)(比是天貓雙十一活動(dòng))去流水表查,看看有沒有這條流水,有就直接return不要走下面的流程了,沒有就執(zhí)行后面的邏輯。

之所以用流水表,是因?yàn)樯婕暗浇疱X這樣的活動(dòng),有啥問題后面也可以去流水表對(duì)賬,還有就是幫助開發(fā)人員定位問題。

  • 弱校驗(yàn)

這個(gè)簡(jiǎn)單,一些不重要的場(chǎng)景,比如給誰(shuí)發(fā)短信啥的,我就把這個(gè)id+場(chǎng)景唯一標(biāo)識(shí)作為Redis的key,放到緩存里面失效時(shí)間看你場(chǎng)景,一定時(shí)間內(nèi)的這個(gè)消息就去Redis判斷。

用KV就算消息丟了可能這樣的場(chǎng)景也沒關(guān)系,反正丟條無(wú)關(guān)痛癢的通知短信嘛(你敢說你沒驗(yàn)證碼短信丟失的情況?)。

因此需要改造業(yè)務(wù)處理邏輯,使得在重復(fù)消息的情況下也不會(huì)影響最終的結(jié)果。

  • 可以通過上面我那條 SQL 一樣,做了個(gè)前置條件判斷,即money = 100情況,并且直接修改,更通用的是做個(gè)version即版本號(hào)控制,對(duì)比消息中的版本號(hào)和數(shù)據(jù)庫(kù)中的版本號(hào)。
  • 或者通過數(shù)據(jù)庫(kù)的約束例如唯一鍵,例如insert into update on duplicate key...。
  • 或者記錄關(guān)鍵的key,比如處理訂單這種,記錄訂單ID,假如有重復(fù)的消息過來(lái),先判斷下這個(gè)ID是否已經(jīng)被處理過了,如果沒處理再進(jìn)行下一步。當(dāng)然也可以用全局唯一ID等等。
    基本上就這么幾個(gè)套路,真正應(yīng)用到實(shí)際中還是得看具體業(yè)務(wù)細(xì)節(jié)。

(2)消息有序

一般都是同個(gè)業(yè)務(wù)場(chǎng)景下不同幾個(gè)操作的消息同時(shí)過去,本身順序是對(duì)的,但是你發(fā)出去的時(shí)候同時(shí)發(fā)出去了,消費(fèi)的時(shí)候卻亂掉了,這樣就有題問了。

生產(chǎn)者消費(fèi)者一般需要保證順序消息的話,可能就是一個(gè)業(yè)務(wù)場(chǎng)景下的,比如訂單的創(chuàng)建、支付、發(fā)貨、收貨。

全局有序
如果要保證消息的全局有序,首先只能由一個(gè)生產(chǎn)者往Topic發(fā)送消息,并且一個(gè)Topic內(nèi)部只能有一個(gè)隊(duì)列(分區(qū))。消費(fèi)者也必須是單線程消費(fèi)這個(gè)隊(duì)列。這樣的消息就是全局有序的!
不過一般情況下我們都不需要全局有序,即使是同步MySQL Binlog也只需要保證單表消息有序即可。

全局有序

部分有序
因此絕大部分的有序需求是部分有序,部分有序我們就可以將Topic內(nèi)部劃分成我們需要的隊(duì)列數(shù),把消息通過特定的策略發(fā)往固定的隊(duì)列中,然后每個(gè)隊(duì)列對(duì)應(yīng)一個(gè)單線程處理的消費(fèi)者。這樣即完成了部分有序的需求,又可以通過隊(duì)列數(shù)量的并發(fā)來(lái)提高消息處理效率。

部分有序

RocketMQ:一個(gè)topic下有多個(gè)隊(duì)列,為了保證發(fā)送有序,RocketMQ提供了MessageQueueSelector隊(duì)列選擇機(jī)制,他有三種實(shí)現(xiàn):


隊(duì)列選擇機(jī)制

使用Hash取模法,讓同一個(gè)訂單發(fā)送到同一個(gè)隊(duì)列中,再使用同步發(fā)送,只有同個(gè)訂單的創(chuàng)建消息發(fā)送成功,再發(fā)送支付消息。這樣,我們保證了發(fā)送有序。

RocketMQ的topic內(nèi)的隊(duì)列機(jī)制,可以保證存儲(chǔ)滿足FIFO(First Input First Output 簡(jiǎn)單說就是指先進(jìn)先出),剩下的只需要消費(fèi)者順序消費(fèi)即可。

RocketMQ僅保證順序發(fā)送,順序消費(fèi)由消費(fèi)者業(yè)務(wù)保證!!!

(3)數(shù)據(jù)丟失
就我們市面上常見的消息隊(duì)列而言,只要配置得當(dāng),我們的消息就不會(huì)丟。

消息傳輸流程

1)生產(chǎn)者:開啟confirm模式
產(chǎn)者發(fā)送消息至Broker,需要處理Broker的響應(yīng),不論是同步還是異步發(fā)送消息,同步和異步回調(diào)都需要做好try-catch,妥善的處理響應(yīng),如果Broker返回寫入失敗等錯(cuò)誤消息,需要重試發(fā)送。當(dāng)多次發(fā)送失敗需要作報(bào)警,日志記錄等。

這樣就能保證在生產(chǎn)消息階段消息不會(huì)丟失。

2)MQ:消息持久化
存儲(chǔ)消息階段需要在消息刷盤(持久化)之后再給生產(chǎn)者響應(yīng),假設(shè)消息寫入緩存中就返回響應(yīng),那么機(jī)器突然斷電這消息就沒了,而生產(chǎn)者以為已經(jīng)發(fā)送成功了。

如果Broker是集群部署,有多副本機(jī)制,即消息不僅僅要寫入當(dāng)前Broker,還需要寫入副本機(jī)中。那配置成至少寫入兩臺(tái)機(jī)子后再給生產(chǎn)者響應(yīng)。這樣基本上就能保證存儲(chǔ)的可靠了。一臺(tái)掛了還有一臺(tái)還在呢(假如怕兩臺(tái)都掛了..那就再多些)。

3)消費(fèi)者:關(guān)閉自動(dòng)ack
這里經(jīng)常會(huì)有同學(xué)犯錯(cuò),有些同學(xué)當(dāng)消費(fèi)者拿到消息之后直接存入內(nèi)存隊(duì)列中就直接返回給Broker消費(fèi)成功,這是不對(duì)的。

你需要考慮拿到消息放在內(nèi)存之后消費(fèi)者就宕機(jī)了怎么辦。所以我們應(yīng)該在消費(fèi)者真正執(zhí)行完業(yè)務(wù)邏輯之后,再發(fā)送給Broker消費(fèi)成功,這才是真正的消費(fèi)了。

所以只要我們?cè)谙I(yè)務(wù)邏輯處理完成之后再給Broker響應(yīng),那么消費(fèi)階段消息就不會(huì)丟失。

保證消息的可靠性需要三方配合:
生產(chǎn)者:需要處理好Broker的響應(yīng),出錯(cuò)情況下利用重試、報(bào)警等手段。
MQ:需要控制響應(yīng)的時(shí)機(jī),單機(jī)情況下是消息刷盤后返回響應(yīng),集群多副本情況下,即發(fā)送至兩個(gè)副本及以上的情況下再返回響應(yīng)。
消費(fèi)者:需要在執(zhí)行完真正的業(yè)務(wù)邏輯之后再返回響應(yīng)給Broker。
但是要注意消息可靠性增強(qiáng)了,性能就下降了,等待消息刷盤、多副本同步后返回都會(huì)影響性能。因此還是看業(yè)務(wù),例如日志的傳輸可能丟那么一兩條關(guān)系不大,因此沒必要等消息刷盤再響應(yīng)。

(4)消息堆積
消息的堆積往往是因?yàn)樯a(chǎn)者的生產(chǎn)速度與消費(fèi)者的消費(fèi)速度不匹配。有可能是因?yàn)橄⑾M(fèi)失敗反復(fù)重試造成的,也有可能就是消費(fèi)者消費(fèi)能力弱,漸漸地消息就積壓了。

因此我們需要先定位消費(fèi)慢的原因,如果是bug則處理 bug ,如果是因?yàn)楸旧硐M(fèi)能力較弱,我們可以優(yōu)化下消費(fèi)邏輯,比如之前是一條一條消息消費(fèi)處理的,這次我們批量處理,比如數(shù)據(jù)庫(kù)的插入,一條一條插和批量插效率是不一樣的。

假如邏輯我們已經(jīng)都優(yōu)化了,但還是慢,那就得考慮水平擴(kuò)容了,增加Topic的隊(duì)列數(shù)和消費(fèi)者數(shù)量,注意隊(duì)列數(shù)一定要增加,不然新增加的消費(fèi)者是沒東西消費(fèi)的。一個(gè)Topic中,一個(gè)隊(duì)列只會(huì)分配給一個(gè)消費(fèi)者。

當(dāng)然你消費(fèi)者內(nèi)部是單線程還是多線程消費(fèi)那看具體場(chǎng)景。不過要注意上面提高的消息丟失的問題,如果你是將接受到的消息寫入內(nèi)存隊(duì)列之后,然后就返回響應(yīng)給Broker,然后多線程向內(nèi)存隊(duì)列消費(fèi)消息,假設(shè)此時(shí)消費(fèi)者宕機(jī)了,內(nèi)存隊(duì)列里面還未消費(fèi)的消息也就丟了。

2. 數(shù)據(jù)一致性

這個(gè)其實(shí)是分布式服務(wù)本身就存在的一個(gè)問題,不僅僅是消息隊(duì)列的問題,但是放在這里說是因?yàn)橛昧讼㈥?duì)列這個(gè)問題會(huì)暴露得比較嚴(yán)重一點(diǎn)。就像我開頭說的,你下單的服務(wù)自己保證自己的邏輯成功處理了,你成功發(fā)了消息,但是優(yōu)惠券系統(tǒng),積分系統(tǒng)等等這么多系統(tǒng),他們成功還是失敗你就不管了?我說了保證自己的業(yè)務(wù)數(shù)據(jù)對(duì)的就好了,其實(shí)還是比較不負(fù)責(zé)任的一種說法。(分布式事務(wù))

分布式事務(wù)

事務(wù)

一般是指要做的或所做的事情。
在計(jì)算機(jī)術(shù)語(yǔ)中是指訪問并可能更新數(shù)據(jù)庫(kù)中各種數(shù)據(jù)項(xiàng)的一個(gè)程序執(zhí)行單元(unit)。
事務(wù)通常由高級(jí)數(shù)據(jù)庫(kù)操縱語(yǔ)言或編程語(yǔ)言(如SQL,C++或Java)書寫的用戶程序用戶程序的執(zhí)行所引起,并用形如begin transaction和end transaction語(yǔ)句(或函數(shù)調(diào)用)來(lái)界定。
事務(wù)由事務(wù)開始(begin transaction)和事務(wù)結(jié)束(end transaction)之間執(zhí)行的全體操作組成。

特性

事務(wù)是恢復(fù)和并發(fā)控制的基本單位。
事務(wù)應(yīng)該具有4個(gè)屬性:原子性、一致性、隔離性、持久性。這四個(gè)屬性通常稱為ACID特性。
原子性(atomicity):一個(gè)事務(wù)是一個(gè)不可分割的工作單位,事務(wù)中包括的操作要么都做,要么都不做。
一致性(consistency):事務(wù)必須是使數(shù)據(jù)庫(kù)從一個(gè)一致性狀態(tài)變到另一個(gè)一致性狀態(tài)。一致性與原子性是密切相關(guān)的。
隔離性(isolation):一個(gè)事務(wù)的執(zhí)行不能被其他事務(wù)干擾。即一個(gè)事務(wù)內(nèi)部的操作及使用的數(shù)據(jù)對(duì)并發(fā)的其他事務(wù)是隔離的,并發(fā)執(zhí)行的各個(gè)事務(wù)之間不能互相干擾。
持久性(durability):持久性也稱永久性(permanence),指一個(gè)事務(wù)一旦提交,它對(duì)數(shù)據(jù)庫(kù)中數(shù)據(jù)的改變就應(yīng)該是永久性的。接下來(lái)的其他操作或故障不應(yīng)該對(duì)其有任何影響。
總結(jié)起來(lái)就是:事務(wù)就是一系列操作,要么同時(shí)成功,要么同時(shí)失敗。事務(wù)具有的特性 ACID (原子性、一致性、隔離性、持久性)。

分布式事務(wù)
我接觸和了解到的分布式事務(wù)大概分為:

  • 2pc(兩段式提交)
  • 3pc(三段式提交)
  • TCC(Try、Confirm、Cancel)
  • 最大努力通知
  • XA
  • 本地消息表(ebay研發(fā)出的)
  • 半消息/最終一致性(RocketMQ)

介紹下最簡(jiǎn)單的2pc(兩段式),以及大家以后可能比較常用的半消息事務(wù)也就是最終一致性,目的是讓大家理解下分布式事務(wù)里面消息中間件的作用,別的事務(wù)都大同小異,都有很多優(yōu)點(diǎn)。

當(dāng)然也都有種種弊端:

  • 例如長(zhǎng)時(shí)間鎖定數(shù)據(jù)庫(kù)資源,導(dǎo)致系統(tǒng)的響應(yīng)不快,并發(fā)上不去
  • 網(wǎng)絡(luò)抖動(dòng)出現(xiàn)腦裂情況,導(dǎo)致事物參與者,不能很好地執(zhí)行協(xié)調(diào)者的指令,導(dǎo)致數(shù)據(jù)不一致。
  • 單點(diǎn)故障:例如事物協(xié)調(diào)者,在某一時(shí)刻宕機(jī),雖然可以通過選舉機(jī)制產(chǎn)生新的Leader,但是這過程中,必然出現(xiàn)問題,而TCC,只有強(qiáng)悍的技術(shù)團(tuán)隊(duì),才能支持開發(fā),成本太高。

2pc(兩段式提交):

2pc

2pc(兩段式提交)可以說是分布式事務(wù)的最開始的樣子了,像極了媒婆,就是通過消息中間件協(xié)調(diào)多個(gè)系統(tǒng),在兩個(gè)系統(tǒng)操作事務(wù)的時(shí)候都鎖定資源但是不提交事務(wù),等兩者都準(zhǔn)備好了,告訴消息中間件,然后再分別提交事務(wù)。

是的你可能已經(jīng)發(fā)現(xiàn)了,如果A系統(tǒng)事務(wù)提交成功了,但是B系統(tǒng)在提交的時(shí)候網(wǎng)絡(luò)波動(dòng)或者各種原因提交失敗了,其實(shí)還是會(huì)失敗的。

最終一致性:

最終一致性

整個(gè)流程中,我們能保證是:

  • 業(yè)務(wù)主動(dòng)方本地事務(wù)提交失敗,業(yè)務(wù)被動(dòng)方不會(huì)收到消息的投遞。
  • 只要業(yè)務(wù)主動(dòng)方本地事務(wù)執(zhí)行成功,那么消息服務(wù)一定會(huì)投遞消息給下游的業(yè)務(wù)被動(dòng)方,并最終保證業(yè)務(wù)被動(dòng)方一定能成功消費(fèi)該消息(消費(fèi)成功或失敗,即最終一定會(huì)有一個(gè)最終態(tài))。

不過呢技術(shù)就是這樣,各種極端的情況我們都需要考慮,也很難有完美的方案,所以才會(huì)有這么多的方案三段式、TCC、最大努力通知等等分布式事務(wù)方案,大家只需要知道為啥要做,做了有啥好處,有啥壞處,在實(shí)際開發(fā)的時(shí)候都注意下就好好了,系統(tǒng)都是根據(jù)業(yè)務(wù)場(chǎng)景設(shè)計(jì)出來(lái)的,離開業(yè)務(wù)的技術(shù)沒有意義,離開技術(shù)的業(yè)務(wù)沒有底氣。

3. 高可用

無(wú)論是我們使用消息隊(duì)列來(lái)做解耦、異步還是削峰,消息隊(duì)列肯定不能是單機(jī)的。試著想一下,如果是單機(jī)的消息隊(duì)列,萬(wàn)一這臺(tái)機(jī)器掛了,那我們整個(gè)系統(tǒng)幾乎就是不可用了。

所以,當(dāng)我們項(xiàng)目中使用消息隊(duì)列,都是得集群/分布式的。要做集群/分布式就必然希望該消息隊(duì)列能夠提供現(xiàn)成的支持,而不是自己寫代碼手動(dòng)去實(shí)現(xiàn)。

五. 消息隊(duì)列中間件

目前使用較多的消息隊(duì)列有Kafka、ActiveMQ、RabbitMQ、RocketMQ,我們后面會(huì)一一對(duì)比這些消息隊(duì)列。
ActiveMQ和RabbitMQ這兩著因?yàn)橥掏铝窟€有GitHub的社區(qū)活躍度的原因,在各大互聯(lián)網(wǎng)公司都已經(jīng)基本上絕跡了,業(yè)務(wù)體量一般的公司會(huì)是有在用的,但是越來(lái)越多的公司更青睞RocketMQ這樣的消息中間件了。

Kafka和RocketMQ一直在各自擅長(zhǎng)的領(lǐng)域發(fā)光發(fā)亮,不過寫這篇文章的時(shí)候我問了螞蟻金服,字節(jié)跳動(dòng)和美團(tuán)的朋友,好像大家用的都有點(diǎn)不一樣,應(yīng)該都是各自的中間件,可能做過修改,也可能是自研的,大多沒有開源。

1. 對(duì)比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
單機(jī)吞吐量 萬(wàn)級(jí),比RocketMQ、Kafka低一個(gè)數(shù)量級(jí) 萬(wàn)級(jí) 10萬(wàn)級(jí),支撐高吞吐量 10萬(wàn)級(jí),高吞吐,一般配合大數(shù)據(jù)類系統(tǒng)進(jìn)行實(shí)時(shí)數(shù)據(jù)計(jì)算、日志采集等場(chǎng)景
topic數(shù)量對(duì)吞吐量的影響 topic可以達(dá)到幾百/幾千的級(jí)別,吞吐量會(huì)有較小幅度下降,這是RocketMQ的一大優(yōu)勢(shì),在同等機(jī)器下可以支撐大量的Topic topic從幾十到幾百個(gè),吞吐量會(huì)大幅下降,在同等機(jī)器下,Kafka盡量保證topic數(shù)量不要過多,如需支撐大規(guī)模Topic需要增加更多的機(jī)器資源
時(shí)效性 ms級(jí) 微秒級(jí),延遲最低,這是RabbitMQ的一大特性 ms級(jí) 延遲在ms級(jí)內(nèi)
可用性 高,基于主從架構(gòu)實(shí)現(xiàn)高可用 同ActiveMQ 非常高,分布式架構(gòu) 非常高,分布式,一個(gè)數(shù)據(jù)多個(gè)副本,少數(shù)機(jī)器宕機(jī),不會(huì)丟失數(shù)據(jù),不會(huì)導(dǎo)致不可用
消息可靠性 有較低的概率丟失數(shù)據(jù) 基本不會(huì)丟失 經(jīng)過參數(shù)優(yōu)化配置,可以做到0丟失 同RocketMQ
功能支持 MQ領(lǐng)域的功能機(jī)器完備 基于erlang開發(fā),并發(fā)能力很強(qiáng),性能極好,延時(shí)很低 MQ功能較為完善,還是分布式的,擴(kuò)展性好 功能較為簡(jiǎn)單,主要支持簡(jiǎn)單的MQ功能,在大數(shù)據(jù)領(lǐng)域的實(shí)時(shí)計(jì)算以及日志采集中被大規(guī)模的使用
社區(qū)活躍度

2. 選擇

從上面對(duì)比可以看出其中的一些差距:
吞吐量:早期比較活躍的ActiveMQ 和RabbitMQ基本上不是后兩者的對(duì)手了,在現(xiàn)在這樣大數(shù)據(jù)的年代吞吐量真的很重要。
比如現(xiàn)在突然爆發(fā)了一個(gè)超級(jí)熱點(diǎn)新聞,你的APP注冊(cè)用戶高達(dá)億數(shù),你要想辦法第一時(shí)間把突發(fā)全部推送到每個(gè)人手上,你沒有大吞吐量的消息隊(duì)列中間件用啥去推?

部署方式:后面兩個(gè)天然分布式架構(gòu),都是高可用的分布式架構(gòu),而且數(shù)據(jù)多個(gè)副本的數(shù)據(jù)也能做到0丟失。

RabbitMQ 這個(gè)中間件其實(shí)還行,但是這玩意開發(fā)語(yǔ)言居然是erlang,我敢說絕大部分工程師肯定不會(huì)為了一個(gè)中間件去刻意學(xué)習(xí)一門語(yǔ)言的,開發(fā)維護(hù)成本你想都想不到,出個(gè)問題查都查半天。

RocketMQ(阿里開源的)git活躍度還可以,基本上你push了自己的bug確認(rèn)了有問題都有阿里大佬跟你試試解答并修復(fù)的,我個(gè)人推薦的也是這個(gè),他的架構(gòu)設(shè)計(jì)部分跟同樣是阿里開源的一個(gè)RPC框架是真的很像(Dubbo)。

Kafka:大數(shù)據(jù)領(lǐng)域,公司的日志采集,實(shí)時(shí)計(jì)算等場(chǎng)景,都離不開他的身影,他基本上算得上是世界范圍級(jí)別的消息隊(duì)列標(biāo)桿了。

沒有最好的技術(shù),只有最適合的技術(shù),不要為了用而用

RocketMQ

1. 介紹

RocketMQ是一個(gè)純Java、分布式、隊(duì)列模型的開源消息中間件,前身是MetaQ,是阿里參考Kafka特點(diǎn)研發(fā)的一個(gè)隊(duì)列模型的消息中間件,后開源給apache基金會(huì)成為了apache的頂級(jí)開源項(xiàng)目,具有高性能、高可靠、高實(shí)時(shí)、分布式特點(diǎn)。


rocketMQ官網(wǎng)

2. 發(fā)展歷程

2007年:淘寶實(shí)施了“五彩石”項(xiàng)目,“五彩石”用于將交易系統(tǒng)從單機(jī)變成分布式,也是在這個(gè)過程中產(chǎn)生了阿里巴巴第一代消息引擎——Notify。

2010年:阿里巴巴B2B部門基于ActiveMQ的5.1版本也開發(fā)了自己的一款消息引擎,稱為Napoli,這款消息引擎在B2B里面廣泛地被使用,不僅僅是在交易領(lǐng)域,在很多的后臺(tái)異步解耦等方面也得到了廣泛的應(yīng)用。

2011年:業(yè)界出現(xiàn)了現(xiàn)在被很多大數(shù)據(jù)領(lǐng)域所推崇的Kafka消息引擎,阿里巴巴在研究了Kafka的整體機(jī)制和架構(gòu)設(shè)計(jì)之后,基于Kafka的設(shè)計(jì)使用Java進(jìn)行了完全重寫并推出了MetaQ 1.0版本,主要是用于解決順序消息和海量堆積的問題。

2012年:阿里巴巴開源其自研的第三代分布式消息中間件——RocketMQ。

經(jīng)過幾年的技術(shù)打磨,阿里稱基于RocketMQ技術(shù),目前雙十一當(dāng)天消息容量可達(dá)到萬(wàn)億級(jí)。

2016年11月:阿里將RocketMQ捐獻(xiàn)給Apache軟件基金會(huì),正式成為孵化項(xiàng)目。

阿里稱會(huì)將其打造成頂級(jí)項(xiàng)目。這是阿里邁出的一大步,因?yàn)榧尤氲介_源軟件基金會(huì)需要經(jīng)過評(píng)審方的考核與觀察。

坦率而言,業(yè)界還對(duì)國(guó)人的代碼開源參與度仍保持著刻板印象;而Apache基金會(huì)中的342個(gè)項(xiàng)目中,暫時(shí)還只有Kylin、CarbonData、Eagle 、Dubbo和 RocketMQ 共計(jì)五個(gè)中國(guó)技術(shù)人主導(dǎo)的項(xiàng)目。

2017年2月20日:RocketMQ正式發(fā)布4.0版本,專家稱新版本適用于電商領(lǐng)域,金融領(lǐng)域,大數(shù)據(jù)領(lǐng)域,兼有物聯(lián)網(wǎng)領(lǐng)域的編程模型。

以上就是RocketMQ的整體發(fā)展歷史,其實(shí)在阿里巴巴內(nèi)部圍繞著RocketMQ內(nèi)核打造了三款產(chǎn)品,分別是MetaQ、Notify和Aliware MQ。

這三者分別采用了不同的模型,MetaQ主要使用了拉模型,解決了順序消息和海量堆積問題;Notify主要使用了推模型,解決了事務(wù)消息;而云產(chǎn)品Aliware MQ則是提供了商業(yè)化的版本。

3.功能

Apache RocketMQ是一個(gè)分布式消息傳遞和流平臺(tái),具有低延遲,高性能和可靠性,萬(wàn)億級(jí)容量和靈活的可伸縮性。
它具有多種功能:

  • 消息模式包括發(fā)布/訂閱,請(qǐng)求/答復(fù)和流式傳輸
  • 財(cái)務(wù)級(jí)交易信息
  • 基于DLedger的內(nèi)置容錯(cuò)和高可用性配置選項(xiàng)
  • 各種跨語(yǔ)言客戶端,例如Java,C / C ++,Python,Go
  • 可插拔的傳輸協(xié)議,例如TCP,SSL,AIO
  • 內(nèi)置消息跟蹤功能,還支持開放式跟蹤
  • 多功能的大數(shù)據(jù)和流生態(tài)系統(tǒng)集成
  • 按時(shí)間或偏移量追溯消息
  • 可靠的FIFO和嚴(yán)格的有序消息傳遞在同一隊(duì)列中
  • 高效的推拉消費(fèi)模型
  • 單個(gè)隊(duì)列中的百萬(wàn)級(jí)消息累積容量
  • 多種消息傳遞協(xié)議,例如JMS和OpenMessaging
  • 靈活的分布式橫向擴(kuò)展部署架構(gòu)
  • 快如閃電的批量消息交換系統(tǒng)
  • 各種消息過濾器機(jī)制,例如SQL和Tag
  • 用于隔離測(cè)試和云隔離群集的Docker映像
  • 功能豐富的管理儀表板,用于配置,指標(biāo)和監(jiān)視
  • 認(rèn)證與授權(quán)
  • 免費(fèi)的開源連接器,適用于源和接收器

4. 核心模塊

  • rocketmq-broker:接受生產(chǎn)者發(fā)來(lái)的消息并存儲(chǔ)(通過調(diào)用rocketmq-store),消費(fèi)者從這里取得消息
  • rocketmq-client:提供發(fā)送、接受消息的客戶端API。
  • rocketmq-namesrv:NameServer,類似于Zookeeper,這里保存著消息的TopicName,隊(duì)列等運(yùn)行時(shí)的元信息。
  • rocketmq-common:通用的一些類,方法,數(shù)據(jù)結(jié)構(gòu)等。
  • rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定義二進(jìn)制協(xié)議。
  • rocketmq-store:消息、索引存儲(chǔ)等。
  • rocketmq-filtersrv:消息過濾器Server,需要注意的是,要實(shí)現(xiàn)這種過濾,需要上傳代碼到MQ?。ㄒ话愣?,我們利用Tag足以滿足大部分的過濾需求,如果更靈活更復(fù)雜的過濾需求,可以考慮filtersrv組件)。
  • rocketmq-tools:命令行工具。

5. 四大核心組成部分

主要有四大核心組成部分:
NameServer
Broker
Producer
Consumer


核心組成

我們可以看到RocketMQ啥都是集群部署的,這是他吞吐量大,高可用的原因之一,集群的模式也很花哨,可以支持多master 模式、多master多slave異步復(fù)制模式、多 master多slave同步雙寫模式。

而且這個(gè)模式好像Kafka?。。ㄎ疫@里是廢話,本身就是阿里基于Kafka的很多特性研發(fā)的)。

(1)NameServer
  • 其角色類似Dubbo中的Zookeeper,但NameServer與Zookeeper相比更輕量。主要是因?yàn)槊總€(gè)NameServer節(jié)點(diǎn)互相之間是獨(dú)立的,沒有任何信息交互。
  • NameServer壓力不會(huì)太大,平時(shí)主要開銷是在維持心跳和提供Topic-Broker的關(guān)系數(shù)據(jù)。
  • 但有一點(diǎn)需要注意,Broker向NameServer發(fā)心跳時(shí), 會(huì)帶上當(dāng)前自己所負(fù)責(zé)的所有Topic信息,如果Topic個(gè)數(shù)太多(萬(wàn)級(jí)別),會(huì)導(dǎo)致一次心跳中,就Topic的數(shù)據(jù)就幾十M,網(wǎng)絡(luò)情況差的話, 網(wǎng)絡(luò)傳輸失敗,心跳失敗,導(dǎo)致NameServer誤認(rèn)為Broker心跳失敗。
  • NameServer 被設(shè)計(jì)成幾乎無(wú)狀態(tài)的,可以橫向擴(kuò)展,節(jié)點(diǎn)之間相互之間無(wú)通信,通過部署多臺(tái)機(jī)器來(lái)標(biāo)記自己是一個(gè)偽集群。
  • 每個(gè) Broker 在啟動(dòng)的時(shí)候會(huì)到 NameServer 注冊(cè),Producer 在發(fā)送消息前會(huì)根據(jù) Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會(huì)定時(shí)獲取 Topic 的路由信息。
(2)Broker
  • 消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)發(fā)消息。
  • Broker是具體提供業(yè)務(wù)的服務(wù)器,單個(gè)Broker節(jié)點(diǎn)與所有的NameServer節(jié)點(diǎn)保持長(zhǎng)連接及心跳,并會(huì)定時(shí)將Topic信息注冊(cè)到NameServer,順帶一提底層的通信和連接都是基于Netty實(shí)現(xiàn)的。
  • Broker負(fù)責(zé)消息存儲(chǔ),以Topic為緯度支持輕量級(jí)的隊(duì)列,單機(jī)可以支撐上萬(wàn)隊(duì)列規(guī)模,支持消息推拉模型。
  • 官網(wǎng)上有數(shù)據(jù)顯示:具有上億級(jí)消息堆積能力,同時(shí)可嚴(yán)格保證消息的有序性。
(3)Producer
  • 消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)產(chǎn)生消息。
  • Producer由用戶進(jìn)行分布式部署,消息由Producer通過多種負(fù)載均衡模式發(fā)送到Broker集群,發(fā)送低延時(shí),支持快速失敗。
  • RocketMQ 提供了三種方式發(fā)送消息:同步、異步和單向
    (1)同步發(fā)送:同步發(fā)送指消息發(fā)送方發(fā)出數(shù)據(jù)后會(huì)在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個(gè)數(shù)據(jù)包。一般用于重要通知消息,例如重要通知郵件、營(yíng)銷短信。
    (2)異步發(fā)送:異步發(fā)送指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個(gè)數(shù)據(jù)包,一般用于可能鏈路耗時(shí)較長(zhǎng)而對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,例如用戶視頻上傳后通知啟動(dòng)轉(zhuǎn)碼服務(wù)。
    (3)單向發(fā)送:?jiǎn)蜗虬l(fā)送是指只負(fù)責(zé)發(fā)送消息而不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),適用于某些耗時(shí)非常短但對(duì)可靠性要求并不高的場(chǎng)景,例如日志收集。
(4)Consumer
  • 消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。
  • Consumer也由用戶部署,支持PUSH和PULL兩種消費(fèi)模式,支持集群消費(fèi)和廣播消息,提供實(shí)時(shí)的消息訂閱機(jī)制。
  • Pull:拉取型消費(fèi)者(Pull Consumer)主動(dòng)從消息服務(wù)器拉取信息,只要批量拉取到消息,用戶應(yīng)用就會(huì)啟動(dòng)消費(fèi)過程,所以 Pull 稱為主動(dòng)消費(fèi)型。
  • Push:推送型消費(fèi)者(Push Consumer)封裝了消息的拉取、消費(fèi)進(jìn)度和其他的內(nèi)部維護(hù)工作,將消息到達(dá)時(shí)執(zhí)行的回調(diào)接口留給用戶應(yīng)用程序來(lái)實(shí)現(xiàn)。所以 Push 稱為被動(dòng)消費(fèi)類型,但從實(shí)現(xiàn)上看還是從消息服務(wù)器中拉取消息,不同于 Pull 的是 Push 首先要注冊(cè)消費(fèi)監(jiān)聽器,當(dāng)監(jiān)聽器處觸發(fā)后才開始消費(fèi)消息。
6. 優(yōu)缺點(diǎn)

RocketMQ優(yōu)點(diǎn):

  • 單機(jī)吞吐量:十萬(wàn)級(jí)
  • 可用性:非常高,分布式架構(gòu)
  • 消息可靠性:經(jīng)過參數(shù)優(yōu)化配置,消息可以做到0丟失
  • 功能支持:MQ功能較為完善,還是分布式的,擴(kuò)展性好
  • 支持10億級(jí)別的消息堆積,不會(huì)因?yàn)槎逊e導(dǎo)致性能下降
  • 源碼是java,我們可以自己閱讀源碼,定制自己公司的MQ,可以掌控
  • 天生為金融互聯(lián)網(wǎng)領(lǐng)域而生,對(duì)于可靠性要求很高的場(chǎng)景,尤其是電商里面的訂單扣款,以及業(yè)務(wù)削峰,在大量交易涌入時(shí),后端可能無(wú)法及時(shí)處理的情況
  • RoketMQ在穩(wěn)定性上可能更值得信賴,這些業(yè)務(wù)場(chǎng)景在阿里雙11已經(jīng)經(jīng)歷了多次考驗(yàn),如果你的業(yè)務(wù)有上述并發(fā)場(chǎng)景,建議可以選擇RocketMQ

RocketMQ缺點(diǎn):

  • 支持的客戶端語(yǔ)言不多,目前是java及c++,其中c++不成熟
  • 社區(qū)活躍度不是特別活躍那種
  • 沒有在 mq 核心中去實(shí)現(xiàn)JMS等接口,有些系統(tǒng)要遷移需要修改大量代碼

Kafka介紹:

Kafka是天然分布式的

現(xiàn)在我們已經(jīng)知道了往topic里邊放數(shù)據(jù),實(shí)際上這些數(shù)據(jù)會(huì)分到不同的partition上,這些partition存在不同的broker上。分布式肯定會(huì)帶來(lái)問題:“萬(wàn)一其中一臺(tái)broker(Kafka服務(wù)器)出現(xiàn)網(wǎng)絡(luò)抖動(dòng)或者掛了,怎么辦?”

Kafka是這樣做的:我們數(shù)據(jù)存在不同的partition上,那kafka就把這些partition做備份。比如,現(xiàn)在我們有三個(gè)partition,分別存在三臺(tái)broker上。每個(gè)partition都會(huì)備份,這些備份散落在不同的broker上。

生產(chǎn)者往topic丟數(shù)據(jù),是與主分區(qū)交互,消費(fèi)者消費(fèi)topic的數(shù)據(jù),也是與主分區(qū)交互。
備份分區(qū)僅僅用作于備份,不做讀寫。如果某個(gè)Broker掛了,那就會(huì)選舉出其他Broker的partition來(lái)作為主分區(qū),這就實(shí)現(xiàn)了高可用。

當(dāng)生產(chǎn)者把數(shù)據(jù)丟進(jìn)topic時(shí),我們知道是寫在partition上的,那partition是怎么將其持久化的呢?(不持久化如果Broker中途掛了,那肯定會(huì)丟數(shù)據(jù)嘛)。
Kafka是將partition的數(shù)據(jù)寫在磁盤的(消息日志),不過Kafka只允許追加寫入(順序訪問),避免緩慢的隨機(jī) I/O 操作。

參考公眾號(hào):三太子敖丙

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 為什么使用消息隊(duì)列 解耦 看這么個(gè)場(chǎng)景。A 系統(tǒng)發(fā)送數(shù)據(jù)到 BCD 三個(gè)系統(tǒng),通過接口調(diào)用發(fā)送。如果 E 系統(tǒng)也要...
    程序猿TODO閱讀 444評(píng)論 1 0
  • 正文: 1、什么是 rabbitmq 采用 AMQP 高級(jí)消息隊(duì)列協(xié)議的一種消息隊(duì)列技術(shù),最大的特點(diǎn)就是消費(fèi)并不需...
    阿杰子啊閱讀 931評(píng)論 0 1
  • 久違的晴天,家長(zhǎng)會(huì)。 家長(zhǎng)大會(huì)開好到教室時(shí),離放學(xué)已經(jīng)沒多少時(shí)間了。班主任說已經(jīng)安排了三個(gè)家長(zhǎng)分享經(jīng)驗(yàn)。 放學(xué)鈴聲...
    飄雪兒5閱讀 7,788評(píng)論 16 22
  • 今天感恩節(jié)哎,感謝一直在我身邊的親朋好友。感恩相遇!感恩不離不棄。 中午開了第一次的黨會(huì),身份的轉(zhuǎn)變要...
    余生動(dòng)聽閱讀 10,798評(píng)論 0 11
  • 可愛進(jìn)取,孤獨(dú)成精。努力飛翔,天堂翱翔。戰(zhàn)爭(zhēng)美好,孤獨(dú)進(jìn)取。膽大飛翔,成就輝煌。努力進(jìn)取,遙望,和諧家園??蓯塾巫?..
    趙原野閱讀 3,443評(píng)論 1 1

友情鏈接更多精彩內(nèi)容