一.簡單隊列
1.配置pom文件,主要是添加spring-boot-starter-amqp的支持
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置application.yml文件(以下為YAML格式)
spring:
  application:
    name: spirng-boot-rabbitmq
  rabbitmq:
    host: 49.235.110.134
    port: 5672
    username: root
    password: root
3.配置隊列
package cn.lovingliu.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Queue configuration for the simple-queue example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue declared by {@link #queue()}. */
    private static final String QUEUE_NAME = "q_rabbit";

    /**
     * Exposes the {@code q_rabbit} queue definition as a Spring bean.
     *
     * @return a queue definition for {@code q_rabbit}
     */
    @Bean
    public Queue queue() {
        Queue simpleQueue = new Queue(QUEUE_NAME);
        return simpleQueue;
    }
}
4.生產者
package cn.lovingliu.rabbitmq.producer;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Message producer for the simple-queue example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
public class Producer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a greeting that carries the current timestamp, echoes it to
     * standard output and sends it with routing key {@code q_rabbit}.
     */
    public void send() {
        // "HH" selects the 24-hour clock.
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timestamp = formatter.format(new Date());
        String payload = "hello " + timestamp;
        System.out.println("Sender : " + payload);
        // With a simple queue the routing key is just the queue name.
        rabbitTemplate.convertAndSend("q_rabbit", payload);
    }
}
5.接收者
package cn.lovingliu.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Message consumer for the simple-queue example; listens on {@code q_rabbit}.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = "q_rabbit")
public class Receiver {

    /**
     * Prints a received text message to standard output.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver : " + hello;
        System.out.println(line);
    }
}
6.測試
package cn.lovingliu.rabbitmq.test;
import cn.lovingliu.rabbitmq.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that triggers the simple-queue producer once.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    @Autowired
    private Producer producer;

    /**
     * Sends a single message through the producer.
     *
     * @throws Exception declared for the test runner; not thrown by this body
     */
    // The annotation is fully qualified because this class is itself named Test.
    @org.junit.Test
    public void oneToOne() throws Exception {
        this.producer.send();
    }
}

簡單隊列
二.工作隊列(work)
1.創建兩個消費者
cn.lovingliu.rabbitmq_work.consumer.ReceiverWork1
package cn.lovingliu.rabbitmq_work.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Work-queue consumer 1; listens on {@code q_rabbit_work}.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = "q_rabbit_work")
public class ReceiverWork1 {

    /**
     * Prints a received text message, tagged as coming from receiver 1.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver1 : " + hello;
        System.out.println(line);
    }
}
cn.lovingliu.rabbitmq_work.consumer.ReceiverWork2
package cn.lovingliu.rabbitmq_work.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Work-queue consumer 2; listens on {@code q_rabbit_work}.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = "q_rabbit_work")
public class ReceiverWork2 {

    /**
     * Prints a received text message, tagged as coming from receiver 2.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver2 : " + hello;
        System.out.println(line);
    }
}
2.生產者
package cn.lovingliu.rabbitmq_work.producer;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Message producer for the work-queue example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
public class Producer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a numbered greeting that carries the current timestamp, echoes it
     * to standard output and sends it with routing key {@code q_rabbit_work}.
     *
     * @param i sequence number embedded in the message text
     */
    public void send(int i) {
        // "HH" selects the 24-hour clock.
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timestamp = formatter.format(new Date());
        String payload = String.join(" ", "hello", String.valueOf(i), timestamp);
        System.out.println("Sender : " + payload);
        // Without an explicit exchange the routing key is just the queue name.
        rabbitTemplate.convertAndSend("q_rabbit_work", payload);
    }
}
3.測試
package cn.lovingliu.rabbitmq_work.test;
import cn.lovingliu.rabbitmq_work.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test for the work-queue example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    @Autowired
    private Producer producer;

    /**
     * Sends 100 numbered messages (0 through 99), pausing 300 ms after each.
     *
     * @throws Exception if the pause between messages is interrupted
     */
    // The annotation is fully qualified because this class is itself named Test.
    @org.junit.Test
    public void oneToMany() throws Exception {
        int sequence = 0;
        while (sequence < 100) {
            producer.send(sequence);
            Thread.sleep(300);
            sequence++;
        }
    }
}

三.Topic Exchange(主題模式)
topic 是RabbitMQ中最靈活的一種方式,可以根據routing_key自由地綁定不同的隊列.
1.配置隊列,綁定交換機
package cn.lovingliu.rabbitmq_topic.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares two queues and a topic exchange, and binds each queue to the
 * exchange with its own routing pattern.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Configuration
public class TopicRabbitConfig {

