RocketMQ Namesrv啟動(dòng)流程

NamesrvStartup啟動(dòng)入口

public static NamesrvController main0(String[] args) {

    try {
        // 創(chuàng)建NamesrvController
        NamesrvController controller = createNamesrvController(args);
        // 啟動(dòng)NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

創(chuàng)建NamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {

    // 根據(jù)命令行參數(shù),使用commons-cli命令行工具包解析生成CommandLine對(duì)象
    // 在parseCmdLine中,如果命令行中有-h選項(xiàng),執(zhí)行打印幫助文檔的邏輯,然后退出,不再繼續(xù)
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    // 初始化了2個(gè)配置 NamesrvConfig,NettyServerConfig,其中NettyServerConfig監(jiān)聽9876是硬編碼的
    // 然后通過命令行參數(shù) -c 指定一個(gè)配置文件,然后將配置文件中的內(nèi)容解析成NamesrvConfig,NettyServerConfig的配置
    // 設(shè)置NamesrvConfig,NettyServerConfig的邏輯是看類中的set方法,如果set方法后的名字和配置文件中的key匹配,就會(huì)設(shè)置對(duì)應(yīng)的值
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    //如果指定了 -p 選項(xiàng),會(huì)在控制臺(tái)打印配置信息,然后退出,不再繼續(xù)執(zhí)行
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    //將啟動(dòng)命令行的參數(shù)配置設(shè)置到NamesrvConfig中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    // 檢查必須設(shè)置RocketMQHome
    // 在NamesrvConfig中,可以看到使用系統(tǒng)屬性rocketmq.home.dir,環(huán)境變量ROCKETMQ_HOME和前面的-c指定的配置文件設(shè)置RocketMQHome
    // 在mqnamesrv啟動(dòng)腳本中會(huì)自定探測(cè)RockerMQ并export ROCKETMQ_HOME
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    // 加載logback.xml
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    // 使用logback打印NamesrvConfig,NettyServerConfig配置信息
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // 最后還把-c指定的文件的配置在保存到Configruation中
    controller.getConfiguration().registerConfig(properties);

    return controller;
}

可以看到NamesrvStartup只是一個(gè)啟動(dòng)類,主要邏輯都在處理命令行和配置,主要功能都是在NamesrvController中,而且我們可以看到,在處理處理配置的時(shí)候,真的是對(duì)配置文件進(jìn)行反復(fù)鞭尸呀

首先通過-c指定配置文件,使用MixAll.properties2Object將配置設(shè)置到NamesrvConfig,NettyServerConfig

MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);

然后通過命令行參數(shù)設(shè)置到NamesrvConfig

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

然后初始化NamesrvController,NamesrvController中會(huì)初始化一個(gè)Configuration類,Configuration類中又會(huì)把NamesrvConfig,NettyServerConfig都merge到allConfigs中

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    this.namesrvConfig = namesrvConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.kvConfigManager = new KVConfigManager(this);
    this.routeInfoManager = new RouteInfoManager();
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    // 初始化Configuration
    this.configuration = new Configuration(
        log,
        this.namesrvConfig, this.nettyServerConfig
    );
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
public Configuration(InternalLogger log, Object... configObjects) {
    this.log = log;
    if (configObjects == null || configObjects.length == 0) {
        return;
    }
    // 將NamesrvConfig,NettyServerConfig注冊(cè)
    for (Object configObject : configObjects) {
        registerConfig(configObject);
    }
}
public Configuration registerConfig(Object configObject) {
    try {
        readWriteLock.writeLock().lockInterruptibly();

        try {

            Properties registerProps = MixAll.object2Properties(configObject);
                        // 將NamesrvConfig,NettyServerConfig合并到allConfigs
            merge(registerProps, this.allConfigs);

            configObjectList.add(configObject);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("registerConfig lock error");
    }
    return this;
}

最后還要把-c指定的配置文件和allConfigs進(jìn)行合并

// 最后還把-c指定的文件的配置在保存到Configruation中
controller.getConfiguration().registerConfig(properties);

可以看到-c指定的配置文件讀取進(jìn)來后被拆分為NamesrvConfig,NettyServerConfig,然后又和Configuration中的allConfigs合并,最后還要再合并一次,你說這個(gè)-c指定的配置文件慘不慘

NamesrvController初始化完成后,就調(diào)用start(controller),才真正的開始

// 啟動(dòng)NamesrvController
start(controller);

start(controller)方法中最關(guān)鍵的就是下面2個(gè)方法

controller.initialize();
controller.start();

NamesrvController初始化

public boolean initialize() {
    
    // 從NamesrvConfig#KvConfigPath指定的文件中反序列化數(shù)據(jù)到KVConfigManager#configTable中
    this.kvConfigManager.load();
    // 啟動(dòng)網(wǎng)絡(luò)通信的Netty服務(wù)
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // 初始化一下負(fù)責(zé)處理Netty網(wǎng)絡(luò)交互數(shù)據(jù)的線程池,
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    // 注冊(cè)一個(gè)默認(rèn)負(fù)責(zé)處理Netty網(wǎng)絡(luò)交互數(shù)據(jù)的DefaultRequestProcessor,這個(gè)Processor會(huì)使用remotingExecutor執(zhí)行
    // *劃重點(diǎn),后面這里會(huì)再次提到*
    this.registerProcessor();

    // 每10s掃描一下失效的Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // 每10min打印一下前面被反復(fù)蹂躪的配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);


    // 設(shè)置TLS,這塊不太了解,所以省略了,以后用空了再研究一下TLS吧
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
       ...
    }

    return true;
}

啟動(dòng)NamesrvController

public void start() throws Exception {
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

可以看到邏輯還算比較清晰,關(guān)鍵功能在KVConfigManager,RouteInfoManager和NettyRemotingServer實(shí)現(xiàn)

我們先來看看NettyRemotingServer

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    // 初始化2個(gè)Semaphore,一個(gè)是one-way請(qǐng)求的并發(fā)數(shù),一個(gè)是asynchronous請(qǐng)求的并發(fā)數(shù),可以簡(jiǎn)單理解成對(duì)2種請(qǐng)求做了限流,至于什么是one-way請(qǐng)求,什么是asynchronous請(qǐng)求,分析到了再說吧
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
        
     // 初始化一個(gè)公用的線程池,什么情況下用這個(gè)公用的線程池?看后面的分析
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // 下面這個(gè)就是判斷系統(tǒng)是否支持epoll來初始化相應(yīng)的EventLoopGroup,如果不是很了解Netty的同學(xué)可以先去學(xué)學(xué)Netty
    if (useEpoll()) {
        this.eventLoopGroupBoss = new EpollEventLoopGroup();

        this.eventLoopGroupSelector = new EpollEventLoopGroup();
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup();

        this.eventLoopGroupSelector = new NioEventLoopGroup();
    }
        // 這個(gè)也是TLS相關(guān)的忽略分析
    loadSslContext();
}

這里可以看一下怎么判斷系統(tǒng)是否支持epoll的

private static boolean isLinuxPlatform = false;
private static boolean isWindowsPlatform = false;

static {
    if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {
        isLinuxPlatform = true;
    }

    if (OS_NAME != null && OS_NAME.toLowerCase().contains("windows")) {
        isWindowsPlatform = true;
    }
}

-----

private boolean useEpoll() {
    return RemotingUtil.isLinuxPlatform()
        && nettyServerConfig.isUseEpollNativeSelector()
        && Epoll.isAvailable();
}

還記得在NamesrvController初始化時(shí)注冊(cè)一個(gè)默認(rèn)負(fù)責(zé)處理Netty網(wǎng)絡(luò)交互數(shù)據(jù)的DefaultRequestProcessor,DefaultRequestProcessor使用了一個(gè)專用的remotingExecutor線程池,我們也可以注冊(cè)其他的Processor,如果我們注冊(cè)Processor時(shí)沒有指定線程池就會(huì)使用公共的線程池publicExecutor

 public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    this.processorTable.put(requestCode, pair);
}

上面只是將Netty的EventLoopGroup進(jìn)行了初始化,卻沒有真正的啟動(dòng)Netty,真正的啟動(dòng)還得調(diào)用remotingServer.start();

public void start() throws Exception {
    this.remotingServer.start();
        // fileWatchService和TLS有關(guān),大概就是會(huì)監(jiān)聽TLS相關(guān)文件的改變,也不仔細(xì)分析了
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

接下來看看NettyRemotingServer的start()方法干了啥

public void start() {
    // 初始化一個(gè)線程池,用于執(zhí)行共享的Handler
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });
        // 初始化一些共享的Handler,HandshakeHandler,NettyEncoder,NettyConnectManageHandler,NettyServerHandler
    prepareSharableHandlers();
        // 后面就是一些Netty的設(shè)置,具體看Netty
    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                // 我們只需要關(guān)心這里設(shè)置了哪些handler
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0,          nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            serverHandler
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }
        // 這里有一個(gè)channelEventListener,在NamesrvController中channelEventListener就是BrokerHousekeepingService,BrokerHousekeepingService負(fù)責(zé)在broker斷開連接的時(shí)候,移除RouteInfoManager中的路由信息
    // NettyEventExecutor會(huì)維護(hù)一個(gè)NettyEvent的隊(duì)列,NettyConnectManageHandler會(huì)向NettyEvent的隊(duì)列中添加Event,然后由channelEventListener進(jìn)行消費(fèi)
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
        // 定時(shí)掃描responseTable,執(zhí)行超時(shí)請(qǐng)求的callback
    // 這里有2個(gè)疑問,是誰(shuí)向responseTable中put數(shù)據(jù)?為什么這里只執(zhí)行超時(shí)請(qǐng)求的callback,正常結(jié)束的請(qǐng)求在哪處理的?
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

到這里RocketMQ Namesrv的啟動(dòng)流程就結(jié)束了,下一篇在來分析具體怎么處理請(qǐng)求數(shù)據(jù)的吧

歡迎關(guān)注我的公眾號(hào)

我的公眾號(hào)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、簡(jiǎn)述 這篇內(nèi)容主要講述RocketMQ的NameSrv的啟動(dòng)流程,通過它的啟動(dòng),也能了解到NameSrv是干什...
    ASD_92f7閱讀 937評(píng)論 0 0
  • ORA-00001: 違反唯一約束條件 (.) 錯(cuò)誤說明:當(dāng)在唯一索引所對(duì)應(yīng)的列上鍵入重復(fù)值時(shí),會(huì)觸發(fā)此異常。 O...
    我想起個(gè)好名字閱讀 5,918評(píng)論 0 9
  • feisky云計(jì)算、虛擬化與Linux技術(shù)筆記posts - 1014, comments - 298, trac...
    不排版閱讀 4,269評(píng)論 0 5
  • 本來是準(zhǔn)備看一看Spring源碼的。然后在知乎上看到來一個(gè)帖子,說有一群**自己連Spring官方文檔都沒有完全讀...
    此魚不得水閱讀 7,036評(píng)論 4 21
  • [TOC] ##Assoc 顯示或修改文件擴(kuò)展名關(guān)聯(lián) Assoc [.Ext[=[Filetype]]] .Ex...
    btijjj閱讀 470評(píng)論 0 1

友情鏈接更多精彩內(nèi)容