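Preface
The previous article covered how the ZooKeeper server starts up. This one walks through the ZooKeeper client startup process.
Creating the client connection object
With the native ZooKeeper library, a client connection is typically created like this:
ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
Let's go straight to the source of the ZooKeeper constructor: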
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
//clientConfig holds the configurable properties of the ZooKeeper client
this.clientConfig = clientConfig;
//create the client-side watcher manager
watchManager = defaultWatchManager();
//set the watcher manager's default watcher
watchManager.defaultWatcher = watcher;
//build a ConnectStringParser from the user-supplied connectString; its role is explained below
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
//the connect string may contain several addresses; which one should the client connect to? hostProvider encapsulates that choice
hostProvider = aHostProvider;
//the client connection object, the core of how the client talks to the server; analyzed in detail below
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
//start the client's background threads
cnxn.start();
}
ConnectStringParser
Let's look at the ConnectStringParser class first.

As the figure above shows, ConnectStringParser has two important fields, chrootPath and serverAddresses, and its job is to derive both from the connect string the user passes in. chrootPath is the path portion of the connect string: for "192.168.11.1:2181,192.168.11.2:2181/tt", ConnectStringParser yields chrootPath="/tt" (the leading slash is kept). serverAddresses holds the parsed host:port pairs; for the same example it contains 192.168.11.1:2181 and 192.168.11.2:2181, stored as unresolved InetSocketAddress objects.
Here is the ConnectStringParser source that parses the user-supplied connect string:
public ConnectStringParser(String connectString) {
// parse out chroot, if any
//find the position of the chroot separator
int off = connectString.indexOf('/');
if (off >= 0) {
//extract chrootPath (including the leading "/")
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
//keep only the host:port part of the connect string
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
//split on commas to get each host:port entry
List<String> hostsList = split(connectString, ",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
try {
//parse the host and port
String[] hostAndPort = ConfigUtils.getHostAndPort(host);
host = hostAndPort[0];
if (hostAndPort.length == 2) {
port = Integer.parseInt(hostAndPort[1]);
}
} catch (ConfigException e) {
e.printStackTrace();
}
//create an unresolved InetSocketAddress from host and port and add it to serverAddresses
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
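To make the parsing steps concrete, here is a minimal stand-alone sketch of the same idea. ConnectStringSketch and its fields are hypothetical names used only for illustration. It leaves out path validation and IPv6 handling, and it keeps servers as plain "host:port" strings instead of InetSocketAddress objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ZooKeeper: mirrors the parsing steps shown above.
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181;

    final String chrootPath;                         // null when no chroot is given
    final List<String> servers = new ArrayList<>();  // normalized "host:port" entries

    ConnectStringSketch(String connectString) {
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            // A bare "/" is treated the same as no chroot at all.
            chrootPath = chroot.length() == 1 ? null : chroot;
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        for (String entry : connectString.split(",")) {
            String host = entry.trim();
            if (host.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            servers.add(host + ":" + port);
        }
    }

    public static void main(String[] args) {
        ConnectStringSketch p =
                new ConnectStringSketch("192.168.11.1:2181,192.168.11.2:2181/tt");
        System.out.println(p.chrootPath + " " + p.servers);
    }
}
```

The point to notice is that the chroot keeps its leading slash, and that an entry without a port falls back to the default port.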
HostProvider
From ConnectStringParser we know the user may supply several server host:port pairs. Which one should the client connect to? That is HostProvider's job. The default implementation of HostProvider is StaticHostProvider.

The figure above shows the fields of StaticHostProvider. The three highlighted in red, serverAddresses, lastIndex and currentIndex, are the core of how StaticHostProvider picks a server to connect to. Its next() method wraps that selection logic for callers.
next()
public InetSocketAddress next(long spinDelay) {
boolean needToSleep = false;
InetSocketAddress addr;
synchronized (this) {
//reconfigMode is a feature ZooKeeper provides for balancing client connections across servers
if (reconfigMode) {
addr = nextHostInReconfigMode();
if (addr != null) {
currentIndex = serverAddresses.indexOf(addr);
return resolve(addr);
}
//tried all servers and couldn't connect
reconfigMode = false;
needToSleep = (spinDelay > 0);
}
//advance currentIndex
++currentIndex;
//wrap currentIndex back to 0 when it reaches the size of the server list
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
//pick the server at currentIndex
addr = serverAddresses.get(currentIndex);
//decide whether we need to sleep for a while
needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);
if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
//initialize lastIndex to 0; if lastIndex started at 0 instead of -1, needToSleep would already be true on the very first connect attempt, which is clearly wrong
lastIndex = 0;
}
}
if (needToSleep) {
try {
//back off briefly: once every server in the list has been tried, wait before trying again (my reading: if no server accepted the connection, the servers are probably busy, so give them a moment to recover)
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
//return the resolved server address
return resolve(addr);
}
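The interplay of currentIndex and lastIndex is easier to see in isolation. The sketch below is a simplified model, not the real StaticHostProvider: RoundRobinSketch and lastNeededSleep are hypothetical names, reconfig mode and DNS resolution are left out, and instead of sleeping it only records whether the real client would have slept.

```java
import java.util.List;

// Simplified model of the round-robin part of next(); names are illustrative only.
final class RoundRobinSketch {
    private final List<String> servers;
    private int lastIndex = -1;     // index of the last server we connected to
    private int currentIndex = -1;  // index of the server handed out most recently

    boolean lastNeededSleep;        // true if the last next() call would have backed off

    RoundRobinSketch(List<String> servers) {
        this.servers = servers;
    }

    synchronized String next(long spinDelay) {
        ++currentIndex;
        if (currentIndex == servers.size()) {
            currentIndex = 0;
        }
        String addr = servers.get(currentIndex);
        // We are back at the last known-good position: every server has been tried once.
        lastNeededSleep = currentIndex == lastIndex && spinDelay > 0;
        if (lastIndex == -1) {
            lastIndex = 0; // never back off on the first ever attempt
        }
        return addr;
    }

    // Counterpart of a successful connect: remember which server worked.
    synchronized void onConnected() {
        lastIndex = currentIndex;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(List.of("a:2181", "b:2181"));
        for (int i = 0; i < 3; i++) {
            System.out.println(rr.next(1000) + " sleep=" + rr.lastNeededSleep);
        }
    }
}
```

With two servers and no successful connection, the first two calls hand out each server without a delay, and the third call wraps around to the first server and reports that a back-off is due.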
ClientCnxn
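ClientCnxn is the client connection object. It holds the state of the client's connection to the server. It has quite a few fields; a few of them are annotated here.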

Here is the constructor that the other ClientCnxn constructors end up calling:
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
//create SendThread, which handles IO; the default clientCnxnSocket implementation is ClientCnxnSocketNIO
sendThread = new SendThread(clientCnxnSocket);
//create EventThread, which dispatches the events that watchers are interested in
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
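The two timeout formulas in the constructor are plain integer arithmetic. As a quick worked example, with hypothetical helper names and sample values chosen only for illustration:

```java
// Reproduces the two formulas from the constructor above; TimeoutSketch is an illustrative name.
final class TimeoutSketch {
    // Time budget for a single connect attempt: the session timeout split across all hosts.
    static int connectTimeout(int sessionTimeout, int hostCount) {
        return sessionTimeout / hostCount;
    }

    // The client gives up on a silent server after two thirds of the session timeout.
    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeout = 30000; // milliseconds, sample value
        System.out.println(connectTimeout(sessionTimeout, 3)); // 10000
        System.out.println(readTimeout(sessionTimeout));       // 20000
    }
}
```

So a 30 second session timeout with three servers in the connect string leaves 10 seconds per connect attempt and a 20 second read timeout.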
ClientCnxn.start()
ClientCnxn.start() starts SendThread and EventThread:
public void start() {
sendThread.start();
eventThread.start();
}
SendThread
SendThread handles all of the client's IO. Let's look at its run method.
SendThread.run
public void run() {
//hand the sessionId and outgoingQueue to clientCnxnSocket
//note: on the very first connection the server has not assigned a sessionId yet, so it is the default 0
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
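//upper bound on how long the client stays silent before sending a ping; the upstream value is 10000 (10 seconds), raised here only to make debugging easier
final int MAX_SEND_PING_INTERVAL = 10000000; //upstream: 10000 (10 seconds)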
InetSocketAddress serverAddress = null;
//note that this is a while loop: as long as the connection state is alive, the logic below keeps running
while (state.isAlive()) {
try {
//if the socket to the server is not connected yet, go through the connect flow
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
//ask hostProvider for the next server in the list
serverAddress = hostProvider.next(1000);
}
//open the socket connection to the server; the source is shown below
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
//if the session with the server is already established
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
//still connecting: time left before the connect attempt times out
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
//to <= 0 means the operation has timed out, so throw
if (to <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
clientCnxnSocket.getIdleRecv(),
Long.toHexString(sessionId));
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//already connected: work out when the next ping to the server is due
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend()
- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
//timeToNextPing <= 0 means a ping is due; a ping is also sent if the client has sent nothing for longer than MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
//when state == States.CONNECTEDREADONLY (see the comment below)
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//process the client's IO events; analyzed in detail below
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
// closing so this is expected
LOG.warn(
"An exception was thrown while closing send thread for session 0x{}.",
Long.toHexString(getSessionId()),
e);
break;
} else {
LOG.warn(
"Session 0x{} for sever {}, Closing socket connection. "
+ "Attempting reconnect except it is a SessionExpiredException.",
Long.toHexString(getSessionId()),
serverAddress,
e);
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
}
}
}
//reaching this point means the loop has exited: the client is closing or the connection state is no longer alive
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
//queue a Disconnected event for the client's event thread
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(
LOG,
ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
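The ping scheduling inside the loop boils down to one formula. The sketch below lifts it out of SendThread so it can be tried with sample numbers; PingScheduleSketch and its method names are hypothetical, and idleSend stands for the value returned by clientCnxnSocket.getIdleSend().

```java
// Stand-alone copy of the ping timing arithmetic from run(); names are illustrative only.
final class PingScheduleSketch {
    // Milliseconds until the next ping is due; the extra 1000 ms is subtracted
    // only once the client has already been idle for more than a second.
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    // A ping goes out when it is due, or when nothing was sent for too long.
    static boolean shouldPing(int readTimeout, int idleSend, int maxSendPingInterval) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > maxSendPingInterval;
    }

    public static void main(String[] args) {
        int readTimeout = 20000; // sample value in milliseconds
        System.out.println(timeToNextPing(readTimeout, 0));      // 10000
        System.out.println(timeToNextPing(readTimeout, 5000));   // 4000
        System.out.println(shouldPing(readTimeout, 9000, 10000)); // true
    }
}
```

With a 20 second read timeout the client therefore pings after roughly 9 seconds of send inactivity, comfortably before the server would consider it silent.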
The socket connection from client to server
The client opens its socket to the server in ClientCnxnSocketNIO.connect:
void connect(InetSocketAddress addr) throws IOException {
//create the client SocketChannel
SocketChannel sock = createSock();
try {
//register the SocketChannel with the selector for OP_CONNECT events and
//initiate the connection to the remote server
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to {}", addr);
sock.close();
throw e;
}
//flag: the session on this connection has not been initialized yet
initialized = false;
/*
* Reset incomingBuffer
*/
//ZooKeeper's default transport is NIO; each message on the wire is a length prefix followed by the message, and the message itself is a header plus a body
//the length prefix is a fixed 4 bytes giving the size of the message that follows; lenBuffer is the ByteBuffer that receives it
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
Once the client has initiated the connection, execution reaches ClientCnxnSocketNIO.doTransport, the entry point for the client's IO handling.
ClientCnxnSocketNIO.doTransport
void doTransport(
int waitTimeOut,
Queue<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
//wait for a registered event to fire
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
//OP_CONNECT fired: finish connecting the SocketChannel
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
//queue the session-establishment and auth requests; analyzed in detail below
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//a read or write event fired: run doIO, analyzed in detail below
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
//if outgoingQueue has data to send, register OP_WRITE with the selector
//one more note: a single write can only push out a limited amount of data, so after doIO we check again whether outgoingQueue still has data and, if so, enable OP_WRITE
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
//clear the selected keys
selected.clear();
}
Once the socket between client and server is up, the next step is establishing the session; sendThread.primeConnection takes care of that.
sendThread.primeConnection
After the socket connection is established, the client sends the server a request to create a session. The logic is implemented as follows:
void primeConnection() throws IOException {
LOG.info(
"Socket connection established, initiating session, client: {}, server: {}",
clientCnxnSocket.getLocalSocketAddress(),
clientCnxnSocket.getRemoteSocketAddress());
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
//build the session-creation request
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
//the long block below re-sends the client's existing watches to the server
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
List<String> persistentWatches = zooKeeper.getPersistentWatches();
List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
|| !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
|| persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
List<String> persistentWatchesBatch = new ArrayList<String>();
List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else if (persistentWatchesIter.hasNext()) {
watch = persistentWatchesIter.next();
persistentWatchesBatch.add(watch);
} else if (persistentRecursiveWatchesIter.hasNext()) {
watch = persistentRecursiveWatchesIter.next();
persistentRecursiveWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
Record record;
int opcode;
if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
// maintain compatibility with older servers - if no persistent/recursive watchers
// are used, use the old version of SetWatches
record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
opcode = OpCode.setWatches;
} else {
record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
opcode = OpCode.setWatches2;
}
//header of the set-watches request
RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
//wrap the header and body in a Packet and put it on outgoingQueue to be sent
Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
outgoingQueue.addFirst(packet);
}
}
}
for (AuthData id : authInfo) {
//put the client's auth info on outgoingQueue
outgoingQueue.addFirst(
new Packet(
new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
null,
new AuthPacket(0, id.scheme, id.data),
null,
null));
}
//finally put the connect request at the head of outgoingQueue so it is sent first
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
//connectionPrimed registers OP_READ and OP_WRITE with the selector
clientCnxnSocket.connectionPrimed();
LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}
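The batching loop in the middle of primeConnection is the part that is easiest to misread, so here is the same idea reduced to a single list of paths. WatchBatchSketch and batch are hypothetical names, and the real code interleaves five watch lists rather than one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative reduction of the watch batching loop to one list of paths.
final class WatchBatchSketch {
    static List<List<String>> batch(List<String> paths, int maxLength) {
        List<List<String>> batches = new ArrayList<>();
        Iterator<String> it = paths.iterator();
        while (it.hasNext()) {
            List<String> current = new ArrayList<>();
            int batchLength = 0;
            // As in the original, the last path added may push the batch
            // slightly over maxLength; this keeps the loop simple.
            while (batchLength < maxLength && it.hasNext()) {
                String path = it.next();
                current.add(path);
                batchLength += path.length();
            }
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(batch(List.of("/aaaa", "/bbbb", "/cccc"), 8));
    }
}
```

Each batch would become one set-watches packet, which keeps any single request from growing without bound when a client holds a very large number of watches.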
Sending the connect request above ends up in ClientCnxnSocketNIO.doIO.
ClientCnxnSocketNIO.doIO
This is the method where the client handles IO events. It is also long, so please bear with it.
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
//get the SocketChannel from the SelectionKey
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
//a readable event fired
if (sockKey.isReadable()) {
//read from the socket
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
//the buffer is full: flip it so it can be read
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
//incomingBuffer == lenBuffer means what was just read is the length prefix
recvCount.getAndIncrement();
//now that the length is known, allocate a ByteBuffer of that size for the message itself
readLength();
} else if (!initialized) {
//not initialized yet means the session has not been established,
//so readConnectResult handles the ConnectResponse sent by the server; analyzed below
readConnectResult();
//register OP_READ
enableRead();
//again, enable OP_WRITE depending on the state of outgoingQueue
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
//reset the buffers and update bookkeeping
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
//the first branch read the length prefix; this branch reads the message itself,
//which is handed to readResponse
sendThread.readResponse(incomingBuffer);
//reset lenBuffer
lenBuffer.clear();
//point incomingBuffer back at lenBuffer for the next read
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
//handle the write event
if (sockKey.isWritable()) {
//get the first sendable packet from outgoingQueue
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
//for requests other than ping and auth, set the xid in requestHeader so that requests are processed in order; the client generates xids by incrementing a counter
p.requestHeader.setXid(cnxn.getXid());
}
//serialize the request into a ByteBuffer; explained below
p.createBB();
}
//write the packet to the server over the socket
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
//if the whole packet has been written, remove it from outgoingQueue; if one write did not finish it, it stays in outgoingQueue and the next write continues where this one stopped
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
//requests other than ping and auth must wait for the server's response, so move them to pendingQueue
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
//everything in outgoingQueue has been sent, so stop listening for OP_WRITE
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
//in short: if the session has already expired, the server answers the initial connect with the expiration response and closes the socket at once; if the client is writing at that same moment, TCP may abort with RST and the client never receives the session-expired event. So while initialized is false, the client stops listening for OP_WRITE once the connect request has been fully written
disableWrite();
} else {
// Just in case
//as the comment says, just in case: outgoingQueue still has data, so keep OP_WRITE registered
enableWrite();
}
}
}
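The read half of doIO is a two-phase state machine: first fill a 4-byte length buffer, then fill a body buffer of exactly that size. The sketch below isolates that pattern using only java.nio. FramedReaderSketch and readOnce are hypothetical names, and the sketch returns the raw body bytes instead of dispatching to readConnectResult or readResponse.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Minimal length-prefixed reader modeled on the lenBuffer/incomingBuffer swap in doIO.
final class FramedReaderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;

    /** Performs one read; returns a complete body, or null if more data is needed. */
    byte[] readOnce(ReadableByteChannel ch) throws IOException {
        int rc = ch.read(incomingBuffer);
        if (rc < 0) {
            throw new IOException("end of stream");
        }
        if (incomingBuffer.hasRemaining()) {
            return null; // partial read, wait for the next readable event
        }
        incomingBuffer.flip();
        if (incomingBuffer == lenBuffer) {
            // Phase 1 done: the prefix tells us how large the body buffer must be.
            incomingBuffer = ByteBuffer.allocate(lenBuffer.getInt());
            return null;
        }
        // Phase 2 done: hand out the body and go back to waiting for a length.
        byte[] body = new byte[incomingBuffer.remaining()];
        incomingBuffer.get(body);
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
        return body;
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = ByteBuffer.allocate(7).putInt(3).put(new byte[] {'a', 'b', 'c'}).array();
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(wire));
        FramedReaderSketch reader = new FramedReaderSketch();
        reader.readOnce(ch);                              // consumes the length prefix
        System.out.println(new String(reader.readOnce(ch))); // consumes the body
    }
}
```

Because the same incomingBuffer reference alternates between the two buffers, a partial read at either stage simply resumes on the next readable event without extra state.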
That covers the logic of doIO. A few details remain; first, how a message is turned into a ByteBuffer.
Start with the message object:

On the sending side, the Packet class has two relevant fields: the request header requestHeader and the request body request.
Both are serialized into bb, a field of type ByteBuffer. Here is how:
public void createBB() {
try {
//create the output stream
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
//the first 4 bytes of the stream hold the length of everything after them; it is not known yet, so write -1 as a placeholder
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
//serialize requestHeader into the stream; it has two fields, the client transaction id (xid) and the request type code (type), so its serialized size is fixed: 4 (xid) + 4 (type) = 8 bytes
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
//for a session-creation request, serialize the ConnectRequest into the stream
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
//serialize the request into the stream
request.serialize(boa, "request");
}
baos.close();
//wrap the stream's bytes in a ByteBuffer
this.bb = ByteBuffer.wrap(baos.toByteArray());
//overwrite the placeholder with the real length
this.bb.putInt(this.bb.capacity() - 4);
//rewind so the buffer is ready to be written to the socket
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
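The "write a placeholder, then backfill the length" trick in createBB works with nothing but the JDK. In the sketch below, DataOutputStream stands in for jute's BinaryOutputArchive (an assumption made for illustration), and LengthPrefixSketch, frame, xid, type and body are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Illustrative framing helper: 4-byte length, 8-byte header, then the body.
final class LengthPrefixSketch {
    static ByteBuffer frame(int xid, int type, byte[] body) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);   // placeholder, the real length is not known yet
            out.writeInt(xid);  // header: 4 bytes
            out.writeInt(type); // header: 4 bytes
            out.write(body);
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            // Position is 0 after wrap, so this overwrites the placeholder.
            bb.putInt(bb.capacity() - 4);
            bb.rewind();        // back to position 0, ready to be written out
            return bb;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer bb = frame(1, 2, new byte[] {9, 9, 9});
        System.out.println(bb.getInt(0) + " of " + bb.capacity()); // 11 of 15
    }
}
```

The length excludes its own 4 bytes, which is exactly what the reading side needs in order to size the body buffer.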
ZooKeeper uses its own jute library for serialization; it is worth a look if you are interested.
That covers the structure of an outgoing message. Next, how the client handles the response to session creation.
ClientCnxnSocket.readConnectResult
void readConnectResult() throws IOException {
if (LOG.isTraceEnabled()) {
StringBuilder buf = new StringBuilder("0x[");
for (byte b : incomingBuffer.array()) {
buf.append(Integer.toHexString(b)).append(",");
}
buf.append("]");
if (LOG.isTraceEnabled()) {
LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
}
}
//create an input stream over the ByteBuffer
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
//deserialize the ConnectResponse
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
//take the sessionId from the deserialized ConnectResponse
this.sessionId = conRsp.getSessionId();
//pass the connection result to sendThread, which in turn notifies eventThread
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
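The readOnly handling above is a small compatibility pattern: try to read one extra trailing field and fall back to a default when an older peer did not send it. A stand-alone sketch of that pattern, using DataInputStream in place of jute's BinaryInputArchive (an assumption) and hypothetical names throughout:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative reader for a payload with one mandatory field and one optional trailing flag.
final class OptionalFieldSketch {
    static boolean readTrailingFlag(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            in.readLong(); // stand-in for the mandatory part of the response
            try {
                return in.readBoolean();
            } catch (EOFException e) {
                // Older sender: the flag is simply absent, so use the default.
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readTrailingFlag(new byte[8]));                            // false
        System.out.println(readTrailingFlag(new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 1})); // true
    }
}
```

Appending optional fields at the end of a message is what lets a newer client keep talking to an older server without a protocol version bump.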
That completes the ZooKeeper client startup, which consists of establishing the socket connection and then the session. The figure below gives a short summary of the process.
