我們都知道canal是CDC的一個(gè)實(shí)現(xiàn),用來監(jiān)控db數(shù)據(jù)變更的。
前言:
在默認(rèn)的情況下,mysql的binlog是不會(huì)記錄執(zhí)行的sql的,即使你設(shè)置了binlog_format=row(要開啟binlog記錄數(shù)據(jù)變更必須設(shè)置該值)情況。所以我們需要開啟另一個(gè)參數(shù)binlog_rows_query_log_events,該值默認(rèn)情況下是關(guān)閉的,我們需要自己手動(dòng)開啟。啟動(dòng)這個(gè)參數(shù),則可在row格式下查看到執(zhí)行的sql語句 。
開啟命令 :
set binlog_rows_query_log_events = 'on'; //開啟的是當(dāng)前session
set global binlog_rows_query_log_events = 'on'; //開啟的是全局配置
Note :
開啟了該參數(shù)后,對于mysql的寫入性能會(huì)有所影 響,主要是IO這塊,畢竟多寫了一些數(shù)據(jù)到磁盤中,但是性能影響不會(huì)是很大。
查看binlog事件
開啟參數(shù)后,執(zhí)行一些寫入操作,我們就可以查看Mysql里的binlog里的事件(因?yàn)閏anal也是基于這些事件來做的)及內(nèi)容了。
具體操作步驟:
1.獲取binlog文件列表 show binary logs;
2.查看指定binlog文件的內(nèi)容 show binlog events in 'mysql-bin.xxx'; 比如mysql-bin-log.000078,這個(gè)值就是從第一步獲取到的。
3.查看event的里面的sql(Rows_query 事件):

