1.關于Leader的minCommittedLog和maxCommittedLog
相關屬性是定義在類ZKDatabase中
protected long minCommittedLog, maxCommittedLog;
public static final int commitLogCount = 500;
protected static int commitLogBuffer = 700;
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
該committedLog列表主要作用是為了leader和follower之間快速完成數(shù)據(jù)同步用的緩存。
方法addCommittedProposal:
public void addCommittedProposal(Request request) {
WriteLock wl = logLock.writeLock();
try {
wl.lock();
// 當前列表已經(jīng)大于緩存大小(默認值為500)
if (committedLog.size() > commitLogCount) {
// 大于的話,就將列表頭部第一個commit log刪除
committedLog.removeFirst();
// minCommittedLog,就是committedLog列表中第一個元素
minCommittedLog = committedLog.getFirst().packet.getZxid();
}
if (committedLog.size() == 0) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
request.hdr.serialize(boa, "hdr");
if (request.txn != null) {
request.txn.serialize(boa, "txn");
}
baos.close();
} catch (IOException e) {
LOG.error("This really should be impossible", e);
}
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
baos.toByteArray(), null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
committedLog.add(p);
// maxCommittedLog,就是指向committedLog列表的最后一個元素
maxCommittedLog = p.packet.getZxid();
} finally {
wl.unlock();
}
}
每當有新提交的議案,都會調(diào)用addCommittedProposal添加到committedLog列表中。
2 Leader的lead流程
在QuorumPeer中,如果是LEADING,
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
2.1 loadData
在lead()方法中,其中比較重要的是zk.loadData(),即調(diào)用LeaderZooKeeperServer的loadData(),該方法就是為了重置sessions和data
當一個新的Leader選舉出來,開始執(zhí)行l(wèi)ead()方法,就會調(diào)用loadData()方法。Server的事務數(shù)據(jù)庫,其實在在運行領導者選舉之前進行了初始化,以便服務器可以為其初始投票選擇其zxid。該zxid通過QuorumPeer#getLastLoggedZxid獲取。
基于上,我們不需要再次對其進行初始化,并且避免了再次加載它的麻煩。 不重新加載它對于承載大型數(shù)據(jù)庫的應用程序尤為重要。
loadData()中,
// 檢查zkDb是否已經(jīng)被初始化
if(zkDb.isInitialized()){
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
setZxid(zkDb.loadDataBase());
}
loadDataBase方法:
將磁盤上的數(shù)據(jù)庫加載到內(nèi)存中,并將事務添加到內(nèi)存中的commitlog中。
調(diào)用FileTxnSnapLog類型的snapLog的restore方法
public long loadDataBase() throws IOException {
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}
可以看見restore的第三個參數(shù)commitProposalPlaybackListener,這里面就會調(diào)用addCommittedProposal()
2.2 啟動LearnerCnxAcceptor線程,接收Followers的請求
LearnerCnxAcceptor:這是Leader的一個內(nèi)部類,Leader就是通過它來與Learner建立連接
LearnerCnxAcceptor中的run方法:

- 在創(chuàng)建Leader實例的時候會初始一個ServerSocket用來與Follower以及Observer通信,端口是quorumAddress即投票端口
- 等待Learner的連接
- 如果有Learner與Leader建立連接成功,設置socket超時時間為initLimit*tickTime,然后通過leader.nodelay這個設置來判斷是否開啟Nagle算法,默認是開啟的。
- 創(chuàng)建一個LearnerHandler實例來處理與Learner的消息并且交互它
LearnerHandler的run方法:
public void run() {
try {
leader.addLearnerHandler(this);
tickOfNextAckDeadline = leader.self.tick.get()
+ leader.self.initLimit + leader.self.syncLimit;
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket();
// 讀取消息
ia.readRecord(qp, "packet");
// 如果發(fā)送的類型,不是FOLLOWERINFO或者OBSERVERINFO,報錯
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
if (learnerInfoData.length == 8) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
this.sid = bbsid.getLong();
} else {
LearnerInfo li = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
this.sid = li.getServerid();
this.version = li.getProtocolVersion();
}
} else {
this.sid = leader.followerCounter.getAndDecrement();
}
LOG.info("Follower sid: " + sid + " : info : "
+ leader.self.quorumPeers.get(sid));
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
leader.waitForEpochAck(this.getSid(), ss);
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();
/* the default to send to the follower */
int packetToSend = Leader.SNAP;
long zxidToSend = 0;
long leaderLastZxid = 0;
/** the packets that the follower needs to get updates from **/
long updates = peerLastZxid;
/* we are sending the diff check if we have proposals in memory to be able to
* send a diff to the
*/
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("Synchronizing with Follower sid: " + sid
+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
// Follower is already sync with us, send empty diff
LOG.info("leader and follower are in sync, zxid=0x{}",
Long.toHexString(peerLastZxid));
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else if (proposals.size() != 0) {
LOG.debug("proposal size is {}", proposals.size());
if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
LOG.debug("Sending proposals to follower");
// as we look through proposals, this variable keeps track of previous
// proposal Id.
long prevProposalZxid = minCommittedLog;
// Keep track of whether we are about to send the first packet.
// Before sending the first packet, we have to tell the learner
// whether to expect a trunc or a diff
boolean firstPacket=true;
// If we are here, we can use committedLog to sync with
// follower. Then we only need to decide whether to
// send trunc or not
packetToSend = Leader.DIFF;
zxidToSend = maxCommittedLog;
for (Proposal propose: proposals) {
// skip the proposals the peer already has
if (propose.packet.getZxid() <= peerLastZxid) {
prevProposalZxid = propose.packet.getZxid();
continue;
} else {
// If we are sending the first packet, figure out whether to trunc
// in case the follower has some proposals that the leader doesn't
if (firstPacket) {
firstPacket = false;
// Does the peer have some proposals that the leader hasn't seen yet
if (prevProposalZxid < peerLastZxid) {
// send a trunc message before sending the diff
packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
}
}
queuePacket(propose.packet);
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
queuePacket(qcommit);
}
}
} else if (peerLastZxid > maxCommittedLog) {
LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
Long.toHexString(maxCommittedLog),
Long.toHexString(updates));
packetToSend = Leader.TRUNC;
zxidToSend = maxCommittedLog;
updates = zxidToSend;
} else {
LOG.warn("Unhandled proposal scenario");
}
} else {
// just let the state transfer happen
LOG.debug("proposals is empty");
}
LOG.info("Sending " + Leader.getPacketType(packetToSend));
leaderLastZxid = leader.startForwarding(this, updates);
} finally {
rl.unlock();
}
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
ZxidUtils.makeZxid(newEpoch, 0), null, null);
if (getVersion() < 0x10000) {
oa.writeRecord(newLeaderQP, "packet");
} else {
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
//Need to set the zxidToSend to the latest zxid
if (packetToSend == Leader.SNAP) {
zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
}
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();
/* if we are not truncating or sending a diff just send a snapshot */
if (packetToSend == Leader.SNAP) {
LOG.info("Sending snapshot last zxid of peer is 0x"
+ Long.toHexString(peerLastZxid) + " "
+ " zxid of leader is 0x"
+ Long.toHexString(leaderLastZxid)
+ "sent zxid of db as 0x"
+ Long.toHexString(zxidToSend));
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();
// Start sending packets
new Thread() {
public void run() {
Thread.currentThread().setName(
"Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets();
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption",e);
}
}
}.start();
/*
* Have to wait for the first ACK, wait until
* the leader is ready, and only then we can
* start processing messages.
*/
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.ACK){
LOG.error("Next packet was supposed to be an ACK");
return;
}
LOG.info("Received NEWLEADER-ACK message from " + getSid());
leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
syncLimitCheck.start();
// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
/*
* Wait until leader starts up
*/
synchronized(leader.zk){
while(!leader.zk.isRunning() && !this.isInterrupted()){
leader.zk.wait(20);
}
}
// Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
//
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
ByteBuffer bb;
long sessionId;
int cxid;
int type;
switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ACK from Observer " + this.sid);
}
}
syncLimitCheck.updateAck(qp.getZxid());
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING:
// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp
.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
leader.zk.touch(sess, to);
}
break;
case Leader.REVALIDATE:
bis = new ByteArrayInputStream(qp.getData());
dis = new DataInputStream(bis);
long id = dis.readLong();
int to = dis.readInt();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
boolean valid = leader.zk.touch(id, to);
if (valid) {
try {
//set the session owner
// as the follower that
// owns the session
leader.zk.setOwner(id, this);
} catch (SessionExpiredException e) {
LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
}
}
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(id)
+ " is valid: "+ valid);
}
dos.writeBoolean(valid);
qp.setData(bos.toByteArray());
queuedPackets.add(qp);
break;
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
leader.zk.submitRequest(si);
break;
default:
LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
break;
}
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock "
+ "still open", e);
//close the socket to make sure the
//other side can see it being close
try {
sock.close();
} catch(IOException ie) {
// do nothing
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception causing shutdown", e);
} finally {
LOG.warn("******* GOODBYE "
+ (sock != null ? sock.getRemoteSocketAddress() : "<null>")
+ " ********");
shutdown();
}
}
分析過程:
- 將這個LearnerHandler實例添加到Leader的LearnerHandler列表learners中
- 獲取下一個ACK截止時間,計算方法為:當前tick+initLimit+syncLimit(Leader在一個tickTime周期內(nèi)會跟Learner進行兩次PING的交互,然后這個tick就會自增1)
- 分別獲取自定義jute協(xié)議中的BinaryInputArchive和BinaryOutputArchive以用作數(shù)據(jù)讀寫
- 讀取Learner發(fā)來的數(shù)據(jù)包,判斷是否是FOLLOWERINFO或者OBSERVERINFO,如果都不是,那么直接退出run()方法,因為Learner連接上Leader發(fā)送的第一個數(shù)據(jù)包必須是FOLLOWERINFO或者OBSERVERINFO
- 如果數(shù)據(jù)包的data字段存在的話,那么解析這個數(shù)據(jù)包的data字段,分別獲取serverId、版本號version和投票驗證器版本configVersion,接下來對比Leader和Learner的投票驗證器版本;如果data字段不存在,則以-1再遞減的形式賦予serverId
- 獲取followerInfo以及確定learnerType,接下來就為LearnerHandler注冊JMX服務
-
ZxidUtils.getEpochFromZxid(qp.getZxid());通過數(shù)據(jù)包傳來的zxid獲取到最近一次通過事務投票的epoch即lastAcceptedEpoch,緊接著調(diào)用getEpochToPropose(long sid, long lastAcceptedEpoch)獲取當前集群的領導紀元并計算出集群Leader的zxid(getEpochToPropose這個方法,執(zhí)行方其實只有兩類即LearnerMaster和LearnerHandler,每個執(zhí)行這個方法的線程都會執(zhí)行wait()陷入等待,等待周期為initTime * tickTime,直到投票驗證器通過了獲取集群領導紀元的提案后退出方法棧執(zhí)行后面的流程)
后面會詳細介紹下getEpochToPropose - 判斷協(xié)議版本是否是0x10000,最新版本的是0x10000,然后發(fā)送LEADERINFO數(shù)據(jù)包給Learner并等待接收Learner發(fā)送的ACKEPOCH數(shù)據(jù)包,然后調(diào)用waitForEpochAck(long sid, StateSummary ss)方法等待initLimit * tickTime時間周期內(nèi)領導紀元的ack
- 接下來進入數(shù)據(jù)同步過程,針對3種不同的情況,具體后面再講
- 最新版本將NEWLEADER數(shù)據(jù)包緩存到阻塞隊列queuedPackets中以等待稍后的發(fā)送,老版本直接發(fā)送NEWLEADER數(shù)據(jù)包給Learner;
接下來啟動一個發(fā)送緩存數(shù)據(jù)包的線程,這個線程將循環(huán)從阻塞隊列queuedPackets中獲取緩存的數(shù)據(jù)包并將它發(fā)送給相應的Learner;
然后等待接收Learner對NEWLEADER消息發(fā)送的ACK數(shù)據(jù)包,接收到之后將會調(diào)用waitForNewLeaderAck方法等待initLimit * tickTime時間周期內(nèi)NEWLEADER的ACK; - 啟動同步檢查器syncLimitCheck,設置Socket超時時間為syncLimit * tickTime,然后等待learnerMaster即Leader或者ObserverMaster的Zookeeper服務啟動,接下來再緩存UPTODATE數(shù)據(jù)包到阻塞隊列queuedPackets中等待稍后的發(fā)送
- 循環(huán)處理Learner發(fā)來的數(shù)據(jù)包,首先讀取數(shù)據(jù)包,然后設置新的下一個ACK截止時間tickOfNextAckDeadline并統(tǒng)計數(shù)據(jù)包接收數(shù)量,針對不同類型的數(shù)據(jù)包處理方式也不盡相同,如下:
- ACK:這個是針對事務請求的提案,表示Follower同意當前事務請求的提案,首先會更新同步檢查器syncLimitCheck中提案的處理時間,然后將會由Leader進行事務處理
- PING:這個是處理LearnerMaster主動發(fā)送PING數(shù)據(jù)包給Learner之后,Learner回復的數(shù)據(jù)包,數(shù)據(jù)包中的data字段存儲的是會話信息,然后將會由LearnerMaster延長會話過期時間
- REVALIDATE:這個是用于重新驗證并激活會話的
- REQUEST:這個是處理Learner轉發(fā)的事務請求以及sync請求,因為Learner無權處理事務請求,應該交由Leader處理
2.3 數(shù)據(jù)包發(fā)送線程
// Start sending packets
new Thread() {
public void run() {
Thread.currentThread().setName(
"Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets();
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption",e);
}
}
}.start();
private void sendPackets() throws InterruptedException {
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
while (true) {
try {
QuorumPacket p;
p = queuedPackets.poll();
if (p == null) {
bufferedOutput.flush();
p = queuedPackets.take();
}
if (p == proposalOfDeath) {
// Packet of death!
break;
}
if (p.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (p.getType() == Leader.PROPOSAL) {
syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
}
oa.writeRecord(p, "packet");
} catch (IOException e) {
if (!sock.isClosed()) {
LOG.warn("Unexpected exception at " + this, e);
try {
// this will cause everything to shutdown on
// this learner handler and will help notify
// the learner/observer instantaneously
sock.close();
} catch(IOException ie) {
LOG.warn("Error closing socket for handler " + this, ie);
}
}
break;
}
}
}
- 啟動一個線程專門用來發(fā)送緩存在queuedPackets隊列中的數(shù)據(jù)包
- 循環(huán)從queuedPackets這個隊列中獲取數(shù)據(jù)包,首先會進行直接獲取,如果獲取不到則刷新緩沖區(qū)將數(shù)據(jù)發(fā)送出去并且阻塞直到獲取新的數(shù)據(jù)包為止
- 進行指標統(tǒng)計,如果是事務請求還需要更新同步檢查器syncLimitCheck
- 重置lastZxid并將數(shù)據(jù)包寫到緩沖區(qū),然后統(tǒng)計已發(fā)送的數(shù)據(jù)包
2.4 同步檢查器syncLimitCheck
private class SyncLimitCheck {
private boolean started = false;
private long currentZxid = 0;
private long currentTime = 0;
private long nextZxid = 0;
private long nextTime = 0;
public synchronized void start() {
started = true;
}
public synchronized void updateProposal(long zxid, long time) {
if (!started) {
return;
}
if (currentTime == 0) {
currentTime = time;
currentZxid = zxid;
} else {
nextTime = time;
nextZxid = zxid;
}
}
public synchronized void updateAck(long zxid) {
if (currentZxid == zxid) {
currentTime = nextTime;
currentZxid = nextZxid;
nextTime = 0;
nextZxid = 0;
} else if (nextZxid == zxid) {
LOG.warn("ACK for " + zxid + " received before ACK for " + currentZxid + "!!!!");
nextTime = 0;
nextZxid = 0;
}
}
public synchronized boolean check(long time) {
if (currentTime == 0) {
return true;
} else {
long msDelay = (time - currentTime) / 1000000;
return (msDelay < learnerMaster.syncTimeout());
}
}
}
- updateProposal:LearnerHandler發(fā)送提案時,檢查是否已啟動以及當前已發(fā)送ACK的提案的時間currentTime,如果currentTime等于0,則將這個提案設置為當前提案,否則設置為下一提案
- updateAck:LearnerHandler接收到Learner發(fā)送的ACK時,檢查當前提案是否是剛剛發(fā)送ACK的提案,如果是的話更新當前提案的時間為之前調(diào)用updateProposal方法設置的nextTime 和 nextZxid并重置nextTime 和 nextZxid為0,如果不是的話說明請求處理亂序了,也重置nextTime 和 nextZxid為0
- check:LearnerHandler發(fā)送PING數(shù)據(jù)包給Learner之前會進行提案處理時間的檢查,檢查邏輯就是判斷currentTime是否等于0以及當前時間減去currentTime是否小于同步超時時間syncLimit * tickTime;假如一個提案一直未提交,直到發(fā)送PING數(shù)據(jù)包時發(fā)現(xiàn)超時將關閉當前LearnerHandler(注意:一個tickTime周期內(nèi)發(fā)送兩次PING數(shù)據(jù)包)