一、利用docker安裝rabbitmq
https://www.cnblogs.com/yufeng218/p/9452621.html
rabbitmq技術(shù)文檔:http://www.itdecent.cn/p/78847c203b76
Broker:消息系統(tǒng)
Virtual host(虛擬地址,可以理解為命名空間)
Connection:(TCP生產(chǎn)者或者消費(fèi)端和消息系統(tǒng)建立的長連接)
Channel:每一個(gè)連接里面可以劃分出很多渠道,一個(gè)消息的發(fā)送通過一個(gè)渠道發(fā)送到交換機(jī)。
Exchange:交換機(jī)(根據(jù)路由健把消息分發(fā)到不同的隊(duì)列)
Queue:隊(duì)列,存儲(chǔ)消息的隊(duì)列
Binding:綁定器,綁定交換機(jī)的路由健和隊(duì)列。

路由鍵有四種:
direct :
這種類型的交換機(jī)的路由規(guī)則是根據(jù)一個(gè)routingKey的標(biāo)識,交換機(jī)通過一個(gè)routingKey與隊(duì)列綁定 ,在生
產(chǎn)者生產(chǎn)消息的時(shí)候 指定一個(gè)routingKey 當(dāng)綁定的隊(duì)列的routingKey 與生產(chǎn)者發(fā)送的一樣 那么交換機(jī)會(huì)吧
這個(gè)消息發(fā)送給對應(yīng)的隊(duì)列。

fanout:
這種類型的交換機(jī)路由規(guī)則很簡單,只要與他綁定了的隊(duì)列, 他就會(huì)吧消息發(fā)送給對應(yīng)隊(duì)列(與routingKey
沒關(guān)系)

topic:
這種類型的交換機(jī)路由規(guī)則也是和routingKey有關(guān) 只不過 topic他可以根據(jù):*,#( 號代表過濾一單詞,#代
表過濾后面所有單詞, 用.隔開)來識別routingKey 我打個(gè)比方 假設(shè) 我綁定的routingKey 有隊(duì)列A和B A的
routingKey是:.user B的routingKey是: #.user
那么我生產(chǎn)一條消息routingKey 為: error.user 那么此時(shí) 2個(gè)隊(duì)列都能接受到, 如果改為 topic.error.user
那么這時(shí)候 只有B能接受到了


發(fā)布方消息確認(rèn)機(jī)制和失敗回調(diào)機(jī)制

application.yml
rabbitmq:
host: 192.168.29.133
port: 5672
username: admin
password: admin
virtual-host: my_vhost
publisher-confirms: true
publisher-returns: true
package com.luban.mall.search.mq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
public DirectExchange directExchange2() {
return new DirectExchange("directExchange2");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Queue queue1() {
return new Queue("testQueue1", true);
}
@Bean
public Queue queue2() {
return new Queue("testQueue2", true);
}
@Bean
public Queue queue3() {
return new Queue("testQueue3", true);
}
@Bean
public Queue queue4() {
return new Queue("testQueue4", true);
}
@Bean
public Queue queue5() {
return new Queue("testQueue5", true);
}
@Bean
public Queue queue6() {
return new Queue("testQueue6", true);
}
@Bean
public Binding binding1(){
return BindingBuilder.bind(queue1()).to(directExchange()).with("rkey1");
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(queue2()).to(directExchange()).with("rkey2");
}
@Bean
public Binding binding3(){
return BindingBuilder.bind(queue3()).to(fanoutExchange());
}
@Bean
public Binding binding4(){
return BindingBuilder.bind(queue4()).to(fanoutExchange());
}
@Bean
public Binding binding5(){
return BindingBuilder.bind(queue5()).to(topicExchange()).with("*.user");
}
@Bean
public Binding binding6(){
return BindingBuilder.bind(queue6()).to(topicExchange()).with("#.user");
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
template.setReturnCallback(new MyReturnCallback());
template.setConfirmCallback(new MyPublisherConfirmCallback());
return template;
}
}
package com.luban.mall.search.mq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class MyPublisherConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//通過這個(gè)參數(shù)可以拿到我們發(fā)送的消息對象
Message message = correlationData.getReturnedMessage();
if(ack){
System.out.println("消息發(fā)送成功:"+correlationData+"---ack:"+ack+"---cause:"+cause);
}else{
System.out.println("消息發(fā)送失敗:"+correlationData+"---ack:"+ack+"---cause:"+cause);
}
}
}
public class MyReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replycode, String replyText, String exchange, String routingkey) {
System.out.println("error return call back: message"+message+"--replycode:"+replycode+"-- replyText:"+replyText+"--exchange:"+exchange+"---routingkey:"+routingkey);
}
}
@RequestMapping(value = "/send5",method = RequestMethod.POST)
@ApiOperation("發(fā)送消息到消息隊(duì)列")
@ResponseBody
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "用戶名", defaultValue = "李四",paramType = "query"),
@ApiImplicitParam(name = "age", value = "年齡", defaultValue = "23", required = true,paramType = "query")
}
)
public CommonResult<String> send5(@RequestParam String name,@RequestParam String age){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(name,age,"majunweitest callback",correlationData);
return CommonResult.success("馬","返回正確");
}
消費(fèi)方ack機(jī)制

