Spring-Cloud RabbitMQ 用法 - 發(fā)送json對象

本篇主要介紹RabbitTemplate發(fā)送json對象的用法


1. 環(huán)境

spring-boot 2.1.1.RELEASE
spring-cloud Finchley.RELEASE
rabbitmq 3.7.10
spring-boot-starter-amqp

2. RabbitMQ的content_type

針對RabbitMQ的消息,最終存儲的都是字節(jié)數(shù)組,而我們在消費端,消息可能是字符串、對象等其它類型。我們發(fā)送時需要指定每個消息的content_type屬性,從而讓消費端知道如何把字節(jié)數(shù)組轉(zhuǎn)化成消息的原始類型。
例如,SimpleMessageConverter是默認的消息轉(zhuǎn)換器,以其作為例子:

  • content_type=text/plain的消息,直接new String,關(guān)鍵代碼為:
if (contentType != null && contentType.startsWith("text")) {
    String encoding = properties.getContentEncoding();
    if (encoding == null) {
        encoding = this.defaultCharset;
    }
    try {
        content = new String(message.getBody(), encoding);
    }
    catch (UnsupportedEncodingException e) {
        throw new MessageConversionException(
                "failed to convert text-based Message content", e);
    }
}
  • content_type=application/x-java-serialized-object,直接進行java反序列化,關(guān)鍵代碼為:
