Springboot集成RabbitMQ

一.簡單隊列

1.配置pom文件,主要是添加spring-boot-starter-amqp的支持

<!-- Maven dependency on spring-boot-starter-amqp (group org.springframework.boot); no version is given in this snippet. -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置application.yml文件

# Spring Boot settings for the RabbitMQ examples: application name and broker connection.
spring:
  application:
    # Application name; spelling corrected from "spirng-boot-rabbitmq".
    name: spring-boot-rabbitmq
  rabbitmq:
    # NOTE(review): broker address and credentials are hard-coded here —
    # consider externalizing them before reusing this configuration.
    host: 49.235.110.134
    port: 5672
    username: root
    password: root

3.配置隊列

package cn.lovingliu.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author:LovingLiu
 * @Description: Queue configuration for the simple-queue example
 * @Date:Created in 2020-01-16
 */
@Configuration
public class RabbitConfig {

    /**
     * Registers a queue bean constructed with the name {@code q_rabbit}.
     *
     * @return a new {@link Queue} named {@code q_rabbit}
     */
    @Bean
    public Queue queue() {
        final Queue simpleQueue = new Queue("q_rabbit");
        return simpleQueue;
    }
}

4.生產者

package cn.lovingliu.rabbitmq.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author:LovingLiu
 * @Description: Producer for the simple-queue example
 * @Date:Created in 2020-01-16
 */
@Component
public class Producer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a {@code "hello <timestamp>"} message, echoes it to standard output
     * and sends it through the template with routing key {@code q_rabbit}.
     */
    public void send() {
        // "HH" selects the 24-hour clock.
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date now = new Date();
        String payload = "hello " + timestampFormat.format(now);

        System.out.println("Sender : " + payload);

        // With a simple queue the routing key is just the queue name.
        rabbitTemplate.convertAndSend("q_rabbit", payload);
    }
}

5.接收者

package cn.lovingliu.rabbitmq.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Consumer listening on the q_rabbit queue
 * @Date:Created in 2020-01-16
 */
@Component
@RabbitListener(queues = "q_rabbit")
public class Receiver {

    /**
     * Prints a received String message to standard output.
     *
     * @param hello the message text
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver  : " + hello;
        System.out.println(line);
    }
}

6.測試

package cn.lovingliu.rabbitmq.test;

import cn.lovingliu.rabbitmq.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @Author:LovingLiu
 * @Description: Test for the simple-queue example; sends a single message
 * @Date:Created in 2020-01-16
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {
    @Autowired
    private Producer producer;

    /**
     * Calls {@link Producer#send()} once.
     *
     * @throws Exception declared for the test framework; nothing is thrown explicitly here
     */
    @org.junit.Test
    public void oneToOne() throws Exception {
        this.producer.send();
    }
}
簡單隊列

二.工作隊列(work)

1.創建兩個消費者
cn.lovingliu.rabbitmq_work.consumer.ReceiverWork1

package cn.lovingliu.rabbitmq_work.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Consumer 1 of the work-queue example; listens on q_rabbit_work,
 *               the same queue ReceiverWork2 listens on.
 *               NOTE(review): no Queue bean for q_rabbit_work is shown next to this class —
 *               confirm the queue is declared elsewhere.
 * @Date:Created in 2020-01-16
 */
@Component
@RabbitListener(queues = "q_rabbit_work")
public class ReceiverWork1 {

    /**
     * Prints a received String message to standard output, tagged as receiver 1.
     *
     * @param hello the message text
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver1  : " + hello;
        System.out.println(line);
    }
}

cn.lovingliu.rabbitmq_work.consumer.ReceiverWork2

package cn.lovingliu.rabbitmq_work.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Consumer 2 of the work-queue example; listens on q_rabbit_work,
 *               the same queue ReceiverWork1 listens on.
 *               NOTE(review): no Queue bean for q_rabbit_work is shown next to this class —
 *               confirm the queue is declared elsewhere.
 * @Date:Created in 2020-01-16
 */
@Component
@RabbitListener(queues = "q_rabbit_work")
public class ReceiverWork2 {

