rabbitmq是消息隊(duì)列中的一種,以下是rabbitmq在spring boot中的集成和實(shí)現(xiàn)
spring boot中要集成rabbitmq很簡單,在maven添加依賴就可以了
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
接下來是rabbit一些屬性配置
srping:
rabbitmq:
host: 127.0.0.1
port: 15672
username: guest
password: guest
消息隊(duì)列和交換機(jī)配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
//聲明隊(duì)列
@Bean
public Queue queue1() {
return new Queue("queue1", true); // true表示持久化該隊(duì)列
}
@Bean
public Queue queue2() {
return new Queue("queue2");
}
//交換機(jī)
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
//綁定
@Bean
public Binding binding1() {
//通過key綁定
return BindingBuilder.bind(queue1()).to(topicExchange()).with("key1");
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(topicExchange()).with("key2");
}
}
我配置了兩個隊(duì)列分別和交換機(jī)進(jìn)行綁定,再來配置隊(duì)列發(fā)送者和接受者
@Component
public class Product {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1(String message) {
System.out.println("queue1發(fā)送消息:" + message);
rabbitTemplate.convertAndSend("topicExchange", "key1", message);
}
public void send2(String message) {
System.out.println("queue2發(fā)送消息:" + message);
rabbitTemplate.convertAndSend("topicExchange", "key2", message);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Customer {
@RabbitListener(queues = "queue1")
public void consumeMessage1(String message) {
System.out.println("queue1接收消息:"+ message);
}
@RabbitListener(queues = "queue2")
public void consumeMessage2(String message) {
System.out.println("queue2接收消息:"+ message);
}
}
配置完后進(jìn)行單元測試
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootRabbitmqApplicationTests {
@Autowired
private Product product;
@Test
public void send() {
for (int i = 0; i < 10; i++) {
product.send1("t1--" + i);
product.send2("t2--" + i);
}
}
}
單元測試結(jié)果

result.png
可能會出現(xiàn)消息發(fā)送完畢但是沒有被消費(fèi)完的情況,因?yàn)橄l(fā)送到隊(duì)列后單元測試就結(jié)束了,如果重新進(jìn)行單元測試,queue1被持久化所以上一次的消息能繼續(xù)被消費(fèi)queue2不能