【開源實(shí)戰(zhàn)】阿里開源MySQL中間件Canal快速入門

前言

Canal用途很廣,并且上手非常簡單,小伙伴們?cè)谄綍r(shí)完成公司的需求時(shí),很有可能會(huì)用到。

舉個(gè)例子:

公司目前有多個(gè)開發(fā)人員正在開發(fā)一套服務(wù),為了縮短調(diào)用延時(shí),對(duì)部分接口數(shù)據(jù)加入了緩存。一旦這些數(shù)據(jù)在數(shù)據(jù)庫中進(jìn)行了更新操作,緩存就成了舊數(shù)據(jù),必須及時(shí)刪除。

刪除緩存的代碼理所當(dāng)然可以寫在更新數(shù)據(jù)的業(yè)務(wù)代碼里,但有時(shí)候者寫操作是在別的項(xiàng)目代碼里,你可能無權(quán)修改,亦或者別人不愿你在他代碼里寫這種業(yè)務(wù)之外的代碼。(畢竟多人協(xié)作中間會(huì)產(chǎn)生各種配合問題)。又或者就是單純的刪除緩存的操作失敗了,緩存依然是舊數(shù)據(jù)。

正如上篇文章緩存與數(shù)據(jù)庫雙寫一致性實(shí)戰(zhàn)里面所說,我們可以將緩存更新操作完全獨(dú)立出來,形成一套單獨(dú)的系統(tǒng)。Canal正是這么一個(gè)很好的幫手。 能幫我們實(shí)現(xiàn)像下圖這樣的系統(tǒng):

image

本篇文章的要點(diǎn)如下:

  • Canal是什么
  • Canal工作原理
  • 數(shù)據(jù)庫的讀寫分離
  • 數(shù)據(jù)庫主從同步
  • 數(shù)據(jù)庫主從同步一致性問題
    • 異步復(fù)制
    • 全同步復(fù)制
    • 半同步復(fù)制
  • Canal實(shí)戰(zhàn)
    • 開啟MySQL Binlog
    • 配置Canal服務(wù)
    • 運(yùn)行Canal服務(wù)
    • Java客戶端Demo

阿里開源MySQL中間件Canal快速入門

Canal是什么

眾所周知,阿里是國內(nèi)比較早地大量使用MySQL的互聯(lián)網(wǎng)企業(yè)(去IOE化:去掉IBM的小型機(jī)、Oracle數(shù)據(jù)庫、EMC存儲(chǔ)設(shè)備,代之以自己在開源軟件基礎(chǔ)上開發(fā)的系統(tǒng)),并且基于阿里巴巴/淘寶的業(yè)務(wù),從 2010 年開始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費(fèi)業(yè)務(wù)。

Canal應(yīng)運(yùn)而生,它通過偽裝成數(shù)據(jù)庫的從庫,讀取主庫發(fā)來的binlog,用來實(shí)現(xiàn)數(shù)據(jù)庫增量訂閱和消費(fèi)業(yè)務(wù)需求

Canal用途:

  • 數(shù)據(jù)庫鏡像
  • 數(shù)據(jù)庫實(shí)時(shí)備份
  • 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)
  • 業(yè)務(wù) cache 緩存刷新
  • 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理

開源項(xiàng)目地址:

https://github.com/alibaba/canal

在這里就不再摘抄項(xiàng)目簡介了,提煉幾個(gè)值得注意的點(diǎn):

  • canal 使用 client-server 模式,數(shù)據(jù)傳輸協(xié)議使用 protobuf 3.0(很多RPC框架也在使用例如gRPC)
  • 當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
  • canal 作為 MySQL binlog 增量獲取和解析工具,可將變更記錄投遞到 MQ 系統(tǒng)中,比如 Kafka/RocketMQ。

Canal工作原理

