基于CDC做全鏈路數(shù)據(jù)審計(jì)系統(tǒng)-canal改造(四)

我們都知道canal是CDC的一個(gè)實(shí)現(xiàn),用來監(jiān)控db數(shù)據(jù)變更的。

前言:

在默認(rèn)的情況下,mysql的binlog是不會(huì)記錄執(zhí)行的sql的,即使你設(shè)置了binlog_format=row(要開啟binlog記錄數(shù)據(jù)變更必須設(shè)置該值)情況。所以我們需要開啟另一個(gè)參數(shù)binlog_rows_query_log_events,該值默認(rèn)情況下是關(guān)閉的,我們需要自己手動(dòng)開啟。啟動(dòng)這個(gè)參數(shù),則可在row格式下查看到執(zhí)行的sql語句 。

開啟命令 :

set  binlog_rows_query_log_events = 'on';  //開啟的是當(dāng)前session
set global binlog_rows_query_log_events = 'on'; //開啟的是全局配置
Note :

開啟了該參數(shù)后,對于mysql的寫入性能會(huì)有所影 響,主要是IO這塊,畢竟多寫了一些數(shù)據(jù)到磁盤中,但是性能影響不會(huì)是很大。

查看binlog事件

開啟參數(shù)后,執(zhí)行一些寫入操作,我們就可以查看Mysql里的binlog里的事件(因?yàn)閏anal也是基于這些事件來做的)及內(nèi)容了。
具體操作步驟:
1.獲取binlog文件列表 show binary logs;

2.查看指定binlog文件的內(nèi)容 show binlog events in 'mysql-bin.xxx'; 比如mysql-bin-log.000078,這個(gè)值就是從第一步獲取到的。

3.查看event的里面的sql(Rows_query 事件):


圖片.png

從上圖中們可以看到Rows_query這個(gè)事件里其實(shí)有我們想要的sql了。

mysql事務(wù):

由于Mysql的事件是獨(dú)立分開的,Rows_query事件,ROWDATA事件(具體的數(shù)據(jù)變更)。初看我們是沒有辦法將他們兩個(gè)進(jìn)行關(guān)聯(lián)起來的,當(dāng)時(shí)我在做這塊的時(shí)候其實(shí)也懵逼了半天。后來我仔細(xì)看了一下他的事件流,我發(fā)現(xiàn)其實(shí)每次修改都是一個(gè)事務(wù)。他都會(huì)有事務(wù)的開啟和事務(wù)的結(jié)束,我們可以在里面做文章。
這個(gè)是我對insert,update,delete做的一些測試時(shí)獲取到的一些數(shù)據(jù)。

update:
2020-01-19 21:42:50.973 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONBEGIN,sql:null
2020-01-19 21:42:54.118 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :QUERY,entryType:ROWDATA,sql:UPDATE `rocket`.`user` SET `name` = 'ff' WHERE `id` = 8
2020-01-19 21:42:54.623 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:ROWDATA,sql:null
2020-01-19 21:42:54.824 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONEND,sql:null

insert:
2020-01-19 21:43:54.298 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONBEGIN,sql:null
2020-01-19 21:43:55.921 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :QUERY,entryType:ROWDATA,sql:INSERT INTO `rocket`.`test`(`id`, `user_add_rating1`, `user_add_rating`, `name`) VALUES (9, 'sbdaf', 'daf', 'afds')
2020-01-19 21:43:56.167 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONEND,sql:null

delete:
2020-01-19 21:49:29.105 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONBEGIN,sql:null
2020-01-19 21:49:29.652 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :QUERY,entryType:ROWDATA,sql:DELETE FROM `rocket`.`test` WHERE `id` = 10
2020-01-19 21:49:29.795 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONEND,sql:null

canal :

簡介

