RabbitMQ操作指南

安裝

服務(wù)器安裝CentOS7.3,在此服務(wù)器上安裝RabbitMQ

這里采用rpm的安裝方式,準備的安裝包如下

RabbitMQ是Erlang語言開發(fā)的因此安裝順序為:erlang->socat->rabbmitmq

 rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

 rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm

安裝socat時出現(xiàn)異常

原因是與系統(tǒng)本地庫有沖突

rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps 采取強制安裝

最后安裝rabbitmq

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

安裝成功后會有命令提示

修改配置信息

放開guest用戶

開啟控制臺插件,易于控制臺操作

rabbitmq-plugins enable rabbitmq_management

啟動RabbitMQ

rabbitmq-server start &

瀏覽器輸入IP:15672,但是毫無反饋

這時還要修改下firewall

firewall-cmd --zone=public --add-port=15672/tcp --permanent 
firewall-cmd --zone=public --add-port=5672/tcp --permanent 

systemctl restart firewalld.service

CentOS7數(shù)據(jù)服務(wù)器的搭建文內(nèi)有firewall參數(shù)介紹

成功訪問

關(guān)閉RabbitMQ

rabbitmqctl stop

開機自啟動

chkconfig rabbitmq-server on

五種交換機,六種分發(fā)模式

官網(wǎng)解釋

RabbitMQ各部件間的關(guān)系
  • P是生產(chǎn)者負責產(chǎn)生消息
  • X是交換機負責投遞消息
  • 紅色部分為隊列緩沖,用于存儲傳過來的消息
  • C為消費者,從緩沖隊列中取出消息進行消費

P產(chǎn)生消息只需知道要投遞的X即可,后續(xù)過程不關(guān)心
C從指定的隊列中取消息消費也無需關(guān)心消息的產(chǎn)生過程
X負責RabbitMQ內(nèi)部消息的流轉(zhuǎn),重任都在X上

交換機

1、Direct交換機

消息過來時指定了routingkey,因此只有完全相同routingkey的隊列才能夠收到消息

2、Topic交換機

消息過來時指定了routingkey其組成由“.”分隔,通過通配符匹配到對應(yīng)routingkey的隊列,類似于SQL中的模糊匹配

通配符有:

  • ‘*’代表一個單詞
  • ‘#’代表零個或多個單詞

3、Fanout交換機

將收到的所有消息,發(fā)送到所有綁定的隊列中(廣播)

4、Headers交換機

消息傳遞過程中不采用routingkey,采用Properties取代。

在P端channel發(fā)送消息時設(shè)置Properties的鍵值對屬性,在C端綁定隊列時設(shè)置對應(yīng)的Properties屬性

P:
channel.basicPublish(EXCHANGE_NAME,routingkey , properties, message);

C:
channel.queueBind(queueName, EXCHANGE_NAME, routingkey, properties);

注:P端與C端properties類型不同,routingkey為“”

5、System Default交換機

系統(tǒng)默認存在的交換機,規(guī)則同Direct交換機

分發(fā)模式

上述5種交換機,常用的為前三種,根據(jù)場景的需求不同共有如下幾種分發(fā)模式

代碼通用模板,下述只列出差異代碼塊

ConnectionFactory factory = new ConnectionFactory();  
// 設(shè)置鏈接地址
factory.setHost("127.0.0.1");  
// 創(chuàng)建連接  
Connection connection = factory.newConnection();  
// 創(chuàng)建頻道  
Channel channel = connection.createChannel();  

P端:

// 指定隊列  
channel.queueDeclare(QUEUE_NAME, Durable, false, false, null);  
// 發(fā)送的消息  
String message = "hello world!";  
// 發(fā)送消息  
channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, Properties, message.getBytes());  
// 關(guān)閉頻道和連接  
channel.close();  
connection.close();  

C端:

// 聲明隊列,防止消息接收者先運行,隊列不存在。  
channel.queueDeclare(QUEUE_NAME, Durable, false, false, null);  
// 創(chuàng)建隊列消費者  
QueueingConsumer consumer = new QueueingConsumer(channel);  
// 指定消費隊列  AutoACK表示是否自動簽收
channel.basicConsume(QUEUE_NAME, AutoACK, consumer);  
while (true) {  
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    String message = new String(delivery.getBody());  
}

1、簡單分發(fā)模式

一個P一個C,采用Direct交換機,無特殊要求可默認使用System Default交換機,指明routingkey即可送達

P:
channel.basicPublish("", QUEUE_NAME, null, message); 

C:
channel.basicConsume(QUEUE_NAME, true, QueueingConsumer); 

2、工作隊列模式

一個P多個C,采用Direct交換機(默認即可)將消息存入隊列,多個C監(jiān)聽隊列消息

消息丟失

由于存在多個消費者,若一個消費者在處理過程中死掉,則處理的消息便會被丟失,因此不能夠自動簽收

負載均衡

場景:奇數(shù)任務(wù)繁重,偶數(shù)任務(wù)簡單;在存在2個C的情況下,就會累死一個C,所以需要保證C在完成任務(wù)后再接收新消息

P:
channel.basicPublish("", QUEUE_NAME, null, message);

C:
channel.basicQos(1);// 負載均衡,每次只處理一條消息
channel.basicConsume(QUEUE_NAME, false, QueueingConsumer); 

3、廣播模式

采用Fanout交換機,將消息復(fù)制發(fā)送到每一個隊列中

創(chuàng)建交換機時選擇類型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

4、路由模式

前幾種模式僅聲明了交換機和隊列的關(guān)系,綁定還可以有額外的routingkey參數(shù),采用Direct交換機。

消息到達X后,根據(jù)routingkey進行復(fù)制分發(fā)

聲明 routingkey
channel.queueBind(queueName, EXCHANGE_NAME,routingkey);

5、主題模式

與路由模式類似,區(qū)別在于routingkey。主題模式下routingkey是由‘.’構(gòu)成的字符串。

由于Topic交換機的特性,將消息復(fù)制發(fā)送到不同的隊列中

P:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

C:
channel.queueBind(queueName, EXCHANGE_NAME, routingkey);

6、RPC模式

RPC模式在RabbitMQ中沒有明確的P與C,因為雙方都在發(fā)送和接收消息

重點:

  • C端與S端都需要綁定隊列以獲取對應(yīng)消息
  • 消息在傳遞過程中始終包含兩個屬性correlationId和replyTo
    -- correlationId用以標識請求和結(jié)果的對應(yīng)關(guān)系
    -- replyTo標識結(jié)果要去的隊列
  • 此模式下可以添加多個C端,以提升處理性能,通過basicQos控制負載
主子線程間數(shù)據(jù)傳遞

以上六種消息的分發(fā)模式都要根據(jù)場景設(shè)置容錯和消息補償機制

代碼實戰(zhàn)

work工作模式未設(shè)置prefetch1
work工作模式設(shè)置prefetch1
手動確認
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容