RabbitMQ簡(jiǎn)單使用:
參考:http://www.itdecent.cn/p/dca01aad6bc8
(springboot + rabbitmq發(fā)送郵件(保證消息100%投遞成功并被消費(fèi)))
對(duì)于RabbitMQ進(jìn)行對(duì)象傳輸?shù)那榫?,添加如下的配置,注意?shí)體類實(shí)現(xiàn)序列化接口
發(fā)送端:
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
接收端:
@Bean
public MessageConverter messageConverter() {
return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
}
這里的簡(jiǎn)單demo由消息發(fā)送端和消息消費(fèi)端構(gòu)成,分別對(duì)應(yīng)一個(gè)工程,實(shí)現(xiàn)消息確認(rèn)和對(duì)象發(fā)送 接收。
消息確認(rèn)消費(fèi)總體來(lái)說(shuō),就是發(fā)送端確認(rèn)兩點(diǎn):1.成功發(fā)送到ExChange 2.成功由Exchange發(fā)送到Queue。消費(fèi)端確認(rèn)一點(diǎn):在執(zhí)行完所有的邏輯后,通過(guò)channel.basicAck確認(rèn)已經(jīng)消費(fèi)

1570522744531.png
application.properties
server.port=9000
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#Whether to enable publisher confirms
spring.rabbitmq.publisher-confirms=true
#Whether to enable publisher returns
spring.rabbitmq.publisher-returns=true
package com.myrabbit.simplesender;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author huangQiChang
* 對(duì)RabbitTemplate進(jìn)行定制,設(shè)置jsckson 成功發(fā)送到exChange的回調(diào)函數(shù) exChange發(fā)送到queue失敗的回調(diào)函數(shù)
*/
@Configuration
public class RabbitMQConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息是否成功發(fā)送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println(correlationData);
if (ack) {
System.out.println("收到ack");
}
});
// 觸發(fā)setReturnCallback回調(diào)必須設(shè)置mandatory=true, 否則Exchange沒(méi)有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
rabbitTemplate.setMandatory(true);
// 消息是否從Exchange路由到Queue, 注意: 這是一個(gè)失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會(huì)回調(diào)這個(gè)方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println(String.format("===%s===", "returnCallBack"));
System.out.println("message = " + message);
System.out.println("replyCode = " + replyCode);
System.out.println("replyText = " + replyText);
System.out.println("exchange = " + exchange);
System.out.println("routingKey = " + routingKey);
});
return rabbitTemplate;
}
}
package com.myrabbit.simplesender;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author huangQiChang
* 設(shè)置交換機(jī) 消息隊(duì)列 以及相關(guān)的綁定信息
*/
@Configuration
public class RabbitMQComponentConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("myEx");
}
@Bean
public Queue queue() {
return new Queue("myQ.test");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("myQ.*");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("myDire",false,true);
}
@Bean
public Queue queue_not_in() {
return new Queue("myDire.notIn");
}
@Bean
public Binding directExchange_queue_not_in_binding() {
return BindingBuilder.bind(queue_not_in()).to(directExchange()).with("myDire.notIn");
}
}
package com.myrabbit.simplesender.sender;
import com.myrabbit.simplesender.pojo.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author huangQiChang
* @date 2019/10/8
*/
@Component
public class QueueSender {
@Autowired
private RabbitTemplate rabbitTemplate;
// 測(cè)試topic模式
public void send() {
rabbitTemplate.convertAndSend("myEx", "myQ.t", "this is a test msg");
}
// 測(cè)試direct模式,沒(méi)queue匹配的情況
public void sendDirect() {
rabbitTemplate.convertAndSend("myEx", "qwe", "direct msg test");
}
public void sendObj() {
User user = new User();
user.setUserName("huangSir");
user.setDesc("user's desc");
rabbitTemplate.convertAndSend("myEx","myQ.test",user);
}
}
?
消息消費(fèi)端
application.properties
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@Configuration
public class RabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
}
}
package com.mpt.simplelistener.listener;
import com.mpt.simplelistener.pojo.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author huangQiChang
* @date 2019/10/8
*/
@Component
public class MyListener {
@RabbitListener(queues = "myQ.test")
public void consume(Message msg, Channel channel, User user) throws IOException {
System.out.println("user = " + user);
MessageProperties properties = msg.getMessageProperties();
// 發(fā)布的每一條消息都會(huì)獲得一個(gè)唯一的deliveryTag,
// (任何channel上發(fā)布的第一條消息的deliveryTag為1,此后的每一條消息都會(huì)加1),
// deliveryTag在channel范圍內(nèi)是唯一的
long tag = properties.getDeliveryTag();
System.out.println("tag = " + tag);
// 這里multiple參數(shù)的含義是
// true:將小于該tag值的消息確認(rèn)
// falase:只確認(rèn)當(dāng)前消息
channel.basicAck(tag, false);
// 出錯(cuò)了,通知MQ把消息塞回的隊(duì)列頭部(不是尾部)
// channel.basicNack(tag,false,true);
}
}