本系列博客主要是以學(xué)習(xí)官方文檔為主
基本概念
1.消息模型
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負(fù)責(zé)生產(chǎn)消息,Consumer 負(fù)責(zé)消費(fèi)消息,Broker 負(fù)責(zé)存儲(chǔ)消息。Broker 在實(shí)際部署過程中對(duì)應(yīng)一臺(tái)服務(wù)器,每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息,每個(gè)Topic的消息也可以分片存儲(chǔ)于不同的 Broker。Message Queue 用于存儲(chǔ)消息的物理地址,每個(gè)Topic中的消息地址存儲(chǔ)于多個(gè) Message Queue 中。ConsumerGroup 由多個(gè)Consumer 實(shí)例構(gòu)成。
2.消息生產(chǎn)者
負(fù)責(zé)生產(chǎn)消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)生產(chǎn)消息。一個(gè)消息生產(chǎn)者會(huì)把業(yè)務(wù)應(yīng)用系統(tǒng)里產(chǎn)生的消息發(fā)送到broker服務(wù)器。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認(rèn)信息,單向發(fā)送不需要。(為了防止在生產(chǎn)階段丟消息,生產(chǎn)的時(shí)候是需要一個(gè)broker的確認(rèn)的,但某些業(yè)務(wù)不需要確認(rèn)如日志,日志允許丟一些。這里的日志指的是業(yè)務(wù)日志,如果用來做重做日志和崩潰回滾的日志是不允許丟失的)。
3.消息消費(fèi)者
負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。一個(gè)消息消費(fèi)者會(huì)從Broker服務(wù)器拉取消息、并將其提供給應(yīng)用程序。從用戶應(yīng)用的角度而言提供了兩種消費(fèi)形式:拉取式消費(fèi)、推動(dòng)式消費(fèi)。
4.主題
表示一類消息的集合,每個(gè)主題包含若干條消息,每條消息只能屬于一個(gè)主題,是RocketMQ進(jìn)行消息訂閱的基本單位。
5.代理服務(wù)器(Broker Server)
消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息、轉(zhuǎn)發(fā)消息。代理服務(wù)器在RocketMQ系統(tǒng)中負(fù)責(zé)接收從生產(chǎn)者發(fā)送來的消息并存儲(chǔ)、同時(shí)為消費(fèi)者的拉取請(qǐng)求作準(zhǔn)備。代理服務(wù)器也存儲(chǔ)消息相關(guān)的元數(shù)據(jù),包括消費(fèi)者組、消費(fèi)進(jìn)度偏移和主題和隊(duì)列消息等。
6.名字服務(wù)(Name Server)
名稱服務(wù)充當(dāng)路由消息的提供者。生產(chǎn)者或消費(fèi)者能夠通過名字服務(wù)查找各主題相應(yīng)的Broker IP列表。多個(gè)Namesrv實(shí)例組成集群,但相互獨(dú)立,沒有信息交換。
7.拉取式消費(fèi)(Pull Consumer)
Consumer消費(fèi)的一種類型,應(yīng)用通常主動(dòng)調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息、主動(dòng)權(quán)由應(yīng)用控制。一旦獲取了批量消息,應(yīng)用就會(huì)啟動(dòng)消費(fèi)過程。
8.推動(dòng)式消費(fèi)(Push Consumer)
Consumer消費(fèi)的一種類型,該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給消費(fèi)端,該消費(fèi)模式一般實(shí)時(shí)性較高。
9.生產(chǎn)者組(Producer Group)
同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務(wù)器會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例以提交或回溯消費(fèi)。
10.消費(fèi)者組(Consumer Group)
同一類Consumer的集合,這類Consumer通常消費(fèi)同一類消息且消費(fèi)邏輯一致。消費(fèi)者組使得在消息消費(fèi)方面,實(shí)現(xiàn)負(fù)載均衡和容錯(cuò)的目標(biāo)變得非常容易。要注意的是,消費(fèi)者組的消費(fèi)者實(shí)例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。
11.集群消費(fèi)(Clustering)
集群消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例平均分?jǐn)傁ⅰ?/p>
12.廣播消費(fèi)(Broadcasting)
廣播消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例都接收全量的消息。
13.普通順序消息(Normal Ordered Message)
普通順序消費(fèi)模式下,消費(fèi)者通過同一個(gè)消費(fèi)隊(duì)列收到的消息是有順序的,不同消息隊(duì)列收到的消息則可能是無順序的。
14.嚴(yán)格順序消息(Strictly Ordered Message)
嚴(yán)格順序消息模式下,消費(fèi)者收到的所有消息均是有順序的。
15.消息(Message)
消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位,每條消息必須屬于一個(gè)主題。RocketMQ中每個(gè)消息擁有唯一的Message ID,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key。系統(tǒng)提供了通過Message ID和Key查詢消息的功能。
16. 標(biāo)簽(Tag)
為消息設(shè)置的標(biāo)志,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。
特性
1.訂閱與發(fā)布
消息的發(fā)布是指某個(gè)生產(chǎn)者向某個(gè)topic發(fā)送消息;消息的訂閱是指某個(gè)消費(fèi)者關(guān)注了某個(gè)topic中帶有某些tag的消息,進(jìn)而從該topic消費(fèi)數(shù)據(jù)。
2.消息順序
消息有序指的是一類消息消費(fèi)時(shí),能按照發(fā)送的順序來消費(fèi)。例如:一個(gè)訂單產(chǎn)生了三條消息分別是訂單創(chuàng)建、訂單付款、訂單完成。消費(fèi)時(shí)要按照這個(gè)順序消費(fèi)才能有意義,但是同時(shí)訂單之間是可以并行消費(fèi)的。RocketMQ可以嚴(yán)格的保證消息有序。
順序消息分為全局順序消息與分區(qū)順序消息,全局順序是指某個(gè)Topic下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費(fèi)即可。
全局順序 對(duì)于指定的一個(gè) Topic,所有消息按照嚴(yán)格的先入先出(FIFO)的順序進(jìn)行發(fā)布和消費(fèi)。 適用場(chǎng)景:性能要求不高,所有的消息嚴(yán)格按照 FIFO 原則進(jìn)行消息發(fā)布和消費(fèi)的場(chǎng)景
分區(qū)順序 對(duì)于指定的一個(gè) Topic,所有消息根據(jù) sharding key 進(jìn)行區(qū)塊分區(qū)。 同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的 FIFO 順序進(jìn)行發(fā)布和消費(fèi)。 Sharding key 是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的 Key 是完全不同的概念。 適用場(chǎng)景:性能要求高,以 sharding key 作為分區(qū)字段,在同一個(gè)區(qū)塊中嚴(yán)格的按照 FIFO 原則進(jìn)行消息發(fā)布和消費(fèi)的場(chǎng)景。
個(gè)人理解:用的時(shí)候基本采用分區(qū)有序而不采用主題有序,這樣方便與Consumer的擴(kuò)展,把Consumer擴(kuò)展到與分區(qū)數(shù)相同可以達(dá)到最大并發(fā),因?yàn)槊總€(gè)分區(qū)還是需要有序的先進(jìn)先出需要得到消費(fèi)確認(rèn)不能并發(fā)操作同一分區(qū)。
3.消息過濾
RocketMQ的消費(fèi)者可以根據(jù)Tag進(jìn)行消息過濾,也支持自定義屬性過濾。消息過濾目前是在Broker端實(shí)現(xiàn)的,優(yōu)點(diǎn)是減少了對(duì)于Consumer無用消息的網(wǎng)絡(luò)傳輸,缺點(diǎn)是增加了Broker的負(fù)擔(dān)、而且實(shí)現(xiàn)相對(duì)復(fù)雜。
4.消息可靠性
RocketMQ支持消息的高可靠,影響消息可靠性的幾種情況:
1.Broker非正常關(guān)閉
2.Broker異常Crash
3.OS Crash
4.機(jī)器掉電,但是能立即恢復(fù)供電情況
5.機(jī)器無法開機(jī)(可能是cpu、主板、內(nèi)存等關(guān)鍵設(shè)備損壞)
6.磁盤設(shè)備損壞
1)、2)、3)、4) 四種情況都屬于硬件資源可立即恢復(fù)情況,RocketMQ在這四種情況下能保證消息不丟,或者丟失少量數(shù)據(jù)(依賴刷盤方式是同步還是異步)。
5)、6)屬于單點(diǎn)故障,且無法恢復(fù),一旦發(fā)生,在此單點(diǎn)上的消息全部丟失。RocketMQ在這兩種情況下,通過異步復(fù)制,可保證99%的消息不丟,但是仍然會(huì)有極少量的消息可能丟失。通過同步雙寫技術(shù)可以完全避免單點(diǎn),同步雙寫勢(shì)必會(huì)影響性能,適合對(duì)消息可靠性要求極高的場(chǎng)合,例如與Money相關(guān)的應(yīng)用。注:RocketMQ從3.0版本開始支持同步雙寫。
個(gè)人理解:這里提供了一個(gè)很低的保證呀,感覺是很簡(jiǎn)單的分布式集群策略如果有同步節(jié)點(diǎn)就可以保證不丟數(shù)據(jù)但是對(duì)請(qǐng)求的響應(yīng)時(shí)間會(huì)變長(zhǎng),如果是異步節(jié)點(diǎn)那就會(huì)丟數(shù)據(jù)。
5.至少一次
至少一次(At least Once)指每個(gè)消息必須投遞一次。Consumer先Pull消息到本地,消費(fèi)完成后,才向服務(wù)器返回ack,如果沒有消費(fèi)一定不會(huì)ack消息,所以RocketMQ可以很好的支持此特性。
個(gè)人理解:這里有三種策略
1.至多一次:投完完事,不做丟失重傳。
2.至少一次:需要一個(gè)確認(rèn),但不能保證消息重復(fù)如broker沒有給到producer ACK,導(dǎo)致重復(fù)的消
息被收錄,亦或是consumer消費(fèi)了消息ACK也丟了也會(huì)導(dǎo)致broker中消費(fèi)進(jìn)度不前進(jìn)導(dǎo)致消費(fèi)重復(fù)消息。
3.剛好一次:應(yīng)對(duì)broker丟了給producer的ACK可以采取一個(gè)回調(diào)的方式檢測(cè)一下,consumer也
如此但是會(huì)增加響應(yīng)時(shí)間降低吞吐還有超時(shí)時(shí)間不好把控。一般不采取這個(gè)策略,但是這個(gè)策略需
要我們自己去實(shí)現(xiàn)比如實(shí)現(xiàn)一個(gè)自增ID,然后在消費(fèi)端做個(gè)檢測(cè)就可以知道是不是重復(fù)消息。
6.回溯消費(fèi)
回溯消費(fèi)是指Consumer已經(jīng)消費(fèi)成功的消息,由于業(yè)務(wù)上需求需要重新消費(fèi),要支持此功能,Broker在向Consumer投遞成功消息后,消息仍然需要保留。并且重新消費(fèi)一般是按照時(shí)間維度,例如由于Consumer系統(tǒng)故障,恢復(fù)后需要重新消費(fèi)1小時(shí)前的數(shù)據(jù),那么Broker要提供一種機(jī)制,可以按照時(shí)間維度來回退消費(fèi)進(jìn)度。RocketMQ支持按照時(shí)間回溯消費(fèi),時(shí)間維度精確到毫秒。
7.事務(wù)消息
RocketMQ事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時(shí)成功,要么同時(shí)失敗。RocketMQ的事務(wù)消息提供類似 X/Open XA 的分布事務(wù)功能,通過事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
8.定時(shí)消息
定時(shí)消息(延遲隊(duì)列)是指消息發(fā)送到broker后,不會(huì)立即被消費(fèi),等待特定時(shí)間投遞給真正的topic。 broker有配置項(xiàng)messageDelayLevel,默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個(gè)level。可以配置自定義messageDelayLevel。注意,messageDelayLevel是broker的屬性,不屬于某個(gè)topic。發(fā)消息時(shí),設(shè)置delayLevel等級(jí)即可:msg.setDelayLevel(level)。level有以下三種情況:
- level == 0,消息為非延遲消息
- 1<=level<=maxLevel,消息延遲特定時(shí)間,例如level==1,延遲1s
- level > maxLevel,則level== maxLevel,例如level==20,延遲2h
定時(shí)消息會(huì)暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個(gè)queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費(fèi)。broker會(huì)調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫入真實(shí)的topic。
需要注意的是,定時(shí)消息會(huì)在第一次寫入和調(diào)度寫入真實(shí)topic時(shí)都會(huì)計(jì)數(shù),因此發(fā)送數(shù)量、tps都會(huì)變高。
9.消息重試
Consumer消費(fèi)消息失敗后,要提供一種重試機(jī)制,令消息再消費(fèi)一次。Consumer消費(fèi)消息失敗通??梢哉J(rèn)為有以下幾種情況:
- 由于消息本身的原因,例如反序列化失敗,消息數(shù)據(jù)本身無法處理(例如話費(fèi)充值,當(dāng)前消息的手機(jī)號(hào)被注銷,無法充值)等。這種錯(cuò)誤通常需要跳過這條消息,再消費(fèi)其它消息,而這條失敗的消息即使立刻重試消費(fèi),99%也不成功,所以最好提供一種定時(shí)重試機(jī)制,即過10秒后再重試。
- 由于依賴的下游應(yīng)用服務(wù)不可用,例如db連接不可用,外系統(tǒng)網(wǎng)絡(luò)不可達(dá)等。遇到這種錯(cuò)誤,即使跳過當(dāng)前失敗的消息,消費(fèi)其他消息同樣也會(huì)報(bào)錯(cuò)。這種情況建議應(yīng)用sleep 30s,再消費(fèi)下一條消息,這樣可以減輕Broker重試消息的壓力。
RocketMQ會(huì)為每個(gè)消費(fèi)組都設(shè)置一個(gè)Topic名稱為“%RETRY%+consumerGroup”的重試隊(duì)列(這里需要注意的是,這個(gè)Topic的重試隊(duì)列是針對(duì)消費(fèi)組,而不是針對(duì)每個(gè)Topic設(shè)置的),用于暫時(shí)保存因?yàn)楦鞣N異常而導(dǎo)致Consumer端無法消費(fèi)的消息。考慮到異?;謴?fù)起來需要一些時(shí)間,會(huì)為重試隊(duì)列設(shè)置多個(gè)重試級(jí)別,每個(gè)重試級(jí)別都有與之對(duì)應(yīng)的重新投遞延時(shí),重試次數(shù)越多投遞延時(shí)就越大。RocketMQ對(duì)于重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊(duì)列中,后臺(tái)定時(shí)任務(wù)按照對(duì)應(yīng)的時(shí)間進(jìn)行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊(duì)列中。
10.消息重投
生產(chǎn)者在發(fā)送消息時(shí),同步消息失敗會(huì)重投,異步消息有重試,oneway沒有任何保證。消息重投保證消息盡可能發(fā)送成功、不丟失,但可能會(huì)造成消息重復(fù),消息重復(fù)在RocketMQ中是無法避免的問題。消息重復(fù)在一般情況下不會(huì)發(fā)生,當(dāng)出現(xiàn)消息量大、網(wǎng)絡(luò)抖動(dòng),消息重復(fù)就會(huì)是大概率事件。另外,生產(chǎn)者主動(dòng)重發(fā)、consumer負(fù)載變化也會(huì)導(dǎo)致重復(fù)消息。如下方法可以設(shè)置消息重試策略:
- retryTimesWhenSendFailed:同步發(fā)送失敗重投次數(shù),默認(rèn)為2,因此生產(chǎn)者會(huì)最多嘗試發(fā)送retryTimesWhenSendFailed + 1次。不會(huì)選擇上次失敗的broker,嘗試向其他broker發(fā)送,最大程度保證消息不丟。超過重投次數(shù),拋出異常,由客戶端保證消息不丟。當(dāng)出現(xiàn)RemotingException、MQClientException和部分MQBrokerException時(shí)會(huì)重投。
- retryTimesWhenSendAsyncFailed:異步發(fā)送失敗重試次數(shù),異步重試不會(huì)選擇其他broker,僅在同一個(gè)broker上做重試,不保證消息不丟。
- retryAnotherBrokerWhenNotStoreOK:消息刷盤(主或備)超時(shí)或slave不可用(返回狀態(tài)非SEND_OK),是否嘗試發(fā)送到其他broker,默認(rèn)false。十分重要消息可以開啟。
11.流量控制
生產(chǎn)者流控,因?yàn)閎roker處理能力達(dá)到瓶頸;消費(fèi)者流控,因?yàn)橄M(fèi)能力達(dá)到瓶頸。
生產(chǎn)者流控:
- commitLog文件被鎖時(shí)間超過osPageCacheBusyTimeOutMills時(shí),參數(shù)默認(rèn)為1000ms,返回流控。
- 如果開啟transientStorePoolEnable == true,且broker為異步刷盤的主機(jī),且transientStorePool中資源不足,拒絕當(dāng)前send請(qǐng)求,返回流控。
- broker每隔10ms檢查send請(qǐng)求隊(duì)列頭部請(qǐng)求的等待時(shí)間,如果超過waitTimeMillsInSendQueue,默認(rèn)200ms,拒絕當(dāng)前send請(qǐng)求,返回流控。
- broker通過拒絕send 請(qǐng)求方式實(shí)現(xiàn)流量控制。
注意,生產(chǎn)者流控,不會(huì)嘗試消息重投。
消費(fèi)者流控:
- 消費(fèi)者本地緩存消息數(shù)超過pullThresholdForQueue時(shí),默認(rèn)1000。
- 消費(fèi)者本地緩存消息大小超過pullThresholdSizeForQueue時(shí),默認(rèn)100MB。
- 消費(fèi)者本地緩存消息跨度超過consumeConcurrentlyMaxSpan時(shí),默認(rèn)2000。
消費(fèi)者流控的結(jié)果是降低拉取頻率。
12.死信隊(duì)列
死信隊(duì)列用于處理無法被正常消費(fèi)的消息。當(dāng)一條消息初次消費(fèi)失敗,消息隊(duì)列會(huì)自動(dòng)進(jìn)行消息重試;達(dá)到最大重試次數(shù)后,若消費(fèi)依然失敗,則表明消費(fèi)者在正常情況下無法正確地消費(fèi)該消息,此時(shí),消息隊(duì)列 不會(huì)立刻將消息丟棄,而是將其發(fā)送到該消費(fèi)者對(duì)應(yīng)的特殊隊(duì)列中。
RocketMQ將這種正常情況下無法被消費(fèi)的消息稱為死信消息(Dead-Letter Message),將存儲(chǔ)死信消息的特殊隊(duì)列稱為死信隊(duì)列(Dead-Letter Queue)。在RocketMQ中,可以通過使用console控制臺(tái)對(duì)死信隊(duì)列中的消息進(jìn)行重發(fā)來使得消費(fèi)者實(shí)例再次進(jìn)行消費(fèi)。
上述就是基本概念與特性,下面進(jìn)行跑幾個(gè)樣例
簡(jiǎn)單的消息接受與發(fā)送樣例
RocketMQ部署在阿里云上 版本號(hào)為4.8.0,maven依賴好像沒有4.8.0的我試了一下4.7.0可用
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
生產(chǎn)者有三種模式
1.同步
2.異步
3.單向
同步生產(chǎn)者
public class SyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("hello1");
producer.setNamesrvAddr("182.92.8.169:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TestTopic","TagA",("Hello World"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg,2000);
System.out.printf("%s%n",sendResult);
}
producer.shutdown();
}
}
異步生產(chǎn)者
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("hello3");
producer.setNamesrvAddr("182.92.8.169:9876");
producer.start();
//重投次數(shù) retryTimesWhenSendFailed 異步重投不會(huì)選擇其他broker就選擇那個(gè)有問題的一直重試
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < 100; i++) {
try {
final int index = 1;
Message msg = new Message(
"TestTopic",
"TagA",("Hello World"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, throwable);
throwable.printStackTrace();
}
});
}catch (Exception e){
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
單向生產(chǎn)者
public class OnewayProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("hello2");
producer.setNamesrvAddr("182.92.8.169:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TestTopic","TagA",("Hello World"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
}
Thread.sleep(5000);
producer.shutdown();
}
}
消費(fèi)者
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello");
consumer.setNamesrvAddr("182.92.8.169:9876");
consumer.subscribe("TestTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
// 標(biāo)記該消息已經(jīng)被成功消費(fèi)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
每個(gè)生產(chǎn)者都發(fā)100條消息
異步生產(chǎn)者用了一閉鎖來控制所有線程都發(fā)送完畢并設(shè)置了5秒的超時(shí)時(shí)間
下面是一個(gè)測(cè)試






可以到看到接收同步消息是有序的而異步是無序的
這是一個(gè)基礎(chǔ)的實(shí)驗(yàn)跑一下官方教程,大致了解一下生產(chǎn)者和消費(fèi)者的合作過程