RocketMQ集群以及SpringBoot集成

1. 中間件下載

官網(wǎng)上直接下載,比較簡(jiǎn)單就不贅述!

2. 簡(jiǎn)介

2.1. RocketMQ是什么

RocketMQ

RocketMQ 是一個(gè)隊(duì)列模型的消息中間件,具有高性能、高可靠、高實(shí)時(shí)、分布式特點(diǎn),具體特性如下:

  • Producer、Consumer、隊(duì)列都可以分布式。
  • Producer 向隊(duì)列輪流發(fā)送消息,隊(duì)列集合稱為Topic,Consumer 如果做廣播消費(fèi),則一個(gè)consumer實(shí)例消費(fèi)這個(gè)Topic對(duì)應(yīng)的所有隊(duì)列,如果做集群消費(fèi),則多個(gè)Consumer 實(shí)例平均消費(fèi)這個(gè)topic 對(duì)應(yīng)的隊(duì)列集合。
  • 能夠保證嚴(yán)格的消息順序
  • 提供豐富的消息拉取模式
  • 高效的訂閱者水平擴(kuò)展能力
  • 實(shí)時(shí)的消息訂閱機(jī)制
  • 億級(jí)消息堆積能力
  • 較少的依賴

2.2. 概念

2.2.1. 消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負(fù)責(zé)生產(chǎn)消息,Consumer 負(fù)責(zé)消費(fèi)消息,Broker 負(fù)責(zé)存儲(chǔ)消息。Broker 在實(shí)際部署過(guò)程中對(duì)應(yīng)一臺(tái)服務(wù)器,每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息,每個(gè)Topic的消息也可以分片存儲(chǔ)于不同的 Broker。Message Queue 用于存儲(chǔ)消息的物理地址,每個(gè)Topic中的消息地址存儲(chǔ)于多個(gè) Message Queue 中。ConsumerGroup 由多個(gè)Consumer 實(shí)例構(gòu)成。

2.2.2. 消息生產(chǎn)者(Producer)

負(fù)責(zé)生產(chǎn)消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)生產(chǎn)消息。一個(gè)消息生產(chǎn)者會(huì)把業(yè)務(wù)應(yīng)用系統(tǒng)里產(chǎn)生的消息發(fā)送到broker服務(wù)器。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認(rèn)信息,單向發(fā)送不需要。

2.2.3. 消息消費(fèi)者(Consumer)

負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。一個(gè)消息消費(fèi)者會(huì)從Broker服務(wù)器拉取消息、并將其提供給應(yīng)用程序。從用戶應(yīng)用的角度而言提供了兩種消費(fèi)形式:拉取式消費(fèi)、推動(dòng)式消費(fèi)。

2.2.4. 主題(Topic)

表示一類消息的集合,每個(gè)主題包含若干條消息,每條消息只能屬于一個(gè)主題,是RocketMQ進(jìn)行消息訂閱的基本單位。

2.2.5. 名字服務(wù)(Name Server)

名稱服務(wù)充當(dāng)路由消息的提供者。生產(chǎn)者或消費(fèi)者能夠通過(guò)名字服務(wù)查找各主題相應(yīng)的Broker IP列表。多個(gè)Namesrv實(shí)例組成集群,但相互獨(dú)立,沒(méi)有信息交換。

2.2.6. 拉取式消費(fèi)(Pull Consumer)

Consumer消費(fèi)的一種類型,應(yīng)用通常主動(dòng)調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息、主動(dòng)權(quán)由應(yīng)用控制。一旦獲取了批量消息,應(yīng)用就會(huì)啟動(dòng)消費(fèi)過(guò)程。

2.2.7. 推動(dòng)式消費(fèi)(Push Consumer)

Consumer消費(fèi)的一種類型,該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給消費(fèi)端,該消費(fèi)模式一般實(shí)時(shí)性較高。

2.2.8. 生產(chǎn)者組(Producer Group)

同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務(wù)器會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例以提交或回溯消費(fèi)。

2.2.9. 消費(fèi)者組(Consumer Group)

