Elasticsearch源碼分析-啟動(dòng)過程淺析

1、啟動(dòng)命令及啟動(dòng)類

首先,來看一下啟動(dòng)elasticsearch的Java命令,其中es.pidfile是pid文件路徑,es.path.home是es的安裝目錄,logs、data、work和conf分別是存儲(chǔ)日志、數(shù)據(jù)、工作和配置的目錄。

${JAVA_HOME}/bin/java  \
            -Des.pidfile=/path/xxx.pid \
            -Des.default.path.home=/path/xxx \
            -Des.default.path.logs=/path/logs \
            -Des.default.path.data=/path/data \
            -Des.default.path.work=/path/work \
            -Des.default.path.conf=/path/config \
            -Des.path.home=/path/xxx \
            -cp :/path/xxx.jar \
            org.elasticsearch.bootstrap.Elasticsearch

elasticsearch啟動(dòng)類有兩個(gè),分別是Elasticsearch和ElasticsearchF,其中F代表foreground,區(qū)別是在前臺(tái)進(jìn)程運(yùn)行還是后臺(tái)進(jìn)程運(yùn)行,以及日志是存儲(chǔ)在日志文件中還是顯示在控制臺(tái)中,System.setProperty("es.foreground", "yes")用來指定foreground。
兩個(gè)啟動(dòng)類最終都是調(diào)用Bootstrap的靜態(tài)main方法來啟動(dòng)elasticsearch。

public class Elasticsearch extends Bootstrap {
    public static void close(String[] args) {
        Bootstrap.close(args);
    }
    public static void main(String[] args) {
        Bootstrap.main(args);
    }
}
public class ElasticsearchF {
    public static void close(String[] args) {
        Bootstrap.close(args);
    }
    public static void main(String[] args) {
        System.setProperty("es.foreground", "yes");
        Bootstrap.main(args);
    }
}

2、環(huán)境初始化

在Bootstrap的main中,首先根據(jù)es.pidfile或者es-pidfile獲取pid文件路徑,并將運(yùn)行當(dāng)前elasticsearch的jvm進(jìn)程號(hào)寫入pid文件中,并調(diào)用fPidFile.deleteOnExit()在jvm進(jìn)程結(jié)束時(shí)刪除pid文件。

public class Bootstrap {
    public static void main(String[] args) {
        // pid文件路徑, 啟動(dòng)參數(shù) -Des.pidfile=/opt/elasticsearch-1.6.0/run/elasticsearch.pid
        final String pidFile = System.getProperty("es.pidfile", System.getProperty("es-pidfile"));
        if (pidFile != null) {
            try {
                File fPidFile = new File(pidFile);
                // 將jvm進(jìn)程號(hào)寫入fPidFile文件
                // ...

                // 當(dāng)虛擬機(jī)terminate時(shí),刪除pid文件
                fPidFile.deleteOnExit();
            } catch (Exception e) {
                // ...
            }
        }
    }
}

接下來使用initialSettings()加載環(huán)境變量和配置文件,主要的邏輯在InternalSettingsPreparer.prepareSettings()中

public class Bootstrap {
    public static void main(String[] args) {
        // ...
        Tuple<Settings, Environment> tuple = null;
        try {
            tuple = initialSettings(foreground);
            setupLogging(tuple);
        }catch (Exception e) {
            // ...
        }
        // ...
    }
}

大致的流程如下:
①首先從System.getProperties()加載前綴為elasticsearch.和es.的變量

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        // just create enough settings to build the environment
        ImmutableSettings.Builder settingsBuilder = settingsBuilder().put(pSettings);
        if (useSystemProperties) { // 優(yōu)先加載default系統(tǒng)屬性
            settingsBuilder.putProperties("elasticsearch.default.", System.getProperties())
                    .putProperties("es.default.", System.getProperties())
                    .putProperties("elasticsearch.", System.getProperties(), ignorePrefixes) // 加載相同前綴的系統(tǒng)屬性,但忽略es.default.和elasticsearch.default.前綴
                    .putProperties("es.", System.getProperties(), ignorePrefixes);
        }
        settingsBuilder.replacePropertyPlaceholders();
        // 獲取環(huán)境變量,包括path.home=home、path.data=data、path.logs=logs、path.conf=config和path.work=work
        Environment environment = new Environment(settingsBuilder.build()); //如果path.conf為空,則為path.home(default:user.dir)/config
        // ...
    }
}

