RocketMQ源碼解析(二)-nameserv

NameServ是rocketMQ的注冊(cè)中心,保存所有Broker、Topic的元數(shù)據(jù)。Broker啟動(dòng)后會(huì)向nameserv發(fā)送心跳,nameserv也會(huì)定時(shí)檢測(cè)broker的可用性,并移除不可用的broker。

Nameserv的啟動(dòng)過程

啟動(dòng)腳本

> nohup sh bin/mqnamesrv &

nameserv啟動(dòng)過程會(huì)將所有初始化和啟動(dòng)工作交給NamesrvController來完成。

NamesrvController

nameserv的主要控制類,負(fù)責(zé)初始化和后臺(tái)任務(wù)啟動(dòng),Controller包含的主要組件都在構(gòu)造函數(shù)中做了初始化
Controller構(gòu)造函數(shù)

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        //nameserv參數(shù)配置
        this.namesrvConfig = namesrvConfig; 
        //netty的參數(shù)配置
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        //初始化RouteInfoManager
        this.routeInfoManager = new RouteInfoManager();
        //監(jiān)聽客戶端連接(Channel)的變化,通知RouteInfoManager檢查broker是否有變化
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        //Nameserv的配置參數(shù)會(huì)保存到磁盤文件中
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

構(gòu)造函數(shù)中初始化了RouteInfoManager,這個(gè)最重要的類,負(fù)責(zé)緩存整個(gè)集群的broker信息,以及topic和queue的配置信息。
RouteInfoManager數(shù)據(jù)結(jié)構(gòu)
RouteInfoManager的所有數(shù)據(jù)通過HashMap緩存在內(nèi)存中,通過讀寫鎖來控制并發(fā)更新。這樣可最大程度的提高客戶端查詢數(shù)據(jù)的速度。數(shù)據(jù)更新時(shí)會(huì)將數(shù)據(jù)保存到文件中,重啟后可恢復(fù)數(shù)據(jù)。

    //1、Topic和broker的Map,保存了topic在每個(gè)broker上的讀寫Queue的個(gè)數(shù)以及讀寫權(quán)限
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    //2、注冊(cè)到nameserv上的所有Broker,按照brokername分組
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    //3、broker的集群對(duì)應(yīng)關(guān)系
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //4、broker最新的心跳時(shí)間和配置版本號(hào)
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //5、broker和FilterServer的對(duì)應(yīng)關(guān)系
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

從以上的信息可以看出:
1)Broker使用brokerName來標(biāo)識(shí)主從關(guān)系,同一個(gè)brokerName下只能由一個(gè)master。
2)使用clusterName來判斷多個(gè)broker是不是屬于同一個(gè)集群。對(duì)于同一個(gè)cluster下的broker,producer在發(fā)送消息時(shí)只會(huì)選擇發(fā)送給其中一個(gè)。
3)nameserv會(huì)記錄brokerAddr的最后活躍時(shí)間,如果超過一定沒有心跳或其他數(shù)據(jù)交互,會(huì)認(rèn)為broker已下線。
4)nameserv和broker上都會(huì)保存DataVersion字段,當(dāng)broker配置有變更時(shí),DataVersion會(huì)+1。下次心跳時(shí)nameserv通過這個(gè)字段來判斷配置是否有變更。
【注意】因?yàn)閚ameserv是用brokername來區(qū)分broker,所以注冊(cè)到同一個(gè)nameserv上的多個(gè)集群,brokerName和topic也是不能重復(fù)的。
Controller initialize
啟動(dòng)過程中新建了一個(gè)Controller的實(shí)例后會(huì)調(diào)用它的initialize()方法:

public boolean initialize() {
        //1、初始化KVConfigManager
        this.kvConfigManager.load();
        //2、初始化netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //3、客戶端請(qǐng)求處理的線程池
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
       //4、注冊(cè)DefaultRequestProcessor,所有的客戶端請(qǐng)求都會(huì)轉(zhuǎn)給這個(gè)Processor來處理
        this.registerProcessor();
       //5、啟動(dòng)定時(shí)調(diào)度,每10秒鐘掃描所有Broker,檢查存活狀態(tài)
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        //6、日志打印的調(diào)度器,定時(shí)打印kvConfigManager的內(nèi)容      
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        //7、監(jiān)聽ssl證書文件變化,
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            ...
        }

        return true;
    }

以上最重要的就是第2,4步,初始化nameserv的Server,用來接收客戶端請(qǐng)求。所有的客戶端請(qǐng)求都會(huì)轉(zhuǎn)給第4步中注冊(cè)的DefaultRequestProcessor來處理。
第5步中,啟動(dòng)了一個(gè)定時(shí)器來掃描RouteInfoManager中緩存的broker信息,如果broker已經(jīng)長(zhǎng)時(shí)間沒有心跳信息,則認(rèn)為broker已經(jīng)down掉了,將其移除。
Controller啟動(dòng):

   public void start() throws Exception {
        this.remotingServer.start();
        //監(jiān)聽ssl文件變化,可以實(shí)時(shí)更新證書
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

啟動(dòng)的過程比較簡(jiǎn)單,就是啟動(dòng)netty server開始接收客戶端請(qǐng)求。

DefaultRequestProcessor請(qǐng)求處理

前面講過nameserv最重要的兩個(gè)作用,一個(gè)是路由管理,通過RouteInfoManager管理路由信息供客戶端查詢。一個(gè)是Broker管理,接收broker注冊(cè)并通過心跳機(jī)制檢查broker的可用性。
NameServer通過DefaultRequestProcessor的processRequest方法來提供請(qǐng)求處理。

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        ...
        switch (request.getCode()) {
            ...
            //broker注冊(cè)請(qǐng)求
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
             //Broker注銷請(qǐng)求
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            //根據(jù)topic獲取broker路由信息
            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            //獲取broker集群信息
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            //獲取所有topic信息
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            //刪除topic
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            ...
        }
        return null;
    }

