2018-04-21 消息的延遲

每個(gè)人的想法不同

, RocketMQ 介紹的時(shí)候就說 是阿里從他們使用的上 解耦出來 近一步簡化 便捷的 目的當(dāng)然是 讓其能快速入手和開發(fā) 如果不是在項(xiàng)目設(shè)計(jì)層面上 只是使用的話 從Git上下載該項(xiàng)目的源碼 其中有一個(gè)包是專門的測試 實(shí)例的 只需要照貓畫虎?

使用就可以了?

?1 不能有中文路徑!

不能有中文路徑!

?不能有中文路徑!

?關(guān)系 兩個(gè)接口 interface MQProducer?

?//生產(chǎn)者接口?

?{ 實(shí)現(xiàn)該接口的只有一個(gè) 默認(rèn)的 DefaultMQProducer DefaultMQProducer?

實(shí)現(xiàn) MQProducer 接口的時(shí)候?

還繼承了 ClientConfig類 (客戶端配置類) 可以配置如

?sendMsgTimeout 超時(shí)時(shí)間

?producerGroup 消息最大多少?

超過多少壓縮等等?

?關(guān)鍵方法 :

?send(Message) 發(fā)送一個(gè)消息到MQ 這個(gè)方法其實(shí)是調(diào)用 DefaultMQProducer構(gòu)造方法

?創(chuàng)建的 defaultMQProducerImpl 類對象的 send(..)方法

?defaultMQProducerImpl 類 才是真正發(fā)送消息的核心類?

?defaultMQProducerImpl.send 方法

?--》 sendDefaultImpl

方法 sendDefaultImpl --》

?tryToFindTopicPublishInfo 來檢測映射 隊(duì)列是否存在

?是否正常 {

?final Segment[] segments; 這個(gè) 鍵值 不存在 不正常 :

?創(chuàng)建一個(gè) TopicPublishInfo 到 segments 映射文件 同時(shí) 將 Topic (隊(duì)列) 信息?

更新到NameServer中 }?

?sendDefaultImpl --》

?通過設(shè)置是失敗重復(fù)次數(shù) 和 超時(shí)時(shí)間 來從新發(fā)送消息?

?詳細(xì) for (; times < 失敗重復(fù)次數(shù) && (結(jié)束時(shí)間 - 開始時(shí)間) < 配置的超時(shí)時(shí)間; times++)?

?sendDefaultImpl --》

sendKernelImpl 裝載 配置 信息 --》

sendKernelImpl --》

this.mQClientFactory.getMQClientAPIImpl().sendMessage()?

?MQClientInstance mQClientFactory 對象

?是在 DefaultMQProducer start啟動(dòng)方式時(shí)候初始化的 MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,rpcHook);?

