簡單介紹一下RabbitMQ安裝和使用,這里使用docker安裝RabbitMQ3.7版本。
docker安裝rabbitmq
- 下載rabbitmq鏡像,官方rabbitmq有很多版本,這里推薦下載帶management的版本,帶管理界面
>docker pull rabbitmq:3.7-management
- 運行rabbitmq
查看鏡像
>docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
rabbitmq 3.7-management c497955c219a 12 days ago 180MB
redis 5.0.9 5120d23bad51 2 weeks ago 98.3MB
containerize/elastichd latest c2202f76db37 2 years ago 28.1MB
mobz/elasticsearch-head 5 b19a5c98e43b 3 years ago 824MB
可以看到rabbitmq的鏡像id是c497955c219a,我們使用鏡像Id安裝
> docker run --name rabbitmq-3.7 -d -p 15672:15672 -p 5672:5672 --restart=always c497955c219a
可以看到很快就安裝好了
> docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
34d6319d5720 c497955c219a "docker-entrypoint.s…" 21 minutes ago Up 21 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbitmq-3.7
-
訪問http://主機IP:15672,就可以看到rabbitmq管理界面了,默認用戶名和密碼都是guest
rabbitmq管理界面
rabbitmq使用
1. 在使用rabbitmq之前,首先得對AMQP協(xié)議有所了解。
-
AMQP協(xié)議模型
AMQP協(xié)議模型 -
AMQP消息流轉過程
消息流轉過程
Publisher:消息發(fā)送者,將消息發(fā)送到Exchange并指明Routing Key,以便Message Queue可以正確的收到消息
Consumer:消息接受者,從Message Queue獲取消息,一個Consumer可以訂閱多個Queue, 來接受Queue中的消息
Server: 一個具體的MQ服務實例
Virtual host: 虛擬主機,一個server下面可以有多個虛擬主機,通常用于隔離不同的項目,一個Virtual host下面通常會有多個Exchange、Message Queue
Exchange:交換器,從Producer接受消息, 根據Bindings中配置的Routing key, 把消息分派到對應的Message Queue中
Routing key:路由鍵,用于Exchange判斷哪些消息需要發(fā)送對應的Message Queue
Bindings: 描述了Exchange和Queue之間的關系。Exchange 根據消息內容 (routing key),和Binding配置來決定把消息分派到哪個Queue中
Message Queue: 存儲消息, 并把消息傳遞給最終的 Consumer
2. 創(chuàng)建exchange和queue
首先需要創(chuàng)建一個exchange和queue才能收發(fā)消息,直接登錄到管理界面進入Exchanges菜單(這里創(chuàng)建一個type為topic的exchange)

- Name是Exchage的名字
- Type是Exchage的Routingkey的綁定類型(direct,topic,fanout,header)
- Durability:消息是否持久化
- Auto delete:如果設置為yes則當exchange最后一個綁定的隊列被刪除后,就會自動刪除
- Internal:如果設置為yes,是RabbitMQ的內部使用,不提供給外部,自己編寫erlang語言做擴展時使用
- Arguments:擴展AMQP的自定義參數
再創(chuàng)建一個queue

新建routingkey,綁定exchange和queue的關系

3. 操作rabbitmq
使用springboot操作rabbitmq非常簡單,首先加入maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置rabbitmq地址
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
發(fā)送消息,可以直接使用RabbitTemplate發(fā)送消息。
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test0(){
//傳入exchange 和 routingkey 就可以投遞消息到綁定的隊列,消息可以發(fā)字符串也可以發(fā)序列化對象
rabbitTemplate.convertAndSend("order-exchange","order.update","hello rabbit mq");
}
接收消息也非常簡單,添加listener配置。
spring:
rabbitmq:
host: 39.99.219.219
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
listener: #消費端配置
simple:
concurrency: 5 #初始化并發(fā)數
max-concurrency: 10 #最大并發(fā)數據
auto-startup: true #自動開啟監(jiān)聽
prefetch: 1 #每個并發(fā)連接同一時間最多處理幾個消息,限流設置
acknowledge-mode: manual #簽收模式,設置為手動
接收消息只需要在方法上加@RabbitListener和@RabbitHandler注解。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
key = "order.update"
)
)
@RabbitHandler
public void onOrderMessage(@Payload String msg, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收開始***********");
System.out.println("msg: "+msg);
}
4. 消息接收的應答模式ACK和NACK的使用
為了保證消息可以正確被消費MQ提供了ACK和NACK的機制。
- ACK是手動簽收消息的標識,如果消息的簽收模式設置成為了手工模式,在MQ沒有接收到ACK信息時都是Unacked的狀態(tài),并且消息還在隊列中,這個時候消息不會重試,不會再主動發(fā)給消費者
- NACK:將消息重回隊列,如果我們發(fā)現異常,就可以調用NACK來將消息重回隊列,他會重回到隊尾重新發(fā)給消費者
可以看到上面的例子打印出了隊列信息但卻無法把這個隊列消費掉,當你重新啟動的時候又會讀到之前未消費完的消息。

