RocketMQ NameServer 深入剖析

image.png

1、NameServer的作用

Name Server 是專為 RocketMQ 設(shè)計(jì)的輕量級(jí)名稱服務(wù),具有簡(jiǎn)單、可集群橫吐擴(kuò)展、無狀態(tài),節(jié)點(diǎn)之間互不通信等特點(diǎn)。

整個(gè)Rocketmq集群的工作原理如下圖所示:


在這里插入圖片描述

任何Producer、Consumer、Broker與所有NameServer通信,向NameServer請(qǐng)求或者發(fā)送數(shù)據(jù)。而且都是單向的,Producer和Consumer請(qǐng)求數(shù)據(jù),Broker發(fā)送數(shù)據(jù)。正是因?yàn)檫@種單向的通信,RocketMQ水平擴(kuò)容變得很容易。

Broker集群

Broker用于接收生產(chǎn)者發(fā)送消息,或者消費(fèi)者消費(fèi)消息的請(qǐng)求。一個(gè)Broker集群由多組Master/Slave組成,Master可寫可讀,Slave只可以讀,Master將寫入的數(shù)據(jù)同步給Slave。每個(gè)Broker節(jié)點(diǎn),在啟動(dòng)時(shí),都會(huì)遍歷NameServer列表,與每個(gè)NameServer建立長(zhǎng)連接,注冊(cè)自己的信息,之后定時(shí)上報(bào)。

Producer集群

消息的生產(chǎn)者,通過NameServer集群獲得Topic的路由信息,包括Topic下面有哪些Queue,這些Queue分布在哪些Broker上等。Producer只會(huì)將消息發(fā)送到Master節(jié)點(diǎn)上,因此只需要與Master節(jié)點(diǎn)建立連接。

Consumer集群

消息的消費(fèi)者,通過NameServer集群獲得Topic的路由信息,連接到對(duì)應(yīng)的Broker上消費(fèi)消息。注意,由于Master和Slave都可以讀取消息,因此Consumer會(huì)與Master和Slave都建立連接。

2、NameServer類結(jié)構(gòu)

在這里插入圖片描述
  • NamesrvStartup: NameServer的啟動(dòng)類;
  • NamesrvController: NameServer的核心控制類;
  • KVConfigManager: 讀取或變更NameServer的配置屬性,加載NamesrvConfig中配置的配置文件到內(nèi)存;
  • KVConfigSerializeWrapper: NameServer配置信息序列化包裝類;
  • RouteInfoManager: NameServer數(shù)據(jù)的載體,記錄Broker,Topic等信息;
  • DefaultRequestProcessor: NameServer處理請(qǐng)求的請(qǐng)求類,負(fù)責(zé)處理所有與NameServer交互的請(qǐng)求;
  • BrokerHousekeepingService: BrokerHouseKeepingService實(shí)現(xiàn)ChannelEventListener接口,可以說是通道在發(fā)送異常時(shí)的回調(diào)方法(Nameserver與Broker的連接通道在關(guān)閉、通道發(fā)送異常、通道空閑時(shí));
  • NamesrvConfig: NamesrvConfig,主要指定nameserver的相關(guān)配置目錄屬性;
  • NettyRemotingServer: Netty服務(wù)類;

(1)NameServer啟動(dòng)流程

NameServer的啟動(dòng)是由NamesrvStartup完成的,啟動(dòng)過程如下:


在這里插入圖片描述
  1. 獲取并解析配置參數(shù),包括NamesrvConfig和NettyServerConfig;
  2. 調(diào)用NamesrvController.initialize()初始化NamesrvController;若初始化失敗,則直接關(guān)閉NamesrvController;
  3. 然后調(diào)用NamesrvController.start()方法來開啟NameServer服務(wù);
  4. 注冊(cè)ShutdownHookThread服務(wù)。在JVM退出之前,調(diào)用NamesrvController.shutdown()來進(jìn)行關(guān)閉服務(wù),釋放資源;