Canal實(shí)際是將自己偽裝成數(shù)據(jù)庫的從庫,來讀取Binlog。我們先補(bǔ)習(xí)下關(guān)于MySQL數(shù)據(jù)庫主從數(shù)據(jù)庫的基礎(chǔ)知識(shí),這樣就能更快的理解Canal。

數(shù)據(jù)庫的讀寫分離

為了應(yīng)對(duì)高并發(fā)場(chǎng)景,MySQL支持把一臺(tái)數(shù)據(jù)庫主機(jī)分為單獨(dú)的一臺(tái)寫主庫(主要負(fù)責(zé)寫操作),而把讀的數(shù)據(jù)庫壓力分配給讀的從庫,而且讀從庫可以變?yōu)槎嗯_(tái),這就是讀寫分離的典型場(chǎng)景。

image

數(shù)據(jù)庫主從同步

實(shí)現(xiàn)數(shù)據(jù)庫的讀寫分離,是通過數(shù)據(jù)庫主從同步,讓從數(shù)據(jù)庫監(jiān)聽主數(shù)據(jù)庫Binlog實(shí)現(xiàn)的。大體流程如下圖:

MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events,可以通過 show binlog events 進(jìn)行查看)

MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)

MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)

image

詳細(xì)主從同步原理在這里就不展開細(xì)說了。

可以看到,這種架構(gòu)下會(huì)有一個(gè)問題,數(shù)據(jù)庫主從同步會(huì)存在延遲,那么就會(huì)有短暫的時(shí)間,主從數(shù)據(jù)庫的數(shù)據(jù)是不一致的。

這種不一致大多數(shù)情況下非常短暫,很多時(shí)候我們可以忽略他。

但一旦要求數(shù)據(jù)一致,就會(huì)引申出如何解決這個(gè)問題的思考。

數(shù)據(jù)庫主從同步一致性問題

我們通常使用MySQL主從復(fù)制來解決MySQL的單點(diǎn)故障問題,其通過邏輯復(fù)制的方式把主庫的變更同步到從庫,主備之間無法保證嚴(yán)格一致的模式,

于是,MySQL的主從復(fù)制帶來了主從“數(shù)據(jù)一致性”的問題。MySQL的復(fù)制分為:異步復(fù)制、半同步復(fù)制、全同步復(fù)制。

異步復(fù)制

MySQL默認(rèn)的復(fù)制即是異步復(fù)制,主庫在執(zhí)行完客戶端提交的事務(wù)后會(huì)立即將結(jié)果返給給客戶端,并不關(guān)心從庫是否已經(jīng)接收并處理,這樣就會(huì)有一個(gè)問題,主如果crash掉了,此時(shí)主上已經(jīng)提交的事務(wù)可能并沒有傳到從庫上,如果此時(shí),強(qiáng)行將從提升為主,可能導(dǎo)致新主上的數(shù)據(jù)不完整。

主庫將事務(wù) Binlog 事件寫入到 Binlog 文件中,此時(shí)主庫只會(huì)通知一下 Dump 線程發(fā)送這些新的 Binlog,然后主庫就會(huì)繼續(xù)處理提交操作,而此時(shí)不會(huì)保證這些 Binlog 傳到任何一個(gè)從庫節(jié)點(diǎn)上。

全同步復(fù)制

指當(dāng)主庫執(zhí)行完一個(gè)事務(wù),所有的從庫都執(zhí)行了該事務(wù)才返回給客戶端。因?yàn)樾枰却袕膸靾?zhí)行完該事務(wù)才能返回,所以全同步復(fù)制的性能必然會(huì)收到嚴(yán)重的影響。

當(dāng)主庫提交事務(wù)之后,所有的從庫節(jié)點(diǎn)必須收到、APPLY并且提交這些事務(wù),然后主庫線程才能繼續(xù)做后續(xù)操作。但缺點(diǎn)是,主庫完成一個(gè)事務(wù)的時(shí)間會(huì)被拉長,性能降低。

半同步復(fù)制

