Go操作阿里開(kāi)源的框架Canal用于MySQL實(shí)時(shí)binlog同步

第一章 Canal 服務(wù)端搭建

1.1 認(rèn)識(shí) Canal 框架

canal 官方開(kāi)源github地址: canal ;下載地址:
canal 官方文檔github地址:canal document
canal [k?'n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。

工作原理:

  • canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送 dump 協(xié)議
  • MySQL master 收到 dump 請(qǐng)求,開(kāi)始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 對(duì)象(原始為 byte 流)

1.2 搭建 Canal 服務(wù)框架

1.2.1 啟動(dòng) MySQL 的 Binlog 功能

對(duì)于系統(tǒng) MySQL , 需要先開(kāi)啟 Binlog 寫(xiě)入功能,配置 binlog-format 為 ROW 模式,修改MySQL配置文件my.cnf 或者 my.ini 中的配置,并重啟 MySQL 服務(wù);

[mysqld]
log-bin=mysql-bin # 開(kāi)啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)

1.2.2 啟動(dòng) MySQL 的 Binlog 功能

授權(quán) canal 鏈接 MySQL 賬號(hào)具有作為 MySQL slave 的權(quán)限, 如果已有賬戶可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

1.2.3 下載配置 canal 服務(wù)

下載 canal, 訪問(wèn) release 頁(yè)面 , 選擇需要的包下載, 如以 1.1.6 版本為例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

或者直接選擇文件下載


image.png

解壓文件到指定文件夾 canal 下;
配置修改

配置文件路徑:
canal\conf\example\instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的數(shù)據(jù)庫(kù)信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的數(shù)據(jù)庫(kù)信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

1.3 啟動(dòng) Canal 框架

到 canal 文件下查找 bin 文件夾,startup.bat 就是啟動(dòng)文件。雙擊 startup.bat 文件即可啟動(dòng) canal 框架;


image.png

如果啟動(dòng)界面無(wú)法啟動(dòng),或者出現(xiàn)一閃而過(guò),說(shuō)明 canal 框架啟動(dòng)失??;可以在文件 startup.bat 所在文件夾下,啟動(dòng) powershell,使用命令式執(zhí)行 startup.bat 文件,可查看運(yùn)行具體信息。如下所示:


image.png

1.4 問(wèn)題解決

1.4.1 啟動(dòng)報(bào)錯(cuò) Unrecognized VM option 'PermSize=128m'

# 錯(cuò)誤描述信息
Unrecognized VM option 'PermSize=128m'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
image.png

搜索查找資料大多數(shù)都是說(shuō)JDK問(wèn)題,需要安裝JDK1.8,配置JAVA_HOME路徑到JDK1.8就能解決;此方法,我沒(méi)有做測(cè)試,無(wú)法得知是否是可以解決;
仔細(xì)查看報(bào)錯(cuò)信息,是JAVA啟動(dòng)虛擬機(jī)失敗,無(wú)法識(shí)別 VM 參數(shù),因此是啟動(dòng)參數(shù)有問(wèn)題。
解決方法:此問(wèn)題是canal啟動(dòng)參數(shù)有問(wèn)題,修改啟動(dòng)參數(shù)即可解決。

打開(kāi)canal啟動(dòng)文件:canal\bin\startup.bat
set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m 
修改為
set JAVA_MEM_OPTS= -Xms128m -Xmx512m 

啟動(dòng)canal查看運(yùn)行情況:


image.png

1.4.2 客戶端連接canal報(bào)錯(cuò) Debugger failed to attach: timeout during handshake

[destination = example , address = /127.0.0.1:3306 , EventParser] 
ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /127.0.0.1:3306 has an error, retrying.
caused by com.alibaba.otter.canal.parse.exception.CanalParseException: 
java.io.IOException: connect /127.0.0.1:3306 failure

查找多數(shù)都是是Java遠(yuǎn)程調(diào)試參數(shù)配置問(wèn)題等等;后來(lái)查看canal日志文件,找到問(wèn)題根本,是canal連接MySQL數(shù)據(jù)失敗導(dǎo)致;
解決方法:修改canal用戶權(quán)限;

查看user數(shù)據(jù)庫(kù)中canal用戶信息,重新配置用戶權(quán)限,并更新數(shù)據(jù);
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

還有一種情況是用戶密碼方式有問(wèn)題;myql8.0版本的密碼加密方式為caching_sha2_password,所以修改為mysql_native_password 就行并重新更新密碼。這也是我認(rèn)為有很大可能的另一種情況;值得注意的是,我只測(cè)試了修改了用戶權(quán)限,就解決問(wèn)題,修改用戶密碼方式,沒(méi)有測(cè)試,無(wú)法知曉是否解決

方法:
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; #更新一下用戶密碼

FLUSH PRIVILEGES; #刷新權(quán)限

第二章 Canal 客戶端搭建——Go語(yǔ)言庫(kù) canal-go

canal-go 庫(kù)開(kāi)源地址為:canal-go
你可以直接下載 canal-go 庫(kù),使用項(xiàng)目中已經(jīng)有的例子 canal-go/samples/main.go
也可創(chuàng)建新的項(xiàng)目,導(dǎo)入 canal-go 庫(kù),以下就是第二個(gè)方式。步驟如下:

1、創(chuàng)建項(xiàng)目 dbcanal

2、go mod init

3、創(chuàng)建 main.go 腳本

4、將以下代碼復(fù)制進(jìn)去(代碼在文章結(jié)尾。)

5、go mod tidy 下載依賴

6、go run main.go 執(zhí)行程序查看結(jié)果

image.png

7、更改數(shù)據(jù)庫(kù)中的數(shù)據(jù),再查看結(jié)果

MySQL 語(yǔ)句自己按照自己的數(shù)據(jù)庫(kù)進(jìn)行修改,增刪改語(yǔ)句都可以進(jìn)行監(jiān)聽(tīng)查看。
INSERT INTO test.meter_base_protocol 
(name,age)
VALUES 
("lilei",18);
image.png

image.png

到此,所有關(guān)于 Go 操作阿里開(kāi)源的框架 Canal 搭建完成,后續(xù)就是按照項(xiàng)目所需要求進(jìn)行代碼開(kāi)發(fā)了, 祝大家好運(yùn)。
注意:代碼

package main

import (
    "fmt"
    "github.com/golang/protobuf/proto"
    "github.com/withlin/canal-go/client"
    pbe "github.com/withlin/canal-go/protocol/entry"
    "log"
    "os"
    "time"
)

func main() {

    // 192.168.199.17 替換成你的canal server的地址
    // example 替換成-e canal.destinations=example 你自己定義的名字
    //  該字段名字在 canal\conf\example\meta.dat 文件中,NewSimpleCanalConnector函數(shù)參數(shù)配置,也在文件中
    /**
      NewSimpleCanalConnector 參數(shù)說(shuō)明
        client.NewSimpleCanalConnector("Canal服務(wù)端地址", "Canal服務(wù)端端口", "Canal服務(wù)端用戶名", "Canal服務(wù)端密碼", "Canal服務(wù)端destination", 60000, 60*60*1000)
        Canal服務(wù)端地址:canal服務(wù)搭建地址IP
        Canal服務(wù)端端口:canal\conf\canal.properties文件中
        Canal服務(wù)端用戶名、密碼:canal\conf\example\instance.properties 文件中
        Canal服務(wù)端destination :canal\conf\example\meta.dat 文件中
    */
    connector := client.NewSimpleCanalConnector("127.0.0.1", 11111,
        "canal", "canal", "example",
        60000, 60*60*1000)
    err := connector.Connect()
    if err != nil {
        log.Println(err)
        os.Exit(1)
    }

    // https://github.com/alibaba/canal/wiki/AdminGuide
    //mysql 數(shù)據(jù)解析關(guān)注的表,Perl正則表達(dá)式.
    //
    //多個(gè)正則之間以逗號(hào)(,)分隔,轉(zhuǎn)義符需要雙斜杠(\\)
    //
    //常見(jiàn)例子:
    //
    //  1.  所有表:.*   or  .*\\..*
    //  2.  canal schema下所有表: canal\\..*
    //  3.  canal下的以canal打頭的表:canal\\.canal.*
    //  4.  canal schema下的一張表:canal\\.test1
    //  5.  多個(gè)規(guī)則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號(hào)分隔)

    err = connector.Subscribe(".*\\..*")
    if err != nil {
        log.Println(err)
        os.Exit(1)
    }

    for {

        message, err := connector.Get(100, nil, nil)
        if err != nil {
            log.Println(err)
            os.Exit(1)
        }
        batchId := message.Id
        if batchId == -1 || len(message.Entries) <= 0 {
            time.Sleep(300 * time.Millisecond)
            fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "===沒(méi)有數(shù)據(jù)了===")
            continue
        }

        printEntry(message.Entries)

    }
}

func printEntry(entrys []pbe.Entry) {

    for _, entry := range entrys {
        if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
            continue
        }
        rowChange := new(pbe.RowChange)

        err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
        checkError(err)
        if rowChange != nil {
            eventType := rowChange.GetEventType()
            header := entry.GetHeader()
            fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType()))

            for _, rowData := range rowChange.GetRowDatas() {
                if eventType == pbe.EventType_DELETE {
                    printColumn(rowData.GetBeforeColumns())
                } else if eventType == pbe.EventType_INSERT {
                    printColumn(rowData.GetAfterColumns())
                } else {
                    fmt.Println("-------> before")
                    printColumn(rowData.GetBeforeColumns())
                    fmt.Println("-------> after")
                    printColumn(rowData.GetAfterColumns())
                }
            }
        }
    }
}

func printColumn(columns []*pbe.Column) {
    for _, col := range columns {
        fmt.Println(fmt.Sprintf("%s : %s  update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))
    }
}

func checkError(err error) {
    if err != nil {
        fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
        os.Exit(1)
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容