同一類Consumer的集合,這類Consumer通常消費(fèi)同一類消息且消費(fèi)邏輯一致。消費(fèi)者組使得在消息消費(fèi)方面,實(shí)現(xiàn)負(fù)載均衡和容錯(cuò)的目標(biāo)變得非常容易。要注意的是,消費(fèi)者組的消費(fèi)者實(shí)例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。

2.2.10. 普通順序消息(Normal Ordered Message)

普通順序消費(fèi)模式下,消費(fèi)者通過(guò)同一個(gè)消費(fèi)隊(duì)列收到的消息是有順序的,不同消息隊(duì)列收到的消息則可能是無(wú)順序的。

2.2.11. 嚴(yán)格順序消息(Strictly Ordered Message)

嚴(yán)格順序消息模式下,消費(fèi)者收到的所有消息均是有順序的。

2.3. 架構(gòu)設(shè)計(jì)

2.3.1. 技術(shù)架構(gòu)

技術(shù)架構(gòu)圖

2.3.2. 部署架構(gòu)

部署架構(gòu)圖

3. 安裝

由于RocketMQ安裝比較簡(jiǎn)單,在單機(jī)這塊就省略。

3.1. 單機(jī)

略?。。?!

3.2. 集群

3.2.1. 集群的方式

  • 單 Master:這種方式風(fēng)險(xiǎn)較大,一旦Broker重啟或者宕機(jī)時(shí),會(huì)導(dǎo)致整個(gè)服務(wù)不可用,不建議線上環(huán)境使用。

  • 多 Master 模式:一個(gè)集群無(wú) Slave,全是 Master,例如 2 個(gè) Master 或者 3 個(gè) Master

    • 優(yōu)點(diǎn):配置簡(jiǎn)單,單個(gè)Master 宕機(jī)或重啟維護(hù)對(duì)應(yīng)用無(wú)影響,在磁盤配置為 RAID10 時(shí),即使機(jī)器宕機(jī)不可恢復(fù)情況下,由與 RAID10磁盤非??煽?,消息也不會(huì)丟(異步刷盤丟失少量消息,同步刷盤一條不丟)。性能最高。

    • 缺點(diǎn):?jiǎn)闻_(tái)機(jī)器宕機(jī)期間,這臺(tái)機(jī)器上未被消費(fèi)的消息在機(jī)器恢復(fù)之前不可訂閱,消息實(shí)時(shí)性會(huì)受到受到影響。

  • 多 Master 多 Slave 模式,異步復(fù)制:每個(gè) Master 配置一個(gè) Slave,有多對(duì)Master-Slave,HA
    采用異步復(fù)制方式,主備有短暫消息延遲,毫秒級(jí)。

    • 優(yōu)點(diǎn):即使磁盤損壞,消息丟失的非常少,且消息實(shí)時(shí)性不會(huì)受影響,因?yàn)镸aster 宕機(jī)后,消費(fèi)者仍然可以從 Slave。

    • 缺點(diǎn):Master 宕機(jī),磁盤損壞情況,會(huì)丟失少量消息。

  • 多 Master 多 Slave 模式,同步雙寫:每個(gè) Master 配置一個(gè) Slave,有多對(duì)Master-Slave,HA
    采用同步雙寫方式,主備都寫成功,向應(yīng)用返回成功。

    • 優(yōu)點(diǎn):數(shù)據(jù)與服務(wù)都無(wú)單點(diǎn),Master宕機(jī)情況下,消息無(wú)延遲,服務(wù)可用性與數(shù)據(jù)可用性都非常高。

    • 缺點(diǎn):性能比異步復(fù)制模式略低,大約低 10%左右,發(fā)送單個(gè)消息的 RT 會(huì)略高。目前主宕機(jī)后,備機(jī)不能自動(dòng)切換為主機(jī),后續(xù)會(huì)支持自動(dòng)切換功能

后兩種的方式比較復(fù)雜,涉及主從同步的問(wèn)題,非必要的場(chǎng)景,建議采用多Master 這種方式。而我下面的例子也是以 多Master來(lái)部署的。