?--》 --》sendMessage { MQClientInstance --》

?MQClientAPIImpl mQClientAPIImpl MQClientAPIImpl.sendMessage() --> sendMessageSync switch (communicationMode) 同步 異步 單向 處理 默認(rèn)是 同步 } 后續(xù)返回 SendResult sendResult 改類型描述當(dāng)時(shí) 發(fā)送MQ 的最終狀態(tài) Message 消息的 Topic 不能為空 producer.shutdown(); 關(guān)閉 shutdown來清理資源,關(guān)閉網(wǎng)絡(luò)連接,從MetaQ服務(wù)器上注銷自己 } 發(fā)送消息負(fù)載的問題 { 看源碼 是通過循環(huán)從 namesrv 獲取的到 Topic 路由消息 (也就是有幾個(gè)broker 每個(gè) broker 有幾個(gè)隊(duì)列) 然后 記錄當(dāng)前發(fā)送過的 +1 備注 : 隊(duì)列數(shù)量 小于 消費(fèi)者數(shù)量 多余的消費(fèi)者將不起做用 } 關(guān)于順序消息發(fā)送 的問題 { 環(huán)境: 1 下單 2 付款 3 收貨 3個(gè)狀態(tài) , 普通模式下 發(fā)送到隊(duì)列中的 是輪詢隊(duì)列 將3個(gè)消息分別發(fā)送到多個(gè)隊(duì)列中。 很可能會照成出現(xiàn) 先消費(fèi) 2 在消費(fèi) 1 流程錯(cuò)亂的情況 當(dāng)然可以業(yè)務(wù)層處理 但是業(yè)務(wù)層處理比較麻煩 順序消費(fèi)的發(fā)送的原理 : 我們自己指定 消息將要添加的隊(duì)列 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(Listmqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); // 通過取于來 講 同一個(gè)訂單號 訪入同一個(gè)隊(duì)列中 // 前提是 隊(duì)列數(shù)量沒有變動(dòng) return mqs.get(index); } }, “10001”); // orderID “10001”是傳遞給回調(diào)方法的 自定義數(shù)據(jù) Listmqs 就是從namesrv 獲取的所有隊(duì)列 } 備注 // 訂單 Id String orderId = "20034568923546"; message.setKeys(orderId); // Keys 每個(gè)消息在業(yè)務(wù)局面的唯一標(biāo)識碼 通過 topic,key 來查詢返條消息內(nèi)容,以及消息被誰消費(fèi) 查詢的時(shí)候 非常重要 消費(fèi)者 interface MQConsumer { // 回溯消費(fèi) { mqadmin resetOffsetByTime 命令 改方式 是通過消費(fèi)的日志來恢復(fù)的 但是只能通過 消費(fèi)的組來恢復(fù) 恢復(fù)消息后 也只能用改組來從新消費(fèi) -s : 時(shí)間戳的問題 可以是 毫秒 或者是從什么時(shí)候開始 } //拉取模式 interface MQPullConsumer: { } // 接收模式 長輪詢模式 一次獲取一批 消息 記錄 批量和單條 內(nèi)部實(shí)現(xiàn) 還是獲取了所有的 可以獲取到的隊(duì)列消息 放入集合中 判斷集合大小是否 大于設(shè)置的單次消費(fèi)數(shù)量 小于 直接將其 消息集合 放入執(zhí)行回調(diào)方法中 大于 使用的是For 循環(huán) 來單條處理 interface MQPushConsumer: { class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer DefaultMQPushConsumer 包含很多可以配置的信息 詳情見文檔 其中最主要的 有幾個(gè) messageModel 消息模型 支持以下兩種 1、集群消費(fèi) 2、廣播消費(fèi) messageListener 消息監(jiān)聽器 consumeThreadMin 消費(fèi)線程池?cái)?shù)量 默認(rèn)10 consumeThreadMax 消費(fèi)線程池?cái)?shù)量 默認(rèn)20 重要的是 消費(fèi)線程池 ! 這就說明 我發(fā)布一個(gè) 消費(fèi)應(yīng)用 消費(fèi)邏輯就可以 N 個(gè) 處理! 不用自己搞了有沒有?。? 安默認(rèn)的來算 20個(gè)消費(fèi)邏輯 可以配置 而且還 可以橫向 增加 消費(fèi)應(yīng)用 只要保持是一個(gè)組就可以了 難怪會在文檔中 特意話一個(gè) 性能圖?。?! 應(yīng)用通常吐 Consumer 對象注冊一個(gè) Listener 接口,一旦收到消息,Consumer 對象立刻回調(diào) Listener 接口方法 MessageListenerOrderly 這個(gè)是有序的 MessageListenerConcurrently 這個(gè)是無序的 關(guān)鍵方法 DefaultMQPushConsumer registerMessageListener(new implements MessageListenerConcurrently) { public void registerMessageListener(MessageListenerConcurrently messageListener) { this.messageListener = messageListener; // 給自己復(fù)制一個(gè) 消費(fèi)邏輯類對象 方法后續(xù)查詢 替換修改等 關(guān)鍵方法 this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); // 將消費(fèi)邏輯類告訴 調(diào)用者類 } } 關(guān)鍵方法 start DefaultMQPushConsumer.start() --> DefaultMQPushConsumerImpl.start() { this.serviceState 來記錄設(shè)置當(dāng)前程序運(yùn)行狀態(tài) 來做多態(tài) checkConfig() 檢查配置 初始化賦值 copySubscription() 拷貝訂閱者信息 賦值 消費(fèi)邏輯類 // 有就獲取 沒有就創(chuàng)建一個(gè) this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook); 接著初始化一系列信息 // 加載消費(fèi)進(jìn)度 this.offsetStore.load(); // 該方法有兩個(gè)實(shí)現(xiàn) 一個(gè)是本地 this.readLocalOffset() 獲取數(shù)據(jù) { //獲取文件字符串 String content = MixAll.file2String(this.storePath); OffsetSerializeWrapper offsetSerializeWrapper =OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); 可以看出 淘寶使用的是JSON } if (this.getMessageListenerInner() instanceof MessageListenerOrderly) else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) 判斷 消費(fèi)邏輯類 實(shí)現(xiàn)那個(gè)接口 創(chuàng)建對應(yīng)的 ConsumeMessageOrderlyService 對象 ConsumeMessageConcurrentlyService 該實(shí)現(xiàn)為空 本地 ConsumeMessageOrderlyService.start() { 創(chuàng)建并執(zhí)行一個(gè)周期性的動(dòng)作成為了第一個(gè)在給定的初始延遲之后,隨后用給定的時(shí)期,這是執(zhí)行后將開始initialDelay然后initialDelay +,然后initialDelay + 2 *時(shí)期,等等。如果任何執(zhí)行任務(wù)遇到異常,后續(xù)執(zhí)行的鎮(zhèn)壓。否則,只會終止的任務(wù)通過取消或終止執(zhí)行器。如果執(zhí)行這個(gè)任務(wù)花費(fèi)的時(shí)間比其期,然后后續(xù)執(zhí)行可能會遲到,但不會同時(shí)執(zhí)行。 //就是一個(gè)定時(shí)器 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS); scheduleAtFixedRate 應(yīng)該是一個(gè)線程池管理 不用去關(guān)心 scheduleAtFixedRate 方法 看 ConsumeMessageOrderlyService.this.lockMQPeriodically() { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll() 是 RebalanceImpl.lockAll() // 將讀取到的消息上鎖 } } // 最關(guān)鍵的服務(wù)啟動(dòng)了 // 正在的啟動(dòng)了 mQClientFactory.start(); { synchronized (this){ //Start request-response channel 啟動(dòng)請求-響應(yīng)通道 this.mQClientAPIImpl.start(); //Start various schedule tasks 開始各種安排任務(wù) 啟動(dòng)定時(shí)任務(wù) 其中就包含 獲取到MQ消息消費(fèi)的 回調(diào)方法 this.startScheduledTask(); //Start pull service 開始拉取服務(wù) this.pullMessageService.start(); //Start rebalance service 啟動(dòng)負(fù)載均衡 // 該服務(wù)非常重要 this.rebalanceService.start(); //Start push service 開始推動(dòng)服務(wù) this.defaultMQProducer.getDefaultMQProducerImpl().start(false); } } } } 指定 group 訂閱 topic 注冊消息監(jiān)聽處理器,當(dāng)消息到來時(shí)消費(fèi)消息 消費(fèi)端 Start 復(fù)制訂閱關(guān)系 初始化 rebalance 變量 構(gòu)建 offsetStore 消費(fèi)進(jìn)度存儲對象 啟動(dòng)消費(fèi)消息服務(wù) 向 mqClientFactory 注冊本消費(fèi)者 啟動(dòng) client 端遠(yuǎn)程通信 * 加載消費(fèi)進(jìn)度 Loand() * 啟動(dòng)定時(shí)任務(wù) 定時(shí)獲取 nameserver 地址 定時(shí)從 nameserver 獲取 topic 路由信息 定時(shí)清理下線的 borker 定時(shí)向所有 broker 發(fā)送心跳信息, (包括訂閱關(guān)系) * 定時(shí)持久化 Consumer 消費(fèi)進(jìn)度(廣播存儲到本地,集群存儲到 Broker) PS: 這里也是是個(gè)關(guān)鍵 持久化消費(fèi)進(jìn)度 是用來記錄當(dāng)前 組的消費(fèi)情況 可以做到 回溯消費(fèi) 和宕機(jī)等情況下 啟動(dòng)后接著上次執(zhí)行消費(fèi) 統(tǒng)計(jì)信息打點(diǎn) 動(dòng)態(tài)調(diào)整消費(fèi)線程池 啟動(dòng)拉消息服務(wù) PullMessageService 啟動(dòng)消費(fèi)端負(fù)載均衡服務(wù) RebalanceService 從 namesrv 更新 topic 路由信息 向所有 broker 發(fā)送心跳信息, (包括訂閱關(guān)系) 喚醒 Rebalance 服務(wù)線程 } // 有些懶得看了 直接看別人 得了 消費(fèi)端負(fù)載均衡 { 這個(gè)也是個(gè)重點(diǎn) 消費(fèi)端會通過 RebalanceService 線程,10 秒鐘做一次基于 topic 下的所有隊(duì)列負(fù)載 消費(fèi)端 遍歷自己所有的 Topic 獲取 Topic 下所有的 隊(duì)列 (一個(gè)Topic 包含對個(gè)隊(duì)列 默認(rèn)是 4 個(gè) 有別于其他MQ ) 從 broker 獲取當(dāng)前 組(group)的所有消費(fèi)端( 有心跳的) 獲取隊(duì)列集合SetmqSet 現(xiàn)在隊(duì)列分配策略實(shí)例 AllocateMessageQueueStrategy 執(zhí)行分配算法 { 1:平均分配算法 : 其實(shí)是類似于分頁的算法 將所有 queue 排好序類似于記錄 將所有消費(fèi)端 consumer 排好序,相當(dāng)于頁數(shù) 然后獲取當(dāng)前 consumer 所在頁面應(yīng)該分配到的 queue 2:按照配置來分配隊(duì)列 : 消費(fèi)服務(wù)啟動(dòng)的時(shí)候 就指定好了要消費(fèi)的 是哪個(gè)隊(duì)列 3:按照機(jī)房來配置隊(duì)列 : Consumer 啟動(dòng)的時(shí)候會指定在哪些機(jī)房的消息 (應(yīng)該是指定 broker) 獲取指定機(jī)房的 queue 然后在執(zhí)行如 1)平均算法 } 根據(jù)分配隊(duì)列的結(jié)果更新 ProccessQueueTable{ 比對 mqSet 將多余的隊(duì)列刪除, 當(dāng) broker 當(dāng)機(jī)或者添加,會導(dǎo)致分配到 mqSet 變化, 添加新增隊(duì)列, 比對 mqSet,給新增的 messagequeue 構(gòu)建長輪詢對象 PullRequest 對象,會從 broker 獲取消費(fèi)的進(jìn)度 構(gòu)建這個(gè)隊(duì)列的 ProcessQueue 將 PullRequest 對象派發(fā)到長輪詢拉消息服務(wù)(單線程異步拉?。? 注:ProcessQueue 正在被消費(fèi)的隊(duì)列, (1) 長輪詢拉取到消息都會先存儲到 ProcessQueue 的 TreeMap集合中,消費(fèi)調(diào)后會刪除掉,用來控制 consumer 消息堆積, TreeMapkey 是消息在此 ConsumeQueue 隊(duì)列中索引 (2) 對于順序消息消費(fèi) 處理 locked 屬性:當(dāng) consumer 端向 broker 申請鎖隊(duì)列成功后設(shè)置 true, 只有被鎖定 的 processqueue 才能被執(zhí)行消費(fèi) rollback: 將消費(fèi)在 msgTreeMapTemp 中的消息,放回 msgTreeMap 重新消費(fèi) commit: 將臨時(shí)表 msgTreeMapTemp 數(shù)據(jù)清空,代表消費(fèi)完成,放回最大偏移 值 (3) 這里是個(gè) TreeMap,對 key 即消息的 offset 進(jìn)行排序,這個(gè)樣可以使得消息進(jìn) 行順序消費(fèi) } } 長輪詢 { Rocketmq 的消息是由 consumer 端主動(dòng)到 broker拉取的, consumer 向 broker 發(fā)送拉消息 請求, PullMessageService 服務(wù)通過一個(gè)線程將阻塞隊(duì)列 LinkedBlockingQueue中的 PullRequest 到 broker 拉取消息 DefaultMQPushConsumerImpl 的 pullMessage(pullRequest)方法執(zhí)行向 broker 拉消息動(dòng)作 1. 獲取 ProcessQueue 判讀是否 drop 的, drop 為 true 返回 2. 給 ProcessQueue 設(shè)置拉消息時(shí)間戳 3. 流量控制,正在消費(fèi)隊(duì)列中消息(未被消費(fèi)的)超過閥值,稍后在執(zhí)行拉消息 4. 流量控制,正在消費(fèi)隊(duì)列中消息的跨度超過閥值(默認(rèn) 2000) ,稍后在消費(fèi) 5. 根據(jù) topic 獲取訂閱關(guān)系 6. 構(gòu)建拉消息回調(diào)對象 PullBack, 從 broker 拉取消息(異步拉?。┓祷亟Y(jié)果是回調(diào) 7. 從內(nèi)存中獲取 commitOffsetValue //TODO 這個(gè)值跟 pullRequest.getNextOffset 區(qū)別 8. 構(gòu)建 sysFlag pull 接口用到的 flag 9. 調(diào)底層通信層向 broker 發(fā)送拉消息請求 如果 master 壓力過大,會建議去 slave 拉取消息 如果是到 broker 拉取消息清楚實(shí)時(shí)提交標(biāo)記位,因?yàn)?slave 不允許實(shí)時(shí)提交消費(fèi)進(jìn) 度,可以定時(shí)提交 //TODO 關(guān)于 master 拉消息實(shí)時(shí)提交指的是什么? 10. 拉到消息后回調(diào) PullCallback 處理 broker 返回結(jié)果 pullResult 更新從哪個(gè) broker(master 還是 slave)拉取消息 反序列化消息 消息過濾 消息中放入隊(duì)列最大最小 offset, 方便應(yīng)用來感知消息堆積度 將消息加入正在處理隊(duì)列 ProcessQueue 將消息提交到消費(fèi)消息服務(wù) ConsumeMessageService 流控處理, 如果 pullInterval 參數(shù)大于 0 (拉消息間隔,如果為了降低拉取速度, 可以設(shè)置大于 0 的值) , 延遲再執(zhí)行拉消息, 如果 pullInterval 為 0 立刻在執(zhí)行拉 消息動(dòng)作 看圖 人家花的不錯(cuò) 很明了 } push 消息 { PS: 長輪詢向broker拉取消息是批量拉取的, 默認(rèn)設(shè)置批量的值為pullBatchSize = 32, 可配置 消費(fèi)端 consumer 構(gòu)建一個(gè)消費(fèi)消息任務(wù) ConsumeRequest 消費(fèi)一批消息的個(gè)數(shù)是 可配置的 consumeMessageBatchMaxSize = 1, 默認(rèn)批量個(gè)數(shù)為一個(gè) 也就是說 每次傳遞給回調(diào)方法的 參數(shù) 消息集合 的解釋 ConsumeRequest 任務(wù) run 方法執(zhí)行 判斷 proccessQueue 是否被 droped 的, 廢棄直接返回,不在消費(fèi)消息 構(gòu)建并行消費(fèi)上下文 給消息設(shè)置消費(fèi)失敗時(shí)候的 retry topic,當(dāng)消息發(fā)送失敗的時(shí)候發(fā)送到 topic 為%RETRY%groupname 的隊(duì)列中 調(diào) MessageListenerConcurrently 監(jiān)聽器的 consumeMessage 方法消費(fèi)消息,返回消 費(fèi)結(jié)果 如果 ProcessQueue 的 droped 為 true,不處理結(jié)果,不更新 offset, 但其實(shí)這里消 費(fèi)端是消費(fèi)了消息的,這種情況感覺有被重復(fù)消費(fèi)的風(fēng)險(xiǎn) 處理消費(fèi)結(jié)果 : 消費(fèi)成功, 對于批次消費(fèi)消息, 返回消費(fèi)成功并不代表所有消息都消費(fèi)成功, 但是消費(fèi)消息的時(shí)候一旦遇到消費(fèi)消息失敗直接放回,根據(jù) ackIndex 來標(biāo)記 成功消費(fèi)到哪里了 消費(fèi)失敗, ackIndex 設(shè)置為-1 廣播模式發(fā)送失敗的消息丟棄, 廣播模式對于失敗重試代價(jià)過高,對整個(gè)集 群性能會有較大影響,失敗重試功能交由應(yīng)用處理 集群模式, 將消費(fèi)失敗的消息一條條的發(fā)送到 broker 的重試隊(duì)列中去,如果 此時(shí)還有發(fā)送到重試隊(duì)列發(fā)送失敗的消息, 那就在 cosumer 的本地線程定時(shí) 5 秒鐘以后重試重新消費(fèi)消息, 在走一次上面的消費(fèi)流程。 刪除正在消費(fèi)的隊(duì)列 processQueue 中本次消費(fèi)的消息,放回消費(fèi)進(jìn)度 更新消費(fèi)進(jìn)度, 這里的更新只是一個(gè)內(nèi)存 offsetTable 的更新,后面有定時(shí)任務(wù)定 時(shí)更新到 broker 上去 PS: 關(guān)于消費(fèi)成功 和 失敗的 問題 在集群模式下 回調(diào)方法設(shè)置為消費(fèi)失敗 會將當(dāng)前消費(fèi)的失敗消息 發(fā)送到 broker 的容錯(cuò)度列中 等待N次+ 從新消費(fèi) 。 push 消費(fèi)-順序消費(fèi)消息 順序消費(fèi)服務(wù) ConsumeMessageConcurrentlyService 構(gòu)建的時(shí)候 構(gòu)建一個(gè)線程池來接收消費(fèi)請求 ConsumeRequest 構(gòu)建一個(gè)單線程的本地線程, 用來稍后定時(shí)重新消費(fèi) ConsumeRequest, 用來執(zhí)行 定時(shí)周期性(一秒)鐘鎖隊(duì)列任務(wù) 周期性鎖隊(duì)列 lockMQPeriodically 獲取正在消費(fèi)隊(duì)列列表 ProcessQueueTable 所有 MesssageQueue, 構(gòu)建根據(jù) broker 歸類成 MessageQueue 集合 Map> 遍歷 Map>的 brokername, 獲取 broker 的 master 機(jī)器地址, 將brokerName的Set發(fā)送到broker請求鎖定這些隊(duì)列。 在broker 端鎖定隊(duì)列,其實(shí)就是在 broker 的 queue 中標(biāo)記一下消費(fèi)端,表示這個(gè) queue 被某個(gè) client 鎖定。 Broker 會返回成功鎖定隊(duì)列的集合, 根據(jù)成功鎖定的 MessageQueue,設(shè)置對應(yīng)的正 在處理隊(duì)列 ProccessQueue 的 locked 屬性為 true 沒有鎖定設(shè)置為 false 通過長輪詢拉取到消息后會提交到消息服務(wù) ConsumeMessageOrderlyService, ConsumeMessageOrderlyService 的 submitConsumeRequest 方法構(gòu)建 ConsumeRequest 任務(wù)提 交到線程池。ConsumeRequest 是由 ProcessQueue 和 Messagequeue 組成。 ConsumeRequest 任務(wù)的 run 方法 判斷 proccessQueue 是否被 droped 的, 廢棄直接返回,不在消費(fèi)消息 每個(gè) messagequeue 都會生成一個(gè)隊(duì)列鎖來保證在當(dāng)前 consumer 內(nèi),同一個(gè)隊(duì)列串行 消費(fèi), 判斷 processQueue 的 lock 屬性是否為 true, lock 屬性是否過期, 如果為 false 或者過期, 放到本地線程稍后鎖定在消費(fèi)。 如果 lock 為 true 且沒有過期,開始消費(fèi)消息 計(jì)算任務(wù)執(zhí)行的時(shí)間如果大于一分鐘且線程數(shù)小于隊(duì)列數(shù)情況下,將 processqueue, messagequeue 重新構(gòu)建 ConsumeRequest 加到線程池 10ms 后在消費(fèi),這樣防止個(gè)別隊(duì)列被 餓死 獲取客戶端的消費(fèi)批次個(gè)數(shù),默認(rèn)一批次為一條 從 proccessqueue 獲取批次消息, processqueue.takeMessags(batchSize) , 從 msgTreeMap 中移除消息放到臨時(shí) map 中 msgTreeMapTemp, 這個(gè)臨時(shí) map 用來回滾消息和 commit 消 息來實(shí)現(xiàn)事物消費(fèi) 調(diào)回調(diào)接口消費(fèi)消息,返回狀態(tài)對象 ConsumeOrderlyStatus 根據(jù)消費(fèi)狀態(tài),處理結(jié)果 1) 非事物方式,自動(dòng)提交 消息消息狀態(tài)為 success: 調(diào)用 processQueue.commit 方法 獲取 msgTreeMapTemp 的最后一個(gè) key,表示提交的 offset 清空 msgTreeMapTemp 的消息,已經(jīng)成功消費(fèi) 2) 事物提交,由用戶來控制提交回滾(精衛(wèi)專用) 更新消費(fèi)進(jìn)度, 這里的更新只是一個(gè)內(nèi)存 offsetTable 的更新, 后面有定時(shí)任務(wù)定時(shí)更 新到 broker 上去 } 關(guān)閉 { shutdown DefaultMQPushConsumerImpl 關(guān)閉消費(fèi)端 關(guān)閉消費(fèi)線程 將分配到的 Set的消費(fèi)進(jìn)度保存到 broker 利 用 DefaultMQPushConsumerImpl 獲 取 ProcessQueueTable的 keyset 的 messagequeue 去獲取 RemoteBrokerOffsetStore.offsetTableMap 中的消費(fèi)進(jìn)

? ? ? ? ? ? ? ? 度,

? ? ? ? ? ? ? ? offsetTable 中 的 messagequeue 的 值, 在 update 的時(shí)候如果 沒有對應(yīng) 的

? ? ? ? ? ? ? ? Messagequeue 會構(gòu)建, 但是也會 rebalance 的時(shí)候?qū)]有分配到的 messagequeue

