ES啟動(dòng)

流程

  • 解析配置,包括配置文件和命令行參數(shù)
  • 檢查外部環(huán)境和內(nèi)部環(huán)境,例如,JVM版本、操作系統(tǒng)內(nèi)核參數(shù)等
  • 初始化內(nèi)部資源,創(chuàng)建內(nèi)部模塊,初始化探測(cè)器
  • 啟動(dòng)各個(gè)子模塊和keepalive線程
ES啟動(dòng)流程

源碼閱讀

Elasticsearch.java

//啟動(dòng)elasticsearch
class Elasticsearch extends EnvironmentAwareCommand
//一個(gè)cli命令,需要org.elasticsearch.env.Environment來(lái)使用當(dāng)前路徑和設(shè)置
public abstract class EnvironmentAwareCommand extends Command
//要在cli中執(zhí)行的操作
public abstract class Command implements Closeable

Elasticsearch.main()

    public static void main(final String[] args) throws Exception {
        //重寫DNS Cache屬性
        //將屬性轉(zhuǎn)為整數(shù)再轉(zhuǎn)成字符串,以確保正確解析
        overrideDnsCachePolicyProperties();
        //設(shè)置安全管理器,讓基于安全管理器存在與否的內(nèi)部策略生效(如DNS緩存策略)
        //授予所有權(quán)限,以便稍后可以將安全管理器設(shè)置為所需的權(quán)限
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm){}
        });
        //為狀態(tài)記錄器StatusLogger注冊(cè)偵聽(tīng)器偵聽(tīng)error事件
        LogConfigurator.registerErrorListener();
        final Elasticsearch elasticsearch = new Elasticsearch();
        //進(jìn)入Command.main()
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        //失敗,建議用戶查看日志文件
        if (status != ExitCodes.OK) {
            final String basePath = System.getProperty("es.logs.base_path");
            if (basePath != null) {
                Terminal.DEFAULT.errorPrintln(
                    "ERROR: Elasticsearch did not exit normally - check the logs at "
                        + basePath
                        + System.getProperty("file.separator")
                        + System.getProperty("es.logs.cluster_name") + ".log"
                );
            }
            exit(status);
        }
    }

SecurityManager

安全管理器是一個(gè)允許應(yīng)用程序?qū)崿F(xiàn)安全策略的類

它允許應(yīng)用程序在執(zhí)行一個(gè)可能不安全或敏感的操作前確定該操作是什么,以及是否是在允許執(zhí)行該操作的安全上下文中執(zhí)行它。應(yīng)用程序可以允許或不允許該操作。

類包含了很多名稱以單詞 check 開(kāi)頭的方法,Java 庫(kù)中的各種方法在執(zhí)行某些潛在的敏感操作前可以調(diào)用這些方法

安全管理器通過(guò)拋出異常來(lái)提供阻止操作完成的機(jī)會(huì)。如果允許執(zhí)行該操作,則安全管理器例程只是簡(jiǎn)單地返回。但如果不允許執(zhí)行該操作,則拋出一個(gè) SecurityException。該約定的唯一例外是 checkTopLevelWindow,它返回 boolean 值。

特殊方法 checkPermission(java.security.Permission)確定是應(yīng)該允許還是拒絕由指定權(quán)限所指示的訪問(wèn)請(qǐng)求

從 Java 2 SDK v1.2 開(kāi)始,SecurityManager 中其他所有 check 方法的默認(rèn)實(shí)現(xiàn)都是調(diào)用SecurityManager checkPermission 方法來(lái)確定調(diào)用線程是否具有執(zhí)行所請(qǐng)求操作的權(quán)限。

StatusLogger

記錄日志系統(tǒng)中發(fā)生的事件。默認(rèn)情況下,只有錯(cuò)誤消息被記錄到System.err。

