Using canal to listen for database changes without consuming database connections, with asynchronous ACK

This article is included on javaskill.cn, which has a complete Java knowledge map; you are welcome to visit.

What is canal

From the project's GitHub page:

Based on parsing incremental database logs, canal provides incremental data subscription & consumption; it currently mainly supports MySQL.

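The principle is fairly simple:
canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends the dump protocol to the MySQL master
The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
canal parses the binary log objects (originally a byte stream)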

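What problem does canal solve

Origin:
Early on, Alibaba's B2B business was deployed across two data centers, one in Hangzhou and one in the United States, which created a need for cross-data-center synchronization. Early database synchronization mainly captured incremental changes with triggers, but starting in 2010 Alibaba companies gradually moved to parsing database logs to capture incremental changes for synchronization. This gave rise to the incremental subscription & consumption business and opened a new era.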

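What this article covers

canal's basic operations are not repeated here; see the quick start and client API documentation on GitHub, which include demos.
This article looks at dispatching messages with canal's streaming API, and at how to avoid both losing messages and processing them twice.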

What is the streaming API

From the project's GitHub page:
Streaming API design:

  • Every get operation creates a mark in meta. Marks increase monotonically, which keeps each mark unique while the system is running
  • Each get continues from the cursor recorded by the previous mark; if no mark exists, it continues from the last ack cursor
  • Acks must be issued sequentially in mark order and cannot skip ahead. An ack deletes the current mark and updates the last ack cursor to that mark's position
  • If something goes wrong, the client can issue a rollback to reset: all marks are deleted and the get position is cleared, so the next request continues from the last ack cursor
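To make the four rules above concrete, here is a minimal in-memory sketch of the mark/cursor bookkeeping. It is a toy model under stated assumptions, not canal's implementation: the class `StreamingCursorModel` and its methods are hypothetical, events are plain strings, and the mark id is treated as roughly what the client sees as a `batchId`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of the get/ack/rollback rules above; not canal's own code.
public class StreamingCursorModel {
    private final List<String> log;        // stands in for the binlog event stream
    private long lastAckCursor = 0;        // position just after the last acked event
    private long nextMarkId = 1;           // mark ids only grow, even across rollbacks
    private final Deque<long[]> marks = new ArrayDeque<>(); // {markId, endCursor}, oldest first

    public StreamingCursorModel(List<String> log) {
        this.log = log;
    }

    // get: continue after the newest mark, or after lastAckCursor when there is no mark.
    // Returns the mark id (roughly a batchId), or -1 if there is nothing new to read.
    public long get(int batchSize, List<String> out) {
        long start = marks.isEmpty() ? lastAckCursor : marks.peekLast()[1];
        if (start >= log.size()) {
            return -1;
        }
        long end = Math.min(start + batchSize, log.size());
        for (long i = start; i < end; i++) {
            out.add(log.get((int) i));
        }
        long markId = nextMarkId++;
        marks.addLast(new long[] {markId, end});
        return markId;
    }

    // ack: only the oldest outstanding mark may be acked; its end becomes lastAckCursor
    public void ack(long markId) {
        long[] oldest = marks.peekFirst();
        if (oldest == null || oldest[0] != markId) {
            throw new IllegalStateException("acks must follow mark order");
        }
        marks.removeFirst();
        lastAckCursor = oldest[1];
    }

    // rollback: drop every mark so the next get restarts from lastAckCursor
    public void rollback() {
        marks.clear();
    }

    public static void main(String[] args) {
        StreamingCursorModel model = new StreamingCursorModel(List.of("e1", "e2", "e3"));
        List<String> seen = new ArrayList<>();
        long first = model.get(1, seen);  // e1
        model.get(1, seen);               // e2, fetched before e1 is acked
        model.ack(first);                 // only e1 is confirmed
        model.rollback();                 // the unacked mark for e2 is dropped
        model.get(1, seen);               // e2 is delivered again
        System.out.println(seen);         // [e1, e2, e2]
    }
}
```

The last `get` in `main` already shows the duplicate delivery discussed next: e2 was fetched once, never acked, and comes back after the rollback.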

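Points to watch

Asynchronous ack brings better performance, but it also introduces some problems.
After a rollback, the marks are cleared and consumption returns to the last acked position. If gets run faster than acks, calling rollback() therefore produces duplicate messages.
This article proposes a fairly simple solution to this problem.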
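The size of the problem is easy to estimate: after a rollback, everything fetched but not yet acked is delivered again. The sketch below is a hypothetical simulation (the class name, tick count and ack rate are made up) in which the receiver fetches one event per tick and the ack side confirms one event every `ackEvery` ticks; it only counts positions and does not talk to canal.

```java
// Hypothetical simulation: one get per tick, one in-order ack every `ackEvery` ticks.
public class RollbackDuplicates {
    public static long duplicatesAfterRollback(int ticks, int ackEvery) {
        long getCursor = 0;  // events fetched so far
        long ackCursor = 0;  // events acknowledged so far (acks go in order)
        for (int t = 1; t <= ticks; t++) {
            getCursor++;
            if (t % ackEvery == 0 && ackCursor < getCursor) {
                ackCursor++;
            }
        }
        // rollback() moves the read position back to ackCursor, so every
        // fetched-but-unacked event is delivered a second time
        return getCursor - ackCursor;
    }

    public static void main(String[] args) {
        System.out.println(duplicatesAfterRollback(20, 5)); // 16
    }
}
```

With 20 fetches and only 4 acks, 16 events come back after the rollback; the gap grows with how far gets run ahead of acks.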

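Approach

After the marks are cleared, the next get returns a batchId that keeps increasing (it is maintained on the server), but the messages themselves have already been processed, and we do not want them to be dispatched or processed again.
So how do we tell whether a message has already been consumed, or in other words, whether this particular database change has already been parsed?
1. Check it in the business logic
2. A better way: check the position of the current message in the binlog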

    String logFileName = entry.getHeader().getLogfileName();
    long offset = entry.getHeader().getLogfileOffset();

These two lines are all it takes to find which binlog file the current message comes from and its position within that file; the output looks like this:

logfileName = mysql-bin.000001,offset = 41919
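If you want a single comparable position that also keeps working when MySQL rotates to a new binlog file, one option is to compare the numeric suffix of the file name first and the offset second. The sketch below is an assumption-based helper, not part of canal's API: the class `BinlogPosition` is hypothetical, and it assumes file names follow the `<basename>.<digits>` pattern seen in the output above (for example `mysql-bin.000001`).

```java
// Hypothetical helper: a binlog position ordered by file sequence number, then offset.
// Assumes file names look like "<basename>.<digits>", e.g. "mysql-bin.000001".
public final class BinlogPosition implements Comparable<BinlogPosition> {
    private final String logfileName;
    private final long offset;

    public BinlogPosition(String logfileName, long offset) {
        this.logfileName = logfileName;
        this.offset = offset;
    }

    // numeric suffix after the last '.', e.g. 1 for "mysql-bin.000001"
    private long fileSequence() {
        return Long.parseLong(logfileName.substring(logfileName.lastIndexOf('.') + 1));
    }

    @Override
    public int compareTo(BinlogPosition other) {
        int byFile = Long.compare(fileSequence(), other.fileSequence());
        return byFile != 0 ? byFile : Long.compare(offset, other.offset);
    }

    @Override
    public String toString() {
        return "logfileName = " + logfileName + ",offset = " + offset;
    }

    public static void main(String[] args) {
        BinlogPosition last = new BinlogPosition("mysql-bin.000001", 49047);
        BinlogPosition replayed = new BinlogPosition("mysql-bin.000001", 44295);
        BinlogPosition rotated = new BinlogPosition("mysql-bin.000002", 4);
        System.out.println(replayed.compareTo(last) <= 0); // true: already processed
        System.out.println(rotated.compareTo(last) > 0);   // true: newer file, newer event
    }
}
```

Parsing the suffix as a number avoids relying on the file names always having the same zero-padded width, which plain string comparison would need.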

Code

Test code; do not use it in production

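    private static final AtomicBoolean STOP = new AtomicBoolean(false);
    private static final Queue<Long> BATCH_ID_QUEUE = new ConcurrentLinkedQueue<>();
    private static final JedisPool jedisPool = new JedisPool("localhost", 6379); // Redis address assumed

    public static void main(String[] args) {
        new Thread(SimpleCanalClient::receiver).start();
        new Thread(SimpleCanalClient::ack).start();
    }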

Two threads are started at the outset: one receives, the other acknowledges.
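The hand-off between the two threads is a plain producer/consumer pattern. Below is a canal-free sketch of it, assuming a `LinkedBlockingQueue` for the batch ids and an `AtomicBoolean` stop flag (the roles mirror `BATCH_ID_QUEUE` and `STOP` in the test code). The class and method names are hypothetical, and adding to a list stands in for `connector.ack(batchId)`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical canal-free sketch: one thread "receives" batches, another acks them in FIFO order.
public class ReceiveAckHandoff {
    public static List<Long> run(int batches) throws InterruptedException {
        BlockingQueue<Long> batchIds = new LinkedBlockingQueue<>();
        AtomicBoolean stop = new AtomicBoolean(false);
        List<Long> acked = Collections.synchronizedList(new ArrayList<>());

        Thread receiver = new Thread(() -> {
            for (long id = 1; id <= batches; id++) {
                // ... process batch `id`, then hand its id to the ack thread
                batchIds.offer(id);
            }
            stop.set(true);
        });

        Thread acker = new Thread(() -> {
            try {
                // keep draining until the receiver has stopped and the queue is empty
                while (!stop.get() || !batchIds.isEmpty()) {
                    Long id = batchIds.poll(100, TimeUnit.MILLISECONDS);
                    if (id != null) {
                        acked.add(id); // stands in for connector.ack(id)
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        receiver.start();
        acker.start();
        receiver.join();
        acker.join();
        return acked;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5)); // [1, 2, 3, 4, 5]
    }
}
```

Unlike the test code, this version drains the queue before exiting. The article's ack thread deliberately stops as soon as `STOP` is set, so that unacknowledged batches are left behind for the restart test.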

    private static void receiver() {
        // Create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1;
        int count = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int total = 20;
            while (count < total) {
                count++;
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without acking
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId != -1 && size != 0) {
                    printEntry(message.getEntries(), batchId);
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt status and stop receiving
                    break;
                }
            }
            STOP.set(true);
            System.out.println("receiver exit");
        } finally {
            connector.disconnect();
        }
    }

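Receiver: after connecting to the server, the client fetches data once per second with a batch size of 1, and stops after 20 fetches (about 20 seconds).
The one-second sleep simulates business processing time.
Each batch is passed to printEntry, which stands in for business processing.
Note the rollback() right after startup: it rolls back data that was fetched but not acknowledged before the previous shutdown, because we cannot be sure whether it was processed.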

    private static void printEntry(List<Entry> entries, long batchId) {
        for (Entry entry : entries) {
            // Transaction begin/end markers carry no row data; skip them
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            // Position of this entry in the binlog
            String logFileName = entry.getHeader().getLogfileName();
            long offset = entry.getHeader().getLogfileOffset();
            long lastOffset = getTagFromRedis(logFileName);
            if (offset <= lastOffset) {
                // Already handled before a rollback or restart: skip it
                System.out.println("Already processed ! logfileName = " + logFileName + ",offset = " + offset + ",batchId " + batchId);
            } else {
                System.out.println("processing logfileName = " + logFileName + ",offset = " + offset + ",batchId " + batchId);
                saveTagToRedis(logFileName, offset);
            }
        }
        // Hand the batchId to the ack thread; the queue keeps acks in fetch order
        BATCH_ID_QUEUE.offer(batchId);
    }

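printEntry does a trivial form of consumption; in production, this is where entries would be dispatched to the actual business handlers.
It reads the last processed offset, and if the current offset is less than or equal to it, the entry is skipped.
After an entry is processed, its offset is saved to Redis (saveTagToRedis).
Finally, the batchId is put into a queue so that acks can be issued in order.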
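The skip-or-process decision does not depend on Redis itself. Here is a sketch of the same rule with a `HashMap` standing in for Redis, which makes it easy to unit-test; `OffsetDeduplicator` and `shouldProcess` are hypothetical names, and like the test code it keeps one last-processed offset per binlog file.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory version of the getTagFromRedis/saveTagToRedis check.
public class OffsetDeduplicator {
    // last processed offset per binlog file name (Redis plays this role in the article)
    private final Map<String, Long> lastOffsets = new HashMap<>();

    // Returns true and records the offset if the entry is new; false if it was already handled.
    public boolean shouldProcess(String logfileName, long offset) {
        long last = lastOffsets.getOrDefault(logfileName, -1L);
        if (offset <= last) {
            return false;
        }
        lastOffsets.put(logfileName, offset);
        return true;
    }

    public static void main(String[] args) {
        OffsetDeduplicator dedup = new OffsetDeduplicator();
        System.out.println(dedup.shouldProcess("mysql-bin.000001", 49047)); // true
        System.out.println(dedup.shouldProcess("mysql-bin.000001", 44295)); // false: replayed
        System.out.println(dedup.shouldProcess("mysql-bin.000001", 49344)); // true
    }
}
```

This sketch records the offset at decision time. In a real handler you would save it only after the business logic succeeds; even then, a crash between handling an entry and saving its offset means that entry can be processed once more after a restart.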

    private static void saveTagToRedis(String logFileName, long offset) {
        try (Jedis jedis = jedisPool.getResource()) {
            // Last processed offset for this binlog file
            jedis.set(logFileName, String.valueOf(offset));
            // Append-only record of every processed offset, used to check for duplicates
            jedis.append("processed-" + logFileName, offset + "\n");
        }
    }

    private static long getTagFromRedis(String logFileName) {
        try (Jedis jedis = jedisPool.getResource()) {
            String position = jedis.get(logFileName);
            // -1 means nothing from this file has been processed yet
            return StringUtils.isNotBlank(position) ? Long.parseLong(position) : -1L;
        }
    }

The code that saves and reads the last processed position simply keeps it in Redis.

    private static void ack() {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            while (!STOP.get()) {
                Long batchId = BATCH_ID_QUEUE.poll();
                if(batchId!=null){
                    connector.ack(batchId);
                    System.out.println("batchId "+batchId+" ack");
                }
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ignored) {
                }
            }
            System.out.println("ack exit");
        } finally {
            connector.disconnect();
        }
    }

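The ack method takes the next batchId from the head of the queue (each time the program stops, batches that have not been acknowledged are simply released).
It simulates network latency: only one batchId can be acknowledged every 5 seconds.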

Testing

Method:
Start the database, modify data continuously, and watch the log.

First run (all earlier consumption already acknowledged); the log is as follows:
processing logfileName = mysql-bin.000001,offset = 43107,batchId 151
processing logfileName = mysql-bin.000001,offset = 43404,batchId 152
processing logfileName = mysql-bin.000001,offset = 43701,batchId 153
processing logfileName = mysql-bin.000001,offset = 43998,batchId 154
batchId 151 ack
processing logfileName = mysql-bin.000001,offset = 44295,batchId 155
processing logfileName = mysql-bin.000001,offset = 44592,batchId 156
processing logfileName = mysql-bin.000001,offset = 44889,batchId 157
processing logfileName = mysql-bin.000001,offset = 45186,batchId 158
processing logfileName = mysql-bin.000001,offset = 45483,batchId 159
batchId 152 ack
processing logfileName = mysql-bin.000001,offset = 45780,batchId 160
processing logfileName = mysql-bin.000001,offset = 46077,batchId 161
processing logfileName = mysql-bin.000001,offset = 46374,batchId 162
processing logfileName = mysql-bin.000001,offset = 46671,batchId 162
processing logfileName = mysql-bin.000001,offset = 46968,batchId 163
processing logfileName = mysql-bin.000001,offset = 47265,batchId 164
batchId 153 ack
processing logfileName = mysql-bin.000001,offset = 47562,batchId 165
processing logfileName = mysql-bin.000001,offset = 47859,batchId 166
processing logfileName = mysql-bin.000001,offset = 48156,batchId 167
processing logfileName = mysql-bin.000001,offset = 48453,batchId 168
processing logfileName = mysql-bin.000001,offset = 48750,batchId 169
processing logfileName = mysql-bin.000001,offset = 49047,batchId 169
batchId 154 ack
receiver exit
ack exit

Process finished with exit code 0

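As you can see, processing reached batchId 169, but because of the time gap, acks had only reached 154 when the program exited. Starting the service again gives the following log: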
Already processed ! logfileName = mysql-bin.000001,offset = 44295,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 44592,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 44889,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 45186,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 45483,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 45780,batchId 170
Already processed ! logfileName = mysql-bin.000001,offset = 46077,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 46374,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 46671,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 46968,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 47265,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 47562,batchId 171
Already processed ! logfileName = mysql-bin.000001,offset = 47859,batchId 172
Already processed ! logfileName = mysql-bin.000001,offset = 48156,batchId 172
Already processed ! logfileName = mysql-bin.000001,offset = 48453,batchId 172
Already processed ! logfileName = mysql-bin.000001,offset = 48750,batchId 172
Already processed ! logfileName = mysql-bin.000001,offset = 49047,batchId 172
processing logfileName = mysql-bin.000001,offset = 49344,batchId 172
processing logfileName = mysql-bin.000001,offset = 49641,batchId 173
processing logfileName = mysql-bin.000001,offset = 49938,batchId 173
batchId 170 ack
batchId 171 ack
batchId 172 ack
batchId 173 ack
receiver exit

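As you can see, the messages from offset 44295 through 49047 were pulled again, but their offsets were detected to be no greater than the last processed offset, so they were not processed.
The few entries processed after that are database changes made while the service was stopped.
All batches were eventually acknowledged; if the service is started again, it behaves like the first run.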

The Redis contents are as follows:


Figure 1

Figure 2

Figure 2 records every processed offset; as you can see, nothing was processed twice.

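Summary

The principle is very simple. canal can serve as a compensation mechanism for MQ messages, and it also offers a new approach for tasks that previously relied on scheduled full-table scans. Alibaba's otter is also worth a look: it is positioned as a "distributed database synchronization system" and uses canal as its underlying layer.

Thanks for reading; corrections are welcome.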

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
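Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.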