是介于全同步復(fù)制與全異步復(fù)制之間的一種,主庫只需要等待至少一個(gè)從庫節(jié)點(diǎn)收到并且 Flush Binlog 到 Relay Log 文件即可,主庫不需要等待所有從庫給主庫反饋。同時(shí),這里只是一個(gè)收到的反饋,而不是已經(jīng)完全完成并且提交的反饋,如此,節(jié)省了很多時(shí)間。

介于異步復(fù)制和全同步復(fù)制之間,主庫在執(zhí)行完客戶端提交的事務(wù)后不是立刻返回給客戶端,而是等待至少一個(gè)從庫接收到并寫到relay log中才返回給客戶端。相對(duì)于異步復(fù)制,半同步復(fù)制提高了數(shù)據(jù)的安全性,同時(shí)它也造成了一定程度的延遲,這個(gè)延遲最少是一個(gè)TCP/IP往返的時(shí)間。所以,半同步復(fù)制最好在低延時(shí)的網(wǎng)絡(luò)中使用。

image

事實(shí)上,半同步復(fù)制并不是嚴(yán)格意義上的半同步復(fù)制,MySQL半同步復(fù)制架構(gòu)中,主庫在等待備庫ack時(shí)候,如果超時(shí)會(huì)退化為異步后,也可能導(dǎo)致“數(shù)據(jù)不一致”。

當(dāng)半同步復(fù)制發(fā)生超時(shí)時(shí)(由rpl_semi_sync_master_timeout參數(shù)控制,單位是毫秒,默認(rèn)為10000,即10s),會(huì)暫時(shí)關(guān)閉半同步復(fù)制,轉(zhuǎn)而使用異步復(fù)制。當(dāng)master dump線程發(fā)送完一個(gè)事務(wù)的所有事件之后,如果在rpl_semi_sync_master_timeout內(nèi),收到了從庫的響應(yīng),則主從又重新恢復(fù)為半同步復(fù)制。

關(guān)于半同步復(fù)制的詳細(xì)原理分析可以看這篇引申文章,在此不展開:

https://www.cnblogs.com/ivictor/p/5735580.html

回到Canal的工作原理

回顧了數(shù)據(jù)庫從庫的數(shù)據(jù)同步原理,理解Canal十分簡單,直接引用官網(wǎng)原文:

  • canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
  • MySQL master 收到 dump 請(qǐng)求,開始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 對(duì)象(原始為 byte 流)

Canal實(shí)戰(zhàn)

開啟MySQL Binlog

這個(gè)步驟我在之前的文章教你使用Binlog日志恢復(fù)誤刪的MySQL數(shù)據(jù)已經(jīng)提到過,這里完善了一下,再貼一下,方便大家。

首先進(jìn)入數(shù)據(jù)庫控制臺(tái),運(yùn)行指令:

mysql> show variables like'log_bin%';
+---------------------------------+-------+
| Variable_name                   | Value |
+---------------------------------+-------+
| log_bin                         | OFF   |
| log_bin_basename                |       |
| log_bin_index                   |       |
| log_bin_trust_function_creators | OFF   |
| log_bin_use_v1_row_events       | OFF   |
+---------------------------------+-------+
5 rows in set (0.00 sec)

可以看到我們的binlog是關(guān)閉的,都是OFF。接下來我們需要修改Mysql配置文件,執(zhí)行命令:

sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf

在文件末尾添加:

log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW

保存文件,重啟mysql服務(wù):

sudo service mysql restart

重啟完成后,查看下mysql的狀態(tài):

systemctl status mysql.service