canal是阿里用來做數(shù)據(jù)同步用的工具,但是官方的版本,只會(huì)記錄數(shù)據(jù)的變更前和之后的數(shù)據(jù)(與maxwell等工具一樣),他不會(huì)記錄這條數(shù)據(jù)是哪條sql改變的。為什么不記錄呢,因?yàn)閏anal的設(shè)計(jì)是記錄數(shù)據(jù)的變更過程,并不關(guān)心他是哪個(gè)sql的引起。但是我們看了他們的源碼,其實(shí)他們也監(jiān)控了Rows_query 事件的,他們發(fā)到mq也是有這個(gè)事件的,但是,數(shù)據(jù)變更事件和Rows_query 事件兩個(gè)事件是獨(dú)立的事件,他們之間沒有任何關(guān)聯(lián)(mysql本身也沒有做關(guān)聯(lián))。

由于他們之間沒能關(guān)聯(lián),我們拿到這兩個(gè)事件也沒辦法做任何處理。所以,我們在想我們能不能通過某種方法將他們關(guān)聯(lián)起來呢。正如前面講到的mysql事務(wù),他是有開始有結(jié)束的事件的(這些事件是有序的),我們可以在收到sql的事件時(shí)將該sql存在一個(gè)地方,然后再收到ROWDATA事件時(shí)從剛才存的sql地方取出來并設(shè)置到該事件中(因?yàn)閝uery事件先后ROWDATA事件),然后在收到TRANSACTIONBEGIN或者TRANSACTIONEND事件的時(shí)候清空sql,這不就ok了么。

按照這個(gè)思路,我們公司的一個(gè)小伙伴改造了一版,他是在 MemoryEventStoreWithBuffer(后面大體會(huì)講canal流程)里進(jìn)行處理的(不推薦),后面發(fā)現(xiàn)只要數(shù)據(jù)量一大(像我們公司老項(xiàng)目一個(gè)sql 20M...)一下就內(nèi)存溢出了,canal就掛了,后面我看了一下源碼,發(fā)現(xiàn)換一種處理的方式可以更好,換在發(fā)Mq消息的地方做處理(到目前還沒出現(xiàn)內(nèi)存溢出問題)。

調(diào)用關(guān)系(以前記錄的筆記很簡單):

1.解析流程 Parser ----Slink----EventStore
AbstractEventParser (consumeTheEventAndProfilingIfNecessary()方法) —> EntryEventSink —-> MemoryEventStoreWithBuffer

2.Server 調(diào)用關(guān)系:
CanalServerWithEmbedded(get(ClientIdentity clientIdentity, int batchSize) )--->MemoryEventStoreWithBuffer( tryGet() )

tryGet() :
獲取到的是Event,event里面有CanalEntry.Entry entry; 這個(gè)entry的toString方法就是ByteString rawEntry 值(因?yàn)槊看握{(diào)用toString序列化會(huì)化很多時(shí)間,所以將他cache起來效率更高);
如果我們開啟行級(jí)模式并記錄了sql的話,要保證Event里的事件類型是EventType.QUERY,
我們可以通過Event里的 CanalEntry.Entry entry,
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());每獲取的rowChange,通過rowChange來獲取sql:rowChange.getSql(),

3.發(fā)送mq消息:
com.alibaba.otter.canal.deployer.CanalStarter#start -->
com.alibaba.otter.canal.server.CanalMQStarter#start -->
com.alibaba.otter.canal.kafka.CanalKafkaProducer#send(com.alibaba.otter.canal.common.MQProperties.CanalDestination, com.alibaba.otter.canal.protocol.Message, com.alibaba.otter.canal.spi.CanalMQProducer.Callback) (或者rocketMq的send,具體的實(shí)現(xiàn)類)

從上面的流程并配合上源碼,其實(shí)可以發(fā)現(xiàn),MemoryEventStoreWithBuffer是一個(gè)環(huán)型的內(nèi)存queue,雖然可以在里面做處理,但是如果將每條數(shù)據(jù)的變更(一次修改導(dǎo)致幾萬,幾十萬的數(shù)據(jù)變更)都加上一個(gè)sql。這個(gè)內(nèi)存無疑就會(huì)增長很快,一會(huì)兒就溢出了,所以不推薦在此處做修改。

改造