查詢路由信息

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
        //從RouteInfoManager中獲取topic的路由信息
        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
        //如果支持順序消息,則填充KVConfig信息
        if (topicRouteData != null) {
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                        requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }

            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

路由信息就是直接到RouteInfoManager查詢,我們看下具體實(shí)現(xiàn):

public TopicRouteData pickupTopicRouteData(final String topic) {
          ...
            try {
                //獲取讀鎖
                this.lock.readLock().lockInterruptibly();
                //獲取所有支持該topic的broker的queue配置
                List<QueueData> queueDataList = this.topicQueueTable.get(topic);
                if (queueDataList != null) {
                    topicRouteData.setQueueDatas(queueDataList);
                    foundQueueData = true;
                    //獲取brokerName
                    Iterator<QueueData> it = queueDataList.iterator();
                    while (it.hasNext()) {
                        QueueData qd = it.next();
                        brokerNameSet.add(qd.getBrokerName());
                    }
              
                    for (String brokerName : brokerNameSet) {
                        //根據(jù)brokerName獲取broker主從地址信息
                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                        if (null != brokerData) {
                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                                .getBrokerAddrs().clone());
                            brokerDataList.add(brokerDataClone);
                            foundBrokerData = true;
                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                                filterServerMap.put(brokerAddr, filterServerList);
                            }
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
            ...
    }

獲取topic路由的過程就是直接從HashMap中獲取緩存的broker配置。
Broker注冊(cè)
Broker在啟動(dòng)的時(shí)候會(huì)將topic和queue的配置同步給nameserv。

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        ...
        //checksum,檢查CRC32是否相等
        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }

        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
        //decode request body,如果body已壓縮,則先解壓。如果body為空,會(huì)將topic的版本號(hào)默認(rèn)置為0.
        if (request.getBody() != null) {
            try {
                registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
            } catch (Exception e) {
                throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
            }
        } else {
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
        }
        //使用broker上報(bào)的信息更新nameserv的RouteInfo
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            registerBrokerBody.getTopicConfigSerializeWrapper(),
            registerBrokerBody.getFilterServerList(),
            ctx.channel());
        //如果broker是slave的話,會(huì)將master address和ha server address通過response返回給broker
        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());
        //將Order topic的KV配置信息通過response返回
        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

nameserv收到broker注冊(cè)后,更新routeInfo過程

public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                //更新cluster和broker對(duì)應(yīng)關(guān)系
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;
                //更新brokername和brokerdata的map
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);
                //如果是master broker,第一次注冊(cè)或者是topic信息發(fā)生變化了,更新topicQueueTable
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
                //更新broker的心跳時(shí)間
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }
                //更新filter server table
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
                //如果是slave broker注冊(cè),如果master存在,則返回master broker信息
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }

總結(jié)

1、nameserv通過clusterName來判斷broker是不是屬于同一個(gè)集群
2、nameserv通過brokerName來判斷兩個(gè)broker是不是主從關(guān)系
3、對(duì)于相同的brokerName,只有一個(gè)master(id=0),不同的slave必須使用不同的Id (id>0)
4、NameServ只會(huì)保存master的topic配置信息,因?yàn)槔碚撋蟬lave的topic信息是從master同步過去的
5、所有的topic信息以broker上報(bào)為準(zhǔn),broker在啟動(dòng)的時(shí)候是不會(huì)去nameserv獲取topic配置的,只會(huì)從自己持久化文件中加載。所以,一個(gè)新的broker啟動(dòng)后默認(rèn)只有System topic信息。如果broker是新的,或者broker在掛掉一段時(shí)間重啟topic不是最新的話,只能通過客戶端更新topic來使broker能加入到正常的消息收發(fā)中。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,897評(píng)論 13 425
  • kafka的定義:是一個(gè)分布式消息系統(tǒng),由LinkedIn使用Scala編寫,用作LinkedIn的活動(dòng)流(Act...
    時(shí)待吾閱讀 5,537評(píng)論 1 15
  • 核心組件(4個(gè)組件+消息存儲(chǔ)結(jié)構(gòu)) 客戶端消費(fèi)模式 1. MQ的使用場(chǎng)景 昨天在寫完之后,有些讀者在評(píng)論中提出:到...
    樓亭樵客閱讀 1,142評(píng)論 0 3
  • 有一天,球球問我:媽媽,什么是旅行???我就跟他說:旅行就是從自己待膩的地方到別人待膩的地方去待一待,然后再回到自己...
    麥子妹妹寧閱讀 1,021評(píng)論 0 0
  • 大大咧咧的我,沒心沒肺!可是最近一年多,忽然開始覺得人言可畏起來。暗箭傷人,背后下刀的招數(shù)在比武場(chǎng)上姑且是...
    就叫我田田閱讀 300評(píng)論 0 0

友情鏈接更多精彩內(nèi)容