3. PhxPaxos源碼分析之Proposer、Acceptor

目錄
1. PhxPaxos源碼分析之關(guān)于PhxPaxos
2. PhxPaxos分析之網(wǎng)絡(luò)基礎(chǔ)部件
3. PhxPaxos源碼分析之Proposer、Acceptor
4. PhxPaxos源碼分析之Learner
5. PhxPaxos源碼分析之狀態(tài)機(jī)
6. PhxPaxos源碼分析之歸檔機(jī)制
7. PhxPaxos源碼分析之整體架構(gòu)


3.1 基本概念

Paxos算法包含三個(gè)角色:

  • Proposer
    提案發(fā)起者。負(fù)責(zé)發(fā)起提案,提案本身由編號(hào)(number)、值(value)兩部分組成。
  • Acceptor
    提案接收者。負(fù)責(zé)接收Proposer的提案,通過(guò)一定的規(guī)則首先確定提案編號(hào)、最終確定提案值。
  • Learner
    選中提案值習(xí)得者。不參與提案確定過(guò)程,只負(fù)責(zé)學(xué)習(xí)已確定好的提案值。

簡(jiǎn)單來(lái)說(shuō),提案由每個(gè)節(jié)點(diǎn)的Proposer、Acceptor參與決策,如果某個(gè)節(jié)點(diǎn)由于網(wǎng)絡(luò)故障等原因未參與決策,由Learner負(fù)責(zé)學(xué)習(xí)已經(jīng)選中提案值。

Paxos算法本身并不好理解,這里按通俗易懂的說(shuō)法講述整個(gè)提案確定過(guò)程,完整的Paxos算計(jì)介紹請(qǐng)參見Paxos made simple 釋譯。

注意:一個(gè)完整的提案由{編號(hào)n,提案值v}兩部分組成。我們只關(guān)心最終的提案值,提案編號(hào)的唯一用途在于確定提案值。

  1. Proposer選擇新的編號(hào)n,并發(fā)送到所有的Acceptor。
  2. Acceptor接收到請(qǐng)求后完成如下工作:
    2.1 承諾(promise):承諾不再accept所有編號(hào)小于n的請(qǐng)求,即reject編號(hào)小于n的請(qǐng)求。
    2.2 返回(reply):返回當(dāng)前小于n的最大編號(hào)所對(duì)應(yīng)的提議值v(如果存在)。
  3. 如果Proposer發(fā)起的提案編號(hào)被半數(shù)以上的Acceptor接受,此時(shí)它可以真正的發(fā)起提案值,否則回到步驟一重新確定提案編號(hào)。
  4. 發(fā)起的提案值不是Proposer隨意確定的。確定規(guī)則如下:
    4.1 如果步驟2.2中,一個(gè)或者多個(gè)Acceptor返回了已經(jīng)被接受(accept)的提案值,新發(fā)起的提案值必須是最大編號(hào)返回的提案值。
    4.2 如果步驟2.2中,沒有任何Acceptor返回已被接受(accept)的提案值,由Proposer自行發(fā)起新的提案值。
  5. Proposer發(fā)起的提案值如果被接受則提案結(jié)束,否則重復(fù)上述過(guò)程,直到確定提案值。

Paxos算法分為提案選定階段(Prepare)、確定提案值(accept)兩個(gè)階段。每個(gè)階段都有可能需要執(zhí)行多次,每次都花費(fèi)一個(gè)RTT網(wǎng)絡(luò)耗時(shí)。為了減少網(wǎng)絡(luò)耗時(shí),達(dá)到產(chǎn)品級(jí)應(yīng)用的目的,PhxPaxos采用選主并在主上執(zhí)行Paxos算法的方式避免提案沖突,隨后為所有的提案執(zhí)行noop操作,有條件跳過(guò)Prepare階段將網(wǎng)絡(luò)花費(fèi)降至理論最小值(一個(gè)RTT)。

本章先來(lái)看Proposer、Acceptor這兩個(gè)角色,Learner將在下一節(jié)講解。

