RocketMQ源碼分析之路由中心

微信公眾號「后端進階」,專注后端技術分享:Java、Golang、WEB框架、分布式中間件、服務治理等等。
老司機傾囊相授,帶你一路進階,來不及解釋了快上車!

早期的rocketmq版本的路由功能是使用zookeeper實現(xiàn)的,后來rocketmq為了追求性能,自己實現(xiàn)了一個性能更高效且實現(xiàn)簡單的路由中心NameServer,而且可以通過部署多個路由節(jié)點實現(xiàn)高可用,但它們之間并不能互相通信,這也就會導致在某一個時刻各個路由節(jié)點間的數(shù)據(jù)并不完全相同,但數(shù)據(jù)某個時刻不一致并不會導致消息發(fā)送不了,這也是rocketmq追求簡單高效的一個做法。

路由啟動

看了Nameserver的源碼后大呼驚嘆,整個NameServer總共就由這么幾個類類組成:

rocketmq nameserver

其中NamesrvStartup為啟動類,NamesrvController為核心控制器,RouteInfoManager為路由信息表。

知道了這幾個類的功能之后,我們就直接定位到NamesrvStartup啟動類的啟動方法:

org.apache.rocketmq.namesrv.NamesrvStartup#main0:

public static NamesrvController main0(String[] args) {
    try {
        // 步驟一
        NamesrvController controller = createNamesrvController(args);
        // 步驟二
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

整個NameServer服務啟動的流程代碼都在main0(String[] args)方法了,看起來是不是很簡單,我們繼續(xù)往下擼它的具體實現(xiàn):

步驟一:

org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController:

這個方法的代碼有點多,下面我會拆分成幾段進行分析:

// 創(chuàng)建命令行參數(shù)對象,這里定義了 -h 和 -n參數(shù)
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 根據(jù)Options和運行時參數(shù)args生成命令行對象,buildCommandlineOptions定義了-c參數(shù)(Name server config properties file)和-p參數(shù)(Print all config item)
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
    System.exit(-1);
    return null;
}

這里使用了Apache Commons CLI 命令行解析工具,它可以幫助開發(fā)者快速構建啟動命令,并且?guī)椭憬M織命令的參數(shù)、以及輸出列表等。

這段主要是根據(jù)運行時傳遞的參數(shù)生成commandLine命令行對象,用于解析運行時類似于 -c 指定文件路徑,然后填充到namesrvConfig和nettyServerConfig對象中。

final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
    // 讀取命令行-c參數(shù)指定的配置文件
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        // 將文件轉成輸入流
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        // 加載到屬性對象
        properties.load(in);
        // 裝載配置
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
    }
}

這段是createNamesrvController(String[] args)方法最為核心的代碼,從代碼知道先創(chuàng)建NamesrvConfig和NettyServerConfig對象,接著利用commandLine命令行工具讀取-c指定的配置文件路徑,然后將其讀取到流中,生成properties對象,最后將namesrvConfig和nettyServerConfig對象進行初始化。

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

到這里就水到渠成地利用namesrvConfig和nettyServerConfig對象創(chuàng)建NamesrvController對象,然后在注冊一遍properties防止丟失。

createNamesrvController(String[] args)這一步也是啟動nameserver最為關鍵的操作,它為我們啟動時提供了namesrvConfig和nettyServerConfig配置對象,以及創(chuàng)建NamesrvController核心控制器。

步驟二:

org.apache.rocketmq.namesrv.NamesrvStartup#start:

public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    // 對核心控制器進行初始化操作
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    // 注冊一個鉤子函數(shù),用于JVM進程關閉時,優(yōu)雅地釋放netty服務、線程池等資源
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));

    // 核心控制器啟動操作
    controller.start();

    return controller;
}

步驟二也是一個次級的啟動流程控制方法,該方法主要對核心控制器進行初始化操作,同時注冊一個鉤子函數(shù),用于JVM進程關閉時,優(yōu)雅地釋放netty服務、線程池等資源,最后對核心控制器進行啟動操作,接下來我們繼續(xù)擼詳細實現(xiàn):