Command.main()

    //從args中解析此命令的選項(xiàng)并執(zhí)行它
    public final int main(String[] args, Terminal terminal) throws Exception {
        //是否添加ShuntdownHook以在退出時(shí)清理資源
        //返回true
        if (addShutdownHook()) {

            shutdownHookThread = new Thread(() -> {
                try {
                    this.close();
                } catch (final IOException e) {
                    //當(dāng)Runtime異常關(guān)閉時(shí)打印異常信息
                    try (
                        StringWriter sw = new StringWriter();
                        PrintWriter pw = new PrintWriter(sw)) {
                        e.printStackTrace(pw);
                        terminal.errorPrintln(sw.toString());
                    } catch (final IOException impossible) {
                        throw new AssertionError(impossible);
                    }
                }
            });
            //RunTime.getRunTime().addShutdownHook的作用就是在JVM銷毀前執(zhí)行的一個(gè)線程
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);
        }

        beforeMain.run();

        try {
            //打印參數(shù),設(shè)置參數(shù),執(zhí)行命令,拋出所有異常
            mainWithoutErrorHandling(args, terminal);
        } catch (OptionException e) {
            printHelp(terminal, true);
            terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            return ExitCodes.USAGE;
        } catch (UserException e) {
            if (e.exitCode == ExitCodes.USAGE) {
                printHelp(terminal, true);
            }
            if (e.getMessage() != null) {
                terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            }
            return e.exitCode;
        }
        return ExitCodes.OK;
    }

Command.mainWithoutErrorHandling()

    //打印參數(shù),設(shè)置參數(shù),執(zhí)行命令,拋出所有異常
    void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
        final OptionSet options = parser.parse(args);

        if (options.has(helpOption)) {
            printHelp(terminal, false);
            return;
        }

        if (options.has(silentOption)) {
            terminal.setVerbosity(Terminal.Verbosity.SILENT);
        } else if (options.has(verboseOption)) {
            terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
        } else {
            terminal.setVerbosity(Terminal.Verbosity.NORMAL);
        }
        //進(jìn)入EnviromentAwareCommand.execute()
        execute(terminal, options);
    }

EnvironmentAwareCommand.execute()

    protected void execute(Terminal terminal, OptionSet options) throws Exception {
        final Map<String, String> settings = new HashMap<>();
        for (final KeyValuePair kvp : settingOption.values(options)) {
            if (kvp.value.isEmpty()) {
                throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
            }
            if (settings.containsKey(kvp.key)) {
                final String message = String.format(
                        Locale.ROOT,
                        "setting [%s] already set, saw [%s] and [%s]",
                        kvp.key,
                        settings.get(kvp.key),
                        kvp.value);
                throw new UserException(ExitCodes.USAGE, message);
            }
            settings.put(kvp.key, kvp.value);
        }

        //確保給定設(shè)置存在,如果尚未設(shè)置,則從系統(tǒng)屬性中讀取該設(shè)置
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");

        //進(jìn)入Elasticsearch.execute()
        execute(terminal, options, createEnv(settings));
    }

EnvironmentAwareCommand.createEnv()

    //創(chuàng)建要使用的命令的Environment
    protected Environment createEnv(final Map<String, String> settings) throws UserException {
        return createEnv(Settings.EMPTY, settings);
    }

    protected final Environment createEnv(final Settings baseSettings, final Map<String, String> settings) throws UserException {
        final String esPathConf = System.getProperty("es.path.conf");
        if (esPathConf == null) {
            throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set");
        }
        //讀取config下的配置文件elasticsearch.yml內(nèi)容,收集plugins,bin,lib,modules等目錄下的文件信息
        return InternalSettingsPreparer.prepareEnvironment(baseSettings, settings,
            getConfigPath(esPathConf),
            () -> System.getenv("HOSTNAME"));
    }
Environment

Elasticsearch.execute()

    protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
        if (options.nonOptionArguments().isEmpty() == false) {
            throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
        }
        if (options.has(versionOption)) {
            final String versionOutput = String.format(
                Locale.ROOT,
                "Version: %s, Build: %s/%s/%s/%s, JVM: %s",
                Build.CURRENT.getQualifiedVersion(),
                Build.CURRENT.flavor().displayName(),
                Build.CURRENT.type().displayName(),
                Build.CURRENT.hash(),
                Build.CURRENT.date(),
                JvmInfo.jvmInfo().version()
            );
            terminal.println(versionOutput);
            return;
        }

        //讀取daemonize,pidFile,quiet的值,并確保配置的臨時(shí)目錄是有效目錄
        final boolean daemonize = options.has(daemonizeOption);
        final Path pidFile = pidfileOption.value(options);
        final boolean quiet = options.has(quietOption);

        //配置錯(cuò)誤的tmpdir可能會(huì)導(dǎo)致以后難以診斷的問(wèn)題,因此立即拒絕它
        try {
            env.validateTmpFile();
        } catch (IOException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }

        try {
            //調(diào)用Bootstrap.init()
            init(daemonize, pidFile, quiet, env);
        } catch (NodeValidationException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }
    }

