springboot集成rabbitmq

Springboot集成Rabbitmq

前期準備

導入依賴

 <!-- Spring Boot starter that brings in AMQP (RabbitMQ) support -->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

yml配置

spring:
  # RabbitMQ settings
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    port: 5672
    listener:
      simple:
        # Minimum number of concurrent consumers
        concurrency: 10
        # Maximum number of concurrent consumers
        max-concurrency: 10
        # Each consumer holds at most one unacknowledged message at a time,
        # so it finishes one message before receiving the next
        prefetch: 1
        # Start the listener container automatically on application startup
        auto-startup: true
        # Put rejected deliveries back on the queue
        default-requeue-rejected: true
    template:
      # Retry settings for publishing through RabbitTemplate
      retry:
        # Enable publish retries
        enabled: true
        # Wait between the first and second attempt (default 1000ms)
        initial-interval: 1000ms
        # Maximum number of attempts (default 3)
        max-attempts: 3
        # Upper bound for the wait between attempts (default 10000ms)
        max-interval: 10000ms
        # Factor applied to the previous wait; 1 keeps the wait constant.
        # With 2 and a 1000ms initial interval the waits would be 1s, 2s, 4s, ...
        # (never exceeding max-interval)
        multiplier: 1

簡單使用

simple模式

  1. 創建隊列
/**
 * Declares the single queue used by the simple (one producer, one consumer) example.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Registers the queue named {@code "queue"}.
     *
     * @return the queue bean; the second constructor argument is the durable flag
     */
    @Bean
    public Queue queue() {
        final String queueName = "queue";
        final boolean durable = true;
        return new Queue(queueName, durable);
    }
}
  2. 創建消費者以及生產者
/**
 * Consumer for the simple example: logs whatever arrives on {@code "queue"}.
 */
@Service
@Slf4j
public class MQReceiver {

    /**
     * Logs a message delivered from the queue {@code "queue"}.
     *
     * @param msg the delivered message
     */
    @RabbitListener(queues = "queue")
    public void receive(Object msg) {
        final String line = "接受消息:" + msg;
        log.info(line);
    }
}
/**
 * Producer for the simple example: publishes with the routing key {@code "queue"}.
 */
@Service
@Slf4j
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the payload and hands it to the {@link RabbitTemplate}.
     *
     * @param msg the payload to publish
     */
    public void send(Object msg) {
        final String line = "發(fā)送消息:" + msg;
        log.info(line);

        // No exchange is named here, so the template's default exchange is used.
        final String routingKey = "queue";
        rabbitTemplate.convertAndSend(routingKey, msg);
    }
}
  3. 測試
@Controller
public class UserController {
    @Autowired
    private MQSender mqSender;
    /**
     * 測試發(fā)送mq消息
     */
    @RequestMapping("/mq")
    @ResponseBody
    public void mq(){
        mqSender.send("hello");
    }

結果:

發(fā)送消息:hello
接受消息:(Body:'hello' MessageProperties [headers={}, contentType=text/plain, ....)

fanout模式

簡介

特點:Fanout—發布與訂閱模式,是一種廣播機制,它是沒有路由key的模式。

  1. 創建隊列以及交換機並將其綁定
/**
 * Fanout example topology: one fanout exchange with two queues bound to it.
 */
@Configuration
public class RabbitMQConfig {
    private static final String EXCHANGE = "fanoutExchange";
    private static final String QUEUE01 = "queue_fanout01";
    private static final String QUEUE02 = "queue_fanout02";

    /** @return the first fanout queue */
    @Bean
    public Queue queue01() {
        return new Queue(QUEUE01);
    }

    /** @return the second fanout queue */
    @Bean
    public Queue queue02() {
        return new Queue(QUEUE02);
    }

    /** @return the fanout exchange both queues are bound to */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE);
    }