②然后初始化Environment,主要是設(shè)置elasticsearch的path.home、path.conf、path.plugins、path.work、path.data、path.repo和path.logs變量,如果path.home沒有設(shè)置,則置為
System.getProperty("user.dir");如果其他變量為空,則為path.home下面對(duì)應(yīng)的目錄(conf為{path.home}/config,data為{path.home}/data/集群名)

public class Environment {
    public Environment(Settings settings) {
        this.settings = settings;
        if (settings.get("path.home") != null) {
            homeFile = new File(cleanPath(settings.get("path.home")));
        } else {
            homeFile = new File(System.getProperty("user.dir"));
        }
        if (settings.get("path.conf") != null) {
            configFile = new File(cleanPath(settings.get("path.conf")));
        } else {
            configFile = new File(homeFile, "config");
        }
        // ......
    }
}

③按順序加載es.default.config、es.config和elasticsearch.config變量對(duì)應(yīng)的配置文件,若為空,則忽略。當(dāng)加載的配置文件不是es.config和elasticsearch.config變量對(duì)應(yīng)的配置文件時(shí),則繼續(xù)加載{path.home}/config/elasticsearch.yml, .yaml, .json, .properties配置文件

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        if (loadConfigSettings) {
            boolean loadFromEnv = true;
            if (useSystemProperties) {// 默認(rèn)為true
                // if its default, then load it, but also load form env
                if (Strings.hasText(System.getProperty("es.default.config"))) { // 從系統(tǒng)屬性中加載默認(rèn)config配置
                    loadFromEnv = true;
                    settingsBuilder.loadFromUrl(environment.resolveConfig(System.getProperty("es.default.config")));
                }
                // ...
            }
            // 從es.default.config加載配置后需要從.yml, .yaml, .json, .properties中繼續(xù)加載配置
            if (loadFromEnv) { 
                for (String allowedSuffix : ALLOWED_SUFFIXES) {
                    try {
                        // config目錄下的elasticsearch.yml文件
                        settingsBuilder.loadFromUrl(environment.resolveConfig("elasticsearch" + allowedSuffix)); 
                    } catch (FailedToResolveConfigException e) {
                        // ignore
                    }
                }
            }
        }
        // ...
    }
}

④使用相同前綴的系統(tǒng)屬性覆蓋已設(shè)置的前綴為es.和elasticsearch.變量,并忽略前綴為es.default., elasticsearch.default.的變量

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        settingsBuilder.put(pSettings);
        // 除了es.default., elasticsearch.default. ,使用相同前綴的系統(tǒng)屬性覆蓋settingsBuilder
        if (useSystemProperties) {
            settingsBuilder.putProperties("elasticsearch.", System.getProperties(), ignorePrefixes)
                    .putProperties("es.", System.getProperties(), ignorePrefixes);
        }
        settingsBuilder.replacePropertyPlaceholders();
    }
}

⑤如果配置文件中沒有設(shè)置name,則從系統(tǒng)屬性中讀取,如果不為空則為節(jié)點(diǎn)名,若依然為空,則從config/names.txt隨機(jī)選擇一個(gè)字符串作為節(jié)點(diǎn)名

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        if (settingsBuilder.get("name") == null) { 
            String name = System.getProperty("name");
            if (name != null) {
                settingsBuilder.put("name", name);
            }
        }
        Settings settings = replacePromptPlaceholders(settingsBuilder.build(), terminal);
        if (settings.get("name") == null) {
            final String name = settings.get("node.name");
            if (name == null || name.isEmpty()) {
                settings = settingsBuilder().put(settings)
                    .put("name", Names.randomNodeName(environment.resolveConfig("names.txt"))) 
                                .build();
            } else {
                settings = settingsBuilder().put(settings)
                    .put("name", name)
                    .build();
            }
        }
        // ...
    }
}

⑥如果沒有設(shè)置集群名變量cluster.name,則設(shè)置為默認(rèn)值"elasticsearch"

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        if (settingsBuilder.get(ClusterName.SETTING) == null) {
            settingsBuilder.put(ClusterName.SETTING, ClusterName.DEFAULT.value());
        }
        // ...
    }
}

3、集群設(shè)置及啟動(dòng)

設(shè)置完環(huán)境變量,開始使用bootstrap.setup(true, tuple)進(jìn)行集群的初始化,并使用bootstrap.start()啟動(dòng)集群

