4.RabbitMq消息隊(duì)列

一、利用docker安裝rabbitmq

https://www.cnblogs.com/yufeng218/p/9452621.html
rabbitmq技術(shù)文檔:http://www.itdecent.cn/p/78847c203b76
Broker:消息系統(tǒng)
Virtual host(虛擬地址,可以理解為命名空間)
Connection:(TCP生產(chǎn)者或者消費(fèi)端和消息系統(tǒng)建立的長連接)
Channel:每一個(gè)連接里面可以劃分出很多渠道,一個(gè)消息的發(fā)送通過一個(gè)渠道發(fā)送到交換機(jī)。
Exchange:交換機(jī)(根據(jù)路由健把消息分發(fā)到不同的隊(duì)列)
Queue:隊(duì)列,存儲(chǔ)消息的隊(duì)列
Binding:綁定器,綁定交換機(jī)的路由健和隊(duì)列。

image.png

路由鍵有四種:

direct :

這種類型的交換機(jī)的路由規(guī)則是根據(jù)一個(gè)routingKey的標(biāo)識,交換機(jī)通過一個(gè)routingKey與隊(duì)列綁定 ,在生
產(chǎn)者生產(chǎn)消息的時(shí)候 指定一個(gè)routingKey 當(dāng)綁定的隊(duì)列的routingKey 與生產(chǎn)者發(fā)送的一樣 那么交換機(jī)會(huì)吧
這個(gè)消息發(fā)送給對應(yīng)的隊(duì)列。

image.png

fanout:

這種類型的交換機(jī)路由規(guī)則很簡單,只要與他綁定了的隊(duì)列, 他就會(huì)吧消息發(fā)送給對應(yīng)隊(duì)列(與routingKey
沒關(guān)系)


image.png

topic:

這種類型的交換機(jī)路由規(guī)則也是和routingKey有關(guān) 只不過 topic他可以根據(jù):*,#( 號代表過濾一單詞,#代
表過濾后面所有單詞, 用.隔開)來識別routingKey 我打個(gè)比方 假設(shè) 我綁定的routingKey 有隊(duì)列A和B A的
routingKey是:
.user B的routingKey是: #.user
那么我生產(chǎn)一條消息routingKey 為: error.user 那么此時(shí) 2個(gè)隊(duì)列都能接受到, 如果改為 topic.error.user
那么這時(shí)候 只有B能接受到了

image.png

image.png

發(fā)布方消息確認(rèn)機(jī)制和失敗回調(diào)機(jī)制

image.png
application.yml
rabbitmq:
    host: 192.168.29.133
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost
    publisher-confirms: true
    publisher-returns: true
package com.luban.mall.search.mq;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
    @Autowired
    private ConnectionFactory connectionFactory;
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }
    @Bean
    public DirectExchange directExchange2() {
        return new DirectExchange("directExchange2");
    }
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }
    @Bean
    public Queue queue1() {
        return new Queue("testQueue1", true);
    }
    @Bean
    public Queue queue2() {
        return new Queue("testQueue2", true);
    }
    @Bean
    public Queue queue3() {
        return new Queue("testQueue3", true);
    }
    @Bean
    public Queue queue4() {
        return new Queue("testQueue4", true);
    }
    @Bean
    public Queue queue5() {
        return new Queue("testQueue5", true);
    }
    @Bean
    public Queue queue6() {
        return new Queue("testQueue6", true);
    }
    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(queue1()).to(directExchange()).with("rkey1");
    }
    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(queue2()).to(directExchange()).with("rkey2");
    }
    @Bean
    public Binding binding3(){
        return BindingBuilder.bind(queue3()).to(fanoutExchange());
    }
    @Bean
    public Binding binding4(){
        return BindingBuilder.bind(queue4()).to(fanoutExchange());
    }
    @Bean
    public Binding binding5(){
        return BindingBuilder.bind(queue5()).to(topicExchange()).with("*.user");
    }
    @Bean
    public Binding binding6(){
        return BindingBuilder.bind(queue6()).to(topicExchange()).with("#.user");
    }
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setReturnCallback(new MyReturnCallback());
        template.setConfirmCallback(new MyPublisherConfirmCallback());
        return template;
    }
}

