Canal+RabbitMQ數(shù)據(jù)異構(gòu)填坑指南

此文章主要補充Canal整合RabbitMQ時填過的坑。

1、完整數(shù)據(jù)處理流程

用戶中心數(shù)據(jù)處理流程

這里核心處理點,在于Canal的部署和RabbitMQ的整合。

Canal主要模擬MySQL的Slave,實現(xiàn)主從復制,拉取Mysql的binlog進行解析轉(zhuǎn)換成相應的數(shù)據(jù),獲取到數(shù)據(jù)后,將數(shù)據(jù)推送到RabbitMQ(默認支持的是RocketMQ和Kafka,新版加入對RabbitMQ的支持)。

Canal整合RabbitMQ主要采用的是 路由模式。

2、Canal 配置及部署

2.1、Canal Admin 安裝及配置

新版本Canal已經(jīng)支持在線管理,我們可以先安裝一個canal-admin。參考:Canal Admin QuickStart

canal-admin的核心模型主要有:

  1. instance,對應canal-server里的instance,一個最小的訂閱mysql的隊列
  2. server,對應canal-server,一個server里可以包含多個instance
  3. 集群,對應一組canal-server,組合在一起面向高可用HA的運維

簡單解釋:

  1. instance因為是最原始的業(yè)務(wù)訂閱訴求,它會和 server/集群 這兩個面向資源服務(wù)屬性的進行關(guān)聯(lián),比如instance A綁定到server A上或者集群 A上,
  2. 有了任務(wù)和資源的綁定關(guān)系后,對應的資源服務(wù)就會接收到這個任務(wù)配置,在對應的資源上動態(tài)加載instance,并提供服務(wù)。動態(tài)加載的過程,有點類似于之前的autoScan機制,只不過基于canal-admin之后可就以變?yōu)檫h程的web操作,而不需要在機器上運維配置文件
  3. 將server抽象成資源之后,原本canal-server運行所需要的canal.properties/instance.properties配置文件就需要在web ui上進行統(tǒng)一運維,每個server只需要以最基本的啟動配置 (比如知道一下canal-admin的manager地址,以及訪問配置的賬號、密碼即可)

集群模式配置

  1. 先配置一個zk地址和集群名稱。


    集群模式
  2. 配置canal.properties
    2.1. 選擇主配置

    集群模式主配置

    2.2. 設(shè)置canal.properties,可以先載入模板,然后進行修改。
    canal.properties

canal.properties核心配置:

...
# register ip to zookeeper
canal.register.ip = 10.8.158.4
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers = 10.8.158.4:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = rabbitmq
...
# table meta tsdb info
canal.instance.tsdb.enable = false
...
# 一下幾項均為 1.1.5 新版本新增支持 rabbitmq 的配置
rabbitmq.host = 10.224.45.9:25674
rabbitmq.virtual.host = /
# 指定 rabbitmq 上的 exchange 名稱, "新建 `Exchange`" 步驟新建的名稱
rabbitmq.exchange = usercenter
# 連接 rabbitmq 的用戶名
rabbitmq.username = guest
# 連接 rabbitmq 的密碼
rabbitmq.password = guest
...

3、 Canal Server配置及部署

canal-server接入canal-admin,參考:Canal Admin ServerGuide

3.1. 核心配置

root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5/conf# cat canal_local.properties 
# register ip
canal.register.ip = 10.8.158.4

# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = usercenter

3.2. 啟動canal-server,注冊canal-admin

root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5# bash bin/startup.sh local

注冊后在admin中的展現(xiàn)。


server注冊admin

4.Instance實例配置

4.1. 新增instance實例

Instance實例啟動后,即可監(jiān)聽mysql的binlog,并將數(shù)據(jù)推到RabbitMQ。推送的規(guī)則參考:Canal Kafka/RocketMQ QuickStart
RabbitMQ的配置的topic,事實上是設(shè)置到routeKey上。

新增instance實例
instance列表

5. 最后我們調(diào)試一下

MQ監(jiān)控處理

@RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "usercenter", durable = "true"),
                    exchange = @Exchange(value = "usercenter", type = ExchangeTypes.TOPIC),
                    key = "usercenter"
            )
    }, concurrency = "10")
    public void test(String message) {
        log.info("test接收到消息。message:{}", message);
    }

修改表數(shù)據(jù)

修改數(shù)據(jù)庫數(shù)據(jù)

MQ監(jiān)控輸入日志如下

2021-02-04 11:34:53.037 INFO  [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.18278394004589313","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.050 INFO  [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.461750433278969","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_1","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.051 INFO  [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.12820304849017694","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_2","ts":1612409677905,"type":"INSERT"}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容