Business scenario:
We have already built mysql -> deployer -> adapter -> es and deployer -> rabbitmq separately; now we combine the two into:
mysql -> deployer -> rabbitmq -> adapter -> es.
In other words: MySQL changes are synced to canal-deployer, canal-deployer publishes them to RabbitMQ, and canal-adapter consumes the queue and syncs the data into ES.
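Between deployer and adapter, each change event travels through RabbitMQ as canal's flat-message JSON (enabled by flatMessage: true in the config below). A minimal sketch of parsing one such message for an INSERT on the book table — the field names follow canal's flat-message format, but the sample values here are hypothetical:

```python
import json

# A hypothetical flatMessage as canal-deployer would publish it to RabbitMQ
# for an INSERT on the `book` table (values are illustrative only).
raw = '''
{
  "database": "db_example",
  "table": "book",
  "type": "INSERT",
  "ts": 1620000000000,
  "data": [{"id": "1", "name": "SQL Basics", "year": "2021"}],
  "old": null
}
'''

msg = json.loads(raw)
# canal-adapter routes on database/table and applies the mapping SQL per row.
print(msg["type"], msg["table"], msg["data"][0]["id"])  # INSERT book 1
```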
canal.adapter-1.1.5\conf\application.yml
- Set canal.conf.mode to rabbitMQ, meaning the adapter will pull data from RabbitMQ
- Fill in the basic RabbitMQ connection info; note that rabbitmq.host must not include a port (the default port is used)
- Configure the MySQL data source as usual; note that the key defaultDS is referenced again later
- canalAdapters.instance holds the RabbitMQ queue name, which is referenced again later
- canalAdapters.groups.groupId is likewise referenced again later
- The ES connection info is also straightforward; note that key: exampleKey is referenced again later
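The notes above flag four values that must line up between application.yml and book.yml, or the adapter will not sync anything. A pure-Python sketch just to make the pairing explicit (the literal values are copied from the configs in this post):

```python
# Values copied from the two config files in this post;
# each pair must match for the sync to work.
application_yml = {
    "srcDataSources": "defaultDS",   # key under canal.conf.srcDataSources
    "instance": "canal_queue",       # canalAdapters[0].instance (RabbitMQ queue name)
    "groupId": "g1",                 # canalAdapters[0].groups[0].groupId
    "es_key": "exampleKey",          # key of the es7 outer adapter
}
book_yml = {
    "dataSourceKey": "defaultDS",
    "destination": "canal_queue",
    "groupId": "g1",
    "outerAdapterKey": "exampleKey",
}

pairs = [
    ("srcDataSources", "dataSourceKey"),
    ("instance", "destination"),
    ("groupId", "groupId"),
    ("es_key", "outerAdapterKey"),
]
for app_field, book_field in pairs:
    assert application_yml[app_field] == book_yml[book_field], (app_field, book_field)
print("all four cross-references match")
```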
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: rabbitMQ #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers:
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms:
kafka.auto.offset.reset:
kafka.request.timeout.ms:
kafka.session.timeout.ms:
kafka.isolation.level:
kafka.max.poll.records:
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr:
rocketmq.batch.size:
rocketmq.enable.message.trace:
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host: 127.0.0.1
rabbitmq.virtual.host: /
rabbitmq.username: yinkai
rabbitmq.password: yinkai
rabbitmq.resource.ownerId:
srcDataSources:
    defaultDS: # note this key: it is referenced in book.yml as dataSourceKey, and is the data source used to run the mapping SQL
url: jdbc:mysql://127.0.0.1:3306/db_example?useUnicode=true
username: root
password: 123456
canalAdapters:
  - instance: canal_queue # the RabbitMQ queue name; note this value, it is referenced in book.yml # canal instance name or MQ topic name
groups:
    - groupId: g1 # note this value too, it is referenced in book.yml
outerAdapters:
- name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
- name: es7
        key: exampleKey # key of the ES output adapter; note this value, it is referenced in book.yml
hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
properties:
          mode: rest # or transport
# security.auth: test:123456 # only used for rest mode
cluster.name: geektime
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
canal.adapter-1.1.5\conf\es7\book.yml
- This book.yml is named after the MySQL table it maps
- The details are explained in the inline comments below
dataSourceKey: defaultDS # key of the source data source, matching an entry under srcDataSources in application.yml
outerAdapterKey: exampleKey # matches the key of the es7 adapter configured in application.yml
# configure the RabbitMQ queue name here
destination: canal_queue # canal instance name or MQ topic name (matches instance in application.yml)
groupId: g1 # groupId in MQ mode; only data for this groupId is synced (matches application.yml)
esMapping:
  _index: book # ES index name
  _type: _doc # ES type name; not required under ES7
  _id: _id # the ES _id; if this is not configured, the pk entry below must be configured, and the _id will then be assigned by ES automatically
  # pk: id # if _id is not needed, one column must be designated as the primary key
  # SQL mapping
  sql: "select a.id as _id, a.name, a.year, a.last_updated,a.name2,a.name3,a.name4,a.name5,a.name6,date_format(a.create_time,'%Y-%m-%d %H:%i:%S') as create_time,a.my_json from book a"
  # objFields:
  # _labels: array:; # array or object field; array:; means the column value is a ;-separated list
  # _obj: object # JSON object
  etlCondition: "where a.last_updated>='{0}'" # condition parameter for full ETL
  commitBatch: 3000 # commit batch size
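One detail in the mapping SQL above is easy to get wrong: in MySQL's date_format, %i is minutes while %I is the 12-hour hour, so the pattern should be '%Y-%m-%d %H:%i:%S'. Python's strftime happens to share the %I pitfall (its minutes code is %M), which makes the difference easy to demonstrate:

```python
from datetime import datetime

ts = datetime(2021, 5, 3, 14, 7, 9)

# MySQL date_format('%Y-%m-%d %H:%i:%S') corresponds to the strftime below:
# MySQL %H -> 24-hour hour, %i -> minutes, %S -> seconds.
correct = ts.strftime("%Y-%m-%d %H:%M:%S")      # 2021-05-03 14:07:09

# Writing %I in the minutes position (in either language, %I is the
# 12-hour hour) silently replaces the minutes with the hour:
twelve_hour = ts.strftime("%Y-%m-%d %H:%I:%S")  # 2021-05-03 14:02:09

print(correct)
print(twelve_hour)
```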