流程
- 解析配置,包括配置文件和命令行參數(shù)
- 檢查外部環(huán)境和內(nèi)部環(huán)境,例如,JVM版本、操作系統(tǒng)內(nèi)核參數(shù)等
- 初始化內(nèi)部資源,創(chuàng)建內(nèi)部模塊,初始化探測(cè)器
- 啟動(dòng)各個(gè)子模塊和keepalive線程

源碼閱讀
Elasticsearch.java
//啟動(dòng)elasticsearch
class Elasticsearch extends EnvironmentAwareCommand
//一個(gè)cli命令,需要org.elasticsearch.env.Environment來(lái)使用當(dāng)前路徑和設(shè)置
public abstract class EnvironmentAwareCommand extends Command
//要在cli中執(zhí)行的操作
public abstract class Command implements Closeable
Elasticsearch.main()
public static void main(final String[] args) throws Exception {
//重寫DNS Cache屬性
//將屬性轉(zhuǎn)為整數(shù)再轉(zhuǎn)成字符串,以確保正確解析
overrideDnsCachePolicyProperties();
//設(shè)置安全管理器,讓基于安全管理器存在與否的內(nèi)部策略生效(如DNS緩存策略)
//授予所有權(quán)限,以便稍后可以將安全管理器設(shè)置為所需的權(quán)限
System.setSecurityManager(new SecurityManager() {
@Override
public void checkPermission(Permission perm){}
});
//為狀態(tài)記錄器StatusLogger注冊(cè)偵聽(tīng)器偵聽(tīng)error事件
LogConfigurator.registerErrorListener();
final Elasticsearch elasticsearch = new Elasticsearch();
//進(jìn)入Command.main()
int status = main(args, elasticsearch, Terminal.DEFAULT);
//失敗,建議用戶查看日志文件
if (status != ExitCodes.OK) {
final String basePath = System.getProperty("es.logs.base_path");
if (basePath != null) {
Terminal.DEFAULT.errorPrintln(
"ERROR: Elasticsearch did not exit normally - check the logs at "
+ basePath
+ System.getProperty("file.separator")
+ System.getProperty("es.logs.cluster_name") + ".log"
);
}
exit(status);
}
}
SecurityManager
安全管理器是一個(gè)允許應(yīng)用程序?qū)崿F(xiàn)安全策略的類
它允許應(yīng)用程序在執(zhí)行一個(gè)可能不安全或敏感的操作前確定該操作是什么,以及是否是在允許執(zhí)行該操作的安全上下文中執(zhí)行它。應(yīng)用程序可以允許或不允許該操作。
類包含了很多名稱以單詞 check 開(kāi)頭的方法,Java 庫(kù)中的各種方法在執(zhí)行某些潛在的敏感操作前可以調(diào)用這些方法
安全管理器通過(guò)拋出異常來(lái)提供阻止操作完成的機(jī)會(huì)。如果允許執(zhí)行該操作,則安全管理器例程只是簡(jiǎn)單地返回。但如果不允許執(zhí)行該操作,則拋出一個(gè) SecurityException。該約定的唯一例外是 checkTopLevelWindow,它返回 boolean 值。
特殊方法 checkPermission(java.security.Permission)確定是應(yīng)該允許還是拒絕由指定權(quán)限所指示的訪問(wèn)請(qǐng)求
從 Java 2 SDK v1.2 開(kāi)始,SecurityManager 中其他所有 check 方法的默認(rèn)實(shí)現(xiàn)都是調(diào)用SecurityManager checkPermission 方法來(lái)確定調(diào)用線程是否具有執(zhí)行所請(qǐng)求操作的權(quán)限。
StatusLogger
記錄日志系統(tǒng)中發(fā)生的事件。默認(rèn)情況下,只有錯(cuò)誤消息被記錄到System.err。
Command.main()
//從args中解析此命令的選項(xiàng)并執(zhí)行它
public final int main(String[] args, Terminal terminal) throws Exception {
//是否添加ShuntdownHook以在退出時(shí)清理資源
//返回true
if (addShutdownHook()) {
shutdownHookThread = new Thread(() -> {
try {
this.close();
} catch (final IOException e) {
//當(dāng)Runtime異常關(guān)閉時(shí)打印異常信息
try (
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {
e.printStackTrace(pw);
terminal.errorPrintln(sw.toString());
} catch (final IOException impossible) {
throw new AssertionError(impossible);
}
}
});
//RunTime.getRunTime().addShutdownHook的作用就是在JVM銷毀前執(zhí)行的一個(gè)線程
Runtime.getRuntime().addShutdownHook(shutdownHookThread);
}
beforeMain.run();
try {
//打印參數(shù),設(shè)置參數(shù),執(zhí)行命令,拋出所有異常
mainWithoutErrorHandling(args, terminal);
} catch (OptionException e) {
printHelp(terminal, true);
terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
return ExitCodes.USAGE;
} catch (UserException e) {
if (e.exitCode == ExitCodes.USAGE) {
printHelp(terminal, true);
}
if (e.getMessage() != null) {
terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
}
return e.exitCode;
}
return ExitCodes.OK;
}
Command.mainWithoutErrorHandling()
//打印參數(shù),設(shè)置參數(shù),執(zhí)行命令,拋出所有異常
void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
final OptionSet options = parser.parse(args);
if (options.has(helpOption)) {
printHelp(terminal, false);
return;
}
if (options.has(silentOption)) {
terminal.setVerbosity(Terminal.Verbosity.SILENT);
} else if (options.has(verboseOption)) {
terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
} else {
terminal.setVerbosity(Terminal.Verbosity.NORMAL);
}
//進(jìn)入EnviromentAwareCommand.execute()
execute(terminal, options);
}
EnvironmentAwareCommand.execute()
protected void execute(Terminal terminal, OptionSet options) throws Exception {
final Map<String, String> settings = new HashMap<>();
for (final KeyValuePair kvp : settingOption.values(options)) {
if (kvp.value.isEmpty()) {
throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
}
if (settings.containsKey(kvp.key)) {
final String message = String.format(
Locale.ROOT,
"setting [%s] already set, saw [%s] and [%s]",
kvp.key,
settings.get(kvp.key),
kvp.value);
throw new UserException(ExitCodes.USAGE, message);
}
settings.put(kvp.key, kvp.value);
}
//確保給定設(shè)置存在,如果尚未設(shè)置,則從系統(tǒng)屬性中讀取該設(shè)置
putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
//進(jìn)入Elasticsearch.execute()
execute(terminal, options, createEnv(settings));
}
EnvironmentAwareCommand.createEnv()
//創(chuàng)建要使用的命令的Environment
protected Environment createEnv(final Map<String, String> settings) throws UserException {
return createEnv(Settings.EMPTY, settings);
}
protected final Environment createEnv(final Settings baseSettings, final Map<String, String> settings) throws UserException {
final String esPathConf = System.getProperty("es.path.conf");
if (esPathConf == null) {
throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set");
}
//讀取config下的配置文件elasticsearch.yml內(nèi)容,收集plugins,bin,lib,modules等目錄下的文件信息
return InternalSettingsPreparer.prepareEnvironment(baseSettings, settings,
getConfigPath(esPathConf),
() -> System.getenv("HOSTNAME"));
}

