ZooKeeper communication: this article covers the client side.
ZooKeeper communication is covered here in two parts: first, how messages are sent and received; second, how the client establishes a session with the server.
1. Sending and receiving messages
We normally construct a ZooKeeper client with code like the following:
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 1000, new Watcher() {
    public void process(WatchedEvent event) {
        // do sth
    }
});
Tracing into this code leads to the following constructor:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly, HostProvider aHostProvider,
                 ZKClientConfig clientConfig) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    // 1-1: create the default watcher
    watchManager = defaultWatchManager();
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    // 1-2: set the ZooKeeper server address list
    hostProvider = aHostProvider;
    // 1-3: create ClientCnxn, initializing outgoingQueue and pendingQueue
    cnxn = createConnection(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    // 1-4: start sendThread and eventThread
    cnxn.start();
}
Parameters:
- connectString: the ZooKeeper cluster's server address list.
- sessionTimeout: this ultimately yields three time settings: the sessionTimeout negotiated with the server, readTimeout, and connectTimeout. The server uses the negotiated sessionTimeout: if the client sends no request to the server within that window (normally the client sends periodic heartbeats, and each one makes the server recompute the client's expiry point), the server considers the session expired and cleans up its data. At that point the client's ZooKeeper object no longer works and a new one must be created. The client uses connectTimeout and readTimeout to detect connect timeouts and read timeouts respectively; once either fires, the client considers that server unstable and reconnects to the next server address.
- watcher: the ZooKeeper object's default Watcher, used to receive event notifications such as connected, disconnected, and session expired.
- canBeReadOnly: whether this is a read-only client.
- aHostProvider: the ZooKeeper server list (derived from connectString).
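The relationship between these three timeouts can be sketched numerically. This is a minimal sketch, assuming the formulas the client applies after negotiation (they appear later in onConnected: readTimeout is two thirds of the negotiated sessionTimeout, and connectTimeout divides it across the server list):

```java
public class TimeoutMath {
    // readTimeout: two thirds of the negotiated session timeout (integer division)
    static int readTimeout(int negotiatedSessionTimeout) {
        return negotiatedSessionTimeout * 2 / 3;
    }

    // connectTimeout: the negotiated session timeout split across the server list,
    // so trying every host once consumes roughly one full session timeout
    static int connectTimeout(int negotiatedSessionTimeout, int hostCount) {
        return negotiatedSessionTimeout / hostCount;
    }

    public static void main(String[] args) {
        int negotiated = 30000; // e.g. a 30s timeout negotiated with the server
        System.out.println(readTimeout(negotiated));       // 20000
        System.out.println(connectTimeout(negotiated, 3)); // 10000
    }
}
```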
This constructor does four things:
- 1: Create the watcher. Note that this watcher is handed over to the ZooKeeper class's ZKWatchManager; that topic is covered later in this series, not in this article.
- 2: Set the ZooKeeper server list, backed by a HostProvider implementation.
- 3: Create ClientCnxn, initializing outgoingQueue and pendingQueue along the way.
- 4: Start sendThread and eventThread. These two threads work together with the two queues above; both are described in detail below.
Both queues and both threads live inside ClientCnxn, so the protagonist of this article, ClientCnxn, now enters the stage.
ClientCnxn is the client's low-level communication interface to the server; you can think of it as the component that drives the whole exchange. Opening the ClientCnxn source, the following members and inner classes deserve your attention:
- outgoingQueue: Packets waiting to be sent to the server
- pendingQueue: Packets already sent, awaiting the server's response
- sendThread (SendThread): drives network I/O through clientCnxnSocket
- eventThread (EventThread): dispatches watcher events from its waitingEvents queue
This sets up the concrete communication flow:

ZooKeeper builds a Packet and puts it on the outgoingQueue. ClientCnxn's send thread, sendThread, drains outgoingQueue and sends its contents; the same thread also receives server responses (SendThread#readResponse handles them; clientCnxnSocket has two implementations, ClientCnxnSocketNIO and ClientCnxnSocketNetty, but both ultimately call into SendThread#readResponse). For watch notifications it generates a watcher event and puts it on eventThread's waitingEvents queue for eventThread to process. Each sent packet is also placed on pendingQueue, whose purpose is to match an arriving response against the original request and thereby guarantee response ordering. Note that these steps deliberately omit connection establishment between client and server; that part is complex enough to deserve its own treatment.
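The flow just described can be reduced to a small two-queue model. This is an illustrative sketch only: the class and method names are hypothetical, not ZooKeeper's actual internals.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the two-queue model: a send thread drains outgoingQueue,
// moves each packet to pendingQueue (awaiting its response), and an event
// thread drains waitingEvents. All names here are illustrative.
public class TwoQueueSketch {
    static class Packet { final int xid; Packet(int xid) { this.xid = xid; } }

    final LinkedBlockingQueue<Packet> outgoingQueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Packet> pendingQueue  = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<String> waitingEvents = new LinkedBlockingQueue<>();

    // "SendThread" step: send one packet, remember it until the reply arrives
    void sendOne() throws InterruptedException {
        Packet p = outgoingQueue.take();   // would be written to the socket here
        pendingQueue.put(p);               // kept to match the future response
    }

    // "readResponse" step: a reply for the oldest pending packet comes back
    void receiveOne(int replyXid) throws InterruptedException {
        Packet p = pendingQueue.take();
        if (p.xid != replyXid) throw new IllegalStateException("Xid out of order");
        waitingEvents.put("response for xid " + p.xid); // handed to the event thread
    }
}
```

Matching the response against the head of pendingQueue is what gives the ordering guarantee mentioned above.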
That is the client's communication flow. It is convoluted, so let's walk through the code with a concrete example: creating a node.
// The client calls create to make a node in the cluster; on success it returns the created path.
zooKeeper.create("/test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// -- The ZooKeeper object builds the Request and hands it to ClientCnxn to execute,
// -- then post-processes the returned result.
public String create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode)
        throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath, createMode.isSequential());
    EphemeralType.validateTTL(createMode, -1);
    final String serverPath = prependChroot(clientPath);
    RequestHeader h = new RequestHeader();
    h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create);
    // 1: build the CreateRequest packet
    CreateRequest request = new CreateRequest();
    CreateResponse response = new CreateResponse();
    request.setData(data);
    request.setFlags(createMode.toFlag());
    request.setPath(serverPath);
    if (acl != null && acl.size() == 0) {
        throw new KeeperException.InvalidACLException();
    }
    request.setAcl(acl);
    // 2: hand it to the server side.
    // submitRequest wraps the CreateRequest in a Packet, calls sendPacket to put it on
    // outgoingQueue for the send thread, then waits on the calling thread for the reply.
    // 3: once the server responds, the result is filled into the response object.
    ReplyHeader r = cnxn.submitRequest(h, request, response, null);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                clientPath);
    }
    if (cnxn.chrootPath == null) {
        return response.getPath();
    } else {
        return response.getPath().substring(cnxn.chrootPath.length());
    }
}
You can trace the code in this order:
ClientCnxn#submitRequest()->ClientCnxn#queuePacket()
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
//put it on outgoingQueue; the send thread will send it to the server
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
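The counterpart on the calling side is submitRequest, which, as noted earlier, blocks the calling thread until the response arrives. A minimal sketch of that wait/notify handshake (simplified; the real Packet also carries headers, buffers, and callbacks):

```java
// Sketch of the synchronous wait in submitRequest: the caller parks on the
// Packet until the reader side marks it finished. Simplified field and
// method names; not ClientCnxn's full implementation.
public class SubmitSketch {
    static class Packet {
        boolean finished;

        synchronized void waitUntilFinished() throws InterruptedException {
            while (!finished) wait();     // the caller blocks here
        }

        synchronized void finish() {      // called from the reader side (finishPacket)
            finished = true;
            notifyAll();                  // wakes the blocked submitRequest caller
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Packet p = new Packet();
        Thread reader = new Thread(p::finish); // stands in for SendThread.readResponse
        reader.start();
        p.waitUntilFinished();                 // returns once the "response" arrives
        System.out.println("finished=" + p.finished);
    }
}
```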
This is where the packet goes onto outgoingQueue to wait for sendThread. Recall that this thread was started back when we created the ZooKeeper client; let's see what it actually does.
For now I mask out the code that is irrelevant here and expand it later (heartbeat checks, reconnection handling, and so on are all elided for the moment):
@Override
public void run() {
//--- clientCnxnSocket here is one of the two concrete implementations: ClientCnxnSocketNIO or ClientCnxnSocketNetty
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
//.... elided code
//sendThread performs the send
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
//... elided code
}
}
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
clientCnxnSocket has two implementations, ClientCnxnSocketNIO and ClientCnxnSocketNetty. Step into either one and follow it down: you will find that every packet to be sent comes from the outgoingQueue in ClientCnxn described earlier, which matches what we said, and both paths ultimately call ClientCnxn.Packet#createBB, so after all the detours we end up back in ClientCnxn. createBB is what turns a Packet taken from outgoingQueue into the bytes actually written to the server. Source:
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
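The interesting detail in createBB is the length-prefix framing: a -1 placeholder is serialized first, and once the total size is known the first four bytes are overwritten with the real payload length. A standalone sketch of just that trick:

```java
import java.nio.ByteBuffer;

// Sketch of createBB's framing: write a -1 length placeholder, then backfill
// the first 4 bytes with (total - 4) once the full size is known.
public class FramingSketch {
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer bb = ByteBuffer.allocate(4 + payload.length);
        bb.putInt(-1);                   // placeholder, like boa.writeInt(-1, "len")
        bb.put(payload);
        bb.putInt(0, bb.capacity() - 4); // absolute write: real length at offset 0
        bb.rewind();                     // ready to be written to the socket
        return bb;
    }

    public static void main(String[] args) {
        ByteBuffer bb = frame(new byte[]{1, 2, 3});
        System.out.println(bb.getInt()); // prints 3: the payload length
    }
}
```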
Next comes the receiving side. Communication means a sends to b, b receives and replies, and a receives b's reply; here a is the client and b is the server. Having covered how a sends to b, let's look at how a receives from b.
We said earlier that SendThread#readResponse handles server responses, but it is not the only callback involved, because the NIO and Netty paths differ. The short answers:
- For ClientCnxnSocketNIO the callback is ClientCnxnSocketNIO.doIO(); this method handles both sending and receiving.
- For ClientCnxnSocketNetty the callback is ClientCnxnSocketNetty.ZKClientHandler#channelRead0 (this varies across versions; mine is 3.5.7, but the code is the same).
First, ClientCnxnSocketNIO:
//1--handles both the receive callback and request sending
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
//1--the receive/callback path
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {//1--the client and server are still mid session creation
//1--i.e. this client has not yet completed initialization
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {//1--regular requests (create, getData, exists); watch notifications are handled in here too
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
if (sockKey.isWritable()) {
//4--sent Packets are also placed on pendingQueue (ClientCnxn's queue of packets awaiting
//4--server responses), but only if p.requestHeader != null && type != OpCode.ping && type != OpCode.auth
Packet p = findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
//1--generate a client request sequence number (xid) and set it into the packet's request header
p.requestHeader.setXid(cnxn.getXid());
}
//---build the wire bytes; the sock.write below actually sends them
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
//1--these type checks mirror the ones applied when packets are later taken off pendingQueue
//1--(regular requests: create, getData, exists), namely in sendThread.readResponse
//1--this is ClientCnxn's pendingQueue
synchronized (pendingQueue) {
//1---the packet is added to pendingQueue here
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
ClientCnxnSocketNetty.ZKClientHandler#channelRead0 ultimately also calls sendThread.readResponse(), so there is no need to elaborate on it separately.
Look directly at sendThread.readResponse():
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
//deserialize (decode) the reply header
replyHdr.deserialize(bbia, "header");
//1--these xid branches correspond to the restrictions applied when adding to pendingQueue
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
eventThread.queueEventOfDeath();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
if (replyHdr.getXid() == -1) {//10--a watch notification
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
//10--1: deserialize the WatcherEvent from the byte stream
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
//10--2: handle the chroot path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
//10--3: rebuild the WatchedEvent
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
//10--4: put the watcher event on eventThread's waiting queue; the watcher is called back from there
eventThread.queueEvent( we );
return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
//guarantee response ordering
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
//--fill the reply body into the packet's response field
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
//1--complete watch registration and the rest of the finish logic
finishPacket(packet);
}
}
Here you can see the logic ends by calling eventThread.queueEvent(), which puts the generated watcher event on eventThread's waitingEvents queue, matching what was described earlier.
2. Establishing the session between client and server
The previous section explained the roles of sendThread and eventThread; keep those in mind, as session establishment reuses the same machinery.
Session establishment is really two things:
- the TCP connection between client and server
- the session association built on top of that TCP connection
Now look at the code we elided earlier:
@Override
public void run() {
//--- clientCnxnSocket here is one of the two concrete implementations: ClientCnxnSocketNIO or ClientCnxnSocketNetty
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
//--not connected yet
//--as soon as the client starts creating the ZooKeeper object its state is set to
//--CONNECTING; once a server connection succeeds it becomes CONNECTED.
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
//--the client is closing; don't re-establish the connection
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
//--wait up to 1000 ms, then try the next server address.
//--if the TCP connection drops before sessionTimeout elapses, the server still
//--considers the sessionId alive. The client picks the next ZooKeeper server address,
//--establishes a new TCP connection, and sends a ConnectRequest carrying the previous
//--sessionId and password. If the session has not yet timed out, the reconnect succeeds
//--transparently; everything happens in the background and the ZooKeeper object stays valid.
//--the wait exists because every address in the list may be unreachable,
//--so we pause before the next connection attempt
serverAddress = hostProvider.next(1000);
}
//--initiate the connection
//--(no TCP connection exists yet; session setup happens inside as well)
startConnect(serverAddress);
//--update the last send and receive timestamps
clientCnxnSocket.updateLastSendAndHeard();
}
//--connected
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
if (state == States.AUTH_FAILED) {
//--queue the event-of-death
eventThread.queueEventOfDeath();
}
}
}
//--while connected, compute remaining time before read timeout
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
//--while disconnected, compute remaining time before connect timeout
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
//--pings are sent continually; the server recomputes the session expiry from the
//--current time (session activation/migration). The expiry bucket formula:
//long expireTime = currentTime + sessionTimeout;
//expireTime = (expireTime / expirationInterval + 1) * expirationInterval;
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
//--clientCnxnSocket.getIdleSend() is the interval between the last packet sent and now
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
//--the ping, too, is put on outgoingQueue to await sending
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//sendThread sends
//---ClientCnxnSocket owns the long-lived TCP connection to the server
//---(the connect itself is initiated in startConnect above; doTransport performs the I/O)
//---it executes the I/O: sending queued requests and reading server responses (NIO),
//---or sending requests via Netty
//--- ClientCnxnSocketNIO.doTransport()->doIO() (doIO both sends and receives)
//--- ClientCnxnSocketNetty.doTransport()->doWrite()->sendPktOnly()->sendPkt()
//---in short, this method both sends requests and receives responses, i.e. hosts the
//---receive callback (true for ClientCnxnSocketNIO; not for ClientCnxnSocketNetty)
//4 NIO callback: ClientCnxnSocketNIO.doIO()
//4 Netty callback: ClientCnxnSocketNetty.channelRead0
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else if (e instanceof SocketException) {
LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
} else {
LOG.warn("Session 0x{} for server {}, unexpected error{}",
Long.toHexString(getSessionId()),
serverAddress,
RETRY_CONN_MSG,
e);
}
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
}
}
}
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
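The expiry-bucket formula quoted in the ping comments above can be checked in isolation. A sketch (expirationInterval is the server's tick for grouping sessions into expiry buckets; the concrete values here are illustrative):

```java
// Sketch of the server-side session expiry bucketing referenced above:
// expiry times are rounded up to the next multiple of expirationInterval,
// so sessions are checked in coarse "buckets" rather than individually.
public class ExpiryBucket {
    static long roundedExpiry(long currentTime, long sessionTimeout, long expirationInterval) {
        long expireTime = currentTime + sessionTimeout;
        // round up to the next expirationInterval boundary
        return (expireTime / expirationInterval + 1) * expirationInterval;
    }

    public static void main(String[] args) {
        // with a 2000 ms tick, a raw expiry of 30500 ms lands in the 32000 ms bucket
        System.out.println(roundedExpiry(500, 30000, 2000)); // 32000
    }
}
```

Each heartbeat moves the session into a later bucket, which is the "session activation" the comments mention.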
Tracing onward, we arrive at ClientCnxn.SendThread#primeConnection:
//--once the TCP connection succeeds, the client's sendThread sends a ConnectRequest to
//--establish the session association; the server assigns the client a sessionId and
//--password and starts checking that session for timeouts.
//--the request is wrapped as a network-I/O-layer Packet and placed on the
//--outgoingQueue, waiting to be sent to the server.
void primeConnection() throws IOException {
LOG.info("Socket connection established, initiating session, client: {}, server: {}",
clientCnxnSocket.getLocalSocketAddress(),
clientCnxnSocket.getRemoteSocketAddress());
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext()
|| existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
SetWatches sw = new SetWatches(setWatchesLastZxid,
dataWatchesBatch,
existWatchesBatch,
childWatchesBatch);
RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
outgoingQueue.addFirst(packet);
}
}
}
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
//--put the connect request on outgoingQueue, waiting to be sent to the server
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
clientCnxnSocket.connectionPrimed();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
}
}
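The watch re-registration loop above packs paths into batches by cumulative string length. A reduced sketch of just that batching logic (the real code bounds batches with SET_WATCHES_MAX_LENGTH, a large constant; a tiny limit is used here to make the effect visible):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of primeConnection's watch batching: paths are packed into batches
// until their cumulative length passes a limit. Like the real loop, a batch
// may slightly exceed the limit when the last path is added.
public class WatchBatching {
    static List<List<String>> batches(List<String> paths, int maxLen) {
        List<List<String>> out = new ArrayList<>();
        Iterator<String> it = paths.iterator();
        while (it.hasNext()) {
            List<String> batch = new ArrayList<>();
            int len = 0;
            while (len < maxLen && it.hasNext()) {
                String p = it.next();
                batch.add(p);
                len += p.length();
            }
            out.add(batch); // would become one SetWatches packet
        }
        return out;
    }

    public static void main(String[] args) {
        // four 2-char paths with a limit of 4 -> two batches of two
        System.out.println(batches(List.of("/a", "/b", "/c", "/d"), 4).size()); // 2
    }
}
```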
So this, too, just goes onto outgoingQueue for sendThread to send. This request obviously differs from the normal send/receive flow above, though, because when it is sent the client and server have no established session yet. In the ClientCnxnSocketNIO#doIO and ClientCnxnSocketNetty.ZKClientHandler#channelRead0 callbacks there is a method, ClientCnxnSocket#readConnectResult, that handles the server's response to this connection request:
void readConnectResult() throws IOException {
if (LOG.isTraceEnabled()) {
StringBuilder buf = new StringBuilder("0x[");
for (byte b : incomingBuffer.array()) {
buf.append(Integer.toHexString(b) + ",");
}
buf.append("]");
LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
+ buf.toString());
}
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
//4 deserialize into a ConnectResponse object: from it the client obtains the sessionId
//4 and password the server assigned, plus the negotiated sessionTimeout
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
//4 record the sessionId the ZooKeeper server assigned to this session
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
}
//--this method runs on the client; the server-side part is:
//--if, when the TCP connection is re-established, the sessionId has already timed out
//--(the server has cleaned up that session's data), the server returns
//--sessionTimeout = 0, sessionId = 0, and an empty password.
//--On receiving that, the client checks whether the negotiated sessionTimeout is <= 0;
//--if so, the eventThread first emits a KeeperState.Expired event to notify the
//--relevant Watchers, then ends its loop and shuts down. The ZooKeeper object is now
//--invalid; a new one must be created, which will be assigned a new sessionId.
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
String warnInfo;
warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired";
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);
}
if (!readOnly && isRO) {
LOG.error("Read/write client got connected to read-only server");
}
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
//1--callback notifying the HostProvider of the successful connection
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;
seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", sessionid = 0x" + Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout
+ (isRO ? " (READ-ONLY mode)" : ""));
//4 emit a SyncConnected event through the EventThread
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
//4 this event marks successful session creation and is handed to the EventThread.
//4 On receiving it, the EventThread looks up the matching Watchers in ClientWatchManager;
//4 for a SyncConnected/None event that is the stored default Watcher, which is then
//4 placed on the EventThread's waitingEvents queue.
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));
}
In the end everything goes through eventThread.queueEvent(); only the event type placed on the queue differs.
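The EventThread side that keeps appearing can be sketched as a tiny dispatch loop: SendThread queues events, and EventThread takes them off waitingEvents and invokes the default Watcher. The names below are illustrative, not ZooKeeper's exact internals:

```java
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of EventThread's dispatch: events queued by the send side are taken
// off waitingEvents one at a time and handed to the registered default watcher.
public class EventLoopSketch {
    interface Watcher { void process(String event); }

    final LinkedBlockingQueue<String> waitingEvents = new LinkedBlockingQueue<>();
    final Watcher defaultWatcher;

    EventLoopSketch(Watcher w) { this.defaultWatcher = w; }

    // what queueEvent amounts to: enqueue for the event thread
    void queueEvent(String e) { waitingEvents.offer(e); }

    // one iteration of the loop EventThread.run() performs
    void dispatchOne() throws InterruptedException {
        defaultWatcher.process(waitingEvents.take());
    }
}
```

Running the dispatch on a dedicated thread keeps user Watcher callbacks off the network I/O thread.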
That is all for this article.