RocketMQ源碼-普通消息發(fā)送

一、問題思考

????1、DefaultMQProducerImpl如何發(fā)送多個(gè)topic消息?

????2、如何選取MessageQueue?

????3、發(fā)送失敗是如何進(jìn)行重試的?

????4、超時(shí)時(shí)間怎么判斷?

二、消息發(fā)送流程

消息發(fā)送流程

1、獲取TopicPublishInfo

? ? 根據(jù)msg.topic從topicPublishInfoTable中獲取TopicPublishInfo;

? ? 未取到則從NameServer拉取topic信息,超時(shí)時(shí)間為3000ms;

? ? 更新topicPublishInfoTable;

詳細(xì)源碼如下:

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,

? ? ? ? DefaultMQProducer defaultMQProducer) {

? ? ? ? try {

? ? ? ? ? ? if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? TopicRouteData topicRouteData;

? ? ? ? ? ? ? ? ? ? if (isDefault && defaultMQProducer != null) {

? ? ? ? ? ? ? ? ? ? ? ? topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),

? ? ? ? ? ? ? ? ? ? ? ? ? ? 1000 * 3);

? ? ? ? ? ? ? ? ? ? ? ? if (topicRouteData != null) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? for (QueueData data : topicRouteData.getQueueDatas()) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? data.setReadQueueNums(queueNums);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? data.setWriteQueueNums(queueNums);

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? if (topicRouteData != null) {

? ? ? ? ? ? ? ? ? ? ? ? TopicRouteData old = this.topicRouteTable.get(topic);

? ? ? ? ? ? ? ? ? ? ? ? boolean changed = topicRouteDataIsChange(old, topicRouteData);

? ? ? ? ? ? ? ? ? ? ? ? if (!changed) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? changed = this.isNeedUpdateTopicRouteInfo(topic);

? ? ? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? ? ? log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? if (changed) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

? ? ? ? ? ? ? ? ? ? ? ? ? ? for (BrokerData bd : topicRouteData.getBrokerDatas()) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? // Update Pub info

? ? ? ? ? ? ? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? publishInfo.setHaveTopicRouterInfo(true);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? while (it.hasNext()) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Entry<String, MQProducerInner> entry = it.next();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? MQProducerInner impl = entry.getValue();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (impl != null) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? impl.updateTopicPublishInfo(topic, publishInfo);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? // Update sub info

? ? ? ? ? ? ? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? while (it.hasNext()) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Entry<String, MQConsumerInner> entry = it.next();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? MQConsumerInner impl = entry.getValue();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (impl != null) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? impl.updateTopicSubscribeInfo(topic, subscribeInfo);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? ? log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);

? ? ? ? ? ? ? ? ? ? ? ? ? ? this.topicRouteTable.put(topic, cloneTopicRouteData);

? ? ? ? ? ? ? ? ? ? ? ? ? ? return true;

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } catch (Exception e) {

? ? ? ? ? ? ? ? ? ? if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {

? ? ? ? ? ? ? ? ? ? ? ? log.warn("updateTopicRouteInfoFromNameServer Exception", e);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? this.lockNamesrv.unlock();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);

? ? ? ? ? ? }

? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? log.warn("updateTopicRouteInfoFromNameServer Exception", e);

? ? ? ? }

? ? ? ? return false;

? ? }

2、選取MessageQueue

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

? ? ? ? if (this.sendLatencyFaultEnable) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? int index = tpInfo.getSendWhichQueue().getAndIncrement();

? ? ? ? ? ? ? ? for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {

? ? ? ? ? ? ? ? ? ? int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();

? ? ? ? ? ? ? ? ? ? if (pos < 0)

? ? ? ? ? ? ? ? ? ? ? ? pos = 0;

? ? ? ? ? ? ? ? ? ? MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

? ? ? ? ? ? ? ? ? ? if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {

? ? ? ? ? ? ? ? ? ? ? ? if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))

? ? ? ? ? ? ? ? ? ? ? ? ? ? return mq;

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();

? ? ? ? ? ? ? ? int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);

? ? ? ? ? ? ? ? if (writeQueueNums > 0) {

? ? ? ? ? ? ? ? ? ? final MessageQueue mq = tpInfo.selectOneMessageQueue();

? ? ? ? ? ? ? ? ? ? if (notBestBroker != null) {

? ? ? ? ? ? ? ? ? ? ? ? mq.setBrokerName(notBestBroker);

? ? ? ? ? ? ? ? ? ? ? ? mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? return mq;

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? latencyFaultTolerance.remove(notBestBroker);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } catch (Exception e) {

? ? ? ? ? ? ? ? log.error("Error occurred when selecting message queue", e);

? ? ? ? ? ? }

? ? ? ? ? ? return tpInfo.selectOneMessageQueue();

? ? ? ? }

? ? ? ? return tpInfo.selectOneMessageQueue(lastBrokerName);

? ? }

3、發(fā)送失敗重試

? ? 發(fā)送失敗后選取其他Broker進(jìn)行消息發(fā)送;

4、超時(shí)判斷

long costTime = beginTimestampPrev - beginTimestampFirst;

if (timeout < costTime) {

????callTimeout = true;

????break;

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容