org.apache.rocketmq.namesrv.NamesrvController#initialize:

public boolean initialize() {
    // 加載KV配置
    this.kvConfigManager.load();
    // 創(chuàng)建Netty網(wǎng)絡服務對象
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();

    // 創(chuàng)建定時任務--每個10s掃描一次Broker,并定時剔除不活躍的Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // 創(chuàng)建定時任務--每個10分鐘打印一遍KV配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    // ...

    return true;
}

該方法主要是對核心控制器進行啟動前的一些初始化操作,包括根據(jù)NamesrvConfig的kvConfigPath存儲KV配置屬性的路徑加載KV配置,創(chuàng)建定時任務:每個10s掃描一次Broker,并定時剔除不活躍的Broker;每個10分鐘打印一遍KV配置。

這里的每個10s掃描一次Broker,并定時剔除不活躍的Broker,這里是路由刪除的一些邏輯,后面會講到。

org.apache.rocketmq.namesrv.NamesrvController#start:

public void start() throws Exception {
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

到這里對啟動進行最后一步操作,即創(chuàng)建Netty服務,我們知道rocketmq是通過netty來進行通信,對應netty的一些細節(jié)這里就不展開講了,后面我也會計劃寫一些關于netty的系列文章,敬請期待。

路由啟動時序圖:

nameServer startup

路由注冊

路由注冊即是Broker向Nameserver注冊的過程,它們是通過Broker的心跳功能實現(xiàn)的,那么既然Nameserver是用來存儲Broker的注冊信息,那么我們就先來看看Nameserver到底存儲了哪些信息,回到文章最開始的那張結構圖,我們知道RouteInfoManager為路由信息表:

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager:

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
  • topicQueueTable:Topic消息隊列路由信息,包括topic所在的broker名稱,讀隊列數(shù)量,寫隊列數(shù)量,同步標記等信息,rocketmq根據(jù)topicQueueTable的信息進行負載均衡消息發(fā)送。
  • brokerAddrTable:Broker節(jié)點信息,包括brokername,所在集群名稱,還有主備節(jié)點信息。
  • clusterAddrTable:Broker集群信息,存儲了集群中所有的Brokername。
  • brokerLiveTable:Broker狀態(tài)信息,Nameserver每次收到Broker的心跳包就會更新該信息。

這里也先講一下rocketmq是基于訂閱發(fā)布機制,我之前也寫過一篇文章《rocketmq的消費模式》,我們可知一個Topic擁有多個消息隊列,如果不指定隊列的數(shù)量,一個Broker會為每個Topic創(chuàng)建4個讀隊列和4個寫隊列,多個Broker組成集群,Broker會通過發(fā)送心跳包將自己的信息注冊到路由中心,路由中心brokerLiveTable存儲Broker的狀態(tài),它會根據(jù)Broker的心跳包更新Broker狀態(tài)信息。

步驟一:Broker發(fā)送心跳包

org.apache.rocketmq.broker.BrokerController#start:

public void start() throws Exception {
    
    // 初次啟動,這里會強制執(zhí)行發(fā)送心跳包
    this.registerBrokerAll(true, false, true);
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}

Broker在核心控制器啟動時,會強制發(fā)送一次心跳包,接著創(chuàng)建一個定時任務,定時向路由中心發(fā)送心跳包。

org.apache.rocketmq.broker.BrokerController#registerBrokerAll:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    // 創(chuàng)建一個topic包裝類
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    // 這里比較有趣,如果該broker沒有讀寫權限,那么會新建一個臨時的topicConfigTable,再set進包裝類
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                                this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }

     // 判斷是否該Broker是否需要發(fā)送心跳包
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                                      this.getBrokerAddr(),
                                      this.brokerConfig.getBrokerName(),
                                      this.brokerConfig.getBrokerId(),
                                      this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        // 執(zhí)行發(fā)送心跳包
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

