選擇類似 RabbitMQ 的全功能消息代理。安裝消息代理後,在啟用 STOMP 支持的情況下運行服務。
開啟rabbitmq_web_stomp插件
我們在RabbitMQ上啟動rabbitmq_web_stomp插件
rabbitmq-plugins enable rabbitmq_stomp
rabbitmq-plugins enable rabbitmq_web_stomp_examples
rabbitmq-plugins enable rabbitmq_web_stomp

image.png
Springboot集成
github代碼:代碼
配置類 : RabbitMQConfig
package com.xjxxxc.rabbitmqdemo;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: Jason Xu
* @Date: 2019年3月30日
* @description: RabiitMQ配置類
*/
@Configuration
public class RabbitMQConfig {
/** 消息交換機(jī)的名字*/
public static final String EXCHANGE = "my-mq-exchange";
/** 隊列key1*/
public static final String ROUTINGKEY1 = "queue_one_key1";
/** 隊列key2*/
public static final String ROUTINGKEY2 = "queue_one_key2";
/**
* @Title: connectionFactory
* @Description: 配置鏈接信息
* @return ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必須要設(shè)置
return connectionFactory;
}
/**
*
* 針對消費者配置
FanoutExchange: 將消息分發(fā)到所有的綁定隊列,無routingkey的概念
HeadersExchange :通過添加屬性key-value匹配
DirectExchange:按照routingkey分發(fā)到指定隊列
TopicExchange:多關(guān)鍵字匹配
*/
/**
* @Title: defaultExchange
* @Description: 配置消息交換機(jī)
* FanoutExchange: 將消息分發(fā)到所有的綁定隊列,無routingkey的概念
HeadersExchange :通過添加屬性key-value匹配
DirectExchange:按照routingkey分發(fā)到指定隊列
TopicExchange:多關(guān)鍵字匹配
* @return DirectExchange
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
/**
* @Title: queue
* @Description: 配置消息隊列1,針對消費者配置
* @return Queue
*/
@Bean
public Queue queue() {
//隊列持久
return new Queue("queue_one", true);
}
/**
* @Title: binding
* @Description: 將消息隊列1與交換機(jī)綁定,針對消費者配置
* @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(RabbitMQConfig.ROUTINGKEY1);
}
/**
* @Title: queue1
* @Description: 配置消息隊列2,針對消費者配置
* @return
*/
@Bean
public Queue queue1() {
//隊列持久
return new Queue("queue_one1", true);
}
/**
* 將消息隊列2與交換機(jī)綁定
* 針對消費者配置
* @return
*/
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(defaultExchange()).with(RabbitMQConfig.ROUTINGKEY2);
}
/**
* 接受消息的監(jiān)聽,這個監(jiān)聽會接受消息隊列1的消息
* 針對消費者配置
* @return
*/
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn)
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費
}
});
return container;
}
/**
* 接受消息的監(jiān)聽,這個監(jiān)聽會接受消息隊列1的消息
* 針對消費者配置
* @return
*/
@Bean
public SimpleMessageListenerContainer messageContainer2() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue1());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn)
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("queue1 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費
}
});
return container;
}
}
測試發送消息的控制層:SendController
package com.xjxxxc.rabbitmqdemo;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: Jason Xu
* @Date: 2019年3月30日
* @description: 測試RabbitMQ發(fā)送消息的Controller
*/
@RestController
public class SendController implements RabbitTemplate.ConfirmCallback {
private RabbitTemplate rabbitTemplate;
/**
* @Title: SendController
* @Description: 配置發(fā)送消息的rabbitTemplate,因為是構(gòu)造方法,所以不用注解Spring也會自動注入。
* @param rabbitTemplate
*/
public SendController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
//設(shè)置消費回調(diào)
this.rabbitTemplate.setConfirmCallback(this);
}
/**
* @Title: send1
* @Description: 向消息隊列1中發(fā)送消息
* @param msg
* @return String
*/
@RequestMapping("send1")
public String send1(String msg) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTINGKEY1, msg, correlationId);
return null;
}
/**
* @Title: send2
* @Description: 向消息隊列2中發(fā)送消息
* @param msg
* @return String
*/
@RequestMapping("send2")
public String send2(String msg) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTINGKEY2, msg, correlationId);
return null;
}
/**
* @Title: confirm
* @Description: 消息的回調(diào),主要是實現(xiàn)RabbitTemplate.ConfirmCallback接口
* 注意,消息回調(diào)只能代表消息發(fā)送成功,不能代表消息被成功處理
* @param correlationData
* @param ack
* @param cause
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(" 回調(diào)id:" + correlationData);
if (ack) {
System.out.println("消息成功消費");
} else {
System.out.println("消息消費失敗:" + cause + "\n重新發(fā)送");
}
}
}
測試
- 啟動Application
- 瀏覽器訪問:
http://localhost:9876/send1?msg=helloworld
http://localhost:9876/send2?msg=helloworld
結果:(控制臺可查看如下信息)

image.png

image.png
備注:個人博客同步至簡書。