代碼實(shí)現(xiàn)
package com.luban.mall.search.mq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitListenerContainerConif {
@Bean
public SimpleRabbitListenerContainerFactory
simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
//這個(gè)connectionFactory就是我們自己配置的連接工廠直接注入進(jìn)來
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
//這邊設(shè)置消息確認(rèn)方式由自動(dòng)確認(rèn)變?yōu)槭謩?dòng)確認(rèn)
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return simpleRabbitListenerContainerFactory;
}
}
package com.luban.mall.search.controller;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TestListener {
//@RabbitListener(queues = {"testQueue5","testQueue6"})
public void reciveMessage(String msg){
System.out.println("listener....:"+msg);
}
@RabbitListener(queues = {"testQueue5","testQueue6"},containerFactory = "simpleRabbitListenerContainerFactory")
public void getMessage(Message message, Channel channel) throws Exception{
System.out.println("list manul ack:"+(new String(message.getBody(),"UTF-8")));
Thread.sleep(5000l);
//消息消費(fèi)成功后調(diào)用第一個(gè)參數(shù)是消息的標(biāo)識字段,第二個(gè)是否是批量確認(rèn):false不是批量,true是批量
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//消費(fèi)失敗后調(diào)用這個(gè)方法告知消息隊(duì)列最后一個(gè)參數(shù):是否返回原隊(duì)列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
消息預(yù)取
消息預(yù)取
扯完消息確認(rèn) 我們來講一下剛剛所說的批量處理的問題
什么情況下回遇到批量處理的問題呢?
在這里 就要先扯一下rabbitmq的消息發(fā)放機(jī)制了
rabbitmq 默認(rèn) 他會(huì)最快 以輪詢的機(jī)制吧隊(duì)列所有的消息發(fā)送給所有客戶端 (如果消息沒確認(rèn)的話 他會(huì)添加一個(gè)
Unacked的標(biāo)識上圖已經(jīng)看過了)
那么 這種機(jī)制會(huì)有什么問題呢, 對于Rabbitmq來講 這樣子能最快速的使自己不會(huì)囤積消息而對性能造成影響,
但是 對于我們整個(gè)系統(tǒng)來講, 這種機(jī)制會(huì)帶來很多問題, 比如說 我一個(gè)隊(duì)列有2個(gè)人同時(shí)在消費(fèi),而且他們處理
能力不同, 我打個(gè)最簡單的比方 有100個(gè)訂單消息需要處理(消費(fèi)) 現(xiàn)在有消費(fèi)者A 和消費(fèi)者B , 消費(fèi)者A消費(fèi)一
條消息的速度是 10ms 消費(fèi)者B 消費(fèi)一條消息的速度是15ms ( 當(dāng)然 這里只是打比方) 那么 rabbitmq 會(huì)默認(rèn)給
消費(fèi)者A B 一人50條消息讓他們消費(fèi) 但是 消費(fèi)者A 他500ms 就可以消費(fèi)完所有的消息 并且處于空閑狀態(tài) 而 消費(fèi)
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
者B需要750ms 才能消費(fèi)完 如果從性能上來考慮的話 這100條消息消費(fèi)完的時(shí)間一共是750ms(因?yàn)?個(gè)人同時(shí)在
消費(fèi)) 但是如果 在消費(fèi)者A消費(fèi)完的時(shí)候 能把這個(gè)空閑的性能用來和B一起消費(fèi)剩下的信息的話, 那么這處理速
度就會(huì)快非常多。
這個(gè)例子可能有點(diǎn)抽象, 我們通過代碼來演示一下
我往Rabbitmq生產(chǎn)100條消息 由2個(gè)消費(fèi)者來消費(fèi) 其中我們讓一個(gè)消費(fèi)者在消費(fèi)的時(shí)候休眠0.5秒(模擬處理業(yè)務(wù)
的延遲) 另外一個(gè)消費(fèi)者正常消費(fèi) 我們來看看效果:
正常的那個(gè)消費(fèi)者會(huì)一瞬間吧所有消息(50條)全部消費(fèi)完(因?yàn)槲覀冇?jì)算機(jī)處理速度非??欤?下圖是加了延遲
的消費(fèi)者:
可能我筆記里面你看不出效果,這個(gè)你自己測試就會(huì)發(fā)現(xiàn) 其中一個(gè)消費(fèi)者很快就處理完自己的消息了 另外一個(gè)消
費(fèi)者還在慢慢的處理 其實(shí) 這樣嚴(yán)重影響了我們的性能了。
其實(shí)講了這么多 那如何來解決這個(gè)問題呢?
我剛剛解釋過了 造成這個(gè)原因的根本就是rabbitmq消息的發(fā)放機(jī)制導(dǎo)致的, 那么我們現(xiàn)在來講一下解決方案: 消
息預(yù)取
什么是消息預(yù)??? 講白了以前是rabbitmq一股腦吧所有消息都均發(fā)給所有的消費(fèi)者(不管你受不受得了) 而現(xiàn)在
是在我消費(fèi)者消費(fèi)之前 先告訴rabbitmq 我一次能消費(fèi)多少數(shù)據(jù) 等我消費(fèi)完了之后告訴rabbitmq rabbitmq再給
我發(fā)送數(shù)據(jù)
在代碼中如何體現(xiàn)?
在使用消息預(yù)取前 要注意一定要設(shè)置為手動(dòng)確認(rèn)消息, 原因參考上面劃重點(diǎn)的那句話。
因?yàn)槲覀儎倓傇O(shè)置過了 這里就不貼代碼了, 完了之后設(shè)置一下我們預(yù)取消息的數(shù)量 一樣 是在容器(Container)
里面設(shè)置:
@Configuration
public class MyRabbitListenerContainerConif {
@Bean
public SimpleRabbitListenerContainerFactory
simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
//這個(gè)connectionFactory就是我們自己配置的連接工廠直接注入進(jìn)來
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
//這邊設(shè)置消息確認(rèn)方式由自動(dòng)確認(rèn)變?yōu)槭謩?dòng)確認(rèn)
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//設(shè)置消息預(yù)取得數(shù)量
simpleRabbitListenerContainerFactory.setPrefetchCount(2);
return simpleRabbitListenerContainerFactory;
}
}
@Component
public class TestListener {
//@RabbitListener(queues = {"testQueue5","testQueue6"})
public void reciveMessage(String msg){
System.out.println("listener....:"+msg);
}
@RabbitListener(queues = {"testQueue5","testQueue6"},containerFactory = "simpleRabbitListenerContainerFactory")
public void getMessage(Message message, Channel channel) throws Exception{
System.out.println("list manul ack:"+(new String(message.getBody(),"UTF-8")));
Thread.sleep(5000l);
//消息消費(fèi)成功后調(diào)用第一個(gè)參數(shù)是消息的標(biāo)識字段,第二個(gè)是否是批量確認(rèn):false不是批量,true是批量
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
//消費(fèi)失敗后調(diào)用這個(gè)方法告知消息隊(duì)列最后一個(gè)參數(shù):是否返回原隊(duì)列
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
死信交換機(jī)

package com.luban.mall.search.mq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public DirectExchange deathExchange() {
return new DirectExchange("deathExchange");
}
@Bean
public Queue queue6() {
Map<String, Object> map = new HashMap();
//設(shè)置消息的過期時(shí)間 單位毫秒
map.put("x-message-ttl", 100000);
//設(shè)置附帶的死信交換機(jī)
map.put("x-dead-letter-exchange", "deathExchange");
//指定重定向的路由建 消息作廢之后可以決定需不需要更改他的路由建 如果需要 就在這里指定
map.put("x-dead-letter-routing-key", "drkey1");
return new Queue("testQueue6", true,false,false,map);
}
@Bean
public Queue queue7() {
return new Queue("deathQueue", true);
}
@Bean
public Binding binding7(){
return BindingBuilder.bind(queue7()).to(deathExchange()).with("drkey1");
}
@Bean
public Binding binding6(){
return BindingBuilder.bind(queue6()).to(topicExchange()).with("#.user");
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
template.setReturnCallback(new MyReturnCallback());
template.setConfirmCallback(new MyPublisherConfirmCallback());
return template;
}
}
怎么保證消息不丟失、不重復(fù)消費(fèi)問題
https://www.cnblogs.com/jis121/p/11050202.html
rabbitmq集群搭建,配置
rabbbitmq由于是由erlang語言開發(fā)的 天生就支持分布式
rabbitmq 的集群分兩種模式 一種是默認(rèn)模式 一種是鏡像模式
當(dāng)然 所謂的鏡像模式是基于默認(rèn)模式加上一定的配置來的
在rabbitmq集群當(dāng)中 所有的節(jié)點(diǎn)(一個(gè)rabbitmq服務(wù)器) 會(huì)被歸為兩類 一類是磁盤節(jié)點(diǎn) 一類是內(nèi)存節(jié)點(diǎn)
磁盤節(jié)點(diǎn)會(huì)把集群的所有信息(比如交換機(jī),隊(duì)列等信息)持久化到磁盤當(dāng)中,而內(nèi)存節(jié)點(diǎn)只會(huì)將這些信息保存到內(nèi)存
當(dāng)中 講白了 重啟一遍就沒了。
為了可用性考慮 rabbitmq官方強(qiáng)調(diào)集群環(huán)境至少需要有一個(gè)磁盤節(jié)點(diǎn), 而且為了高可用的話, 必須至少要有2個(gè)
磁盤節(jié)點(diǎn), 因?yàn)槿绻挥幸粋€(gè)磁盤節(jié)點(diǎn) 而剛好這唯一的磁盤節(jié)點(diǎn)宕機(jī)了的話, 集群雖然還是可以運(yùn)作, 但是不能
對集群進(jìn)行任何的修改操作(比如 隊(duì)列添加,交換機(jī)添加,增加/移除 新的節(jié)點(diǎn)等)
具體想讓rabbitmq實(shí)現(xiàn)集群, 我們首先需要改一下系統(tǒng)的hostname (因?yàn)閞abbitmq集群節(jié)點(diǎn)名稱是讀取
hostname的)
這里 我們模擬3個(gè)節(jié)點(diǎn) :
rabbitmq1
rabbitmq2
rabbitmq3
linux修改hostname命令: hostnamectl set-hostname [name]
修改后重啟一下 讓rabbitmq重新讀取節(jié)點(diǎn)名字
然后 我們需要讓每個(gè)節(jié)點(diǎn)通過hostname能ping通(記得關(guān)閉防火墻) 這里 我們可以修改修改一下hosts文件
關(guān)閉防火墻:
關(guān)閉防火墻 systemctl stop firewalld.service 禁止開機(jī)自啟 systemctl disable firewalld.service
接下來,我們需要將各個(gè)節(jié)點(diǎn)的.erlang.cookie文件內(nèi)容保持一致(文件路徑/var/lib/rabbitmq/.erlang.cookie)
因?yàn)槲沂遣捎锰摂M機(jī)的方式來模擬集群環(huán)境, 所以如果像我一樣是克隆的虛擬機(jī)的話 同步.erlang.cookie文件這個(gè)
操作在克隆的時(shí)候就已經(jīng)完成了。
上面這些步驟完成之后 我們就可以開始來構(gòu)建集群 了
我們先讓rabbitmq2 加入 rabbitmq1與他構(gòu)建為一個(gè)集群
執(zhí)行命令( ram:使rabbitmq2成為一個(gè)內(nèi)存節(jié)點(diǎn) 默認(rèn)為:disk 磁盤節(jié)點(diǎn)):
rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbitmq1 --ram rabbitmqctl start_app
在構(gòu)建的時(shí)候 我們需要先停掉rabbitmqctl服務(wù)才能構(gòu)建 等構(gòu)建完畢之后再啟動(dòng)
我們吧rabbitmq2添加完之后在rabbitmq3節(jié)點(diǎn)上也執(zhí)行同樣的代碼 使他也加入進(jìn)去 當(dāng)然 我們也可以讓
rabbitmq3也作為一個(gè)磁盤節(jié)點(diǎn)
當(dāng)執(zhí)行完操作以后我們來看看效果:

隨便在哪個(gè)節(jié)點(diǎn)打開管理頁面都能看到集群環(huán)境各節(jié)點(diǎn)的信息;
有關(guān)集群的其他命令:
rabbitmq-server -detached 啟動(dòng)RabbitMQ節(jié)點(diǎn) rabbitmqctl start_app 啟動(dòng)RabbitMQ應(yīng)用,而不是節(jié)點(diǎn)
rabbitmqctl stop_app 停止 rabbitmqctl status 查看狀態(tài) rabbitmqctl add_user mq 123456 rabbitmqctl
set_user_tags mq administrator 新增賬戶 rabbitmq-plugins enable rabbitmq_management 啟用
RabbitMQ_Management rabbitmqctl cluster_status 集群狀態(tài) rabbitmqctl forget_cluster_node
rabbit@[nodeName] 節(jié)點(diǎn)摘除 rabbitmqctl reset application 重置
普通模式的rabbitmq集群搭建好后, 我們來說一下鏡像模式
在普通模式下的rabbitmq集群 他會(huì)吧所有節(jié)點(diǎn)的交換機(jī)信息 和隊(duì)列的元數(shù)據(jù)(隊(duì)列數(shù)據(jù)分為兩種 一種為隊(duì)列里面
的消息, 另外一種是隊(duì)列本身的信息 比如隊(duì)列的最大容量,隊(duì)列的名稱,等等配置信息, 后者稱之為元數(shù)據(jù)) 進(jìn)
行復(fù)制 確保所有節(jié)點(diǎn)都有一份。
而鏡像模式,則是吧所有的隊(duì)列數(shù)據(jù)完全同步(當(dāng)然 對性能肯定會(huì)有一定影響) 當(dāng)對數(shù)據(jù)可靠性要求高時(shí) 可以使
用鏡像模式
實(shí)現(xiàn)鏡像模式也非常簡單 有2種方式 一種是直接在管理臺控制, 另外一種是在聲明隊(duì)列的時(shí)候控制
聲明隊(duì)列的時(shí)候可以加入鏡像隊(duì)列參數(shù) 在上方的參數(shù)列表當(dāng)中有解釋 我們來講一下管理臺控制
鏡像隊(duì)列配置命令解釋:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可選參數(shù),針對指定vhost下的queue進(jìn)行設(shè)置 Name: policy的名稱 Pattern: queue的匹配模式(正
則表達(dá)式) Definition:鏡像定義,包括三個(gè)部分ha-mode, ha-params, ha-sync-mode ha-mode:指明鏡像隊(duì)
列的模式,有效值為 all/exactly/nodes all:表示在集群中所有的節(jié)點(diǎn)上進(jìn)行鏡像 exactly:表示在指定個(gè)數(shù)的
節(jié)點(diǎn)上進(jìn)行鏡像,節(jié)點(diǎn)的個(gè)數(shù)由ha-params指定 nodes:表示在指定的節(jié)點(diǎn)上進(jìn)行鏡像,節(jié)點(diǎn)名稱通過ha-
params指定 ha-params:ha-mode模式需要用到的參數(shù) ha-sync-mode:進(jìn)行隊(duì)列中消息的同步方式,有效
值為automatic和manual
這里舉個(gè)例子 如果想配置所有名字開頭為 policy的隊(duì)列進(jìn)行鏡像 鏡像數(shù)量為1那么命令如下:
rabbitmqctl set_policy ha_policy "^policy_" '{"ha-mode":"exactly","ha-params":1,"ha-sync-
mode":"automatic"}'
怎么保證消息不丟失
消息發(fā)布方:
1.創(chuàng)建隊(duì)列時(shí),設(shè)置隊(duì)列的持久化
2.發(fā)送消息是設(shè)置消息的模式deliveryMode=2,持久化
3.利用發(fā)送方ack機(jī)制,發(fā)送前在redis中維護(hù)id-發(fā)送狀態(tài)[0-未發(fā)送,1-已發(fā)送,2-發(fā)送成功,3-發(fā)送失敗],時(shí)間戳
2-發(fā)送成功的移除集合,對3-發(fā)送失敗和1-已發(fā)送但超過規(guī)定時(shí)間未ack的消息,開啟一個(gè)新的線程或任務(wù)定期掃描并發(fā)送。
消息接收方:
1.開啟手動(dòng)確認(rèn)機(jī)制。
2.消費(fèi)消息后還未手動(dòng)確認(rèn),系統(tǒng)掛掉了(這種情況不用考慮,消息會(huì)從unacked狀態(tài)重新入隊(duì)改為redy狀態(tài))
3.消息消費(fèi)時(shí)出現(xiàn)業(yè)務(wù)異常的情況,手動(dòng)ack消息失敗,放入原隊(duì)列或者重定向到死信隊(duì)列。
怎么保證消息的順序性
1.要實(shí)現(xiàn)嚴(yán)格的順序消息,簡單且可行的辦法就是:保證生產(chǎn)者 - MQServer - 消費(fèi)者是一對一對一的關(guān)系
2.設(shè)定相關(guān)的路由鍵,把強(qiáng)相關(guān)的數(shù)據(jù)分配到同一個(gè)隊(duì)列,一個(gè)隊(duì)列對應(yīng)一個(gè)消費(fèi)者。
面試題解答:
1.rabbitmq
應(yīng)用場景:
質(zhì)控分組
怎么保證消息不丟失:
1.搭建高可用集群,設(shè)置多個(gè)磁盤節(jié)點(diǎn)保證元數(shù)據(jù)不丟失
2.創(chuàng)建持久化隊(duì)列,創(chuàng)建鏡像隊(duì)列
3.發(fā)送消息時(shí)指定消息是持久化消息
4.利用消息發(fā)送端的消息確認(rèn)回調(diào)機(jī)制,比如在redis中維護(hù)消息id與發(fā)送狀態(tài)的記錄,ack確認(rèn)發(fā)送成功的清除掉,發(fā)送失敗的,redis中更新失敗狀態(tài)
5.消費(fèi)端開啟手動(dòng)確認(rèn),業(yè)務(wù)異常情況 存入數(shù)據(jù)庫,存庫失敗,放入死信隊(duì)列。
怎么保證消息不重復(fù)消費(fèi)?
消息冪等性問題:消費(fèi)消息時(shí)判斷該消息是否已經(jīng)處理過,處理過就不在處理。(比如你存庫已經(jīng)存了那這個(gè)時(shí)候 再過來這個(gè)消息 要么就不處理)
怎么保證消息的有序性?
一個(gè)消息隊(duì)列只開啟一個(gè)消費(fèi)端:或者通過路由間把強(qiáng)關(guān)聯(lián)的消息發(fā)送到同一個(gè)隊(duì)列,然后每個(gè)消費(fèi)端跟隊(duì)列一一對應(yīng)。
怎么解決消息堆積問題?
大量消息堆積:
造成消息堆積有兩種原因:
1.很大原因是消費(fèi)端掛了或者消費(fèi)端的處理能力比較差?
臨時(shí)借調(diào)10倍的機(jī)器,部署消費(fèi)端進(jìn)行消費(fèi)
2.消費(fèi)端bug問題
,如果是消費(fèi)端bug問題,那就寫一個(gè)臨時(shí)的消費(fèi)消息的應(yīng)用 把消息釋放掉,等bug修復(fù)之后,重新再服務(wù)端生成消息。