    /**
     * Prints a received String message to standard output, tagged as receiver 2.
     *
     * @param hello the message text
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver2  : " + hello;
        System.out.println(line);
    }
}

2.生產者

package cn.lovingliu.rabbitmq_work.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author:LovingLiu
 * @Description: Producer for the work-queue example
 * @Date:Created in 2020-01-16
 */
@Component
public class Producer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a {@code "hello <i> <timestamp>"} message, echoes it to standard output
     * and sends it through the template with routing key {@code q_rabbit_work}.
     *
     * @param i number embedded in the message text
     */
    public void send(int i) {
        // "HH" selects the 24-hour clock.
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date now = new Date();
        String timestamp = timestampFormat.format(now);
        String payload = "hello " + i + " " + timestamp;

        System.out.println("Sender : " + payload);

        // With a simple queue the routing key is just the queue name.
        rabbitTemplate.convertAndSend("q_rabbit_work", payload);
    }
}

3.測試

package cn.lovingliu.rabbitmq_work.test;


import cn.lovingliu.rabbitmq_work.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @Author:LovingLiu
 * @Description: Test for the work-queue example
 * @Date:Created in 2020-01-16
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {
    @Autowired
    private Producer producer;

    /**
     * Sends 100 numbered messages (0 through 99), pausing 300 ms after each one.
     *
     * @throws Exception if the sleeping thread is interrupted
     */
    @org.junit.Test
    public void oneToMany() throws Exception {
        int sequence = 0;
        while (sequence < 100) {
            this.producer.send(sequence);
            Thread.sleep(300);
            sequence++;
        }
    }
}

三.Topic Exchange(主題模式)

topic 是 RabbitMQ 中最靈活的一種方式,可以根據 routing_key 自由地綁定不同的隊列。
1.配置隊列,綁定交換機

package cn.lovingliu.rabbitmq_topic.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author:LovingLiu
 * @Description: Declares two queues and a topic exchange, and binds each queue to the exchange
 * @Date:Created in 2020-01-16
 */
@Configuration
public class TopicRabbitConfig {
    public static final String QUEUE_NAME_1 = "q_rabbit_topic_1";
    public static final String QUEUE_NAME_2 = "q_rabbit_topic_2";
    public static final String EXCHANGE_NAME = "my_exchange";

    /**
     * @return a new queue named {@link #QUEUE_NAME_1}
     */
    @Bean
    public Queue queue1() {
        final Queue first = new Queue(QUEUE_NAME_1);
        return first;
    }

    /**
     * @return a new queue named {@link #QUEUE_NAME_2}
     */
    @Bean
    public Queue queue2() {
        final Queue second = new Queue(QUEUE_NAME_2);
        return second;
    }

    /**
     * Declares a topic-type exchange.
     *
     * @return a new {@link TopicExchange} named {@link #EXCHANGE_NAME}
     */
    @Bean
    TopicExchange exchange() {
        final TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME);
        return topicExchange;
    }

    /**
     * Binds a queue to the exchange with the routing pattern {@code topic.1.*}.
     *
     * @param queue1   the queue to bind
     * @param exchange the topic exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeMessage(Queue queue1, TopicExchange exchange) {
        final String routingPattern = "topic.1.*";
        return BindingBuilder.bind(queue1).to(exchange).with(routingPattern);
    }

    /**
     * Binds a queue to the exchange with the routing pattern {@code topic.2.*}.
     *
     * @param queue2   the queue to bind
     * @param exchange the topic exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeMessages(Queue queue2, TopicExchange exchange) {
        final String routingPattern = "topic.2.*";
        return BindingBuilder.bind(queue2).to(exchange).with(routingPattern);
    }
}

2.創建2個消費者

package cn.lovingliu.rabbitmq_topic.consumer;

import cn.lovingliu.rabbitmq_topic.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Consumer 1 of the topic example; listens on TopicRabbitConfig.QUEUE_NAME_1
 * @Date:Created in 2020-01-16
 */
@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME_1)
public class ReceiverTopic1 {

