Cannal系列6 - server集群HA高可用

https://githubfast.com/alibaba/canal/wiki/AdminGuide
https://zookeeper.apache.org/doc/current/zookeeperStarted.html

安裝zk

創(chuàng)建文件 apache-zookeeper-3.9.0-bin\conf\zoo.cfg

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181

啟動(dòng)zk

zkServer.cmd

啟動(dòng)兩個(gè)cannal-server

  • 兩個(gè)cannal服務(wù)端都放在一臺(tái)機(jī)器上
  • 只需要將原來(lái)的server整個(gè)copy一份。改下端口
  • start.bat 的debug端口也改下
  • 注意: 兩臺(tái)機(jī)器上的instance目錄的名字需要保證完全一致,HA模式是依賴(lài)于instance name進(jìn)行管理,同時(shí)必須都選擇default-instance.xml配置

在admin上修改配置

添加集群

image.png

主配置:直接copy之前的配置進(jìn)來(lái)

  • 這個(gè)配置改成這個(gè) canal.instance.global.spring.xml = classpath:spring/default-instance.xml ,之前默認(rèn)使用文件本地存儲(chǔ)的(classpath:spring/file-instance.xml)?,F(xiàn)在改成的這個(gè)就是把instance信息放到zk中。

  • zk的配置信息
    canal.zkServers=127.0.0.1:2181

#################################################
#########       common argument     #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.zkServers =127.0.0.1:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode =rabbitMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
#########       destinations        #############
#################################################
canal.destinations = example,my1
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########         MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
#########            Kafka           #############
##################################################


# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT

##################################################
#########           RocketMQ         #############
##################################################


##################################################
#########           RabbitMQ         #############
##################################################
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange =exchange.canal
rabbitmq.queue = canal_queue
rabbitmq.routingKey = canal-routing-key
rabbitmq.username =guest
rabbitmq.password =guest
rabbitmq.deliveryMode =direct



##################################################
#########             Pulsar         #############
##################################################

添加server


image.png
image.png

添加instant

image.png

把這之前instant的配置copy到里面來(lái)

#################################################
## mysql serverId , v1.0.26+ will autoGen
## mysql serverId,這里的slaveId不能和myql集群中已有的server_id一樣
canal.instance.mysql.slaveId=1234

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
# mysql主庫(kù)鏈接時(shí)起始的binlog偏移量
canal.instance.master.position=
# mysql主庫(kù)鏈接時(shí)起始的binlog的時(shí)間戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
# table regex .*\..*表示監(jiān)聽(tīng)所有表 也可以寫(xiě)具體的表名,用,隔開(kāi)
canal.instance.filter.regex=.*\\..*
# table black regex
# mysql 數(shù)據(jù)解析表的黑名單,多個(gè)表用,隔開(kāi)
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
# rabbitmq 的 routing key
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################

測(cè)試HA功能

這個(gè)instant現(xiàn)在屬于node2(192.168.1.4:11114)


image.png

也可以查看zk當(dāng)前支持的工作cannal節(jié)點(diǎn)當(dāng)前工作的節(jié)點(diǎn)為192.168.1.4:11114

zkCli.cmd
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.1.4:11114"}
[zk: localhost:2181(CONNECTED) 1]

那我們就把這個(gè)192.168.1.4:11114給停掉
使用管理后臺(tái)的停止只是停止服務(wù),但是服務(wù)jvm進(jìn)程本身還是沒(méi)有停的。jvm進(jìn)程停了就是斷開(kāi)狀態(tài)


image.png

隨后192.168.1.4:11114上的instant也停止了


image.png

ok,過(guò)了幾秒鐘后刷新頁(yè)面再次查看,instant又起來(lái)了。這次所屬主機(jī)變成了node1
image.png

我們?cè)俅尾榭磟k的工作節(jié)點(diǎn)信息
發(fā)現(xiàn)已經(jīng)變成了192.168.1.4:11111了,說(shuō)明故障轉(zhuǎn)移測(cè)試成功!

[zk: localhost:2181(CONNECTED) 12] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.1.4:11111"}
[zk: localhost:2181(CONNECTED) 13]

幾種instance的實(shí)現(xiàn)

對(duì)應(yīng)的兩個(gè)位點(diǎn)組件,目前都有幾種實(shí)現(xiàn):

  • memory (memory-instance.xml中使用)
  • zookeeper
  • mixed
  • period (default-instance.xml中使用,集合了zookeeper+memory模式,先寫(xiě)內(nèi)存,定時(shí)刷新數(shù)據(jù)到zookeeper上)

memory-instance.xml介紹:

所有的組件(parser , sink , store)都選擇了內(nèi)存版模式,記錄位點(diǎn)的都選擇了memory模式,重啟后又會(huì)回到初始位點(diǎn)進(jìn)行解析
特點(diǎn):速度最快,依賴(lài)最少(不需要zookeeper)
場(chǎng)景:一般應(yīng)用在quickstart,或者是出現(xiàn)問(wèn)題后,進(jìn)行數(shù)據(jù)分析的場(chǎng)景,不應(yīng)該將其應(yīng)用于生產(chǎn)環(huán)境

default-instance.xml介紹:

store選擇了內(nèi)存模式,其余的parser/sink依賴(lài)的位點(diǎn)管理選擇了持久化模式,目前持久化的方式主要是寫(xiě)入zookeeper,保證數(shù)據(jù)集群共享.
特點(diǎn):支持HA
場(chǎng)景:生產(chǎn)環(huán)境,集群化部署.

group-instance.xml介紹:

主要針對(duì)需要進(jìn)行多庫(kù)合并時(shí),可以將多個(gè)物理instance合并為一個(gè)邏輯instance,提供客戶(hù)端訪(fǎng)問(wèn)。場(chǎng)景:分庫(kù)業(yè)務(wù)。 比如產(chǎn)品數(shù)據(jù)拆分了4個(gè)庫(kù),每個(gè)庫(kù)會(huì)有一個(gè)instance,如果不用group,業(yè)務(wù)上要消費(fèi)數(shù)據(jù)時(shí),需要啟動(dòng)4個(gè)客戶(hù)端,分別鏈接4個(gè)instance實(shí)例。使用group后,可以在canal server上合并為一個(gè)邏輯instance,只需要啟動(dòng)1個(gè)客戶(hù)端,鏈接這個(gè)邏輯instance即可.

instance.xml設(shè)計(jì)初衷

允許進(jìn)行自定義擴(kuò)展,比如實(shí)現(xiàn)了基于數(shù)據(jù)庫(kù)的位點(diǎn)管理后,可以自定義一份自己的instance.xml,整個(gè)canal設(shè)計(jì)中最大的靈活性在于此

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容