Rocketmq源碼-namesrv模塊詳解

Rocketmq 使用 namesrv 來管理所有的元數(shù)據(jù)信息,包括主題 Topic 路由信息和 Broker 信息。
首先我們介紹一下一些基礎(chǔ)概念:

  1. Broker : 儲(chǔ)存消息的服務(wù)器。
    • 分為主從兩種模式,通過 brokerId 來區(qū)分,目前 brokerId = 0 就表示主節(jié)點(diǎn)。
    • 每個(gè) Broker 啟動(dòng)時(shí),會(huì)向 namesrv 注冊(cè)自己的信息,并會(huì)定期發(fā)送心跳信息。
  2. Broker 組 : 相同 brokerName 名字的 Broker 服務(wù)器就是一個(gè)組的。

    注意: 這里就有一個(gè)小問題,如果兩個(gè) Broker 有相同brokerName名字,而且 brokerId 都是 0 時(shí),它們都可以向 namesrv 注冊(cè)自己信息,后面覆蓋前面信息,而且因?yàn)樗鼈兌紩?huì)發(fā)送心跳消息,就會(huì)導(dǎo)致不斷地相互覆蓋。

  3. Broker 集群 : 有相同 clusterName 名字的Broker 服務(wù)器就是同一個(gè)集群的。
  4. Topic : 主題 Topic 是以 Broker 組進(jìn)行區(qū)分的。
    • Broker 組有一個(gè) TopicConfigManager 來管理該 Broker 所擁有的所有主題 Topic 信息,包括主題Topic 的權(quán)限perm,讀隊(duì)列數(shù)量readQueueNums,writeQueueNums 寫隊(duì)列數(shù)量等等。
    • Broker 組中主 Broker 創(chuàng)建主題 Topic 在這個(gè)Broker 組擁有隊(duì)列文件Queue,從 Broker 只是復(fù)制主 Broker
    • 生產(chǎn)者發(fā)送消息時(shí),也是從主題 Topic 擁有的Broker 組數(shù)組中,挑選一個(gè)Broker 組,向這個(gè)Broker 組的主 Broker 發(fā)送消息,然后主 Broker 再將這些數(shù)據(jù)發(fā)送給從 Broker

一. NamesrvStartup

這個(gè)類是 namesrv 的啟動(dòng)類,用來開啟 namesrv

1.1 main 方法

 public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            // 創(chuàng)建 NamesrvController
            NamesrvController controller = createNamesrvController(args);
            // 啟動(dòng) NamesrvController
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
  1. 通過 createNamesrvController() 方法創(chuàng)建 NamesrvController 實(shí)例。
  2. 調(diào)用 start(controller) 方法啟動(dòng) NamesrvController。

1.2 createNamesrvController 方法

 public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        // 通過 commandLine 解析 args 參數(shù)
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        // namesrv 相關(guān)的配置參數(shù)
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        // netty 相關(guān)的配置參數(shù)
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // 默認(rèn)端口 9876, 但是可以通過 -c 傳入的 properties 文件參數(shù)進(jìn)行覆蓋
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        // 創(chuàng)建 NamesrvController
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }
  1. 通過 commandLine 解析命令行 args 參數(shù)。
  2. 如果指定了配置文件,那么讀取配置文件中的配置項(xiàng),并賦值到 namesrvConfignettyServerConfig 。
  3. 創(chuàng)建 NamesrvController 實(shí)例,并將配置項(xiàng)數(shù)據(jù) properties 賦值到 Configuration 中。

1.3 start 方法

    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        // 初始化 NamesrvController
        boolean initResult = controller.initialize();
        if (!initResult) {
            // 初始化失敗,就退出
            controller.shutdown();
            System.exit(-3);
        }

        // 添加鉤子函數(shù),保證 JVM 正常退出時(shí),關(guān)閉 NamesrvController
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        // 開啟 NamesrvController
        controller.start();

        return controller;
    }

非常簡單,先初始化 NamesrvController;然后添加鉤子函數(shù),保證 JVM 正常退出時(shí),關(guān)閉 NamesrvController;最后調(diào)用 start() 方法啟動(dòng)。

二. NamesrvController

