目錄
1. PhxPaxos源碼分析之關(guān)于PhxPaxos
2. PhxPaxos分析之網(wǎng)絡(luò)基礎(chǔ)部件
3. PhxPaxos源碼分析之Proposer、Acceptor
4. PhxPaxos源碼分析之Learner
5. PhxPaxos源碼分析之狀態(tài)機(jī)
6. PhxPaxos源碼分析之歸檔機(jī)制
7. PhxPaxos源碼分析之整體架構(gòu)
3.1 基本概念
Paxos算法包含三個(gè)角色:
-
Proposer
提案發(fā)起者。負(fù)責(zé)發(fā)起提案,提案本身由編號(hào)(number)、值(value)兩部分組成。 -
Acceptor
提案接收者。負(fù)責(zé)接收Proposer的提案,通過(guò)一定的規(guī)則首先確定提案編號(hào)、最終確定提案值。 -
Learner
選中提案值習(xí)得者。不參與提案確定過(guò)程,只負(fù)責(zé)學(xué)習(xí)已確定好的提案值。
簡(jiǎn)單來(lái)說(shuō),提案由每個(gè)節(jié)點(diǎn)的Proposer、Acceptor參與決策,如果某個(gè)節(jié)點(diǎn)由于網(wǎng)絡(luò)故障等原因未參與決策,由Learner負(fù)責(zé)學(xué)習(xí)已經(jīng)選中提案值。
Paxos算法本身并不好理解,這里按通俗易懂的說(shuō)法講述整個(gè)提案確定過(guò)程,完整的Paxos算計(jì)介紹請(qǐng)參見Paxos made simple 釋譯。
注意:一個(gè)完整的提案由{編號(hào)n,提案值v}兩部分組成。我們只關(guān)心最終的提案值,提案編號(hào)的唯一用途在于確定提案值。
- Proposer選擇新的編號(hào)n,并發(fā)送到所有的Acceptor。
- Acceptor接收到請(qǐng)求后完成如下工作:
2.1 承諾(promise):承諾不再accept所有編號(hào)小于n的請(qǐng)求,即reject編號(hào)小于n的請(qǐng)求。
2.2 返回(reply):返回當(dāng)前小于n的最大編號(hào)所對(duì)應(yīng)的提議值v(如果存在)。- 如果Proposer發(fā)起的提案編號(hào)被半數(shù)以上的Acceptor接受,此時(shí)它可以真正的發(fā)起提案值,否則回到步驟一重新確定提案編號(hào)。
- 發(fā)起的提案值不是Proposer隨意確定的。確定規(guī)則如下:
4.1 如果步驟2.2中,一個(gè)或者多個(gè)Acceptor返回了已經(jīng)被接受(accept)的提案值,新發(fā)起的提案值必須是最大編號(hào)返回的提案值。
4.2 如果步驟2.2中,沒有任何Acceptor返回已被接受(accept)的提案值,由Proposer自行發(fā)起新的提案值。- Proposer發(fā)起的提案值如果被接受則提案結(jié)束,否則重復(fù)上述過(guò)程,直到確定提案值。
Paxos算法分為提案選定階段(Prepare)、確定提案值(accept)兩個(gè)階段。每個(gè)階段都有可能需要執(zhí)行多次,每次都花費(fèi)一個(gè)RTT網(wǎng)絡(luò)耗時(shí)。為了減少網(wǎng)絡(luò)耗時(shí),達(dá)到產(chǎn)品級(jí)應(yīng)用的目的,PhxPaxos采用選主并在主上執(zhí)行Paxos算法的方式避免提案沖突,隨后為所有的提案執(zhí)行noop操作,有條件跳過(guò)Prepare階段將網(wǎng)絡(luò)花費(fèi)降至理論最小值(一個(gè)RTT)。
本章先來(lái)看Proposer、Acceptor這兩個(gè)角色,Learner將在下一節(jié)講解。
3.2 Proposer
Proposer為提案的發(fā)起者,入口函數(shù)如下:
int Proposer :: NewValue(const std::string& sValue)
{
BP->GetProposerBP()->NewProposal(sValue);
if (m_oProposerState.GetValue().size() == 0)
{
m_oProposerState.SetValue(sValue);
}
m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
// 如果允許跳過(guò)Prepare階段 && 當(dāng)前未被其他節(jié)點(diǎn)拒絕
if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
{
BP->GetProposerBP()->NewProposalSkipPrepare();
// 直接進(jìn)入提案值確定階段
PLGHead("skip prepare, directly start accept");
Accept();
}
else
{
// 先進(jìn)入Prepare階段,隨后進(jìn)入提案值確定階段
//if not reject by someone, no need to increase ballot
Prepare(m_bWasRejectBySomeone);
}
return 0;
}
滿足如下場(chǎng)景,允許跳過(guò)Prepare階段:
- 本節(jié)點(diǎn)之前已經(jīng)執(zhí)行過(guò)Prepare階段,并且Prepare階段的執(zhí)行結(jié)果為Accept。
當(dāng)不滿足上述場(chǎng)景時(shí),需要執(zhí)行完整的Paxos兩階段流程。來(lái)看Prepare階段。
3.2.1 Prepare階段
Prepare的入口函數(shù)如下:
void Proposer :: Prepare(const bool bNeedNewBallot)
{
PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
m_oProposerState.GetValue().size());
BP->GetProposerBP()->Prepare();
m_oTimeStat.Point();
//重置Proposer狀態(tài),準(zhǔn)備進(jìn)入Prepare階段。
ExitAccept();
m_bIsPreparing = true;
m_bCanSkipPrepare = false;
m_bWasRejectBySomeone = false;
m_oProposerState.ResetHighestOtherPreAcceptBallot();
//分配新的提案編號(hào)
if (bNeedNewBallot)
{
m_oProposerState.NewPrepare();
}
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
m_oMsgCounter.StartNewRound();
//Prepare超時(shí)定時(shí)器
AddPrepareTimer();
PLGHead("END OK");
//發(fā)送Prepare消息
BroadcastMessage(oPaxosMsg);
}
發(fā)起Prepare主要做4件事:
- Proposer狀態(tài)重置。表明當(dāng)前開始進(jìn)入Prepare階段。
- 按需分配新的提案編號(hào)。按需的意思是指,當(dāng)有其他節(jié)點(diǎn)明確拒絕了該提案,按Paxos協(xié)議必須使用新的提案編號(hào)重寫發(fā)起提案;而如果并無(wú)其他節(jié)點(diǎn)拒絕,即由于超時(shí)等原因?qū)е碌闹匦掳l(fā)起提案,可沿用原有的編號(hào)。
- 設(shè)置Prepare超時(shí)定時(shí)器。Prepare超時(shí)原因有很多,比如網(wǎng)絡(luò)丟包。當(dāng)Prepare超時(shí)時(shí),處理方式也很簡(jiǎn)單,重新執(zhí)行Prepare。
void Proposer::OnPrepareTimeout()
{
PLGHead("OK");
//本輪提案已經(jīng)選舉結(jié)束,不再執(zhí)行任何操作。
if (GetInstanceID() != m_llTimeoutInstanceID)
{
PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",
m_llTimeoutInstanceID, GetInstanceID());
return;
}
BP->GetProposerBP()->PrepareTimeout();
//重新發(fā)起Prepare
Prepare(m_bWasRejectBySomeone);
}
- 發(fā)送Prepare消息。消息采用UDP方式發(fā)送。發(fā)送內(nèi)容包括本輪提案的實(shí)例編號(hào)、節(jié)點(diǎn)編號(hào)、提案編號(hào)、消息類型等。如果我們依次發(fā)起了多輪提案,每輪實(shí)例編號(hào)依次為1、2、3...,某些節(jié)點(diǎn)如果落后于其他節(jié)點(diǎn),需要通過(guò)實(shí)例編號(hào)隔離不同的提案請(qǐng)求。關(guān)于整個(gè)實(shí)例的概念,后面后有詳細(xì)的講解。
Prepare消息是發(fā)往各個(gè)節(jié)點(diǎn)的Acceptor的,Acceptor處理完成后發(fā)送Replay消息。Proposer交由OnPrepareReply處理Replay消息,邏輯如下:
- 判定消息的有效性。包括實(shí)例編號(hào)、提案編號(hào)是否一致;當(dāng)前是否處在Prepare階段等。代碼略。
- 根據(jù)消息內(nèi)容更新Proposer狀態(tài)。包括更新接收節(jié)點(diǎn)信息、接收或拒絕狀態(tài)更新。
//更新:已從node id節(jié)點(diǎn)獲取數(shù)據(jù)
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{
//更新:接受該提案,并將該節(jié)點(diǎn)的promise id(承諾提案編號(hào))、提案值更新到本地。
BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
}
else
{
//更新:該提案被拒絕,標(biāo)明本輪存在拒絕請(qǐng)求的節(jié)點(diǎn)(重新發(fā)起提案時(shí)需要更新提案值)。并將該節(jié)點(diǎn)已承諾的提案編號(hào)更新到本地。
PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeone = true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
- 根據(jù)已收到的各個(gè)節(jié)點(diǎn)的回復(fù),判定是否進(jìn)入Accept階段,或者重新發(fā)起Prepare。
//已收到超過(guò)半數(shù)的Accept回復(fù)消息,直接進(jìn)入Accept階段。
if (m_oMsgCounter.IsPassedOnThisRound())
{
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->PreparePass(iUseTimeMs);
PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
//最近一次發(fā)起的Prepare被接受,后續(xù)可跳過(guò)Prepare階段。
m_bCanSkipPrepare = true;
//進(jìn)入Accept階段。
Accept();
}
//已收到超過(guò)半數(shù)的Reject回復(fù)消息或者所有節(jié)點(diǎn)已回復(fù)(這個(gè)判斷并不需要),重新進(jìn)入Prepare階段
else if (m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound())
{
BP->GetProposerBP()->PrepareNotPass();
PLGImp("[Not Pass] wait 30ms and restart prepare");
//重置Prepare超時(shí)定時(shí)器,提前觸發(fā)Prepare超時(shí)。
AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
}
3.2.2 Accept階段
Accept的入口函數(shù)如下:
void Proposer::Accept()
{
PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
BP->GetProposerBP()->Accept();
m_oTimeStat.Point();
//標(biāo)識(shí)進(jìn)入Accept階段
ExitPrepare();
m_bIsAccepting = true;
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
oPaxosMsg.set_value(m_oProposerState.GetValue());
oPaxosMsg.set_lastchecksum(GetLastChecksum());
m_oMsgCounter.StartNewRound();
//添加Accept超時(shí)定時(shí)器
AddAcceptTimer();
PLGHead("END");
//發(fā)送Accept消息
BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}
Accept和Prepare階段的request操作如出一轍:
- Proposer狀態(tài)重置。表明當(dāng)前開始進(jìn)入Accept階段。
- 設(shè)置Accept超時(shí)定時(shí)器。Accept超時(shí)原因有很多,比如網(wǎng)絡(luò)丟包。當(dāng)Accept超時(shí)時(shí),處理方式也很簡(jiǎn)單,重新進(jìn)入Prepare階段。
void Proposer::OnAcceptTimeout()
{
PLGHead("OK");
//本輪提案已經(jīng)選舉結(jié)束,不再執(zhí)行任何操作。
if (GetInstanceID() != m_llTimeoutInstanceID)
{
PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",
m_llTimeoutInstanceID, GetInstanceID());
return;
}
BP->GetProposerBP()->AcceptTimeout();
//重新發(fā)起Prepare
Prepare(m_bWasRejectBySomeone);
}
- 發(fā)送Accept消息到其他節(jié)點(diǎn)。消息采用UDP方式發(fā)送。發(fā)送內(nèi)容包括本輪提案的實(shí)例編號(hào)、節(jié)點(diǎn)編號(hào)、提案編號(hào)、消息類型、提案值等。
OnAcceptReply處理邏輯和OnPrepareReply類似,包括如下幾步:
- 判定消息有效性。包括實(shí)例編號(hào)、提案編號(hào)是否一致;當(dāng)前是否處在Accept階段等。
- 根據(jù)消息內(nèi)容更新Proposer狀態(tài)。包括更新接收節(jié)點(diǎn)信息、接收或拒絕狀態(tài)更新。
- 根據(jù)已收到的各個(gè)節(jié)點(diǎn)的回復(fù),判定Accept階段是否已完成,或者重新發(fā)起Prepare。
//提案被接受,提前退出Accept階段
if (m_oMsgCounter.IsPassedOnThisRound())
{
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->AcceptPass(iUseTimeMs);
PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
ExitAccept();
//通知Learner,本輪提案已完成
m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
}
else if (m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound())
{
BP->GetProposerBP()->AcceptNotPass();
PLGImp("[Not pass] wait 30ms and Restart prepare");
//Accept失敗,終止Accept階段。
AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
}
如果提案為獲得半數(shù)通過(guò),即未被選中(chosen)將從prepare階段重新發(fā)起提案。如果獲得半數(shù)通過(guò),learner通知所有節(jié)點(diǎn)提案被選中,即從accept狀態(tài)改為chosen狀態(tài)。
3.3 Acceptor
Acceptor做為提案的被動(dòng)參與者,也分為Prepare和Accept兩個(gè)階段
3.3.1 Prepare階段
Proposer發(fā)送的Prepare消息由Acceptor的OnPrepare處理,邏輯如下:
int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
BP->GetAcceptorBP()->OnPrepare();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
//當(dāng)前的提案編號(hào)大于已承諾的編號(hào),接受該提案
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//返回之前承諾的提案編號(hào)
oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//如果之前已經(jīng)接受過(guò)某個(gè)提案的提案值,返回該提案值
if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
{
oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
}
//將承諾的提案更新為當(dāng)前提案
m_oAcceptorState.SetPromiseBallot(oBallot);
//保證P2.c的不變性,數(shù)據(jù)需要入庫(kù)
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnPreparePersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return -1;
}
BP->GetAcceptorBP()->OnPreparePass();
}
else
{
BP->GetAcceptorBP()->OnPrepareReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
//已存在更高編號(hào)的提案,拒絕該提案并返回已承諾提案編號(hào)
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());;
//發(fā)送Replay消息
SendMessage(iReplyNodeID, oReplyPaxosMsg);
return 0;
}
OnPrepare函數(shù)看似并未做任何有效性校驗(yàn),但這部分校驗(yàn)是必不可少的,并未省去,而是出現(xiàn)在了調(diào)用OnPrepare的Instance類的上層函數(shù)中。關(guān)于Instance后面也會(huì)單獨(dú)說(shuō)明,這里的校驗(yàn)主要是保證參數(shù)中的instance id和acceptor一致。
另外一點(diǎn)需要說(shuō)明的是:前面一直提到的提案編號(hào)并不為完整的Paxos意義上的提案編號(hào),完整的提案編號(hào)由提案編號(hào)(proposal id)+節(jié)點(diǎn)Id(node id)兩部分組成。代碼上看,BallotNumber代表了Paxos意義上的提案編號(hào)。
class BallotNumber
{
public:
......
uint64_t m_llProposalID;
nodeid_t m_llNodeID;
};
3.3.2 Accept階段
Proposer發(fā)送的Accept消息由Acceptor的OnAccept處理,邏輯如下:
void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
BP->GetAcceptorBP()->OnAccept();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
//當(dāng)前的提案編號(hào)大于或等于已承諾的編號(hào),接受該提案
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//更新提案編號(hào)、提案值
m_oAcceptorState.SetPromiseBallot(oBallot);
m_oAcceptorState.SetAcceptedBallot(oBallot);
m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
//按Paxos協(xié)議P2.c不變性要求,數(shù)據(jù)需要持久化(包括實(shí)例號(hào)、提案編號(hào)、提案值)
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnAcceptPersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return;
}
BP->GetAcceptorBP()->OnAcceptPass();
}
else
{
BP->GetAcceptorBP()->OnAcceptReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
//已存在更高編號(hào)的提案,拒絕該提案并返回已承諾提案編號(hào)
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());
SendMessage(iReplyNodeID, oReplyPaxosMsg);
}
有一點(diǎn)需要說(shuō)明,OnPrepare、OnAccept兩個(gè)階段中,如果處理結(jié)果為接受,那么需要保證不管在何種情況下,這個(gè)結(jié)果是不變的。因此,數(shù)據(jù)需要被持久化,但這個(gè)持久化數(shù)據(jù)只需要本地保存即可,PhxPaxos中采用leveldb做本地持久化數(shù)據(jù)庫(kù)。
3.5 并發(fā)
前面的處理邏輯中,沒有看到加鎖或者其他并發(fā)保護(hù)機(jī)制。那么,如果一個(gè)instance正在處理Prepare請(qǐng)求時(shí),同時(shí)接收到了Accept消息怎么辦呢?其實(shí),PhxPaxos中還有另外一個(gè)IOLoop線程,專門負(fù)責(zé)接收網(wǎng)絡(luò)消息,并串行執(zhí)行。
IOLoop線程的run函數(shù)實(shí)現(xiàn)如下:
void IOLoop :: run()
{
m_bIsEnd = false;
m_bIsStart = true;
while(true)
{
BP->GetIOLoopBP()->OneLoop();
int iNextTimeout = 1000;
DealwithTimeout(iNextTimeout);
//PLGHead("nexttimeout %d", iNextTimeout);
OneLoop(iNextTimeout);
if (m_bIsEnd)
{
PLGHead("IOLoop [End]");
break;
}
}
}
其中,OneLoop是我們本次關(guān)注的重點(diǎn):
void IOLoop :: OneLoop(const int iTimeoutMs)
{
std::string * psMessage = nullptr;
m_oMessageQueue.lock();
bool bSucc = m_oMessageQueue.peek(psMessage, iTimeoutMs);
if (!bSucc)
{
m_oMessageQueue.unlock();
}
else
{
m_oMessageQueue.pop();
m_oMessageQueue.unlock();
if (psMessage != nullptr && psMessage->size() > 0)
{
m_iQueueMemSize -= psMessage->size();
m_poInstance->OnReceive(*psMessage);
}
delete psMessage;
BP->GetIOLoopBP()->OutQueueMsg();
}
DealWithRetry();
//must put on here
//because addtimer on this funciton
m_poInstance->CheckNewValue();
}
OneLoop從m_oMessageQueue中取出一個(gè)網(wǎng)絡(luò)消息隨后交給Instance處理。每個(gè)Group的Instance啟動(dòng)一個(gè)IOLoop線程,不同Group彼此隔離,并發(fā)處理互不影響。
3.4 總結(jié)
本章簡(jiǎn)要介紹了Paxos算法原理,了解到Paxos算法的三大角色:Proposer、Acceptor、Learner。講解了Proposer、Learner兩個(gè)角色的主要代碼實(shí)現(xiàn),以及二者如何參與到Prepare、Accept兩個(gè)階段中。
至于最后一個(gè)角色Learner,原本的理解認(rèn)為應(yīng)該是參與度最低的,邏輯最少的角色。但PhxPaxos中,Learner是三者中實(shí)現(xiàn)最復(fù)雜的,這部分內(nèi)容將在下一章單獨(dú)講解。
【轉(zhuǎn)載請(qǐng)注明】隨安居士. 3. PhxPaxos源碼分析之Proposer、Acceptor. 2017.11.14