canal:
1.配置mysql
canal的原理是基于mysql binlog技術(shù),所以這里一定需要開啟mysql的binlog寫入功能,建議配置binlog模式為row.
配置如下
[mysqld]
log-bin=mysql-bin #添加這一行就ok
binlog-format=ROW #選擇row模式
server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重復(fù)
canal的原理是模擬自己為mysql slave,所以這里一定需要設(shè)置mysql slave的相關(guān)權(quán)限.
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
重啟mysql服務(wù)即可
2.配置canal server
新建canal目錄,下載canal,解壓。
wget?https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz
進(jìn)入到canal/conf目錄,新建目錄brand,將example目錄中的instance.properties拷貝到brand中。
進(jìn)入brand中的instance.properties,修改配置如下:
canal.instance.dbUsername=canal#給canal配置的賬號(hào)
canal.instance.dbPassword=canal #給canal配置的密碼
canal.instance.connectionCharset = UTF-8#連接數(shù)據(jù)庫(kù)用的字符集
canal.instance.defaultDatabaseName =test#默認(rèn)的數(shù)據(jù)庫(kù)名
canal.instance.enableDruid=false#是否使用druid
canal.instance.filter.regex=.\..?#mysql 數(shù)據(jù)解析關(guān)注的表,Perl正則表達(dá)式
修改canal/canal.properties中的配置
canal.destinations = brand
多個(gè)用,分隔。配置之后,啟動(dòng)canal會(huì)conf/brand目錄里面的instance.properties。
啟動(dòng)canal server。
如果啟動(dòng)報(bào)錯(cuò),確認(rèn)一下
<1>mysqld中的server_id和canal有沒有沖突
<2>canal.properties中的canal.instance.parser.parallelThreadSize沒有有打開,這里我配置的值為256
3.配置elasticsearch
canal adapter 的 Elastic Search 版本支持6.x.x以上, 如需其它版本的es可替換依賴重新編譯client-adapter.elasticsearch模塊
下載es并解壓
wget?https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.tar.gz
es不能在root賬戶下執(zhí)行,創(chuàng)建一個(gè)賬戶,給它分配權(quán)限。切換到該賬戶中,啟動(dòng)es,如果報(bào)錯(cuò)max virtual memory areas vm.maxmapcount [65530] is too low,則執(zhí)行命令
sudo sysctl -w vm.max_map_count=262144
4.配置canal adapter
本文使用的adapter版本有明顯bug,官網(wǎng)已經(jīng)更新了,請(qǐng)留意官網(wǎng)更新,下文僅作參考。
新建canal_adapter目錄,下載并解壓。
解壓,進(jìn)入conf/application.yml作如下配置
srcDataSources:
defaultDS:
url://數(shù)據(jù)庫(kù)的地址username:password:
canalAdapters:
instance: brand
groups:
groupId: g1
outerAdapters:
name: es #這是canal_adapters的內(nèi)置實(shí)現(xiàn)
hosts: 127.0.0.1:9300 # es 集群地址, 逗號(hào)分隔
properties:
cluster.name: elasticsearch # es cluster name
adapter將會(huì)自動(dòng)加載 conf/es 下的所有.yml結(jié)尾的配置文件。新增brand.yml,內(nèi)容仿照下面的配置
dataSourceKey: defaultDS # 源數(shù)據(jù)源的key, 對(duì)應(yīng)上面配置的srcDataSources中的值
destination: example # cannal的instance或者M(jìn)Q的topic
esMapping:
_index: mytest_user # es 的索引名稱
_type: _doc # es 的doc名稱
_id: _id # es 的_id, 如果不配置該項(xiàng)必須配置下面的pk項(xiàng)_id則會(huì)由es自動(dòng)分配
pk: id # 如果不需要_id, 則需要指定一個(gè)屬性為主鍵屬性
sql映射
sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_timeas_c_time, c.labelsas_labels from user a? ? left join role b on b.id=a.role_id? ? left join (select user_id, group_concat(label order by id desc separator';')aslabels from label? ? group by user_id) c on c.user_id=a.id"
objFields:
_labels: array:; # 數(shù)組或者對(duì)象屬性, array:; 代表以;字段里面是以;分隔的
_obj: object # json對(duì)象
etlCondition: "where a.c_time>='{}'" # etl 的條件參數(shù)
commitBatch: 3000 # 提交批大小
啟動(dòng)canal adapter。在es中新增相關(guān)的index和type,就可以實(shí)現(xiàn)了
參考文檔:canal官方文檔