public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            // 創(chuàng)建NamesrvController
            NamesrvController controller = createNamesrvController(args);
            // 啟動(dòng)NamesrvController
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        // 構(gòu)建命令行
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }
        // nameServer配置參數(shù)
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        // netty server 配置參數(shù)
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        // 命令行參數(shù)是否包含配置文件
        if (commandLine.hasOption('c')) {
            // 獲取配置文件路徑
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                namesrvConfig.setConfigStorePath(file);
                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        // 是否打印參數(shù)
        if (commandLine.hasOption('p')) {
            // 都不打印
            MixAll.printObjectProperties(null, namesrvConfig);
            MixAll.printObjectProperties(null, nettyServerConfig);
            System.exit(0);
        }
        // 設(shè)置命令行的參數(shù),優(yōu)先級(jí)高(會(huì)覆蓋掉配置文件的配置項(xiàng))
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
        // 未設(shè)置 rocketMQ home
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // 配置Logger
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        // 控制臺(tái)打印參數(shù)
        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        // 創(chuàng)建 NamesrvController
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // 注冊(cè)配置參數(shù),防止丟失
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);
        return controller;
    }

    public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        // 初始化NamesrvController
        boolean initResult = controller.initialize();
        // 初始化失敗
        if (!initResult) {
            // 關(guān)閉NamesrvController
            controller.shutdown();
            // 關(guān)閉JVM
            System.exit(-3);
        }
        // 注冊(cè)關(guān)閉鉤子方法:當(dāng)JVM關(guān)閉的時(shí)候,先關(guān)閉NamesrvController
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                // 關(guān)閉NamesrvController
                controller.shutdown();
                return null;
            }
        }));

        // 啟動(dòng)NamesrvController
        controller.start();
        return controller;
    }

    public static void shutdown(final NamesrvController controller) {
        controller.shutdown();
    }

    public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Name server config properties file");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);
        return options;
    }
}

調(diào)用NamesrvController.initialize()初始化NamesrvController

public boolean initialize() {
        // 加載KV配置
        this.kvConfigManager.load();

        // 初始化通信層
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // 初始化線程池
        this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
                    new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        // 增加定時(shí)任務(wù)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        //
        // @Override
        // public void run() {
        // NamesrvController.this.routeInfoManager.printAllPeriodically();
        // }
        // }, 1, 5, TimeUnit.MINUTES);

        return true;
    }

加載KV配置,創(chuàng)建NettyServer網(wǎng)絡(luò)處理對(duì)象,然后開啟兩個(gè)定時(shí)任務(wù),此類定時(shí)任務(wù)統(tǒng)稱為心跳檢測(cè)

  1. NameServer每隔10秒掃描一次Broker,移除處于不激活狀態(tài)的Broker
  2. NameServer每隔10分鐘打印一次KV配置

3、NameServer如何保證數(shù)據(jù)的最終一致

NameServer作為一個(gè)名稱服務(wù),需要提供服務(wù)注冊(cè)、服務(wù)剔除、服務(wù)發(fā)現(xiàn)這些基本功能,但是NameServer節(jié)點(diǎn)之間并不通信,在某個(gè)時(shí)刻各個(gè)節(jié)點(diǎn)數(shù)據(jù)可能不一致的情況下,如何保證客戶端可以最終拿到正確的數(shù)據(jù)。下面分別從路由元信息、路由注冊(cè)、路由剔除,路由發(fā)現(xiàn)四個(gè)角度進(jìn)行介紹。

路由元信息

NameServer路由實(shí)現(xiàn)類: RoutelnfoManager

RouteInfoManager作為NameServer數(shù)據(jù)的載體,記錄Broker、Topic、QueueData等信息。

Broker在啟動(dòng)時(shí)會(huì)將Broker信息、Topic信息、QueueData信息注冊(cè)到所有的NameServer上,并和所有NameServer節(jié)點(diǎn)保持長(zhǎng)連接,之后也會(huì)定時(shí)注冊(cè)信息;

Producer、Consumer也會(huì)和其中一個(gè)NameServer節(jié)點(diǎn)保持長(zhǎng)連接,定時(shí)從NameServer中獲取Topic路由信息;

   private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  • topicQueueTable: Topic 消息隊(duì)列路由信息,消息發(fā)送時(shí)根據(jù)路由表進(jìn)行負(fù) 載均衡 。
  • brokerAddrTable : Broker 基礎(chǔ)信息, 包含 brokerName、 所屬集群名稱 、 主備 Broker
    地址。
  • clusterAddrTable: Broker 集群信息,存儲(chǔ)集群中所有 Broker 名稱 。
  • brokerLiveTable: Broker 狀態(tài)信息 。 NameServer 每次 收到心跳包時(shí)會(huì) 替換該信 息 。
  • filterServerTable : Broker上的 FilterServer列表,用于類模式消息過濾,

RocketMQ 基于訂閱發(fā)布機(jī)制 , 一個(gè) Topic 擁有 多 個(gè)消息隊(duì) 列 ,一 個(gè) Broker 為每一主 題默 認(rèn)創(chuàng)建 4 個(gè)讀隊(duì)列 4 個(gè)寫隊(duì)列 。 多個(gè) Broker 組成 一個(gè)集群 , BrokerName 由相同的多臺(tái) Broker 組 成 Master-Slave 架構(gòu) , brokerId 為 0 代表 Master, 大于 0 表示 Slave。 BrokerLivelnfo 中 的 lastUpdateTimestamp 存儲(chǔ)上次收到 Broker 心跳包的時(shí)間 。

路由注冊(cè)