3.2 Proposer

Proposer為提案的發(fā)起者,入口函數(shù)如下:

    int Proposer :: NewValue(const std::string& sValue)
    {
        BP->GetProposerBP()->NewProposal(sValue);
 
        if (m_oProposerState.GetValue().size() == 0)
        {
            m_oProposerState.SetValue(sValue);
        }

        m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
        m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;

        // 如果允許跳過(guò)Prepare階段 && 當(dāng)前未被其他節(jié)點(diǎn)拒絕
        if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
        {
            BP->GetProposerBP()->NewProposalSkipPrepare();
            // 直接進(jìn)入提案值確定階段
            PLGHead("skip prepare, directly start accept");
            Accept();
        }
        else
        {
             // 先進(jìn)入Prepare階段,隨后進(jìn)入提案值確定階段
            //if not reject by someone, no need to increase ballot
            Prepare(m_bWasRejectBySomeone);
        }

        return 0;
    }

滿足如下場(chǎng)景,允許跳過(guò)Prepare階段:

  • 本節(jié)點(diǎn)之前已經(jīng)執(zhí)行過(guò)Prepare階段,并且Prepare階段的執(zhí)行結(jié)果為Accept。

當(dāng)不滿足上述場(chǎng)景時(shí),需要執(zhí)行完整的Paxos兩階段流程。來(lái)看Prepare階段。

3.2.1 Prepare階段

Prepare的入口函數(shù)如下:

    void Proposer :: Prepare(const bool bNeedNewBallot)
    {
        PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
                GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
                m_oProposerState.GetValue().size());

        BP->GetProposerBP()->Prepare();
        m_oTimeStat.Point();

        //重置Proposer狀態(tài),準(zhǔn)備進(jìn)入Prepare階段。
        ExitAccept();
        m_bIsPreparing = true;
        m_bCanSkipPrepare = false;
        m_bWasRejectBySomeone = false;

        m_oProposerState.ResetHighestOtherPreAcceptBallot();

        //分配新的提案編號(hào)
        if (bNeedNewBallot)
        {
            m_oProposerState.NewPrepare();
        }

        PaxosMsg oPaxosMsg;
        oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
        oPaxosMsg.set_instanceid(GetInstanceID());
        oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());

        m_oMsgCounter.StartNewRound();

        //Prepare超時(shí)定時(shí)器
        AddPrepareTimer();

        PLGHead("END OK");

        //發(fā)送Prepare消息
        BroadcastMessage(oPaxosMsg);
    }

發(fā)起Prepare主要做4件事:

  1. Proposer狀態(tài)重置。表明當(dāng)前開始進(jìn)入Prepare階段。
  2. 按需分配新的提案編號(hào)。按需的意思是指,當(dāng)有其他節(jié)點(diǎn)明確拒絕了該提案,按Paxos協(xié)議必須使用新的提案編號(hào)重寫發(fā)起提案;而如果并無(wú)其他節(jié)點(diǎn)拒絕,即由于超時(shí)等原因?qū)е碌闹匦掳l(fā)起提案,可沿用原有的編號(hào)。
  3. 設(shè)置Prepare超時(shí)定時(shí)器。Prepare超時(shí)原因有很多,比如網(wǎng)絡(luò)丟包。當(dāng)Prepare超時(shí)時(shí),處理方式也很簡(jiǎn)單,重新執(zhí)行Prepare。
void Proposer::OnPrepareTimeout()
{
    PLGHead("OK");
    //本輪提案已經(jīng)選舉結(jié)束,不再執(zhí)行任何操作。
    if (GetInstanceID() != m_llTimeoutInstanceID)
    {
        PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",
               m_llTimeoutInstanceID, GetInstanceID());
        return;
    }

    BP->GetProposerBP()->PrepareTimeout();
    //重新發(fā)起Prepare
    Prepare(m_bWasRejectBySomeone);
}
  1. 發(fā)送Prepare消息。消息采用UDP方式發(fā)送。發(fā)送內(nèi)容包括本輪提案的實(shí)例編號(hào)、節(jié)點(diǎn)編號(hào)、提案編號(hào)、消息類型等。如果我們依次發(fā)起了多輪提案,每輪實(shí)例編號(hào)依次為1、2、3...,某些節(jié)點(diǎn)如果落后于其他節(jié)點(diǎn),需要通過(guò)實(shí)例編號(hào)隔離不同的提案請(qǐng)求。關(guān)于整個(gè)實(shí)例的概念,后面后有詳細(xì)的講解。