從上圖中們可以看到Rows_query這個(gè)事件里其實(shí)有我們想要的sql了。
mysql事務(wù):
由于Mysql的事件是獨(dú)立分開的,Rows_query事件,ROWDATA事件(具體的數(shù)據(jù)變更)。初看我們是沒有辦法將他們兩個(gè)進(jìn)行關(guān)聯(lián)起來的,當(dāng)時(shí)我在做這塊的時(shí)候其實(shí)也懵逼了半天。后來我仔細(xì)看了一下他的事件流,我發(fā)現(xiàn)其實(shí)每次修改都是一個(gè)事務(wù)。他都會(huì)有事務(wù)的開啟和事務(wù)的結(jié)束,我們可以在里面做文章。
這個(gè)是我對insert,update,delete做的一些測試時(shí)獲取到的一些數(shù)據(jù)。
update:
2020-01-19 21:42:50.973 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONBEGIN,sql:null
2020-01-19 21:42:54.118 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :QUERY,entryType:ROWDATA,sql:UPDATE `rocket`.`user` SET `name` = 'ff' WHERE `id` = 8
2020-01-19 21:42:54.623 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:ROWDATA,sql:null
2020-01-19 21:42:54.824 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONEND,sql:null
insert:
2020-01-19 21:43:54.298 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONBEGIN,sql:null
2020-01-19 21:43:55.921 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :QUERY,entryType:ROWDATA,sql:INSERT INTO `rocket`.`test`(`id`, `user_add_rating1`, `user_add_rating`, `name`) VALUES (9, 'sbdaf', 'daf', 'afds')
2020-01-19 21:43:56.167 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONEND,sql:null
delete:
2020-01-19 21:49:29.105 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONBEGIN,sql:null
2020-01-19 21:49:29.652 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :QUERY,entryType:ROWDATA,sql:DELETE FROM `rocket`.`test` WHERE `id` = 10
2020-01-19 21:49:29.795 [pool-4-thread-1] WARN c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONEND,sql:null
canal :
簡介
canal是阿里用來做數(shù)據(jù)同步用的工具,但是官方的版本,只會(huì)記錄數(shù)據(jù)的變更前和之后的數(shù)據(jù)(與maxwell等工具一樣),他不會(huì)記錄這條數(shù)據(jù)是哪條sql改變的。為什么不記錄呢,因?yàn)閏anal的設(shè)計(jì)是記錄數(shù)據(jù)的變更過程,并不關(guān)心他是哪個(gè)sql的引起。但是我們看了他們的源碼,其實(shí)他們也監(jiān)控了Rows_query 事件的,他們發(fā)到mq也是有這個(gè)事件的,但是,數(shù)據(jù)變更事件和Rows_query 事件兩個(gè)事件是獨(dú)立的事件,他們之間沒有任何關(guān)聯(lián)(mysql本身也沒有做關(guān)聯(lián))。
由于他們之間沒能關(guān)聯(lián),我們拿到這兩個(gè)事件也沒辦法做任何處理。所以,我們在想我們能不能通過某種方法將他們關(guān)聯(lián)起來呢。正如前面講到的mysql事務(wù),他是有開始有結(jié)束的事件的(這些事件是有序的),我們可以在收到sql的事件時(shí)將該sql存在一個(gè)地方,然后再收到ROWDATA事件時(shí)從剛才存的sql地方取出來并設(shè)置到該事件中(因?yàn)閝uery事件先后ROWDATA事件),然后在收到TRANSACTIONBEGIN或者TRANSACTIONEND事件的時(shí)候清空sql,這不就ok了么。
按照這個(gè)思路,我們公司的一個(gè)小伙伴改造了一版,他是在 MemoryEventStoreWithBuffer(后面大體會(huì)講canal流程)里進(jìn)行處理的(不推薦),后面發(fā)現(xiàn)只要數(shù)據(jù)量一大(像我們公司老項(xiàng)目一個(gè)sql 20M...)一下就內(nèi)存溢出了,canal就掛了,后面我看了一下源碼,發(fā)現(xiàn)換一種處理的方式可以更好,換在發(fā)Mq消息的地方做處理(到目前還沒出現(xiàn)內(nèi)存溢出問題)。
調(diào)用關(guān)系(以前記錄的筆記很簡單):
1.解析流程 Parser ----Slink----EventStore
AbstractEventParser (consumeTheEventAndProfilingIfNecessary()方法) —> EntryEventSink —-> MemoryEventStoreWithBuffer
2.Server 調(diào)用關(guān)系:
CanalServerWithEmbedded(get(ClientIdentity clientIdentity, int batchSize) )--->MemoryEventStoreWithBuffer( tryGet() )
tryGet() :
獲取到的是Event,event里面有CanalEntry.Entry entry; 這個(gè)entry的toString方法就是ByteString rawEntry 值(因?yàn)槊看握{(diào)用toString序列化會(huì)化很多時(shí)間,所以將他cache起來效率更高);
如果我們開啟行級(jí)模式并記錄了sql的話,要保證Event里的事件類型是EventType.QUERY,
我們可以通過Event里的 CanalEntry.Entry entry,
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());每獲取的rowChange,通過rowChange來獲取sql:rowChange.getSql(),
3.發(fā)送mq消息:
com.alibaba.otter.canal.deployer.CanalStarter#start -->
com.alibaba.otter.canal.server.CanalMQStarter#start -->
com.alibaba.otter.canal.kafka.CanalKafkaProducer#send(com.alibaba.otter.canal.common.MQProperties.CanalDestination, com.alibaba.otter.canal.protocol.Message, com.alibaba.otter.canal.spi.CanalMQProducer.Callback) (或者rocketMq的send,具體的實(shí)現(xiàn)類)
從上面的流程并配合上源碼,其實(shí)可以發(fā)現(xiàn),MemoryEventStoreWithBuffer是一個(gè)環(huán)型的內(nèi)存queue,雖然可以在里面做處理,但是如果將每條數(shù)據(jù)的變更(一次修改導(dǎo)致幾萬,幾十萬的數(shù)據(jù)變更)都加上一個(gè)sql。這個(gè)內(nèi)存無疑就會(huì)增長很快,一會(huì)兒就溢出了,所以不推薦在此處做修改。
改造
由于我們公司沒有用rocketmq,kafka作為處理Log這塊效率極高,所以我們主要改造kafka的send這塊邏輯,rocketMq修改的原理差不多。
該變量放在CanalKafkaProducer中:
private Map<String,String> sqlMap = new ConcurrentHashMap<>();
我們看一下CanalKafkaProducer里的send方法
private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
throws Exception {
if (!kafkaProperties.getFlatMessage()) {
List<ProducerRecord> records = new ArrayList<ProducerRecord>();
if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
Message[] messages = MQMessageUtils.messagePartition(message,
canalDestination.getPartitionsNum(),
canalDestination.getPartitionHash());
int length = messages.length;
for (int i = 0; i < length; i++) {
Message messagePartition = messages[i];
if (messagePartition != null) {
records.add(new ProducerRecord<String, Message>(topicName, i, null, messagePartition));
}
}
} else {
final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
records.add(new ProducerRecord<String, Message>(topicName, partition, null, message));
}
produce(topicName, records, false);
} else {
// 發(fā)送扁平數(shù)據(jù)json
setSql(canalDestination,message); // 系統(tǒng)默認(rèn)是扁平數(shù)據(jù)json,我們只處理這塊的邏輯
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
// logger.warn("---flatMessages :{}", JSON.toJSONString(flatMessages));
List<ProducerRecord> records = new ArrayList<>();
if (flatMessages != null) {
for (FlatMessage flatMessage : flatMessages) {
if("QUERY".equalsIgnoreCase(flatMessage.getType())){ //query event不用發(fā),沒用
continue;
}
flatMessage.setSql(sqlMap.get(canalDestination.getCanalDestination()));
if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
canalDestination.getPartitionsNum(),
canalDestination.getPartitionHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
FlatMessage flatMessagePart = partitionFlatMessage[i];
if (flatMessagePart != null) {
records.add(new ProducerRecord<String, String>(topicName,
i,
null,
JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)));
}
}
} else {
final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
records.add(new ProducerRecord<String, String>(topicName,
partition,
null,
JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)));
}
/* for (ProducerRecord record : records) {
logger.warn("-----key:{},mvn clean install -Dmaven.test.skip -Denv=release:{}", record.key(), record.value());
}*/
// 每條記錄需要flush
produce(topicName, records, true);
records.clear();
}
clearSql(canalDestination,message);
}
}
}
獲取sql并存儲(chǔ)在一個(gè)變量中
/**
* @param message
*/
private void setSql(MQProperties.CanalDestination canalDestination,Message message) {
List<CanalEntry.Entry> entries = message.getEntries();
if (entries != null) {
for (CanalEntry.Entry k : entries) {
boolean isTransActionBegin = k.getEntryType().getNumber()
== CanalEntry.EntryType.TRANSACTIONBEGIN_VALUE;
if (isTransActionBegin) {
sqlMap.put(canalDestination.getCanalDestination(),"");
continue;
}
boolean isRowData = k.getEntryType().getNumber() == CanalEntry.EntryType.ROWDATA_VALUE;
boolean isQueryEventType =
k.getHeader().getEventType().getNumber() == CanalEntry.EventType.QUERY_VALUE;
if (isRowData && isQueryEventType) { //query event
try {
String sql;
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(k.getStoreValue());
sql = rowChange == null ? null : rowChange.getSql();
if(StringUtils.isNotBlank(sql) && sql.length() > 2000){
sql = sql.substring(0,2000);
}
// logger.warn("----sql:{}", sql);
sqlMap.put(canalDestination.getCanalDestination(), sql == null ? "" : sql);
break;
} catch (InvalidProtocolBufferException e) {
logger.warn("e,", e);
}
}
}
}
}
清除sql變量里的值
private void clearSql(MQProperties.CanalDestination canalDestination,Message message) {
List<CanalEntry.Entry> entries = message.getEntries();
if (entries != null) {
for (CanalEntry.Entry k : entries) {
boolean isTransActionEnd = k.getEntryType().getNumber()
== CanalEntry.EntryType.TRANSACTIONEND_VALUE;
if (isTransActionEnd) {
sqlMap.put(canalDestination.getCanalDestination(),"");
break;
}
}
}
}
從上面是我們已經(jīng)修改過后的代碼,其實(shí)我們主要修改的位置是在
[// 發(fā)送扁平數(shù)據(jù)json] 這塊,因?yàn)橄到y(tǒng)默認(rèn)就是發(fā)送的這種數(shù)據(jù),其他格式我們不用關(guān)心,因?yàn)槲覀兪盏降臄?shù)據(jù)主要是json形式的。
開始我想的是在for (FlatMessage flatMessage : flatMessages) {xxxxx}里面去做sql.當(dāng)一條sql修改的數(shù)據(jù)比較少的情況下沒問題,但是一旦修改的數(shù)據(jù)很多時(shí),這里面是沒有sql事件也就是拿不到sql的(canal每次從內(nèi)存queue里get的數(shù)據(jù)條數(shù)默認(rèn)是50條)。所以經(jīng)過仔細(xì)閱讀源碼發(fā)現(xiàn)其實(shí)我們可以在外層進(jìn)行處理,還是按事務(wù)的開始和結(jié)束時(shí)做處理。 然后清除sql變量值也是每次我們發(fā)送一批的時(shí)候都去判斷一下要不要做清除處理。
其實(shí)canal的整體改造就弄完了,其實(shí)不算復(fù)雜,但是你要改造,首先還是得對canal的整體設(shè)計(jì)有所了解,然后具體去找他們的調(diào)用邏輯,最后去改造他。
我們來看一下改造后收到的數(shù)據(jù)格式:
{
"data": [
{
"id": "14",
"name": "zhangsan",
"address": "addreds"
}
],
"database": "test",
"es": 1611294851000,
"id": 3,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"name": "varchar(200)",
"address": "varchar(255)"
},
"old": [
{
"name": "bbbbbb"
}
],
"pkNames": [
"id"
],
"sql": "/*@123,ff44cb60bd074b659c7c7087712b8f56@*/ UPDATE user SET name='zhangsan',\naddress='addreds' WHERE id=14",
"sqlType": {
"id": -5,
"name": 12,
"address": 12
},
"table": "user",
"ts": 1611294851425,
"type": "UPDATE"
}
總結(jié)一下,要想canal拿到sql你需要做的事:
1.查看binlog開啟狀態(tài):
show variables like '%log_bin%' ;
如果沒開啟就需要開啟
2.設(shè)置mysql: set global binlog_format = "ROW" ; 也可以在配置文件修改(推薦):
[mysqld]
binlog_format=Row
3.開啟binlog_rows_query_log_events參數(shù):
set global binlog_rows_query_log_events = 'on';
開啟后要查看是否成功時(shí),你修改幾條數(shù)據(jù),然后通過下面步驟查看:
1>.獲取binlog文件列表 show binary logs;
2>.查看指定binlog文件的內(nèi)容 show binlog events in 'mysql-bin.xxx';
4.定制canal開發(fā),如果自己不想動(dòng)手可以去我的github直接下載:https://github.com/waterlang/canal-data-sql 如果對你有幫助,可以對項(xiàng)目點(diǎn)一個(gè)star,謝謝咯。
canal的具體使用直接參考canal的官方文檔。
本節(jié)就完了,下一節(jié)我們將做入口和db數(shù)據(jù)變更的解析并存儲(chǔ)及整個(gè)項(xiàng)目的整合。