這時(shí),如果你的mysql版本在5.7或更高版本,就會(huì)報(bào)錯(cuò):

Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190791Z 0 [Warning] Changed limits: max_open_files: 1024 (requested 5000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190839Z 0 [Warning] Changed limits: table_open_cache: 431 (requested 2000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.359713Z 0 [Warning] TIMESTAMP with implicit DEFAULT value is deprecated. Please use --explicit_defaults_for_timestamp server option (se
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.361395Z 0 [Note] /usr/sbin/mysqld (mysqld 5.7.28-0ubuntu0.16.04.2-log) starting as process 5930 ...
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363017Z 0 [ERROR] You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363747Z 0 [ERROR] Aborting
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363922Z 0 [Note] Binlog end
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.364108Z 0 [Note] /usr/sbin/mysqld: Shutdown complete
Jan 06 15:49:58 VM-0-11-ubuntu systemd[1]: mysql.service: Main process exited, code=exited, status=1/FAILURE

You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server

之前我們的配置,對(duì)于5.7以下版本應(yīng)該是可以的。但對(duì)于高版本,我們需要指定server-id。

我們給這個(gè)MySQL指定為2(只要不與其他庫id重復(fù)):

server-id=2

創(chuàng)建數(shù)據(jù)庫Canal使用賬號(hào)

mysql> select user, host from user;
+------------------+-----------+
| user             | host      |
+------------------+-----------+
| root             | %         |
| debian-sys-maint | localhost |
| mysql.session    | localhost |
| mysql.sys        | localhost |
| root             | localhost |
+------------------+-----------+
5 rows in set
CREATE USER canal IDENTIFIED BY 'xxxx';  (填寫密碼)  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  
FLUSH PRIVILEGES;  

show grants for 'canal' 

配置Canal服務(wù)

去Github下載最近的Canal穩(wěn)定版本包:

解壓縮:

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

配置文件設(shè)置:

主要有兩個(gè)文件配置,一個(gè)是conf/canal.properties一個(gè)是conf/example/instance.properties。

為了快速運(yùn)行Demo,只修改conf/example/instance.properties里的數(shù)據(jù)庫連接賬號(hào)密碼即可

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=xxxxxxx
canal.instance.connectionCharset = UTF-8

運(yùn)行Canal服務(wù)

請(qǐng)先確保機(jī)器上有JDK,接著運(yùn)行Canal啟動(dòng)腳本:

sh bin/startup.sh

下圖即成功運(yùn)行:

image

Java客戶端代碼

我在秒殺系統(tǒng)系列文章的代碼倉庫里(miaosha-job)編寫了如下客戶端代碼

倉庫源碼地址:https://github.com/qqxx6661/miaosha

package job;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CanalClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);

    public static void main(String[] args) {

        // 第一步:與canal進(jìn)行連接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
                "example", "", "");
        connector.connect();

        // 第二步:開啟訂閱
        connector.subscribe();

        // 第三步:循環(huán)訂閱
        while (true) {
            try {
                // 每次讀取 1000 條
                Message message = connector.getWithoutAck(1000);

                long batchID = message.getId();

                int size = message.getEntries().size();

                if (batchID == -1 || size == 0) {
                    LOGGER.info("當(dāng)前暫時(shí)沒有數(shù)據(jù),休眠1秒");
                    Thread.sleep(1000);
                } else {
                    LOGGER.info("-------------------------- 有數(shù)據(jù)啦 -----------------------");
                    printEntry(message.getEntries());
                }

                connector.ack(batchID);

            } catch (Exception e) {
                LOGGER.error("處理出錯(cuò)");
            } finally {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 獲取每條打印的記錄
     */
    public static void printEntry(List<Entry> entrys) {

        for (Entry entry : entrys) {

            // 第一步:拆解entry 實(shí)體
            Header header = entry.getHeader();
            EntryType entryType = entry.getEntryType();

            // 第二步: 如果當(dāng)前是RowData,那就是我需要的數(shù)據(jù)
            if (entryType == EntryType.ROWDATA) {

                String tableName = header.getTableName();
                String schemaName = header.getSchemaName();

                RowChange rowChange = null;

                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }

                EventType eventType = rowChange.getEventType();

                LOGGER.info(String.format("當(dāng)前正在操作表 %s.%s, 執(zhí)行操作= %s", schemaName, tableName, eventType));

                // 如果是‘查詢’ 或者 是 ‘DDL’ 操作,那么sql直接打出來
                if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                    LOGGER.info("執(zhí)行了查詢語句:[{}]", rowChange.getSql());
                    return;
                }

                // 第三步:追蹤到 columns 級(jí)別
                rowChange.getRowDatasList().forEach((rowData) -> {

                    // 獲取更新之前的column情況
                    List<Column> beforeColumns = rowData.getBeforeColumnsList();

                    // 獲取更新之后的 column 情況
                    List<Column> afterColumns = rowData.getAfterColumnsList();

                    // 當(dāng)前執(zhí)行的是 刪除操作
                    if (eventType == EventType.DELETE) {
                        printColumn(beforeColumns);
                    }

                    // 當(dāng)前執(zhí)行的是 插入操作
                    if (eventType == EventType.INSERT) {
                        printColumn(afterColumns);
                    }

                    // 當(dāng)前執(zhí)行的是 更新操作
                    if (eventType == EventType.UPDATE) {
                        printColumn(afterColumns);
                        // 進(jìn)行刪除緩存操作
                        deleteCache(afterColumns, tableName, schemaName);
                    }


                });
            }
        }
    }

    /**
     * 每個(gè)row上面的每一個(gè)column 的更改情況
     * @param columns
     */
    public static void printColumn(List<Column> columns) {

        columns.forEach((column) -> {
            String columnName = column.getName();
            String columnValue = column.getValue();
            String columnType = column.getMysqlType();
            // 判斷 該字段是否更新
            boolean isUpdated = column.getUpdated();
            LOGGER.info(String.format("數(shù)據(jù)列:columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated));
        });
    }

    /**
     * 秒殺下單接口刪除庫存緩存
     */
    public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
        if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
            AtomicInteger id = new AtomicInteger();
            columns.forEach((column) -> {
                String columnName = column.getName();
                String columnValue = column.getValue();
                if ("id".equals(columnName)) {
                    id.set(Integer.parseInt(columnValue));
                }
            });
            // TODO: 刪除緩存
            LOGGER.info("Canal刪除stock表id:[{}] 的庫存緩存", id);

        }
    }
}

代碼中有詳細(xì)的注釋,就不做解釋了。

我們跑起代碼,緊接著我們?cè)跀?shù)據(jù)庫中進(jìn)行更改UPDATE操作,把法外狂徒張三改成張三1,然后再改回張三,見下圖。

