RocketMQ自動創(chuàng)建topic

在之前的文章《在IDEA中debug NameSrv、Broker、Producer、Consumer》中,我們debug Producer測試發(fā)送時,遇到過一個問題:Broker啟動時我們沒有配置NameSrv地址,發(fā)送程序會報錯:No route info of this topic。但當我們配上NameSrv地址后,再次啟動,可以正常發(fā)送消息。

example.quickstart.Producer的代碼是:

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

Message msg = new Message("TopicTest" /* Topic */,
    "TagA" /* Tag */,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

producer.send(msg);

TopicTest之前并未創(chuàng)建過,Broker未配置NameSrv地址,無法發(fā)送,而配置NameSrv后則可以正常發(fā)送。這中間有2個問題:
1、topic是怎么自動創(chuàng)建的?
2、topic自動創(chuàng)建過程中Broker、NameSrv如何協(xié)作配合的?

下面我們開始分析下這個流程。想看結(jié)論的可以直接跳到文章最后面。

1、DefaultMQProducerImpl#sendDefaultImpl

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    
    // 省略校驗相關(guān)代碼

    // 關(guān)鍵代碼,如果獲取到topic的路由信息,則發(fā)送,否則拋異常
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        // 省略發(fā)送相關(guān)代碼
    }

    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }

    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

tryToFindTopicPublishInfo是發(fā)送的關(guān)鍵,如果獲取到topic的信息,則發(fā)送,否則就異常。因此之前No route info of this topic的異常,就是Producer獲取不到TopicTest的信息,導致發(fā)送失敗。

那這跟Broker配沒配NameSrv地址有什么關(guān)系呢,我們接著往下看:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // topicPublishInfoTable是Producer本地緩存的topic信息表
    // Producer啟動后,會添加默認的topic:TBW102
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 未獲取到,從NameSrv獲取該topic的信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    // 獲取到了,則返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // 沒獲取到,再換種方式從NameSrv獲取
        // 如果再獲取不到,那后續(xù)就無法發(fā)送了
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

1、Producer本地topicPublishInfoTable變量中沒有TopicTest的信息,只緩存了TBW102(可以認為只有key(topic),實際也無詳細信息,還是要從NameSrv獲?。?/p>

2、嘗試從NameSrv獲取TopicTest的信息。獲取失敗,NameSrv中根本沒有TopicTest,因為這個topic是Producer發(fā)送時設(shè)置的,沒有同步到NameSrv。

3、再換種方式從NameSrv獲取。這里就很關(guān)鍵了,如果獲取到了,那么可以執(zhí)行發(fā)送流程,如果還是沒有獲取到,就會拋No route info of this topic的異常了。

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    // 將入?yún)⒌膖opic轉(zhuǎn)換為默認的TBW102,獲取TBW102的信息
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    
                    // 省略其他代碼
                    
                } else {
                    // 直接用入?yún)⒌膖opic去獲取
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                
                // 省略其他代碼
                
            } catch (Exception e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}

1、第1次獲取時,isDefault傳的false,defaultMQProducer傳的null,因此在updateTopicRouteInfoFromNameServer會走else分支,用TopicTest去獲取

2、第2次獲取時,isDefault傳的true,defaultMQProducer也傳值了,因此會走if分支,將入?yún)⒌膖opic轉(zhuǎn)換為默認的TBW102,獲取TBW102的信息

到這里,大概就能建立如下的推斷:
1、不管Broker配沒配NameSrv地址,獲取TopicTest的信息,必失敗
2、獲取TBW102信息:
2.1、Broker配置了NameSrv地址,成功
2.2、Broker沒有配置NameSrv地址,失敗

那我們進入NameSrv的源碼,看看為什么如此。

2、RouteInfoManager#pickupTopicRouteData

updateTopicRouteInfoFromNameServer最終會發(fā)給NameSrv一個GET_ROUTEINTO_BY_TOPIC請求

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    // 省略其他代碼

    try {
        try {
            this.lock.readLock().lockInterruptibly();
            // 從topicQueueTable獲取topic信息
            // 有則有,無則null
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;

                // 省略其他代碼
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}

NameSrv從本地變量topicQueueTable中獲取對應的topic信息,沒有則會返回null。由此可以推斷出:
1、NameSrv本地沒有TopicTest的信息(顯而易見)
2、NameSrv本地記錄了TBW102的topic信息

那TBW102的topic信息,NameSrv又是從哪里獲取并緩存到本地的呢?

答案:來自REGISTER_BROKER請求

case RequestCode.REGISTER_BROKER:
    Version brokerVersion = MQVersion.value2Version(request.getVersion());
    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
        return this.registerBrokerWithFilterServer(ctx, request);
    } else {
        return this.registerBroker(ctx, request);
    }