Bootstrap.init()(1)

    private static volatile Bootstrap INSTANCE;

    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
        //讓Bootstrap的類初始化在安全管理器安裝前進(jìn)行
        //里面什么也沒(méi)做
        BootstrapInfo.init();

        //創(chuàng)建keepAliveThread,利用CountDownLatch,在shutdown前保持該線程存活
        INSTANCE = new Bootstrap();

Bootstrap()

    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);

    Bootstrap() {
        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        //在shutdown前保持該線程存活
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                keepAliveLatch.countDown();
            }
        });
    }

keepAliveThread

線程本身不做具體的工作。主線程執(zhí)行完啟動(dòng)流程后會(huì)退出,keepalive線程是唯一的用戶線程,作用是保持進(jìn)程運(yùn)行。在Java程序中,至少要有一個(gè)用戶線程。當(dāng)用戶線程為零時(shí)退出進(jìn)程。

CountDownLatch

CountDownLatch是一個(gè)同步工具類,用來(lái)協(xié)調(diào)多個(gè)線程之間的同步,或者說(shuō)起到線程之間的通信(而不是用作互斥的作用)。

CountDownLatch能夠使一個(gè)線程在等待另外一些線程完成各自工作之后,再繼續(xù)執(zhí)行。使用一個(gè)計(jì)數(shù)器進(jìn)行實(shí)現(xiàn)。計(jì)數(shù)器初始值為線程的數(shù)量。當(dāng)每一個(gè)線程完成自己任務(wù)后,計(jì)數(shù)器的值就會(huì)減一。當(dāng)計(jì)數(shù)器的值為0時(shí),表示所有的線程都已經(jīng)完成一些任務(wù),然后在CountDownLatch上等待的線程就可以恢復(fù)執(zhí)行接下來(lái)的任務(wù)。

CountDownLatch典型用法:1、某一線程在開(kāi)始運(yùn)行前等待n個(gè)線程執(zhí)行完畢。將CountDownLatch的計(jì)數(shù)器初始化為new CountDownLatch(n),每當(dāng)一個(gè)任務(wù)線程執(zhí)行完畢,就將計(jì)數(shù)器減1 countdownLatch.countDown(),當(dāng)計(jì)數(shù)器的值變?yōu)?時(shí),在CountDownLatch上await()的線程就會(huì)被喚醒。一個(gè)典型應(yīng)用場(chǎng)景就是啟動(dòng)一個(gè)服務(wù)時(shí),主線程需要等待多個(gè)組件加載完畢,之后再繼續(xù)執(zhí)行。

CountDownLatch典型用法:2、實(shí)現(xiàn)多個(gè)線程開(kāi)始執(zhí)行任務(wù)的最大并行性。注意是并行性,不是并發(fā),強(qiáng)調(diào)的是多個(gè)線程在某一時(shí)刻同時(shí)開(kāi)始執(zhí)行。類似于賽跑,將多個(gè)線程放到起點(diǎn),等待發(fā)令槍響,然后同時(shí)開(kāi)跑。做法是初始化一個(gè)共享的CountDownLatch(1),將其計(jì)算器初始化為1,多個(gè)線程在開(kāi)始執(zhí)行任務(wù)前首先countdownlatch.await(),當(dāng)主線程調(diào)用countDown()時(shí),計(jì)數(shù)器變?yōu)?,多個(gè)線程同時(shí)被喚醒。

