隊列對比
| 隊列 | ActiveMq | RabbitMq | RocketMQ | Kafka |
|---|---|---|---|---|
| 性能 | 6000+ | 12000+ | 10萬+ | 百萬 |
| 多語言支持 | 支持 | 支持 | 支持 | 支持 |
| 社區(qū)活躍度 | 高 | 高 | 中 | 高 |
| 綜合評價 | 成熟,性能較弱缺乏大規(guī)模吞吐場景的應用 | 性能較好,社區(qū)活躍管理界面豐富 內部機制很難了解,難定制 | 模型簡單。接口易用,文檔少,支持語言少 | 天生分布式,性能最好,運維難度大 |
使用docker進行安裝
- 下載management的版本,帶web界面
- 命令:
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.7.13-management
-
參數(shù):
- -d:后臺運行
- --name : 指定容器名
- -p : 端口映射
- --hostname : rabbitmq主機名
- -e RABBITMQ_DEFAULT_VHOST=my_vhost 虛擬機名
- -e RABBITMQ_DEFAULT_USER=admin 用戶名
- -e RABBITMQ_DEFAULT_PASS=123456 密碼
-
相關概念:
- 發(fā)送消息 生產(chǎn)者
- 隊列、接收消息 消費者
- 交換機:做了一層抽象。發(fā)送消息,和隊列通過交換機來做轉發(fā)。交換機會根據(jù)分發(fā)策略把消息轉給隊列。
- 虛擬主機:支持權限控制,最小粒度為虛擬主機,一個虛擬主機可以包含多個交換機,隊列,綁定
- 隊列:緩存消息的容器
- 綁定: 設置交換機與隊列的關系
-
交換機
- 扇形交換機:Fanout exchange
- 直連交換機:Direct exchange
- 主題交換機:Topic exchange
- 首部交換機:Headers exchange
-
應用場景(優(yōu)點)
- 解耦
- 異步
- 削峰
-
缺點:
- 增加了系統(tǒng)的復雜性
- 增加了維護的難度
- 降低系統(tǒng)的穩(wěn)定性:引入依賴越多,越容易掛掉
問題
- 消息的重復消費:
-
發(fā)送端
- 發(fā)送端發(fā)送消息給中間件,中間件收到消息并成功存儲,此時的中間件出現(xiàn)了問題,導致發(fā)送端沒有收到發(fā)送成功的返回進行重試從而產(chǎn)生重復
- 中間件由于負載過高,響應變慢,成功把消息儲存到中間件后,返回成功時間超時進行重試從而產(chǎn)生重復
- 消息中間件成功寫入消息存儲,在返回結果時,出現(xiàn)網(wǎng)絡問題,導致超時,而重試時,網(wǎng)絡恢復出現(xiàn)重復
-
消費端
- 消費者收到消息進行處理,處理完畢程序出現(xiàn)問題,中間件不知道處理結果,會再次投遞
- 消費者收到消息進行處理,處理完畢后網(wǎng)絡出現(xiàn)問題,中間件沒有收到處理結果,再次投遞
- 消費端處理消息花費時間過長,中間件由于消息超時會再次投遞。
- 收到消費者的處理結果,中間件出現(xiàn)問題,未及時更改狀態(tài),再次投遞
- 處理完畢后,消息中間件收到結果,但是遇到消息存儲故障,沒能更新狀態(tài),在次投遞
-
發(fā)送端產(chǎn)生消息重復的主要原因是:消息成功寫入消息存儲后,因為各種原因使得消息發(fā)送端沒有收到成功的返回結果,并且進行重試,因而導致的重復。
消息接收端消息重復的主要原因是:消息接收者成功處理完消息后,消息中間件不能及時更新投遞狀態(tài)造成。
- 如何解決重復消費:
- 冪等性控制:多次調用得到相同的結果
- 消費者發(fā)送消息進行數(shù)據(jù)更新時,需要帶上數(shù)據(jù)的版本號。
- 去重表:利用數(shù)據(jù)庫的特性實現(xiàn)冪等。常用的思路就是在表上構建一個唯一性的索引,保證某類數(shù)據(jù)一旦執(zhí)行完畢,后續(xù)同樣的請求不在處理了。或者創(chuàng)建一個去重表,每次操作時都去該表中查看一下id是否已經(jīng)存在了,如果存在則直接返回。
如何保證數(shù)據(jù)的可靠性傳輸
-
生產(chǎn)者弄丟了數(shù)據(jù)
- 選擇用rabbitmq提供的事務功能。生產(chǎn)者在發(fā)送數(shù)據(jù)的之前開啟RabbitMQ事務 ,channel.txSelect . 然后發(fā)送消息。如果消息沒有被成功的接收到,那么生產(chǎn)者會收到異常報錯。此時可以回滾事務,然后重試發(fā)送。如果收到了消息可以提交事務。缺點 就是吞吐量會下降,太耗性能。
- 開啟confirm模式。開啟之后,每次都會分配一個唯一的id,如果寫入了隊列中,rabbitmq會回傳一個ack,如果沒有處理這個消息,會回調你的nack接口。告訴你接收失敗,可以重試。
- 事務機制和confirm最大的區(qū)別是 事務時同步的。你提交一個事務后悔阻塞在哪里。confirm是異步的。你發(fā)送消息之后會發(fā)送下一個消息,然后那個消息rabbitmq接收后會異步回調你的接口
-
rabbitmq 自己弄丟了數(shù)據(jù)
- ,必須開啟rabbitmq的持久化,就是消息會寫入之后持久化到硬盤上,哪怕是rabbitmq掛了重啟之后,數(shù)據(jù)還是能恢復的。
- 設置持久化的兩個步驟:
- 創(chuàng)建queue和交換器的時候將其設置為持久化。這樣可以保證rabbitmq持久化相關的元數(shù)據(jù)。但是不會持久化隊列中的數(shù)據(jù)
- 發(fā)送消息的時候將消息的deliveryMode設置為2,就是將消息設置為持久化(默認為2,可不設置)。
- 設置持久化的兩個步驟:
- ,必須開啟rabbitmq的持久化,就是消息會寫入之后持久化到硬盤上,哪怕是rabbitmq掛了重啟之后,數(shù)據(jù)還是能恢復的。
-
消費端弄丟了數(shù)據(jù):
- rabbitmq如果丟失了數(shù)據(jù),主要是因為你消費的時候,剛消費到,還沒有處理,結果進程掛了,比如重啟
- 用rabbitmq提供的ack機制。關閉自動ack,進行手動ack.
- rabbitmq如果丟失了數(shù)據(jù),主要是因為你消費的時候,剛消費到,還沒有處理,結果進程掛了,比如重啟
實際項目中遇到的問題:
之前遇到過這樣的一個問題, 由于發(fā)送方與消費方的速度不匹配,在壓測時,導致大量數(shù)據(jù)在隊列中積壓,大約有10萬條,因為是測試數(shù)據(jù),當時采用的方式比較暴力,直接刪除隊列,在新建隊列,完成消息的刪除。但是在生產(chǎn)上是不能這么干滴?。。?/p>
-
大量數(shù)據(jù)持續(xù)擠壓怎么解決?
- 先修復consumer的問題
- 新建幾個隊列,具體數(shù)量自己把控。
- 新建一個程序,不做耗時處理,將現(xiàn)在積壓的數(shù)據(jù)分發(fā)到新建的隊列中。
- 每個隊列綁定一個consumer,進行業(yè)務處理。
- 處理完畢后,恢復之前的程序部署
消費方的業(yè)務處理時間可能比較長,最好采用多線程和多consumer進行消費。
程序實現(xiàn)
發(fā)送方
-
配置文件:
關于rabbitmq的主機,端口等我直接寫死在程序中,可通過properties文件注入,方便配置,基于rabbitTemplate進行代碼封裝
配置.png -
初始化
初始化.png
此處說下callback和returncallbakc的執(zhí)行時機:
消息沒有到達exchange,則confirm回調,ack=false
消息到達exchange,則confirm回調,ack=false
exchange到queue成功,則不回調returnCallBack。
exchange到queue失敗,則回調returnCallBack,在回調confirmCallBack.需設置mandatory=true,否則不回回調,消息就丟了
-
消息發(fā)送
消息發(fā)送.png
上面為封裝后對外開放的接口,可為此方法編寫重載方法。對外只暴露 泛型參數(shù)T
- 消費方:兩種實現(xiàn)方式
-
使用注解:@RabbitListener(queues = "notify.payment")
手動ack.png -
實現(xiàn)ChannelAwareMessageListener,并在container中注入
image.png
image.png
-
消費方推薦使用多線程進行處理。





