1. Overview: ZooKeeper Leader/Follower Startup, Leader Election, and Leader/Follower Establishment
Start with the diagram below:

The diagram is fairly large, so it is best downloaded from Baidu Cloud and viewed locally. We will now walk through it step by step.
2. QuorumPeerMain Parses the Config File and Builds the QuorumPeer
The code below reads the settings from the configuration file and constructs a QuorumPeer:
// Start a QuorumPeer from the given QuorumPeerConfig
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    LOG.info("QuorumPeerConfig : " + config);
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }
    LOG.info("Starting quorum peer");
    try { // 1. In a ZooKeeper ensemble, each QuorumPeer represents one server
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
        quorumPeer = new QuorumPeer();
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setTxnFactory(new FileTxnSnapLog(       // 2. Set the FileTxnSnapLog (wraps TxnLog and SnapShot)
                new File(config.getDataLogDir()),
                new File(config.getDataDir())));
        quorumPeer.setQuorumPeers(config.getServers());    // 3. All servers in the ensemble
        quorumPeer.setElectionType(config.getElectionAlg()); // 4. Leader election algorithm (default 3, i.e. FastLeaderElection)
        quorumPeer.setMyid(config.getServerId());          // 5. Each QuorumPeer gets a myid to distinguish the nodes in the ensemble
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); // 6. Minimum client session timeout (defaults to tickTime * 2)
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); // 7. Maximum client session timeout (defaults to tickTime * 20)
        quorumPeer.setInitLimit(config.getInitLimit());    // 8. Mostly used as initLimit * tickTime in getEpochToPropose (wait for the epoch values of a quorum of nodes), waitForEpochAck (during leader establishment the Leader sends LEADERINFO to all nodes and the Followers reply with ACKEPOCH), and waitForNewLeaderAck (the Leader sends NEWLEADER to the Followers and waits here for their ACKs)
        quorumPeer.setSyncLimit(config.getSyncLimit());    // 9. Commonly used as self.tickTime * self.syncLimit, the soTimeout of the sockets connecting the ensemble nodes
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier()); // 10. Vote verifier; by default a proposal passes with a majority (QuorumMaj)
        quorumPeer.setCnxnFactory(cnxnFactory);            // 11. The cnxnFactory that accepts client connections (plain Java NIO or Netty; note that the plain NIO variant does not work around the well-known java nio 100% CPU bug)
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); // 12. Set the ZKDatabase
        quorumPeer.setLearnerType(config.getPeerType());   // 13. Node type (participant / observer)
        quorumPeer.setSyncEnabled(config.getSyncEnabled()); // 14. Enables/disables the sync request processor for Observers; enabled by default ("This option is enabled by default and is to be used with observers")
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        quorumPeer.start();                                // 15. Start the server
        LOG.info("quorumPeer.join begin");
        quorumPeer.join();                                 // 16. Wait for the quorumPeer thread to finish before continuing ("Waits for this thread to die.")
        LOG.info("quorumPeer.join end");
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
3. QuorumPeer Startup
start() loads the data into the DataTree, starts listening for client connections, and kicks off leader election; after that the thread lives inside the while loop of QuorumPeer.run().
public synchronized void start() {
    loadDataBase();          // Load data from the snapshots and txn log into the DataTree
    cnxnFactory.start();     // Start listening on the server's client port
    startLeaderElection();   // Start the leader-election machinery
    super.start();           // Start the thread, which enters QuorumPeer.run()
}
4. QuorumPeer.loadDataBase
Load data from the snapshots / txn log into the DataTree.
// After this runs, the currentEpoch and acceptedEpoch files exist and the DataTree has been loaded
private void loadDataBase() {
    File updating = new File(getTxnFactory().getSnapDir(),   // 1. The updatingEpoch marker file lives in the snapshot directory
            UPDATING_EPOCH_FILENAME);
    try {
        zkDb.loadDataBase();                                  // 2. Load the dataTree and sessionsWithTimeouts from the snapshots and txn log
        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; // 3. The newest zxid zkDb has processed
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); // 4. The high 32 bits of a zxid are the epoch; the low 32 bits are the counter
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); // 5. Read the epoch from file (if the currentEpoch file does not exist, control jumps to the catch block below, which is the common case on first start)
            if (epochOfZxid > currentEpoch && updating.exists()) {   // 6. This means the QuorumPeer died right after taking a snapshot, before it could update currentEpoch
                LOG.info("{} found. The server was terminated after " +
                        "taking a snapshot but before updating current " +
                        "epoch. Setting current epoch to {}.",
                        UPDATING_EPOCH_FILENAME, epochOfZxid);
                setCurrentEpoch(epochOfZxid);
                if (!updating.delete()) {
                    throw new IOException("Failed to delete " +
                            updating.toString());
                }
            }
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;                      // 7. We get here when the currentEpoch file does not exist
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); // 8. Read the last accepted epoch from file
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;                     // 9. We get here when the acceptedEpoch file cannot be read
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch); // 10. Write the acceptedEpoch value back to its file
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + " is less than the accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch));
        }
    } catch(IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}
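The epoch/counter split used in step 4 is plain 64-bit arithmetic. The helper below is a hypothetical re-implementation of the bit layout for illustration; it is not the actual `ZxidUtils` class, though `epochOf` mirrors what `getEpochFromZxid` computes:

```java
// Hypothetical sketch of the zxid bit layout described above:
// the high 32 bits of a zxid hold the epoch, the low 32 bits the counter.
public class ZxidBits {
    public static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }
    public static long epochOf(long zxid) {
        return zxid >> 32;          // what ZxidUtils.getEpochFromZxid extracts
    }
    public static long counterOf(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }
}
```

Because the epoch occupies the high bits, comparing two zxids numerically also compares epochs first, which is what makes `lastProcessedZxid` a usable recovery point.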
5. QuorumPeer.startLeaderElection
Create the leader-election algorithm and start QuorumCnxManager.Listener, which listens for connections from the other nodes in the ensemble.
// Start the leader-election machinery
synchronized public void startLeaderElection() {
    try {
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); // 1. Build an initial vote for ourselves
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    for (QuorumServer p : getView().values()) {      // 2. Walk all servers in the ensemble to find our own address
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    if (electionType == 0) {                          // 3. The default election algorithm nowadays is FastLeaderElection (type 3), so this branch is rarely taken
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType); // 4. Create the Election instance
}
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:                                          // 1. The default leader-election algorithm
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener; // 2. The listener waits for the other servers in the ensemble to connect
        if(listener != null){
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
Once the Listener accepts a connection from another node, it handles it as follows.
6. QuorumCnxManager.Listener
/**
 * Sleeps on accept().
 */
@Override
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    while((!shutdown) && (numRetries < 3)){          // 1. An open question: what happens if numRetries actually reaches 3 and the listener exits?
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            if (self.getQuorumListenOnAllIPs()) {    // 2. quorumListenOnAllIPs defaults to false
                int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                addr = new InetSocketAddress(port);
            } else {
                addr = self.quorumPeers.get(self.getId()).electionAddr;
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(self.quorumPeers.get(self.getId()).electionAddr
                    .toString());
            ss.bind(addr);
            while (!shutdown) {
                Socket client = ss.accept();         // 3. Blocks until a connection arrives
                setSockOpts(client);                 // 4. Set socket options on the new connection
                LOG.info("Received connection request " + client.getRemoteSocketAddress());
                receiveConnection(client);
                numRetries = 0;
            }
        } catch (IOException e) {
            LOG.error("Exception while listening", e);
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " +
                        "Ignoring exception", ie);
            }
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error("As I'm leaving the listener thread, "
                + "I won't be able to participate in leader "
                + "election any longer: "
                + self.quorumPeers.get(self.getId()).electionAddr);
    }
}
7. QuorumCnxManager.Listener.receiveConnection
To avoid duplicate connections, only the node with the larger myid may keep a connection to a node with a smaller myid. Once a connection is established, a SendWorker and a RecvWorker handle sending and receiving messages on it.
/**
 * If this server receives a connection request, then it gives up on the new
 * connection if it wins. Notice that it checks whether it has a connection
 * to this server already or not. If it does, then it sends the smallest
 * possible long value to lose the challenge.
 *
 */
public boolean receiveConnection(Socket sock) {      // 1. Accept a connection from another node in the ensemble
    Long sid = null;
    LOG.info("sock:"+sock);
    try {
        // Read server id
        DataInputStream din = new DataInputStream(sock.getInputStream());
        sid = din.readLong();                        // 2. Read the peer's myid (the first long read may actually be a protocol version)
        LOG.info("sid:"+sid);
        if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
            sid = din.readLong();
            LOG.info("sid:"+sid);
            // next comes the #bytes in the remainder of the message
            int num_remaining_bytes = din.readInt(); // 3. Read the length of the rest of the message
            byte[] b = new byte[num_remaining_bytes]; // 4. Allocate a buffer of that length
            // remove the remainder of the message from din
            int num_read = din.read(b);              // 5. Read the message body (one might wonder whether a short read can occur here)
            if (num_read != num_remaining_bytes) {   // 6. Log if the buffer was not filled
                LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
            }
        }
        if (sid == QuorumPeer.OBSERVER_ID) {         // 7. The connecting peer is an observer
            /*
             * Choose identifier at random. We need a value to identify
             * the connection.
             */
            sid = observerCounter--;
            LOG.info("Setting arbitrary identifier to observer: " + sid);
        }
    } catch (IOException e) {                        // 8. An EOFException here means the peer has disconnected and there is nothing left to read, so just close the socket
        closeSocket(sock);
        LOG.warn("Exception reading or writing challenge: " + e.toString() + ", sock:"+sock);
        return false;
    }
    //If wins the challenge, then close the new connection.
    if (sid < self.getId()) {                        // 9. To avoid duplicate connections, only the larger myid may connect to the smaller one
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);    // 10. If a SendWorker already exists for this sid, shut it down
        if (sw != null) {
            sw.finish();
        }
        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: " + sid);
        closeSocket(sock);                           // 11. Close the incoming socket
        connectOne(sid);                             // 12. Our myid is larger than the peer's, so we initiate the connection ourselves
        // Otherwise start worker threads to receive data.
    } else {                                         // 13. Our myid is smaller than the peer's
        SendWorker sw = new SendWorker(sock, sid);   // 14. Create the SendWorker
        RecvWorker rw = new RecvWorker(sock, sid, sw); // 15. Create the RecvWorker
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);   // 16. If a previous SendWorker exists, shut it down
        if(vsw != null)
            vsw.finish();
        senderWorkerMap.put(sid, sw);
        if (!queueSendMap.containsKey(sid)) {        // 17. Create a send queue for this myid if one does not exist yet
            queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                    SEND_CAPACITY));
        }
        sw.start();                                  // 18. Start the send and receive threads
        rw.start();
        return true;
    }
    return false;
}
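The tie-break rule in step 9 can be stated on its own: a node keeps an incoming election connection only when the remote myid is at least its own; otherwise it closes the socket and dials out itself. A minimal sketch of that decision follows (the class and method names are hypothetical, introduced only for illustration):

```java
// Hypothetical sketch of the duplicate-connection tie-break used in
// receiveConnection: keep the incoming socket only if the remote sid
// is >= our own id; otherwise drop it and connect out ourselves.
public class ConnectionTieBreak {
    /** Returns true when the incoming connection should be kept. */
    public static boolean keepIncoming(long remoteSid, long mySid) {
        return remoteSid >= mySid;
    }
}
```

With three servers 1, 2 and 3, only the 2-to-1, 3-to-1 and 3-to-2 connections survive, so each pair of nodes ends up with exactly one election channel.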
8. QuorumPeer.run
The thread stays inside QuorumPeer.run() for its whole life, cycling through the states LOOKING -> LEADING/FOLLOWING -> LOOKING -> ...
@Override
public void run() {
    setName("QuorumPeer" + "[myid=" + getId() + "]" +  // 1. Set the name of the current thread
            cnxnFactory.getLocalAddress());
    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null); // 2. Wrap the QuorumPeer in a QuorumBean and register it with JMX
        for(QuorumServer s: getView().values()){       // 3. Walk every ZooKeeperServer node in the view
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {                                  // 4. Register the LocalPeerBean with JMX
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {                                   // 5. Remote peers get registered with JMX as well
                p = new RemotePeerBean(s);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }
    try {
        /*
         * Main loop
         */
        while (running) {                              // 6. The QuorumPeer stays in this while loop (usually LOOKING first, then LEADING/FOLLOWING)
            switch (getPeerState()) {
            case LOOKING:                              // 7. The QuorumPeer is in the LOOKING state, searching for a Leader
                LOG.info("LOOKING, and myid is " + myid);
                if (Boolean.getBoolean("readonlymode.enabled")) { // 8. Check whether the server runs in read-only mode
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                            logFactory, this,
                            new ZooKeeperServer.BasicDataTreeBuilder(),
                            this.zkDb);
                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();               // 9. Two steps here: (a) the election machinery was set up earlier via QuorumPeer.start().startLeaderElection().createElectionAlgorithm();
                        setBCVote(null);               //    (b) Election.lookForLeader is called to run the election until it succeeds or throws
                        setCurrentVote(makeLEStrategy().lookForLeader()); // 10. Run the leader-election algorithm; this may take a while
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {                              // 11. Same two steps as above: (a) QuorumPeer.start().startLeaderElection().createElectionAlgorithm();
                        setBCVote(null);               //     (b) Election.lookForLeader runs the election until it succeeds or throws
                        setCurrentVote(makeLEStrategy().lookForLeader()); // 12. Run the election algorithm; this may take a while
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING, and myid is " + myid);
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING, and myid is " + myid); // 13. The QuorumPeer remains the top-level owner
                    setFollower(makeFollower(logFactory));      // 14. Initialize the Follower, which references a FollowerZooKeeperServer
                    follower.followLeader();                    // 15. Call follower.followLeader(); the thread blocks in there
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING:
                LOG.info("LEADING, and myid is " + myid);
                try {
                    setLeader(makeLeader(logFactory));          // 16. Initialize the Leader object
                    leader.lead();                              // 17. The Leader's thread blocks in here
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}
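The state cycle driven by the loop above reduces to a tiny transition table: the election resolves LOOKING into LEADING, FOLLOWING or OBSERVING, and any exit from a serving state falls back to LOOKING so a new election can start. A hypothetical sketch of those transitions (class and method names are invented for illustration):

```java
// Hypothetical sketch of the QuorumPeer state cycle: an election resolves
// LOOKING into a serving state, and leaving any serving state falls back
// to LOOKING so a new election can start.
public class PeerStateCycle {
    public enum State { LOOKING, LEADING, FOLLOWING, OBSERVING }

    /** State after the election: LEADING if we won, else the learning state. */
    public static State afterElection(long electedLeaderId, long myId, boolean observer) {
        if (observer) return State.OBSERVING;
        return electedLeaderId == myId ? State.LEADING : State.FOLLOWING;
    }

    /** Whenever lead()/followLeader()/observeLeader() returns, we go back to LOOKING. */
    public static State afterServing(State s) {
        return State.LOOKING;
    }
}
```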
9. FastLeaderElection.lookForLeader
ZooKeeper uses FastLeaderElection as the default leader-election algorithm; let's go straight to the code.
/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
// Every QuorumPeer calls this method on startup; the leader election is driven from here
public Vote lookForLeader() throws InterruptedException {
    LOG.info("QuorumPeer {" + self + "} is LOOKING !");
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(                // 1. Register the jmxLeaderElectionBean with JMX (side note: when hot-redeploying with a custom classloader, remember to unregister things like SQL drivers registered into the core JDK)
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = System.currentTimeMillis();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); // 2. The votes received so far
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;
        synchronized(this){
            LOG.info("logicalclock :" + logicalclock);
            logicalclock++;                                   // 3. Bump the election round, then propose our own myid, zxid and epoch
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x " + Long.toHexString(proposedZxid));
        LOG.info("sendNotifications to QuorumPeers ");
        sendNotifications();                                  // 4. Send our election notification to every node in the ensemble (including ourselves)
        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ // 5. While the QuorumPeer is still LOOKING, keep looping until a leader is elected
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */                                               // 6. Fetch the next vote (a notification sent to us by another peer, whether Leader, Follower, or LOOKING)
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            LOG.info("Notification:"+n);
            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){                                    // 7. n == null may mean the ensemble nodes are not actually connected yet
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();                     // 8. Connect to every server in the ensemble; each successful connection gets a SendWorker/RecvWorker pair
                }
                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if(self.getVotingView().containsKey(n.sid)) { // 9. Process an election vote sent by another node in the ensemble
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING:                                  // 10. The sender is also looking for a Leader
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock) {      // 11. The notification's electionEpoch (election round) is newer than ours
                        logicalclock = n.electionEpoch;
                        recvset.clear();                       // 12. totalOrderPredicate compares the received vote with ours (by epoch, then zxid, then myid)
                        boolean totalOrderPredicate = totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        LOG.info("n.leader:" + n.leader + ", n.zxid:"+ n.zxid +", n.peerEpoch:"+n.peerEpoch +", getInitId():"+getInitId() +", getInitLastLoggedZxid():"+getInitLastLoggedZxid() + ", getPeerEpoch():"+getPeerEpoch());
                        LOG.info("totalOrderPredicate:"+totalOrderPredicate);
                        if(totalOrderPredicate) {              // 13. The received vote wins, so it replaces our local proposal
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();                   // 14. Our proposal changed, so broadcast the updated election message
                    } else if (n.electionEpoch < logicalclock) { // 15. The notification's electionEpoch is older than our round, so drop it
                        LOG.info("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                + Long.toHexString(n.electionEpoch)
                                + ", logicalclock=0x" + Long.toHexString(logicalclock));
                        break;                                 // 16. Same election round: totalOrderPredicate compares the received vote with ours (by epoch, then zxid, then myid)
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch); // 17. The received Notification wins; its contents replace our local proposal
                        sendNotifications();                   // 18. Our proposal changed, so broadcast the updated election message
                    }                                          // 19. Add the received vote to recvset, which is later used for the final majority check
                    Vote vote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                    LOG.info("Receive Notification: " + n);
                    LOG.info("Adding vote: " + vote);
                    recvset.put(n.sid, vote);
                    // 20. Build our current Vote and use termPredicate to decide whether the election is over
                    Vote selfVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch);
                    boolean termPredicate = termPredicate(recvset,selfVote ); // 21. Decide whether the election has terminated (by default, the majority rule)
                    LOG.info("recvset:"+recvset +", || selfVote: " + selfVote);
                    LOG.info("termPredicate:"+termPredicate);
                    if (termPredicate) {                       // 22. The majority rule is satisfied; the election has a winner
                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){ // 23. Keep draining recvqueue for late notifications
                            boolean totalOrderPredicate2 = totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch); // 24. Check whether this QuorumPeer's vote would still need updating
                            LOG.info("totalOrderPredicate2:"+totalOrderPredicate2);
                            if(totalOrderPredicate2){          // 25. A better vote arrived late
                                recvqueue.put(n);              // 26. Put the Notification back into recvqueue and keep waiting for notifications
                                break;
                            }
                        }
                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {                       // 27. n == null means the ensemble's Leader is settled; update our state accordingly
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState()); // 28. If the elected Leader is this machine, switch our state to LEADING
                            Vote endVote = new Vote(proposedLeader, // 29. Build the final vote of this election
                                    proposedZxid,
                                    logicalclock,
                                    proposedEpoch);
                            leaveInstance(endVote);            // 30. Election over; clear recvqueue
                            return endVote;                    // 31. Return to the caller, which then runs follower.followLeader() / leader.lead()
                        }
                    }
                    break;
                case OBSERVING:                                // 32. An OBSERVING QuorumPeer does not take part in leader election
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if(n.electionEpoch == logicalclock){
                        recvset.put(n.sid, new Vote(n.leader,  // 33. This vote also goes into the received set
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch));
                        if(ooePredicate(recvset, outofelection, n)) { // 34. Check whether the election is over and the Leader is confirmed
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState()); // 35. Update the QuorumPeer's state here (LEADING / FOLLOWING)
                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                            n.leader,
                            n.zxid,
                            n.electionEpoch,
                            n.peerEpoch,
                            n.state));
                    if(ooePredicate(outofelection, outofelection, n)) {
                        synchronized(this){
                            logicalclock = n.electionEpoch;
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}
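The comparison order used by totalOrderPredicate above (epoch first, then zxid, then myid) can be written down in a few lines. This is a hypothetical re-implementation for illustration, not the original method:

```java
// Hypothetical sketch of the FastLeaderElection vote ordering:
// a new vote wins if it has a higher epoch, or the same epoch and a
// higher zxid, or the same epoch and zxid and a higher server id.
public class VoteOrder {
    public static boolean wins(long newEpoch, long newZxid, long newId,
                               long curEpoch, long curZxid, long curId) {
        if (newEpoch != curEpoch) return newEpoch > curEpoch;
        if (newZxid != curZxid) return newZxid > curZxid;
        return newId > curId;
    }
}
```

Because epoch and zxid are compared before myid, the node with the most up-to-date data always wins; myid is only the final tie-breaker.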
Once FastLeaderElection has settled on a Leader, the Leader and Followers go their separate ways; we follow each side in turn.
10. Leader.lead(), part one
On the Leader side, lead() drives the interaction with the Followers (the process involves Follower, Learner, LearnerHandler and Leader, and blocks at several points).
self.tick = 0;
zk.loadData();                                       // 2. Recover the zxid from the snapshots and txn log
// 3. Build the Leader's state summary
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
LOG.info("leaderStateSummary:" + leaderStateSummary);
// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();              // 4. The LearnerCnxAcceptor listens on its port; whenever a follower connects, it starts a LearnerHandler to service it
LOG.info("cnxAcceptor start");
cnxAcceptor.start();
readyToStart = true;                                 // 5. getAcceptedEpoch is initially the value recovered from the acceptedEpoch file
LOG.info("self.getId() :" + self.getId() + ", self.getAcceptedEpoch():" + self.getAcceptedEpoch());
// 6. Wait for enough Followers to connect, proving we really are the leader; the lead thread may block in the while loop inside
// 7. Meanwhile each LearnerHandler receives a FOLLOWERINFO packet carrying that follower's acceptedEpoch
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
On the Leader side a LearnerCnxAcceptor accepts connections from the other nodes in the ensemble, and each accepted connection is serviced by a LearnerHandler. Meanwhile the Leader blocks inside getEpochToPropose until a quorum of Followers has checked in; each LearnerHandler also calls Leader.getEpochToPropose, and once a quorum is reached the blocked Leader is released.
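The quorum logic behind getEpochToPropose boils down to: collect the acceptedEpoch of each connecting participant, track the maximum, and propose max+1 as the new epoch once more than half of the voters have reported in. Below is a simplified, non-blocking sketch of that bookkeeping; the real method additionally blocks the caller with an initLimit * tickTime timeout, and the class and method names here are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, non-blocking sketch of the epoch-agreement bookkeeping
// behind Leader.getEpochToPropose: the real code additionally blocks the
// caller (with an initLimit * tickTime timeout) until a quorum arrives.
public class EpochProposal {
    private final int ensembleSize;
    private final Set<Long> connectingFollowers = new HashSet<>();
    private long maxAcceptedEpoch = -1;

    public EpochProposal(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    /** Record one participant's acceptedEpoch; true once a quorum has reported. */
    public synchronized boolean propose(long sid, long acceptedEpoch) {
        maxAcceptedEpoch = Math.max(maxAcceptedEpoch, acceptedEpoch);
        connectingFollowers.add(sid);
        return connectingFollowers.size() > ensembleSize / 2;
    }

    /** The epoch the Leader will propose: one past the largest seen. */
    public synchronized long newEpoch() {
        return maxAcceptedEpoch + 1;
    }
}
```

Note that the Leader counts itself: its own call to getEpochToPropose contributes one member toward the quorum, so in a three-node ensemble a single Follower suffices to unblock it.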
11. Follower Connects to the Leader
The Follower first locates the Leader via findLeader().
/**
 * Returns the address of the node we think is the leader.
 */
// Returns the Leader's network address
protected InetSocketAddress findLeader() {
    InetSocketAddress addr = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();            // The QuorumPeer's current vote, i.e. who we elected as Leader
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            addr = s.addr;                           // The Leader's addr
            break;
        }
    }
    if (addr == null) {
        LOG.warn("Couldn't find the leader with id = "
                + current.getId());
    }
    return addr;
}
With the Leader's address in hand, the Follower can open a connection to it.
/**
 * Establish a connection with the Leader found by findLeader. Retries
 * 5 times before giving up.
 * @param addr - the address of the Leader to connect to.
 * @throws IOException - if the socket connection fails on the 5th attempt
 * @throws ConnectException
 * @throws InterruptedException
 */
// Connect to the leader; once established, a LearnerHandler on the Leader side handles the communication
protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException {
    sock = new Socket();
    sock.setSoTimeout(self.tickTime * self.initLimit); // 1. This SoTimeout matters: if InputStream.read blocks longer than this, a SocketTimeoutException is thrown
    for (int tries = 0; tries < 5; tries++) {          // 2. Try to connect to the Leader 5 times; on final failure the exception propagates all the way up to QuorumPeer.run(), which restarts leader election
        try {
            sock.connect(addr, self.tickTime * self.syncLimit); // 3. Connect to the leader
            sock.setTcpNoDelay(nodelay);                // 4. TCP_NODELAY disables Nagle's algorithm, i.e. small packets are sent immediately instead of being coalesced
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception",e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries="+tries+
                        ", connecting to " + addr,e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }                                                   // 5. Wrap the socket's input stream
    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); // 6. Wrap the output stream
}
After connecting to the Leader, the Follower calls registerWithLeader() to agree on the epoch with the Leader.
12. Learner.registerWithLeader, part one
registerWithLeader(Leader.FOLLOWERINFO) packages the Follower's zxid, myid and related information and sends it to the Leader.
LOG.info("registerWithLeader:" + pktType);
/*
 * Send follower info, including last zxid and sid
 */
long lastLoggedZxid = self.getLastLoggedZxid();      // The last zxid the Follower has processed
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);                                 // For a Follower the packet type is Leader.FOLLOWERINFO
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); // A zxid built from the Follower's acceptedEpoch
/*
 * Add sid to payload
 */
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000); // Package the Follower's info into a LearnerInfo
LOG.info("li:" + li);
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());                      // Attach the Follower's info to the QuorumPacket
LOG.info("qp:" + qp);
writePacket(qp, true);                               // Send the QuorumPacket: the LearnerInfo plus Leader.FOLLOWERINFO, with the zxid built from self.getAcceptedEpoch()
13. LearnerHandler.run, part one
The Leader handles the Leader.FOLLOWERINFO packet sent by the Follower.
tickOfNextAckDeadline = leader.self.tick + leader.self.initLimit + leader.self.syncLimit;
LOG.info("tickOfNextAckDeadline : " + tickOfNextAckDeadline);
// 1. Wrap the socket to the Follower into I/O archives
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
// 2. Wait for the Follower to send a packet
QuorumPacket qp = new QuorumPacket();
long a1 = System.currentTimeMillis();
ia.readRecord(qp, "packet");                         // 3. Read the FOLLOWERINFO packet sent by the Follower
LOG.info("System.currentTimeMillis() - a1 : " + (System.currentTimeMillis() - a1));
LOG.info("qp:" + qp);
// 4. No other packet type should arrive first
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
    LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");
    return;
}
byte learnerInfoData[] = qp.getData();               // 5. The payload sent by the participant
LOG.info("learnerInfoData :" + Arrays.toString(learnerInfoData)); // 6. This learnerInfo describes the Follower/Observer
if (learnerInfoData != null) {
    if (learnerInfoData.length == 8) {
        ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
        this.sid = bbsid.getLong();
    } else {
        LearnerInfo li = new LearnerInfo();          // 7. Deserialize the LearnerInfo
        ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
        LOG.info("li :" + li);
        this.sid = li.getServerid();                 // 8. Extract the Follower's myid
        this.version = li.getProtocolVersion();      // 9. And the protocol version
    }
} else {
    this.sid = leader.followerCounter.getAndDecrement();
}
LOG.info("Follower sid: " + sid + " : info : " + leader.self.quorumPeers.get(sid));
if (qp.getType() == Leader.OBSERVERINFO) {
    learnerType = LearnerType.OBSERVER;
}
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); // 10. Extract the Follower's election epoch from the zxid
LOG.info("qp : " + qp + ", lastAcceptedEpoch : " + lastAcceptedEpoch);
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); // 11. Add this Follower's election epoch to connectingFollowers; getEpochToPropose checks whether a quorum of participants has reported in
At the end the handler calls leader.getEpochToPropose(), which unblocks once a quorum of nodes has checked in.
Once unblocked, the Leader sends the Leader.LEADERINFO packet to the Follower.
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);                // 14. Build the packet describing the Leader
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
LOG.info("newEpochPacket:" + newEpochPacket);
oa.writeRecord(newEpochPacket, "packet");            // 15. Send the Leader's info to the Follower / Observer
bufferedOutput.flush();
On receiving Leader.LEADERINFO, the Follower replies with a Leader.ACKEPOCH packet.
// we are connected to a 1.0 server so accept the new epoch and read the next packet
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
LOG.info("leaderProtocolVersion:" + leaderProtocolVersion);
byte epochBytes[] = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
LOG.info("newEpoch:" + newEpoch + ", self.getAcceptedEpoch():" + self.getAcceptedEpoch());
if (newEpoch > self.getAcceptedEpoch()) { // 若 Follower 的 election 的 epoch 值小于自己, 則用 Leader 的
wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
// since we have already acked an epoch equal to the leaders, we cannot ack
// again, but we still need to send our lastZxid to the leader so that we can
// sync with it if it does assume leadership of the epoch.
// the -1 indicates that this reply should not count as an ack for the new epoch
wrappedEpochBytes.putInt(-1);
} else { // If Follower.epoch > Leader.epoch, the preceding leader election must have gone wrong
throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
} // After receiving Leader.LEADERINFO, reply with Leader.ACKEPOCH, carrying the lastLoggedZxid value
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
LOG.info("ackNewEpoch:" + ackNewEpoch);
writePacket(ackNewEpoch, true); // Send ACKEPOCH back to the Leader in reply to its LEADERINFO
return ZxidUtils.makeZxid(newEpoch, 0);
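Throughout this exchange, zxids pack the epoch into the high 32 bits of a 64-bit value and the per-epoch transaction counter into the low 32 bits, which is why makeZxid(newEpoch, 0) denotes the first zxid of the new epoch. A self-contained sketch of the encoding (matching ZxidUtils for non-negative epochs; ZxidDemo is an illustrative class name):

```java
// Sketch of the zxid layout used by ZxidUtils: the high 32 bits carry
// the epoch, the low 32 bits carry the transaction counter within it.
public class ZxidDemo {
    public static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    public static long getEpochFromZxid(long zxid) {
        return zxid >> 32;
    }

    public static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 7);
        System.out.println(Long.toHexString(zxid)); // prints "500000007"
    }
}
```

This layout is also what makes zxid comparisons meaningful across epochs: any zxid from a later epoch compares greater than every zxid from an earlier one.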
接著就是LearnerHandler對Leader.ACKEPOCH的處理了
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000); // 14. Build the packet describing the Leader
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
LOG.info("newEpochPacket:" + newEpochPacket);
oa.writeRecord(newEpochPacket, "packet"); // 15. Send the Leader's info to the corresponding Follower / Observer
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet"); // 16. The Leader reads the ACKEPOCH sent by the Follower
LOG.info("ackEpochPacket:" +ackEpochPacket); // 17. Having just sent the Leader info, now read the confirming ack
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
LOG.info("ss : " + ss);
leader.waitForEpochAck(this.getSid(), ss); // 18. Wait here for Followers to reply with their ACKEPOCH values (again, a quorum suffices)
Next comes the data synchronization between Leader and Follower.
13. LearnerHandler synchronizes data to the Follower
This involves the following concepts:
1. committedLog keeps the most recent 500 Proposals handled on the Leader
2. If the Follower has processed Proposals beyond maxCommittedLog, the Follower must TRUNC its log back to maxCommittedLog
3. If the Follower's last Proposal lies between minCommittedLog and maxCommittedLog, the Leader sends the Follower the Proposals it is missing and lets it apply them
4. If the Follower's last Proposal is below minCommittedLog, the Leader sends Leader.SNAP and streams its serialized database to the Follower
下面直接看代碼
/* we are sending the diff check if we have proposals in memory to be able to
* send a diff to the
*/
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock(); // 20. The Leader caches recently committed Requests in ZKDatabase.committedLog (done in FinalRequestProcessor.processRequest); their zxids lie between minCommittedLog and maxCommittedLog
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
+ ", minCommittedLog:" +Long.toHexString(minCommittedLog)
+ " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
);
/**
* http://www.itdecent.cn/p/4cc1040b6a14
* Fetch the Requests already committed on the Leader
* 1) If peerLastZxid lies between min and max, loop over the proposals:
* a) a proposal whose zxid <= peerLastZxid has already been committed by the peer, so skip it
* b) once a proposal's zxid exceeds peerLastZxid, first decide whether the peer holds
* proposals the leader has never seen; if so, send a TRUNC before the DIFF, then send
* each remaining proposal followed by a COMMIT
* 2) If peerLastZxid is greater than max, TRUNC everything beyond max
* 3) Otherwise (e.g. a freshly joined node), use SNAP and sync straight from a snapshot
*/
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog(); // 21. Fetch the Requests most recently committed on the Leader to see which proposals still need to be sent
LOG.info("proposals:"+proposals);
if (proposals.size() != 0) { // 22. proposals holds the already-committed Proposals
LOG.debug("proposal size is {}", proposals.size());
if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { // 23. If this holds, the Follower is behind the Leader by fewer than 500 entries
LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
+ ", minCommittedLog:" +Long.toHexString(minCommittedLog)
+ " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
);
LOG.debug("Sending proposals to follower");
// as we look through proposals, this variable keeps track of previous
// proposal Id.
long prevProposalZxid = minCommittedLog;
// Keep track of whether we are about to send the first packet.
// Before sending the first packet, we have to tell the learner
// whether to expect a trunc or a diff
boolean firstPacket=true;
// If we are here, we can use committedLog to sync with
// follower. Then we only need to decide whether to
// send trunc or not
packetToSend = Leader.DIFF;
zxidToSend = maxCommittedLog;
for (Proposal propose: proposals) {
// skip the proposals the peer already has // 24. The peer has already processed this Proposal, continue
if (propose.packet.getZxid() <= peerLastZxid) {
prevProposalZxid = propose.packet.getZxid(); // 25. Already handled by the follower; record it in prevProposalZxid and move on to the next Proposal
continue;
} else {
// If we are sending the first packet, figure out whether to trunc
// in case the follower has some proposals that the leader doesn't
if (firstPacket) { // 26. Before sending any Proposal, check whether the follower is ahead of the Leader
firstPacket = false;
// Does the peer have some proposals that the leader hasn't seen yet
if (prevProposalZxid < peerLastZxid) { // 27. The follower holds transactions the Leader has not seen, so send a TRUNC back to prevProposalZxid before the proposals
// send a trunc message before sending the diff
packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
}
}
queuePacket(propose.packet); // 28. Queue the proposal packet for sending
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
queuePacket(qcommit); // 29. Immediately follow with a COMMIT so the Follower commits the request
}
}
} else if (peerLastZxid > maxCommittedLog) { // 30. The follower has processed more transactions than the Leader, so send a TRUNC to roll it back
LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
Long.toHexString(maxCommittedLog),
Long.toHexString(updates));
LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
+ ", minCommittedLog:" +Long.toHexString(minCommittedLog)
+ " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
+ ", updates : " + Long.toHexString(updates)
);
packetToSend = Leader.TRUNC; // 31. Send TRUNC; the Follower will discard the Requests it has beyond the Leader
zxidToSend = maxCommittedLog; // 32. maxCommittedLog is the zxid of the largest Request the Leader has processed
updates = zxidToSend;
} else {
LOG.warn("Unhandled proposal scenario");
} // 33. If the Follower's and Leader's lastZxid are equal, send an (empty) DIFF
} else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
// The leader may recently take a snapshot, so the committedLog
// is empty. We don't need to send snapshot if the follow
// is already sync with in-memory db.
LOG.info("committedLog is empty but leader and follower "
+ "are in sync, zxid=0x{}",
Long.toHexString(peerLastZxid));
LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
+ ", minCommittedLog:" +Long.toHexString(minCommittedLog)
+ " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
);
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else {
// just let the state transfer happen
LOG.debug("proposals is empty");
}
LOG.info("Sending " + Leader.getPacketType(packetToSend));
leaderLastZxid = leader.startForwarding(this, updates); // 34. The leader also forwards the Proposals that are quorum-ACKed but not yet persisted to the Learner (this is the subtle part)
LOG.info("leaderLastZxid : " + leaderLastZxid);
} finally {
rl.unlock();
}
對于消息的發(fā)送 Leader 最后發(fā)送一個 Leader.NEWLEADER
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, ZxidUtils.makeZxid(newEpoch, 0), null, null);
LOG.info("newLeaderQP:" + newLeaderQP);
if (getVersion() < 0x10000) {
oa.writeRecord(newLeaderQP, "packet");
} else {
queuedPackets.add(newLeaderQP); // 36. Add the Leader.NEWLEADER packet to the send queue (note: the sender thread has not started yet)
}
bufferedOutput.flush();
14. Follower處理與Leader之間的數據同步
synchronized (zk) {
if (qp.getType() == Leader.DIFF) { // DIFF packet: sync via the PROPOSAL/COMMIT packets that follow (an empty DIFF means the two nodes' lastZxid already match)
LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
}
else if (qp.getType() == Leader.SNAP) { // SNAP: copy a full image of the database from the Leader (the Follower is behind by more than the committedLog window, roughly 500 Proposals)
LOG.info("Getting a snapshot from leader");
// The leader is going to dump the database
// clear our own database and read
zk.getZKDatabase().clear();
zk.getZKDatabase().deserializeSnapshot(leaderIs); // Deserialize the DataTree from the InputStream
String signature = leaderIs.readString("signature"); // Read the String tagged "signature" that follows the snapshot
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got " + signature);
throw new IOException("Missing signature");
}
} else if (qp.getType() == Leader.TRUNC) { // TRUNC: roll the log back to qp.getZxid() (the Follower has processed more transactions than the Leader)
//we need to truncate the log to the lastzxid of the leader
LOG.warn("Truncating log to get in sync with the leader 0x" + Long.toHexString(qp.getZxid()));
boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
LOG.info("truncated:" + truncated + ", qp.getZxid():" + qp.getZxid());
if (!truncated) {
// not able to truncate the log
LOG.error("Not able to truncate the log "
+ Long.toHexString(qp.getZxid()));
System.exit(13);
}
}
else {
LOG.error("Got unexpected packet from leader "
+ qp.getType() + " exiting ... " );
System.exit(13);
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); // The DataTree may have just been rebuilt from the Leader's snapshot stream, so set lastProcessedZxid explicitly here
zk.createSessionTracker(); // The Learner creates its SessionTracker (LearnerSessionTracker for Follower/Observer)
long lastQueued = 0;
// in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
// we need to make sure that we don't take the snapshot twice.
boolean snapshotTaken = false;
// we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) { // self.isRunning() is true by default here
readPacket(qp);
LOG.info("qp:" + qp);
switch(qp.getType()) {
case Leader.PROPOSAL: // Add the proposal to the pending list
PacketInFlight pif = new PacketInFlight();
pif.hdr = new TxnHeader();
pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr); // Deserialize the transaction body of the request
LOG.info("pif:" + pif);
if (pif.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(pif.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = pif.hdr.getZxid();
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT: // COMMIT: hand the transaction to the server for processing
LOG.info("snapshotTaken :" + snapshotTaken);
if (!snapshotTaken) {
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
zk.processTxn(pif.hdr, pif.rec); // Apply the transaction
packetsNotCommitted.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM: // Only Observers receive INFORM packets
/*
* Only observer get this type of packet. We treat this
* as receiving PROPOSAL and COMMMIT.
*/
PacketInFlight packet = new PacketInFlight();
packet.hdr = new TxnHeader();
packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
LOG.info("packet:" + packet);
// Log warning message if txn comes out-of-order
if (packet.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(packet.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = packet.hdr.getZxid();
LOG.info("snapshotTaken : " + snapshotTaken);
if (!snapshotTaken) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.UPTODATE: // UPTODATE: the data sync succeeded, exit the loop
LOG.info("snapshotTaken : " + snapshotTaken + ", newEpoch:" + newEpoch);
if (!snapshotTaken) { // true for the pre v1.0 case
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
self.cnxnFactory.setZooKeeperServer(zk);
break outerLoop; // Exit the while loop after receiving UPTODATE
case Leader.NEWLEADER: // it will be NEWLEADER in v1.0 // All leftover proposals have been handled; snapshot the in-memory data to disk and send an ACK
LOG.info("newEpoch:" + newEpoch);
// Create updatingEpoch file and remove it after current
// epoch is set. QuorumPeer.loadDataBase() uses this file to
// detect the case where the server was terminated after
// taking a snapshot but before setting the current epoch.
File updating = new File(self.getTxnFactory().getSnapDir(),
QuorumPeer.UPDATING_EPOCH_FILENAME);
if (!updating.exists() && !updating.createNewFile()) {
throw new IOException("Failed to create " +
updating.toString());
}
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
if (!updating.delete()) {
throw new IOException("Failed to delete " +
updating.toString());
}
snapshotTaken = true;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
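One subtlety in the loop above is the snapshotTaken flag: a COMMIT that arrives before NEWLEADER is applied straight to the in-memory database, while one that arrives afterwards is only recorded by zxid and replayed once FollowerZooKeeperServer starts. A stripped-down sketch of that two-phase handling (CommitBuffer is a hypothetical class; an int counter stands in for zk.processTxn):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the follower's two-phase COMMIT handling during sync:
// before NEWLEADER (snapshotTaken == false) committed txns are applied
// directly to the in-memory database; afterwards only the zxid is
// recorded, to be replayed when the follower's server starts.
public class CommitBuffer {
    static class Txn { long zxid; Txn(long z) { zxid = z; } }

    boolean snapshotTaken = false;
    final Deque<Txn> packetsNotCommitted = new ArrayDeque<>();
    final Deque<Long> packetsCommitted = new ArrayDeque<>();
    int applied = 0;                          // stand-in for zk.processTxn calls

    void onProposal(long zxid) { packetsNotCommitted.add(new Txn(zxid)); }

    void onCommit(long zxid) {
        if (!snapshotTaken) {
            Txn next = packetsNotCommitted.peekFirst();
            if (next != null && next.zxid == zxid) {
                applied++;                    // apply directly to the DB
                packetsNotCommitted.removeFirst();
            }
        } else {
            packetsCommitted.add(zxid);       // defer until the server starts
        }
    }
}
```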
接著Follower將發(fā)送NEWLEADER對應的ACK信息, 并且處理數據同步時Leader發(fā)送過來的Proposal消息, 緊接著Followerjiuzai 一個while循環(huán)里面一直讀取數據包并處理數據包; 與之對應的是LearnerHandler, LearnerHandler最后就在 while loop 里面一直處理與Follower之間的消息
15. 總結
整個Leader/Follower啟動, Leader選舉, Leader與Follower之間數據的同步涉及好幾個步驟, 細節(jié)點比較多, 還好總體線路比較清晰, 若想了解的話, 可以先看一下片頭的那個流程圖!