canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議;mysql master收到dump請(qǐng)求,開始推送binary log給slave(也就是canal);canal解析binary log對(duì)象(原始為byte流)。
mysql主備復(fù)制實(shí)現(xiàn)
從上層來(lái)看,復(fù)制分成三步:
- master將改變記錄到二進(jìn)制日志(binary log)中(這些記錄叫做二進(jìn)制日志事件,binary log events,可以通過(guò)show binlog events進(jìn)行查看);
- slave將master的binary log events拷貝到它的中繼日志(relay log);
- slave重做中繼日志中的事件,將改變反映它自己的數(shù)據(jù)。
Binlog獲取詳解
Binlog發(fā)送接收流程,流程如下圖所示:

首先,我們需要偽造一個(gè)slave,向master注冊(cè),這樣master才會(huì)發(fā)送binlog event。注冊(cè)很簡(jiǎn)單,就是向master發(fā)送COM_REGISTER_SLAVE命令,帶上slave相關(guān)信息。這里需要注意,因?yàn)樵贛ySQL的replication topology中,都需要使用一個(gè)唯一的server id來(lái)區(qū)別標(biāo)示不同的server實(shí)例,所以這里我們偽造的slave也需要一個(gè)唯一的server id。
接著實(shí)現(xiàn)binlog的dump。MySQL只支持一種binlog dump方式,也就是指定binlog filename + position,向master發(fā)送COM_BINLOG_DUMP命令。在發(fā)送dump命令的時(shí)候,我們可以指定flag為BINLOG_DUMP_NON_BLOCK,這樣master在沒有可發(fā)送的binlog event之后,就會(huì)返回一個(gè)EOF package。不過(guò)通常對(duì)于slave來(lái)說(shuō),一直把連接掛著可能更好,這樣能更及時(shí)收到新產(chǎn)生的binlog event。
Dump命令包圖如下所示:

如上圖所示,在報(bào)文中塞入binlogPosition和binlogFileName即可讓master從相應(yīng)的位置發(fā)送binlog event。
關(guān)于binlog event的細(xì)節(jié),請(qǐng)參照我另一篇文章。 binlog詳解
canal結(jié)構(gòu)

說(shuō)明:
- server代表一個(gè)canal運(yùn)行實(shí)例,對(duì)應(yīng)于一個(gè)jvm,也可以理解為一個(gè)進(jìn)程
- instance對(duì)應(yīng)于一個(gè)數(shù)據(jù)隊(duì)列 (1個(gè)server對(duì)應(yīng)1..n個(gè)instance),每一個(gè)數(shù)據(jù)隊(duì)列可以理解為一個(gè)數(shù)據(jù)庫(kù)實(shí)例。
Server設(shè)計(jì)

server代表了一個(gè)canal的運(yùn)行實(shí)例,為了方便組件化使用,特意抽象了Embeded(嵌入式) / Netty(網(wǎng)絡(luò)訪問(wèn))的兩種實(shí)現(xiàn)
- Embeded : 對(duì)latency和可用性都有比較高的要求,自己又能hold住分布式的相關(guān)技術(shù)(比如failover)
- Netty : 基于netty封裝了一層網(wǎng)絡(luò)協(xié)議,由canal server保證其可用性,采用的pull模型,當(dāng)然latency會(huì)稍微打點(diǎn)折扣,不過(guò)這個(gè)也視情況而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠攏,push在數(shù)據(jù)量大的時(shí)候會(huì)有一些問(wèn)題)
Instance設(shè)計(jì)

instance代表了一個(gè)實(shí)際運(yùn)行的數(shù)據(jù)隊(duì)列,包括了EventPaser,EventSink,EventStore等組件。
抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:
manager方式: 和你自己的內(nèi)部web console/manager系統(tǒng)進(jìn)行對(duì)接。(目前主要是公司內(nèi)部使用,Otter采用這種方式)
spring方式:基于spring xml + properties進(jìn)行定義,構(gòu)建spring配置.
下面是canalServer和instance如何運(yùn)行
canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
public CanalInstance generate(String destination) {
Canal canal = canalConfigClient.findCanal(destination);
// 此處省略部分代碼 大致邏輯是設(shè)置canal一些屬性
CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, filter) {
protected CanalHAController initHaController() {
HAMode haMode = parameters.getHaMode();
if (haMode.isMedia()) {
return new MediaHAController(parameters.getMediaGroup(),
parameters.getDbUsername(),
parameters.getDbPassword(),
parameters.getDefaultDatabaseName());
} else {
return super.initHaController();
}
}
protected void startEventParserInternal(CanalEventParser parser, boolean isGroup) {
//大致邏輯是 設(shè)置支持的類型
//初始化設(shè)置MysqlEventParser的主庫(kù)信息,這處抽象不好,目前只支持mysql
}
};
return instance;
}
});
canalServer.start(); //啟動(dòng)canalServer
canalServer.start(destination);//啟動(dòng)對(duì)應(yīng)instance
this.clientIdentity = new ClientIdentity(destination, pipeline.getParameters().getMainstemClientId(), filter);
canalServer.subscribe(clientIdentity);// 發(fā)起一次訂閱,當(dāng)監(jiān)聽到instance配置時(shí),調(diào)用generate方法注入新的instance
instance模塊:
- eventParser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進(jìn)行交互,協(xié)議解析)
- eventSink (Parser和Store鏈接器,進(jìn)行數(shù)據(jù)過(guò)濾,加工,分發(fā)的工作)
- eventStore (數(shù)據(jù)存儲(chǔ))
- metaManager (增量訂閱&消費(fèi)信息管理器)
EventParser設(shè)計(jì)
大致過(guò)程:

整個(gè)parser過(guò)程大致可分為幾步:
Connection獲取上一次解析成功的位置 (如果第一次啟動(dòng),則獲取初始指定的位置或者是當(dāng)前數(shù)據(jù)庫(kù)的binlog位點(diǎn))
Connection建立鏈接,發(fā)送BINLOG_DUMP指令
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name
Mysql開始推送Binaly Log
接收到的Binaly Log的通過(guò)Binlog parser進(jìn)行協(xié)議解析,補(bǔ)充一些特定信息
// 補(bǔ)充字段名字,字段類型,主鍵信息,unsigned類型處理
傳遞給EventSink模塊進(jìn)行數(shù)據(jù)存儲(chǔ),是一個(gè)阻塞操作,直到存儲(chǔ)成功
存儲(chǔ)成功后,由CanalLogPositionManager定時(shí)記錄Binaly Log位置
EventSink設(shè)計(jì)

說(shuō)明:
- 數(shù)據(jù)過(guò)濾:支持通配符的過(guò)濾模式,表名,字段內(nèi)容等
- 數(shù)據(jù)路由/分發(fā):解決1:n (1個(gè)parser對(duì)應(yīng)多個(gè)store的模式)
- 數(shù)據(jù)歸并:解決n:1 (多個(gè)parser對(duì)應(yīng)1個(gè)store)
- 數(shù)據(jù)加工:在進(jìn)入store之前進(jìn)行額外的處理,比如join
** 數(shù)據(jù)1:n業(yè)務(wù) **
為了合理的利用數(shù)據(jù)庫(kù)資源, 一般常見的業(yè)務(wù)都是按照schema進(jìn)行隔離,然后在mysql上層或者dao這一層面上,進(jìn)行一個(gè)數(shù)據(jù)源路由,屏蔽數(shù)據(jù)庫(kù)物理位置對(duì)開發(fā)的影響,阿里系主要是通過(guò)cobar/tddl來(lái)解決數(shù)據(jù)源路由問(wèn)題。
所以,一般一個(gè)數(shù)據(jù)庫(kù)實(shí)例上,會(huì)部署多個(gè)schema,每個(gè)schema會(huì)有由1個(gè)或者多個(gè)業(yè)務(wù)方關(guān)注
** 數(shù)據(jù)n:1業(yè)務(wù) **
同樣,當(dāng)一個(gè)業(yè)務(wù)的數(shù)據(jù)規(guī)模達(dá)到一定的量級(jí)后,必然會(huì)涉及到水平拆分和垂直拆分的問(wèn)題,針對(duì)這些拆分的數(shù)據(jù)需要處理時(shí),就需要鏈接多個(gè)store進(jìn)行處理,消費(fèi)的位點(diǎn)就會(huì)變成多份,而且數(shù)據(jù)消費(fèi)的進(jìn)度無(wú)法得到盡可能有序的保證。
所以,在一定業(yè)務(wù)場(chǎng)景下,需要將拆分后的增量數(shù)據(jù)進(jìn)行歸并處理,比如按照時(shí)間戳/全局id進(jìn)行排序歸并.
EventStore設(shè)計(jì)
- 目前僅實(shí)現(xiàn)了Memory內(nèi)存模式,后續(xù)計(jì)劃增加本地file存儲(chǔ),mixed混合模式
- 借鑒了Disruptor的RingBuffer的實(shí)現(xiàn)思路
RingBuffer設(shè)計(jì):

定義了3個(gè)cursor
Put : Sink模塊進(jìn)行數(shù)據(jù)存儲(chǔ)的最后一次寫入位置
Get : 數(shù)據(jù)訂閱獲取的最后一次提取位置
Ack : 數(shù)據(jù)消費(fèi)成功的最后一次消費(fèi)位置
借鑒Disruptor的RingBuffer的實(shí)現(xiàn),將RingBuffer拉直來(lái)看:

實(shí)現(xiàn)說(shuō)明:
Put/Get/Ack cursor用于遞增,采用long型存儲(chǔ)
buffer的get操作,通過(guò)取余或者與操作。(與操作: cusor & (size - 1) , size需要為2的指數(shù),效率比較高)
HA機(jī)制設(shè)計(jì)
canal的ha分為兩部分,canal server和canal client分別有對(duì)應(yīng)的ha實(shí)現(xiàn)
- canal server: 為了減少對(duì)mysql dump的請(qǐng)求,不同server上的instance要求同一時(shí)間只能有一個(gè)處于running,其他的處于standby狀態(tài).
- canal client: 為了保證有序性,一份instance同一時(shí)間只能由一個(gè)canal client進(jìn)行g(shù)et/ack/rollback操作,否則客戶端接收無(wú)法保證有序。
整個(gè)HA機(jī)制的控制主要是依賴了zookeeper的幾個(gè)特性,watcher和EPHEMERAL節(jié)點(diǎn)(和session生命周期綁定),可以看下我之前zookeeper的相關(guān)文章。
Canal Server:

大致步驟:
- canal server要啟動(dòng)某個(gè)canal instance時(shí)都先向zookeeper進(jìn)行一次嘗試啟動(dòng)判斷 (實(shí)現(xiàn):創(chuàng)建EPHEMERAL節(jié)點(diǎn),誰(shuí)創(chuàng)建成功就允許誰(shuí)啟動(dòng))
- 創(chuàng)建zookeeper節(jié)點(diǎn)成功后,對(duì)應(yīng)的canal server就啟動(dòng)對(duì)應(yīng)的canal instance,沒有創(chuàng)建成功的canal instance就會(huì)處于standby狀態(tài)
- 一旦zookeeper發(fā)現(xiàn)canal server A創(chuàng)建的節(jié)點(diǎn)消失后,立即通知其他的canal server再次進(jìn)行步驟1的操作,重新選出一個(gè)canal server啟動(dòng)instance.
- canal client每次進(jìn)行connect時(shí),會(huì)首先向zookeeper詢問(wèn)當(dāng)前是誰(shuí)啟動(dòng)了canal instance,然后和其建立鏈接,一旦鏈接不可用,會(huì)重新嘗試connect.
Canal Client的方式和canal server方式類似,也是利用zookeeper的搶占EPHEMERAL節(jié)點(diǎn)的方式進(jìn)行控制.