    public static final String QUEUE_NAME_1 = "q_rabbit_topic_1";
    public static final String QUEUE_NAME_2 = "q_rabbit_topic_2";
    public static final String EXCHANGE_NAME = "my_exchange";

    /**
     * @return the queue definition for {@link #QUEUE_NAME_1}
     */
    @Bean
    public Queue queue1() {
        return new Queue(QUEUE_NAME_1);
    }

    /**
     * @return the queue definition for {@link #QUEUE_NAME_2}
     */
    @Bean
    public Queue queue2() {
        return new Queue(QUEUE_NAME_2);
    }

    /**
     * Declares the topic exchange.
     *
     * @return a topic exchange named {@link #EXCHANGE_NAME}
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * Binds the first queue to the exchange under pattern {@code topic.1.*}.
     *
     * @param queue1   the first queue bean
     * @param exchange the topic exchange bean
     * @return the binding between the two
     */
    @Bean
    Binding bindingExchangeMessage(Queue queue1, TopicExchange exchange) {
        // In a topic binding key, "*" stands for exactly one dot-separated word.
        final String routingPattern = "topic.1.*";
        return BindingBuilder.bind(queue1).to(exchange).with(routingPattern);
    }

    /**
     * Binds the second queue to the exchange under pattern {@code topic.2.*}.
     *
     * @param queue2   the second queue bean
     * @param exchange the topic exchange bean
     * @return the binding between the two
     */
    @Bean
    Binding bindingExchangeMessages(Queue queue2, TopicExchange exchange) {
        final String routingPattern = "topic.2.*";
        return BindingBuilder.bind(queue2).to(exchange).with(routingPattern);
    }
}
2.創建2個消費者
package cn.lovingliu.rabbitmq_topic.consumer;
import cn.lovingliu.rabbitmq_topic.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Topic-example consumer 1; listens on {@link TopicRabbitConfig#QUEUE_NAME_1}.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME_1)
public class ReceiverTopic1 {

    /**
     * Prints a received text message, tagged as coming from receiver 1.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver1 : " + hello;
        System.out.println(line);
    }
}
package cn.lovingliu.rabbitmq_topic.consumer;
import cn.lovingliu.rabbitmq_topic.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Topic-example consumer 2; listens on {@link TopicRabbitConfig#QUEUE_NAME_2}.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME_2)
public class ReceiverTopic2 {

    /**
     * Prints a received text message, tagged as coming from receiver 2.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver2 : " + hello;
        System.out.println(line);
    }
}
3.消息發送者(生產者)
package cn.lovingliu.rabbitmq_topic.producer;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Message producer for the topic-exchange example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Component
public class Producer {

    private static final String EXCHANGE_NAME = "my_exchange";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /** Sends a fixed text to the exchange with routing key {@code topic.1.message}. */
    public void send1() {
        publish("topic.1.message", "hi, queue 1 message");
    }

    /** Sends a fixed text to the exchange with routing key {@code topic.2.messages}. */
    public void send2() {
        publish("topic.2.messages", "hi, queue 2 message");
    }

    /** Echoes the text to standard output, then sends it to the exchange under the given key. */
    private void publish(String routingKey, String context) {
        System.out.println("Sender : " + context);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, context);
    }
}
4.測試
package cn.lovingliu.rabbitmq_topic.test;
import cn.lovingliu.rabbitmq_topic.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test for the topic-exchange example.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    @Autowired
    private Producer producer;

    /**
     * Triggers {@code Producer.send1()}.
     *
     * @throws Exception declared for the test runner; not thrown by this body
     */
    // The annotation is fully qualified because this class is itself named Test.
    @org.junit.Test
    public void send1() throws Exception {
        this.producer.send1();
    }

    /**
     * Triggers {@code Producer.send2()}.
     *
     * @throws Exception declared for the test runner; not thrown by this body
     */
    @org.junit.Test
    public void send2() throws Exception {
        this.producer.send2();
    }
}

運行結果

四.Fanout Exchange(訂閱模式)
Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送消息,綁定了這個交換機的所有隊列都會收到這個消息。
1.配置隊列,綁定交換機
package cn.lovingliu.rabbitmq_fanout.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares three queues and a fanout exchange, and binds every queue to the
 * exchange.
 *
 * <p>Created on 2020-01-16.
 *
 * @author LovingLiu
 */
@Configuration
public class FanoutRabbitConfig {

    public static final String QUEUE_NAME_A = "q_rabbit_fanout_a";
    public static final String QUEUE_NAME_B = "q_rabbit_fanout_b";
    public static final String QUEUE_NAME_C = "q_rabbit_fanout_c";
    public static final String EXCHANGE_NAME = "my_fanout_exchange";