image

Canal成功收到了兩條更新操作:

image

緊接著我們模擬一個(gè)刪除Cache緩存的業(yè)務(wù),在代碼中有:

/**
 * 秒殺下單接口刪除庫存緩存
 */
public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
    if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
        AtomicInteger id = new AtomicInteger();
        columns.forEach((column) -> {
            String columnName = column.getName();
            String columnValue = column.getValue();
            if ("id".equals(columnName)) {
                id.set(Integer.parseInt(columnValue));
            }
        });
        // TODO: 刪除緩存
        LOGGER.info("Canal刪除stock表id:[{}] 的庫存緩存", id);

    }
}

在上面的代碼中,在收到m4a_miaosha.stock表的更新操作后,我們刷新庫存緩存。效果如下:

image
image

簡單的Canal使用就介紹到這里,剩下的發(fā)揮空間留給各位讀者大大們。

總結(jié)

本文總結(jié)了Canal的基本原理和簡單的使用。

總結(jié)如下幾點(diǎn):

  • Canal實(shí)際是將自己偽裝成數(shù)據(jù)庫的從庫,來讀取主數(shù)據(jù)庫發(fā)來的Binlog。
  • Canal用途很廣,比如數(shù)據(jù)庫實(shí)時(shí)備份、索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)、業(yè)務(wù) cache 緩存刷新。
  • Canal可以推送至非常多數(shù)據(jù)源,并支持推送到消息隊(duì)列,方便多語言使用。

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容