    /**
     * Prints a received String message to standard output, tagged as receiver 1.
     *
     * @param hello the message text
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver1  : " + hello;
        System.out.println(line);
    }
}
package cn.lovingliu.rabbitmq_topic.consumer;

import cn.lovingliu.rabbitmq_topic.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Consumer 2 of the topic example; listens on TopicRabbitConfig.QUEUE_NAME_2
 * @Date:Created in 2020-01-16
 */
@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME_2)
public class ReceiverTopic2 {

    /**
     * Prints a received String message to standard output, tagged as receiver 2.
     *
     * @param hello the message text
     */
    @RabbitHandler
    public void process(String hello) {
        String line = "Receiver2 : " + hello;
        System.out.println(line);
    }
}

3.消息發送者(生產者)

package cn.lovingliu.rabbitmq_topic.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Producer for the topic example
 * @Date:Created in 2020-01-16
 */
@Component
public class Producer {
    // Same literal as TopicRabbitConfig.EXCHANGE_NAME; the two must stay in sync.
    private static final String EXCHANGE_NAME = "my_exchange";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends "hi, queue 1 message" to the exchange with routing key {@code topic.1.message}.
     */
    public void send1() {
        publish("topic.1.message", "hi, queue 1 message");
    }

    /**
     * Sends "hi, queue 2 message" to the exchange with routing key {@code topic.2.messages}.
     */
    public void send2() {
        publish("topic.2.messages", "hi, queue 2 message");
    }

    /** Echoes the message to standard output, then sends it to the exchange under the given routing key. */
    private void publish(String routingKey, String body) {
        System.out.println("Sender : " + body);
        this.rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, body);
    }
}

4.測試

package cn.lovingliu.rabbitmq_topic.test;


import cn.lovingliu.rabbitmq_topic.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @Author:LovingLiu
 * @Description: Tests for the topic-exchange example
 * @Date:Created in 2020-01-16
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {
    @Autowired
    private Producer producer;

    /**
     * Calls {@link Producer#send1()} once.
     *
     * @throws Exception declared for the test framework; nothing is thrown explicitly here
     */
    @org.junit.Test
    public void send1() throws Exception {
        this.producer.send1();
    }

    /**
     * Calls {@link Producer#send2()} once.
     *
     * @throws Exception declared for the test framework; nothing is thrown explicitly here
     */
    @org.junit.Test
    public void send2() throws Exception {
        this.producer.send2();
    }
}
運行結果

四.Fanout Exchange(訂閱模式)

Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送消息,綁定了這個交換機的所有隊列都收到這個消息。

1.配置隊列,綁定交換機

package cn.lovingliu.rabbitmq_fanout.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author:LovingLiu
 * @Description: Declares three queues and a fanout exchange, and binds each queue to the exchange
 * @Date:Created in 2020-01-16
 */
@Configuration
public class FanoutRabbitConfig {
    public static final String QUEUE_NAME_A = "q_rabbit_fanout_a";
    public static final String QUEUE_NAME_B = "q_rabbit_fanout_b";
    public static final String QUEUE_NAME_C = "q_rabbit_fanout_c";
    public static final String EXCHANGE_NAME = "my_fanout_exchange";

    /**
     * @return a new queue named {@link #QUEUE_NAME_A}
     */
    @Bean
    public Queue aQueue() {
        final Queue queueA = new Queue(QUEUE_NAME_A);
        return queueA;
    }

    /**
     * @return a new queue named {@link #QUEUE_NAME_B}
     */
    @Bean
    public Queue bQueue() {
        final Queue queueB = new Queue(QUEUE_NAME_B);
        return queueB;
    }

    /**
     * @return a new queue named {@link #QUEUE_NAME_C}
     */
    @Bean
    public Queue cQueue() {
        final Queue queueC = new Queue(QUEUE_NAME_C);
        return queueC;
    }

