ROCKETMQ集群安裝-多Master多Slave模式,異步復(fù)制(2m-2s-async)

簡介

官方簡介:

  • RocketMQ是一款分布式、隊(duì)列模型的消息中間件,具有以下特點(diǎn):
    1. 能夠保證嚴(yán)格的消息順序
    2. 提供豐富的消息拉取模式
    3. 高效的訂閱者水平擴(kuò)展能力
    4. 實(shí)時的消息訂閱機(jī)制
    5. 億級消息堆積能力

1.還是先理解一些重要概念及說明

  • Disk Flush(磁盤刷新/同步操作):就是將內(nèi)存的數(shù)據(jù)落地,存儲在磁盤中。RocketMQ提供了以下兩種模式:
    • SYNC_FLUSH(同步刷盤):生產(chǎn)者發(fā)送的每一條消息都在保存到磁盤成功后才返回告訴生產(chǎn)者成功。這種方式不會存在消息丟失的問
      題,但是有很大的磁盤IO開銷,性能有一定影響。
    • ASYNC_FLUSH(異步刷盤):生產(chǎn)者發(fā)送的每一條消息并不是立即保存到磁盤,而是暫時緩存起來,然后就返回生產(chǎn)者成功。隨后再異步>的將緩存數(shù)據(jù)保存到磁盤,有兩種情況:1是定期將緩存中更新的數(shù)據(jù)進(jìn)行刷盤,2是當(dāng)緩存中更新的數(shù)據(jù)條數(shù)達(dá)到某一設(shè)定值后進(jìn)行刷盤。這種>方式會存在消息丟失(在還未來得及同步到磁盤的時候宕機(jī)),但是性能很好。默認(rèn)是這種模式。
  • Broker Replication(Broker間數(shù)據(jù)同步/復(fù)制):集群環(huán)境下需要部署多個Broker,Broker分為兩種角色:一種是master,即可以寫也可以>
    讀,其brokerId=0,只能有一個;另外一種是slave,只允許讀,其brokerId為非0。一個master與多個slave通過指定相同的brokerName被歸為一>個broker set(broker集)。通常生產(chǎn)環(huán)境中,我們至少需要2個broker set。Broker Replication只的就是slave獲取或者是復(fù)制master的數(shù)據(jù)。
    • Sync Broker:生產(chǎn)者發(fā)送的每一條消息都至少同步復(fù)制到一個slave后才返回告訴生產(chǎn)者成功,即“同步雙寫”。
  • Async Broker:生產(chǎn)者發(fā)送的每一條消息只要寫入master就返回告訴生產(chǎn)者成功。然后再“異步復(fù)制”到slave。
  • 推薦的幾種Broker集群方式:(官網(wǎng)提供了下面幾種集群方式的配置文件供參考,在$ROCKETMQ_HOME/target/apache-rocketmq-all/conf目>錄下)
  • 2m-2s-sync:兩主兩從同步雙寫(兩個master,兩個slave,數(shù)據(jù)同步雙寫到master和slave)
  • 2m-2s-async:兩主兩從異步復(fù)制(兩個master,兩個slave,master數(shù)據(jù)通過異步復(fù)制到slave)
  • 2m-noslave:兩主(只有兩個master,沒有slave)
    注意:
    1、上述“2”只是說作為一個集群的最低配置數(shù)量,可以根據(jù)實(shí)際情況擴(kuò)展。
    2、所有的刷盤(Dish Flush)操作全部默認(rèn)為:ASYNC_FLUSH(異步刷盤)。
  • Name Server集群:Name Server集群比較簡單,只要部署多個實(shí)例就行了,多個實(shí)例間不需要進(jìn)行數(shù)據(jù)共享,只要保證一個實(shí)例存活就可>以正常運(yùn)轉(zhuǎn)。

2、三種Broker集群方式優(yōu)缺點(diǎn)

上面三種集群方式的優(yōu)缺點(diǎn)(主要區(qū)別在于主從復(fù)制方式):

多Master模式(2m-noslave)

一個集群無Slave,全是Master,例如2個Master或者3個Master
優(yōu)點(diǎn):配置簡單,單個Master宕機(jī)或重啟維護(hù)對應(yīng)用無影響,在磁盤配置為RAID10時,即使機(jī)器宕機(jī)不可恢復(fù)情況下,由于RAID10磁盤非??煽?,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟)。性能最高。
缺點(diǎn):單臺機(jī)器宕機(jī)期間,這臺機(jī)器上未被消費(fèi)的消息在機(jī)器恢復(fù)之前不可訂閱,消息實(shí)時性會受到受到影響。