else if (contentType != null &&
        contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
    try {
        content = SerializationUtils.deserialize(
                createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
    }
    catch (IOException | IllegalArgumentException | IllegalStateException e) {
        throw new MessageConversionException(
                "failed to convert serialized Message content", e);
    }
}

我們從RabbitMQ的web頁面可以看到每個消息的content_type

image.png

正因為content_type的存在,RabbitMQ才能支持消息的多樣性。
在發(fā)送數(shù)據(jù),有兩種方式可以指定content_type

  1. 自己構(gòu)建消息對象Message,通過MessageProperties設(shè)置content_type
  2. 通過MessageConverter來設(shè)置content_type

我們下面的例子都以Department作為json對象,請注意json對象必須包含無參構(gòu)造器。

public class Department {
    private Integer departmentId ;
    private String departmentName ;
    private String departmentDesc ;

    /**
     * 對于json對象,必須要有無參的構(gòu)造器
     */
    public Department() {
    }

    public Department(Integer departmentId, String departmentName, String departmentDesc) {
        this.departmentId = departmentId;
        this.departmentName = departmentName;
        this.departmentDesc = departmentDesc;
    }

    @Override
    public String toString() {
        return "Department{" +
                "departmentId=" + departmentId +
                ", departmentName='" + departmentName + '\'' +
                ", departmentDesc='" + departmentDesc + '\'' +
                '}';
    }

    //省略getter and setter
}

3. 發(fā)送Json方式1:自定義Message

這種方式需要自己把對象轉(zhuǎn)化為json字符串,創(chuàng)建消息的MessageProperties,設(shè)置content_type,最后構(gòu)建Message對象,并發(fā)送。
核心代碼如下:

@Component
public class JsonSender {
    Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 發(fā)送對象消息,使用的json序列化
     *  Department不需要實現(xiàn)序列號接口,也不需要生產(chǎn)者、消費者是同一個包下的同一個類。只需要屬性相同即可。
     */
    public void sendTest1(Department department)  {
        logger.info("---> 我準備發(fā)送json對象:"+department);

        /**
         * 發(fā)送json對象,必須為消息指定contentType,否則發(fā)送時只會使用默認的消息轉(zhuǎn)換器SimpleMessageConverter
         * 而SimpleMessageConverter只支持字符串、byte數(shù)字和java Serializable對象
         * 不指定就會在發(fā)送時報錯:IllegalArgumentException:SimpleMessageConverter only supports String, byte[] and Serializable payloads
         */

        ObjectMapper objectMapper = new ObjectMapper();
        try {
            //對象轉(zhuǎn)化為json字符串
            String jsonStr = objectMapper.writeValueAsString(department) ;
            //配置消息的屬性contentType
            MessageProperties properties = MessagePropertiesBuilder.newInstance()
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .build();
            //構(gòu)建消息對象
            Message message = MessageBuilder.withBody(jsonStr.getBytes())
                    .andProperties(properties)
                    .build();
            //發(fā)送
            this.rabbitTemplate.convertAndSend(RabbitmqBaseConfig.EX_SIMPLE_DIRECT, RabbitmqJsonConfig.KEY_SIMPLE_JSON, message); ;

        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

通過RabbitMQ可以查看消息,其content_type=application/json

RabbitMQ Message

這種方式的優(yōu)點是簡單明了,可以與發(fā)送字符串、java等共用同一個RabbitTemplate;缺點則是需要自己把對象轉(zhuǎn)化成字符串、構(gòu)建消息屬性和消息對象,代碼比較多。

4. 發(fā)送Json方式2:設(shè)置MessageConverter

RabbitTemplate在發(fā)送消息對象時,會通過MessageConverter把對象轉(zhuǎn)化為byte[],我們以其中一個convertAndSend方法作為入口,其源碼為:

    @Override
    public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
        convertAndSend(exchange, routingKey, object, (CorrelationData) null);
    }

繼續(xù)跟進

    @Override
    public void convertAndSend(String exchange, String routingKey, final Object object,
            @Nullable CorrelationData correlationData) throws AmqpException {

        send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }

繼續(xù)跟進convertMessageIfNecessary方法:

    protected Message convertMessageIfNecessary(final Object object) {
        if (object instanceof Message) {
            return (Message) object;
        }
        return getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

getRequiredMessageConverter方法,則是獲取RabbitTemplate內(nèi)部的MessageConverter:

    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
        MessageConverter converter = getMessageConverter();
        if (converter == null) {
            throw new AmqpIllegalStateException(
                    "No 'messageConverter' specified. Check configuration of RabbitTemplate.");
        }
        return converter;
    }

    //省略其它代碼....

    /**
     * Return the message converter for this template. Useful for clients that want to take advantage of the converter
     * in {@link ChannelCallback} implementations.
     *
     * @return The message converter.
     */
    public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

RabbitTemplate默認的MessageConverter為SimpleMessageConverter,它不支持json,所以我們需要替換為支持json序列化的消息轉(zhuǎn)換器,而Spring也給我們提供了一個Jackson2JsonMessageConverter

  • Jackson2JsonMessageConverter引來與jackson的jar包
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.8</version>
</dependency>
  • 配置用于發(fā)送json對象的RabbitTemplate
    /**
     * 定義支持json的RabbitTemplate
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        //重寫發(fā)送端的消息轉(zhuǎn)換器,來替換默認的SimpleMessageConverter
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
  • 發(fā)送json對象示例代碼如下:

    /**
     * 發(fā)送對象消息,使用的json序列化
     */
    public void sendTest2(Department department)  {
        logger.info("---> 我準備發(fā)送json對象:"+department);

        /**
         * 發(fā)送json對象,必須為消息指定contentType,否則發(fā)送時只會使用默認的消息轉(zhuǎn)換器SimpleMessageConverter
         * 而SimpleMessageConverter只支持字符串、byte數(shù)字和java Serializable對象
         * 不指定就會在發(fā)送時報錯:IllegalArgumentException:SimpleMessageConverter only supports String, byte[] and Serializable payloads
         */
        //發(fā)送
        this.jsonRabbitTemplate.convertAndSend(RabbitmqBaseConfig.EX_SIMPLE_DIRECT, RabbitmqJsonConfig.KEY_SIMPLE_JSON, department); ;

    }

通過RabbitMQ可以查看消息:

RabbitMQ Json Message

寫在最后:

細心的同學(xué)可能已經(jīng)發(fā)現(xiàn),這兩種方式發(fā)送json的方式,在RabbitMQ內(nèi)還是有些不同(注意對比兩張圖片),通過Jackson2JsonMessageConverter發(fā)送的消息多了一個__TypeId__header屬性,其值為json對象的全限定類名。__TypeId__的作用主要為消費者進行消息轉(zhuǎn)化時提供依據(jù),下一節(jié)我們會繼續(xù)講它的作用。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容