3、BrokerController#start

REGISTER_BROKER請求又是誰發(fā)給NameSrv的呢?很簡單,Broker發(fā)的。Broker在啟動時,會向NameSrv注冊,同時有一個定時任務會定時上報

public void start() throws Exception {
    
    // ...
    
    this.registerBrokerAll(true, false, true);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
    // ...
}

上報的內(nèi)容有哪些呢?

# BrokerOuterAPI#registerBrokerAll
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);

# TopicConfigSerializeWrapper
public class TopicConfigSerializeWrapper extends RemotingSerializable {
    private ConcurrentMap<String, TopicConfig> topicConfigTable;
    private DataVersion dataVersion = new DataVersion();
    
    // ...
}

上報的內(nèi)容包括TopicConfigSerializeWrapper,它的結(jié)構(gòu)其實跟${user.home}\store\config\topics.json是一樣的:

{
    "dataVersion":{
        "counter":2,
        "timestamp":1579838252574
    },
    "topicConfigTable":{
        "SELF_TEST_TOPIC":Object{...},
        "DefaultCluster":Object{...},
        "RMQ_SYS_TRANS_HALF_TOPIC":Object{...},
        "DESKTOP-FJIT15L":Object{...},
        "TBW102":{
            "order":false,
            "perm":7,
            "readQueueNums":8,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"TBW102",
            "topicSysFlag":0,
            "writeQueueNums":8
        },
        "BenchmarkTest":Object{...},
        "OFFSET_MOVED_EVENT":Object{...}
    }
}

TopicConfigManager的構(gòu)造函數(shù)中,默認創(chuàng)建了上面的topic,其中就有TBW102,這些都是在Broker啟動的時候就完成了。

因此,當Producer用TBW102去NameSrv獲取topic信息時,是可以獲取的。因為TBW102是Broker啟動時默認創(chuàng)建的,Broker啟動時會向NameSrv注冊。這也是為什么Broker沒配NameSrv時,獲取不到TBW102的topic信息。

那獲取到了TBW102的topic信息,跟TopicTest又有什么關(guān)系呢?TopicTest的信息還是沒有啊。讓我們繼續(xù)往下看:

再回到MQClientInstance#updateTopicRouteInfoFromNameServer:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
    // ...

    TopicRouteData topicRouteData;
    if (isDefault && defaultMQProducer != null) {
        // ①第2次獲取TopicTest的信息會走到這里,先轉(zhuǎn)換topic,實際獲取的是TBW102
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
            1000 * 3);
        // 獲取成功,修正讀寫隊列數(shù)    
        if (topicRouteData != null) {
            for (QueueData data : topicRouteData.getQueueDatas()) {
                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                data.setReadQueueNums(queueNums);
                data.setWriteQueueNums(queueNums);
            }
        }
    } else {
        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
    }
    
    if (topicRouteData != null) {
        // old為null
        TopicRouteData old = this.topicRouteTable.get(topic);
        // changed為true
        boolean changed = topicRouteDataIsChange(old, topicRouteData);
        if (!changed) {
            changed = this.isNeedUpdateTopicRouteInfo(topic);
        } else {
            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
        }
    
        // ②走到該邏輯
        if (changed) {
            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
    
            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
            }
    
            // Update Pub info
            {
                // ③將topic的route信息轉(zhuǎn)換為publish信息。實際是用了TBW102的route信息,給TopicTest用
                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                publishInfo.setHaveTopicRouterInfo(true);
                
                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, MQProducerInner> entry = it.next();
                    MQProducerInner impl = entry.getValue();
                    if (impl != null) {
                        // ④更新到本地
                        impl.updateTopicPublishInfo(topic, publishInfo);
                    }
                }
            }
    
            // ...
        }
    }
    
    // ...
}

在第2步獲取到的數(shù)據(jù)結(jié)構(gòu)為:


TBW102的TopicRouteData數(shù)據(jù)結(jié)構(gòu).png

第3步,轉(zhuǎn)換后的數(shù)據(jù)結(jié)構(gòu)為:


TopicTest的TopicPublishInfo數(shù)據(jù)結(jié)構(gòu).png

更新本地變量:topicPublishInfoTable

public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
    if (info != null && topic != null) {
        TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
        if (prev != null) {
            log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
        }
    }
}

獲取TBW102的topic信息,當成是TopicTest的。也可以說是,TopicTest繼承了TBW102的配置信息。因此TopicTest的信息就有了。

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Producer啟動后,會添加默認的topic:TBW102,但具體的信息還是沒有,需要從NameSrv獲取
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 未獲取到TopicTest,從NameSrv獲取該topic的信息,還是未獲取到
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }


    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // 沒獲取到,向NameSrv獲取TBW102的信息,當成是TopicTest的信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        // 這次有TopicTest的信息了
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