Bootstrap.init()(2)

        //加載 keystore 安全配置,keystore文件不存在則創(chuàng)建,保存;存在則解密,更新keystore
        //創(chuàng)建Environment
        final SecureSettings keystore = loadSecureSettings(initialEnv);
        final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());

         //LogConfigurator將用對(duì)我們的日志文件的重定向替換System.out和System.err
        //因此我們需要在調(diào)用LogConfigurator之前捕獲流對(duì)象,以便能夠在適當(dāng)?shù)臅r(shí)候關(guān)閉它們
        final Runnable sysOutCloser = getSysOutCloser();
        final Runnable sysErrorCloser = getSysErrorCloser();

        LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
        try {
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        if (environment.pidFile() != null) {
            try {
                PidFile.create(environment.pidFile(), true);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
        }


        try {
            final boolean closeStandardStreams = (foreground == false) || quiet;
            if (closeStandardStreams) {
                final Logger rootLogger = LogManager.getRootLogger();
                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                if (maybeConsoleAppender != null) {
                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                }
                sysOutCloser.run();
            }

            //檢查L(zhǎng)ucene版本
            checkLucene();

            //安裝默認(rèn)的未捕獲異常處理程序
            //必須在安全初始化之前完成,因?yàn)槲覀儾幌胧谟鑢untime權(quán)限setDefaultUncaughtExceptionHandler
            Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());

            INSTANCE.setup(true, environment);

keystore

keystore安全配置是為了解決有些敏感的信息不適合放到配置文件中的,因?yàn)榕渲梦募敲魑谋4娴?,雖然文件系統(tǒng)有基于用戶權(quán)限的保護(hù),但這仍然不夠。因此ES把這些敏感配置信息加密,單獨(dú)放到一個(gè)文件中:config/elasticsearch.keystore。

Bootstrap.setup()

    private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
        Settings settings = environment.settings();

        try {
            //遍歷所有模塊,為每個(gè)模塊生成Native Controller
            spawner.spawnNativeControllers(environment, true);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }

        //初始化本地資源
        initializeNatives(
                environment.tmpFile(),
                BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
                BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
                BootstrapSettings.CTRLHANDLER_SETTING.get(settings));

        //在安裝安全管理器之前初始化探測(cè)
        initializeProbes();

        //當(dāng)ES退出時(shí)關(guān)閉必要的IO流和日志上下文
        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        //關(guān)閉節(jié)點(diǎn)
                        //在Node.close()中調(diào)用各個(gè)模塊的doStop()和doClose()
                        IOUtils.close(node, spawner);
                        LoggerContext context = (LoggerContext) LogManager.getContext(false);
                        Configurator.shutdown(context);
                        if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
                            throw new IllegalStateException("Node didn't stop within 10 seconds. " +
                                    "Any outstanding requests or tasks might get killed.");
                        }
                    } catch (IOException ex) {
                        throw new ElasticsearchException("failed to stop node", ex);
                    } catch (InterruptedException e) {
                        LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        try {
            //檢查jar沖突,檢查當(dāng)前類路徑是否存在重復(fù)的類
            final Logger logger = LogManager.getLogger(JarHell.class);
            JarHell.checkJarHell(logger::debug);
        } catch (IOException | URISyntaxException e) {
            throw new BootstrapException(e);
        }

        //在安裝SecurityManager之前記錄ifconfig輸出
        IfConfig.logIfNecessary();

        //安裝SecurityManager
        try {
            Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new BootstrapException(e);
        }

        //創(chuàng)建Node
        node = new Node(environment) {
            @Override
            protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
                BootstrapChecks.check(context, boundTransportAddress, checks);
            }
        };
    }

