此文章主要補充Canal整合RabbitMQ時填過的坑。
- Canal的簡介和用法,請查看官網(wǎng)介紹:https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B;
- RabbitMQ的用法請查看官網(wǎng):http://www.rabbitmq.com/
1、完整數(shù)據(jù)處理流程

用戶中心數(shù)據(jù)處理流程
這里核心處理點,在于Canal的部署和RabbitMQ的整合。
Canal主要模擬MySQL的Slave,實現(xiàn)主從復制,拉取Mysql的binlog進行解析轉(zhuǎn)換成相應的數(shù)據(jù),獲取到數(shù)據(jù)后,將數(shù)據(jù)推送到RabbitMQ(默認支持的是RocketMQ和Kafka,新版加入對RabbitMQ的支持)。
Canal整合RabbitMQ主要采用的是 路由模式。
2、Canal 配置及部署
2.1、Canal Admin 安裝及配置
新版本Canal已經(jīng)支持在線管理,我們可以先安裝一個canal-admin。參考:Canal Admin QuickStart
canal-admin的核心模型主要有:
- instance,對應canal-server里的instance,一個最小的訂閱mysql的隊列
- server,對應canal-server,一個server里可以包含多個instance
- 集群,對應一組canal-server,組合在一起面向高可用HA的運維
簡單解釋:
- instance因為是最原始的業(yè)務(wù)訂閱訴求,它會和 server/集群 這兩個面向資源服務(wù)屬性的進行關(guān)聯(lián),比如instance A綁定到server A上或者集群 A上,
- 有了任務(wù)和資源的綁定關(guān)系后,對應的資源服務(wù)就會接收到這個任務(wù)配置,在對應的資源上動態(tài)加載instance,并提供服務(wù)。動態(tài)加載的過程,有點類似于之前的autoScan機制,只不過基于canal-admin之后可就以變?yōu)檫h程的web操作,而不需要在機器上運維配置文件
- 將server抽象成資源之后,原本canal-server運行所需要的canal.properties/instance.properties配置文件就需要在web ui上進行統(tǒng)一運維,每個server只需要以最基本的啟動配置 (比如知道一下canal-admin的manager地址,以及訪問配置的賬號、密碼即可)
集群模式配置
-
先配置一個zk地址和集群名稱。
集群模式 -
配置canal.properties
2.1. 選擇主配置
集群模式主配置
2.2. 設(shè)置canal.properties,可以先載入模板,然后進行修改。
canal.properties
canal.properties核心配置:
...
# register ip to zookeeper
canal.register.ip = 10.8.158.4
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.zkServers = 10.8.158.4:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = rabbitmq
...
# table meta tsdb info
canal.instance.tsdb.enable = false
...
# 一下幾項均為 1.1.5 新版本新增支持 rabbitmq 的配置
rabbitmq.host = 10.224.45.9:25674
rabbitmq.virtual.host = /
# 指定 rabbitmq 上的 exchange 名稱, "新建 `Exchange`" 步驟新建的名稱
rabbitmq.exchange = usercenter
# 連接 rabbitmq 的用戶名
rabbitmq.username = guest
# 連接 rabbitmq 的密碼
rabbitmq.password = guest
...
3、 Canal Server配置及部署
canal-server接入canal-admin,參考:Canal Admin ServerGuide
3.1. 核心配置
root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5/conf# cat canal_local.properties
# register ip
canal.register.ip = 10.8.158.4
# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = usercenter
3.2. 啟動canal-server,注冊canal-admin
root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5# bash bin/startup.sh local
注冊后在admin中的展現(xiàn)。

server注冊admin
4.Instance實例配置
4.1. 新增instance實例
Instance實例啟動后,即可監(jiān)聽mysql的binlog,并將數(shù)據(jù)推到RabbitMQ。推送的規(guī)則參考:Canal Kafka/RocketMQ QuickStart
RabbitMQ的配置的topic,事實上是設(shè)置到routeKey上。

新增instance實例

instance列表
5. 最后我們調(diào)試一下
MQ監(jiān)控處理
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "usercenter", durable = "true"),
exchange = @Exchange(value = "usercenter", type = ExchangeTypes.TOPIC),
key = "usercenter"
)
}, concurrency = "10")
public void test(String message) {
log.info("test接收到消息。message:{}", message);
}
修改表數(shù)據(jù)

修改數(shù)據(jù)庫數(shù)據(jù)
MQ監(jiān)控輸入日志如下
2021-02-04 11:34:53.037 INFO [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.18278394004589313","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.050 INFO [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.461750433278969","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_1","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.051 INFO [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.12820304849017694","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_2","ts":1612409677905,"type":"INSERT"}


