概念

image.png
實(shí)現(xiàn)
根據(jù)圖我們看出路由的的類型為direct
實(shí)現(xiàn)步驟為:
生產(chǎn)者->聲明路由(direct)->聲明2個(gè)隊(duì)列- >路由分別綁定這2個(gè)隊(duì)列并分別綁定routingkey->發(fā)送帶routingkey的消息
消費(fèi)者->監(jiān)聽這2個(gè)隊(duì)列的消息
生產(chǎn)者實(shí)現(xiàn)
聲明路由注意類型
//聲明路由注意路由的類型為direct
@Bean
Exchange exchange(){
return ExchangeBuilder.directExchange("ethan.exchange_routing").build();
}
聲明隊(duì)列
//聲明隊(duì)列
@Bean("queue")
Queue queue(){
return QueueBuilder.durable("queue_routing").build();
}
//聲明隊(duì)列1
@Bean("queue01")
Queue queue01(){
return QueueBuilder.durable("queue_routing_01").build();
}
路由綁定隊(duì)列這里我們一個(gè)隊(duì)列設(shè)置routingkey為error另外一個(gè)設(shè)置一個(gè)error一個(gè)info
//綁定隊(duì)列1到路由并設(shè)置routingkey為routing_inforouting_error
@Bean
Binding binding01(@Qualifier("queue01") Queue queue,
Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("routing_error").noargs();
}
//綁定隊(duì)列1到路由并設(shè)置routingkey為routing_info
@Bean
Binding binding02(@Qualifier("queue01") Queue queue,
Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("routing_info").noargs();
}
發(fā)送一個(gè)error消息一個(gè)info消息
@Autowired
MyBean myBean;
@Test
void test(){
myBean.getAmqpTemplate().convertAndSend("ethan.exchange_routing","routing_error","error-----");
myBean.getAmqpTemplate().convertAndSend("ethan.exchange_routing","routing_info","info------");
}
期望達(dá)到的結(jié)果為發(fā)送到error的routingkey2個(gè)隊(duì)列都能收到消息,發(fā)送帶info的只有一個(gè)隊(duì)列可以收到消息
消費(fèi)者
@Component
@Slf4j
public class ReceiverBean {
@RabbitListener(queues = "queue_routing")
public void processMessage(String msg) {
log.info("queue_routing---"+msg);
}
@RabbitListener(queues = "queue_routing_01")
public void processMessage01(String msg) {
log.info("queue_routing_01---"+msg);
}
}
運(yùn)行效果

image.png
可以看出運(yùn)行效果和期待的一樣
代碼
生產(chǎn)端
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-routing-producer
消費(fèi)端
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-routing-consumer