數(shù)據(jù)同步解決方案-canal

1.canal簡介

canal可以用來監(jiān)控數(shù)據(jù)庫數(shù)據(jù)的變化,從而獲得新增、修改的數(shù)據(jù)。

原理相對比較簡單:

(1)canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議

(2) mysql master收到dump請求,開始推送binary log給slave(也就是canal)

(3) canal解析binary log對象(原始為byte流)

2.環(huán)境部署

(1)mysql開啟binlog模式

查看當(dāng)前mysql是否開啟binlog模式,如果log_bin的值為OFF是未開啟,為ON是已開啟。

SHOW VARIABLES LIKE '%log_bin%'

修改/etc/my.cnf 需要開啟binlog模式。(修改完成之后,重啟mysqld的服務(wù)。)

log-bin=mysql-bin

binlog-format=ROW

server_id=1

進(jìn)入mysql

mysql -h localhost -u root -p

創(chuàng)建賬號 用于測試使用(使用root賬號創(chuàng)建用戶并授予權(quán)限)

create user canal@'%' IDENTIFIED by 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

3.canal服務(wù)端安裝配置

(1)下載地址canal

https://github.com/alibaba/canal/releases/tag/canal-1.0.24

(2)下載之后 上傳到linux系統(tǒng)中,解壓縮到指定的目錄/usr/local/canal

解壓縮之后的目錄結(jié)構(gòu)如下:

(3)修改 exmaple下的實例配置(修改如圖所示的幾個參數(shù))

vi conf/example/instance.properties

(4)指定讀取位置

進(jìn)入mysql中執(zhí)行下面語句查看binlog所在位置

如果fifile中binlog文件不為 mysql-bin.000001 可以重置mysql

mysql> reset master;

查看canal配置文件

vim /usr/local/canal/conf/example/meta.dat

找到對應(yīng)的binlog信息更改一致即可

"journalName":"mysql-bin.000001","position":120,"

注意:如果不一致,可能導(dǎo)致以下錯誤

c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x7f2e9be3, /192.168.200.56:52225 => /192.168.200.128:11111],exception=java.io.IOException: Connection reset by peer

(5)啟動服務(wù):

[root@localhost canal]# ./bin/startup.sh

(6)查看日志:

cat /usr/local/canal/logs/canal/canal.log

這樣就表示啟動成功了。

4.數(shù)據(jù)監(jiān)控微服務(wù)

當(dāng)用戶執(zhí)行數(shù)據(jù)庫的操作的時候,binlog日志會被canal捕獲到,并解析出數(shù)據(jù)。我們就可以將解析出來的數(shù)據(jù)進(jìn)行相應(yīng)的邏輯處理。

https://github.com/chenqian56131/spring-boot-starter-canal

以上開源項目,實現(xiàn)了springboot與canal的集成。比原生的canal更加優(yōu)雅。

使用前需要將starter-canal安裝到本地倉庫。我們可以參照它提供的canal-test,進(jìn)行代碼實現(xiàn)。

(1)創(chuàng)建工程模塊changgou_canal,pom引入依賴

<dependency>

????<groupId>com.xpand</groupId>

????<artifactId>starter-canal</artifactId>

????<version>0.0.1-SNAPSHOT</version>

</dependency>

(2)創(chuàng)建包com.changgou.canal ,包下創(chuàng)建啟動類

@SpringBootApplication

@EnableCanalClient //聲明當(dāng)前的服務(wù)是canal的客戶端

public class CanalApplication {

????public static void main(String[] args) {

????????SpringApplication.run(CanalApplication.class,args);

????}

}

(3)添加配置文件application.properties

canal.client.instances.example.port=11111

canal.client.instances.example.batchSize=1000

spring.rabbitmq.host=192.168.200.128

(4)創(chuàng)建com.changgou.canal.listener包,包下創(chuàng)建類

@CanalEventListener //聲明當(dāng)前的類是canal的監(jiān)聽類

public class BusinessListener {

????@Autowired

????private RabbitTemplate rabbitTemplate;

????/**

????*

????* @param eventType 當(dāng)前操作數(shù)據(jù)庫的類型

????* @param rowData 當(dāng)前操作數(shù)據(jù)庫的數(shù)據(jù)

????*/

????@ListenPoint(schema = "business",table = "tb_ad")

????public void adUpdate(CanalEntry.EventType eventType,CanalEntry.RowData rowData){

????????System.out.println("廣告表數(shù)據(jù)發(fā)生改變");

????????//獲取改變之前的數(shù)據(jù)

????????rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改變前的數(shù)據(jù):"+c.getName()+"::"+c.getValue()));

????????//獲取改變之后的數(shù)據(jù)

????????rowData.getAfterColumnsList().forEach((c)-> System.out.println("改變之后的

????????數(shù)據(jù):"+c.getName()+"::"+c.getValue()));

????}

}

測試:啟動數(shù)據(jù)監(jiān)控微服務(wù),修改business的tb_ad表,觀察控制臺輸出。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容