zookeeper - session建立(4)

概述

? ? 在講解完zookeeper核心的選舉部分的功能邏輯之后,另外一個我個人覺得需要理解的就是zookeeper的client-server之間的連接的建立過程,因為除了zookeeper各個節(jié)點之間的通信外,另外一大塊就是zookeeper作為server端與client端的交互,包括之前的連接建立以及后續(xù)的各種操作命令入get/set等操作,這個博文專注于講清楚前面部分的概念,后面部分的邏輯會有專門的一篇博文來闡述。

? ? 當(dāng)然由于現(xiàn)在對于zookeeper的訪問已經(jīng)有很多開源的實現(xiàn),排在首位的應(yīng)該就是Curator了(最早介紹這個東西給我的還是我在車來了的同事袁翔,感謝當(dāng)初帶我入門),大部分情況我們使用的都是它封裝的高級API,但是其實如果我們只是使用它的高級API而沒有細究底層,我們還是不知道client-server之間的交互細節(jié)的,所以為了能夠更直觀的知道細節(jié),我就在網(wǎng)上找了一個demo,基于這個demo我們可以開始開始我們的分析了。

zookeeper-api

說明

? ? 上面中我們看到的client通過new ZooKeeper的api創(chuàng)建了server的連接,然后開始的一系列操作,所以我們的源碼分析也從這個地方開始。


zookeeper-session連接


Session建立 - client端

? ? 通過demo我們看出來,client連接zookeeper的server端其實就是創(chuàng)建了zookeeper對象。

????創(chuàng)建zookeeper對象,核心的點是創(chuàng)建了ClientXnxn對象,該對象內(nèi)部包含兩個核心對象,分別是sendThread和eventThread。

????啟動zookeeper對象,實際上是啟動sendThread和eventThread。當(dāng)然我們關(guān)注的是sendThread這部分的工作,基本上建立連接(session的建立)也就是它在玩轉(zhuǎn)的。

? ? 在sendThread當(dāng)中我們需要處理各種連接事件,譬如注冊O(shè)P_CONNECT/OP_WRITE/

OP_READ等相關(guān)事件。


session-client-1

說明:

? ? Zookeeper當(dāng)中主要是創(chuàng)建了ClientXnxn對象并進行啟動,其中ClientCnxn對象內(nèi)部主要對象是兩個線程,分別是是sendThread和eventThread,其中sendThread負責(zé)連接server。


session-client-2

說明:

? ? 很明顯的創(chuàng)建兩個線程的邏輯,一個是sendThread,一個是eventThread。我們關(guān)注sendThread的run部分邏輯。


session-sendThread-1

說明:

? ? sendThread關(guān)聯(lián)的幾個對象包括sessionId,outgoingQueue等。

????sendThread內(nèi)部如果判斷clientCnxnSocket沒有建立連接,就會開始嘗試建立連接。

? ? 我們關(guān)注的應(yīng)該就是建立連接的過程,關(guān)注startConnect部分邏輯。


session-sendThread-2

說明:

? ? 繼續(xù)跟進connect部分的邏輯


session-sendThread-3

說明:

? ? 繼續(xù)跟進registerAndConnect部分的邏輯


session-sendThread-4

說明:

? ? 首先將socket注冊到selector當(dāng)中并關(guān)注OP_CONNECT動作,這樣異步連接成功的過程中就可以捕捉到事件了。

? ? 如果立即連接成功以后就直接進入后續(xù)處理了,關(guān)注一下primeConnection這個動作,在異步連接成功后也會執(zhí)行這個函數(shù)的。


session-sendThread-5

說明:

? ? 連接成功我們開始發(fā)送相關(guān)報文給server端,其中發(fā)送是通過放到outgoing隊列中,有專門的發(fā)送線程負責(zé)發(fā)送。

? ? 其實發(fā)送了兩種報文,但是不知道前面的報文是什么東西,看著像各種watch。

? ? 最后最重要的部分在于connectionPrimed部分操作,其實就是注冊了OP_READ和OP_WRITE事件到selector當(dāng)中了。


session-sendThread-6

說明:

? ? 其實這個run邏輯是在sendThread當(dāng)中執(zhí)行的,我們真正關(guān)心的部分邏輯是在doTransport部分,里面其實是對異步連接成功的處理。


session-sendThread-7

說明:

? ? 進入doTransport的邏輯我們看到了selector的執(zhí)行部分,其中select返回的就是感興趣的事件,我們在registerAndConnect邏輯當(dāng)中注冊了OP_CONNECT事件,所以假設(shè)異步連接成功了那么我們就再次進入了sendThread.primeConnection的邏輯。

? ? 在處理OP_CONNECT事件邏輯,sendThread.primeConnection的邏輯其實就是在發(fā)送package報文。? ??

? ? 在處理OP_READ和OP_WRITE的邏輯,進入的其實是doIO部分的邏輯。


session-sendThread-8

說明:

? ? 處理讀事件也是一件挺有意思的事情,基本上你會看到ByteBuffer的各種用法,這里讀取的邏輯其實很簡單,先讀取4Byte的數(shù)據(jù)長度,然后再讀取剩余的實際報文數(shù)據(jù)。

????incomingBuffer一開始讀取的是報文長度,在readLnegth()內(nèi)部其實就是讀取實際數(shù)據(jù),根據(jù)sock.read(incomingBuffer)獲取報文的長度。


session-sendThread-9

說明:

? ? 處理寫事件,基本上就是發(fā)送報文,細節(jié)沒仔細關(guān)注。


Session建立 - server端


????server端其實就是接收client端的連接,接受連接部分的邏輯似乎有點繞,所以我默認就從server端已經(jīng)接受了連接并開始處理報文的邏輯開始。

? ? 通過整個邏輯的串聯(lián)了解下server對報文請求的處理,其實整個處理過程類似pipeLine的過程,由PrepRequestProcessor、SyncRequestProcessor、FinalRequestProcessor三者進行的串聯(lián)。


session-server-1

說明:

? ? 開始進入處理connect請求部分的邏輯,入口函數(shù)已經(jīng)很明顯了。


session-server-2

說明:

? ? 進入創(chuàng)建session部分的邏輯,注意在這里生成了cnxn對象,session密碼,超時時間等。


session-server-3

說明:

? ? 創(chuàng)建session其實一個異步過程,這了我們生成了一個Request對象,然后提交這個Request對象。


session-server-4

說明:

? ? 首先我們通過PrepRequestProcessor操作進行第一波處理,processRequest操作其實把request提交到一個隊列當(dāng)中submittedRequests當(dāng)中,具體的消費處理邏輯看下一個邏輯代碼。


session-server-5

說明:

? ? 沒錯,這里開始進行第一波處理了,看函數(shù)就是PrepRequestProcessor進行處理,具體處理邏輯往后繼續(xù)看。


session-server-6

說明:

? ? 其實這個地方我們基本上知道了zookeeper處理請求的核心邏輯代碼,我們只是現(xiàn)在關(guān)心session的create事件而已。


session-server-7

說明:

? ? 這里我們看到createSession部分的邏輯,繼續(xù)關(guān)注pRequest2Txn邏輯。


session-server-8

說明:

? ? 我們將進行下一步下一步處理,至于nextProcessor從哪里來的呢,可以看下一個截圖。

其實nextProcessor其實是syncProcessor。


session-server-9

說明:

? ? 基本上可以看出來了,PrepRequestProcessor、SyncRequestProcessor、FinalRequestProcessor。


session-server-10

說明:

? ? 只是把任務(wù)簡單的提交了另外一個queue當(dāng)中,也就是queuedRequests當(dāng)中。


session-server-11

說明:

? ? take任務(wù)繼續(xù)下一步處理,這個還在SyncRequestProcessor當(dāng)中,我們關(guān)注其實是flush動作,繼續(xù)看下圖的代碼。



session-server-12

說明:

? ? 其實flush里面最后還是將任務(wù)提交到給FinalRequestProcessor進行處理。


session-server-13

說明:

? ? 進入zks.processTxn的邏輯,這部分代碼其實很多,所以只截取了其中一部分。


session-server-14

說明:

? ? 把session加入到全局session當(dāng)中。


session-server-15

說明:

? ? 真正完成session初始的入口函數(shù)。

session-server-16

說明:

? ? 一開始通過serverCnxnFactory.registerConnection將session注冊到server端,將session創(chuàng)建的結(jié)果發(fā)送回client端。

session-server-17

說明:

? ? 在server端維持新建的session對象,但是我暫時也不知道干嘛。我們在創(chuàng)建ServerCnxnFactory的過程中會生成server端負責(zé)accept連接,這部分到時候后面再繼續(xù)補充。


參考文獻

使用ZooKeeper Java API編程

Zookeeper源碼分析之二Session建立

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容