2022最新 Rabbitmq 單消費端和單發(fā)布端代碼

基于默認(rèn)的direct(即路由)模式

yml的rabbitmq配置


image.png

引入maven依賴


image.png

1.自定義消息類

package com.zkmeiling.biz.rabbitmq;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

/**
 * 箱任務(wù)博
 *
 * @author WeiWei
 * @date 2022/10/17
 */
@NoArgsConstructor
@Data
public class BoxTaskBo  {
    private String request;
    private Date time;
    private Date data;


}


2.開啟手動ack模式并準(zhǔn)備相關(guān)的處理回調(diào)函數(shù)
2.1---ConfirmCallback 回調(diào)函數(shù)類( 發(fā)布端----send消息--> broker[由交換機exchang和隊列queue組成 {只要進入exchange就觸發(fā)ConfirmCllback }] )

package com.zkmeiling.biz.rabbitmq;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {


    /**
     * Confirmation callback.
     *
     * @param correlationData correlation data for the callback.
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("bbk樣本庫消息發(fā)送異常!");
        } else {
            log.info("bbk樣本庫發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
        }
    }
}

2.2---ReturnsCallback 回調(diào)函數(shù)類( 發(fā)布端----send消息--> broker[由交換機exchang和隊列queue組成 {進入exchange 但未投遞到queqe就觸發(fā)ReturnsCallback }] )

package com.zkmeiling.biz.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback{

    /**
     * Returned message callback.
     *
     * @param returned the returned message and metadata.
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("bbk樣本庫returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", returned.getReplyCode(), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
    }
}

3.發(fā)布端配置
下圖是發(fā)布端服務(wù)(簡單)相關(guān)目錄結(jié)構(gòu)

image.png

3.1發(fā)布端(生產(chǎn)端配置代碼)

package com.zkmeiling.biz.rabbitmq;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//聲明交換機和需要綁定的隊列(這部分也可單獨拿出做一個rabbitmq配置供應(yīng)端)
@Configuration
public class RabbitmqQueueConfig {


    /**
     * 聲明樣本庫管理系統(tǒng)->工控系統(tǒng)隊列 支持持久化
     */
    @Bean(name = "bbkToIpcQueue")
    public Queue BbkToIpcQueue() {
        return QueueBuilder.durable("zkmeiling.bbk.to.ipc").build();
    }

    //聲明交換機
    @Bean(name = "directExchange")
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange("zkmeiling.direct").durable(true).build();
    }

    //聲明交換機和隊列的綁定關(guān)系
    @Bean
    public Binding bbkToIpcBinding(
            @Qualifier("directExchange") DirectExchange directExchange,
            @Qualifier("bbkToIpcQueue") Queue bbkToIpcQueue) {
        return BindingBuilder.bind(bbkToIpcQueue).to(directExchange).with("bbk2ipc_routing_key");
    }

}

3.2---發(fā)布端具體發(fā)送消息代碼

package com.zkmeiling.biz.rabbitmq;


import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.UUID;

@RestController
public class PublishController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;



    @PostMapping("frozenBoxPutInStore")
    public void frozenBoxPutInStore(@RequestBody BoxTaskBo boxs){
        sendMessage("zkmeiling.direct","bbk2ipc_routing_key",boxs);
    }


    public void sendMessage(String exchange, String routingKey, BoxTaskBo msg) {

//        /**
//         * 確保消息發(fā)送失敗后可以重新返回到隊列中
//         * 注意:yml需要配置 publisher-returns: true
//         */
//        rabbitTemplate.setMandatory(true);

        /**
         * 消費者確認(rèn)收到消息后,手動ack回執(zhí)回調(diào)處理
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 消息投遞到隊列失敗回調(diào)處理
         */
        rabbitTemplate.setReturnsCallback(returnCallbackService);

        /**
         * 開啟Jackson2JsonMessageConverter的消息轉(zhuǎn)換器(進行msg編碼操作)
         */
       rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());


        /**
         * 發(fā)送消息
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
            //這里是對msg的aop做一些前置處理操作
            //下面是為了解決
  <----原理說明--->
            實體類轉(zhuǎn)換異常

            failed to resolve class name.
            解決方案

            1.依賴系統(tǒng)1的jar包,直接使用類A來接收
            2.不依賴系統(tǒng)1的jar包,自己建一個和A一模一樣的類,連名稱,包路徑都一樣
            3.負(fù)責(zé)監(jiān)聽 queue 的類實現(xiàn) MessageListener 接口,直接接收 Message 類,再自己轉(zhuǎn)
                    顯然都不夠好也不是自己想要的

            在 JsonMessageConverter 的 fromMessage 方法中有這么一段:

            f (getClassMapper() == null) {
                JavaType targetJavaType = getJavaTypeMapper()
                        .toJavaType(message.getMessageProperties());
                content = convertBytesToObject(message.getBody(), encoding, targetJavaType);
            } else {
                Class<?> targetClass = getClassMapper().toClass(
                        message.getMessageProperties());
                content = convertBytesToObject(message.getBody(), encoding, targetClass);
            }
            就是說默認(rèn)情況下,JsonMessageConverter 使用的 ClassMapper 是 DefaultJackson2JavaTypeMapper,在轉(zhuǎn)換時通過 Message 的 Properties 來獲取要轉(zhuǎn)換的目標(biāo)類的類型。通過 Debug 可以發(fā)現(xiàn),目標(biāo)類的類型是存儲在 Message 的 Proterties 的 一個 headers 的 Map 中,Key 叫“__TypeId__”。所以只要想辦法在傳輸消息時更改__TypeId__的值即可。
  <----原理說明--->


            message.getMessageProperties().getHeaders().put("__TypeId__","com.example.rabbitmqdemo.BoxTaskBo");
            return message;
        } ,new CorrelationData(UUID.randomUUID().toString()));
        System.out.println("bbk樣本庫exchange = " + exchange);
    }
}

4.消費端配置
下圖是消費端服務(wù)(簡單)相關(guān)目錄結(jié)構(gòu)

image.png

4.1---消費端配置代碼

package com.example.rabbitmqdemo;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //設(shè)置開啟Jackson2JsonMessageConverter
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //設(shè)置開啟手動確認(rèn)ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

4.1---消費端具體接收消息消費業(yè)務(wù)代碼

package com.example.rabbitmqdemo;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
@RabbitListener(queues = "zkmeiling.bbk.to.ipc")
public class ReceiverMessage1 {

    @RabbitHandler
    public void processHandler(@Payload BoxTaskBo msg, Channel channel, Message message) throws IOException {

        try {
            log.info("ipc工業(yè)端小富收到消息:{}", msg);

            //TODO 具體業(yè)務(wù)

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {

            if (message.getMessageProperties().getRedelivered()) {

                log.error("ipc工業(yè)端消息已重復(fù)處理失敗,拒絕再次接收...");

                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
            } else {

                log.error("ipc工業(yè)端消息即將再次返回隊列處理...");

                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }

}

驗證下
1.postmain 模擬發(fā)送請求

image.png

2.發(fā)布端:發(fā)送成功日志


image.png

3.消費端: 接收成功日志


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容