Prepare消息是發(fā)往各個(gè)節(jié)點(diǎn)的Acceptor的,Acceptor處理完成后發(fā)送Replay消息。Proposer交由OnPrepareReply處理Replay消息,邏輯如下:

  1. 判定消息的有效性。包括實(shí)例編號(hào)、提案編號(hào)是否一致;當(dāng)前是否處在Prepare階段等。代碼略。
  2. 根據(jù)消息內(nèi)容更新Proposer狀態(tài)。包括更新接收節(jié)點(diǎn)信息、接收或拒絕狀態(tài)更新。
    //更新:已從node id節(jié)點(diǎn)獲取數(shù)據(jù)
    m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());

    if (oPaxosMsg.rejectbypromiseid() == 0)
    {
        //更新:接受該提案,并將該節(jié)點(diǎn)的promise id(承諾提案編號(hào))、提案值更新到本地。
        BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
        PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
                 oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
        m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
        m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
    }
    else
    {
        //更新:該提案被拒絕,標(biāo)明本輪存在拒絕請(qǐng)求的節(jié)點(diǎn)(重新發(fā)起提案時(shí)需要更新提案值)。并將該節(jié)點(diǎn)已承諾的提案編號(hào)更新到本地。
        PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
        m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
        m_bWasRejectBySomeone = true;
        m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
    }
  1. 根據(jù)已收到的各個(gè)節(jié)點(diǎn)的回復(fù),判定是否進(jìn)入Accept階段,或者重新發(fā)起Prepare。
    //已收到超過(guò)半數(shù)的Accept回復(fù)消息,直接進(jìn)入Accept階段。
    if (m_oMsgCounter.IsPassedOnThisRound())
    {
        int iUseTimeMs = m_oTimeStat.Point();
        BP->GetProposerBP()->PreparePass(iUseTimeMs);
        PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);

        //最近一次發(fā)起的Prepare被接受,后續(xù)可跳過(guò)Prepare階段。
        m_bCanSkipPrepare = true;

        //進(jìn)入Accept階段。
        Accept();
    }
    //已收到超過(guò)半數(shù)的Reject回復(fù)消息或者所有節(jié)點(diǎn)已回復(fù)(這個(gè)判斷并不需要),重新進(jìn)入Prepare階段
    else if (m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound())
    {
        BP->GetProposerBP()->PrepareNotPass();
        PLGImp("[Not Pass] wait 30ms and restart prepare");

        //重置Prepare超時(shí)定時(shí)器,提前觸發(fā)Prepare超時(shí)。
        AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
    }

3.2.2 Accept階段

Accept的入口函數(shù)如下:

void Proposer::Accept()
{
    PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
            m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());

    BP->GetProposerBP()->Accept();
    m_oTimeStat.Point();

    //標(biāo)識(shí)進(jìn)入Accept階段
    ExitPrepare();
    m_bIsAccepting = true;

    PaxosMsg oPaxosMsg;
    oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
    oPaxosMsg.set_instanceid(GetInstanceID());
    oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
    oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
    oPaxosMsg.set_value(m_oProposerState.GetValue());
    oPaxosMsg.set_lastchecksum(GetLastChecksum());

    m_oMsgCounter.StartNewRound();

    //添加Accept超時(shí)定時(shí)器
    AddAcceptTimer();

    PLGHead("END");

    //發(fā)送Accept消息
    BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}

