SpringBoot整合RabbitMQ,常用操作

本文介紹三種常用操作,基于spring-boot-starter-amqp依賴

  • 手動ack
  • work模式(能者多勞)
  • 消息格式轉(zhuǎn)換

手動ack

消息確認(rèn)模式

在amqp協(xié)議中消息確認(rèn)有兩種模式

  1. 自動確認(rèn)模式(automatic acknowledgement model)當(dāng)消息代理將消息發(fā)送給應(yīng)用后立即刪除

  2. 顯式確認(rèn)模式(explicit acknowledgement model)待應(yīng)用發(fā)送一個(gè)確認(rèn)回執(zhí)后再刪除消息

而在spring-boot-starter-amqp,spring定義了三種

  1. NONE 沒有ack的意思,對應(yīng)rabbitMQ的自動確認(rèn)模式

  2. MANUAL 手動模式,對應(yīng)rabbitMQ的顯式確認(rèn)模式

  3. AUTO 自動模式,對應(yīng)rabbitMQ的顯式確認(rèn)模式

首先注意的是spring-amqp中的自動模式與rabbit中的自動模式是不一樣的,其次,在spring-amqp中MANUAL 與 AUTO的關(guān)系有點(diǎn)類似于在spring中手動提交事務(wù)與自動提交事務(wù)的區(qū)別,一個(gè)是手動發(fā)送ack一個(gè)是在方法執(zhí)行完,沒有異常的情況下自動發(fā)送ack

代碼實(shí)現(xiàn)

三個(gè)步驟

  1. 設(shè)置消費(fèi)者的消息確認(rèn)模式

  2. 手動確認(rèn)/拒絕消息

  3. 設(shè)置消息拒絕策略

設(shè)置消費(fèi)者的消息確認(rèn)模式:

@Configuration
public class ListenerConfig {

   @Bean("myListenerFactory")
    public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory containerFactory= 
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        //設(shè)置消費(fèi)者的消息確認(rèn)模式
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return containerFactory;
    }
}

手動確認(rèn)/拒絕消息:

@Component
@RabbitListener(
        containerFactory = "myListenerFactory",
        bindings = @QueueBinding(
            value = @Queue("myManualAckQueue"),
            exchange = @Exchange(value = "myManualAckExchange", type = ExchangeTypes.DIRECT),
            key = "mine.manual"))
public class MyAckListener {

    @RabbitHandler
    public void onMessage(@Payload String msg, 
                          @Headers Map<String, Object> headers, 
                          Channel channel) throws Exception{
        try {
            System.out.println(msg);
            //消息確認(rèn),(deliveryTag,multiple是否確認(rèn)所有消息)
            channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
        } catch (Exception e) {
            //消息拒絕(deliveryTag,multiple,requeue拒絕后是否重新回到隊(duì)列)
            channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
            // 拒絕一條
            // channel.basicReject();
        }
    }
}

設(shè)置消息拒絕策略:

拒絕策略是指,當(dāng)消息被消費(fèi)者拒絕時(shí)該如何處理,丟棄或者是重新回到隊(duì)列.

在MANUAL 模式下,在拒絕消息的方法中設(shè)置

//消息拒絕(deliveryTag,multiple,requeue拒絕后是否重新回到隊(duì)列)
channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);

在AUTO 模式下可通過RabbitListenerContainerFactory或是ListenerContainer設(shè)置,如

@Bean("myListenerFactory")
    public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory containerFactory=
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        //自動ack
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //拒絕策略,true回到隊(duì)列 false丟棄
        containerFactory.setDefaultRequeueRejected(false);
        return containerFactory;
    }

需要注意的是,默認(rèn)的拒絕策略是回到隊(duì)列,所以,如果隊(duì)列只有一個(gè)消費(fèi)者的話就會產(chǎn)生死循環(huán)

work模式-能者多勞

默認(rèn)情況下,如果有多個(gè)消費(fèi)者在一個(gè)隊(duì)列上,消息是公平的分發(fā)給消費(fèi)者的,一人一個(gè)輪著來,不考慮每個(gè)消費(fèi)者之間的處理能力的差異,這可以通過設(shè)置預(yù)處理消息數(shù)(prefetchCount)緩解,或是使用work-能者多勞模式

work-能者多勞模式: 每個(gè)消費(fèi)者的預(yù)處理消息數(shù)(prefetchCount)都設(shè)置為1,每個(gè)消費(fèi)者消息確認(rèn)都為顯式確認(rèn)模式,即MANUAL,或是AUTO

如下,兩個(gè)消費(fèi)者消費(fèi)同一個(gè)queue上的消息,理論上consumer-one處理能力是consumer-two的兩倍

@Component
public class WorkListener {
    private int one = 1;
    private int two = 1;

    @RabbitListener(containerFactory = "workListenerFactory",
            queuesToDeclare = @Queue("workQueue"))
    public void onMessageOne(String msg) throws InterruptedException {
        Thread.sleep(100);
        System.out.println("consumer-one 第 " + one + " 個(gè)消息 :" + msg);
        one++;
    }

    @RabbitListener(containerFactory = "workListenerFactory",
            queuesToDeclare = @Queue("workQueue"))
    public void onMessageTwo(String msg) throws InterruptedException {
        Thread.sleep(200);
        System.out.println("consumer-two 第 " + two + " 個(gè)消息 :" + msg);
        two++;
    }
}

生產(chǎn)者,使用了上一篇中介紹的默認(rèn)交換機(jī)

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private void send() {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("workQueue", "this is a message");
        }
    }

執(zhí)行結(jié)果如下,符合預(yù)期,兩個(gè)消費(fèi)者幾乎同時(shí)消費(fèi)完畢,且one消費(fèi)的消息數(shù)是two的兩倍

......
consumer-two 第 31 個(gè)消息 :this is a message
consumer-one 第 62 個(gè)消息 :this is a message
consumer-one 第 63 個(gè)消息 :this is a message
consumer-two 第 32 個(gè)消息 :this is a message
consumer-one 第 64 個(gè)消息 :this is a message
consumer-one 第 65 個(gè)消息 :this is a message
consumer-two 第 33 個(gè)消息 :this is a message
consumer-one 第 66 個(gè)消息 :this is a message
consumer-two 第 34 個(gè)消息 :this is a message

消息格式轉(zhuǎn)換

rabbirMQ中的消息對應(yīng)到j(luò)ava中對應(yīng)的實(shí)體類是 org.springframework.amqp.core.Message,所以消息轉(zhuǎn)換接口MessageConverter 有兩個(gè)主要方法 toMessage 和 fromMessage 顧名思義,即將發(fā)送的內(nèi)容與Message的互轉(zhuǎn)

SimpleMessageConverter

spring中默認(rèn)使用的是 SimpleMessageConverter 它的兩個(gè)轉(zhuǎn)化方法如下

toMessage,根據(jù) object instanceof xxx 轉(zhuǎn)化


toMessage.png

fromMessage,根據(jù)MessageProperties的ContentType轉(zhuǎn)換

fromMessage.png

所以你大可以自己實(shí)現(xiàn)MessageConverter 接口自己轉(zhuǎn)換,當(dāng)然spring也提供了常用的轉(zhuǎn)化,如轉(zhuǎn)json,xml

Jackson2JsonMessageConverter

常用的將object與json互轉(zhuǎn)

生產(chǎn)者

@Autowired
private RabbitTemplate rabbitTemplate;

private void send() {
    //實(shí)際項(xiàng)目不建議這么干,spring單例模式,
    // 所以最好自己構(gòu)建一個(gè)"jasonRabbitTemplate",用的使用注入
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.convertAndSend("jsonQueue", new Student("zhangSan",15,"男"));
}

消費(fèi)者

@Bean("jasonTemplate")
public RabbitTemplate jasonRabbitTemplate(ConnectionFactory connectionFactory) {
    Jackson2JsonMessageConverter messageConverter = 
        new Jackson2JsonMessageConverter();
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //設(shè)置轉(zhuǎn)化類
    rabbitTemplate.setMessageConverter(messageConverter);
    return rabbitTemplate;
}

...

@Component
@RabbitListener(containerFactory = "jsonListenerFactory",
                queuesToDeclare = @Queue("jsonQueue"))
public class JasonListener {

    @RabbitHandler
    public void onMessage(Student student) {
        System.out.println(student);
    }
}

消息內(nèi)容:

json.png

轉(zhuǎn)載請注明出處
系列文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容