由于我們公司沒有用rocketmq,kafka作為處理Log這塊效率極高,所以我們主要改造kafka的send這塊邏輯,rocketMq修改的原理差不多。

該變量放在CanalKafkaProducer中:

    private Map<String,String> sqlMap = new ConcurrentHashMap<>();

我們看一下CanalKafkaProducer里的send方法

 private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
            throws Exception {
        if (!kafkaProperties.getFlatMessage()) {
            List<ProducerRecord> records = new ArrayList<ProducerRecord>();
            if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                Message[] messages = MQMessageUtils.messagePartition(message,
                        canalDestination.getPartitionsNum(),
                        canalDestination.getPartitionHash());
                int length = messages.length;
                for (int i = 0; i < length; i++) {
                    Message messagePartition = messages[i];
                    if (messagePartition != null) {
                        records.add(new ProducerRecord<String, Message>(topicName, i, null, messagePartition));
                    }
                }
            } else {
                final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                records.add(new ProducerRecord<String, Message>(topicName, partition, null, message));
            }

            produce(topicName, records, false);
        } else {
            // 發(fā)送扁平數(shù)據(jù)json

            setSql(canalDestination,message); // 系統(tǒng)默認(rèn)是扁平數(shù)據(jù)json,我們只處理這塊的邏輯

            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
//            logger.warn("---flatMessages :{}", JSON.toJSONString(flatMessages));

            List<ProducerRecord> records = new ArrayList<>();
            if (flatMessages != null) {
                for (FlatMessage flatMessage : flatMessages) {

                    if("QUERY".equalsIgnoreCase(flatMessage.getType())){ //query event不用發(fā),沒用
                        continue;
                    }

                    flatMessage.setSql(sqlMap.get(canalDestination.getCanalDestination()));

                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                                canalDestination.getPartitionsNum(),
                                canalDestination.getPartitionHash());
                        int length = partitionFlatMessage.length;
                        for (int i = 0; i < length; i++) {
                            FlatMessage flatMessagePart = partitionFlatMessage[i];
                            if (flatMessagePart != null) {
                                records.add(new ProducerRecord<String, String>(topicName,
                                        i,
                                        null,
                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)));
                            }
                        }
                    } else {
                        final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                        records.add(new ProducerRecord<String, String>(topicName,
                                partition,
                                null,
                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)));
                    }

                   /* for (ProducerRecord record : records) {
                        logger.warn("-----key:{},mvn clean install -Dmaven.test.skip -Denv=release:{}", record.key(), record.value());
                    }*/

                    // 每條記錄需要flush
                    produce(topicName, records, true);
                    records.clear();
                }
                clearSql(canalDestination,message);
            }
        }
    }

獲取sql并存儲(chǔ)在一個(gè)變量中

  /**
     * @param message
     */
    private void setSql(MQProperties.CanalDestination canalDestination,Message message) {
        List<CanalEntry.Entry> entries = message.getEntries();
        if (entries != null) {
            for (CanalEntry.Entry k : entries) {
                boolean isTransActionBegin = k.getEntryType().getNumber()
                        == CanalEntry.EntryType.TRANSACTIONBEGIN_VALUE;

                if (isTransActionBegin) {
                    sqlMap.put(canalDestination.getCanalDestination(),"");
                    continue;
                }

                boolean isRowData = k.getEntryType().getNumber() == CanalEntry.EntryType.ROWDATA_VALUE;
                boolean isQueryEventType =
                        k.getHeader().getEventType().getNumber() == CanalEntry.EventType.QUERY_VALUE;
                if (isRowData && isQueryEventType) { //query event
                    try {
                        String sql;
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(k.getStoreValue());
                        sql = rowChange == null ? null : rowChange.getSql();
                        if(StringUtils.isNotBlank(sql) && sql.length() > 2000){
                            sql = sql.substring(0,2000);
                        }
//                        logger.warn("----sql:{}", sql);
                        sqlMap.put(canalDestination.getCanalDestination(), sql == null ? "" : sql);
                        break;
                    } catch (InvalidProtocolBufferException e) {
                        logger.warn("e,", e);
                    }
                }
            }
        }

    }