Accept和Prepare階段的request操作如出一轍:

  1. Proposer狀態(tài)重置。表明當(dāng)前開始進(jìn)入Accept階段。
  2. 設(shè)置Accept超時(shí)定時(shí)器。Accept超時(shí)原因有很多,比如網(wǎng)絡(luò)丟包。當(dāng)Accept超時(shí)時(shí),處理方式也很簡(jiǎn)單,重新進(jìn)入Prepare階段。
void Proposer::OnAcceptTimeout()
{
    PLGHead("OK");
    //本輪提案已經(jīng)選舉結(jié)束,不再執(zhí)行任何操作。
    if (GetInstanceID() != m_llTimeoutInstanceID)
    {
        PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",
               m_llTimeoutInstanceID, GetInstanceID());
        return;
    }

    BP->GetProposerBP()->AcceptTimeout();
    //重新發(fā)起Prepare
    Prepare(m_bWasRejectBySomeone);
}
  1. 發(fā)送Accept消息到其他節(jié)點(diǎn)。消息采用UDP方式發(fā)送。發(fā)送內(nèi)容包括本輪提案的實(shí)例編號(hào)、節(jié)點(diǎn)編號(hào)、提案編號(hào)、消息類型、提案值等。

OnAcceptReply處理邏輯和OnPrepareReply類似,包括如下幾步:

  1. 判定消息有效性。包括實(shí)例編號(hào)、提案編號(hào)是否一致;當(dāng)前是否處在Accept階段等。
  2. 根據(jù)消息內(nèi)容更新Proposer狀態(tài)。包括更新接收節(jié)點(diǎn)信息、接收或拒絕狀態(tài)更新。
  3. 根據(jù)已收到的各個(gè)節(jié)點(diǎn)的回復(fù),判定Accept階段是否已完成,或者重新發(fā)起Prepare。
    //提案被接受,提前退出Accept階段
    if (m_oMsgCounter.IsPassedOnThisRound())
    {
        int iUseTimeMs = m_oTimeStat.Point();
        BP->GetProposerBP()->AcceptPass(iUseTimeMs);
        PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
        ExitAccept();  
        //通知Learner,本輪提案已完成
        m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
    }
    else if (m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound())
    {
        BP->GetProposerBP()->AcceptNotPass();
        PLGImp("[Not pass] wait 30ms and Restart prepare");

        //Accept失敗,終止Accept階段。
        AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
    }

如果提案為獲得半數(shù)通過(guò),即未被選中(chosen)將從prepare階段重新發(fā)起提案。如果獲得半數(shù)通過(guò),learner通知所有節(jié)點(diǎn)提案被選中,即從accept狀態(tài)改為chosen狀態(tài)。

3.3 Acceptor

Acceptor做為提案的被動(dòng)參與者,也分為Prepare和Accept兩個(gè)階段

3.3.1 Prepare階段

Proposer發(fā)送的Prepare消息由Acceptor的OnPrepare處理,邏輯如下:

int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
    PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
            oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());

    BP->GetAcceptorBP()->OnPrepare();
    
    PaxosMsg oReplyPaxosMsg;
    oReplyPaxosMsg.set_instanceid(GetInstanceID());
    oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
    oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
    oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);

    BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
    
    //當(dāng)前的提案編號(hào)大于已承諾的編號(hào),接受該提案
    if (oBallot >= m_oAcceptorState.GetPromiseBallot())
    {
        PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
        //返回之前承諾的提案編號(hào)
        oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
        oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
        //如果之前已經(jīng)接受過(guò)某個(gè)提案的提案值,返回該提案值
        if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
        {
            oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
        }

        //將承諾的提案更新為當(dāng)前提案
        m_oAcceptorState.SetPromiseBallot(oBallot);

        //保證P2.c的不變性,數(shù)據(jù)需要入庫(kù)
        int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
        if (ret != 0)
        {
            BP->GetAcceptorBP()->OnPreparePersistFail();
            PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                    GetInstanceID(), ret);
            
            return -1;
        }

        BP->GetAcceptorBP()->OnPreparePass();
    }
    else
    {
        BP->GetAcceptorBP()->OnPrepareReject();

        PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", 
                m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                m_oAcceptorState.GetPromiseBallot().m_llNodeID);
        //已存在更高編號(hào)的提案,拒絕該提案并返回已承諾提案編號(hào)        
        oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
    }

    nodeid_t iReplyNodeID = oPaxosMsg.nodeid();

    PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
            GetInstanceID(), oPaxosMsg.nodeid());;

    //發(fā)送Replay消息
    SendMessage(iReplyNodeID, oReplyPaxosMsg);

    return 0;
}

