RabbitMQ

隊列對比
隊列 ActiveMq RabbitMq RocketMQ Kafka
性能 6000+ 12000+ 10萬+ 百萬
多語言支持 支持 支持 支持 支持
社區(qū)活躍度
綜合評價 成熟,性能較弱缺乏大規(guī)模吞吐場景的應用 性能較好,社區(qū)活躍管理界面豐富 內部機制很難了解,難定制 模型簡單。接口易用,文檔少,支持語言少 天生分布式,性能最好,運維難度大
使用docker進行安裝
  • 下載management的版本,帶web界面
  • 命令:
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.7.13-management
  • 參數(shù):

    • -d:后臺運行
    • --name : 指定容器名
    • -p : 端口映射
    • --hostname : rabbitmq主機名
    • -e RABBITMQ_DEFAULT_VHOST=my_vhost 虛擬機名
    • -e RABBITMQ_DEFAULT_USER=admin 用戶名
    • -e RABBITMQ_DEFAULT_PASS=123456 密碼
  • 相關概念:

    • 發(fā)送消息 生產(chǎn)者
    • 隊列、接收消息 消費者
    • 交換機:做了一層抽象。發(fā)送消息,和隊列通過交換機來做轉發(fā)。交換機會根據(jù)分發(fā)策略把消息轉給隊列。
    • 虛擬主機:支持權限控制,最小粒度為虛擬主機,一個虛擬主機可以包含多個交換機,隊列,綁定
    • 隊列:緩存消息的容器
    • 綁定: 設置交換機與隊列的關系
  • 交換機

    • 扇形交換機:Fanout exchange
    • 直連交換機:Direct exchange
    • 主題交換機:Topic exchange
    • 首部交換機:Headers exchange
  • 應用場景(優(yōu)點)

    • 解耦
    • 異步
    • 削峰
  • 缺點:

    • 增加了系統(tǒng)的復雜性
    • 增加了維護的難度
    • 降低系統(tǒng)的穩(wěn)定性:引入依賴越多,越容易掛掉
問題
  • 消息的重復消費:
    • 發(fā)送端

      • 發(fā)送端發(fā)送消息給中間件,中間件收到消息并成功存儲,此時的中間件出現(xiàn)了問題,導致發(fā)送端沒有收到發(fā)送成功的返回進行重試從而產(chǎn)生重復
      • 中間件由于負載過高,響應變慢,成功把消息儲存到中間件后,返回成功時間超時進行重試從而產(chǎn)生重復
      • 消息中間件成功寫入消息存儲,在返回結果時,出現(xiàn)網(wǎng)絡問題,導致超時,而重試時,網(wǎng)絡恢復出現(xiàn)重復
    • 消費端

      • 消費者收到消息進行處理,處理完畢程序出現(xiàn)問題,中間件不知道處理結果,會再次投遞
      • 消費者收到消息進行處理,處理完畢后網(wǎng)絡出現(xiàn)問題,中間件沒有收到處理結果,再次投遞
      • 消費端處理消息花費時間過長,中間件由于消息超時會再次投遞。
      • 收到消費者的處理結果,中間件出現(xiàn)問題,未及時更改狀態(tài),再次投遞
      • 處理完畢后,消息中間件收到結果,但是遇到消息存儲故障,沒能更新狀態(tài),在次投遞

發(fā)送端產(chǎn)生消息重復的主要原因是:消息成功寫入消息存儲后,因為各種原因使得消息發(fā)送端沒有收到成功的返回結果,并且進行重試,因而導致的重復。
消息接收端消息重復的主要原因是:消息接收者成功處理完消息后,消息中間件不能及時更新投遞狀態(tài)造成。

  • 如何解決重復消費:
    • 冪等性控制:多次調用得到相同的結果
    • 消費者發(fā)送消息進行數(shù)據(jù)更新時,需要帶上數(shù)據(jù)的版本號。
    • 去重表:利用數(shù)據(jù)庫的特性實現(xiàn)冪等。常用的思路就是在表上構建一個唯一性的索引,保證某類數(shù)據(jù)一旦執(zhí)行完畢,后續(xù)同樣的請求不在處理了。或者創(chuàng)建一個去重表,每次操作時都去該表中查看一下id是否已經(jīng)存在了,如果存在則直接返回。