2.1 成員屬性

    // Namesrv 的配置項(xiàng)
    private final NamesrvConfig namesrvConfig;

    // Netty 服務(wù)端配置項(xiàng)
    private final NettyServerConfig nettyServerConfig;

    // 定時(shí)器,用來定期檢查是否有不活躍的 broker,以及定期打印 kvConfigManager 中的值
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    // 儲(chǔ)存 KV 的值
    private final KVConfigManager kvConfigManager;
    //  namesrv 所有路由信息的管理器
    private final RouteInfoManager routeInfoManager;

    // 遠(yuǎn)程RPC服務(wù)服務(wù)端,用來處理遠(yuǎn)程請(qǐng)求命令
    private RemotingServer remotingServer;

    // ChannelEventListener 接口子類,監(jiān)聽 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件,進(jìn)行對(duì)應(yīng)處理
    private BrokerHousekeepingService brokerHousekeepingService;

    // 處理遠(yuǎn)程請(qǐng)求的線程池執(zhí)行器
    private ExecutorService remotingExecutor;
    // 配置項(xiàng)
    private Configuration configuration;
    // 監(jiān)控 file 變化,主要用于 SSL
    private FileWatchService fileWatchService;
  1. namesrvConfignettyServerConfig: Namesrv 的配置項(xiàng)和 Netty 服務(wù)端配置項(xiàng)。
  2. scheduledExecutorService : 定時(shí)器

    用來定期檢查是否有不活躍的 Broker,以及定期打印 kvConfigManager 中的值。

  3. kvConfigManager : KV 值的管理器。
  4. routeInfoManager : 所有路由信息的管理器。
  5. remotingServer : 遠(yuǎn)程RPC服務(wù)服務(wù)端,用來處理遠(yuǎn)程請(qǐng)求命令。
  6. brokerHousekeepingService : ChannelEventListener 接口子類。

    監(jiān)聽 NettyCONNECT, CLOSE, IDLE, EXCEPTION 事件,進(jìn)行對(duì)應(yīng)處理。

  7. remotingExecutor : 處理遠(yuǎn)程請(qǐng)求的線程池執(zhí)行器。

2.2 initialize 方法

    public boolean initialize() {

        // 從 kvConfig.json 文件中加載之前存儲(chǔ)的 KV 值
        this.kvConfigManager.load();

        // 通過 netty 創(chuàng)建一個(gè)服務(wù)端
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // 用于處理請(qǐng)求的線程池 remotingExecutor
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        // 注冊(cè)請(qǐng)求命令處理器
        this.registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // 每隔10秒 檢查是否有不活躍的 broker
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // 每隔10秒打印一下 kvConfigManager 中的值
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        // 處理 SSL
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }
  1. kvConfig.json 文件中加載之前存儲(chǔ)的 KV 值。
  2. 創(chuàng)建一個(gè)遠(yuǎn)程RPC服務(wù)服務(wù)端,用來處理遠(yuǎn)程請(qǐng)求命令。
  3. 用于處理請(qǐng)求的線程池 remotingExecutor。
  4. 注冊(cè)請(qǐng)求命令處理器。
  5. 通過 scheduledExecutorService 每隔10秒檢查是否有不活躍的 Broker,以及每隔10秒打印一下 kvConfigManager 中的值。
  6. 最后處理 SSL。

三. RouteInfoManager

3.1 成員屬性

    // 主題 topic 對(duì)應(yīng)的隊(duì)列相關(guān)信息QueueData,這里是 List 原因是每個(gè) broker 組都有一個(gè) QueueData。
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // broker 組基礎(chǔ)信息集合,包括 broker 名字,所屬集群名字和主從 broker 的地址。
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // broker 集群集合,包括所有集群名字以及它擁有所有broker 組名字。
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // 每個(gè) broker 的狀態(tài)信息,每次收到心跳包時(shí),都會(huì)替換對(duì)應(yīng)數(shù)據(jù)。
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // 每個(gè) broker 對(duì)應(yīng)的 FilterServer 地址列表,用于類模式消息過濾。
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  1. topicQueueTable : 所有主題相關(guān)信息集合。
    • 每個(gè)主題 Topic 對(duì)應(yīng)一個(gè) List<QueueData> 集合,因?yàn)橐粋€(gè)主題有多個(gè) Broker 組。
    • QueueData 包括 Broker組的名字,這個(gè) Broker組中當(dāng)期主題Topic對(duì)應(yīng)的可讀隊(duì)列數(shù)量,可寫隊(duì)列數(shù)量,讀寫權(quán)限和同步標(biāo)記。
  2. brokerAddrTable : Broker 組基礎(chǔ)信息集合。

    BrokerData 包括 Broker 組名字,所屬集群名字和主從 Broker 的地址。

  3. clusterAddrTable : Broker 集群集合,key 是集群名字,value 是集群擁有所有的Broker組名字。
  4. brokerLiveTable : 每個(gè) Broker 的狀態(tài)信息。
  5. filterServerTable : 每個(gè) Broker 對(duì)應(yīng)的 FilterServer 地址列表,用于類模式消息過濾。

