es源碼筆記-7.x 選主流程

Discovery模塊負責(zé)發(fā)現(xiàn)集群中的節(jié)點,以及選擇主節(jié)點。ES支持多種不同Discovery類型選擇,內(nèi)置的實現(xiàn)有兩種:Zen Discovery和Coordinator,其他的包括公有云平臺亞馬遜的EC2、谷歌的GCE等。

Zen Discovery和Coordinator

  • 1、Coordinator
    ES 7.x 重構(gòu)了一個新的集群協(xié)調(diào)層Coordinator,他實際上是 Raft 的實現(xiàn),但并非嚴格按照 Raft 論文實現(xiàn),而是做了一些調(diào)整。
    可以在配置文件中指定,代碼如下:
    org.elasticsearch.discovery.DiscoveryModule.DiscoveryModule(....)
public DiscoveryModule(...) {
        if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
            discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
                settings, clusterSettings,
                transportService, namedWriteableRegistry, allocationService, masterService,
                () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
                clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy);
        } else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
            discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
                clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, rerouteService);
        } else {
            throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
        }
}

  • 2、Zen Discovery
    采用Bully算法

它假定所有節(jié)點都有一個唯一的ID,使用該ID對節(jié)點進行排序。任何時候的當前Leader都是參與集群的最高ID節(jié)點。該算法的優(yōu)點是易于實現(xiàn)。但是,當擁有最大ID的節(jié)點處于不穩(wěn)定狀態(tài)的場景下會有問題。例如,Master負載過重而假死,集群擁有第二大ID的節(jié)點被選為新主,這時原來的Master恢復(fù),再次被選為新主,然后又假死
ES 通過推遲選舉,直到當前的 Master 失效來解決上述問題,只要當前主節(jié)點不掛掉,就不重新選主。但是容易產(chǎn)生腦裂(雙主),為此,再通過“法定得票人數(shù)過半”解決腦裂問題

  • 3、算法比較
    1、raft算法與Bully算法
相同點

1、多數(shù)派原則:必須得到超過半數(shù)的選票才能成為master。
選出的leader一定擁有最新已提交數(shù)據(jù):在raft中,數(shù)據(jù)更新的節(jié)點不會給數(shù)據(jù)舊的節(jié)點投選票,而當選需要多數(shù)派的選票,則當選人一定有最新已提交數(shù)據(jù)。在es中,version大的節(jié)點排序優(yōu)先級高,同樣用于保證這一點。

不同點

正確性論證:raft是一個被論證過正確性的算法,而ES的算法是一個沒有經(jīng)過論證的算法,只能在實踐中發(fā)現(xiàn)問題,做bug fix,這是我認為最大的不同。

是否有選舉周期term:raft引入了選舉周期的概念,每輪選舉term加1,保證了在同一個term下每個參與人只能投1票。ES在選舉時沒有term的概念,不能保證每輪每個節(jié)點只投一票。
選舉的傾向性:raft中只要一個節(jié)點擁有最新的已提交的數(shù)據(jù),則有機會選舉成為master。在ES中,version相同時會按照NodeId排序,總是NodeId小的人優(yōu)先級高。

2、Paxos算法
Paxos非常強大,尤其在什么時機,以及如何進行選舉方面的靈活性比簡單的Bully算法有很大的優(yōu)勢,因為在現(xiàn)實生活中,存在比網(wǎng)絡(luò)連接異常更多的故障模式。但 Paxos 實現(xiàn)起來非常復(fù)雜

本篇只討論內(nèi)置的Zen Discovery

流程分析

整體流程可以概括為:選舉臨時Master,如果本節(jié)點當選,則等待確立Master,如果其他節(jié)點當選,則嘗試加入集群,然后啟動節(jié)點失效探測器。

1、節(jié)點啟動

如果集群剛啟動則參與選主,否則加入集群
org.elasticsearch.node.Node.start()

public Node start() throws NodeValidationException {
    discovery.start();           
    discovery.startInitialJoin();
}
2、選舉臨時Master

選舉過程的實現(xiàn)位于 org.elasticsearch.discovery.zen.ZenDiscovery.findMaster(),該函數(shù)查找當前集群的活躍 Master,或者從候選者中選擇新的Master。如果選主成功,則返回選定的Master,否則返回空

private DiscoveryNode findMaster() {
        logger.trace("starting to ping");
        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();             
        List<DiscoveryNode> activeMasters = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                activeMasters.add(pingResponse.master());
            }
        }

        // nodes discovered during pinging
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            if (pingResponse.node().isMasterNode()) {
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }

        if (activeMasters.isEmpty()) {
            //判斷當前候選者人數(shù)是否達到法定人數(shù)
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                return winner.getNode();
            } else {
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                logger.trace("not enough master nodes [{}]", masterCandidates);
                return null;
            }
        } else {
            assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }

上面選擇臨時主節(jié)點非常簡單,
首先需要判斷當前候選者人數(shù)是否達到法定人數(shù),否則選主失敗。

  public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
        if (candidates.isEmpty()) {
            return false;
        }
        if (minimumMasterNodes < 1) {
            return true;
        }
        assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
            "duplicates ahead: " + candidates;
        return candidates.size() >= minimumMasterNodes;
    }

