1. 首先安裝rabbitmq-management
方法有很多, 我這里用的是rabbitmq的docker鏡像
docker安裝可以參照這篇文章deepin系統(tǒng)下的docker安裝
接下來使用docker安裝rabbitmq, 我們可以在Docker Hub中搜索rabbitmq, 找到最新的版本安裝
sudo docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.8-management
安裝之后使用
docker ps -a
檢查下, rabbitmq的鏡像是否啟動(dòng), 正常啟動(dòng)狀態(tài)如下:

如果沒有啟動(dòng)使用下面命令啟動(dòng)
docker start 1b84a82c0d0a
start后面那個(gè)id, 就是docker ps -a 第一列鏡像的container id
2. 訪問rabbitmq-management
我的rabbitmq是運(yùn)行在docker上的, 啟動(dòng)時(shí)默認(rèn)對(duì)外映射的端口是15672,
如果是直接安裝的rabbitmq, 默認(rèn)端口為5672,
我這里通過http://192.168.12.12:15672, 就可以訪問到rabbitmq的管理端,
默認(rèn)賬戶/密碼是:guest/guest
3. springboot集成
通過idea的創(chuàng)建一個(gè)springboot工程 , pom文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.jhinno</groupId>
<artifactId>stream-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>stream-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.SR2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.創(chuàng)建消息接收類
@Component
@Slf4j
@EnableBinding(Processor.class)
public class MyMQReciver {
@StreamListener(Processor.INPUT)
public void process(String message){
log.info("hahahah : "+message);
System.out.println("hahahah : "+message);
}
}
這里有兩處關(guān)鍵點(diǎn):
1. @StreamListener(Processor.INPUT)
這里其實(shí)是要聲明一個(gè)訂閱的鍵值, Processor類是一個(gè)org.springframework.cloud.stream.messaging jar包中內(nèi)置的接口, 查看其源碼可以看到它繼承了Source和Sink兩個(gè)類
package org.springframework.cloud.stream.messaging;
public interface Processor extends Source, Sink {
}
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
結(jié)構(gòu)很簡(jiǎn)單, 我們也可以仿照去實(shí)現(xiàn)自己的Processor
2. @EnableBinding(Processor.class)
這里綁定的就是Processor(或者我們自己實(shí)現(xiàn)的Processor)
4.創(chuàng)建發(fā)送消息測(cè)試類
@RestController
public class SendController {
@Autowired
private Processor pipe;
@GetMapping("/send")
public void send(@RequestParam String message){
pipe.output().send(MessageBuilder.withPayload(message).build());
}
}
5.在application.yml中增加配置
spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages #要和output的destination一致, 這樣才能將隊(duì)列和寫入消息的exchange關(guān)聯(lián)起來
binder: local_rabbit
group: logMessageConsumers
output:
destination: queue.log.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.12.12
port: 5672
username: guest
password: guest
virtual-host: /
6.啟動(dòng)springboot項(xiàng)目
首先瀏覽器進(jìn)入rabbitmq管理端查看, 發(fā)現(xiàn)我們?cè)赼pplication.yml中創(chuàng)建的output destination被自動(dòng)創(chuàng)建出來了

input destination也被自動(dòng)創(chuàng)建出來了,并且自動(dòng)添加了綁定


確認(rèn)rabbitmq這邊沒有問題后, 我們?cè)L問放松消息接口測(cè)試下, http://localhost:8080/send?message=hello
發(fā)現(xiàn)MyMQReciver已經(jīng)成功接受到了消息

看到控制臺(tái)的輸出意味著消息發(fā)送和接受成功了.
記錄這篇文章的原因是在網(wǎng)上搜了很多教程, 結(jié)果很多根本不能正確運(yùn)行, 有些關(guān)鍵點(diǎn)甚至是錯(cuò)誤的, 浪費(fèi)了很多時(shí)間, 最后是在外網(wǎng)看了兩篇Spring Cloud Stream 的文章才得以正確運(yùn)行,下面是原始文章地址:
https://www.javainuse.com/spring/cloud-stream-rabbitmq-1
https://www.javainuse.com/spring/cloud-stream-rabbitmq-2
本文的代碼都已經(jīng)提交到github,https://github.com/LucienYang/spring-cloud-stream-rabbitmq-example.git, 歡迎批評(píng)指正