3.1.1 QueueData

public class QueueData implements Comparable<QueueData> {
    // broker組的名字
    private String brokerName;
    // 可讀隊(duì)列數(shù)量
    private int readQueueNums;
    // 可寫隊(duì)列數(shù)量
    private int writeQueueNums;
    // 讀寫權(quán)限,具體參考 PermName,
    private int perm;
    // 主題Topic 同步標(biāo)記; 參考TopicSysFlag類: FLAG_UNIT = 0x1 << 0, FLAG_UNIT_SUB = 0x1 << 1
    private int topicSynFlag;
}

3.1.2 BrokerData

public class BrokerData implements Comparable<BrokerData> {
    // broker 組所屬集群的名字
    private String cluster;
    // broker 組的名字
    private String brokerName;
    // broker 組中所有 broker 的地址;其中 brokerId = 0 表示主 broker,其他的都是從 broker。
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

3.1.3 BrokerLiveInfo

class BrokerLiveInfo {
    // 最近一次更新時(shí)間,用來判斷這個(gè) broker 是否活躍
    private long lastUpdateTimestamp;
    // 這個(gè) broker 的數(shù)據(jù)版本,可以用來判斷這個(gè) Broker 的數(shù)據(jù)是否改變過。
    private DataVersion dataVersion;
    // 連接這個(gè) broker 的通道channel
    private Channel channel;
    // 該 broker 的 HaServer地址
    private String haServerAddr;
}

3.2 重要方法

3.2.1 registerBroker 方法

    /**
     * 注冊(cè) Broker
     */
    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();

                // 這個(gè) broker 所屬的集群clusterName 是否已經(jīng)在 clusterAddrTable 中
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                // 因?yàn)?brokerNames 是 Set 類型,會(huì)自動(dòng)去重,所以這里直接添加
                brokerNames.add(brokerName);

                // 是否第一次注冊(cè)
                boolean registerFirst = false;

                // 通過 brokerName 從 brokerAddrTable 中獲取對(duì)應(yīng)的 BrokerData。
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    // 如果不存在,就創(chuàng)建新的,并存入 brokerAddrTable 中。
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                // 處理 slave 變成 master 的情況
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    // 如果 brokerAddr 相同,但是 brokerId 不一樣,說明這個(gè) broker 修改 brokerId,
                    // 那么就先把它從 brokerAddrsMap 集合中移除。
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }

                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                /**
                 * 只有當(dāng) topicConfigWrapper 不為null且必須是master節(jié)點(diǎn),才能進(jìn)入
                 */
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    // 當(dāng) topicConfigWrapper 的數(shù)據(jù)版本dataVersion 和當(dāng)前儲(chǔ)存值不一樣,或者是第一次注冊(cè)時(shí);
                    // 都需要處理 Topic
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            // 遍歷 Broker 上所有的 topic 配置,改變 topicQueueTable 集合數(shù)據(jù)
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                // 只有 Broker 組中的主節(jié)點(diǎn)才有可能調(diào)用到這個(gè)方法。
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                // 更新 brokerLiveTable 中該 Broker 地址對(duì)應(yīng)狀態(tài)信息,表示該 Broker 地址是活躍的。
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }

                if (MixAll.MASTER_ID != brokerId) {
                    // 如果這個(gè)Broker 是 slave 節(jié)點(diǎn), 那么給它設(shè)置主節(jié)點(diǎn)的地址和 HaServer的地址
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }

注冊(cè) Broker 信息, 每個(gè) Broker 會(huì)定時(shí)向 namesrv 發(fā)送自身的數(shù)據(jù),就會(huì)調(diào)用到這個(gè)方法。方法流程:

  1. 先將這個(gè) BrokerbrokerName 添加到集群集合 clusterAddrTable 中。
  2. 將這個(gè) Broker 的相關(guān)信息添加到 brokerAddrTable 集合中,并判斷這個(gè) Broker 是否第一次注冊(cè) registerFirst。
  3. 當(dāng)這個(gè) Broker 是主節(jié)點(diǎn),topicConfigWrapper 的數(shù)據(jù)版本dataVersion 和當(dāng)前儲(chǔ)存值不一樣,或者是第一次注冊(cè)時(shí);都需要將該 Broker的主題信息 topicConfigWrapper 添加到 topicQueueTable 中。
  4. 更新 brokerLiveTable 中該 Broker 地址對(duì)應(yīng)狀態(tài)信息,表示該 Broker 地址是活躍的。
  5. 如果這個(gè) Brokerslave 節(jié)點(diǎn), 那么給它設(shè)置主節(jié)點(diǎn)的地址和 HaServer的地址。

總結(jié)一下:

就是按照順序,分別改變 clusterAddrTable, brokerAddrTable, topicQueueTable,brokerLiveTablefilterServerTable 的數(shù)據(jù)。

3.2.2 unregisterBroker 方法

 /**
     * 取消注冊(cè)Broker
     */
    public void unregisterBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                // 刪除的時(shí)候,先刪除簡單的。
                // 1. 刪除 brokerLiveTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)
                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
                log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                    brokerLiveInfo != null ? "OK" : "Failed",
                    brokerAddr
                );
                // 2. 刪除 filterServerTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)
                this.filterServerTable.remove(brokerAddr);

                // 3. 再根據(jù) brokerName 處理 brokerAddrTable 集合中brokerData 數(shù)據(jù)
                boolean removeBrokerName = false;
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null != brokerData) {
                    String addr = brokerData.getBrokerAddrs().remove(brokerId);
                    log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                        addr != null ? "OK" : "Failed",
                        brokerAddr
                    );

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        // 如果 brokerName 只包含這一個(gè) brokerId,被刪除了;
                        // 那么也要從 brokerAddrTable 集合中刪除這個(gè) brokerName
                        this.brokerAddrTable.remove(brokerName);
                        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                            brokerName
                        );

                        removeBrokerName = true;
                    }
                }

                if (removeBrokerName) {
                    // brokerName 被刪除了,要更新 clusterAddrTable 集合數(shù)據(jù)
                    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                    if (nameSet != null) {
                        boolean removed = nameSet.remove(brokerName);
                        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                            removed ? "OK" : "Failed",
                            brokerName);

                        if (nameSet.isEmpty()) {
                            this.clusterAddrTable.remove(clusterName);
                            log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                                clusterName
                            );
                        }
                    }
                    // 只有當(dāng) brokerName 被刪除了,那么就要更新 topicQueueTable 集合了。
                    this.removeTopicByBrokerName(brokerName);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("unregisterBroker Exception", e);
        }
    }
  1. 先刪除 brokerLiveTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)。
  2. 刪除 filterServerTable 集合中 brokerAddr 對(duì)應(yīng)數(shù)據(jù)。
  3. 再根據(jù) brokerName 處理 brokerAddrTable 集合中 BrokerData 數(shù)據(jù)。

    如果 BrokerData 中只包含當(dāng)前這個(gè) Broker ,那么當(dāng)它被刪除后,那么就要從 brokerAddrTable 刪除這個(gè) brokerName 鍵。表示這個(gè) Broker 組已經(jīng)不存在了。

  4. 當(dāng)一個(gè)Broker 組被刪除后,那么就需要改變 clusterAddrTabletopicQueueTable 的數(shù)據(jù)了。
    • clusterAddrTable 中刪除這個(gè)Broker 組名字 brokerName , 如果這個(gè)集群只有這一個(gè)Broker 組,那么這個(gè)集群也要從 clusterAddrTable 中刪除。
    • 通過 removeTopicByBrokerName() 方法,更新 topicQueueTable 集合。

總結(jié)一下:

就是先刪除 brokerLiveTablefilterServerTable 中的數(shù)據(jù),因?yàn)樗鼈冎械臄?shù)據(jù)是比較獨(dú)立的;然后修改brokerAddrTable 集合中數(shù)據(jù);最后根據(jù) Broker 組是否被刪除,來決定是否修改clusterAddrTabletopicQueueTable 集合中的數(shù)據(jù)。