Node()

    protected Node(final Environment initialEnvironment,
                   Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
        logger = LogManager.getLogger(Node.class);
        //關(guān)閉流程中需要關(guān)閉的service等資源
        final List<Closeable> resourcesToClose = new ArrayList<>(); 
        boolean success = false;
        try {
            //節(jié)點(diǎn)環(huán)境
            Settings tmpSettings = Settings.builder().put(initialEnvironment.settings())
                .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

            final JvmInfo jvmInfo = JvmInfo.jvmInfo();
            logger.info(...)
            //檢查版本
            if (Build.CURRENT.isProductionRelease() == false) {
                logger.warn(
                    "version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
                    Build.CURRENT.getQualifiedVersion());
            }

            if (logger.isDebugEnabled()) {
                logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                    initialEnvironment.configFile(), Arrays.toString(initialEnvironment.dataFiles()),
                    initialEnvironment.logsFile(), initialEnvironment.pluginsFile());
            }

            //讀取并加載所有的插件和模塊
            this.pluginsService = new PluginsService(tmpSettings, initialEnvironment.configFile(), initialEnvironment.modulesFile(),
                initialEnvironment.pluginsFile(), classpathPlugins);
            final Settings settings = pluginsService.updatedSettings();

            final Set<DiscoveryNodeRole> possibleRoles = Stream.concat(
                    DiscoveryNodeRole.BUILT_IN_ROLES.stream(),
                    pluginsService.filterPlugins(Plugin.class)
                            .stream()
                            .map(Plugin::getRoles)
                            .flatMap(Set::stream))
                    .collect(Collectors.toSet());
            DiscoveryNode.setPossibleRoles(possibleRoles);

            //根據(jù)設(shè)置的最終視圖創(chuàng)建環(huán)境
            //這是為了確保組件獲得相同的設(shè)定值,無(wú)論它們從哪里進(jìn)行請(qǐng)求
            this.environment = new Environment(settings, initialEnvironment.configFile());
            Environment.assertEquivalent(initialEnvironment, this.environment);
            nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
            logger.info("node name [{}], node ID [{}], cluster name [{}]",
                NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value());
            resourcesToClose.add(nodeEnvironment);
            localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

            final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);

            //創(chuàng)建線程池
            final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
            resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
            final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
            resourcesToClose.add(resourceWatcherService);
            //將上下文添加到DeprecationLogger,這樣它就不需要被注入到任何地方
            HeaderWarning.setThreadContext(threadPool.getThreadContext());
            resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));

            final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
            final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
            for (final ExecutorBuilder<?> builder : threadPool.builders()) {
                additionalSettings.addAll(builder.getRegisteredSettings());
            }
            //創(chuàng)建NodeClient
            client = new NodeClient(settings, threadPool);

            //創(chuàng)建各種模塊和服務(wù)
            final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
            final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
            ......
            //定義好的模塊由ModulesBuilder類統(tǒng)一管理
            //ModulesBuilder是ES對(duì)Guice的封裝
            ModulesBuilder modules = new ModulesBuilder();
            ......
            //綁定依賴,依賴注入
            modules.add(b -> {
                    b.bind(Node.class).toInstance(this);
                    ......
             });
            //可以通過(guò)injector獲取相應(yīng)Service類的實(shí)例
            injector = modules.createInjector();

            //我們通過(guò)在集群中尋找分片的可用副本來(lái)分配現(xiàn)有的分片副本
            //對(duì)可用副本的搜索由分配嘗試(即reroute)觸發(fā),并異步執(zhí)行
            //當(dāng)它完成時(shí),我們觸發(fā)另一個(gè)reroute再次嘗試分配
            //這意味著存在循環(huán)依賴:分配服務(wù)需要訪問(wèn)現(xiàn)有的分片分配器(例如,GatewayAllocator)
            //這些分配器需要能夠觸發(fā)reroute,reroute需要調(diào)用分配服務(wù)。我們?cè)谶@里關(guān)閉循環(huán):
            clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));

            List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
                .filter(p -> p instanceof LifecycleComponent)
                .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
            resourcesToClose.addAll(pluginLifecycleComponents);
            resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
            this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
            client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}), transportService.getTaskManager(),
                    () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
            this.namedWriteableRegistry = namedWriteableRegistry;

            logger.debug("initializing HTTP handlers ...");
            actionModule.initRestHandlers(() -> clusterService.state().nodes());
            logger.info("initialized");

            success = true;
        } catch (IOException ex) {
            throw new ElasticsearchException("failed to bind service", ex);
        } finally {
            if (!success) {
                IOUtils.closeWhileHandlingException(resourcesToClose);
            }
        }
    }

Bootstrap.init()(3)

            try {
                IOUtils.close(keystore);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }

            //node.start();啟動(dòng)節(jié)點(diǎn)
            //keepAliveThread.start();啟動(dòng)?;罹€程
            INSTANCE.start();

            if (foreground == false) {
                sysErrorCloser.run();
            }

        } catch (NodeValidationException | RuntimeException e) {
            //日志打印
        }
    }

Node.start()

    public Node start() throws NodeValidationException {
        if (!lifecycle.moveToStarted()) {
            return this;
        }

        logger.info("starting ...");
        pluginLifecycleComponents.forEach(LifecycleComponent::start);

        //通過(guò)injector獲取各個(gè)類的實(shí)例,調(diào)用start()方法啟動(dòng)
        //start()基本就是初始化內(nèi)部數(shù)據(jù)、創(chuàng)建線程池、啟動(dòng)線程池等操作
        injector.getInstance(MappingUpdatedAction.class).setClient(client);
        injector.getInstance(IndicesService.class).start();  //IndexService:索引管理
        injector.getInstance(IndicesClusterStateService.class).start();  //IndicesClusterStateService:跨集群同步
        injector.getInstance(SnapshotsService.class).start();  //SnapshotsService:創(chuàng)建快照
        injector.getInstance(SnapshotShardsService.class).start();  //SnapshotShardsService:?jiǎn)?dòng)和停止分片級(jí)別快照
        injector.getInstance(RepositoriesService.class).start();
        injector.getInstance(SearchService.class).start();  //SearchService:搜索服務(wù)
        nodeService.getMonitorService().start();  //MonitorService:監(jiān)控

        ......

        return this;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容