A quick introduction to Canal: this one article is all you need!


Preface

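When building a real-time data warehouse, the data usually lives in a database such as MySQL. Whenever a row is inserted or updated, it has to be synchronized immediately to Kafka or to another database. For this we can use Canal, which was open-sourced by Alibaba.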

1. What is Canal

Here is how the official site describes it:

canal [kə'næl], meaning waterway/pipe/channel, is mainly used to parse MySQL's incremental log (the binlog) and provide incremental data subscription and consumption.

2. What Canal can do

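  • Database mirroring
  • Real-time database backup
  • Building and maintaining indexes in real time (split heterogeneous indexes, inverted indexes, etc.)
  • Refreshing business caches
  • Incremental data processing with business logic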

Note: the MySQL versions currently supported by Canal are 5.1.x, 5.5.x, 5.6.x, 5.7.x and 8.0.x.

3. How Canal works

How MySQL master-slave replication works

  • The MySQL master writes data changes to its binary log (binlog); the records in it are called binary log events and can be viewed with show binlog events.
  • The MySQL slave copies the master's binary log events into its own relay log.
  • The MySQL slave replays the events in the relay log, applying the data changes to its own data.
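The three replication steps above can be pictured with a toy in-memory model. This is a hypothetical sketch (ReplicationToy, Master, Slave and Event are illustrative names, not MySQL or canal classes), assuming row-level events keyed by a primary key: the master applies a change and appends it to its binlog, the slave copies new binlog events into its relay log, then replays the relay log against its own data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of MySQL master/slave replication: binlog -> relay log -> replay (illustrative only). */
final class ReplicationToy {
    /** One row-level change, loosely modelled on a ROW-format binlog event. */
    static final class Event {
        final String type;  // "INSERT", "UPDATE" or "DELETE"
        final String key;   // primary key of the row
        final String value; // row image after the change (null for DELETE)

        Event(String type, String key, String value) {
            this.type = type;
            this.key = key;
            this.value = value;
        }
    }

    static final class Master {
        final Map<String, String> data = new LinkedHashMap<>();
        final List<Event> binlog = new ArrayList<>();

        // Step 1: change the data and append the event to the binlog
        void apply(Event e) {
            replay(data, e);
            binlog.add(e);
        }
    }

    static final class Slave {
        final Map<String, String> data = new LinkedHashMap<>();
        final List<Event> relayLog = new ArrayList<>();
        int copiedUpTo = 0;   // binlog position already copied from the master
        int replayedUpTo = 0; // relay-log position already applied locally

        // Step 2: copy binlog events not yet seen into the relay log
        void pull(Master m) {
            relayLog.addAll(m.binlog.subList(copiedUpTo, m.binlog.size()));
            copiedUpTo = m.binlog.size();
        }

        // Step 3: replay relay-log events against the slave's own data
        void replayRelayLog() {
            for (; replayedUpTo < relayLog.size(); replayedUpTo++) {
                replay(data, relayLog.get(replayedUpTo));
            }
        }
    }

    static void replay(Map<String, String> data, Event e) {
        if ("DELETE".equals(e.type)) {
            data.remove(e.key);
        } else {
            data.put(e.key, e.value); // INSERT and UPDATE both set the row image
        }
    }
}
```

canal takes the place of the Slave in step 2: it receives the binlog events, but instead of replaying them it parses them and hands them to clients.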

How canal works

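  • canal implements the MySQL slave interaction protocol, poses as a MySQL slave, and sends a dump request to the MySQL master.
  • The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, to canal).
  • canal parses the binary log objects (originally a raw byte stream).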
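To make the dump request concrete, the sketch below builds the bytes of a COM_BINLOG_DUMP command following the layout in the MySQL client/server protocol documentation: a 3-byte little-endian payload length and a 1-byte sequence id, then the command byte 0x12, a 4-byte binlog position, 2-byte flags, the 4-byte server id of the replica, and the binlog file name. It only illustrates the protocol and is not canal's own code; a real client must first complete the handshake and authentication. The file name mysql-bin.000001, position 4 (the first event after the 4-byte magic header) and server id 1234 are example values.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Illustrative builder for a MySQL COM_BINLOG_DUMP packet (not canal's implementation). */
final class BinlogDumpPacket {
    static final byte COM_BINLOG_DUMP = 0x12;

    static byte[] build(String binlogFile, long position, int serverId) {
        byte[] name = binlogFile.getBytes(StandardCharsets.US_ASCII);

        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        payload.write(COM_BINLOG_DUMP);
        writeIntLE(payload, position, 4);    // binlog position to start from
        writeIntLE(payload, 0, 2);           // flags
        writeIntLE(payload, serverId, 4);    // server id the "slave" (canal) reports
        payload.write(name, 0, name.length); // binlog file name fills the rest of the packet

        byte[] body = payload.toByteArray();
        ByteArrayOutputStream packet = new ByteArrayOutputStream();
        writeIntLE(packet, body.length, 3);  // payload length
        packet.write(0);                     // sequence id: each command starts at 0
        packet.write(body, 0, body.length);
        return packet.toByteArray();
    }

    /** Writes the lowest {@code bytes} bytes of {@code value}, least significant first. */
    private static void writeIntLE(ByteArrayOutputStream out, long value, int bytes) {
        for (int i = 0; i < bytes; i++) {
            out.write((int) (value >>> (8 * i)) & 0xFF);
        }
    }
}
```

The server id written here is the value canal is configured with as canal.instance.mysql.slaveId, which is why it must not collide with the server_id of any real MySQL node.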

4. Deploying Canal

4.1 Install MySQL

I have already written about deploying MySQL, so I won't repeat it here. If MySQL is not installed on your machine yet, see this article: https://blog.csdn.net/qq_43791724/article/details/108196454

Enable MySQL's binary log

Once MySQL is installed, add the following to its my.cnf file:

[mysqld]
log-bin=/var/lib/mysql/mysql-bin # enable the binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId

Note: once the binary log is enabled (restart MySQL so the my.cnf change takes effect), MySQL creates mysql-bin.* files in the log-bin directory, and every change to the data in the database is recorded in these files.
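After the restart you can confirm the settings from Java. This is a minimal sketch assuming MySQL Connector/J is on the classpath; the JDBC URL, user root and password 123456 are the example values used elsewhere in this article, and BinlogCheck is a hypothetical helper, not part of canal. It reads log_bin, binlog_format and server_id with SHOW VARIABLES and reports anything that would stop canal from reading ROW events.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that checks the binlog settings canal relies on. */
final class BinlogCheck {

    /** Returns the problems found; an empty list means the settings look usable for canal. */
    static List<String> problems(Map<String, String> vars) {
        List<String> out = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) out.add("log_bin is not ON");
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) out.add("binlog_format is not ROW");
        String id = vars.get("server_id");
        if (id == null || "0".equals(id.trim())) out.add("server_id is not set");
        return out;
    }

    /** Reads the three variables from a live server (needs a MySQL JDBC driver at runtime). */
    static Map<String, String> readVariables(String url, String user, String password) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')")) {
            while (rs.next()) {
                vars.put(rs.getString(1), rs.getString(2));
            }
        }
        return vars;
    }

    public static void main(String[] args) throws SQLException {
        // Example connection values; replace them with your own server's.
        Map<String, String> vars = readVariables("jdbc:mysql://192.168.1.120:3306/", "root", "123456");
        List<String> problems = problems(vars);
        System.out.println(problems.isEmpty() ? "binlog settings OK" : problems);
    }
}
```

The check is split into a pure problems() method and a JDBC reader so the rules can be tested without a database.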

4.2 Install Canal

Download the version you need from the official releases page. This article uses version 1.0.24: https://github.com/alibaba/canal/releases

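  1. Upload the downloaded .tar.gz package to a directory of your choice.
  2. Create a directory for Canal (the extract command below uses ../servers/canal/):
mkdir -p ../servers/canal
  3. Extract the package:
tar -zxvf canal.deployer-1.0.24.tar.gz -C ../servers/canal/
  4. Configure canal.properties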

The first four settings in the common section:

canal.id = 1
canal.ip =
canal.port = 11111
canal.zkServers =

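canal.id is the canal server's number. In a cluster every canal needs a different id; note that it is not the same thing as MySQL's server_id. canal.ip is left empty here, so it defaults to the local machine (192.168.100.201 in this example), and the port is 11111. canal.zkServers is used for a canal cluster.
  5. Next, look at the destinations settings in canal.properties: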

#################################################
#########         destinations          #########
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

canal.destinations = example can list several instances, for example example1,example2. In that case you need two matching folders, each containing its own instance.properties file.

  6. Global canal instance management uses Spring; file-instance.xml ultimately instantiates all the destination instances:
<!-- properties -->
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
    <property name="ignoreResourceNotFound" value="true" />
    <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- allow system properties to override -->
    <property name="locationNames">
        <list>
            <value>classpath:canal.properties</value>
            <value>classpath:${canal.instance.destination:}/instance.properties</value>
        </list>
    </property>
</bean>

<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
    <property name="propertyEditorRegistrars">
        <list>
            <ref bean="socketAddressEditor" />
        </list>
    </property>
</bean>
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
    <property name="destination" value="${canal.instance.destination}" />
    <property name="eventParser">
        <ref local="eventParser" />
    </property>
    <property name="eventSink">
        <ref local="eventSink" />
    </property>
    <property name="eventStore">
        <ref local="eventStore" />
    </property>
    <property name="metaManager">
        <ref local="metaManager" />
    </property>
    <property name="alarmHandler">
        <ref local="alarmHandler" />
    </property>
</bean>

For example, when canal.instance.destination is example, the example/instance.properties file is loaded.
  7. Edit the instance configuration file (conf/example/instance.properties):

## mysql serverId: this slaveId must not equal any server_id already used in the MySQL cluster
canal.instance.mysql.slaveId = 1234

# Change these to your own database settings
#################################################
...
canal.instance.master.address=192.168.1.120:3306
# username/password: the database user name and password
...
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
#################################################
  8. Start Canal
sh bin/startup.sh
  9. Stop Canal
sh bin/stop.sh
  10. Check the service status with jps
[root@node01 ~]# jps
2133 CanalLauncher
4184 Jps
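Before writing the client, the configuration from steps 4 to 10 can be sanity-checked from Java. The class below is a hypothetical helper (CanalPreflight is not part of canal) built only on the JDK: it reads properties text with java.util.Properties, lists the instance.properties files implied by canal.destinations, checks canal.instance.mysql.slaveId against server_ids you pass in, splits canal.instance.master.address, and tests whether a TCP port such as canal.port 11111 accepts connections. The fallback conf directory ../conf is simply the value used in this article, not a canal default.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight checks for a canal deployment (illustrative, not part of canal). */
final class CanalPreflight {

    /** Parses .properties text with the standard java.util.Properties rules. */
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** canal.destinations is comma-separated; each name maps to <conf dir>/<name>/instance.properties. */
    static List<Path> instanceFiles(Properties canal) {
        String confDir = canal.getProperty("canal.conf.dir", "../conf").trim();
        List<Path> files = new ArrayList<>();
        for (String raw : canal.getProperty("canal.destinations", "").split(",")) {
            String name = raw.trim();
            if (!name.isEmpty()) {
                files.add(Paths.get(confDir, name, "instance.properties"));
            }
        }
        return files;
    }

    /** canal joins replication using slaveId, so it must differ from every existing server_id. */
    static boolean slaveIdIsFree(Properties instance, long... existingServerIds) {
        String raw = instance.getProperty("canal.instance.mysql.slaveId");
        if (raw == null) {
            throw new IllegalArgumentException("canal.instance.mysql.slaveId is not set");
        }
        long slaveId = Long.parseLong(raw.trim());
        for (long id : existingServerIds) {
            if (id == slaveId) return false;
        }
        return true;
    }

    /** Splits canal.instance.master.address ("host:port") without resolving the host name. */
    static InetSocketAddress masterAddress(Properties instance) {
        String addr = instance.getProperty("canal.instance.master.address", "").trim();
        int colon = addr.lastIndexOf(':');
        if (colon <= 0 || colon == addr.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got '" + addr + "'");
        }
        return InetSocketAddress.createUnresolved(addr.substring(0, colon),
                Integer.parseInt(addr.substring(colon + 1)));
    }

    /** True if something accepts TCP connections on host:port within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, once jps shows CanalLauncher, isListening("192.168.100.201", 11111, 2000) would be expected to return true unless a firewall blocks the port; a false result while the process is running points at the network rather than the configuration.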

At this point the service is configured, and we can write a client in Java to test it.

5. Writing a Canal client in Java

5.1 Add the dependencies

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.0.24</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
</dependencies>

5.2 Write the test class

package com.canal.Test;

/**
 * @author 大数据老哥
 * @version V1.0
 * @Package com.canal.Test
 * @File :CanalTest.java
 * @date 2021/1/11 21:54
 */

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Tests whether the canal configuration works.
 */
public class CanalTest {

    public static void main(String[] args) {
        // 1. Create the connector: canal server address, destination, username, password
        CanalConnector connect = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.100.201", 11111),
                "example", "", "");
        // Maximum number of entries fetched per batch
        int batchSize = 1000;
        // Loop flag
        boolean running = true;
        try {
            // 2. Open the connection
            connect.connect();
            // Subscribe; an empty filter means the server-side filter (canal.instance.filter.regex) is used
            connect.subscribe();
            // Roll back any batch a previous run fetched but never acked, so it is redelivered instead of lost
            connect.rollback();
            while (running) {
                // Fetch up to batchSize entries without acknowledging them yet
                Message message = connect.getWithoutAck(batchSize);
                // Batch id; -1 means there was nothing to fetch
                long batchId = message.getId();
                // Number of binlog entries in this batch
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // No new data: wait a moment instead of busy-looping
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        running = false;
                    }
                } else {
                    printSummary(message);
                }
                // Confirm that this batchId has been consumed successfully
                connect.ack(batchId);
            }
        } finally {
            connect.disconnect();
        }
    }

    private static void printSummary(Message message) {
        // Iterate over every binlog entry in the batch
        for (CanalEntry.Entry entry : message.getEntries()) {
            // Skip transaction begin/end markers; they carry no row data
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // Binlog file name
            String logfileName = entry.getHeader().getLogfileName();
            // Offset within the binlog file
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // Execution timestamp of the SQL statement
            long executeTime = entry.getHeader().getExecuteTime();
            // Database (schema) name
            String schemaName = entry.getHeader().getSchemaName();
            // Table name
            String tableName = entry.getHeader().getTableName();
            // Event type: insert/update/delete
            String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();

            System.out.println("logfileName" + ":" + logfileName);
            System.out.println("logfileOffset" + ":" + logfileOffset);
            System.out.println("executeTime" + ":" + executeTime);
            System.out.println("schemaName" + ":" + schemaName);
            System.out.println("tableName" + ":" + tableName);
            System.out.println("eventTypeName" + ":" + eventTypeName);

            CanalEntry.RowChange rowChange;
            try {
                // Parse the stored binary payload into a RowChange object
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                // Stop here rather than continue with a null rowChange (a NullPointerException below)
                throw new RuntimeException("Failed to parse RowChange at " + logfileName + ":" + logfileOffset, e);
            }

            // Iterate over every changed row
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // Delete event
                if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) {
                    System.out.println("---delete---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                }
                // Update event
                else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) {
                    System.out.println("---update---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                    printColumnList(rowData.getAfterColumnsList());
                }
                // Insert event
                else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) {
                    System.out.println("---insert---");
                    printColumnList(rowData.getAfterColumnsList());
                    System.out.println("---");
                }
            }
        }
    }

    // Print every column name and value
    private static void printColumnList(List<CanalEntry.Column> columnList) {
        for (CanalEntry.Column column : columnList) {
            System.out.println(column.getName() + "\t" + column.getValue());
        }
    }
}

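5.3 Run the test

Start CanalTest, then insert, update or delete a row in MySQL. For each row change the client prints the binlog file name, offset, execution time, schema, table and event type, followed by the column values before and/or after the change.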

Summary

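Today I introduced Canal, whose main job is incremental data synchronization; in later posts I will use Canal to build a real-time data warehouse. I also share big-data resources, which you can download from the GitHub repository below. Believe in yourself: hard work and sweat always pay off. I'm 大数据老哥, see you next time~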

Resources: Flink, Spark, Hive, Hadoop and Docker interview questions, essential software for programmers, résumé templates and more can be downloaded from GitHub at https://github.com/lhh2002/Framework-Of-BigData or from Gitee at https://gitee.com/li_hey_hey/dashboard/projects


Copyright belongs to the author. For reposting or content collaboration, please contact the author.
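Platform statement: the article content (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.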