清除sql變量里的值

private void clearSql(MQProperties.CanalDestination canalDestination,Message message) {
        List<CanalEntry.Entry> entries = message.getEntries();
        if (entries != null) {
            for (CanalEntry.Entry k : entries) {
                boolean isTransActionEnd = k.getEntryType().getNumber()
                        == CanalEntry.EntryType.TRANSACTIONEND_VALUE;

                if (isTransActionEnd) {
                    sqlMap.put(canalDestination.getCanalDestination(),"");
                    break;
                }
            }
        }
    }

從上面是我們已經(jīng)修改過后的代碼,其實(shí)我們主要修改的位置是在
[// 發(fā)送扁平數(shù)據(jù)json] 這塊,因?yàn)橄到y(tǒng)默認(rèn)就是發(fā)送的這種數(shù)據(jù),其他格式我們不用關(guān)心,因?yàn)槲覀兪盏降臄?shù)據(jù)主要是json形式的。

開始我想的是在for (FlatMessage flatMessage : flatMessages) {xxxxx}里面去做sql.當(dāng)一條sql修改的數(shù)據(jù)比較少的情況下沒問題,但是一旦修改的數(shù)據(jù)很多時(shí),這里面是沒有sql事件也就是拿不到sql的(canal每次從內(nèi)存queue里get的數(shù)據(jù)條數(shù)默認(rèn)是50條)。所以經(jīng)過仔細(xì)閱讀源碼發(fā)現(xiàn)其實(shí)我們可以在外層進(jìn)行處理,還是按事務(wù)的開始和結(jié)束時(shí)做處理。 然后清除sql變量值也是每次我們發(fā)送一批的時(shí)候都去判斷一下要不要做清除處理。

其實(shí)canal的整體改造就弄完了,其實(shí)不算復(fù)雜,但是你要改造,首先還是得對canal的整體設(shè)計(jì)有所了解,然后具體去找他們的調(diào)用邏輯,最后去改造他。

我們來看一下改造后收到的數(shù)據(jù)格式:

{
    "data": [
        {
            "id": "14",
            "name": "zhangsan",
            "address": "addreds"
        }
    ],
    "database": "test",
    "es": 1611294851000,
    "id": 3,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(20)",
        "name": "varchar(200)",
        "address": "varchar(255)"
    },
    "old": [
        {
            "name": "bbbbbb"
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "/*@123,ff44cb60bd074b659c7c7087712b8f56@*/ UPDATE user  SET name='zhangsan',\naddress='addreds'  WHERE id=14",
    "sqlType": {
        "id": -5,
        "name": 12,
        "address": 12
    },
    "table": "user",
    "ts": 1611294851425,
    "type": "UPDATE"
}

總結(jié)一下,要想canal拿到sql你需要做的事:
1.查看binlog開啟狀態(tài):
show variables like '%log_bin%' ;
如果沒開啟就需要開啟

2.設(shè)置mysql: set global binlog_format = "ROW" ; 也可以在配置文件修改(推薦):

[mysqld]
binlog_format=Row

3.開啟binlog_rows_query_log_events參數(shù):
set global binlog_rows_query_log_events = 'on';
開啟后要查看是否成功時(shí),你修改幾條數(shù)據(jù),然后通過下面步驟查看:
1>.獲取binlog文件列表 show binary logs;
2>.查看指定binlog文件的內(nèi)容 show binlog events in 'mysql-bin.xxx';

4.定制canal開發(fā),如果自己不想動(dòng)手可以去我的github直接下載:https://github.com/waterlang/canal-data-sql 如果對你有幫助,可以對項(xiàng)目點(diǎn)一個(gè)star,謝謝咯。
canal的具體使用直接參考canal的官方文檔。

本節(jié)就完了,下一節(jié)我們將做入口和db數(shù)據(jù)變更的解析并存儲(chǔ)及整個(gè)項(xiàng)目的整合。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容