    /** @return the binding of the first queue to the fanout exchange (no routing key) */
    @Bean
    public Binding binding01() {
        final Queue source = queue01();
        final FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }

    /** @return the binding of the second queue to the fanout exchange (no routing key) */
    @Bean
    public Binding binding02() {
        final Queue source = queue02();
        final FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }
}
  2. 創建消費者以及生產者
/**
 * Producer for the fanout example: publishes to {@code "fanoutExchange"}.
 */
@Service
@Slf4j
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the payload and publishes it to the fanout exchange with an empty routing key.
     *
     * @param msg the payload to publish
     */
    public void send(Object msg) {
        final String line = "發(fā)送消息:" + msg;
        log.info(line);

        final String exchange = "fanoutExchange";
        final String routingKey = "";
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}
/**
 * Consumers for the fanout example, one listener per queue.
 */
@Service
@Slf4j
public class MQReceiver {

    /**
     * Logs a message delivered from {@code "queue_fanout01"}.
     *
     * @param msg the delivered message
     */
    @RabbitListener(queues = "queue_fanout01")
    public void receive01(Object msg) {
        final String line = "QUEUE01接受消息:" + msg;
        log.info(line);
    }

    /**
     * Logs a message delivered from {@code "queue_fanout02"}.
     *
     * @param msg the delivered message
     */
    @RabbitListener(queues = "queue_fanout02")
    public void receive02(Object msg) {
        final String line = "QUEUE02接受消息:" + msg;
        log.info(line);
    }
}
  3. 測試
/**
 * Web entry point used to trigger the fanout example by hand.
 */
@Controller
public class UserController {
    // Declared here so the snippet is self-contained; fanout() uses it.
    @Autowired
    private MQSender mqSender;

    /**
     * Fanout test endpoint: publishes the text {@code "hello"} through {@link MQSender#send(Object)}.
     */
    @RequestMapping("/mq/fanout")
    @ResponseBody
    public void fanout() {
        mqSender.send("hello");
    }
}

結果:

QUEUE02接受消息:(Body:'hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-Dvd0kyyFky5PAMwSdonsFA, consumerQueue=queue_fanout02])

QUEUE01接受消息:(Body:'hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-fLIDTnvOimVXaxMykL1OXA, consumerQueue=queue_fanout01])

direct模式

簡介

Direct模式是fanout模式上的一種疊加,增加了路由RoutingKey的模式。

  1. 創建隊列以及交換機並將其綁定
/**
 * Direct example topology: one direct exchange and two queues, each bound with its own routing key.
 */
@Configuration
public class RabbitMQConfig {
    private static final String EXCHANGE = "directExchange";
    private static final String QUEUE01 = "queue_direct01";
    private static final String QUEUE02 = "queue_direct02";
    private static final String ROUTINGKEY01 = "queue.red";
    private static final String ROUTINGKEY02 = "queue.green";

    /** @return the queue bound with {@code "queue.red"} */
    @Bean
    public Queue queue01() {
        return new Queue(QUEUE01);
    }

    /** @return the queue bound with {@code "queue.green"} */
    @Bean
    public Queue queue02() {
        return new Queue(QUEUE02);
    }

    /** @return the direct exchange both queues are bound to */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE);
    }

    /** @return the binding of the first queue using routing key {@code "queue.red"} */
    @Bean
    public Binding binding01() {
        final Queue source = queue01();
        final DirectExchange target = directExchange();
        return BindingBuilder.bind(source).to(target).with(ROUTINGKEY01);
    }

    /** @return the binding of the second queue using routing key {@code "queue.green"} */
    @Bean
    public Binding binding02() {
        final Queue source = queue02();
        final DirectExchange target = directExchange();
        return BindingBuilder.bind(source).to(target).with(ROUTINGKEY02);
    }
}
  2. 創建消費者以及生產者
/**
 * Producer for the direct example: publishes to {@code "directExchange"} with a fixed routing key
 * per method.
 */