public class Bootstrap {
    public static void main(String[] args) {
        // ...
        try {
            if (!foreground) {
                Loggers.disableConsoleLogging();
                System.out.close();
            }
            // fail if using broken version
            JVMCheck.check();
            bootstrap.setup(true, tuple);
            stage = "Startup";
            bootstrap.start();
        } catch (Throwable e) {

        }
        // ...
    }
}

在初始化集群前,進(jìn)行jvm檢驗(yàn),出現(xiàn)以下二者之一情況將會(huì)拋異常并終止啟動(dòng)過程:
①JVM供應(yīng)商為IBM Corporation
②JVM供應(yīng)商為Oracle Corporation,版本為21.0-b17、24.0-b56、24.45-b08和24.51-b03,且運(yùn)行時(shí)沒有加對(duì)應(yīng)的-XX:-UseLoopPredicate或者-XX:-UseSuperWord參數(shù)

public class JVMCheck {
    static final Map<String,HotspotBug> JVM_BROKEN_HOTSPOT_VERSIONS;
    
    static {
        Map<String,HotspotBug> bugs = new HashMap<>();
        
        // 1.7.0: loop optimizer bug
        bugs.put("21.0-b17",  new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-7070134", "-XX:-UseLoopPredicate"));
        // register allocation issues (technically only x86/amd64). This impacted update 40, 45, and 51
        bugs.put("24.0-b56",  new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
        bugs.put("24.45-b08", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
        bugs.put("24.51-b03", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
        
        JVM_BROKEN_HOTSPOT_VERSIONS = Collections.unmodifiableMap(bugs);
    }
    static void check() {
        if (Boolean.parseBoolean(System.getProperty(JVM_BYPASS))) {
            // ...
        } else if ("Oracle Corporation".equals(Constants.JVM_VENDOR)) {
            HotspotBug bug = JVM_BROKEN_HOTSPOT_VERSIONS.get(Constants.JVM_VERSION);
            if (bug != null) { // wordAround為-XX:-UseLoopPredicate或者-XX:-UseSuperWord
                if (bug.workAround != null && ManagementFactory.getRuntimeMXBean().getInputArguments().contains(bug.workAround)) {
                    Loggers.getLogger(JVMCheck.class).warn(bug.getWarningMessage());
                } else {
                    throw new RuntimeException(bug.getErrorMessage());
                }
            }
        } else if ("IBM Corporation".equals(Constants.JVM_VENDOR)) {
            // currently any JVM from IBM will easily result in index corruption.
            // ...
            throw new RuntimeException(sb.toString());
        }
    }
}

若JVM可用,則執(zhí)行bootstrap.setup()進(jìn)入集群初始化階段,主要使用InternalNode類的構(gòu)造方法創(chuàng)建節(jié)點(diǎn)對(duì)象node。
最后添加鉤子方法,在虛擬機(jī)結(jié)束前執(zhí)行node.close()關(guān)閉node

public class Bootstrap {
    private void setup(boolean addShutdownHook, Tuple<Settings, Environment> tuple) throws Exception {
        // ...
        Settings nodeSettings = ImmutableSettings.settingsBuilder()
                .put(settings)
                .put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true)
                .build();
        NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder().settings(nodeSettings).loadConfigSettings(false);
        node = nodeBuilder.build();
        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() { //虛擬機(jī)關(guān)閉前執(zhí)行
                @Override
                public void run() {
                    node.close();
                }
            });
        }
    }
}

構(gòu)造node對(duì)象時(shí),主要流程是創(chuàng)建nodeEnvironment,并執(zhí)行modules.add()方法添加elasticsearch各部分模塊,在添加模塊時(shí),會(huì)執(zhí)行模塊對(duì)應(yīng)的module.spawnModules(),最后創(chuàng)建注入對(duì)象,執(zhí)行每個(gè)模塊的configure()方法,將實(shí)現(xiàn)和接口進(jìn)行綁定。

public final class InternalNode implements Node {
    public InternalNode(Settings preparedSettings, boolean loadConfigSettings) throws ElasticsearchException {
        // ...
        final NodeEnvironment nodeEnvironment;
        try {
            nodeEnvironment = new NodeEnvironment(this.settings, this.environment);
        } catch (IOException ex) {
            throw new ElasticsearchIllegalStateException("Failed to created node environment", ex);
        }
        boolean success = false;
        try {
            ModulesBuilder modules = new ModulesBuilder();
            modules.add(new NodeEnvironmentModule(nodeEnvironment));
            modules.add(new DiscoveryModule(settings));
            //...
            // 創(chuàng)建injector完成注入
            injector = modules.createInjector();
            //獲取Client的綁定實(shí)現(xiàn)
            client = injector.getInstance(Client.class);
            threadPool.setNodeSettingsService(injector.getInstance(NodeSettingsService.class));
            success = true;
        }finally {
            if (!success) {
                nodeEnvironment.close();
                ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
            }
        }
        logger.info("initialized");
    }
}

對(duì)Client類進(jìn)行綁定

public class NodeClientModule extends AbstractModule {