package com.luban.mall.search.mq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class MyPublisherConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //通過這個(gè)參數(shù)可以拿到我們發(fā)送的消息對象
        Message message = correlationData.getReturnedMessage();
        if(ack){
            System.out.println("消息發(fā)送成功:"+correlationData+"---ack:"+ack+"---cause:"+cause);
        }else{
            System.out.println("消息發(fā)送失敗:"+correlationData+"---ack:"+ack+"---cause:"+cause);
        }
    }
}
public class MyReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replycode, String replyText, String exchange, String routingkey) {
        System.out.println("error return call back: message"+message+"--replycode:"+replycode+"-- replyText:"+replyText+"--exchange:"+exchange+"---routingkey:"+routingkey);
    }
}
@RequestMapping(value = "/send5",method = RequestMethod.POST)
    @ApiOperation("發(fā)送消息到消息隊(duì)列")
    @ResponseBody
    @ApiImplicitParams({
            @ApiImplicitParam(name = "name", value = "用戶名", defaultValue = "李四",paramType = "query"),
            @ApiImplicitParam(name = "age", value = "年齡", defaultValue = "23", required = true,paramType = "query")
    }
    )
    public CommonResult<String> send5(@RequestParam String name,@RequestParam String age){
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(name,age,"majunweitest callback",correlationData);
        return CommonResult.success("馬","返回正確");
    }

消費(fèi)方ack機(jī)制

image.png

代碼實(shí)現(xiàn)

package com.luban.mall.search.mq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabbitListenerContainerConif {
    @Bean
    public SimpleRabbitListenerContainerFactory
    simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){
     SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
    new SimpleRabbitListenerContainerFactory();
   //這個(gè)connectionFactory就是我們自己配置的連接工廠直接注入進(jìn)來
  simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
  //這邊設(shè)置消息確認(rèn)方式由自動(dòng)確認(rèn)變?yōu)槭謩?dòng)確認(rèn)
  simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  return simpleRabbitListenerContainerFactory;
    }

}
package com.luban.mall.search.controller;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TestListener {
    //@RabbitListener(queues = {"testQueue5","testQueue6"})
    public void reciveMessage(String msg){
        System.out.println("listener....:"+msg);
    }
    @RabbitListener(queues = {"testQueue5","testQueue6"},containerFactory = "simpleRabbitListenerContainerFactory")
    public void getMessage(Message message, Channel channel) throws Exception{
        System.out.println("list manul ack:"+(new String(message.getBody(),"UTF-8")));
        Thread.sleep(5000l);
        //消息消費(fèi)成功后調(diào)用第一個(gè)參數(shù)是消息的標(biāo)識字段,第二個(gè)是否是批量確認(rèn):false不是批量,true是批量
       // channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        //消費(fèi)失敗后調(diào)用這個(gè)方法告知消息隊(duì)列最后一個(gè)參數(shù):是否返回原隊(duì)列
       channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
}

消息預(yù)取

消息預(yù)取
扯完消息確認(rèn) 我們來講一下剛剛所說的批量處理的問題
什么情況下回遇到批量處理的問題呢?
在這里 就要先扯一下rabbitmq的消息發(fā)放機(jī)制了
rabbitmq 默認(rèn) 他會(huì)最快 以輪詢的機(jī)制吧隊(duì)列所有的消息發(fā)送給所有客戶端 (如果消息沒確認(rèn)的話 他會(huì)添加一個(gè)
Unacked的標(biāo)識上圖已經(jīng)看過了)
那么 這種機(jī)制會(huì)有什么問題呢, 對于Rabbitmq來講 這樣子能最快速的使自己不會(huì)囤積消息而對性能造成影響,
但是 對于我們整個(gè)系統(tǒng)來講, 這種機(jī)制會(huì)帶來很多問題, 比如說 我一個(gè)隊(duì)列有2個(gè)人同時(shí)在消費(fèi),而且他們處理
能力不同, 我打個(gè)最簡單的比方 有100個(gè)訂單消息需要處理(消費(fèi)) 現(xiàn)在有消費(fèi)者A 和消費(fèi)者B , 消費(fèi)者A消費(fèi)一
條消息的速度是 10ms 消費(fèi)者B 消費(fèi)一條消息的速度是15ms ( 當(dāng)然 這里只是打比方) 那么 rabbitmq 會(huì)默認(rèn)給
消費(fèi)者A B 一人50條消息讓他們消費(fèi) 但是 消費(fèi)者A 他500ms 就可以消費(fèi)完所有的消息 并且處于空閑狀態(tài) 而 消費(fèi)
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
者B需要750ms 才能消費(fèi)完 如果從性能上來考慮的話 這100條消息消費(fèi)完的時(shí)間一共是750ms(因?yàn)?個(gè)人同時(shí)在
消費(fèi)) 但是如果 在消費(fèi)者A消費(fèi)完的時(shí)候 能把這個(gè)空閑的性能用來和B一起消費(fèi)剩下的信息的話, 那么這處理速
度就會(huì)快非常多。
這個(gè)例子可能有點(diǎn)抽象, 我們通過代碼來演示一下
我往Rabbitmq生產(chǎn)100條消息 由2個(gè)消費(fèi)者來消費(fèi) 其中我們讓一個(gè)消費(fèi)者在消費(fèi)的時(shí)候休眠0.5秒(模擬處理業(yè)務(wù)
的延遲) 另外一個(gè)消費(fèi)者正常消費(fèi) 我們來看看效果:
正常的那個(gè)消費(fèi)者會(huì)一瞬間吧所有消息(50條)全部消費(fèi)完(因?yàn)槲覀冇?jì)算機(jī)處理速度非??欤?下圖是加了延遲
的消費(fèi)者:
可能我筆記里面你看不出效果,這個(gè)你自己測試就會(huì)發(fā)現(xiàn) 其中一個(gè)消費(fèi)者很快就處理完自己的消息了 另外一個(gè)消
費(fèi)者還在慢慢的處理 其實(shí) 這樣嚴(yán)重影響了我們的性能了。
其實(shí)講了這么多 那如何來解決這個(gè)問題呢?
我剛剛解釋過了 造成這個(gè)原因的根本就是rabbitmq消息的發(fā)放機(jī)制導(dǎo)致的, 那么我們現(xiàn)在來講一下解決方案: 消
息預(yù)取
什么是消息預(yù)??? 講白了以前是rabbitmq一股腦吧所有消息都均發(fā)給所有的消費(fèi)者(不管你受不受得了) 而現(xiàn)在
是在我消費(fèi)者消費(fèi)之前 先告訴rabbitmq 我一次能消費(fèi)多少數(shù)據(jù) 等我消費(fèi)完了之后告訴rabbitmq rabbitmq再給
我發(fā)送數(shù)據(jù)
在代碼中如何體現(xiàn)?
在使用消息預(yù)取前 要注意一定要設(shè)置為手動(dòng)確認(rèn)消息, 原因參考上面劃重點(diǎn)的那句話。
因?yàn)槲覀儎倓傇O(shè)置過了 這里就不貼代碼了, 完了之后設(shè)置一下我們預(yù)取消息的數(shù)量 一樣 是在容器(Container)
里面設(shè)置:

@Configuration
public class MyRabbitListenerContainerConif {
    @Bean
    public SimpleRabbitListenerContainerFactory
    simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
                new SimpleRabbitListenerContainerFactory();
        //這個(gè)connectionFactory就是我們自己配置的連接工廠直接注入進(jìn)來
        simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
        //這邊設(shè)置消息確認(rèn)方式由自動(dòng)確認(rèn)變?yōu)槭謩?dòng)確認(rèn)
        simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //設(shè)置消息預(yù)取得數(shù)量
        simpleRabbitListenerContainerFactory.setPrefetchCount(2);
        return simpleRabbitListenerContainerFactory;
    }

}

@Component
public class TestListener {
    //@RabbitListener(queues = {"testQueue5","testQueue6"})
    public void reciveMessage(String msg){
        System.out.println("listener....:"+msg);
    }
    @RabbitListener(queues = {"testQueue5","testQueue6"},containerFactory = "simpleRabbitListenerContainerFactory")
    public void getMessage(Message message, Channel channel) throws Exception{
        System.out.println("list manul ack:"+(new String(message.getBody(),"UTF-8")));
        Thread.sleep(5000l);
        //消息消費(fèi)成功后調(diào)用第一個(gè)參數(shù)是消息的標(biāo)識字段,第二個(gè)是否是批量確認(rèn):false不是批量,true是批量
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        //消費(fèi)失敗后調(diào)用這個(gè)方法告知消息隊(duì)列最后一個(gè)參數(shù):是否返回原隊(duì)列
       //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
}

死信交換機(jī)

image.png
package com.luban.mall.search.mq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitmqConfig {
    @Autowired
    private ConnectionFactory connectionFactory;
    
    @Bean
    public DirectExchange deathExchange() {
        return new DirectExchange("deathExchange");
    }
   
    @Bean
    public Queue queue6() {
        Map<String, Object> map = new HashMap();
        //設(shè)置消息的過期時(shí)間 單位毫秒
        map.put("x-message-ttl", 100000);
        //設(shè)置附帶的死信交換機(jī)
        map.put("x-dead-letter-exchange", "deathExchange");
        //指定重定向的路由建 消息作廢之后可以決定需不需要更改他的路由建 如果需要 就在這里指定
        map.put("x-dead-letter-routing-key", "drkey1");
        return new Queue("testQueue6", true,false,false,map);
    }
    @Bean
    public Queue queue7() {
        return new Queue("deathQueue", true);
    }
    @Bean
    public Binding binding7(){
        return BindingBuilder.bind(queue7()).to(deathExchange()).with("drkey1");
    }

    @Bean
    public Binding binding6(){
        return BindingBuilder.bind(queue6()).to(topicExchange()).with("#.user");
    }
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setReturnCallback(new MyReturnCallback());
        template.setConfirmCallback(new MyPublisherConfirmCallback());
        return template;
    }
}