@Service
@Slf4j
public class MQSender {
    // Declared here so the snippet is self-contained; both send methods use it.
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the payload and publishes it with routing key {@code "queue.red"}.
     *
     * @param msg the payload to publish
     */
    public void send01(Object msg) {
        log.info("發(fā)送消息:" + msg);
        rabbitTemplate.convertAndSend("directExchange", "queue.red", msg);
    }

    /**
     * Logs the payload and publishes it with routing key {@code "queue.green"}.
     *
     * @param msg the payload to publish
     */
    public void send02(Object msg) {
        log.info("發(fā)送消息:" + msg);
        rabbitTemplate.convertAndSend("directExchange", "queue.green", msg);
    }
}
/**
 * Consumers for the direct example, one listener per queue.
 */
@Service
@Slf4j
public class MQReceiver {

    /**
     * Logs a message delivered from {@code "queue_direct01"}.
     *
     * @param msg the delivered message
     */
    @RabbitListener(queues = "queue_direct01")
    public void receive03(Object msg) {
        final String line = "QUEUE01接受消息:" + msg;
        log.info(line);
    }

    /**
     * Logs a message delivered from {@code "queue_direct02"}.
     *
     * @param msg the delivered message
     */
    @RabbitListener(queues = "queue_direct02")
    public void receive04(Object msg) {
        final String line = "QUEUE02接受消息:" + msg;
        log.info(line);
    }
}
  3. 測試
/**
 * Web entry points used to trigger the direct example by hand.
 */
@Controller
public class UserController {
    // Declared here so the snippet is self-contained; both endpoints use it.
    @Autowired
    private MQSender mqSender;

    /**
     * Publishes {@code "hello,red"} through {@link MQSender#send01(Object)}.
     */
    @RequestMapping("/mq/direct01")
    @ResponseBody
    public void direct01() {
        mqSender.send01("hello,red");
    }

    /**
     * Publishes {@code "hello,green"} through {@link MQSender#send02(Object)}.
     */
    @RequestMapping("/mq/direct02")
    @ResponseBody
    public void direct02() {
        mqSender.send02("hello,green");
    }
}

結果:

發(fā)送消息:hello,red

QUEUE01接受消息:(Body:'hello,red' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=directExchange, receivedRoutingKey=queue.red, deliveryTag=1, consumerTag=amq.ctag-1SllPzAeuub27hll8OZgxQ, consumerQueue=queue_direct01])

發(fā)送消息:hello,green

QUEUE02接受消息:(Body:'hello,green' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=directExchange, receivedRoutingKey=queue.green, deliveryTag=1, consumerTag=amq.ctag-O24J9QkBtYLy4rnhisUbYA, consumerQueue=queue_direct02])

topic模式(direct模式的延伸)

簡介

Topic模式是direct模式上的一種疊加,增加了模糊路由RoutingKey的模式。
'*':只能匹配一個詞
'#':匹配0個或者多個詞

  1. 創建隊列以及交換機並將其綁定
/**
 * Topic example topology: one topic exchange and two queues bound with wildcard routing patterns.
 */
@Configuration
public class RabbitMQConfig {
    private static final String QUEUE01 = "queue_topic01";
    // Was misspelled "queue_topci02"; the listener consumes from "queue_topic02".
    private static final String QUEUE02 = "queue_topic02";
    private static final String EXCHANGE = "topicExchange";
    // '#' matches zero or more words: any key containing the word "queue".
    private static final String ROUTINGKEY01 = "#.queue.#";
    // '*' matches exactly one word: exactly one word must precede "queue".
    private static final String ROUTINGKEY02 = "*.queue.#";

    /** @return the queue bound with pattern {@code "#.queue.#"} */
    @Bean
    public Queue queue01() {
        return new Queue(QUEUE01);
    }

    /** @return the queue bound with pattern {@code "*.queue.#"} */
    @Bean
    public Queue queue02() {
        return new Queue(QUEUE02);
    }