3.2.2. 部署

  • 服務(wù)器環(huán)境準(zhǔn)備
序號(hào) IP 角色 模式
A 192.168.244.128 nameServer1,brokerServer1 Master1
B 192.168.244.129 nameServer2,brokerServer2 Master2
  • 修改HOST

在 A、B 兩服務(wù)器中將HOST文件修改,vi /etc/hosts

修改內(nèi)容如下:

192.168.244.128 rocketmq-nameserver1
192.168.244.128 rocketmq-master1

192.168.244.129 rocketmq-nameserver2
192.168.244.129 rocketmq-master2
  • 上傳文件

在A、B兩臺(tái)機(jī)器上傳RockerMQ文件,并解壓。我這里上傳和解壓路徑在/usr/local

[root@centos8 rocketmq]$ pwd
/usr/local/rocketmq
[root@centos8 rocketmq]$ ll
total 40
drwxr-xr-x. 2 root root    83 Jun 24 02:49 benchmark
drwxr-xr-x. 3 root root  4096 Jun 24 02:02 bin
drwxr-xr-x. 6 root root   211 Jun  2 02:09 conf
drwxr-xr-x. 2 root root  4096 Jun 24 02:49 lib
-rw-r--r--. 1 root root 17336 Jun  2 02:09 LICENSE
-rw-r--r--. 1 root root  1338 Jun  2 02:09 NOTICE
-rw-r--r--. 1 root root  5069 Jun 24 02:02 README.md
[root@centos8 rocketmq]$ 

  • 創(chuàng)建存儲(chǔ)路徑

在A、B兩臺(tái)機(jī)器執(zhí)行創(chuàng)建路徑。

