1 生產(chǎn)者
1.1 發(fā)送消息注意事項
1 Tags的使用
一個應(yīng)用盡可能用一個Topic,而消息子類型則可以用tags來標(biāo)識。tags可以由應(yīng)用自由設(shè)置,只有生產(chǎn)者在發(fā)送消息設(shè)置了tags,消費方在訂閱消息時才可以利用tags通過broker做消息過濾:message.setTags("TagA")。
2 Keys的使用
每個消息在業(yè)務(wù)層面的唯一標(biāo)識碼要設(shè)置到keys字段,方便將來定位消息丟失問題。服務(wù)器會為每個消息創(chuàng)建索引(哈希索引),應(yīng)用可以通過topic、key來查詢這條消息內(nèi)容,以及消息被誰消費。由于是哈希索引,請務(wù)必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。
// 訂單Id
String orderId = "20034568923546";
message.setKeys(orderId);
3 日志的打印
?消息發(fā)送成功或者失敗要打印消息日志,務(wù)必要打印SendResult和key字段。send消息方法只要不拋異常,就代表發(fā)送成功。發(fā)送成功會有多個狀態(tài),在sendResult里定義。以下對每個狀態(tài)進行說明:
- SEND_OK
消息發(fā)送成功。要注意的是消息發(fā)送成功也不意味著它是可靠的。要確保不會丟失任何消息,還應(yīng)啟用同步Master服務(wù)器或同步刷盤,即SYNC_MASTER或SYNC_FLUSH。
- FLUSH_DISK_TIMEOUT
消息發(fā)送成功但是服務(wù)器刷盤超時。此時消息已經(jīng)進入服務(wù)器隊列(內(nèi)存),只有服務(wù)器宕機,消息才會丟失。消息存儲配置參數(shù)中可以設(shè)置刷盤方式和同步刷盤時間長度,如果Broker服務(wù)器設(shè)置了刷盤方式為同步刷盤,即FlushDiskType=SYNC_FLUSH(默認(rèn)為異步刷盤方式),當(dāng)Broker服務(wù)器未在同步刷盤時間內(nèi)(默認(rèn)為5s)完成刷盤,則將返回該狀態(tài)——刷盤超時。
- FLUSH_SLAVE_TIMEOUT
消息發(fā)送成功,但是服務(wù)器同步到Slave時超時。此時消息已經(jīng)進入服務(wù)器隊列,只有服務(wù)器宕機,消息才會丟失。如果Broker服務(wù)器的角色是同步Master,即SYNC_MASTER(默認(rèn)是異步Master即ASYNC_MASTER),并且從Broker服務(wù)器未在同步刷盤時間(默認(rèn)為5秒)內(nèi)完成與主服務(wù)器的同步,則將返回該狀態(tài)——數(shù)據(jù)同步到Slave服務(wù)器超時。
- SLAVE_NOT_AVAILABLE
消息發(fā)送成功,但是此時Slave不可用。如果Broker服務(wù)器的角色是同步Master,即SYNC_MASTER(默認(rèn)是異步Master服務(wù)器即ASYNC_MASTER),但沒有配置slave Broker服務(wù)器,則將返回該狀態(tài)——無Slave服務(wù)器可用。
1.2 消息發(fā)送失敗處理方式
Producer的send方法本身支持內(nèi)部重試,重試邏輯如下:
- 至多重試2次(同步發(fā)送為2次,異步發(fā)送為0次)。
- 如果發(fā)送失敗,則輪轉(zhuǎn)到下一個Broker。這個方法的總耗時時間不超過sendMsgTimeout設(shè)置的值,默認(rèn)10s。
- 如果本身向broker發(fā)送消息產(chǎn)生超時異常,就不會再重試。
以上策略也是在一定程度上保證了消息可以發(fā)送成功。如果業(yè)務(wù)對消息可靠性要求比較高,建議應(yīng)用增加相應(yīng)的重試邏輯:比如調(diào)用send同步方法發(fā)送失敗時,則嘗試將消息存儲到db,然后由后臺線程定時重試,確保消息一定到達(dá)Broker。
上述db重試方式為什么沒有集成到MQ客戶端內(nèi)部做,而是要求應(yīng)用自己去完成,主要基于以下幾點考慮:首先,MQ的客戶端設(shè)計為無狀態(tài)模式,方便任意的水平擴展,且對機器資源的消耗僅僅是cpu、內(nèi)存、網(wǎng)絡(luò)。其次,如果MQ客戶端內(nèi)部集成一個KV存儲模塊,那么數(shù)據(jù)只有同步落盤才能較可靠,而同步落盤本身性能開銷較大,所以通常會采用異步落盤,又由于應(yīng)用關(guān)閉過程不受MQ運維人員控制,可能經(jīng)常會發(fā)生 kill -9 這樣暴力方式關(guān)閉,造成數(shù)據(jù)沒有及時落盤而丟失。第三,Producer所在機器的可靠性較低,一般為虛擬機,不適合存儲重要數(shù)據(jù)。綜上,建議重試過程交由應(yīng)用來控制。
1.3選擇oneway形式發(fā)送
通常消息的發(fā)送是這樣一個過程:
- 客戶端發(fā)送請求到服務(wù)器
- 服務(wù)器處理請求
- 服務(wù)器向客戶端返回應(yīng)答
所以,一次消息發(fā)送的耗時時間是上述三個步驟的總和,而某些場景要求耗時非常短,但是對可靠性要求并不高,例如日志收集類應(yīng)用,此類應(yīng)用可以采用oneway形式調(diào)用,oneway形式只發(fā)送請求不等待應(yīng)答,而發(fā)送請求在客戶端實現(xiàn)層面僅僅是一個操作系統(tǒng)系統(tǒng)調(diào)用的開銷,即將數(shù)據(jù)寫入客戶端的socket緩沖區(qū),此過程耗時通常在微秒級。
2 消費者
2.1 消費過程冪等
RocketMQ無法避免消息重復(fù)(Exactly-Once),所以如果業(yè)務(wù)對消費重復(fù)非常敏感,務(wù)必要在業(yè)務(wù)層面進行去重處理??梢越柚P(guān)系數(shù)據(jù)庫進行去重。首先需要確定消息的唯一鍵,可以是msgId,也可以是消息內(nèi)容中的唯一標(biāo)識字段,例如訂單Id等。在消費之前判斷唯一鍵是否在關(guān)系數(shù)據(jù)庫中存在。如果不存在則插入,并消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過)
msgId一定是全局唯一標(biāo)識符,但是實際使用中,可能會存在相同的消息有兩個不同msgId的情況(消費者主動重發(fā)、因客戶端重投機制導(dǎo)致的重復(fù)等),這種情況就需要使業(yè)務(wù)字段進行重復(fù)消費。
2.2 消費速度慢的處理方式
1 提高消費并行度
絕大部分消息消費行為都屬于 IO 密集型,即可能是操作數(shù)據(jù)庫,或者調(diào)用 RPC,這類消費行為的消費速度在于后端數(shù)據(jù)庫或者外系統(tǒng)的吞吐量,通過增加消費并行度,可以提高總的消費吞吐量,但是并行度增加到一定程度,反而會下降。所以,應(yīng)用必須要設(shè)置合理的并行度。 如下有幾種修改消費并行度的方法:
- 同一個 ConsumerGroup 下,通過增加 Consumer 實例數(shù)量來提高并行度(需要注意的是超過訂閱隊列數(shù)的 Consumer 實例無效)??梢酝ㄟ^加機器,或者在已有機器啟動多個進程的方式。
- 提高單個 Consumer 的消費并行線程,通過修改參數(shù) consumeThreadMin、consumeThreadMax實現(xiàn)。
2 批量方式消費
某些業(yè)務(wù)流程如果支持批量方式消費,則可以很大程度上提高消費吞吐量,例如訂單扣款類應(yīng)用,一次處理一個訂單耗時 1 s,一次處理 10 個訂單可能也只耗時 2 s,這樣即可大幅度提高消費的吞吐量,通過設(shè)置 consumer的 consumeMessageBatchMaxSize 返個參數(shù),默認(rèn)是 1,即一次只消費一條消息,例如設(shè)置為 N,那么每次消費的消息數(shù)小于等于 N。
3 跳過非重要消息
發(fā)生消息堆積時,如果消費速度一直追不上發(fā)送速度,如果業(yè)務(wù)對數(shù)據(jù)要求不高的話,可以選擇丟棄不重要的消息。例如,當(dāng)某個隊列的消息數(shù)堆積到100000條以上,則嘗試丟棄部分或全部消息,這樣就可以快速追上發(fā)送消息的速度。示例代碼如下:
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
long offset = msgs.get(0).getQueueOffset();
String maxOffset =
msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
// TODO 消息堆積情況的特殊處理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// TODO 正常消費過程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
4 優(yōu)化每條消息消費過程
舉例如下,某條消息的消費過程如下:
- 根據(jù)消息從 DB 查詢【數(shù)據(jù) 1】
- 根據(jù)消息從 DB 查詢【數(shù)據(jù) 2】
- 復(fù)雜的業(yè)務(wù)計算
- 向 DB 插入【數(shù)據(jù) 3】
- 向 DB 插入【數(shù)據(jù) 4】
這條消息的消費過程中有4次與 DB的 交互,如果按照每次 5ms 計算,那么總共耗時 20ms,假設(shè)業(yè)務(wù)計算耗時 5ms,那么總過耗時 25ms,所以如果能把 4 次 DB 交互優(yōu)化為 2 次,那么總耗時就可以優(yōu)化到 15ms,即總體性能提高了 40%。所以應(yīng)用如果對時延敏感的話,可以把DB部署在SSD硬盤,相比于SCSI磁盤,前者的RT會小很多。
2.3 消費打印日志
如果消息量較少,建議在消費入口方法打印消息,消費耗時等,方便后續(xù)排查問題。
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
// TODO 正常消費過程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
如果能打印每條消息消費耗時,那么在排查消費慢等線上問題時,會更方便。
2.4 其他消費建議
1 關(guān)于消費者和訂閱
?第一件需要注意的事情是,不同的消費者組可以獨立的消費一些 topic,并且每個消費者組都有自己的消費偏移量,請確保同一組內(nèi)的每個消費者訂閱信息保持一致。
2 關(guān)于有序消息
消費者將鎖定每個消息隊列,以確保他們被逐個消費,雖然這將會導(dǎo)致性能下降,但是當(dāng)你關(guān)心消息順序的時候會很有用。我們不建議拋出異常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作為替代。
3 關(guān)于并發(fā)消費
顧名思義,消費者將并發(fā)消費這些消息,建議你使用它來獲得良好性能,我們不建議拋出異常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作為替代。
4 關(guān)于消費狀態(tài)Consume Status
對于并發(fā)的消費監(jiān)聽器,你可以返回 RECONSUME_LATER 來通知消費者現(xiàn)在不能消費這條消息,并且希望可以稍后重新消費它。然后,你可以繼續(xù)消費其他消息。對于有序的消息監(jiān)聽器,因為你關(guān)心它的順序,所以不能跳過消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告訴消費者等待片刻。
5 關(guān)于Blocking
不建議阻塞監(jiān)聽器,因為它會阻塞線程池,并最終可能會終止消費進程
6 關(guān)于線程數(shù)設(shè)置
消費者使用 ThreadPoolExecutor 在內(nèi)部對消息進行消費,所以你可以通過設(shè)置 setConsumeThreadMin 或 setConsumeThreadMax 來改變它。
7 關(guān)于消費位點
當(dāng)建立一個新的消費者組時,需要決定是否需要消費已經(jīng)存在于 Broker 中的歷史消息CONSUME_FROM_LAST_OFFSET 將會忽略歷史消息,并消費之后生成的任何消息。CONSUME_FROM_FIRST_OFFSET 將會消費每個存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 來消費在指定時間戳后產(chǎn)生的消息。
3 Broker
3.1 Broker 角色
? Broker 角色分為 ASYNC_MASTER(異步主機)、SYNC_MASTER(同步主機)以及SLAVE(從機)。如果對消息的可靠性要求比較嚴(yán)格,可以采用 SYNC_MASTER加SLAVE的部署方式。如果對消息可靠性要求不高,可以采用ASYNC_MASTER加SLAVE的部署方式。如果只是測試方便,則可以選擇僅ASYNC_MASTER或僅SYNC_MASTER的部署方式。
3.2 FlushDiskType
? SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(異步處理)會損失很多性能,但是也更可靠,所以需要根據(jù)實際的業(yè)務(wù)場景做好權(quán)衡。
3.3 Broker 配置
| 參數(shù)名 | 默認(rèn)值 | 說明 |
|---|---|---|
| listenPort | 10911 | 接受客戶端連接的監(jiān)聽端口 |
| namesrvAddr | null | nameServer 地址 |
| brokerIP1 | 網(wǎng)卡的 InetAddress | 當(dāng)前 broker 監(jiān)聽的 IP |
| brokerIP2 | 跟 brokerIP1 一樣 | 存在主從 broker 時,如果在 broker 主節(jié)點上配置了 brokerIP2 屬性,broker 從節(jié)點會連接主節(jié)點配置的 brokerIP2 進行同步 |
| brokerName | null | broker 的名稱 |
| brokerClusterName | DefaultCluster | 本 broker 所屬的 Cluser 名稱 |
| brokerId | 0 | broker id, 0 表示 master, 其他的正整數(shù)表示 slave |
| storePathCommitLog | $HOME/store/commitlog/ | 存儲 commit log 的路徑 |
| storePathConsumerQueue | $HOME/store/consumequeue/ | 存儲 consume queue 的路徑 |
| mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
| deleteWhen | 04 | 在每天的什么時間刪除已經(jīng)超過文件保留時間的 commit log |
| fileReservedTime | 72 | 以小時計算的文件保留時間 |
| brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
| flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保證在收到確認(rèn)生產(chǎn)者之前將消息刷盤。ASYNC_FLUSH 模式下的 broker 則利用刷盤一組消息的模式,可以取得更好的性能。 |
4 NameServer
?RocketMQ 中,Name Servers 被設(shè)計用來做簡單的路由管理。其職責(zé)包括:
- Brokers 定期向每個名稱服務(wù)器注冊路由數(shù)據(jù)。
- 名稱服務(wù)器為客戶端,包括生產(chǎn)者,消費者和命令行客戶端提供最新的路由信息。 ?
?
5 客戶端配置
? 相對于RocketMQ的Broker集群,生產(chǎn)者和消費者都是客戶端。本小節(jié)主要描述生產(chǎn)者和消費者公共的行為配置。
5.1 客戶端尋址方式
RocketMQ可以令客戶端找到Name Server, 然后通過Name Server再找到Broker。如下所示有多種配置方式,優(yōu)先級由高到低,高優(yōu)先級會覆蓋低優(yōu)先級。
- 代碼中指定Name Server地址,多個namesrv地址之間用分號分割
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
- Java啟動參數(shù)中指定Name Server地址
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
- 環(huán)境變量指定Name Server地址
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
- HTTP靜態(tài)服務(wù)器尋址(默認(rèn))
客戶端啟動后,會定時訪問一個靜態(tài)HTTP服務(wù)器,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,這個URL的返回內(nèi)容如下:
192.168.0.1:9876;192.168.0.2:9876
客戶端默認(rèn)每隔2分鐘訪問一次這個HTTP服務(wù)器,并更新本地的Name Server地址。URL已經(jīng)在代碼中硬編碼,可通過修改/etc/hosts文件來改變要訪問的服務(wù)器,例如在/etc/hosts增加如下配置:
10.232.22.67 jmenv.erbadagang.net
推薦使用HTTP靜態(tài)服務(wù)器尋址方式,好處是客戶端部署簡單,且Name Server集群可以熱升級。
5.2 客戶端配置
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都繼承于ClientConfig類,ClientConfig為客戶端的公共配置類??蛻舳说呐渲枚际莋et、set形式,每個參數(shù)都可以用spring來配置,也可以在代碼中配置,例如namesrvAddr這個參數(shù)可以這樣配置,producer.setNamesrvAddr("192.168.0.1:9876"),其他參數(shù)同理。
1 客戶端的公共配置
| 參數(shù)名 | 默認(rèn)值 | 說明 |
|---|---|---|
| namesrvAddr | Name Server地址列表,多個NameServer地址用分號隔開 | |
| clientIP | 本機IP | 客戶端本機IP地址,某些機器會發(fā)生無法識別客戶端IP地址情況,需要應(yīng)用在代碼中強制指定 |
| instanceName | DEFAULT | 客戶端實例名稱,客戶端創(chuàng)建的多個Producer、Consumer實際是共用一個內(nèi)部實例(這個實例包含網(wǎng)絡(luò)連接、線程資源等) |
| clientCallbackExecutorThreads | 4 | 通信層異步回調(diào)線程數(shù) |
| pollNameServerInteval | 30000 | 輪詢Name Server間隔時間,單位毫秒 |
| heartbeatBrokerInterval | 30000 | 向Broker發(fā)送心跳間隔時間,單位毫秒 |
| persistConsumerOffsetInterval | 5000 | 持久化Consumer消費進度間隔時間,單位毫秒 |
2 Producer配置
| 參數(shù)名 | 默認(rèn)值 | 說明 |
|---|---|---|
| producerGroup | DEFAULT_PRODUCER | Producer組名,多個Producer如果屬于一個應(yīng)用,發(fā)送同樣的消息,則應(yīng)該將它們歸為同一組 |
| createTopicKey | TBW102 | 在發(fā)送消息時,自動創(chuàng)建服務(wù)器不存在的topic,需要指定Key,該Key可用于配置發(fā)送消息所在topic的默認(rèn)路由。 |
| defaultTopicQueueNums | 4 | 在發(fā)送消息,自動創(chuàng)建服務(wù)器不存在的topic時,默認(rèn)創(chuàng)建的隊列數(shù) |
| sendMsgTimeout | 10000 | 發(fā)送消息超時時間,單位毫秒 |
| compressMsgBodyOverHowmuch | 4096 | 消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節(jié) |
| retryAnotherBrokerWhenNotStoreOK | FALSE | 如果發(fā)送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發(fā)送 |
| retryTimesWhenSendFailed | 2 | 如果消息發(fā)送失敗,最大重試次數(shù),該參數(shù)只對同步發(fā)送模式起作用 |
| maxMessageSize | 4MB | 客戶端限制的消息大小,超過報錯,同時服務(wù)端也會限制,所以需要跟服務(wù)端配合使用。 |
| transactionCheckListener | 事務(wù)消息回查監(jiān)聽器,如果發(fā)送事務(wù)消息,必須設(shè)置 | |
| checkThreadPoolMinSize | 1 | Broker回查Producer事務(wù)狀態(tài)時,線程池最小線程數(shù) |
| checkThreadPoolMaxSize | 1 | Broker回查Producer事務(wù)狀態(tài)時,線程池最大線程數(shù) |
| checkRequestHoldMax | 2000 | Broker回查Producer事務(wù)狀態(tài)時,Producer本地緩沖請求隊列大小 |
| RPCHook | null | 該參數(shù)是在Producer創(chuàng)建時傳入的,包含消息發(fā)送前的預(yù)處理和消息響應(yīng)后的處理兩個接口,用戶可以在第一個接口中做一些安全控制或者其他操作。 |
3 PushConsumer配置
| 參數(shù)名 | 默認(rèn)值 | 說明 |
|---|---|---|
| consumerGroup | DEFAULT_CONSUMER | Consumer組名,多個Consumer如果屬于一個應(yīng)用,訂閱同樣的消息,且消費邏輯一致,則應(yīng)該將它們歸為同一組 |
| messageModel | CLUSTERING | 消費模型支持集群消費和廣播消費兩種 |
| consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer啟動后,默認(rèn)從上次消費的位置開始消費,這包含兩種情況:一種是上次消費的位置未過期,則消費從上次中止的位置進行;一種是上次消費位置已經(jīng)過期,則從當(dāng)前隊列第一條消息開始消費 |
| consumeTimestamp | 半個小時前 | 只有當(dāng)consumeFromWhere值為CONSUME_FROM_TIMESTAMP時才起作用。 |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法實現(xiàn)策略 |
| subscription | 訂閱關(guān)系 | |
| messageListener | 消息監(jiān)聽器 | |
| offsetStore | 消費進度存儲 | |
| consumeThreadMin | 10 | 消費線程池最小線程數(shù) |
| consumeThreadMax | 20 | 消費線程池最大線程數(shù) |
| consumeConcurrentlyMaxSpan | 2000 | 單隊列并行消費允許的最大跨度 |
| pullThresholdForQueue | 1000 | 拉消息本地隊列緩存消息最大數(shù) |
| pullInterval | 0 | 拉消息間隔,由于是長輪詢,所以為0,但是如果應(yīng)用為了流控,也可以設(shè)置大于0的值,單位毫秒 |
| consumeMessageBatchMaxSize | 1 | 批量消費,一次消費多少條消息 |
| pullBatchSize | 32 | 批量拉消息,一次最多拉多少條 |
4 PullConsumer配置
| 參數(shù)名 | 默認(rèn)值 | 說明 |
|---|---|---|
| consumerGroup | DEFAULT_CONSUMER | Consumer組名,多個Consumer如果屬于一個應(yīng)用,訂閱同樣的消息,且消費邏輯一致,則應(yīng)該將它們歸為同一組 |
| brokerSuspendMaxTimeMillis | 20000 | 長輪詢,Consumer拉消息請求在Broker掛起最長時間,單位毫秒 |
| consumerTimeoutMillisWhenSuspend | 30000 | 長輪詢,Consumer拉消息請求在Broker掛起超過指定時間,客戶端認(rèn)為超時,單位毫秒 |
| consumerPullTimeoutMillis | 10000 | 非長輪詢,拉消息超時時間,單位毫秒 |
| messageModel | BROADCASTING | 消息支持兩種模式:集群消費和廣播消費 |
| messageQueueListener | 監(jiān)聽隊列變化 | |
| offsetStore | 消費進度存儲 | |
| registerTopics | 注冊的topic集合 | |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法實現(xiàn)策略 |
5 Message數(shù)據(jù)結(jié)構(gòu)
| 字段名 | 默認(rèn)值 | 說明 |
|---|---|---|
| Topic | null | 必填,消息所屬topic的名稱 |
| Body | null | 必填,消息體 |
| Tags | null | 選填,消息標(biāo)簽,方便服務(wù)器過濾使用。目前只支持每個消息設(shè)置一個tag |
| Keys | null | 選填,代表這條消息的業(yè)務(wù)關(guān)鍵詞,服務(wù)器會根據(jù)keys創(chuàng)建哈希索引,設(shè)置后,可以在Console系統(tǒng)根據(jù)Topic、Keys來查詢消息,由于是哈希索引,請盡可能保證key唯一,例如訂單號,商品Id等。 |
| Flag | 0 | 選填,完全由應(yīng)用來設(shè)置,RocketMQ不做干預(yù) |
| DelayTimeLevel | 0 | 選填,消息延時級別,0表示不延時,大于0會延時特定的時間才會被消費 |
| WaitStoreMsgOK | TRUE | 選填,表示消息是否在服務(wù)器落盤后才返回應(yīng)答。 |
6 系統(tǒng)配置
本小節(jié)主要介紹系統(tǒng)(JVM/OS)相關(guān)的配置。
6.1 JVM選項
? 推薦使用最新發(fā)布的JDK 1.8版本。通過設(shè)置相同的Xms和Xmx值來防止JVM調(diào)整堆大小以獲得更好的性能。簡單的JVM配置如下所示: ?
?? ?-server -Xms8g -Xmx8g -Xmn4g ? ? ?
如果您不關(guān)心RocketMQ Broker的啟動時間,還有一種更好的選擇,就是通過“預(yù)觸摸”Java堆以確保在JVM初始化期間每個頁面都將被分配。那些不關(guān)心啟動時間的人可以啟用它: ? -XX:+AlwaysPreTouch
禁用偏置鎖定可能會減少JVM暫停, ? -XX:-UseBiasedLocking
至于垃圾回收,建議使用帶JDK 1.8的G1收集器。
-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
? 這些GC選項看起來有點激進,但事實證明它在我們的生產(chǎn)環(huán)境中具有良好的性能。另外不要把-XX:MaxGCPauseMillis的值設(shè)置太小,否則JVM將使用一個小的年輕代來實現(xiàn)這個目標(biāo),這將導(dǎo)致非常頻繁的minor GC,所以建議使用rolling GC日志文件:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m</pre>
如果寫入GC文件會增加代理的延遲,可以考慮將GC日志文件重定向到內(nèi)存文件系統(tǒng):
-Xloggc:/dev/shm/mq_gc_%p.log123
6.2 Linux內(nèi)核參數(shù)
? os.sh腳本在bin文件夾中列出了許多內(nèi)核參數(shù),可以進行微小的更改然后用于生產(chǎn)用途。下面的參數(shù)需要注意,更多細(xì)節(jié)請參考/proc/sys/vm/*的文檔
- vm.extra_free_kbytes,告訴VM在后臺回收(kswapd)啟動的閾值與直接回收(通過分配進程)的閾值之間保留額外的可用內(nèi)存。RocketMQ使用此參數(shù)來避免內(nèi)存分配中的長延遲。(與具體內(nèi)核版本相關(guān))
- vm.min_free_kbytes,如果將其設(shè)置為低于1024KB,將會巧妙的將系統(tǒng)破壞,并且系統(tǒng)在高負(fù)載下容易出現(xiàn)死鎖。
- vm.max_map_count,限制一個進程可能具有的最大內(nèi)存映射區(qū)域數(shù)。RocketMQ將使用mmap加載CommitLog和ConsumeQueue,因此建議將為此參數(shù)設(shè)置較大的值。(agressiveness --> aggressiveness)
- vm.swappiness,定義內(nèi)核交換內(nèi)存頁面的積極程度。較高的值會增加攻擊性,較低的值會減少交換量。建議將值設(shè)置為10來避免交換延遲。
- File descriptor limits,RocketMQ需要為文件(CommitLog和ConsumeQueue)和網(wǎng)絡(luò)連接打開文件描述符。我們建議設(shè)置文件描述符的值為655350。
- Disk scheduler,RocketMQ建議使用I/O截止時間調(diào)度器,它試圖為請求提供有保證的延遲。