    /**
     * @return the queue definition for {@link #QUEUE_NAME_A}
     */
    @Bean
    public Queue aQueue() {
        Queue queueA = new Queue(QUEUE_NAME_A);
        return queueA;
    }

    /**
     * @return the queue definition for {@link #QUEUE_NAME_B}
     */
    @Bean
    public Queue bQueue() {
        Queue queueB = new Queue(QUEUE_NAME_B);
        return queueB;
    }

    /**
     * @return the queue definition for {@link #QUEUE_NAME_C}
     */
    @Bean
    public Queue cQueue() {
        Queue queueC = new Queue(QUEUE_NAME_C);
        return queueC;
    }

    /**
     * Declares the fanout exchange.
     *
     * @return a fanout exchange named {@link #EXCHANGE_NAME}
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    // The bindings below carry no routing key: none is supplied to the builder.

    /**
     * @param aQueue         queue A bean
     * @param fanoutExchange the fanout exchange bean
     * @return the binding of queue A to the exchange
     */
    @Bean
    Binding bindingExchangeA(Queue aQueue, FanoutExchange fanoutExchange) {
        Binding binding = BindingBuilder.bind(aQueue).to(fanoutExchange);
        return binding;
    }

    /**
     * @param bQueue         queue B bean
     * @param fanoutExchange the fanout exchange bean
     * @return the binding of queue B to the exchange
     */
    @Bean
    Binding bindingExchangeB(Queue bQueue, FanoutExchange fanoutExchange) {
        Binding binding = BindingBuilder.bind(bQueue).to(fanoutExchange);
        return binding;
    }

    /**
     * @param cQueue         queue C bean
     * @param fanoutExchange the fanout exchange bean
     * @return the binding of queue C to the exchange
     */
    @Bean
    Binding bindingExchangeC(Queue cQueue, FanoutExchange fanoutExchange) {
        Binding binding = BindingBuilder.bind(cQueue).to(fanoutExchange);
        return binding;
    }
}
2.創建3個消費者
cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutA
package cn.lovingliu.rabbitmq_fanout.consumer;
import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Fanout-example consumer A; listens on {@link FanoutRabbitConfig#QUEUE_NAME_A}.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_A)
public class ReceiverFanoutA {

    /**
     * Prints a received text message followed by a blank line.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" is the newline escape; the previous "/n" was printed literally.
        System.out.println("AReceiver : " + hello + "\n");
    }
}
cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutB
package cn.lovingliu.rabbitmq_fanout.consumer;
import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Fanout-example consumer B; listens on {@link FanoutRabbitConfig#QUEUE_NAME_B}.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_B)
public class ReceiverFanoutB {

    /**
     * Prints a received text message followed by a blank line.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" is the newline escape; the previous "/n" was printed literally.
        System.out.println("BReceiver : " + hello + "\n");
    }
}
cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutC
package cn.lovingliu.rabbitmq_fanout.consumer;
import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Fanout-example consumer C; listens on {@link FanoutRabbitConfig#QUEUE_NAME_C}.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@Component
@RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_C)
public class ReceiverFanoutC {

    /**
     * Prints a received text message followed by a blank line.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" is the newline escape; the previous "/n" was printed literally.
        System.out.println("CReceiver : " + hello + "\n");
    }
}
3.生產者
cn.lovingliu.rabbitmq_fanout.producer.Producer
package cn.lovingliu.rabbitmq_fanout.producer;
import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Message producer for the fanout-exchange example.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@Component
public class Producer {

    /** Routing key passed to the fanout exchange; intentionally empty. */
    private static final String NO_ROUTING_KEY = "";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Echoes a fixed text to standard output and sends it to
     * {@link FanoutRabbitConfig#EXCHANGE_NAME} with an empty routing key.
     */
    public void send() {
        String payload = "hi, fanout msg ";
        System.out.println("Sender : " + payload);
        rabbitTemplate.convertAndSend(FanoutRabbitConfig.EXCHANGE_NAME, NO_ROUTING_KEY, payload);
    }
}
4.測試
package cn.lovingliu.rabbitmq_fanout.test;
import cn.lovingliu.rabbitmq_fanout.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test for the fanout-exchange example.
 *
 * <p>Created on 2020-01-17.
 *
 * @author LovingLiu
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    @Autowired
    private Producer producer;

    /**
     * Triggers {@code Producer.send()} once.
     *
     * @throws Exception declared for the test runner; not thrown by this body
     */
    // The annotation is fully qualified because this class is itself named Test.
    @org.junit.Test
    public void send1() throws Exception {
        this.producer.send();
    }
}
運行截圖

生產者

消費者