取列表中的最小值,比較函數(shù)通過compareNodes實現(xiàn),只是對節(jié)點 ID 進行排序

   public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        sortedCandidates.sort(MasterCandidate::compare);
        return sortedCandidates.get(0);
    }

 public static int compare(MasterCandidate c1, MasterCandidate c2) {
            // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
            // list, so if c2 has a higher cluster state version, it needs to come first.
            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
            if (ret == 0) {
                ret = compareNodes(c1.getNode(), c2.getNode());
            }
            return ret;
        }
3、確立Master或加入集群

選舉出的臨時Master有兩種情況:該臨時Master是本節(jié)點或非本節(jié)點。

  • 1、如果臨時Master是本節(jié)點:
    (1)等待足夠多的具備Master資格的節(jié)點加入本節(jié)點(投票達到法定人數(shù)),以完成選舉。
if (clusterService.localNode().equals(masterNode)) {
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one           
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() {
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            joinThreadControl.markThreadAsDone(currentThread);                            
                            nodesFD.updateNodesAndPing(state); // start the odes FD
                        }
                        @Override
                        public void onFailure(Throwable t) {                            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        }
                    }

            );
        }

    private synchronized void checkPendingJoinsAndElectIfNeeded() {
        assert electionContext != null : "election check requested but no active context";
        final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
        //是否有足夠的選票
        if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {           
            }
        } else {           
            electionContext.closeAndBecomeMaster();
            electionContext = null; // clear this out so future joins won't be accumulated
        }
    }

(2)超時(默認為30秒,可配置)后還沒有滿足數(shù)量的join請求,則選舉失敗,需要進行新一輪選舉。

 public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
        final CountDownLatch done = new CountDownLatch(1);       
            try {
                if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
                    // callback handles everything
                    return;
                }
            } catch (InterruptedException e) {
            }          
    }

超時后直接return,當非臨時節(jié)點加入集群不成功時,重新發(fā)起選主流程
org.elasticsearch.discovery.zen.ZenDiscovery.innerJoinCluster()

// send join request
            final boolean success = joinElectedMaster(masterNode);

            // finalize join through the cluster state update thread
            final DiscoveryNode finalMasterNode = masterNode;
            clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateUpdateTask() {
                @Override
                public boolean runOnlyOnMaster() {
                    return false;
                }
                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    if (!success) {
                        // failed to join. Try again...
              //重新發(fā)起選主流程
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        return currentState;
                    }

(3)成功后發(fā)布新的clusterState。
實現(xiàn)如下:

 public synchronized void closeAndBecomeMaster() {           
            Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
            final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";

            tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
            tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);

            clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
        }

submitStateUpdateTask最終通過TaskBatcher# submitTasks來提交任務(wù)。執(zhí)行任務(wù)并發(fā)布集群狀態(tài)的總體過程在 MasterService#runTasks 方法中實現(xiàn)。

protected void runTasks(TaskInputs taskInputs) {       
                publish(clusterChangedEvent, taskOutputs, startTimeNS);         
    }
  • 2、如果其他節(jié)點被選為Master:
    (1)不再接受其他節(jié)點的join請求。
// process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopElectionContext(masterNode + " elected");

(2)向Master發(fā)送加入請求,并等待回復(fù)。超時時間默認為1分鐘(可配置),如果遇到異常,則默認重試3次(可配置)。這個步驟在joinElectedMaster方法中實現(xiàn)。

    private boolean joinElectedMaster(DiscoveryNode masterNode) {
        try {
        
        int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
        while (true) {
            try {             
                membership.sendJoinRequestBlocking(masterNode, clusterService.localNode(), joinTimeout);
                return true;
            } catch (Exception e) {
                final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
                if (unwrap instanceof NotMasterException) {
                    if (++joinAttempt == this.joinRetryAttempts) {
                        logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                        return false;
                    } else {
                        logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                    }
                } else {                   
                    }
                    return false;
                }
            }

            try {
                Thread.sleep(this.joinRetryDelay.millis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

最終當選的Master會先發(fā)布集群狀態(tài),才確認客戶的join請求,因此,joinElectedMaster返回代表收到了join請求的確認,并且已經(jīng)收到了集群狀態(tài)。所以如果返回不成功,則重新發(fā)起選主流程
(3)檢查收到的集群狀態(tài)中的Master節(jié)點如果為空,或者當選的Master不是之前選擇的節(jié)點,則重新選舉。

       if (currentState.getNodes().getMasterNode() == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        return currentState;
                    }

                    if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
                        return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
                    }

                    // Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
                    // when the first cluster state arrives.
                    joinThreadControl.markThreadAsDone(currentThread);
                    return currentState;
總結(jié)

1、es通過主從模式以及發(fā)現(xiàn)機制保證節(jié)點之間的負載均衡,但是es使用量的急劇增加暴露了很多問題,例如,Zen的minimum_master_nodes設(shè)置經(jīng)常配置錯誤,這會使群集更容易出現(xiàn)裂腦和丟失數(shù)據(jù)的風(fēng)險
2、7.x以上版本Coordinator提供了安全的亞秒級的master選舉時間,而Zen可能要花幾秒鐘來選擇一個新的master
3、es的master掛了,數(shù)據(jù)節(jié)點在這區(qū)間還能對外提供服務(wù)嗎?
參考
Elasticsearch分布式一致性原理剖析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容