canal 是阿里巴巴開源的一個項目,主要用途是基于 MySQL 數(shù)據(jù)庫 binlog 日志解析,提供增量數(shù)據(jù)訂閱和消費。
基于日志增量訂閱和消費的業(yè)務包括:
- 數(shù)據(jù)庫鏡像
- 數(shù)據(jù)庫實時備份
- 索引構建和實時維護(拆分異構索引、倒排索引等)
- 業(yè)務 cache 刷新
- 帶業(yè)務邏輯的增量數(shù)據(jù)處理
我這邊主要在兩個場景下使用:
一個是將變更數(shù)據(jù)實時同步到 Elasticsearch 和 Redis。
這里先說一下我目前的做法,一方面是全量數(shù)據(jù)定時同步,由于數(shù)據(jù)量比較大,同步時間比較長,所以數(shù)據(jù)也就不夠實時。第二個方面是針對單條數(shù)據(jù)的變更,部分更新 Elasticsearch 和 Redis 的邏輯都是直接寫在了業(yè)務代碼中,耦合比較嚴重。
拆出來之后就可以實現(xiàn)實時增量更新,而且還可以解耦,收益還是很大的。
第二個是保存重點關注數(shù)據(jù)的歷史變更。
這個目前用在了「資產管理」模塊,通過記錄 IP 資產的創(chuàng)建,變更以及刪除,實現(xiàn) IP 生命周期管理,方便歷史信息回溯。
MySQL 配置
修改 MySQL 配置文件 my.cnf,開啟 binlog 寫入功能,并配置模式為 ROW。
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
重啟數(shù)據(jù)庫,查看配置是否生效。
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.19 sec)
mysql>
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.00 sec)
mysql>
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000003 | 4230 | | | |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
然后創(chuàng)建用戶,并授權。
mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%%';
mysql> FLUSH PRIVILEGES;
mysql> show grants for 'canal'@'%%';
+----------------------------------------------------------------------------+
| Grants for canal@%% |
+----------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%%` |
+----------------------------------------------------------------------------+
1 row in set (0.00 sec)
canal 服務端
拉取鏡像:
# docker pull canal/canal-server:v1.1.4
然后用官方提供的 shell 腳本直接啟動:
# sh run.sh -e canal.auto.scan=false -e canal.destinations=test -e canal.instance.master.address=127.0.0.1:3306 -e canal.instance.dbUsername=canal -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false
但每次都這樣啟動還是有點麻煩,可以寫一個 docker-compose 文件,如下:
version: '3'
services:
canal-server:
image: canal/canal-server:v1.1.4
container_name: canal-server
restart: unless-stopped
network_mode: host
ports:
- 11111:11111
environment:
- canal.auto.scan=false
- canal.instance.master.address=127.0.0.1:3306
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.filter.regex=.*\\..*
- canal.destinations=test
- canal.instance.connectionCharset=UTF-8
- canal.instance.tsdb.enable=true
volumes:
- /root/canal/test/log/:/home/admin/canal-server/logs/
啟動服務:
# docker-compose up
Recreating canal-server ... done
Attaching to canal-server
canal-server | DOCKER_DEPLOY_TYPE=VM
canal-server | ==> INIT /alidata/init/02init-sshd.sh
canal-server | ==> EXIT CODE: 0
canal-server | ==> INIT /alidata/init/fix-hosts.py
canal-server | ==> EXIT CODE: 0
canal-server | ==> INIT DEFAULT
canal-server | Generating SSH1 RSA host key: [ OK ]
canal-server | Starting sshd: [ OK ]
canal-server | Starting crond: [ OK ]
canal-server | ==> INIT DONE
canal-server | ==> RUN /home/admin/app.sh
canal-server | ==> START ...
canal-server | start canal ...
canal-server | start canal successful
canal-server | ==> START SUCCESSFUL ...
canal Python 客戶端
直接 Copy 官方提供的客戶端代碼:
import time
from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2
client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'test', filter=b'.*\\..*')
while True:
message = client.get(100)
entries = message['entries']
for entry in entries:
entry_type = entry.entryType
if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
continue
row_change = EntryProtocol_pb2.RowChange()
row_change.MergeFromString(entry.storeValue)
event_type = row_change.eventType
header = entry.header
database = header.schemaName
table = header.tableName
event_type = header.eventType
for row in row_change.rowDatas:
format_data = dict()
if event_type == EntryProtocol_pb2.EventType.DELETE:
for column in row.beforeColumns:
format_data = {
column.name: column.value
}
elif event_type == EntryProtocol_pb2.EventType.INSERT:
for column in row.afterColumns:
format_data = {
column.name: column.value
}
else:
format_data['before'] = format_data['after'] = dict()
for column in row.beforeColumns:
format_data['before'][column.name] = column.value
for column in row.afterColumns:
format_data['after'][column.name] = column.value
data = dict(
db=database,
table=table,
event_type=event_type,
data=format_data,
)
print(data)
time.sleep(1)
client.disconnect()
功能驗證
首先在 MySQL 里邊創(chuàng)建一張測試表,然后再增刪改幾條測試數(shù)據(jù):
mysql> create database test;
mysql> use test;
mysql> CREATE TABLE `role` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `role_name` varchar(255)
DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
mysql> insert into role (id, role_name) values (10, 'admin');
Query OK, 1 row affected (0.01 sec)
mysql> update role set role_name='hh' where id = 10;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> delete from role where id = 10;
Query OK, 1 row affected (0.01 sec)
客戶端打印輸出:
$ python canal_client.py
connected to 127.0.0.1:11111
Auth succed
Subscribe succed
header {
version: 1
logfileName: "mysql-bin.000003"
logfileOffset: 5497
serverId: 1
serverenCode: "UTF-8"
executeTime: 1607843285000
sourceType: MYSQL
eventLength: 75
}
entryType: TRANSACTIONBEGIN
storeValue: " \217\001"
header {
version: 1
logfileName: "mysql-bin.000003"
logfileOffset: 5630
serverId: 1
serverenCode: "UTF-8"
executeTime: 1607843285000
sourceType: MYSQL
schemaName: "test"
tableName: "role"
eventLength: 47
eventType: INSERT
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA
storeValue: "\010\322\001\020\001P\000bN\022 \010\000\020\004\032\002id \001(\0010\000B\00210R\014int unsigned\022*\010\001\020\014\032\trole_name \000(\0010\000B\005adminR\014varchar(255)"
{'db': 'test', 'table': 'role', 'event_type': 1, 'data': {'role_name': 'admin'}}
header {
version: 1
logfileName: "mysql-bin.000003"
logfileOffset: 5677
serverId: 1
serverenCode: "UTF-8"
executeTime: 1607843285000
sourceType: MYSQL
eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\003440"
變更一條數(shù)據(jù),輸出內容分三部分,分別是:TRANSACTIONBEGIN,ROWDATA 和 TRANSACTIONEND。然后我們比較關注的內容都在 ROWDATA 中,解析出來之后就是我們需要的,包括數(shù)據(jù)庫名,表名和變更內容。
其中 event_type 字段 1 表示新增,2 表示更新,3 表示刪除。
update 對應輸出:
{'db': 'test', 'table': 'role', 'event_type': 2, 'data': {'before': {'id': '10', 'role_name': 'hh'}, 'after': {'id': '10', 'role_name': 'hh'}}}
delete 對應輸出:
{'db': 'test', 'table': 'role', 'event_type': 3, 'data': {'role_name': 'hh'}}
canal 服務端啟動之后,在 /home/admin/canal-server/logs/test 目錄下會生成兩個日志文件,分別是:meta.log 和 test.log,可以查看服務是不是正常,有沒有報錯信息。其中 test 是啟動 Docker 時 canal.destinations 設置的名稱。
# cat meta.log
2020-12-13 14:55:18.051 - clientId:1001 cursor:[mysql-bin.000003,4805,1607842360000,1,] address[/127.0.0.1:3306]
2020-12-13 14:55:33.051 - clientId:1001 cursor:[mysql-bin.000003,5096,1607842531000,1,] address[127.0.0.1:3306]
2020-12-13 14:57:07.051 - clientId:1001 cursor:[mysql-bin.000003,5387,1607842625000,1,] address[127.0.0.1:3306]
# cat test.log
2020-12-13 14:55:09.067 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-12-13 14:55:09.144 [destination = test , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-12-13 14:55:09.144 [destination = test , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-12-13 14:55:09.693 [destination = test , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000003,position=4699,serverId=1,gtid=,timestamp=1607842360000] cost : 538ms , the next step is binlog dump
踩坑記錄
在我自己搭建的測試環(huán)境一切正常,但放到項目 beta 環(huán)境上還是遇到了一個問題:
[fetch failed by table meta:
schemeName.tableName]
查了一下說是由于表刪除,或者表結構變更引起的解析錯誤,增加一條配置就可以解決:
canal.instance.filter.table.error=true
加上之后,報錯信息的確都沒有了,但消費出來的數(shù)據(jù)沒有 ROWDATA,這個地方確實困擾了我很長時間。
說實話,有的時候調試程序,并不怕碰到報錯,怕的是沒有報錯,然后程序還不正常。
后來,我把忽略表錯誤的配置刪除,又仔細看了一遍日志,發(fā)現(xiàn)還有一個報錯:
Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1, message=SHOW command denied to user
這明顯就是權限不夠嘛,問了一下我們的 DBA,果然如此,我們的 binlog 賬號默認是沒有 select 權限的,加上之后,問題就成功解決了。
靜下心來仔細看日志是多么重要。
以上,下篇會說說對接 MQ 的事。
參考文檔: