一、整體設(shè)計(jì)
1.1 功能概述
封裝與 Canal Server 進(jìn)行交互的客戶端,提供兩種實(shí)現(xiàn)給外部使用:
- 簡(jiǎn)單連接:直接通過(guò) socket 與 server 進(jìn)行交互,實(shí)現(xiàn)連接、訂閱、批量獲取、提交和回滾等操作。
- 有 HA 的 Cluster 連接:基于簡(jiǎn)單連接方式進(jìn)行封裝,通過(guò) ZooKeeper 實(shí)現(xiàn) client 端的 HA。
1.2 目錄結(jié)構(gòu)

其中 kafka 包和 rocketmq 包實(shí)現(xiàn)了接收到的消息直接發(fā)送到消息隊(duì)列中,目前先重點(diǎn)關(guān)注與服務(wù)端的交互和消息的處理流程,后續(xù)會(huì)重點(diǎn)關(guān)注 kafka 包的實(shí)現(xiàn),為目前生產(chǎn)上 C/S 端分離 ==> 集成 的改造做準(zhǔn)備。
1.3 核心類

-
ClientIdentity
canal client 和 server 交互之間的身份標(biāo)識(shí),目前 clientId 寫死為 <u>1001</u>(目前 canal server 上的一個(gè) instance 只能有一個(gè) client 消費(fèi),clientId 的設(shè)計(jì)是為 1 個(gè) instance 多 client 消費(fèi)模式而預(yù)留的,暫時(shí)不需要理會(huì))。 -
CanalConnector
SimpleCanalConnector / ClusterCanalConnector:兩種 connector 的實(shí)現(xiàn),simple 針對(duì)的是簡(jiǎn)單的 IP 直連模式,cluster 針對(duì)多 IP 的模式,可依賴 CanalNodeAccessStrategy 進(jìn)行 failover 控制。 -
CanalNodeAccessStrategy
SimpleNodeAccessStrategy / ClusterNodeAccessStrategy:兩種 failover 的實(shí)現(xiàn),simple 針對(duì)給定的初始 IP 列表進(jìn)行failover選擇,cluster 基于 ZooKeeper 上的 cluster 節(jié)點(diǎn)動(dòng)態(tài)選擇正在運(yùn)行的 canal server。 -
ClientRunningMonitor / ClientRunningListener / ClientRunningData
client running 相關(guān)控制,主要為解決 client 自身的 failover 機(jī)制。canal client 允許同時(shí)啟動(dòng)多個(gè) canal client,通過(guò) running 機(jī)制,可保證只有一個(gè) client 在工作,其他 client 做為冷備。當(dāng)運(yùn)行中的 client 掛了,running 會(huì)控制讓冷備中的 client 轉(zhuǎn)為工作模式,這樣就可以確保 canal client 也不會(huì)是單點(diǎn)。保證整個(gè)系統(tǒng)的高可用性。ClientRunningData 對(duì)應(yīng) ZooKeeper 上的 /otter/canal/destinations/xxx/1001 節(jié)點(diǎn)數(shù)據(jù)。
二、類設(shè)計(jì)
2.1 CanalConnector
2.1.1 接口介紹
CanalConnector 的作用是與服務(wù)端進(jìn)行交互,支持連接、訂閱、獲取數(shù)據(jù)、回滾、斷開等操作。一個(gè) CanalConnector 對(duì)應(yīng)一個(gè)指定的目標(biāo)庫(kù)(destination),如果需要訂閱多個(gè)目標(biāo)庫(kù)需要?jiǎng)?chuàng)建多個(gè) CanalConnector。
以下是 CanalConnector 接口的方法介紹:

- void connect():與 Canal 服務(wù)端建立連接。
- void disconnect():與 Canal 服務(wù)端斷開連接。
- boolean checkValid():檢查連接是否合法。
- 連接服務(wù)端失敗,一直沒(méi)有一個(gè)可用的連接時(shí),返回 false
- 當(dāng)前客戶端在進(jìn)行 running 節(jié)點(diǎn)搶占時(shí),作為備份節(jié)點(diǎn)存在,并非作為工作節(jié)點(diǎn),返回 false
- void subcribe():訂閱服務(wù)端,可以傳入一個(gè) filter 字符串用于過(guò)濾物理庫(kù)或表。
- void unsubscribe():取消訂閱服務(wù)端。
- Message getWithoutAck():不需要指定 position 就可以獲取服務(wù)端數(shù)據(jù),Canal 會(huì)記住此 client 的最新 position。如果是第一次fetch,則會(huì)從 Canal 中保存的最老一條數(shù)據(jù)開始輸出。
- (int batchSize):從服務(wù)端獲取 batchSize 大小的數(shù)據(jù),有多少取多少,不會(huì)阻塞等待。
- (int batchSize, int timeout, TimeUnit unit):從服務(wù)端獲取 batchSize 大小的數(shù)據(jù),阻塞等待直到拿夠 batchSize 條記錄或者等待時(shí)間達(dá)到 timeout。
- Message get():從服務(wù)端獲取訂閱的消費(fèi)數(shù)據(jù),在執(zhí)行 getWithoutAck 之后 自動(dòng)提交確認(rèn)。
- (int batchSize):從服務(wù)端獲取 batchSize 大小的數(shù)據(jù),有多少取多少,不會(huì)阻塞等待
- (int batchSize, int timeout, TimeUnit unit):從服務(wù)端獲取 batchSize 大小的數(shù)據(jù),阻塞等待直到拿夠 batchSize 條記錄或者等待時(shí)間達(dá)到 timeout。
- void ack(int batchId):通過(guò)傳入 batchId 向服務(wù)端確認(rèn)消息已經(jīng)消費(fèi)成功,小于等于此 batchId 的 Message 都會(huì)被確認(rèn)。
- void rollback(int batchId):基于 batchId 回滾對(duì)應(yīng)的 get/getWithoutAck 請(qǐng)求,重新獲取一次數(shù)據(jù)。
2.2.2 接口實(shí)現(xiàn)
CanalConnector 接口有兩種實(shí)現(xiàn),分別是 SimpleCanalConnector 和 ClusterCanalConnector。前者負(fù)責(zé)通過(guò)指定地址直接連接服務(wù)端,后者則可以通過(guò)指定一個(gè)地址列表或是 ZK 地址來(lái)連接高可用的服務(wù)端集群。
2.2.2.1 SimpleCanalConnector
該類負(fù)責(zé)實(shí)現(xiàn)與服務(wù)端的交互邏輯,包括與服務(wù)端進(jìn)行握手、認(rèn)證、發(fā)送與接收數(shù)據(jù)包等,以及自身運(yùn)行狀態(tài)的控制。下面以 connect 方法為例,舉例分析該類與服務(wù)端的交互流程。

首先,程序會(huì)執(zhí)行 waitClientRunning() 檢測(cè)自己當(dāng)前的角色,如果自己是工作節(jié)點(diǎn),那么繼續(xù)運(yùn)行,如果是備用節(jié)點(diǎn)則線程掛起,等待角色切換,如果是單節(jié)點(diǎn)直連則直接設(shè)置自己為運(yùn)行狀態(tài)并返回。
接下來(lái),程序會(huì)執(zhí)行 doConnect() 方法開始執(zhí)行真正的連接過(guò)程。
- 第一步,建立 socket 連接:開啟一個(gè) SocketChannel,并設(shè)置 socket 的連接超時(shí)時(shí)間,然后與服務(wù)端 TCP 三次握手建立連接。
- 第二步,與服務(wù)端進(jìn)行握手認(rèn)證:讀取并解析服務(wù)端發(fā)送的報(bào)文,其中報(bào)文頭包含了協(xié)議的版本、類型等,報(bào)文體包含了握手所需要的信息,如壓縮方式、認(rèn)證需要的 seed 等。調(diào)用加密工具類,用 seed 加密傳入的 password,隨后生成一個(gè) ClientAuth 類,設(shè)置 username、password 密文等信息,并將它組裝為一個(gè)客戶端認(rèn)證報(bào)文,發(fā)送給服務(wù)端。接收并解析服務(wù)端的響應(yīng)報(bào)文,判斷是否認(rèn)證成功,若成功則繼續(xù)設(shè)置當(dāng)前狀態(tài)為已連接(connected = true),返回本地 socket 地址對(duì)象,若失敗則拋出異常。
連接建立后,隨后的 subscribe/unsubscribe、get/getWithoutAck、ack/rollback 等操作無(wú)需再執(zhí)行握手認(rèn)證,這些操作實(shí)際上大同小異,無(wú)非是根據(jù)傳入?yún)?shù),向服務(wù)端發(fā)送一個(gè)相應(yīng)的報(bào)文(指定報(bào)文類型、構(gòu)建報(bào)文體),然后解析響應(yīng)報(bào)文,返回或執(zhí)行后續(xù)操作。客戶端操作、報(bào)文頭、報(bào)文類型的對(duì)應(yīng)關(guān)系如下:
| 方法名 | 報(bào)文類型 | 報(bào)文體格式類 | 含義 |
|---|---|---|---|
| doConnect | PacketType.HandShake PacketType.CLIENTAUTHENTICATION PacketType.Ack |
- ClientAuth Ack |
握手 客戶端認(rèn)證 服務(wù)端確認(rèn) |
| subscribe | PacketType.SUBSCRIPTION | Sub | 訂閱 |
| unsubscribe | PacketType.UNSUBSCRIPTION | Unsub | 取消訂閱 |
| get/getWithoutAck | PacketType.GET | Get | 獲取數(shù)據(jù) |
| ack | PacketType.CLIENTACK | ClientAck | 客戶端確認(rèn) |
| rollback | PacketType.CLIENTROLLBACK | ClientRollback | 客戶端回滾 |
2.2.2.2 ClusterCanalConnector
如 1.4 核心類 中所介紹,ClusterCanalConnector 基于 SimpleCanalConnector、CanalNodeAccessStrategy 以及 impl.running 包內(nèi)的類來(lái)實(shí)現(xiàn)客戶端連接 HA 的服務(wù)端以及客戶端自身的 HA。
ClusterCanalConnector 內(nèi)部維護(hù)了一個(gè) currentConnector 對(duì)象,這是 SimpleCanalConnector 的一個(gè)實(shí)例,并且是當(dāng)前正在工作的實(shí)例。類似后者,該類同樣實(shí)現(xiàn)了 CanalConnector 與用于和服務(wù)端交互的方法,其實(shí)現(xiàn)方式為內(nèi)部調(diào)用 currentConnecotr 的對(duì)象方法來(lái)真正地交互,而它自己只負(fù)責(zé)維護(hù) currentConnector 即可。
該類支持 SimpleNodeAccessStrategy 和 ClusterNodeAccessStrategy 兩種節(jié)點(diǎn)訪問(wèn)策略,前者需要手動(dòng)指定服務(wù)端地址的列表,而后者基于 ZK 注冊(cè)與發(fā)現(xiàn),監(jiān)聽 ZK 上的節(jié)點(diǎn),自動(dòng)更新內(nèi)存中的地址列表和當(dāng)前工作節(jié)點(diǎn)地址。

以 ClusterNodeAccessStrategy 為例,它維護(hù)了 currentAddress 和 runningAddress 兩個(gè)屬性。在構(gòu)造器初始化時(shí),程序會(huì)去訂閱監(jiān)聽 ZK 上的 destination/cluster 節(jié)點(diǎn)和 destination/running 節(jié)點(diǎn),前者包含服務(wù)端地址列表,后者則是正在工作的服務(wù)端節(jié)點(diǎn)地址,分別對(duì)應(yīng)它的兩個(gè)屬性。當(dāng)上述兩個(gè)節(jié)點(diǎn)的數(shù)據(jù)更改或者刪除后,程序會(huì)對(duì)這兩個(gè)屬性做相應(yīng)的變更,具體如下圖所示。

三、HA 實(shí)現(xiàn)
3.1 實(shí)現(xiàn)原理
服務(wù)端和客戶端 HA 的實(shí)現(xiàn)都是利用了 ZooKeeper 可作為分布式鎖的特性。
3.1.1 ZooKeeper 分布式鎖原理
臨時(shí)節(jié)點(diǎn)(EPHEMERAL):與客戶端會(huì)話綁定,一旦客戶端會(huì)話失效(如宕機(jī)),這個(gè)客戶端所創(chuàng)建的所有臨時(shí)節(jié)點(diǎn)都會(huì)被移除。
Watcher 機(jī)制:

ZooKeeper 客戶端向 ZooKeeper 服務(wù)器注冊(cè) watcher 的同時(shí),會(huì)將 watcher 對(duì)象存儲(chǔ)在客戶端的 WatcherManager;ZooKeeper 服務(wù)器觸發(fā) watcher 事件后,會(huì)向客戶端發(fā)送通知,客戶端線程從 WatcherManager 中回調(diào) watcher 執(zhí)行相應(yīng)的功能。
3.1.2 Client HA 具體實(shí)現(xiàn)

-
1001/cursor:最新的 client 確認(rèn)消費(fèi)的信息
zk-1001-cursor.png -
1001/filter:client 訂閱的過(guò)濾規(guī)則
zk-1001-filter.png -
1001/running:正在運(yùn)行中的 client 地址
zk-1001-running.png
client 啟動(dòng)時(shí)會(huì)訂閱 running 節(jié)點(diǎn)的變更事件,然后去嘗試創(chuàng)建 running 節(jié)點(diǎn):
- 創(chuàng)建成功則將自己的信息寫入 running 節(jié)點(diǎn),然后繼續(xù)運(yùn)行;
- 創(chuàng)建失敗則線程掛起,等待 running 節(jié)點(diǎn)釋放,然后再次嘗試創(chuàng)建 running 節(jié)點(diǎn)。