怎么保證消息不丟失、不重復(fù)消費(fèi)問題

https://www.cnblogs.com/jis121/p/11050202.html

rabbitmq集群搭建,配置

rabbbitmq由于是由erlang語言開發(fā)的 天生就支持分布式
rabbitmq 的集群分兩種模式 一種是默認(rèn)模式 一種是鏡像模式
當(dāng)然 所謂的鏡像模式是基于默認(rèn)模式加上一定的配置來的
在rabbitmq集群當(dāng)中 所有的節(jié)點(diǎn)(一個(gè)rabbitmq服務(wù)器) 會(huì)被歸為兩類 一類是磁盤節(jié)點(diǎn) 一類是內(nèi)存節(jié)點(diǎn)
磁盤節(jié)點(diǎn)會(huì)把集群的所有信息(比如交換機(jī),隊(duì)列等信息)持久化到磁盤當(dāng)中,而內(nèi)存節(jié)點(diǎn)只會(huì)將這些信息保存到內(nèi)存
當(dāng)中 講白了 重啟一遍就沒了。
為了可用性考慮 rabbitmq官方強(qiáng)調(diào)集群環(huán)境至少需要有一個(gè)磁盤節(jié)點(diǎn), 而且為了高可用的話, 必須至少要有2個(gè)
磁盤節(jié)點(diǎn), 因?yàn)槿绻挥幸粋€(gè)磁盤節(jié)點(diǎn) 而剛好這唯一的磁盤節(jié)點(diǎn)宕機(jī)了的話, 集群雖然還是可以運(yùn)作, 但是不能
對集群進(jìn)行任何的修改操作(比如 隊(duì)列添加,交換機(jī)添加,增加/移除 新的節(jié)點(diǎn)等)
具體想讓rabbitmq實(shí)現(xiàn)集群, 我們首先需要改一下系統(tǒng)的hostname (因?yàn)閞abbitmq集群節(jié)點(diǎn)名稱是讀取
hostname的)
這里 我們模擬3個(gè)節(jié)點(diǎn) :
rabbitmq1
rabbitmq2
rabbitmq3
linux修改hostname命令: hostnamectl set-hostname [name]

修改后重啟一下 讓rabbitmq重新讀取節(jié)點(diǎn)名字

