This article is collected on javaskill.cn, which hosts a complete Java knowledge map. You are welcome to visit.
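What is canal
canal parses a database's incremental logs and provides incremental data subscription and consumption. At present it mainly supports MySQL.
The principle is relatively simple:
canal emulates the MySQL slave interaction protocol: it disguises itself as a MySQL slave and sends a dump request to the MySQL master.
The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal).
canal parses the binary log objects (originally a byte stream).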
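What problem does canal solve
Origin:
In the early days, Alibaba's B2B company ran data centers in both Hangzhou and the United States, which created a business need for cross-data-center synchronization. Early database synchronization captured incremental changes mainly through triggers. Starting in 2010, the Alibaba group companies gradually moved to parsing database logs to capture incremental changes for synchronization. This gave rise to the incremental subscription and consumption business and opened a new chapter.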
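What this article discusses
canal's basic operations are not repeated here; see the quick start and client API pages on GitHub, which include demos.
This article discusses message dispatch built on canal's streaming API, and how to prevent message loss and duplicate processing.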
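What is the streaming API
Quoted from the project's GitHub page:
Streaming API design:
- Every get operation produces a mark in the meta. Marks increase monotonically, which guarantees that each mark is unique while the server is running
- Every get operation continues from the cursor recorded by the previous mark; if no mark exists, it continues from the last ack cursor
- Acks must be issued sequentially in mark order; skipping is not allowed. An ack deletes the current mark and updates the last ack cursor to that mark's position
- When an exception occurs, the client can issue a rollback to reset: all marks are deleted and the get position is cleared, so the next request continues from the last ack cursor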
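The four rules above can be modeled in a few lines of plain Java. This is only an in-memory sketch for reasoning about the semantics, not canal's actual implementation: the class name StreamCursorModel, its fields, and the use of a list index as the cursor are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory model of the get / ack / rollback rules (not canal code). */
class StreamCursorModel {
    private final List<String> log;                          // stand-in for the server-side event store
    private final Deque<long[]> marks = new ArrayDeque<>();  // each mark: {batchId, end cursor}
    private long nextBatchId = 1;                            // keeps increasing, even after rollback
    private int lastAckCursor = 0;                           // index of the first unacknowledged event

    StreamCursorModel(List<String> log) {
        this.log = log;
    }

    /** Appends the fetched events to out and returns the batchId, or -1 if nothing is left. */
    long get(int batchSize, List<String> out) {
        // continue after the newest mark, or after the last ack cursor if no mark exists
        int start = marks.isEmpty() ? lastAckCursor : (int) marks.peekLast()[1];
        int end = Math.min(start + batchSize, log.size());
        if (start == end) {
            return -1;
        }
        out.addAll(log.subList(start, end));
        long batchId = nextBatchId++;
        marks.addLast(new long[] {batchId, end});
        return batchId;
    }

    /** Acks must follow mark order; the acked mark becomes the last ack cursor. */
    void ack(long batchId) {
        long[] oldest = marks.peekFirst();
        if (oldest == null || oldest[0] != batchId) {
            throw new IllegalStateException("ack out of order: " + batchId);
        }
        marks.removeFirst();
        lastAckCursor = (int) oldest[1];
    }

    /** Drops every mark; the next get restarts from the last ack cursor. */
    void rollback() {
        marks.clear();
    }
}

class StreamCursorDemo {
    public static void main(String[] args) {
        StreamCursorModel model = new StreamCursorModel(Arrays.asList("e1", "e2", "e3", "e4"));
        List<String> first = new ArrayList<>();
        long b1 = model.get(1, first);
        model.get(1, first);
        model.get(1, first);          // three batches fetched, none acked yet
        model.ack(b1);                // only the first batch is acknowledged
        model.rollback();
        List<String> again = new ArrayList<>();
        long b4 = model.get(1, again);
        System.out.println("batch " + b4 + " -> " + again); // prints: batch 4 -> [e2]
    }
}
```

In the demo, e2 and e3 had been fetched but not acknowledged, so after the rollback e2 comes back under a new batchId.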

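Points of concern
Asynchronous ack brings better performance, but it also brings some problems.
After a rollback, the marks are cleared and the position returns to the last ack. If get runs faster than ack, duplicate messages appear after rollback().
This article gives a fairly simple solution to this problem.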
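Approach
After the marks are cleared, the next get returns a batchId that keeps increasing (it is stored on the server), but the messages have already been processed, and we do not want them dispatched or processed again.
How do we tell whether a message has been consumed, or in other words, whether a database change has already been parsed?
1. Decide in the business logic
2. A better way: decide by the position of the current message in the binlog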
String logFileName = entry.getHeader().getLogfileName();
long offset = entry.getHeader().getLogfileOffset();
These two lines give the binlog file the current message came from and its position within that file. The output looks like this:
logfileName = mysql-bin.000001,offset = 41919
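A binlog offset is a byte position within one file, so it starts over in each new binlog file. Comparing positions across files therefore means comparing the file first and the offset second. The following is a minimal sketch of such a comparison. BinlogPosition is a hypothetical helper, not part of canal, and it assumes file names that end in a numeric sequence like mysql-bin.000001.

```java
/** Hypothetical helper (not part of canal): a binlog position that can be ordered. */
final class BinlogPosition implements Comparable<BinlogPosition> {
    final String logFileName;
    final long offset;

    BinlogPosition(String logFileName, long offset) {
        this.logFileName = logFileName;
        this.offset = offset;
    }

    /** Numeric suffix after the last dot, e.g. 1 for "mysql-bin.000001". */
    long fileSequence() {
        return Long.parseLong(logFileName.substring(logFileName.lastIndexOf('.') + 1));
    }

    @Override
    public int compareTo(BinlogPosition other) {
        // order by file sequence first, then by offset inside the file
        int byFile = Long.compare(fileSequence(), other.fileSequence());
        return byFile != 0 ? byFile : Long.compare(offset, other.offset);
    }
}

class BinlogPositionDemo {
    public static void main(String[] args) {
        BinlogPosition endOfOldFile = new BinlogPosition("mysql-bin.000001", 49047);
        BinlogPosition startOfNewFile = new BinlogPosition("mysql-bin.000002", 4);
        // the smaller offset is still the later position, because it is in a later file
        System.out.println(endOfOldFile.compareTo(startOfNewFile) < 0); // prints: true
    }
}
```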
Code
Test code; do not use it in production.
public static void main(String[] args) {
    // one thread pulls and processes batches, the other acknowledges them
    new Thread(SimpleCanalClient::receiver).start();
    new Thread(SimpleCanalClient::ack).start();
}
Two threads are started at launch: one receives, the other acknowledges.
private static void receiver() {
    // create the connection
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
    int batchSize = 1;
    int count = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        // drop the marks left by the previous run; fetching restarts at the last ack cursor
        connector.rollback();
        int total = 20;
        while (count < total) {
            count++;
            Message message = connector.getWithoutAck(batchSize); // fetch the specified amount of data, without acking
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId != -1 && size != 0) {
                printEntry(message.getEntries(), batchId);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of swallowing it, then stop
                Thread.currentThread().interrupt();
                break;
            }
        }
        STOP.set(true);
        System.out.println("receiver exit");
    } finally {
        connector.disconnect();
    }
}
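The receiver: after the client connects to the server, it fetches data once per second with a batch size of 1, and shuts down after 20 seconds.
The one-second sleep simulates business processing time.
It calls printEntry, the business processing method.
Note the rollback() right after startup: it rolls back the data that was left unacknowledged when the client last stopped, because we cannot be sure whether that data was processed.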
private static void printEntry(List<Entry> entries, long batchId) {
    for (Entry entry : entries) {
        // skip transaction begin/end entries
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }
        String logFileName = entry.getHeader().getLogfileName();
        long offset = entry.getHeader().getLogfileOffset();
        long lastOffset = getTagFromRedis(logFileName);
        if (offset <= lastOffset) {
            // at or before the last processed position in this binlog file: skip
            System.out.println("Already processed ! logfileName = " + logFileName + ",offset = " + offset + ",batchId " + batchId);
        } else {
            System.out.println("processing logfileName = " + logFileName + ",offset = " + offset + ",batchId " + batchId);
            saveTagToRedis(logFileName, offset);
        }
    }
    // queue the batchId so the ack thread can acknowledge batches in order
    BATCH_ID_QUEUE.offer(batchId);
}
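printEntry does a simple consumption; in production this is where entries would be dispatched to the concrete business handler.
It reads the last processed offset; if the current offset is less than or equal to it, the entry is skipped.
After processing, the offset is stored in Redis (saveTagToRedis).
Finally, the batchId is put into a Queue so that acks can be issued in order.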
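The skip-or-process decision can be pulled out of the canal and Redis plumbing so that it is easy to test on its own. Below is a minimal sketch under that assumption: OffsetDeduplicator and tryProcess are hypothetical names, and a ConcurrentHashMap stands in for the Redis keys. It also assumes a single receiver thread, since the read and the write are not one atomic step.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the Redis-backed check: last processed offset per binlog file. */
class OffsetDeduplicator {
    private final Map<String, Long> lastOffsetByFile = new ConcurrentHashMap<>();

    /**
     * Returns true and records the offset when it is beyond the last processed one;
     * returns false when the entry was already processed and should be skipped.
     */
    boolean tryProcess(String logFileName, long offset) {
        long last = lastOffsetByFile.getOrDefault(logFileName, -1L); // -1 means nothing processed yet
        if (offset <= last) {
            return false;
        }
        lastOffsetByFile.put(logFileName, offset);
        return true;
    }
}

class OffsetDeduplicatorDemo {
    public static void main(String[] args) {
        OffsetDeduplicator dedup = new OffsetDeduplicator();
        System.out.println(dedup.tryProcess("mysql-bin.000001", 44295)); // prints: true
        System.out.println(dedup.tryProcess("mysql-bin.000001", 44592)); // prints: true
        // the same entry delivered again after a rollback
        System.out.println(dedup.tryProcess("mysql-bin.000001", 44295)); // prints: false
    }
}
```

Keeping one entry per file name mirrors the Redis layout used here, where the binlog file name is the key.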
private static void saveTagToRedis(String logFileName, long offset) {
    try (Jedis jedis = jedisPool.getResource()) {
        // last processed offset for this binlog file
        jedis.set(logFileName, String.valueOf(offset));
        // running record of every processed offset, used to check the result
        jedis.append("processed-" + logFileName, offset + "\n");
    }
}

private static long getTagFromRedis(String logFileName) {
    try (Jedis jedis = jedisPool.getResource()) {
        String position = jedis.get(logFileName);
        if (StringUtils.isNotBlank(position)) {
            return Long.parseLong(position);
        } else {
            return -1L; // nothing processed yet for this file
        }
    }
}
This is the code that saves and reads the last processed position; for simplicity it is kept in Redis.
private static void ack() {
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        while (!STOP.get()) {
            Long batchId = BATCH_ID_QUEUE.poll(); // oldest pending batchId, or null if the queue is empty
            if (batchId != null) {
                connector.ack(batchId);
                System.out.println("batchId " + batchId + " ack");
            }
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // restore the interrupt flag, then stop
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println("ack exit");
    } finally {
        connector.disconnect();
    }
}
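The ack method takes the oldest pending batchId from the queue (the queue lives in memory, so whatever is still pending is released automatically each time the program stops).
It simulates network latency: one batchId is acknowledged every 5 seconds.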
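Without the simulated delay, the ack side could acknowledge every pending batchId each time it wakes up, still in the order they were queued. A minimal sketch of that idea follows. It assumes a FIFO queue like the one behind BATCH_ID_QUEUE (whose declaration is not shown in this article), and OrderedAcker is a hypothetical class in which a LongConsumer stands in for connector.ack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.LongConsumer;

/** Hypothetical helper: hands batchIds to an ack callback in the order they were queued. */
class OrderedAcker {
    private final Queue<Long> pending = new ConcurrentLinkedQueue<>(); // FIFO, safe across threads

    /** Called by the receiving side after a batch has been processed. */
    void enqueue(long batchId) {
        pending.offer(batchId);
    }

    /** Acks everything currently queued, oldest first; returns how many were acked. */
    int drain(LongConsumer ack) {
        int acked = 0;
        Long batchId;
        while ((batchId = pending.poll()) != null) {
            ack.accept(batchId);
            acked++;
        }
        return acked;
    }
}

class OrderedAckerDemo {
    public static void main(String[] args) {
        OrderedAcker acker = new OrderedAcker();
        acker.enqueue(151);
        acker.enqueue(152);
        acker.enqueue(153);
        List<Long> order = new ArrayList<>();
        acker.drain(id -> order.add(id)); // a real client would call connector.ack(id) here
        System.out.println(order);        // prints: [151, 152, 153]
    }
}
```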
Test
Test method:
Start the database, keep modifying data, and watch the log.
First launch (everything consumed earlier has already been acknowledged); the log is as follows:
processing logfileName = mysql-bin.000001,offset = 43107,batchId 151
processing logfileName = mysql-bin.000001,offset = 43404,batchId 152
processing logfileName = mysql-bin.000001,offset = 43701,batchId 153
processing logfileName = mysql-bin.000001,offset = 43998,batchId 154
batchId 151 ack
processing logfileName = mysql-bin.000001,offset = 44295,batchId 155
processing logfileName = mysql-bin.000001,offset = 44592,batchId 156
processing logfileName = mysql-bin.000001,offset = 44889,batchId 157
processing logfileName = mysql-bin.000001,offset = 45186,batchId 158
processing logfileName = mysql-bin.000001,offset = 45483,batchId 159
batchId 152 ack
processing logfileName = mysql-bin.000001,offset = 45780,batchId 160
processing logfileName = mysql-bin.000001,offset = 46077,batchId 161
processing logfileName = mysql-bin.000001,offset = 46374,batchId 162
processing logfileName = mysql-bin.000001,offset = 46671,batchId 162
processing logfileName = mysql-bin.000001,offset = 46968,batchId 163
processing logfileName = mysql-bin.000001,offset = 47265,batchId 164
batchId 153 ack
processing logfileName = mysql-bin.000001,offset = 47562,batchId 165
processing logfileName = mysql-bin.000001,offset = 47859,batchId 166
processing logfileName = mysql-bin.000001,offset = 48156,batchId 167
processing logfileName = mysql-bin.000001,offset = 48453,batchId 168
processing logfileName = mysql-bin.000001,offset = 48750,batchId 169
processing logfileName = mysql-bin.000001,offset = 49047,batchId 169
batchId 154 ack
receiver exit
ack exit
Process finished with exit code 0
As you can see, processing reached batchId 169, but because of the time lag, ack had only reached 154 when the program exited. Starting the service again gives the following log:
Already processed ! logfileName = mysql-bin.000001,offset = 44295,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 44592,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 44889,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 45186,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 45483,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 45780,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 46077,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 46374,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 46671,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 46968,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 47265,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 47562,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 47859,batchId 172
Already processed ! logfileName = mysql-bin.000001,offset = 48156,batchId 172
Already processed ! logfileName = mysql-bin.000001,offset = 48453,batchId 172
Already processed ! logfileName = mysql-bin.000001,offset = 48750,batchId 172
Already processed ! logfileName = mysql-bin.000001,offset = 49047,batchId 172
processing logfileName = mysql-bin.000001,offset = 49344,batchId 172
processing logfileName = mysql-bin.000001,offset = 49641,batchId 173
processing logfileName = mysql-bin.000001,offset = 49938,batchId 173
batchId 170 ack
batchId 171 ack
batchId 172 ack
batchId 173 ack
receiver exit
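As you can see, the messages from offset 44295 through 49047 were pulled again, but each was detected as having an offset no greater than the last processed offset, so none of them was processed.
The last few entries that were processed are database changes made after the service had stopped.
In the end everything is acknowledged. If the service is started again, it behaves as in the first log.
The Redis contents are as follows: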


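Figure 2 records every offset that has been processed; as you can see, nothing was processed twice.
Summary
The principle is very simple. canal can serve as a compensation mechanism for MQ messages, and it offers a new approach for tasks that used to require periodic full-table scans. Alibaba's otter is also worth a look: it is positioned as a "distributed database synchronization system" and is built on top of canal.
Thanks for reading. If you spot mistakes or omissions, corrections are welcome.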