對(duì)于Zookeeper、Etcd這樣強(qiáng)一致性組件,數(shù)據(jù)只要寫到主節(jié)點(diǎn),內(nèi)部會(huì)通過狀態(tài)機(jī)將數(shù)據(jù)復(fù)制到其他節(jié)點(diǎn),Zookeeper使用的是Zab協(xié)議,etcd使用的是raft協(xié)議。

但是NameServer節(jié)點(diǎn)之間是互不通信的,無法進(jìn)行數(shù)據(jù)復(fù)制。RocketMQ采取的策略是,在Broker節(jié)點(diǎn)在啟動(dòng)的時(shí)候,輪訓(xùn)NameServer列表,與每個(gè)NameServer節(jié)點(diǎn)建立長(zhǎng)連接,發(fā)起注冊(cè)請(qǐng)求。NameServer內(nèi)部會(huì)維護(hù)一個(gè)Broker表,用來動(dòng)態(tài)存儲(chǔ)Broker的信息。

同時(shí),Broker節(jié)點(diǎn)為了證明自己是存活的,會(huì)將最新的信息上報(bào)給NameServer,然后每隔30秒向NameServer發(fā)送心跳包,心跳包中包含 BrokerId、Broker地址、Broker名稱、Broker所屬集群名稱等等,然后NameServer接收到心跳包后,會(huì)更新時(shí)間戳,記錄這個(gè)Broker的最新存活時(shí)間。

NameServer在處理心跳包的時(shí)候,存在多個(gè)Broker同時(shí)操作一張Broker表,為了防止并發(fā)修改Broker表導(dǎo)致不安全,路由注冊(cè)操作引入了ReadWriteLock讀寫鎖,這個(gè)設(shè)計(jì)亮點(diǎn)允許多個(gè)消息生產(chǎn)者并發(fā)讀,保證了消息發(fā)送時(shí)的高并發(fā),但是同一時(shí)刻N(yùn)ameServer只能處理一個(gè)Broker心跳包,多個(gè)心跳包串行處理。這也是讀寫鎖的經(jīng)典使用場(chǎng)景,即讀多寫少。

Broker端心跳包發(fā)送

Broker端心跳包發(fā)送( BrokerController#start)

 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                }
                catch (Exception e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

該方法主要是遍歷 NameServer列表, Broker消息服務(wù)器依次向 NameServer發(fā)送心跳包。

public RegisterBrokerResult registerBrokerAll(//
            final String clusterName,// 1
            final String brokerAddr,// 2
            final String brokerName,// 3
            final long brokerId,// 4
            final String haServerAddr,// 5
            final TopicConfigSerializeWrapper topicConfigWrapper,// 6
            final List<String> filterServerList,// 7
            final boolean oneway// 8
    ) {
        RegisterBrokerResult registerBrokerResult = null;

        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null) {
            for (String namesrvAddr : nameServerAddressList) {
                try {
                    RegisterBrokerResult result =
                            this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                                haServerAddr, topicConfigWrapper, filterServerList, oneway);
                    if (result != null) {
                        registerBrokerResult = result;
                    }

                    log.info("register broker to name server {} OK", namesrvAddr);
                }
                catch (Exception e) {
                    log.warn("registerBroker Exception, " + namesrvAddr, e);
                }
            }
        }

NameServer端處理心跳包

Step1:路由 注冊(cè)需要加寫鎖 ,防止并發(fā)修改 RoutelnfoManager 中的路由 表 。 Broker 所屬 集群是否存在, 如果不存在,則創(chuàng) 建,然 后將 broker 名加入到集群 合中。

Step2 :維護(hù) BrokerData信息,首先從 brokerAddrTable根據(jù) BrokerName嘗試獲取 Broker信息,如果不存在, 則新建 BrokerData并放入到 brokerAddrTable, registerFirst設(shè) 置為 true;如果存在 , 直接替換原先的, registerFirst設(shè)置為 false,表示非第一次注冊(cè) 。

Step3 :如果Broker為Master,并且BrokerTopic配置信息發(fā)生變化或者是初次注冊(cè), 則需要?jiǎng)?chuàng)建或更新 Topic路由元數(shù)據(jù),填充 topicQueueTable, 其實(shí)就是為默認(rèn)主題自動(dòng)注 冊(cè)路由信息,其中包含 MixAII.DEFAULT TOPIC 的路由信息。 當(dāng)消息生產(chǎn)者發(fā)送主題時(shí), 如果該主題未創(chuàng)建并且BrokerConfig的autoCreateTopicEnable為true時(shí), 將返回MixAII. DEFAULT TOPIC的路由信息。

路由剔除

正常情況下,如果Broker關(guān)閉,則會(huì)與NameServer斷開長(zhǎng)連接,Netty的通道關(guān)閉監(jiān)聽器會(huì)監(jiān)聽到連接斷開事件,然后會(huì)將這個(gè)Broker信息剔除掉。