    @Override
    protected void configure() {
        // ...
        bind(Client.class).to(NodeClient.class).asEagerSingleton();
    }
}

以DiscoveryModule模塊為例,用node.local和node.mode參數(shù)控制是本地發(fā)現(xiàn)還是Zen發(fā)現(xiàn),然后初始化對(duì)應(yīng)Discovery模塊

public class DiscoveryModule extends AbstractModule implements SpawnModules {
    @Override
    public Iterable<? extends Module> spawnModules() {
        Class<? extends Module> defaultDiscoveryModule;
        if (DiscoveryNode.localNode(settings)) {
            defaultDiscoveryModule = LocalDiscoveryModule.class;
        } else {
            defaultDiscoveryModule = ZenDiscoveryModule.class;
        }
        return ImmutableList.of(Modules.createModule(settings.getAsClass(DISCOVERY_TYPE_KEY, defaultDiscoveryModule, "org.elasticsearch.discovery.",                 "DiscoveryModule"), settings));
    }
    @Override
    protected void configure() {
        bind(DiscoveryService.class).asEagerSingleton();
    }
}

節(jié)點(diǎn)發(fā)現(xiàn)的兩種方式Local和Network,分別對(duì)應(yīng)LocalDiscovery和ZenDiscovery

public class DiscoveryNode implements Streamable, Serializable {
    public static boolean localNode(Settings settings) {
        if (settings.get("node.local") != null) {
            return settings.getAsBoolean("node.local", false);
        }
        if (settings.get("node.mode") != null) {
            String nodeMode = settings.get("node.mode");
            if ("local".equals(nodeMode)) {
                return true;
            } else if ("network".equals(nodeMode)) {
                return false;
            } else {
                throw new ElasticsearchIllegalArgumentException("unsupported node.mode [" + nodeMode + "]. Should be one of [local, network].");
            }
        }
        return false;
    }
}

在完成節(jié)點(diǎn)初始化后,調(diào)用bootstrap.start()來啟動(dòng)節(jié)點(diǎn),其實(shí)是調(diào)用的node.start(),與鉤子函數(shù)的node.close()相對(duì)應(yīng)
啟動(dòng)節(jié)點(diǎn)的過程,其實(shí)是各個(gè)模塊的啟動(dòng)過程,調(diào)用各模塊的start方法

public final class InternalNode implements Node {
    public Node start() {
        if (!lifecycle.moveToStarted()) {
            return this;
        }
        // 開啟Tcp服務(wù)
        injector.getInstance(TransportService.class).start();
    
        // 節(jié)點(diǎn)發(fā)現(xiàn)及master選舉
        DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start(); // ZenDiscovery.doStart
        discoService.waitForInitialState();
        
        // 應(yīng)該在DiscoveryService啟動(dòng)之后,開啟網(wǎng)關(guān)服務(wù)
        injector.getInstance(GatewayService.class).start();

        // 開啟Http服務(wù)
        if (settings.getAsBoolean("http.enabled", true)) {
            injector.getInstance(HttpServer.class).start();
        }
    }
}

4、設(shè)置keepAliveThread

node完成啟動(dòng)后,創(chuàng)建用戶線程keepAliveThread,值為1,并添加一個(gè)鉤子方法,在JVM關(guān)閉前執(zhí)行countDown()。
然后繼續(xù)創(chuàng)建用戶線程keepAliveThread,在keepAliveLatch執(zhí)行countDown()之前一直阻塞,以此來保證elasticsearch一直存活

public class Bootstrap {
    public static void main(String[] args) {
        keepAliveLatch = new CountDownLatch(1);
        // keep this thread alive (non daemon thread) until we shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                keepAliveLatch.countDown();
            }
        });

        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        keepAliveThread.start();
    }
}

elasticsearch在啟動(dòng)過程中會(huì)注入TransportModule和HttpServerModule模塊,并且在啟動(dòng)時(shí)會(huì)啟動(dòng)TransportService和HttpServer,最終都是通過Netty監(jiān)聽Http和Tcp消息,完成客戶端請(qǐng)求處理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容