Spring Cloud Stream rabbitmq集成說明

1. 首先安裝rabbitmq-management

方法有很多, 我這里用的是rabbitmq的docker鏡像

docker安裝可以參照這篇文章deepin系統(tǒng)下的docker安裝

接下來使用docker安裝rabbitmq, 我們可以在Docker Hub中搜索rabbitmq, 找到最新的版本安裝

sudo docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.8-management

安裝之后使用

docker ps -a

檢查下, rabbitmq的鏡像是否啟動(dòng), 正常啟動(dòng)狀態(tài)如下:


image.png

如果沒有啟動(dòng)使用下面命令啟動(dòng)

docker start 1b84a82c0d0a

start后面那個(gè)id, 就是docker ps -a 第一列鏡像的container id

2. 訪問rabbitmq-management

我的rabbitmq是運(yùn)行在docker上的, 啟動(dòng)時(shí)默認(rèn)對(duì)外映射的端口是15672,
如果是直接安裝的rabbitmq, 默認(rèn)端口為5672,
我這里通過http://192.168.12.12:15672, 就可以訪問到rabbitmq的管理端,
默認(rèn)賬戶/密碼是:guest/guest

3. springboot集成

通過idea的創(chuàng)建一個(gè)springboot工程 , pom文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.jhinno</groupId>
    <artifactId>stream-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>stream-rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.SR2</spring-cloud.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.創(chuàng)建消息接收類

@Component
@Slf4j
@EnableBinding(Processor.class)
public class MyMQReciver {

    @StreamListener(Processor.INPUT)
    public void process(String message){
        log.info("hahahah : "+message);
        System.out.println("hahahah : "+message);
    }
}

這里有兩處關(guān)鍵點(diǎn):
1. @StreamListener(Processor.INPUT)
這里其實(shí)是要聲明一個(gè)訂閱的鍵值, Processor類是一個(gè)org.springframework.cloud.stream.messaging jar包中內(nèi)置的接口, 查看其源碼可以看到它繼承了Source和Sink兩個(gè)類

package org.springframework.cloud.stream.messaging;

public interface Processor extends Source, Sink {
}
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

結(jié)構(gòu)很簡(jiǎn)單, 我們也可以仿照去實(shí)現(xiàn)自己的Processor
2. @EnableBinding(Processor.class)
這里綁定的就是Processor(或者我們自己實(shí)現(xiàn)的Processor)

4.創(chuàng)建發(fā)送消息測(cè)試類

@RestController
public class SendController {

    @Autowired
    private Processor pipe;

    @GetMapping("/send")
    public void send(@RequestParam String message){
        pipe.output().send(MessageBuilder.withPayload(message).build());
    }
}

5.在application.yml中增加配置

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages #要和output的destination一致, 這樣才能將隊(duì)列和寫入消息的exchange關(guān)聯(lián)起來
          binder: local_rabbit
          group: logMessageConsumers
        output:
          destination: queue.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.12.12
                port: 5672
                username: guest
                password: guest
                virtual-host: /

6.啟動(dòng)springboot項(xiàng)目

首先瀏覽器進(jìn)入rabbitmq管理端查看, 發(fā)現(xiàn)我們?cè)赼pplication.yml中創(chuàng)建的output destination被自動(dòng)創(chuàng)建出來了


image.png

input destination也被自動(dòng)創(chuàng)建出來了,并且自動(dòng)添加了綁定


image.png

image.png

確認(rèn)rabbitmq這邊沒有問題后, 我們?cè)L問放松消息接口測(cè)試下, http://localhost:8080/send?message=hello
發(fā)現(xiàn)MyMQReciver已經(jīng)成功接受到了消息

image.png

看到控制臺(tái)的輸出意味著消息發(fā)送和接受成功了.

記錄這篇文章的原因是在網(wǎng)上搜了很多教程, 結(jié)果很多根本不能正確運(yùn)行, 有些關(guān)鍵點(diǎn)甚至是錯(cuò)誤的, 浪費(fèi)了很多時(shí)間, 最后是在外網(wǎng)看了兩篇Spring Cloud Stream 的文章才得以正確運(yùn)行,下面是原始文章地址:

https://www.javainuse.com/spring/cloud-stream-rabbitmq-1
https://www.javainuse.com/spring/cloud-stream-rabbitmq-2

本文的代碼都已經(jīng)提交到github,https://github.com/LucienYang/spring-cloud-stream-rabbitmq-example.git, 歡迎批評(píng)指正

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容