? ? ? ? ? ? ? ? 刪除

? ? ? ? ? ? ? ? rebalance 會將 offsettable 中沒有分配到 messagequeue 刪除,? 但是在從 offsettable

? ? ? ? ? ? ? ? 刪除之前會將 offset 保存到 broker


? ? ? ? ? ? ? ? Unregiser 客戶端


? ? ? ? ? ? ? ? pullMessageService 關(guān)閉


? ? ? ? ? ? ? ? scheduledExecutorService 關(guān)閉,關(guān)閉一些客戶端的起的定時(shí)任務(wù)


? ? ? ? ? ? ? ? mqClientApi 關(guān)閉


? ? ? ? ? ? ? ? rebalanceService 關(guān)閉



? ? ? ? }

補(bǔ)充 一

消息的延遲

? ? {

? ? ? ? 通過測試程序可以看出? 通過設(shè)置 message 的DelayTimeLevel 可以設(shè)置消息延遲處理

? ? }

? ? 消息重試機(jī)制? 容錯(cuò)機(jī)制

? ? {

? ? ? ? 通過源碼可以看出 消費(fèi)方法的返回對象 只有兩個(gè)值


? ? ? ? CONSUME_SUCCESS // 消費(fèi)成功


? ? ? ? RECONSUME_LATER // 消費(fèi)失敗,稍后重試?



? ? ? ? CONSUME_SUCCESS 無異議?


? ? ? ? 關(guān)鍵是 RECONSUME_LATER


? ? ? ? ? ? 我們可以通過 RECONSUME_LATER 來容錯(cuò)。 阿里提供的這個(gè)? 重試機(jī)制 是通過添加到一個(gè)錯(cuò)誤隊(duì)列中 設(shè)置期? DelayTimeLevel 來實(shí)現(xiàn)的


? ? ? ? ? ? 第一次消費(fèi)的時(shí)候? 打印 MessageExt 沒有 properties屬性的詳細(xì)信息? 返回 RECONSUME_LATER 稍后重試


? ? ? ? ? ? [queueId=0, storeSize=106, queueOffset=0, sysFlag=0, bornTimestamp=1458803327047, bornHost=/10.10.12.27:41697, storeTimestamp=1458803327059, storeHost=/10         .10.12.27:10911, msgId=0A0A0C1B00002A9F0000000000031F10, commitLogOffset=204560, bodyCRC=910247988, reconsumeTimes=0, preparedTransactionOffset=0, toStrin         g()=Message [topic=Topic2, flag=0, properties={MAX_OFFSET=1, MIN_OFFSET=0}, body=9]]

? ? ? ? ? ? 第二次消費(fèi)的時(shí)候


? ? ? ? ? ? [queueId=0, storeSize=260, queueOffset=0, sysFlag=0, bornTimestamp=1458803327047, bornHost=/10.10.12.27:41697, storeTimestamp=1458803516104, storeHost=/10         .10.12.27:10911, msgId=0A0A0C1B00002A9F0000000000032079, commitLogOffset=204921, bodyCRC=910247988, reconsumeTimes=1, preparedTransactionOffset=0, toStrin         g()=Message [topic=Topic2, flag=0, properties={ORIGIN_MESSAGE_ID=0A0A0C1B00002A9F0000000000031F10, DELAY=3, REAL_TOPIC=%RETRY%ConsumerGroupName, WAIT=fals         e, RETRY_TOPIC=Topic2, MAX_OFFSET=1, MIN_OFFSET=0, REAL_QID=0}, body=9]]

? ? ? ? ? ? 可以看出 消息? 雖然? queueId 是相同的值 0 但是? msgId 卻變了 ! 同時(shí)用rocketmq-console 來監(jiān)控 該 消費(fèi)者 你會發(fā)現(xiàn) 多了個(gè) Topic? %RETRY%ConsumerGroupName?


? ? ? ? ? ? 所有 可以得出一個(gè)結(jié)論?


? ? ? ? ? ? 我們返回 消費(fèi)失敗,稍后重試? RECONSUME_LATER? 消息會回到 broker 同時(shí)創(chuàng)建一條相同的消息 訪如? %RETRY%ConsumerGroupName?

? ? ? ? ? ? 同時(shí) 設(shè)置 該 消息的 延遲消費(fèi) 每次延遲時(shí)間 +1?


? ? ? ? ? ? 我覺得可以通過 reconsumeTimes 來做一個(gè)簡單的容錯(cuò)? 獲取 當(dāng)前消費(fèi)的 次數(shù)? 是否大于 設(shè)定值? 大于就說明其是死信? 記錄到異常數(shù)據(jù)庫



? ? }