如何保證數(shù)據(jù)的可靠性傳輸
  • 生產(chǎn)者弄丟了數(shù)據(jù)

    • 選擇用rabbitmq提供的事務功能。生產(chǎn)者在發(fā)送數(shù)據(jù)的之前開啟RabbitMQ事務 ,channel.txSelect . 然后發(fā)送消息。如果消息沒有被成功的接收到,那么生產(chǎn)者會收到異常報錯。此時可以回滾事務,然后重試發(fā)送。如果收到了消息可以提交事務。缺點 就是吞吐量會下降,太耗性能。
    • 開啟confirm模式。開啟之后,每次都會分配一個唯一的id,如果寫入了隊列中,rabbitmq會回傳一個ack,如果沒有處理這個消息,會回調你的nack接口。告訴你接收失敗,可以重試。
    • 事務機制和confirm最大的區(qū)別是 事務時同步的。你提交一個事務后悔阻塞在哪里。confirm是異步的。你發(fā)送消息之后會發(fā)送下一個消息,然后那個消息rabbitmq接收后會異步回調你的接口
  • rabbitmq 自己弄丟了數(shù)據(jù)

    • ,必須開啟rabbitmq的持久化,就是消息會寫入之后持久化到硬盤上,哪怕是rabbitmq掛了重啟之后,數(shù)據(jù)還是能恢復的。
      • 設置持久化的兩個步驟:
        • 創(chuàng)建queue和交換器的時候將其設置為持久化。這樣可以保證rabbitmq持久化相關的元數(shù)據(jù)。但是不會持久化隊列中的數(shù)據(jù)
        • 發(fā)送消息的時候將消息的deliveryMode設置為2,就是將消息設置為持久化(默認為2,可不設置)。
  • 消費端弄丟了數(shù)據(jù):

    • rabbitmq如果丟失了數(shù)據(jù),主要是因為你消費的時候,剛消費到,還沒有處理,結果進程掛了,比如重啟
      • 用rabbitmq提供的ack機制。關閉自動ack,進行手動ack.

實際項目中遇到的問題:

  • 之前遇到過這樣的一個問題, 由于發(fā)送方與消費方的速度不匹配,在壓測時,導致大量數(shù)據(jù)在隊列中積壓,大約有10萬條,因為是測試數(shù)據(jù),當時采用的方式比較暴力,直接刪除隊列,在新建隊列,完成消息的刪除。但是在生產(chǎn)上是不能這么干滴?。。?/p>

  • 大量數(shù)據(jù)持續(xù)擠壓怎么解決?

    • 先修復consumer的問題
    • 新建幾個隊列,具體數(shù)量自己把控。
    • 新建一個程序,不做耗時處理,將現(xiàn)在積壓的數(shù)據(jù)分發(fā)到新建的隊列中。
    • 每個隊列綁定一個consumer,進行業(yè)務處理。
    • 處理完畢后,恢復之前的程序部署

消費方的業(yè)務處理時間可能比較長,最好采用多線程和多consumer進行消費。

程序實現(xiàn)

發(fā)送方
  • 配置文件:
    關于rabbitmq的主機,端口等我直接寫死在程序中,可通過properties文件注入,方便配置,基于rabbitTemplate進行代碼封裝


    配置.png
  • 初始化


    初始化.png

此處說下callback和returncallbakc的執(zhí)行時機:
消息沒有到達exchange,則confirm回調,ack=false
消息到達exchange,則confirm回調,ack=false
exchange到queue成功,則不回調returnCallBack。
exchange到queue失敗,則回調returnCallBack,在回調confirmCallBack.需設置mandatory=true,否則不回回調,消息就丟了

  • 消息發(fā)送


    消息發(fā)送.png

上面為封裝后對外開放的接口,可為此方法編寫重載方法。對外只暴露 泛型參數(shù)T

  • 消費方:兩種實現(xiàn)方式
    • 使用注解:@RabbitListener(queues = "notify.payment")


      手動ack.png
    • 實現(xiàn)ChannelAwareMessageListener,并在container中注入


      image.png

      image.png

消費方推薦使用多線程進行處理。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • RabbitMQ 簡介 MQ 消息隊列,上承生產(chǎn)者,下接消費者。從生產(chǎn)者側獲取消息,然后將消息轉發(fā)給消費者。由此可...
    2205閱讀 3,654評論 1 11
  • 應用場景 異步處理 場景說明:用戶注冊后,需要發(fā)注冊郵件和注冊短信,傳統(tǒng)的做法有兩種: 1.串行的方式 2.并行的...
    lijun_m閱讀 1,937評論 0 3
  • 網(wǎng)上rabbitmq的學習日志非常豐富,官網(wǎng)文檔也很完美,這里主要記錄學習和部署過程中的一些記錄。會按以下菜單進行...
    恐龍打醬油閱讀 2,653評論 0 4
  • 不知道你們有沒有被比你年長的所謂的“過來人”這樣教導過:在大學,重要的是學會與別人社交,多交朋友,多認識人,其他的...
    昔我往矣__閱讀 1,710評論 4 7
  • 繼續(xù)延續(xù)向上的力度還有,看中樞的第三類買點能不能在明后二天走出?否則就會是中樞震蕩。3254點開始的5分鐘級別向上...
    禪中悟纏閱讀 142評論 0 1

友情鏈接更多精彩內容