多Master多Slave模式,異步復(fù)制(2m-2s-async)

每個Master配置一個Slave,有多對Master-Slave,HA采用異步復(fù)制方式,主備有短暫消息延遲,毫秒級。
優(yōu)點(diǎn):即使磁盤損壞,消息丟失的非常少,且消息實(shí)時性不會受影響,因?yàn)镸aster宕機(jī)后,消費(fèi)者仍然可以從Slave消費(fèi),此過程對應(yīng)用透明。不需要人工干預(yù)。性能同多Master模式幾乎一樣。
缺點(diǎn):Master宕機(jī),磁盤損壞情況,會丟失少量消息。

多Master多Slave模式,同步雙寫(2m-noslave)

每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,主備都寫成功,向應(yīng)用返回成功。
優(yōu)點(diǎn):數(shù)據(jù)與服務(wù)都無單點(diǎn),Master宕機(jī)情況下,消息無延遲,服務(wù)可用性與數(shù)據(jù)可用性都非常高
缺點(diǎn):性能比異步復(fù)制模式略低,大約低10%左右,發(fā)送單個消息的RT會略高。目前主宕機(jī)后,備機(jī)不能自動切換為主機(jī),后續(xù)會支持自動切換功能。

3、安裝集群 (broker-a)(ip :99.48.66.80)

  • 為了避免亂七八糟的錯誤建議先關(guān)閉防火墻
  • 目錄 /usr/local/src

1.這是一個master的配置

cd /usr/local/src
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-all-4.2.0-bin-release
#創(chuàng)建文件夾
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store 
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
vi /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-a.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
waitTimeMillsInSendQueue=300
brokerId=0
#nameServer地址,分號分割
namesrvAddr=99.48.66.80:9876;99.48.66.82:9876
#是否允許 Broker 自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉
autoCreateSubscriptionGroup=false
#Broker 對外服務(wù)的監(jiān)聽端口
listenPort=11911
#刪除文件時間點(diǎn),默認(rèn)凌晨 4點(diǎn)
deleteWhen=04
#文件保留時間,默認(rèn) 48 小時
fileReservedTime=48
#commitLog每個文件的大小默認(rèn)1G
#mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整
#mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=80
#存儲路徑
storePathRootDir=/usr/local/src/rocketmq-all-4.2.0-bin-release/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
#消費(fèi)隊(duì)列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/checkpoint
#abort 文件存儲路徑
abortFile=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/abort
#限制的消息大小 默認(rèn)4M
#maxMessageSize=4194304
#Broker 的角色
#- ASYNC_MASTER 異步復(fù)制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true

********************這里是公共可選配置 (maste ,slave)********************************

crunserver.sh 配置( /usr/local/src/rocketmq-all-4.2.0-bin-release/bin) 此處根據(jù)自己的硬件配置來調(diào)整

-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m

runbroker.sh( /usr/local/src/rocketmq-all-4.2.0-bin-release/bin) 此處根據(jù)自己的硬件配置來調(diào)整

-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m

**********************這里是公共配置 (maste ,slave)********************************


啟動命令如下:

#啟動master:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqbroker -c /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

1.這是一個slave的配置

cd /usr/local/src
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-all-4.2.0-bin-release
#創(chuàng)建文件夾
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store 
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
vi /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-a-s.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
waitTimeMillsInSendQueue=300
brokerId=1
#nameServer地址,分號分割
namesrvAddr=99.48.66.80:9876;99.48.66.82:9876
#是否允許 Broker 自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉
autoCreateSubscriptionGroup=false
#Broker 對外服務(wù)的監(jiān)聽端口
listenPort=11911
#刪除文件時間點(diǎn),默認(rèn)凌晨 4點(diǎn)
deleteWhen=04
#文件保留時間,默認(rèn) 48 小時
fileReservedTime=48
#commitLog每個文件的大小默認(rèn)1G
#mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整
#mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=80
#存儲路徑
storePathRootDir=/usr/local/src/rocketmq-all-4.2.0-bin-release/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
#消費(fèi)隊(duì)列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/checkpoint
#abort 文件存儲路徑
abortFile=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/abort
#限制的消息大小 默認(rèn)4M
#maxMessageSize=4194304
#Broker 的角色
#- ASYNC_MASTER 異步復(fù)制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SLAVE
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
#啟動name server:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqnamesrv &

#啟動slave:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqbroker -c /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &

4、安裝集群 (broker-b)(ip :99.48.66.82)

  • 為了避免亂七八糟的錯誤建議先關(guān)閉防火墻
  • 目錄 /usr/local/src

1.這是一個master的配置

cd /usr/local/src
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-all-4.2.0-bin-release
#創(chuàng)建文件夾
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store 
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
vi /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-b.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
waitTimeMillsInSendQueue=300
brokerId=0
#nameServer地址,分號分割
namesrvAddr=99.48.66.80:9876;99.48.66.82:9876
#是否允許 Broker 自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉
autoCreateSubscriptionGroup=false
#Broker 對外服務(wù)的監(jiān)聽端口
listenPort=11911
#刪除文件時間點(diǎn),默認(rèn)凌晨 4點(diǎn)
deleteWhen=04
#文件保留時間,默認(rèn) 48 小時
fileReservedTime=48
#commitLog每個文件的大小默認(rèn)1G
#mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整
#mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=80
#存儲路徑
storePathRootDir=/usr/local/src/rocketmq-all-4.2.0-bin-release/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
#消費(fèi)隊(duì)列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/checkpoint
#abort 文件存儲路徑
abortFile=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/abort
#限制的消息大小 默認(rèn)4M
#maxMessageSize=4194304
#Broker 的角色
#- ASYNC_MASTER 異步復(fù)制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true

#啟動master:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqbroker -c /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-b.properties >/dev/null 2>&1 &

1.這是一個slave的配置

cd /usr/local/src
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-all-4.2.0-bin-release
#創(chuàng)建文件夾
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store 
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
vi /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-b-s.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
waitTimeMillsInSendQueue=300
brokerId=1
#nameServer地址,分號分割
namesrvAddr=99.48.66.80:9876;99.48.66.82:9876
#是否允許 Broker 自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉
autoCreateSubscriptionGroup=false
#Broker 對外服務(wù)的監(jiān)聽端口
listenPort=11911
#刪除文件時間點(diǎn),默認(rèn)凌晨 4點(diǎn)
deleteWhen=04
#文件保留時間,默認(rèn) 48 小時
fileReservedTime=48
#commitLog每個文件的大小默認(rèn)1G
#mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整
#mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=80
#存儲路徑
storePathRootDir=/usr/local/src/rocketmq-all-4.2.0-bin-release/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
#消費(fèi)隊(duì)列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/checkpoint
#abort 文件存儲路徑
abortFile=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/abort
#限制的消息大小 默認(rèn)4M
#maxMessageSize=4194304
#Broker 的角色
#- ASYNC_MASTER 異步復(fù)制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SLAVE
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
#啟動name server:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqnamesrv &

#啟動slave:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqbroker -c /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-b-s.properties >/dev/null 2>&1 &

image.png

這樣就安裝好啦 下面是關(guān)閉服務(wù)的命令

#關(guān)閉服務(wù) namesrv 和broker
sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqshutdown broker
sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqshutdown namesrv

測試是否成功 最后關(guān)閉防火墻 開通 9876,11911,11912,11909端口

*******特別注意以下幾段英文說明*****************
Consumer Group and Subscriptions
The first thing you should be aware of is that different Consumer Group can consume the same topic independently, and each of them will have their own consuming offsets. 
Please make sure each Consumer within the same Group to subscribe the same topics.

MessageListener

    Orderly
    The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, 
    but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.

    Concurrently
    As the name tells, the Consumer will consume the messages concurrently. It is recommended to use this for good performance. 
    It is not recommended to throw exceptions, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.

Consume Status
    For MessageListenerConcurrently, you can return RECONSUME_LATER to tell the consumer that you can not consume it right now and want to reconsume it later.
 Then you can continue to consume other messages. For MessageListenerOrderly, because you care about the order, you can not jump over the message, 
 but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to wait for a moment.

 ConsumeFromWhere
    When a new Consumer Group is established, it will need to decide whether it needs to consume the historical messages which had already existed in the Broker.
 CONSUME_FROM_LAST_OFFSET will ignore the historical messages, and consume anything produced after that.
 CONSUME_FROM_FIRST_OFFSET will consume every message existed in the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages produced after the specified timestamp.
 (注意,這個CONSUME_FROM_LAST_OFFSET 是對一個新的Consumer Group 生效,如果這個Consumer Group 原來就已經(jīng)有過,那么是不生效的)
 
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容