在之前的文章《在IDEA中debug NameSrv、Broker、Producer、Consumer》中,我們debug Producer測試發(fā)送時,遇到過一個問題:Broker啟動時我們沒有配置NameSrv地址,發(fā)送程序會報錯:No route info of this topic。但當我們配上NameSrv地址后,再次啟動,可以正常發(fā)送消息。
example.quickstart.Producer的代碼是:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
producer.send(msg);
TopicTest之前并未創(chuàng)建過,Broker未配置NameSrv地址,無法發(fā)送,而配置NameSrv后則可以正常發(fā)送。這中間有2個問題:
1、topic是怎么自動創(chuàng)建的?
2、topic自動創(chuàng)建過程中Broker、NameSrv如何協(xié)作配合的?
下面我們開始分析下這個流程。想看結(jié)論的可以直接跳到文章最后面。
1、DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 省略校驗相關(guān)代碼
// 關(guān)鍵代碼,如果獲取到topic的路由信息,則發(fā)送,否則拋異常
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
// 省略發(fā)送相關(guān)代碼
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
tryToFindTopicPublishInfo是發(fā)送的關(guān)鍵,如果獲取到topic的信息,則發(fā)送,否則就異常。因此之前No route info of this topic的異常,就是Producer獲取不到TopicTest的信息,導致發(fā)送失敗。
那這跟Broker配沒配NameSrv地址有什么關(guān)系呢,我們接著往下看:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// topicPublishInfoTable是Producer本地緩存的topic信息表
// Producer啟動后,會添加默認的topic:TBW102
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 未獲取到,從NameSrv獲取該topic的信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 獲取到了,則返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 沒獲取到,再換種方式從NameSrv獲取
// 如果再獲取不到,那后續(xù)就無法發(fā)送了
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
1、Producer本地topicPublishInfoTable變量中沒有TopicTest的信息,只緩存了TBW102(可以認為只有key(topic),實際也無詳細信息,還是要從NameSrv獲?。?/p>
2、嘗試從NameSrv獲取TopicTest的信息。獲取失敗,NameSrv中根本沒有TopicTest,因為這個topic是Producer發(fā)送時設(shè)置的,沒有同步到NameSrv。
3、再換種方式從NameSrv獲取。這里就很關(guān)鍵了,如果獲取到了,那么可以執(zhí)行發(fā)送流程,如果還是沒有獲取到,就會拋No route info of this topic的異常了。
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 將入?yún)⒌膖opic轉(zhuǎn)換為默認的TBW102,獲取TBW102的信息
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
// 省略其他代碼
} else {
// 直接用入?yún)⒌膖opic去獲取
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
// 省略其他代碼
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
1、第1次獲取時,isDefault傳的false,defaultMQProducer傳的null,因此在updateTopicRouteInfoFromNameServer會走else分支,用TopicTest去獲取
2、第2次獲取時,isDefault傳的true,defaultMQProducer也傳值了,因此會走if分支,將入?yún)⒌膖opic轉(zhuǎn)換為默認的TBW102,獲取TBW102的信息
到這里,大概就能建立如下的推斷:
1、不管Broker配沒配NameSrv地址,獲取TopicTest的信息,必失敗
2、獲取TBW102信息:
2.1、Broker配置了NameSrv地址,成功
2.2、Broker沒有配置NameSrv地址,失敗
那我們進入NameSrv的源碼,看看為什么如此。
2、RouteInfoManager#pickupTopicRouteData
updateTopicRouteInfoFromNameServer最終會發(fā)給NameSrv一個GET_ROUTEINTO_BY_TOPIC請求
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
// 省略其他代碼
try {
try {
this.lock.readLock().lockInterruptibly();
// 從topicQueueTable獲取topic信息
// 有則有,無則null
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
// 省略其他代碼
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
}
NameSrv從本地變量topicQueueTable中獲取對應的topic信息,沒有則會返回null。由此可以推斷出:
1、NameSrv本地沒有TopicTest的信息(顯而易見)
2、NameSrv本地記錄了TBW102的topic信息
那TBW102的topic信息,NameSrv又是從哪里獲取并緩存到本地的呢?
答案:來自REGISTER_BROKER請求
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
3、BrokerController#start
REGISTER_BROKER請求又是誰發(fā)給NameSrv的呢?很簡單,Broker發(fā)的。Broker在啟動時,會向NameSrv注冊,同時有一個定時任務會定時上報
public void start() throws Exception {
// ...
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
// ...
}
上報的內(nèi)容有哪些呢?
# BrokerOuterAPI#registerBrokerAll
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
# TopicConfigSerializeWrapper
public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable;
private DataVersion dataVersion = new DataVersion();
// ...
}
上報的內(nèi)容包括TopicConfigSerializeWrapper,它的結(jié)構(gòu)其實跟${user.home}\store\config\topics.json是一樣的:
{
"dataVersion":{
"counter":2,
"timestamp":1579838252574
},
"topicConfigTable":{
"SELF_TEST_TOPIC":Object{...},
"DefaultCluster":Object{...},
"RMQ_SYS_TRANS_HALF_TOPIC":Object{...},
"DESKTOP-FJIT15L":Object{...},
"TBW102":{
"order":false,
"perm":7,
"readQueueNums":8,
"topicFilterType":"SINGLE_TAG",
"topicName":"TBW102",
"topicSysFlag":0,
"writeQueueNums":8
},
"BenchmarkTest":Object{...},
"OFFSET_MOVED_EVENT":Object{...}
}
}
TopicConfigManager的構(gòu)造函數(shù)中,默認創(chuàng)建了上面的topic,其中就有TBW102,這些都是在Broker啟動的時候就完成了。
因此,當Producer用TBW102去NameSrv獲取topic信息時,是可以獲取的。因為TBW102是Broker啟動時默認創(chuàng)建的,Broker啟動時會向NameSrv注冊。這也是為什么Broker沒配NameSrv時,獲取不到TBW102的topic信息。
那獲取到了TBW102的topic信息,跟TopicTest又有什么關(guān)系呢?TopicTest的信息還是沒有啊。讓我們繼續(xù)往下看:
再回到MQClientInstance#updateTopicRouteInfoFromNameServer:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
// ...
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// ①第2次獲取TopicTest的信息會走到這里,先轉(zhuǎn)換topic,實際獲取的是TBW102
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
// 獲取成功,修正讀寫隊列數(shù)
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
// old為null
TopicRouteData old = this.topicRouteTable.get(topic);
// changed為true
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
// ②走到該邏輯
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
// ③將topic的route信息轉(zhuǎn)換為publish信息。實際是用了TBW102的route信息,給TopicTest用
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
// ④更新到本地
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// ...
}
}
// ...
}
在第2步獲取到的數(shù)據(jù)結(jié)構(gòu)為:

第3步,轉(zhuǎn)換后的數(shù)據(jù)結(jié)構(gòu)為:

更新本地變量:topicPublishInfoTable
public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
if (info != null && topic != null) {
TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
if (prev != null) {
log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
}
}
}
獲取TBW102的topic信息,當成是TopicTest的。也可以說是,TopicTest繼承了TBW102的配置信息。因此TopicTest的信息就有了。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// Producer啟動后,會添加默認的topic:TBW102,但具體的信息還是沒有,需要從NameSrv獲取
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 未獲取到TopicTest,從NameSrv獲取該topic的信息,還是未獲取到
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 沒獲取到,向NameSrv獲取TBW102的信息,當成是TopicTest的信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
// 這次有TopicTest的信息了
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
分析完上面所有的邏輯后,就到了發(fā)送消息的步驟。
4、發(fā)送流程
發(fā)送消息的請求到達Broker后,會有一步msgCheck的過程
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
// ...
// ①Broker本地沒有TopicTest,得到null
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
// ...
// ②自動創(chuàng)建topic,實際就是創(chuàng)建一個topicConfig對象,存放到本地map,并同步到NameSrv
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
// ...
// ③如果步驟2失敗了,那就會報TOPIC_NOT_EXIST的錯誤
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
// ...
return response;
}
msgCheck做了如下校驗:
1、從本地獲取TopicTest的信息,得到null
2、自動創(chuàng)建topic,實際就是創(chuàng)建一個topicConfig對象,存放到本地map,并同步到NameSrv。如果topic已經(jīng)緩存在本地map了,就直接返回,不需要創(chuàng)建
3、如果步驟2失敗了,那就會報TOPIC_NOT_EXIST的錯誤,就不會進行下面的消息刷盤落地
下面看下createTopicInSendMessageMethod的流程:
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
// ...
// 一開始topicConfigTable還未緩存TopicTest
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
// 還是要依賴TBW102,來建立TopicTest的TopicConfig對象
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) {
if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
}
}
if (PermName.isInherited(defaultTopicConfig.getPerm())) {
topicConfig = new TopicConfig(topic);
int queueNums =
clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
.getWriteQueueNums() : clientDefaultTopicQueueNums;
if (queueNums < 0) {
queueNums = 0;
}
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
int perm = defaultTopicConfig.getPerm();
perm &= ~PermName.PERM_INHERIT;
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
}
}
if (topicConfig != null) {
log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress);
// 緩存到本地
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
createNew = true;
// 持久化到${user.home}\store\config\topics.json
this.persist();
}
// 同步到NameSrv
if (createNew) {
this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
}
主要的邏輯是:
1、topicConfigTable還未緩存TopicTest
2、還是要依賴TBW102,來建立TopicTest的TopicConfig對象
3、緩存到本地
4、持久化到${user.home}\store\config\topics.json
5、同步到NameSrv
到這里,校驗也通過了,下一步就是消息刷盤落地了,由于不在本文分析范圍內(nèi),就不作展開了。
5、TBW102是為何物?
TBW102是Broker啟動時,當autoCreateTopicEnable的配置為true時,會自動創(chuàng)建該默認topic
public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
// ...
{
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
// ...
}
autoCreateTopicEnable的默認值是true,可以同步外部配置文件,讓Broker啟動時加載,來改變該值。我理解的TBW102的作用是當開啟自動創(chuàng)建topic功能,發(fā)送時用了未配置的topic,可以讓該topic繼承默認TBW102的配置,實現(xiàn)消息的發(fā)送。
6、總結(jié)
發(fā)送未配置topic的消息,流程圖如下:

1、Broker配不配NameSrv的區(qū)別在于:配了NameSrv后,Broker會把啟動默認創(chuàng)建的topic同步的NameSrv,而后續(xù)Producer發(fā)送時會向NameSrv查詢topic信息,當查詢未配置的topic信息時,Producer會將topic轉(zhuǎn)換成默認的TBW102進行查詢,讓topic繼承它的配置。
2、Broker在開啟autoCreateTopicEnable的配置后(默認是開啟的),才會自動創(chuàng)建topic,同樣是繼承默認TBW102的配置。
因此,要正常發(fā)送未配置topic的消息,有2個點:正確配置Broker的NameSrv地址,開啟autoCreateTopicEnable。