一、前言
Topic模式,又稱主題模式,他和路由模式類似,但是比路由模式靈活,綁定在Topic交換機(jī)上的隊列,可以設(shè)置一個帶通配符的路由key,比如orange.*,*.black.*等,例如producer發(fā)送消息到Topic交換機(jī)時,帶上路由可以為orange.1,則會將消息發(fā)送到orange.*這個隊列。需要注意的是,比如隊列綁定的路由key是*.black.*,那producer發(fā)送消息時,路由一定要black前后都有值才能匹配到,比如1.black.1才行,1.black是不行的。
二、生產(chǎn)者
1.TopicExchangeConfig.java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicExchangeConfig {
public static final String TOPIC_EXCHANGE = "topic_exchange";
public static final String QUEUE_ORANGE = "queue_orange";
public static final String QUEUE_BLACK = "queue_black";
public static final String ROUTING_KEY_ORANGE = "orange.*";
public static final String ROUTING_KEY_BLACK = "*.black.*";
/**
* 定義一個Topic類型交換機(jī)
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
/**
* 創(chuàng)建一個隊列'queue_orange'
* @return
*/
@Bean
public Queue queueOrange(){
return new Queue(QUEUE_ORANGE);
}
/**
* 創(chuàng)建一個隊列'queue_black'
* @return
*/
@Bean
public Queue queueBlack(){
return new Queue(QUEUE_BLACK);
}
/**
* 將隊列'queue_orange'綁定到Topic交換機(jī)上,并設(shè)置路由key為'orange.*'
* 該隊列則可以接收路由key為orange.1,orange.2等樣式的消息
* @return
*/
@Bean
public Binding bindingQueueOrange(){
return BindingBuilder.bind(queueOrange()).to(topicExchange()).with(ROUTING_KEY_ORANGE);
}
/**
* 將隊列'queue_orange'綁定到Topic交換機(jī)上,并設(shè)置路由key為'*.black.*'
* 該隊列則可以接收路由key為1.black.1,2.black.2等樣式的消息
* 注意:發(fā)送路由key為1.black是匹配不到這個隊列的
* @return
*/
@Bean
public Binding bindingQueueBlack(){
return BindingBuilder.bind(queueBlack()).to(topicExchange()).with(ROUTING_KEY_BLACK);
}
}
2.ProducerController.java
@RestController
@RequestMapping(value = "send")
public class ProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送路由key為"orange.1"的消息到Topic交換機(jī),交換機(jī)會將該消息投遞給隊列"queue_orange"
*/
@GetMapping("topic/orange1")
public void sendOrange1(){
String msg = "send routing key = orange.1 msg " + new Date().toString();
rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "orange.1", msg);
}
/**
* 發(fā)送路由key為"orange.2"的消息到Topic交換機(jī),交換機(jī)會將該消息投遞給隊列"queue_orange"
*/
@GetMapping("topic/orange2")
public void sendOrange2(){
String msg = "send routing key = orange.2 msg " + new Date().toString();
rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "orange.2", msg);
}
/**
* 發(fā)送路由key為"1.black.1"的消息到Topic交換機(jī),交換機(jī)會將該消息投遞給隊列"queue_black"
*/
@GetMapping("topic/black1")
public void sendBlack1(){
String msg = "send routing key = 1.black msg " + new Date().toString();
rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "1.black.1", msg);
}
/**
* 發(fā)送路由key為"2.black.2"的消息到Topic交換機(jī),交換機(jī)會將該消息投遞給隊列"queue_black"
*/
@GetMapping("topic/black2")
public void sendBlack2(){
String msg = "send routing key = 2.black msg " + new Date().toString();
rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "2.black.2", msg);
}
}
3.查看隊列
服務(wù)啟動后,將RabbitMQ上將會看到這兩個隊列

隊列.png

topic交換機(jī).png
三、消費(fèi)者
1.RabbitMqConfig.java
/**
* 定義隊列
*/
@Configuration
public class RabbitMqConfig {
public static final String QUEUE_ORANGE = "queue_orange";
public static final String QUEUE_BLACK = "queue_black";
}
2.RabbitMqReceiver.java
@Component
public class RabbitMqReceiver {
private final static Logger logger = LoggerFactory.getLogger(RabbitMqReceiver.class);
/**
* 監(jiān)聽隊列queue_orange的消息
*/
@RabbitListener(queues = RabbitMqConfig.QUEUE_ORANGE)
public void receiverQueueOrange(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueOrange 接收到消息為:"+msg);
}
/**
* 監(jiān)聽隊列queue_black的消息
*/
@RabbitListener(queues = RabbitMqConfig.QUEUE_BLACK)
public void receiverQueueBlack(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueBlack 接收到消息為:"+msg);
}
}