備注問題:

    背景:

          生產(chǎn)端 使用 linux 服務(wù)器 (UTF-8 編碼)

              Message me = new Message();

              me.setBody("中國人".getBytes());

              producer.send(me);

          消費(fèi)端 使用? Windows 服務(wù)器 (GBK 編碼)

              MessageExt msg = msgs.get(0);

              String strBody = new String(msg.getBody());

    問題 :

          生產(chǎn)端無問題,消費(fèi)端 存在 字符集 編碼問題 。

    原因 :

          生產(chǎn)端發(fā)送給MQ 的數(shù)據(jù)是 字節(jié) !? getBytes() 不指定字節(jié)格式 會默認(rèn)使用 本地系統(tǒng)編碼格式? linux下通常是 UTF-8 格式

          消費(fèi)端由于是Windows 本地系統(tǒng)的編碼格式是 GBK 格式 。

          new String(msg.getBody()); 方法 不指定編碼格式 使用的也是 本地系統(tǒng)編碼格式 也就是 GBK格式

          可能會說 直接 GBK轉(zhuǎn)換UTF-8就好了,但是! GBK 對應(yīng)的是2個(gè)字節(jié)? UTF-8 對應(yīng)的是3個(gè)字節(jié)? 當(dāng)出現(xiàn) 3個(gè)字的中文或者 特殊符號的時(shí)候

          轉(zhuǎn)換過程中會 主動(dòng) 2補(bǔ)1 所有 “中國人”? 這里 人 字就會亂碼

          String iso = new String(strBody.getBytes("UTF-8"), "ISO-8859-1");

            strBody = new String(iso.getBytes("ISO-8859-1"), "UTF-8");


          上面這種解決方法在 測試方法中有效? 在消費(fèi)端 具體消費(fèi)類中的消費(fèi)方法 并未生效?

          這里希望有大神可以指出為什么?。?/p>

    解決方法:

          MessageExt msg = msgs.get(0);

          strBody = new String(msg.getBody(), "UTF-8");?

          在第一次 字節(jié)轉(zhuǎn)換成字符串的時(shí)候 就指定 該字節(jié)按照 UTF-8 格式轉(zhuǎn)換!?

      PS:

          雖然解決方法很簡單,但是 稍微不注意 就會跳過這里啊? 勁量做到統(tǒng)一開發(fā)環(huán)境??!

? ? ? 消費(fèi)端 多實(shí)例問題

      經(jīng)過試驗(yàn),一個(gè)消費(fèi) 組 只能處理一個(gè) Topic 下的一個(gè) Tags? !

努力或許不會有收獲,但是不努力卻一定不會有收獲!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容