安裝
服務(wù)器安裝CentOS7.3,在此服務(wù)器上安裝RabbitMQ
這里采用rpm的安裝方式,準備的安裝包如下

RabbitMQ是Erlang語言開發(fā)的因此安裝順序為:erlang->socat->rabbmitmq
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
安裝socat時出現(xiàn)異常

原因是與系統(tǒng)本地庫有沖突
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps 采取強制安裝
最后安裝rabbitmq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
安裝成功后會有命令提示

修改配置信息

放開guest用戶

開啟控制臺插件,易于控制臺操作
rabbitmq-plugins enable rabbitmq_management
啟動RabbitMQ
rabbitmq-server start &
瀏覽器輸入IP:15672,但是毫無反饋
這時還要修改下firewall
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
systemctl restart firewalld.service
CentOS7數(shù)據(jù)服務(wù)器的搭建文內(nèi)有firewall參數(shù)介紹
成功訪問

關(guān)閉RabbitMQ
rabbitmqctl stop
開機自啟動
chkconfig rabbitmq-server on
五種交換機,六種分發(fā)模式

- P是生產(chǎn)者負責產(chǎn)生消息
- X是交換機負責投遞消息
- 紅色部分為隊列緩沖,用于存儲傳過來的消息
- C為消費者,從緩沖隊列中取出消息進行消費
P產(chǎn)生消息只需知道要投遞的X即可,后續(xù)過程不關(guān)心
C從指定的隊列中取消息消費也無需關(guān)心消息的產(chǎn)生過程
X負責RabbitMQ內(nèi)部消息的流轉(zhuǎn),重任都在X上
交換機
1、Direct交換機
消息過來時指定了routingkey,因此只有完全相同routingkey的隊列才能夠收到消息

2、Topic交換機
消息過來時指定了routingkey其組成由“.”分隔,通過通配符匹配到對應(yīng)routingkey的隊列,類似于SQL中的模糊匹配
通配符有:
- ‘*’代表一個單詞
- ‘#’代表零個或多個單詞

3、Fanout交換機
將收到的所有消息,發(fā)送到所有綁定的隊列中(廣播)

4、Headers交換機
消息傳遞過程中不采用routingkey,采用Properties取代。
在P端channel發(fā)送消息時設(shè)置Properties的鍵值對屬性,在C端綁定隊列時設(shè)置對應(yīng)的Properties屬性
P:
channel.basicPublish(EXCHANGE_NAME,routingkey , properties, message);
C:
channel.queueBind(queueName, EXCHANGE_NAME, routingkey, properties);
注:P端與C端properties類型不同,routingkey為“”
5、System Default交換機
系統(tǒng)默認存在的交換機,規(guī)則同Direct交換機
分發(fā)模式
上述5種交換機,常用的為前三種,根據(jù)場景的需求不同共有如下幾種分發(fā)模式
代碼通用模板,下述只列出差異代碼塊
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置鏈接地址
factory.setHost("127.0.0.1");
// 創(chuàng)建連接
Connection connection = factory.newConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
P端:
// 指定隊列
channel.queueDeclare(QUEUE_NAME, Durable, false, false, null);
// 發(fā)送的消息
String message = "hello world!";
// 發(fā)送消息
channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, Properties, message.getBytes());
// 關(guān)閉頻道和連接
channel.close();
connection.close();
C端:
// 聲明隊列,防止消息接收者先運行,隊列不存在。
channel.queueDeclare(QUEUE_NAME, Durable, false, false, null);
// 創(chuàng)建隊列消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費隊列 AutoACK表示是否自動簽收
channel.basicConsume(QUEUE_NAME, AutoACK, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
}
1、簡單分發(fā)模式
一個P一個C,采用Direct交換機,無特殊要求可默認使用System Default交換機,指明routingkey即可送達

P:
channel.basicPublish("", QUEUE_NAME, null, message);
C:
channel.basicConsume(QUEUE_NAME, true, QueueingConsumer);
2、工作隊列模式
一個P多個C,采用Direct交換機(默認即可)將消息存入隊列,多個C監(jiān)聽隊列消息
消息丟失
由于存在多個消費者,若一個消費者在處理過程中死掉,則處理的消息便會被丟失,因此不能夠自動簽收
負載均衡
場景:奇數(shù)任務(wù)繁重,偶數(shù)任務(wù)簡單;在存在2個C的情況下,就會累死一個C,所以需要保證C在完成任務(wù)后再接收新消息

P:
channel.basicPublish("", QUEUE_NAME, null, message);
C:
channel.basicQos(1);// 負載均衡,每次只處理一條消息
channel.basicConsume(QUEUE_NAME, false, QueueingConsumer);
3、廣播模式
采用Fanout交換機,將消息復(fù)制發(fā)送到每一個隊列中

創(chuàng)建交換機時選擇類型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
4、路由模式
前幾種模式僅聲明了交換機和隊列的關(guān)系,綁定還可以有額外的routingkey參數(shù),采用Direct交換機。
消息到達X后,根據(jù)routingkey進行復(fù)制分發(fā)

聲明 routingkey
channel.queueBind(queueName, EXCHANGE_NAME,routingkey);
5、主題模式
與路由模式類似,區(qū)別在于routingkey。主題模式下routingkey是由‘.’構(gòu)成的字符串。
由于Topic交換機的特性,將消息復(fù)制發(fā)送到不同的隊列中

P:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
C:
channel.queueBind(queueName, EXCHANGE_NAME, routingkey);
6、RPC模式
RPC模式在RabbitMQ中沒有明確的P與C,因為雙方都在發(fā)送和接收消息
重點:
- C端與S端都需要綁定隊列以獲取對應(yīng)消息
- 消息在傳遞過程中始終包含兩個屬性correlationId和replyTo
-- correlationId用以標識請求和結(jié)果的對應(yīng)關(guān)系
-- replyTo標識結(jié)果要去的隊列- 此模式下可以添加多個C端,以提升處理性能,通過basicQos控制負載


以上六種消息的分發(fā)模式都要根據(jù)場景設(shè)置容錯和消息補償機制
代碼實戰(zhàn)


