canal.client 源碼分析

一、整體設(shè)計(jì)

1.1 功能概述

封裝與 Canal Server 進(jìn)行交互的客戶端,提供兩種實(shí)現(xiàn)給外部使用:

  • 簡(jiǎn)單連接:直接通過(guò) socket 與 server 進(jìn)行交互,實(shí)現(xiàn)連接、訂閱、批量獲取、提交和回滾等操作。
  • 有 HA 的 Cluster 連接:基于簡(jiǎn)單連接方式進(jìn)行封裝,通過(guò) ZooKeeper 實(shí)現(xiàn) client 端的 HA。

1.2 目錄結(jié)構(gòu)

項(xiàng)目基本結(jié)構(gòu).png

其中 kafka 包和 rocketmq 包實(shí)現(xiàn)了接收到的消息直接發(fā)送到消息隊(duì)列中,目前先重點(diǎn)關(guān)注與服務(wù)端的交互和消息的處理流程,后續(xù)會(huì)重點(diǎn)關(guān)注 kafka 包的實(shí)現(xiàn),為目前生產(chǎn)上 C/S 端分離 ==> 集成 的改造做準(zhǔn)備。

1.3 核心類

核心類類圖.jpg
  • ClientIdentity
    canal client 和 server 交互之間的身份標(biāo)識(shí),目前 clientId 寫死為 <u>1001</u>(目前 canal server 上的一個(gè) instance 只能有一個(gè) client 消費(fèi),clientId 的設(shè)計(jì)是為 1 個(gè) instance 多 client 消費(fèi)模式而預(yù)留的,暫時(shí)不需要理會(huì))。
  • CanalConnector
    SimpleCanalConnector / ClusterCanalConnector:兩種 connector 的實(shí)現(xiàn),simple 針對(duì)的是簡(jiǎn)單的 IP 直連模式,cluster 針對(duì)多 IP 的模式,可依賴 CanalNodeAccessStrategy 進(jìn)行 failover 控制。
  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy / ClusterNodeAccessStrategy:兩種 failover 的實(shí)現(xiàn),simple 針對(duì)給定的初始 IP 列表進(jìn)行failover選擇,cluster 基于 ZooKeeper 上的 cluster 節(jié)點(diǎn)動(dòng)態(tài)選擇正在運(yùn)行的 canal server。
  • ClientRunningMonitor / ClientRunningListener / ClientRunningData
    client running 相關(guān)控制,主要為解決 client 自身的 failover 機(jī)制。canal client 允許同時(shí)啟動(dòng)多個(gè) canal client,通過(guò) running 機(jī)制,可保證只有一個(gè) client 在工作,其他 client 做為冷備。當(dāng)運(yùn)行中的 client 掛了,running 會(huì)控制讓冷備中的 client 轉(zhuǎn)為工作模式,這樣就可以確保 canal client 也不會(huì)是單點(diǎn)。保證整個(gè)系統(tǒng)的高可用性。ClientRunningData 對(duì)應(yīng) ZooKeeper 上的 /otter/canal/destinations/xxx/1001 節(jié)點(diǎn)數(shù)據(jù)。

二、類設(shè)計(jì)

2.1 CanalConnector

2.1.1 接口介紹

CanalConnector 的作用是與服務(wù)端進(jìn)行交互,支持連接、訂閱、獲取數(shù)據(jù)、回滾、斷開等操作。一個(gè) CanalConnector 對(duì)應(yīng)一個(gè)指定的目標(biāo)庫(kù)(destination),如果需要訂閱多個(gè)目標(biāo)庫(kù)需要?jiǎng)?chuàng)建多個(gè) CanalConnector。

以下是 CanalConnector 接口的方法介紹:

CanalConnector.png
  • void connect():與 Canal 服務(wù)端建立連接。
  • void disconnect():與 Canal 服務(wù)端斷開連接。
  • boolean checkValid():檢查連接是否合法。
    • 連接服務(wù)端失敗,一直沒(méi)有一個(gè)可用的連接時(shí),返回 false
    • 當(dāng)前客戶端在進(jìn)行 running 節(jié)點(diǎn)搶占時(shí),作為備份節(jié)點(diǎn)存在,并非作為工作節(jié)點(diǎn),返回 false
  • void subcribe():訂閱服務(wù)端,可以傳入一個(gè) filter 字符串用于過(guò)濾物理庫(kù)或表。
  • void unsubscribe():取消訂閱服務(wù)端。
  • Message getWithoutAck():不需要指定 position 就可以獲取服務(wù)端數(shù)據(jù),Canal 會(huì)記住此 client 的最新 position。如果是第一次fetch,則會(huì)從 Canal 中保存的最老一條數(shù)據(jù)開始輸出。
    • (int batchSize):從服務(wù)端獲取 batchSize 大小的數(shù)據(jù),有多少取多少,不會(huì)阻塞等待。
    • (int batchSize, int timeout, TimeUnit unit):從服務(wù)端獲取 batchSize 大小的數(shù)據(jù),阻塞等待直到拿夠 batchSize 條記錄或者等待時(shí)間達(dá)到 timeout。
  • Message get():從服務(wù)端獲取訂閱的消費(fèi)數(shù)據(jù),在執(zhí)行 getWithoutAck 之后 自動(dòng)提交確認(rèn)。
    • (int batchSize):從服務(wù)端獲取 batchSize 大小的數(shù)據(jù),有多少取多少,不會(huì)阻塞等待
    • (int batchSize, int timeout, TimeUnit unit):從服務(wù)端獲取 batchSize 大小的數(shù)據(jù),阻塞等待直到拿夠 batchSize 條記錄或者等待時(shí)間達(dá)到 timeout。
  • void ack(int batchId):通過(guò)傳入 batchId 向服務(wù)端確認(rèn)消息已經(jīng)消費(fèi)成功,小于等于此 batchId 的 Message 都會(huì)被確認(rèn)。
  • void rollback(int batchId):基于 batchId 回滾對(duì)應(yīng)的 get/getWithoutAck 請(qǐng)求,重新獲取一次數(shù)據(jù)。

2.2.2 接口實(shí)現(xiàn)

CanalConnector 接口有兩種實(shí)現(xiàn),分別是 SimpleCanalConnector 和 ClusterCanalConnector。前者負(fù)責(zé)通過(guò)指定地址直接連接服務(wù)端,后者則可以通過(guò)指定一個(gè)地址列表或是 ZK 地址來(lái)連接高可用的服務(wù)端集群。

2.2.2.1 SimpleCanalConnector

該類負(fù)責(zé)實(shí)現(xiàn)與服務(wù)端的交互邏輯,包括與服務(wù)端進(jìn)行握手、認(rèn)證、發(fā)送與接收數(shù)據(jù)包等,以及自身運(yùn)行狀態(tài)的控制。下面以 connect 方法為例,舉例分析該類與服務(wù)端的交互流程。

SimpleCanalConnector#connect 時(shí)序圖.png

首先,程序會(huì)執(zhí)行 waitClientRunning() 檢測(cè)自己當(dāng)前的角色,如果自己是工作節(jié)點(diǎn),那么繼續(xù)運(yùn)行,如果是備用節(jié)點(diǎn)則線程掛起,等待角色切換,如果是單節(jié)點(diǎn)直連則直接設(shè)置自己為運(yùn)行狀態(tài)并返回。

接下來(lái),程序會(huì)執(zhí)行 doConnect() 方法開始執(zhí)行真正的連接過(guò)程。

  • 第一步,建立 socket 連接:開啟一個(gè) SocketChannel,并設(shè)置 socket 的連接超時(shí)時(shí)間,然后與服務(wù)端 TCP 三次握手建立連接。
  • 第二步,與服務(wù)端進(jìn)行握手認(rèn)證:讀取并解析服務(wù)端發(fā)送的報(bào)文,其中報(bào)文頭包含了協(xié)議的版本、類型等,報(bào)文體包含了握手所需要的信息,如壓縮方式、認(rèn)證需要的 seed 等。調(diào)用加密工具類,用 seed 加密傳入的 password,隨后生成一個(gè) ClientAuth 類,設(shè)置 username、password 密文等信息,并將它組裝為一個(gè)客戶端認(rèn)證報(bào)文,發(fā)送給服務(wù)端。接收并解析服務(wù)端的響應(yīng)報(bào)文,判斷是否認(rèn)證成功,若成功則繼續(xù)設(shè)置當(dāng)前狀態(tài)為已連接(connected = true),返回本地 socket 地址對(duì)象,若失敗則拋出異常。

連接建立后,隨后的 subscribe/unsubscribe、get/getWithoutAck、ack/rollback 等操作無(wú)需再執(zhí)行握手認(rèn)證,這些操作實(shí)際上大同小異,無(wú)非是根據(jù)傳入?yún)?shù),向服務(wù)端發(fā)送一個(gè)相應(yīng)的報(bào)文(指定報(bào)文類型、構(gòu)建報(bào)文體),然后解析響應(yīng)報(bào)文,返回或執(zhí)行后續(xù)操作。客戶端操作、報(bào)文頭、報(bào)文類型的對(duì)應(yīng)關(guān)系如下:

方法名 報(bào)文類型 報(bào)文體格式類 含義
doConnect PacketType.HandShake
PacketType.CLIENTAUTHENTICATION
PacketType.Ack
-
ClientAuth
Ack
握手
客戶端認(rèn)證
服務(wù)端確認(rèn)
subscribe PacketType.SUBSCRIPTION Sub 訂閱
unsubscribe PacketType.UNSUBSCRIPTION Unsub 取消訂閱
get/getWithoutAck PacketType.GET Get 獲取數(shù)據(jù)
ack PacketType.CLIENTACK ClientAck 客戶端確認(rèn)
rollback PacketType.CLIENTROLLBACK ClientRollback 客戶端回滾

2.2.2.2 ClusterCanalConnector

1.4 核心類 中所介紹,ClusterCanalConnector 基于 SimpleCanalConnector、CanalNodeAccessStrategy 以及 impl.running 包內(nèi)的類來(lái)實(shí)現(xiàn)客戶端連接 HA 的服務(wù)端以及客戶端自身的 HA。

ClusterCanalConnector 內(nèi)部維護(hù)了一個(gè) currentConnector 對(duì)象,這是 SimpleCanalConnector 的一個(gè)實(shí)例,并且是當(dāng)前正在工作的實(shí)例。類似后者,該類同樣實(shí)現(xiàn)了 CanalConnector 與用于和服務(wù)端交互的方法,其實(shí)現(xiàn)方式為內(nèi)部調(diào)用 currentConnecotr 的對(duì)象方法來(lái)真正地交互,而它自己只負(fù)責(zé)維護(hù) currentConnector 即可。

該類支持 SimpleNodeAccessStrategy 和 ClusterNodeAccessStrategy 兩種節(jié)點(diǎn)訪問(wèn)策略,前者需要手動(dòng)指定服務(wù)端地址的列表,而后者基于 ZK 注冊(cè)與發(fā)現(xiàn),監(jiān)聽 ZK 上的節(jié)點(diǎn),自動(dòng)更新內(nèi)存中的地址列表和當(dāng)前工作節(jié)點(diǎn)地址。

server-HA-zknodes.png

以 ClusterNodeAccessStrategy 為例,它維護(hù)了 currentAddress 和 runningAddress 兩個(gè)屬性。在構(gòu)造器初始化時(shí),程序會(huì)去訂閱監(jiān)聽 ZK 上的 destination/cluster 節(jié)點(diǎn)和 destination/running 節(jié)點(diǎn),前者包含服務(wù)端地址列表,后者則是正在工作的服務(wù)端節(jié)點(diǎn)地址,分別對(duì)應(yīng)它的兩個(gè)屬性。當(dāng)上述兩個(gè)節(jié)點(diǎn)的數(shù)據(jù)更改或者刪除后,程序會(huì)對(duì)這兩個(gè)屬性做相應(yīng)的變更,具體如下圖所示。

server-HA-connect-workflow.png

三、HA 實(shí)現(xiàn)

3.1 實(shí)現(xiàn)原理

服務(wù)端和客戶端 HA 的實(shí)現(xiàn)都是利用了 ZooKeeper 可作為分布式鎖的特性。

3.1.1 ZooKeeper 分布式鎖原理

  • 臨時(shí)節(jié)點(diǎn)(EPHEMERAL):與客戶端會(huì)話綁定,一旦客戶端會(huì)話失效(如宕機(jī)),這個(gè)客戶端所創(chuàng)建的所有臨時(shí)節(jié)點(diǎn)都會(huì)被移除。

  • Watcher 機(jī)制:

ZooKeeper-Watcher.jpg

ZooKeeper 客戶端向 ZooKeeper 服務(wù)器注冊(cè) watcher 的同時(shí),會(huì)將 watcher 對(duì)象存儲(chǔ)在客戶端的 WatcherManager;ZooKeeper 服務(wù)器觸發(fā) watcher 事件后,會(huì)向客戶端發(fā)送通知,客戶端線程從 WatcherManager 中回調(diào) watcher 執(zhí)行相應(yīng)的功能。

3.1.2 Client HA 具體實(shí)現(xiàn)

client-HA-zknodes.png
  • 1001/cursor:最新的 client 確認(rèn)消費(fèi)的信息

    zk-1001-cursor.png

  • 1001/filter:client 訂閱的過(guò)濾規(guī)則

    zk-1001-filter.png

  • 1001/running:正在運(yùn)行中的 client 地址

    zk-1001-running.png

client 啟動(dòng)時(shí)會(huì)訂閱 running 節(jié)點(diǎn)的變更事件,然后去嘗試創(chuàng)建 running 節(jié)點(diǎn):

  • 創(chuàng)建成功則將自己的信息寫入 running 節(jié)點(diǎn),然后繼續(xù)運(yùn)行;
  • 創(chuàng)建失敗則線程掛起,等待 running 節(jié)點(diǎn)釋放,然后再次嘗試創(chuàng)建 running 節(jié)點(diǎn)。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容