rabbitMQ基本(消息確認(rèn) 對(duì)象傳輸)

RabbitMQ簡(jiǎn)單使用:

參考:http://www.itdecent.cn/p/dca01aad6bc8
(springboot + rabbitmq發(fā)送郵件(保證消息100%投遞成功并被消費(fèi)))

對(duì)于RabbitMQ進(jìn)行對(duì)象傳輸?shù)那榫?,添加如下的配置,注意?shí)體類實(shí)現(xiàn)序列化接口

發(fā)送端:

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

接收端:

@Bean
public MessageConverter messageConverter() {
    return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
}

這里的簡(jiǎn)單demo由消息發(fā)送端和消息消費(fèi)端構(gòu)成,分別對(duì)應(yīng)一個(gè)工程,實(shí)現(xiàn)消息確認(rèn)和對(duì)象發(fā)送 接收。

消息確認(rèn)消費(fèi)總體來(lái)說(shuō),就是發(fā)送端確認(rèn)兩點(diǎn):1.成功發(fā)送到ExChange 2.成功由Exchange發(fā)送到Queue。消費(fèi)端確認(rèn)一點(diǎn):在執(zhí)行完所有的邏輯后,通過(guò)channel.basicAck確認(rèn)已經(jīng)消費(fèi)

1570522744531.png

application.properties

server.port=9000
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#Whether to enable publisher confirms
spring.rabbitmq.publisher-confirms=true
#Whether to enable publisher returns
spring.rabbitmq.publisher-returns=true
package com.myrabbit.simplesender;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author huangQiChang
 * 對(duì)RabbitTemplate進(jìn)行定制,設(shè)置jsckson 成功發(fā)送到exChange的回調(diào)函數(shù) exChange發(fā)送到queue失敗的回調(diào)函數(shù)
 */
@Configuration
public class RabbitMQConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

// 消息是否成功發(fā)送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println(correlationData);
            if (ack) {
                System.out.println("收到ack");
            }
        });
// 觸發(fā)setReturnCallback回調(diào)必須設(shè)置mandatory=true, 否則Exchange沒(méi)有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
        rabbitTemplate.setMandatory(true);

// 消息是否從Exchange路由到Queue, 注意: 這是一個(gè)失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會(huì)回調(diào)這個(gè)方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println(String.format("===%s===", "returnCallBack"));
            System.out.println("message = " + message);
            System.out.println("replyCode = " + replyCode);
            System.out.println("replyText = " + replyText);
            System.out.println("exchange = " + exchange);
            System.out.println("routingKey = " + routingKey);
        });
        return rabbitTemplate;
    }
}

package com.myrabbit.simplesender;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author huangQiChang
 * 設(shè)置交換機(jī) 消息隊(duì)列 以及相關(guān)的綁定信息
 */
@Configuration
public class RabbitMQComponentConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("myEx");
    }

    @Bean
    public Queue queue() {
        return new Queue("myQ.test");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(topicExchange()).with("myQ.*");
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("myDire",false,true);
    }

    @Bean
    public Queue queue_not_in() {
        return new Queue("myDire.notIn");
    }

    @Bean
    public Binding directExchange_queue_not_in_binding() {
        return BindingBuilder.bind(queue_not_in()).to(directExchange()).with("myDire.notIn");
    }
}

package com.myrabbit.simplesender.sender;

import com.myrabbit.simplesender.pojo.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author huangQiChang
 * @date 2019/10/8
 */
@Component
public class QueueSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

//    測(cè)試topic模式
    public void send() {
        rabbitTemplate.convertAndSend("myEx", "myQ.t", "this is a test msg");
    }
//    測(cè)試direct模式,沒(méi)queue匹配的情況
    public void sendDirect() {
        rabbitTemplate.convertAndSend("myEx", "qwe", "direct msg test");
    }

    public void sendObj() {
        User user = new User();
        user.setUserName("huangSir");
        user.setDesc("user's desc");
        rabbitTemplate.convertAndSend("myEx","myQ.test",user);
    }
}

?

消息消費(fèi)端

application.properties

spring.rabbitmq.listener.simple.acknowledge-mode=manual
@Configuration
public class RabbitConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
    }
}

package com.mpt.simplelistener.listener;

import com.mpt.simplelistener.pojo.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author huangQiChang
 * @date 2019/10/8
 */
@Component
public class MyListener {

    @RabbitListener(queues = "myQ.test")
    public void consume(Message msg, Channel channel, User user) throws IOException {
        System.out.println("user = " + user);

        MessageProperties properties = msg.getMessageProperties();
//        發(fā)布的每一條消息都會(huì)獲得一個(gè)唯一的deliveryTag,
//        (任何channel上發(fā)布的第一條消息的deliveryTag為1,此后的每一條消息都會(huì)加1),
//        deliveryTag在channel范圍內(nèi)是唯一的
        long tag = properties.getDeliveryTag();
        System.out.println("tag = " + tag);
//        這里multiple參數(shù)的含義是
//        true:將小于該tag值的消息確認(rèn)
//        falase:只確認(rèn)當(dāng)前消息
        channel.basicAck(tag, false);
//        出錯(cuò)了,通知MQ把消息塞回的隊(duì)列頭部(不是尾部)
//        channel.basicNack(tag,false,true);
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容