Broker 每隔 30s 向 NameServer 發(fā)送一個(gè)心跳包,心跳包中包含 BrokerId、Broker地址、Broker名稱、 Broker所屬集群名稱、Broker關(guān)聯(lián)的 FilterServer列表。 但是如果 Broker若機(jī) , NameServer無法收到心跳包,此時(shí) NameServer如何來剔除這些失 效的 Broker 呢? Name Server會(huì)每隔 IOs 掃描 brokerLiveTable狀態(tài)表,如果 BrokerLive 的 lastUpdateTimestamp 的時(shí)間戳距當(dāng)前時(shí)間超過 120s,則認(rèn)為 Broker失效,移除該 Broker, 關(guān)閉與Broker連接,并同時(shí)更新topicQueueTable、 brokerAddrTable、 brokerLiveTable、 filterServerTable。

RocktMQ 有兩個(gè)觸發(fā)點(diǎn)來觸發(fā)路由刪除 。

  1. NameServer定時(shí)掃描 brokerLiveTable檢測(cè)上次心跳包與 當(dāng)前系統(tǒng)時(shí)間的時(shí)間差, 如果時(shí)間戳大于 120s,則需要移除該 Broker 信息 。
  2. Broker在正常被關(guān)閉的情況下,會(huì)執(zhí)行 unregisterBroker指令。

路由發(fā)現(xiàn)

RocketMQ 路由發(fā)現(xiàn)是非實(shí)時(shí)的,當(dāng) Topic 路由出現(xiàn)變化后, NameServer不主動(dòng)推送給客戶端 , 而 是由客戶端定時(shí)拉取主題最新的路由 。

com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor#pullMessageForward

  private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request)
            throws Exception {
        final RemotingCommand response =
                RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader =
                (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        // 由于異步返回,所以必須要設(shè)置
        response.setOpaque(request.getOpaque());

        DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
        final FilterClassInfo findFilterClass =
                this.filtersrvController.getFilterClassManager().findFilterClass(
                    requestHeader.getConsumerGroup(), requestHeader.getTopic());
        if (null == findFilterClass) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Find Filter class failed, not registered");
            return response;
        }

        if (null == findFilterClass.getMessageFilter()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Find Filter class failed, registered but no class");
            return response;
        }

        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

        // 構(gòu)造從Broker拉消息的參數(shù)
        MessageQueue mq = new MessageQueue();
        mq.setTopic(requestHeader.getTopic());
        mq.setQueueId(requestHeader.getQueueId());
        mq.setBrokerName(this.filtersrvController.getBrokerName());
        long offset = requestHeader.getQueueOffset();
        int maxNums = requestHeader.getMaxMsgNums();

        final PullCallback pullCallback = new PullCallback() {

            @Override
            public void onSuccess(PullResult pullResult) {
                responseHeader.setMaxOffset(pullResult.getMaxOffset());
                responseHeader.setMinOffset(pullResult.getMinOffset());
                responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
                response.setRemark(null);

                switch (pullResult.getPullStatus()) {
                case FOUND:
                    response.setCode(ResponseCode.SUCCESS);

                    List<MessageExt> msgListOK = new ArrayList<MessageExt>();
                    try {
                        for (MessageExt msg : pullResult.getMsgFoundList()) {
                            boolean match = findFilterClass.getMessageFilter().match(msg);
                            if (match) {
                                msgListOK.add(msg);
                            }
                        }

                        // 有消息返回
                        if (!msgListOK.isEmpty()) {
                            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
                                response, msgListOK);
                            return;
                        }
                        // 全部都被過濾掉了
                        else {
                            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        }
                    }
                    // 只要拋異常,就終止過濾,并返回客戶端異常
                    catch (Throwable e) {
                        final String error =
                                String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
                                    requestHeader.getConsumerGroup(), requestHeader.getTopic());
                        log.error(error, e);

                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
                        returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
                            response, null);
                        return;
                    }

                    break;
                case NO_MATCHED_MSG:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case NO_NEW_MSG:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_ILLEGAL:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    break;
                default:
                    break;
                }

                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
                    null);
            }


            @Override
            public void onException(Throwable e) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
                    null);
                return;
            }
        };

        pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);

        return null;
    }
  1. 調(diào)用 RouterlnfoManager 的方法,從路由 表 topicQueueTable、 brokerAddrTable、 filterServerTable中分別填充TopicRouteData中的List<Queu巳Data>、List<BrokerData>和 filterServer 地址表 。
  2. 如果找到主題對(duì)應(yīng)的路由信息并且該主題為順序消息,則從 NameServer KVconfig 中獲取關(guān)于順序消息相關(guān) 的配置填充路由信息 。

如果找不到路由信息 CODE 則使用 TOPIC NOT_EXISTS ,表示沒有找到對(duì)應(yīng)的路由 。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容