OnPrepare函數(shù)看似并未做任何有效性校驗(yàn),但這部分校驗(yàn)是必不可少的,并未省去,而是出現(xiàn)在了調(diào)用OnPrepare的Instance類的上層函數(shù)中。關(guān)于Instance后面也會(huì)單獨(dú)說(shuō)明,這里的校驗(yàn)主要是保證參數(shù)中的instance id和acceptor一致。

另外一點(diǎn)需要說(shuō)明的是:前面一直提到的提案編號(hào)并不為完整的Paxos意義上的提案編號(hào),完整的提案編號(hào)由提案編號(hào)(proposal id)+節(jié)點(diǎn)Id(node id)兩部分組成。代碼上看,BallotNumber代表了Paxos意義上的提案編號(hào)。

class BallotNumber
{
public:
    ......
    uint64_t m_llProposalID;
    nodeid_t m_llNodeID;
};

3.3.2 Accept階段

Proposer發(fā)送的Accept消息由Acceptor的OnAccept處理,邏輯如下:

void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
    PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
            oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());

    BP->GetAcceptorBP()->OnAccept();

    PaxosMsg oReplyPaxosMsg;
    oReplyPaxosMsg.set_instanceid(GetInstanceID());
    oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
    oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
    oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);

    BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());

    //當(dāng)前的提案編號(hào)大于或等于已承諾的編號(hào),接受該提案
    if (oBallot >= m_oAcceptorState.GetPromiseBallot())
    {
        PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
        
        //更新提案編號(hào)、提案值
        m_oAcceptorState.SetPromiseBallot(oBallot);
        m_oAcceptorState.SetAcceptedBallot(oBallot);
        m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
        
        //按Paxos協(xié)議P2.c不變性要求,數(shù)據(jù)需要持久化(包括實(shí)例號(hào)、提案編號(hào)、提案值)
        int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
        if (ret != 0)
        {
            BP->GetAcceptorBP()->OnAcceptPersistFail();

            PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                    GetInstanceID(), ret);
            
            return;
        }

        BP->GetAcceptorBP()->OnAcceptPass();
    }
    else
    {
        BP->GetAcceptorBP()->OnAcceptReject();

        PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", 
                m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                m_oAcceptorState.GetPromiseBallot().m_llNodeID);
        
        //已存在更高編號(hào)的提案,拒絕該提案并返回已承諾提案編號(hào)
        oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
    }

    nodeid_t iReplyNodeID = oPaxosMsg.nodeid();

    PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
            GetInstanceID(), oPaxosMsg.nodeid());

    SendMessage(iReplyNodeID, oReplyPaxosMsg);
}

有一點(diǎn)需要說(shuō)明,OnPrepare、OnAccept兩個(gè)階段中,如果處理結(jié)果為接受,那么需要保證不管在何種情況下,這個(gè)結(jié)果是不變的。因此,數(shù)據(jù)需要被持久化,但這個(gè)持久化數(shù)據(jù)只需要本地保存即可,PhxPaxos中采用leveldb做本地持久化數(shù)據(jù)庫(kù)。

3.5 并發(fā)

前面的處理邏輯中,沒有看到加鎖或者其他并發(fā)保護(hù)機(jī)制。那么,如果一個(gè)instance正在處理Prepare請(qǐng)求時(shí),同時(shí)接收到了Accept消息怎么辦呢?其實(shí),PhxPaxos中還有另外一個(gè)IOLoop線程,專門負(fù)責(zé)接收網(wǎng)絡(luò)消息,并串行執(zhí)行。