    /**
     * @return a new {@link FanoutExchange} named {@link #EXCHANGE_NAME}
     */
    @Bean
    FanoutExchange fanoutExchange() {
        final FanoutExchange broadcast = new FanoutExchange(EXCHANGE_NAME);
        return broadcast;
    }

    /**
     * Binds a queue to the fanout exchange (no routing key is specified).
     *
     * @param aQueue         the queue to bind
     * @param fanoutExchange the exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeA(Queue aQueue, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(aQueue).to(fanoutExchange);
        return binding;
    }

    /**
     * Binds a queue to the fanout exchange (no routing key is specified).
     *
     * @param bQueue         the queue to bind
     * @param fanoutExchange the exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeB(Queue bQueue, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(bQueue).to(fanoutExchange);
        return binding;
    }

    /**
     * Binds a queue to the fanout exchange (no routing key is specified).
     *
     * @param cQueue         the queue to bind
     * @param fanoutExchange the exchange to bind to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeC(Queue cQueue, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(cQueue).to(fanoutExchange);
        return binding;
    }
}

2.創建3個消費者

cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutA

package cn.lovingliu.rabbitmq_fanout.consumer;

import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Consumer A of the fanout example; listens on FanoutRabbitConfig.QUEUE_NAME_A
 * @Date:Created in 2020-01-17
 */
@Component
@RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_A)
public class ReceiverFanoutA {

    /**
     * Prints a received String message to standard output, followed by a blank line.
     *
     * @param hello the message text
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" (not the literal "/n") so a blank separator line follows the message.
        System.out.println("AReceiver  : " + hello + "\n");
    }
}

cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutB

package cn.lovingliu.rabbitmq_fanout.consumer;

import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Consumer B of the fanout example; listens on FanoutRabbitConfig.QUEUE_NAME_B
 * @Date:Created in 2020-01-17
 */
@Component
@RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_B)
public class ReceiverFanoutB {

    /**
     * Prints a received String message to standard output, followed by a blank line.
     *
     * @param hello the message text
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" (not the literal "/n") so a blank separator line follows the message.
        System.out.println("BReceiver  : " + hello + "\n");
    }
}

cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutC

package cn.lovingliu.rabbitmq_fanout.consumer;

import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Consumer C of the fanout example; listens on FanoutRabbitConfig.QUEUE_NAME_C
 * @Date:Created in 2020-01-17
 */
@Component
@RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_C)
public class ReceiverFanoutC {

    /**
     * Prints a received String message to standard output, followed by a blank line.
     *
     * @param hello the message text
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" (not the literal "/n") so a blank separator line follows the message.
        System.out.println("CReceiver  : " + hello + "\n");
    }
}

3.生產者

cn.lovingliu.rabbitmq_fanout.producer.Producer

package cn.lovingliu.rabbitmq_fanout.producer;

import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Author:LovingLiu
 * @Description: Producer for the fanout example
 * @Date:Created in 2020-01-17
 */
@Component
public class Producer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Echoes "hi, fanout msg " to standard output and sends it to
     * {@code FanoutRabbitConfig.EXCHANGE_NAME} with an empty routing key.
     */
    public void send() {
        final String payload = "hi, fanout msg ";
        // Routing key is left empty; fanout exchanges deliver to every bound queue regardless of key.
        final String noRoutingKey = "";

        System.out.println("Sender : " + payload);
        rabbitTemplate.convertAndSend(FanoutRabbitConfig.EXCHANGE_NAME, noRoutingKey, payload);
    }
}

4.測試

package cn.lovingliu.rabbitmq_fanout.test;

import cn.lovingliu.rabbitmq_fanout.producer.Producer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @Author:LovingLiu
 * @Description: Test for the fanout example
 * @Date:Created in 2020-01-17
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {
    @Autowired
    private Producer producer;

    /**
     * Calls {@link Producer#send()} once.
     *
     * @throws Exception declared for the test framework; nothing is thrown explicitly here
     */
    @org.junit.Test
    public void send1() throws Exception {
        this.producer.send();
    }
}

運行截圖

生產者

消費者

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容