3.2.3 onChannelDestroy 方法

    /**
     * 通道 Channel 銷毀時(shí)的處理
     */
    public void onChannelDestroy(String remoteAddr, Channel channel) {
        String brokerAddrFound = null;
        if (channel != null) {
            // 根據(jù) channel,從 brokerLiveTable 中查找對(duì)應(yīng)的 brokerAddr 值。
            try {
                try {
                    this.lock.readLock().lockInterruptibly();
                    // 從 brokerLiveTable 中,根據(jù) channel 查找對(duì)應(yīng)的Broker地址 brokerAddrFound
                    Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                        this.brokerLiveTable.entrySet().iterator();
                    while (itBrokerLiveTable.hasNext()) {
                        Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                        if (entry.getValue().getChannel() == channel) {
                            brokerAddrFound = entry.getKey();
                            break;
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }

        if (null == brokerAddrFound) {
            brokerAddrFound = remoteAddr;
        } else {
            log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
        }

        if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

            // 根據(jù) brokerAddr 的值,進(jìn)行移除操作
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
                    // 先移除 brokerLiveTable 和 filterServerTable 集合中,
                    // brokerAddrFound 對(duì)應(yīng)數(shù)據(jù)
                    this.brokerLiveTable.remove(brokerAddrFound);
                    this.filterServerTable.remove(brokerAddrFound);
                    String brokerNameFound = null;
                    boolean removeBrokerName = false;
                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                    // 根據(jù)Broker地址 brokerAddrFound 從brokerAddrTable 中刪除這個(gè) Broker
                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                        BrokerData brokerData = itBrokerAddrTable.next().getValue();

                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Long, String> entry = it.next();
                            Long brokerId = entry.getKey();
                            String brokerAddr = entry.getValue();
                            if (brokerAddr.equals(brokerAddrFound)) {
                                // 找到了,就刪除這個(gè) Broker 地址
                                brokerNameFound = brokerData.getBrokerName();
                                it.remove();
                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                                break;
                            }
                        }

                        if (brokerData.getBrokerAddrs().isEmpty()) {
                            removeBrokerName = true;
                            itBrokerAddrTable.remove();
                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                brokerData.getBrokerName());
                        }
                    }

                    if (brokerNameFound != null && removeBrokerName) {
                        // 如果 brokerName 被刪除了,那么就要改變 clusterAddrTable 集合了
                        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<String, Set<String>> entry = it.next();
                            String clusterName = entry.getKey();
                            Set<String> brokerNames = entry.getValue();
                            boolean removed = brokerNames.remove(brokerNameFound);
                            if (removed) {
                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);

                                if (brokerNames.isEmpty()) {
                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                    it.remove();
                                }

                                break;
                            }
                        }
                    }

                    if (removeBrokerName) {
                        // 刪除了 brokerName,那么就要?jiǎng)h除 topicQueueTable 中所有這個(gè) brokerName 對(duì)應(yīng)的QueueData
                        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                            this.topicQueueTable.entrySet().iterator();
                        while (itTopicQueueTable.hasNext()) {
                            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                            String topic = entry.getKey();
                            List<QueueData> queueDataList = entry.getValue();

                            Iterator<QueueData> itQueueData = queueDataList.iterator();
                            while (itQueueData.hasNext()) {
                                QueueData queueData = itQueueData.next();
                                if (queueData.getBrokerName().equals(brokerNameFound)) {
                                    itQueueData.remove();
                                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                        topic, queueData);
                                }
                            }

                            if (queueDataList.isEmpty()) {
                                // 如果刪除后,這個(gè) queueDataList 為空,
                                // 說明這個(gè) Topic 沒有對(duì)應(yīng)的 QueueData,也應(yīng)該刪除。
                                itTopicQueueTable.remove();
                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }
    }

這個(gè)方法和 unregisterBroker 很像,區(qū)別是:

  1. 通過 channelbrokerLiveTable 找到對(duì)應(yīng)的 Broker 地址 brokerAddr 。
  2. 刪除 brokerAddrTable 集合數(shù)據(jù)時(shí),是通過 brokerAddr 進(jìn)行匹配的,而不是 brokerId;
  3. 其他的操作流程和 unregisterBroker 一樣。

3.2.4 scanNotActiveBroker 方法

  public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // 如果某個(gè) Broker 超過 BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2) 沒有接收到信息,
            // 那么我們就認(rèn)為這個(gè) Broker 已經(jīng)出現(xiàn)問題,刪除它
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

掃描不活躍的 Broker,并將它進(jìn)行關(guān)閉。

如果某個(gè) Broker 超過 BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2) 沒有接收到信息。那么我們就認(rèn)為這個(gè) Broker 已經(jīng)出現(xiàn)問題,就關(guān)閉它。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容