該方法是Broker執(zhí)行發(fā)送心跳包的核心控制方法,它主要做了topic的包裝類封裝操作,判斷Broker此時是否需要執(zhí)行發(fā)送心跳包,但我查了下org.apache.rocketmq.common.BrokerConfig#forceRegister字段的值永遠等于true,也就是該判斷永遠為true,即每次都需要發(fā)送心跳包。

我們定位到needRegister遠程調用到路由中心的方法:

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged:

public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
    DataVersion prev = queryBrokerTopicConfig(brokerAddr);
    return null == prev || !prev.equals(dataVersion);
}

public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
    if (prev != null) {
        return prev.getDataVersion();
    }
    return null;
}

發(fā)現(xiàn),Broker是否需要發(fā)送心跳包由該Broker在路由中心org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo#dataVersion決定,如果dataVersion為空或者當前dataVersion不等于brokerLiveTable存儲的brokerLiveTable,Broker就需要發(fā)送心跳包。

步驟二:Nameserver處理心跳包

Nameserver的netty服務監(jiān)聽收到心跳包之后,會調用到路由中心以下方法進行處理:

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker:

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();

            // 獲取集群下所有的Broker,并將當前Broker加入clusterAddrTable,由于brokerNames是Set結構,并不會重復
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            // 獲取Broker信息,如果是首次注冊,那么新建一個BrokerData并加入brokerAddrTable
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // 這里判斷Broker是否是已經(jīng)注冊過
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // 如果是Broker是Master節(jié)點嗎,并且Topic信息更新或者是首次注冊,那么創(chuàng)建更新topic隊列信息
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // 更新BrokerLiveInfo狀態(tài)信息
            BrokerLiveInfo prevBrokerLiveInfo = 
                this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

該方法是處理Broker心跳包的最核心方法,它主要做了對RouteInfoManager路由信息的一些更新操作,包括對clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable等路由信息。

路由注冊時序圖:

Broker register

路由刪除

前面部分我們分析了Nameserver啟動時會創(chuàng)建一個定時任務,定時剔除不活躍的Broker。

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker:

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // 如果當前時間大于最后修改時間加上Broker過期時間,那么就剔除該Broker
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // 關閉Broker對應的channel
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            // 從brokerLiveTable、brokerAddrTable、topicQueueTable移除Broker相關信息
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

剔除Broker信息的邏輯比較簡單,首先從BrokerLiveInfo獲取狀態(tài)信息,判斷Broker的心跳時間是否已超過限定值,若超過之后就執(zhí)行剔除邏輯。

分析完了rocketmq自帶的路由中心源碼,其實我們自己實現(xiàn)一個路由中心貌似也不難。有時候我們發(fā)現(xiàn)公司有些項目可以獨立拆分出來做成中間件的形式,也就是單獨部署,其它業(yè)務依賴client包調用中間件服務,比如短信、推送、郵件、配置等模塊。如果我們把這些中間件做成高可用集群部署,也可以考慮自己實現(xiàn)一個路由中心。

公眾號「后端進階」,專注后端技術分享!
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • ? 只愿畢生棱角不平,傲氣不減,從前往日,一如今朝。 [還記得你曾經(jīng)最想活出的樣子嗎?] 討論到這個話題的時候,會...
    五象限閱讀 572評論 0 3
  • 2018年是我參加工作以后,過得最快的一年,時間像瀑布一樣,一瀉千里,把我們沖到了現(xiàn)在??傆X得小時候時間過得很慢,...
    小果果很好吃閱讀 706評論 1 5
  • 閑閑依孤窗, 清茶烈酒何物灑? 飄飄是冰花。
    果z輝煌閱讀 319評論 0 2
  • 這是我們一起走過的第四個年頭了,時間它太快,而我走的太慢,我要以怎樣的速度才能追趕上你世界里沒有我的十九年?這...
    豬會飛123閱讀 520評論 0 1
  • The most rewarding things you do in life are often the on...
    可樂W閱讀 272評論 0 0

友情鏈接更多精彩內容