可以看一下下面的例子,每個消息進來重試三次,第四次才簽收完成。
int flag = 1;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
key = "order.update"
)
)
@RabbitHandler
public void onListenMessage(@Payload String msg, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收開始***********");
System.out.println("msg: "+msg);
Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//ACK進行簽收,第一個參數是標識,第二個參數是批量接收為fasle
//channel.basicAck(deliverTag,false);
if(flag>3){
//說明執(zhí)行了3次都沒有成功
//消息確認
channel.basicAck(deliverTag,false);
}else {
flag = flag+1;
//前兩個參數和上面ACK一樣,第三個參數是否重回隊列
channel.basicNack(deliverTag, false, true);
}
}
}
************Select消息接收開始***********
mq msg: hello rabbit mq
************Select消息接收開始***********
mq msg: hello rabbit mq
************Select消息接收開始***********
mq msg: hello rabbit mq
************Select消息接收開始***********
mq msg: hello rabbit mq
5. 發(fā)送確認和發(fā)送失敗退回
為了保證消息能一定投遞成功,AMQP提供了消息確認和消息失敗退回。
- 先加入配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
#開啟發(fā)送確認
publisher-confirms: true
#開啟發(fā)送失敗退回
publisher-returns: true
2.發(fā)送消息的時候添加一個消息的唯一標識,在消息回調的時候可以通過唯一標識判斷消息投遞是否成功
@Test
public void test01(){
CorrelationData correlationData = new CorrelationData();
correlationData.setId("ms0001");
rabbitTemplate.convertAndSend("order-exchange","order.select","this is a msg",correlationData);
}
- 創(chuàng)建一個類并實現RabbitTemplate.ConfirmCallback和 RabbitTemplate.ReturnCallback接口,來接收消息投遞回調
@Component
public class OrderSenderCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 每次發(fā)送完消息都會調這個方法
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("==========消息publish成功的ID: " + (correlationData != null ? correlationData.getId() : "null"));
System.out.println("==========消息是否發(fā)布成功: " + ack);
System.out.println("==========失敗的異常信息: " + cause);
}
/**
* 每次消息發(fā)送失敗會調用這個方法
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("******replayCode: " + replyCode);
System.out.println("******replayText: " + replyText);
System.out.println("******exchange: " + exchange);
System.out.println("******routingkey: " + routingKey);
}
}
消息隊列高可用方案
1. 生產端的可靠性投遞
假如用MQ所做的業(yè)務不允許消息丟失,那么生產端就要保證消息能夠百分之百投遞,消費端就要保證消息百分之百被消費。那么該怎么做呢?
- 消費端需要開啟手動應答,用ACK和NACK的機制來確保消息的消費應答
- 生產者發(fā)送消息后一定要有一個確認應答來確認消息的發(fā)送狀態(tài)
- RabbitMQ本身需要做HA高可用
- 做一個完善的消息補償機制
- RabbitMQHA高可用方案
鏡像模式和普通模式的區(qū)別就是,隊列的數據都鏡像了一份到所有的節(jié)點上。這樣任何一個節(jié)點失效,不會影響整個集群的使用。

- 消息補償機制
- 一開始要有一個業(yè)務DB和MQDB,將業(yè)務數據存入DB
- 業(yè)務落庫后就通過Sender發(fā)送消息給Broker,發(fā)送消息后就將消息體和消息狀態(tài)記錄到MQDB中(發(fā)送中,發(fā)送完成),并且會接收到Broker返回的消息投遞確認狀態(tài)(需要代碼支持)
- 消息發(fā)送方接收Broker返回的消息和網絡狀態(tài)的異常
- 如果消費發(fā)送確認失敗應該調用業(yè)務來將MQDB消息的狀態(tài)更新為發(fā)送失敗,如果發(fā)送狀態(tài)成功就將MQDB中的記錄更新為成功
- 使用分布式定時Job來對我們的消息進行判斷,只要是消息狀態(tài)不是我們發(fā)送成功的都需要進行重新發(fā)送
- 重試一定要有次數限制,當達到現在的次數就不再重試通知人工處理

2. 消費端冪等性問題解決
為了確保發(fā)送端的投遞成功,我們都會進行消息的補償,有可能消息補償的過程中會多發(fā)送幾次消息導致重復,這個時候就需要提前考慮消費端的冪等問題。
-
唯一ID+業(yè)務碼在數據庫中做主鍵來做去重
- 優(yōu)點:實現簡單方便
- 缺點:會對數據庫產生異常壓力,并且只能用來做insert的冪等
為執(zhí)行的內容做前置條件,類似于mysql的樂觀鎖
給數據更新前增加一個前置的條件,需要將拿到的前置條件做為更新的條件之一來做操作,如果在你之前已經更新了這個前置條件,那么你的更新就會不成功。這樣同樣也會對數據庫產生一定的壓力利用Redis原子性
第一個進入并拿到鎖的線程在鎖內部先判斷是否已消費,如果沒有消費則操作并記錄這個Measge_ID已消費,并發(fā)過程中的線程如果拿不到鎖就直接返回,這個線程能拿到鎖,但拿到后要去查詢這個Measge_ID是否消費過,如果已經消費了,就中止消費
3.消費端的消息可靠性保障
如果生產端能夠保證消息的可靠性那么消費端就一定能收到消息,需要注意的有兩點
- 開啟消息手動簽收,如果消費成功就手動發(fā)送ACK響應,表明這個消息消費成功
- 出現異??梢酝ㄟ^NACK將消息重回隊尾變成Ready狀態(tài)然后再次消費
- NACK要有重試次數限制,當超過次數就將這個消息發(fā)送到一個人工補償消息隊列或持久化對象中等待人工補償


