一、問題思考
????1、DefaultMQProducerImpl如何發(fā)送多個(gè)topic消息?
????2、如何選取MessageQueue?
????3、發(fā)送失敗是如何進(jìn)行重試的?
????4、超時(shí)時(shí)間怎么判斷?
二、消息發(fā)送流程

1、獲取TopicPublishInfo
? ? 根據(jù)msg.topic從topicPublishInfoTable中獲取TopicPublishInfo;
? ? 未取到則從NameServer拉取topic信息,超時(shí)時(shí)間為3000ms;
? ? 更新topicPublishInfoTable;
詳細(xì)源碼如下:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
? ? ? ? DefaultMQProducer defaultMQProducer) {
? ? ? ? try {
? ? ? ? ? ? if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? TopicRouteData topicRouteData;
? ? ? ? ? ? ? ? ? ? if (isDefault && defaultMQProducer != null) {
? ? ? ? ? ? ? ? ? ? ? ? topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? 1000 * 3);
? ? ? ? ? ? ? ? ? ? ? ? if (topicRouteData != null) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? for (QueueData data : topicRouteData.getQueueDatas()) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? data.setReadQueueNums(queueNums);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? data.setWriteQueueNums(queueNums);
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? if (topicRouteData != null) {
? ? ? ? ? ? ? ? ? ? ? ? TopicRouteData old = this.topicRouteTable.get(topic);
? ? ? ? ? ? ? ? ? ? ? ? boolean changed = topicRouteDataIsChange(old, topicRouteData);
? ? ? ? ? ? ? ? ? ? ? ? if (!changed) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? changed = this.isNeedUpdateTopicRouteInfo(topic);
? ? ? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? ? ? log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? if (changed) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
? ? ? ? ? ? ? ? ? ? ? ? ? ? for (BrokerData bd : topicRouteData.getBrokerDatas()) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? // Update Pub info
? ? ? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? publishInfo.setHaveTopicRouterInfo(true);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? while (it.hasNext()) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Entry<String, MQProducerInner> entry = it.next();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? MQProducerInner impl = entry.getValue();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (impl != null) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? impl.updateTopicPublishInfo(topic, publishInfo);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? // Update sub info
? ? ? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? while (it.hasNext()) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Entry<String, MQConsumerInner> entry = it.next();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? MQConsumerInner impl = entry.getValue();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (impl != null) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? impl.updateTopicSubscribeInfo(topic, subscribeInfo);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
? ? ? ? ? ? ? ? ? ? ? ? ? ? this.topicRouteTable.put(topic, cloneTopicRouteData);
? ? ? ? ? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? ? ? if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
? ? ? ? ? ? ? ? ? ? ? ? log.warn("updateTopicRouteInfoFromNameServer Exception", e);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? this.lockNamesrv.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
? ? ? ? ? ? }
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? log.warn("updateTopicRouteInfoFromNameServer Exception", e);
? ? ? ? }
? ? ? ? return false;
? ? }
2、選取MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
? ? ? ? if (this.sendLatencyFaultEnable) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? int index = tpInfo.getSendWhichQueue().getAndIncrement();
? ? ? ? ? ? ? ? for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
? ? ? ? ? ? ? ? ? ? int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
? ? ? ? ? ? ? ? ? ? if (pos < 0)
? ? ? ? ? ? ? ? ? ? ? ? pos = 0;
? ? ? ? ? ? ? ? ? ? MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
? ? ? ? ? ? ? ? ? ? if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
? ? ? ? ? ? ? ? ? ? ? ? if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
? ? ? ? ? ? ? ? ? ? ? ? ? ? return mq;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
? ? ? ? ? ? ? ? int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
? ? ? ? ? ? ? ? if (writeQueueNums > 0) {
? ? ? ? ? ? ? ? ? ? final MessageQueue mq = tpInfo.selectOneMessageQueue();
? ? ? ? ? ? ? ? ? ? if (notBestBroker != null) {
? ? ? ? ? ? ? ? ? ? ? ? mq.setBrokerName(notBestBroker);
? ? ? ? ? ? ? ? ? ? ? ? mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? return mq;
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? latencyFaultTolerance.remove(notBestBroker);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? log.error("Error occurred when selecting message queue", e);
? ? ? ? ? ? }
? ? ? ? ? ? return tpInfo.selectOneMessageQueue();
? ? ? ? }
? ? ? ? return tpInfo.selectOneMessageQueue(lastBrokerName);
? ? }
3、發(fā)送失敗重試
? ? 發(fā)送失敗后選取其他Broker進(jìn)行消息發(fā)送;
4、超時(shí)判斷
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
????callTimeout = true;
????break;
}