Elasticsearch.execute()
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
if (options.nonOptionArguments().isEmpty() == false) {
throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
}
if (options.has(versionOption)) {
final String versionOutput = String.format(
Locale.ROOT,
"Version: %s, Build: %s/%s/%s/%s, JVM: %s",
Build.CURRENT.getQualifiedVersion(),
Build.CURRENT.flavor().displayName(),
Build.CURRENT.type().displayName(),
Build.CURRENT.hash(),
Build.CURRENT.date(),
JvmInfo.jvmInfo().version()
);
terminal.println(versionOutput);
return;
}
//讀取daemonize,pidFile,quiet的值,并確保配置的臨時(shí)目錄是有效目錄
final boolean daemonize = options.has(daemonizeOption);
final Path pidFile = pidfileOption.value(options);
final boolean quiet = options.has(quietOption);
//配置錯(cuò)誤的tmpdir可能會(huì)導(dǎo)致以后難以診斷的問(wèn)題,因此立即拒絕它
try {
env.validateTmpFile();
} catch (IOException e) {
throw new UserException(ExitCodes.CONFIG, e.getMessage());
}
try {
//調(diào)用Bootstrap.init()
init(daemonize, pidFile, quiet, env);
} catch (NodeValidationException e) {
throw new UserException(ExitCodes.CONFIG, e.getMessage());
}
}
Bootstrap.init()(1)
private static volatile Bootstrap INSTANCE;
static void init(
final boolean foreground,
final Path pidFile,
final boolean quiet,
final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
//讓Bootstrap的類初始化在安全管理器安裝前進(jìn)行
//里面什么也沒(méi)做
BootstrapInfo.init();
//創(chuàng)建keepAliveThread,利用CountDownLatch,在shutdown前保持該線程存活
INSTANCE = new Bootstrap();
Bootstrap()
private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
Bootstrap() {
keepAliveThread = new Thread(new Runnable() {
@Override
public void run() {
try {
keepAliveLatch.await();
} catch (InterruptedException e) {
// bail out
}
}
}, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
keepAliveThread.setDaemon(false);
//在shutdown前保持該線程存活
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
keepAliveLatch.countDown();
}
});
}
keepAliveThread
線程本身不做具體的工作。主線程執(zhí)行完啟動(dòng)流程后會(huì)退出,keepalive線程是唯一的用戶線程,作用是保持進(jìn)程運(yùn)行。在Java程序中,至少要有一個(gè)用戶線程。當(dāng)用戶線程為零時(shí)退出進(jìn)程。
CountDownLatch
CountDownLatch是一個(gè)同步工具類,用來(lái)協(xié)調(diào)多個(gè)線程之間的同步,或者說(shuō)起到線程之間的通信(而不是用作互斥的作用)。
CountDownLatch能夠使一個(gè)線程在等待另外一些線程完成各自工作之后,再繼續(xù)執(zhí)行。使用一個(gè)計(jì)數(shù)器進(jìn)行實(shí)現(xiàn)。計(jì)數(shù)器初始值為線程的數(shù)量。當(dāng)每一個(gè)線程完成自己任務(wù)后,計(jì)數(shù)器的值就會(huì)減一。當(dāng)計(jì)數(shù)器的值為0時(shí),表示所有的線程都已經(jīng)完成一些任務(wù),然后在CountDownLatch上等待的線程就可以恢復(fù)執(zhí)行接下來(lái)的任務(wù)。
CountDownLatch典型用法:1、某一線程在開(kāi)始運(yùn)行前等待n個(gè)線程執(zhí)行完畢。將CountDownLatch的計(jì)數(shù)器初始化為new CountDownLatch(n),每當(dāng)一個(gè)任務(wù)線程執(zhí)行完畢,就將計(jì)數(shù)器減1 countdownLatch.countDown(),當(dāng)計(jì)數(shù)器的值變?yōu)?時(shí),在CountDownLatch上await()的線程就會(huì)被喚醒。一個(gè)典型應(yīng)用場(chǎng)景就是啟動(dòng)一個(gè)服務(wù)時(shí),主線程需要等待多個(gè)組件加載完畢,之后再繼續(xù)執(zhí)行。
CountDownLatch典型用法:2、實(shí)現(xiàn)多個(gè)線程開(kāi)始執(zhí)行任務(wù)的最大并行性。注意是并行性,不是并發(fā),強(qiáng)調(diào)的是多個(gè)線程在某一時(shí)刻同時(shí)開(kāi)始執(zhí)行。類似于賽跑,將多個(gè)線程放到起點(diǎn),等待發(fā)令槍響,然后同時(shí)開(kāi)跑。做法是初始化一個(gè)共享的CountDownLatch(1),將其計(jì)算器初始化為1,多個(gè)線程在開(kāi)始執(zhí)行任務(wù)前首先countdownlatch.await(),當(dāng)主線程調(diào)用countDown()時(shí),計(jì)數(shù)器變?yōu)?,多個(gè)線程同時(shí)被喚醒。
Bootstrap.init()(2)
//加載 keystore 安全配置,keystore文件不存在則創(chuàng)建,保存;存在則解密,更新keystore
//創(chuàng)建Environment
final SecureSettings keystore = loadSecureSettings(initialEnv);
final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
//LogConfigurator將用對(duì)我們的日志文件的重定向替換System.out和System.err
//因此我們需要在調(diào)用LogConfigurator之前捕獲流對(duì)象,以便能夠在適當(dāng)?shù)臅r(shí)候關(guān)閉它們
final Runnable sysOutCloser = getSysOutCloser();
final Runnable sysErrorCloser = getSysErrorCloser();
LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
try {
LogConfigurator.configure(environment);
} catch (IOException e) {
throw new BootstrapException(e);
}
if (environment.pidFile() != null) {
try {
PidFile.create(environment.pidFile(), true);
} catch (IOException e) {
throw new BootstrapException(e);
}
}
try {
final boolean closeStandardStreams = (foreground == false) || quiet;
if (closeStandardStreams) {
final Logger rootLogger = LogManager.getRootLogger();
final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
if (maybeConsoleAppender != null) {
Loggers.removeAppender(rootLogger, maybeConsoleAppender);
}
sysOutCloser.run();
}
//檢查L(zhǎng)ucene版本
checkLucene();
//安裝默認(rèn)的未捕獲異常處理程序
//必須在安全初始化之前完成,因?yàn)槲覀儾幌胧谟鑢untime權(quán)限setDefaultUncaughtExceptionHandler
Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
INSTANCE.setup(true, environment);
keystore
keystore安全配置是為了解決有些敏感的信息不適合放到配置文件中的,因?yàn)榕渲梦募敲魑谋4娴?,雖然文件系統(tǒng)有基于用戶權(quán)限的保護(hù),但這仍然不夠。因此ES把這些敏感配置信息加密,單獨(dú)放到一個(gè)文件中:config/elasticsearch.keystore。
Bootstrap.setup()
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
Settings settings = environment.settings();
try {
//遍歷所有模塊,為每個(gè)模塊生成Native Controller
spawner.spawnNativeControllers(environment, true);
} catch (IOException e) {
throw new BootstrapException(e);
}
//初始化本地資源
initializeNatives(
environment.tmpFile(),
BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
//在安裝安全管理器之前初始化探測(cè)
initializeProbes();
//當(dāng)ES退出時(shí)關(guān)閉必要的IO流和日志上下文
if (addShutdownHook) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
//關(guān)閉節(jié)點(diǎn)
//在Node.close()中調(diào)用各個(gè)模塊的doStop()和doClose()
IOUtils.close(node, spawner);
LoggerContext context = (LoggerContext) LogManager.getContext(false);
Configurator.shutdown(context);
if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("Node didn't stop within 10 seconds. " +
"Any outstanding requests or tasks might get killed.");
}
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
} catch (InterruptedException e) {
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
Thread.currentThread().interrupt();
}
}
});
}
try {
//檢查jar沖突,檢查當(dāng)前類路徑是否存在重復(fù)的類
final Logger logger = LogManager.getLogger(JarHell.class);
JarHell.checkJarHell(logger::debug);
} catch (IOException | URISyntaxException e) {
throw new BootstrapException(e);
}
//在安裝SecurityManager之前記錄ifconfig輸出
IfConfig.logIfNecessary();
//安裝SecurityManager
try {
Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
} catch (IOException | NoSuchAlgorithmException e) {
throw new BootstrapException(e);
}
//創(chuàng)建Node
node = new Node(environment) {
@Override
protected void validateNodeBeforeAcceptingRequests(
final BootstrapContext context,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
BootstrapChecks.check(context, boundTransportAddress, checks);
}
};
}
Node()
protected Node(final Environment initialEnvironment,
Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
logger = LogManager.getLogger(Node.class);
//關(guān)閉流程中需要關(guān)閉的service等資源
final List<Closeable> resourcesToClose = new ArrayList<>();
boolean success = false;
try {
//節(jié)點(diǎn)環(huán)境
Settings tmpSettings = Settings.builder().put(initialEnvironment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
final JvmInfo jvmInfo = JvmInfo.jvmInfo();
logger.info(...)
//檢查版本
if (Build.CURRENT.isProductionRelease() == false) {
logger.warn(
"version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
Build.CURRENT.getQualifiedVersion());
}
if (logger.isDebugEnabled()) {
logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
initialEnvironment.configFile(), Arrays.toString(initialEnvironment.dataFiles()),
initialEnvironment.logsFile(), initialEnvironment.pluginsFile());
}
//讀取并加載所有的插件和模塊
this.pluginsService = new PluginsService(tmpSettings, initialEnvironment.configFile(), initialEnvironment.modulesFile(),
initialEnvironment.pluginsFile(), classpathPlugins);
final Settings settings = pluginsService.updatedSettings();
final Set<DiscoveryNodeRole> possibleRoles = Stream.concat(
DiscoveryNodeRole.BUILT_IN_ROLES.stream(),
pluginsService.filterPlugins(Plugin.class)
.stream()
.map(Plugin::getRoles)
.flatMap(Set::stream))
.collect(Collectors.toSet());
DiscoveryNode.setPossibleRoles(possibleRoles);
//根據(jù)設(shè)置的最終視圖創(chuàng)建環(huán)境
//這是為了確保組件獲得相同的設(shè)定值,無(wú)論它們從哪里進(jìn)行請(qǐng)求
this.environment = new Environment(settings, initialEnvironment.configFile());
Environment.assertEquivalent(initialEnvironment, this.environment);
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
logger.info("node name [{}], node ID [{}], cluster name [{}]",
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value());
resourcesToClose.add(nodeEnvironment);
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
//創(chuàng)建線程池
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);
//將上下文添加到DeprecationLogger,這樣它就不需要被注入到任何地方
HeaderWarning.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
//創(chuàng)建NodeClient
client = new NodeClient(settings, threadPool);
//創(chuàng)建各種模塊和服務(wù)
final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
......
//定義好的模塊由ModulesBuilder類統(tǒng)一管理
//ModulesBuilder是ES對(duì)Guice的封裝
ModulesBuilder modules = new ModulesBuilder();
......
//綁定依賴,依賴注入
modules.add(b -> {
b.bind(Node.class).toInstance(this);
......
});
//可以通過(guò)injector獲取相應(yīng)Service類的實(shí)例
injector = modules.createInjector();
//我們通過(guò)在集群中尋找分片的可用副本來(lái)分配現(xiàn)有的分片副本
//對(duì)可用副本的搜索由分配嘗試(即reroute)觸發(fā),并異步執(zhí)行
//當(dāng)它完成時(shí),我們觸發(fā)另一個(gè)reroute再次嘗試分配
//這意味著存在循環(huán)依賴:分配服務(wù)需要訪問(wèn)現(xiàn)有的分片分配器(例如,GatewayAllocator)
//這些分配器需要能夠觸發(fā)reroute,reroute需要調(diào)用分配服務(wù)。我們?cè)谶@里關(guān)閉循環(huán):
clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));
List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
.filter(p -> p instanceof LifecycleComponent)
.map(p -> (LifecycleComponent) p).collect(Collectors.toList());
resourcesToClose.addAll(pluginLifecycleComponents);
resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}), transportService.getTaskManager(),
() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
this.namedWriteableRegistry = namedWriteableRegistry;
logger.debug("initializing HTTP handlers ...");
actionModule.initRestHandlers(() -> clusterService.state().nodes());
logger.info("initialized");
success = true;
} catch (IOException ex) {
throw new ElasticsearchException("failed to bind service", ex);
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
}
Bootstrap.init()(3)
try {
IOUtils.close(keystore);
} catch (IOException e) {
throw new BootstrapException(e);
}
//node.start();啟動(dòng)節(jié)點(diǎn)
//keepAliveThread.start();啟動(dòng)?;罹€程
INSTANCE.start();
if (foreground == false) {
sysErrorCloser.run();
}
} catch (NodeValidationException | RuntimeException e) {
//日志打印
}
}
Node.start()
public Node start() throws NodeValidationException {
if (!lifecycle.moveToStarted()) {
return this;
}
logger.info("starting ...");
pluginLifecycleComponents.forEach(LifecycleComponent::start);
//通過(guò)injector獲取各個(gè)類的實(shí)例,調(diào)用start()方法啟動(dòng)
//start()基本就是初始化內(nèi)部數(shù)據(jù)、創(chuàng)建線程池、啟動(dòng)線程池等操作
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start(); //IndexService:索引管理
injector.getInstance(IndicesClusterStateService.class).start(); //IndicesClusterStateService:跨集群同步
injector.getInstance(SnapshotsService.class).start(); //SnapshotsService:創(chuàng)建快照
injector.getInstance(SnapshotShardsService.class).start(); //SnapshotShardsService:?jiǎn)?dòng)和停止分片級(jí)別快照
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start(); //SearchService:搜索服務(wù)
nodeService.getMonitorService().start(); //MonitorService:監(jiān)控
......
return this;
}