然后 我們需要讓每個(gè)節(jié)點(diǎn)通過hostname能ping通(記得關(guān)閉防火墻) 這里 我們可以修改修改一下hosts文件
關(guān)閉防火墻:
關(guān)閉防火墻 systemctl stop firewalld.service 禁止開機(jī)自啟 systemctl disable firewalld.service
接下來,我們需要將各個(gè)節(jié)點(diǎn)的.erlang.cookie文件內(nèi)容保持一致(文件路徑/var/lib/rabbitmq/.erlang.cookie)
因?yàn)槲沂遣捎锰摂M機(jī)的方式來模擬集群環(huán)境, 所以如果像我一樣是克隆的虛擬機(jī)的話 同步.erlang.cookie文件這個(gè)
操作在克隆的時(shí)候就已經(jīng)完成了。
上面這些步驟完成之后 我們就可以開始來構(gòu)建集群 了
我們先讓rabbitmq2 加入 rabbitmq1與他構(gòu)建為一個(gè)集群
執(zhí)行命令( ram:使rabbitmq2成為一個(gè)內(nèi)存節(jié)點(diǎn) 默認(rèn)為:disk 磁盤節(jié)點(diǎn)):
rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbitmq1 --ram rabbitmqctl start_app
在構(gòu)建的時(shí)候 我們需要先停掉rabbitmqctl服務(wù)才能構(gòu)建 等構(gòu)建完畢之后再啟動(dòng)
我們吧rabbitmq2添加完之后在rabbitmq3節(jié)點(diǎn)上也執(zhí)行同樣的代碼 使他也加入進(jìn)去 當(dāng)然 我們也可以讓
rabbitmq3也作為一個(gè)磁盤節(jié)點(diǎn)
當(dāng)執(zhí)行完操作以后我們來看看效果:


image.png

隨便在哪個(gè)節(jié)點(diǎn)打開管理頁面都能看到集群環(huán)境各節(jié)點(diǎn)的信息;
有關(guān)集群的其他命令:
rabbitmq-server -detached 啟動(dòng)RabbitMQ節(jié)點(diǎn) rabbitmqctl start_app 啟動(dòng)RabbitMQ應(yīng)用,而不是節(jié)點(diǎn)
rabbitmqctl stop_app 停止 rabbitmqctl status 查看狀態(tài) rabbitmqctl add_user mq 123456 rabbitmqctl
set_user_tags mq administrator 新增賬戶 rabbitmq-plugins enable rabbitmq_management 啟用
RabbitMQ_Management rabbitmqctl cluster_status 集群狀態(tài) rabbitmqctl forget_cluster_node
rabbit@[nodeName] 節(jié)點(diǎn)摘除 rabbitmqctl reset application 重置
普通模式的rabbitmq集群搭建好后, 我們來說一下鏡像模式
在普通模式下的rabbitmq集群 他會(huì)吧所有節(jié)點(diǎn)的交換機(jī)信息 和隊(duì)列的元數(shù)據(jù)(隊(duì)列數(shù)據(jù)分為兩種 一種為隊(duì)列里面
的消息, 另外一種是隊(duì)列本身的信息 比如隊(duì)列的最大容量,隊(duì)列的名稱,等等配置信息, 后者稱之為元數(shù)據(jù)) 進(jìn)
行復(fù)制 確保所有節(jié)點(diǎn)都有一份。
而鏡像模式,則是吧所有的隊(duì)列數(shù)據(jù)完全同步(當(dāng)然 對性能肯定會(huì)有一定影響) 當(dāng)對數(shù)據(jù)可靠性要求高時(shí) 可以使
用鏡像模式
實(shí)現(xiàn)鏡像模式也非常簡單 有2種方式 一種是直接在管理臺控制, 另外一種是在聲明隊(duì)列的時(shí)候控制
聲明隊(duì)列的時(shí)候可以加入鏡像隊(duì)列參數(shù) 在上方的參數(shù)列表當(dāng)中有解釋 我們來講一下管理臺控制
鏡像隊(duì)列配置命令解釋:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可選參數(shù),針對指定vhost下的queue進(jìn)行設(shè)置 Name: policy的名稱 Pattern: queue的匹配模式(正
則表達(dá)式) Definition:鏡像定義,包括三個(gè)部分ha-mode, ha-params, ha-sync-mode ha-mode:指明鏡像隊(duì)
列的模式,有效值為 all/exactly/nodes all:表示在集群中所有的節(jié)點(diǎn)上進(jìn)行鏡像 exactly:表示在指定個(gè)數(shù)的
節(jié)點(diǎn)上進(jìn)行鏡像,節(jié)點(diǎn)的個(gè)數(shù)由ha-params指定 nodes:表示在指定的節(jié)點(diǎn)上進(jìn)行鏡像,節(jié)點(diǎn)名稱通過ha-
params指定 ha-params:ha-mode模式需要用到的參數(shù) ha-sync-mode:進(jìn)行隊(duì)列中消息的同步方式,有效
值為automatic和manual
這里舉個(gè)例子 如果想配置所有名字開頭為 policy的隊(duì)列進(jìn)行鏡像 鏡像數(shù)量為1那么命令如下:
rabbitmqctl set_policy ha_policy "^policy_" '{"ha-mode":"exactly","ha-params":1,"ha-sync-
mode":"automatic"}'