[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/commitlog
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/consumequeue
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/index
  • 修改配置文件

這里貼一個(gè)標(biāo)準(zhǔn)配置文件,具體如下:


// 所屬集群名字
brokerClusterName=rocketmq-cluster
//  broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a/broker-b ## 需要按機(jī)器A/B 區(qū)分
// 0 表示 Master,>0 表示 Slave
brokerId=0
//  nameServer地址,分號(hào)分割

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
// 在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊(duì)列數(shù)
defaultTopicQueueNums=4
// 是否允許 Broker 自動(dòng)創(chuàng)建Topic,建議線下開(kāi)啟,線上關(guān)閉
autoCreateTopicEnable=true
// 是否允許 Broker 自動(dòng)創(chuàng)建訂閱組,建議線下開(kāi)啟,線上關(guān)閉
autoCreateSubscriptionGroup=true
// Broker 對(duì)外服務(wù)的監(jiān)聽(tīng)端口
listenPort=10911
// 刪除文件時(shí)間點(diǎn),默認(rèn)凌晨 4點(diǎn)
deleteWhen=04
// 文件保留時(shí)間,默認(rèn) 48 小時(shí)
fileReservedTime=120
// commitLog每個(gè)文件的大小默認(rèn)1G
mapedFileSizeCommitLog=1073741824
//  ConsumeQueue每個(gè)文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整
mapedFileSizeConsumeQueue=300000
// destroyMapedFileIntervalForcibly=120000
// redeleteHangedFileInterval=120000
// 檢測(cè)物理文件磁盤空間
diskMaxUsedSpaceRatio=88
// 存儲(chǔ)路徑
storePathRootDir=/usr/local/rocketmq/store
// commitLog 存儲(chǔ)路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
// 消費(fèi)隊(duì)列存儲(chǔ)路徑存儲(chǔ)路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
// 消息索引存儲(chǔ)路徑
storePathIndex=/usr/local/rocketmq/store/index
// checkpoint 文件存儲(chǔ)路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
// abort 文件存儲(chǔ)路徑
abortFile=/usr/local/rocketmq/store/abort
// 限制的消息大小
maxMessageSize=65536
// flushCommitLogLeastPages=4
// flushConsumeQueueLeastPages=2
// flushCommitLogThoroughInterval=10000
// flushConsumeQueueThoroughInterval=60000
// Broker 的角色
//  - ASYNC_MASTER 異步復(fù)制Master
//  - SYNC_MASTER 同步雙寫Master
//  - SLAVE
brokerRole=ASYNC_MASTER
//  刷盤方式
//  - ASYNC_FLUSH 異步刷盤
//  - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
//  checkTransactionMessageEnable=false

//  發(fā)消息線程池?cái)?shù)量
//  sendMessageThreadPoolNums=128
//  拉消息線程池?cái)?shù)量
//  pullMessageThreadPoolNums=128

機(jī)器A

[root@centos8 rocketmq]$ vi/usr/local/rocketmq/conf/2m-noslave/broker-a.properties

其中brokerName=broker-a

在機(jī)器B

[root@centos8 rocketmq]$ vi/usr/local/rocketmq/conf/2m-noslave/broker-a.properties

其中brokerName=broker-b

  • 修改日志配置文件

不用說(shuō)肯定也是A、B兩臺(tái)都要改

[root@centos8 rocketmq]$ mkdir -p /usr/local/rocketmq/logs
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
  • 修改啟動(dòng)參數(shù)(A、B)

/usr/local/rocketmq/bin 路徑下,找到runbroker.shrunserver.sh。

我們將這兩個(gè)文件的JAVA_OPT 參數(shù)修改下,不然默認(rèn)情況下,JVM配置是 8G。如 JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"。

修改后:

...
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
....
  • 啟動(dòng)NameServer(A、B)
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ nohup sh mqnamesrv &
機(jī)器A
機(jī)器B
  • 啟動(dòng)BrokerServer(A)
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties &
  • 啟動(dòng)BrokerServer(B)
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties &
  • rocketmq-console安裝

rocketmq-console下載

這個(gè)包需要利用maven編譯打包。我這里打包一個(gè),放百度云盤上,供下載!

鏈接: https://pan.baidu.com/s/1hfvzJeyBG7TXnvPHtn3C5Q

提取碼: atkq

最后執(zhí)行jar文件

[root@centos8 rocketmq]$ java -jar rocketmq-console-ng-1.0.0.jar

頁(yè)面的端口是 8082,剛開(kāi)始啟動(dòng)有點(diǎn)慢,稍微等會(huì)??!

rocketmq-console界面
  • 數(shù)據(jù)清理
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ sh mqshutdown broker
[root@centos8 rocketmq]$ sh mqshutdown namesrv

這里需要等待完全停止!

[root@centos8 rocketmq]$ rm -rf /usr/local/rocketmq/store
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/commitlog
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/consumequeue
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/index

最終按照以上步驟重啟NameServer和BrokerServer即可!

4. SpringBoot集成

  • POM文件添加依賴
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>{換成相應(yīng)版本}</version>
</dependency>
  • application.yml
######### 4.1. RocketMQ ##########
rocketmq:
  name-server: 192.168.244.128:9876;192.168.244.129:9876;
  producer:
    group: drunkard
    send-message-timeout: 30000
  • 生產(chǎn)者
@Slf4j
@RestController
public class RocketMqDemo {

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @GetMapping("send/{id}")
    public String send(@PathVariable("id") String id){
        UserVo userVo  = new UserVo(id,"侯征");
        log.warn(JSON.toJSONString(userVo));
        rocketMQTemplate.send("rocket-topic-01", MessageBuilder.withPayload(userVo).build());
        return "SUCESS";
    }
}
  • 消費(fèi)者
@Slf4j
@Component
@RocketMQMessageListener(topic = "rocket-topic-01", consumerGroup = "my-rocket-topic-01")
public class UserConsumer implements RocketMQListener<UserVo> {

    @Override
    public void onMessage(UserVo message) {
        log.warn("接受到消息: {}",message.toString());
    }
}

5. 案例下載

覺(jué)得對(duì)你有幫助,請(qǐng)Star

Gitee下載,希望多多給Star。

Github下載,希望多多給Star。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容