分析完上面所有的邏輯后,就到了發(fā)送消息的步驟。

4、發(fā)送流程

發(fā)送消息的請求到達Broker后,會有一步msgCheck的過程

protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,

    // ...

    // ①Broker本地沒有TopicTest,得到null
    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        
        // ...

        // ②自動創(chuàng)建topic,實際就是創(chuàng)建一個topicConfig對象,存放到本地map,并同步到NameSrv
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
            requestHeader.getTopic(),
            requestHeader.getDefaultTopic(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

        // ...

        // ③如果步驟2失敗了,那就會報TOPIC_NOT_EXIST的錯誤
        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }

    // ...
    
    return response;
}

msgCheck做了如下校驗:
1、從本地獲取TopicTest的信息,得到null
2、自動創(chuàng)建topic,實際就是創(chuàng)建一個topicConfig對象,存放到本地map,并同步到NameSrv。如果topic已經(jīng)緩存在本地map了,就直接返回,不需要創(chuàng)建
3、如果步驟2失敗了,那就會報TOPIC_NOT_EXIST的錯誤,就不會進行下面的消息刷盤落地

下面看下createTopicInSendMessageMethod的流程:

public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
    final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
    
    // ...
    
    // 一開始topicConfigTable還未緩存TopicTest
    topicConfig = this.topicConfigTable.get(topic);
    if (topicConfig != null)
        return topicConfig;

    // 還是要依賴TBW102,來建立TopicTest的TopicConfig對象
    TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
    if (defaultTopicConfig != null) {
        if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
            if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
            }
        }

        if (PermName.isInherited(defaultTopicConfig.getPerm())) {
            topicConfig = new TopicConfig(topic);

            int queueNums =
                clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
                    .getWriteQueueNums() : clientDefaultTopicQueueNums;

            if (queueNums < 0) {
                queueNums = 0;
            }

            topicConfig.setReadQueueNums(queueNums);
            topicConfig.setWriteQueueNums(queueNums);
            int perm = defaultTopicConfig.getPerm();
            perm &= ~PermName.PERM_INHERIT;
            topicConfig.setPerm(perm);
            topicConfig.setTopicSysFlag(topicSysFlag);
            topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
        }
    }

    if (topicConfig != null) {
        log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress);

        // 緩存到本地
        this.topicConfigTable.put(topic, topicConfig);

        this.dataVersion.nextVersion();

        createNew = true;

        // 持久化到${user.home}\store\config\topics.json
        this.persist();
    }

    // 同步到NameSrv
    if (createNew) {
        this.brokerController.registerBrokerAll(false, true, true);
    }

    return topicConfig;
}

主要的邏輯是:
1、topicConfigTable還未緩存TopicTest
2、還是要依賴TBW102,來建立TopicTest的TopicConfig對象
3、緩存到本地
4、持久化到${user.home}\store\config\topics.json
5、同步到NameSrv

到這里,校驗也通過了,下一步就是消息刷盤落地了,由于不在本文分析范圍內(nèi),就不作展開了。

5、TBW102是為何物?

TBW102是Broker啟動時,當autoCreateTopicEnable的配置為true時,會自動創(chuàng)建該默認topic

public TopicConfigManager(BrokerController brokerController) {
    this.brokerController = brokerController;

    // ...
    
    {
        // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
        if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
            String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
            TopicConfig topicConfig = new TopicConfig(topic);
            this.systemTopicList.add(topic);
            topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
            topicConfig.setPerm(perm);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
    }
    
    // ...
}

autoCreateTopicEnable的默認值是true,可以同步外部配置文件,讓Broker啟動時加載,來改變該值。我理解的TBW102的作用是當開啟自動創(chuàng)建topic功能,發(fā)送時用了未配置的topic,可以讓該topic繼承默認TBW102的配置,實現(xiàn)消息的發(fā)送。

6、總結(jié)

發(fā)送未配置topic的消息,流程圖如下:


發(fā)送未配置topic的消息流程圖.png

1、Broker配不配NameSrv的區(qū)別在于:配了NameSrv后,Broker會把啟動默認創(chuàng)建的topic同步的NameSrv,而后續(xù)Producer發(fā)送時會向NameSrv查詢topic信息,當查詢未配置的topic信息時,Producer會將topic轉(zhuǎn)換成默認的TBW102進行查詢,讓topic繼承它的配置。

2、Broker在開啟autoCreateTopicEnable的配置后(默認是開啟的),才會自動創(chuàng)建topic,同樣是繼承默認TBW102的配置。

因此,要正常發(fā)送未配置topic的消息,有2個點:正確配置Broker的NameSrv地址,開啟autoCreateTopicEnable。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容