    /**
     * Creates the topic exchange.
     *
     * <p>NOTE(review): the method (and therefore bean) name is {@code directExchange} although a
     * {@link TopicExchange} is returned; the name is kept so existing references keep working.
     *
     * @return the topic exchange both queues are bound to
     */
    @Bean
    public TopicExchange directExchange() {
        return new TopicExchange(EXCHANGE);
    }

    /** @return the binding of the first queue using pattern {@code "#.queue.#"} */
    @Bean
    public Binding binding01() {
        return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
    }

    /** @return the binding of the second queue using pattern {@code "*.queue.#"} */
    @Bean
    public Binding binding02() {
        return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
    }
}
  2. 創建消費者以及生產者
/**
 * Producer for the topic example: publishes to {@code "topicExchange"} with a fixed routing key
 * per method.
 */
@Service
@Slf4j
public class MQSender {
    // Declared here so the snippet is self-contained; both send methods use it.
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the payload and publishes it with routing key {@code "queue.red.message"}.
     * No word precedes "queue" in this key, so it matches {@code "#.queue.#"} but not
     * {@code "*.queue.#"}.
     *
     * @param msg the payload to publish
     */
    public void send03(Object msg) {
        log.info("發(fā)送消息(queue01接受):" + msg);
        rabbitTemplate.convertAndSend("topicExchange", "queue.red.message", msg);
    }

    /**
     * Logs the payload and publishes it with routing key {@code "message.queue.abc"}.
     * Exactly one word precedes "queue" in this key, so it matches both {@code "#.queue.#"}
     * and {@code "*.queue.#"}.
     *
     * @param msg the payload to publish
     */
    public void send04(Object msg) {
        log.info("發(fā)送消息(queue01和02接受):" + msg);
        rabbitTemplate.convertAndSend("topicExchange", "message.queue.abc", msg);
    }
}
@Service
@Slf4j
public class MQSender {   
    @RabbitListener(queues = "queue_topic01")
    public void receive05(Object msg){
        log.info("QUEUE01接受消息:"+msg);
    }
    @RabbitListener(queues = "queue_topic02")
    public void receive06(Object msg){
        log.info("QUEUE02接受消息:"+msg);
    }
}
  3. 測試
/**
 * Web entry points used to trigger the topic example by hand.
 */
@Controller
public class UserController {
    // Declared here so the snippet is self-contained; both endpoints use it.
    @Autowired
    private MQSender mqSender;

    /**
     * Publishes {@code "hello,red"} through {@link MQSender#send03(Object)}.
     */
    @RequestMapping("/mq/topic01")
    @ResponseBody
    public void topic01() {
        mqSender.send03("hello,red");
    }

    /**
     * Publishes {@code "hello,green"} through {@link MQSender#send04(Object)}.
     */
    @RequestMapping("/mq/topic02")
    @ResponseBody
    public void topic02() {
        mqSender.send04("hello,green");
    }
}

結果:

發(fā)送消息(queue01接受):hello,red

QUEUE01接受消息:(Body:'hello,red' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topicExchange, receivedRoutingKey=queue.red.message, deliveryTag=1, consumerTag=amq.ctag--VsmPOb3T_u2mMK88qJJtA, consumerQueue=queue_topic01])

發(fā)送消息(queue01和02接受):hello,green

QUEUE01接受消息:(Body:'hello,green' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topicExchange, receivedRoutingKey=message.queue.abc, deliveryTag=1, consumerTag=amq.ctag-nFnq3J7NgyV8cvvApLnDmw, consumerQueue=queue_topic01])

QUEUE02接受消息:(Body:'hello,green' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topicExchange, receivedRoutingKey=message.queue.abc, deliveryTag=1, consumerTag=amq.ctag-NgcNpvxa3zHubsciPnMjWg, consumerQueue=queue_topic02])

©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容