怎么保證消息不丟失

消息發(fā)布方:
1.創(chuàng)建隊(duì)列時(shí),設(shè)置隊(duì)列的持久化
2.發(fā)送消息是設(shè)置消息的模式deliveryMode=2,持久化
3.利用發(fā)送方ack機(jī)制,發(fā)送前在redis中維護(hù)id-發(fā)送狀態(tài)[0-未發(fā)送,1-已發(fā)送,2-發(fā)送成功,3-發(fā)送失敗],時(shí)間戳
2-發(fā)送成功的移除集合,對3-發(fā)送失敗和1-已發(fā)送但超過規(guī)定時(shí)間未ack的消息,開啟一個(gè)新的線程或任務(wù)定期掃描并發(fā)送。
消息接收方:
1.開啟手動(dòng)確認(rèn)機(jī)制。
2.消費(fèi)消息后還未手動(dòng)確認(rèn),系統(tǒng)掛掉了(這種情況不用考慮,消息會(huì)從unacked狀態(tài)重新入隊(duì)改為redy狀態(tài))
3.消息消費(fèi)時(shí)出現(xiàn)業(yè)務(wù)異常的情況,手動(dòng)ack消息失敗,放入原隊(duì)列或者重定向到死信隊(duì)列。

怎么保證消息的順序性

1.要實(shí)現(xiàn)嚴(yán)格的順序消息,簡單且可行的辦法就是:保證生產(chǎn)者 - MQServer - 消費(fèi)者是一對一對一的關(guān)系
2.設(shè)定相關(guān)的路由鍵,把強(qiáng)相關(guān)的數(shù)據(jù)分配到同一個(gè)隊(duì)列,一個(gè)隊(duì)列對應(yīng)一個(gè)消費(fèi)者。
面試題解答:

1.rabbitmq
應(yīng)用場景:
質(zhì)控分組

怎么保證消息不丟失:
1.搭建高可用集群,設(shè)置多個(gè)磁盤節(jié)點(diǎn)保證元數(shù)據(jù)不丟失
2.創(chuàng)建持久化隊(duì)列,創(chuàng)建鏡像隊(duì)列
3.發(fā)送消息時(shí)指定消息是持久化消息
4.利用消息發(fā)送端的消息確認(rèn)回調(diào)機(jī)制,比如在redis中維護(hù)消息id與發(fā)送狀態(tài)的記錄,ack確認(rèn)發(fā)送成功的清除掉,發(fā)送失敗的,redis中更新失敗狀態(tài)
5.消費(fèi)端開啟手動(dòng)確認(rèn),業(yè)務(wù)異常情況 存入數(shù)據(jù)庫,存庫失敗,放入死信隊(duì)列。

怎么保證消息不重復(fù)消費(fèi)?
消息冪等性問題:消費(fèi)消息時(shí)判斷該消息是否已經(jīng)處理過,處理過就不在處理。(比如你存庫已經(jīng)存了那這個(gè)時(shí)候 再過來這個(gè)消息 要么就不處理)

怎么保證消息的有序性?
 一個(gè)消息隊(duì)列只開啟一個(gè)消費(fèi)端:或者通過路由間把強(qiáng)關(guān)聯(lián)的消息發(fā)送到同一個(gè)隊(duì)列,然后每個(gè)消費(fèi)端跟隊(duì)列一一對應(yīng)。
 
 怎么解決消息堆積問題?
 大量消息堆積:
 造成消息堆積有兩種原因:
 1.很大原因是消費(fèi)端掛了或者消費(fèi)端的處理能力比較差?
  臨時(shí)借調(diào)10倍的機(jī)器,部署消費(fèi)端進(jìn)行消費(fèi)
 2.消費(fèi)端bug問題
,如果是消費(fèi)端bug問題,那就寫一個(gè)臨時(shí)的消費(fèi)消息的應(yīng)用 把消息釋放掉,等bug修復(fù)之后,重新再服務(wù)端生成消息。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容