IOLoop線程的run函數(shù)實(shí)現(xiàn)如下:

void IOLoop :: run()
{
    m_bIsEnd = false;
    m_bIsStart = true;
    while(true)
    {
        BP->GetIOLoopBP()->OneLoop();

        int iNextTimeout = 1000;
        
        DealwithTimeout(iNextTimeout);

        //PLGHead("nexttimeout %d", iNextTimeout);

        OneLoop(iNextTimeout);

        if (m_bIsEnd)
        {
            PLGHead("IOLoop [End]");
            break;
        }
    }
}

其中,OneLoop是我們本次關(guān)注的重點(diǎn):

void IOLoop :: OneLoop(const int iTimeoutMs)
{
    std::string * psMessage = nullptr;

    m_oMessageQueue.lock();
    bool bSucc = m_oMessageQueue.peek(psMessage, iTimeoutMs);
    
    if (!bSucc)
    {
        m_oMessageQueue.unlock();
    }
    else
    {
        m_oMessageQueue.pop();
        m_oMessageQueue.unlock();

        if (psMessage != nullptr && psMessage->size() > 0)
        {
            m_iQueueMemSize -= psMessage->size();
            m_poInstance->OnReceive(*psMessage);
        }

        delete psMessage;

        BP->GetIOLoopBP()->OutQueueMsg();
    }

    DealWithRetry();

    //must put on here
    //because addtimer on this funciton
    m_poInstance->CheckNewValue();
}

OneLoop從m_oMessageQueue中取出一個(gè)網(wǎng)絡(luò)消息隨后交給Instance處理。每個(gè)Group的Instance啟動(dòng)一個(gè)IOLoop線程,不同Group彼此隔離,并發(fā)處理互不影響。

3.4 總結(jié)

本章簡(jiǎn)要介紹了Paxos算法原理,了解到Paxos算法的三大角色:Proposer、Acceptor、Learner。講解了Proposer、Learner兩個(gè)角色的主要代碼實(shí)現(xiàn),以及二者如何參與到Prepare、Accept兩個(gè)階段中。

至于最后一個(gè)角色Learner,原本的理解認(rèn)為應(yīng)該是參與度最低的,邏輯最少的角色。但PhxPaxos中,Learner是三者中實(shí)現(xiàn)最復(fù)雜的,這部分內(nèi)容將在下一章單獨(dú)講解。


【轉(zhuǎn)載請(qǐng)注明】隨安居士. 3. PhxPaxos源碼分析之Proposer、Acceptor. 2017.11.14

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 原文:Paxos Made Simple作者:Leslie Lamport時(shí)間:01 Nov 2001 1 Int...
    隨安居士閱讀 1,648評(píng)論 1 2
  • 此文知識(shí)來(lái)自于:《從Paxos到Zookeeper分布式一致性原理與實(shí)踐》第二章分布式入門基礎(chǔ)知識(shí),由于博主對(duì)其理...
    李文文丶閱讀 2,047評(píng)論 0 0
  • Paxos是什么 Paxos算法是基于消息傳遞且具有高度容錯(cuò)特性的一致性算法,是目前公認(rèn)的解決分布式一致性問(wèn)題最有...
    jiangmo閱讀 1,590評(píng)論 0 6
  • 問(wèn)題: 基于消息傳遞通信模型的分布式系統(tǒng),不可避免的會(huì)發(fā)生以下錯(cuò)誤:進(jìn)程可能會(huì)慢、被殺死或者重啟,消息可能會(huì)延遲、...
    LaxChan閱讀 2,066評(píng)論 6 1
  • 今天是我堅(jiān)持寫作第21天,如果不是天天打卡,我也毫無(wú)意識(shí)。 書上說(shuō),21天可以形成一個(gè)習(xí)慣,習(xí)慣有沒有習(xí)得,我暫時(shí)...
    沸騰的小餃子閱讀 347評(píng)論 0 4

友情鏈接更多精彩內(nèi)容