MySQL數(shù)據(jù)實時增量同步到RocketMQ

一、go-mysql-transfer

go-mysql-transfer是使用Go語言實現(xiàn)的MySQL數(shù)據(jù)庫實時增量同步工具。能夠?qū)崟r監(jiān)聽MySQL二進(jìn)制日志(binlog)的變動,將變更內(nèi)容形成指定格式的消息,發(fā)送到接收端。在數(shù)據(jù)庫和接收端之間形成一個高性能、低延遲的增量數(shù)據(jù)(Binlog)同步管道, 具有如下特點:

go-mysql-transfer具有如下特點:

1、不依賴其它組件,一鍵部署

2、集成多種接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再編寫客戶端,開箱即用

3、內(nèi)置豐富的數(shù)據(jù)解析、消息生成規(guī)則;支持Lua腳本,以處理更復(fù)雜的數(shù)據(jù)邏輯

4、支持監(jiān)控告警,集成Prometheus客戶端

5、高可用集群部署

6、數(shù)據(jù)同步失敗重試

7、全量數(shù)據(jù)初始化

詳情及安裝說明 請參見: MySQL Binlog 增量同步工具go-mysql-transfer實現(xiàn)詳解

項目開源地址:
gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer

如果此工具對你有幫助,請Star支持下

如果此工具對你有幫助,請Star支持下

二、配置

# app.yml

target: rocketmq #目標(biāo)類型
#rocketmq連接配置
rocketmq_name_servers: 127.0.0.1:9876 #rocketmq命名服務(wù)地址,多個用逗號分隔
#rocketmq_group_name: transfer_test_group #rocketmq group name,默認(rèn)為空
#rocketmq_instance_name: transfer_test_group_ins #rocketmq instance name,默認(rèn)為空
#rocketmq_access_key: RocketMQ #訪問控制 accessKey,默認(rèn)為空
#rocketmq_secret_key: 12345678 #訪問控制 secretKey,默認(rèn)為空

三、數(shù)據(jù)轉(zhuǎn)換規(guī)則

相關(guān)配置如下:

rule:
  -
    schema: eseap #數(shù)據(jù)庫名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段,存量數(shù)據(jù)同步時不能為空
    #column_lower_case:false #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
    #column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
    column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
    # 包含的列,多值逗號分隔,如:id,name,age,area_id  為空時表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id  默認(rèn)為空
    #column_mappings: CARD_NO=sfz    #列名稱映射,多個映射關(guān)系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
    #default_column_values: source=binlog,area_name=合肥  #默認(rèn)的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認(rèn)yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
    value_encoder: json  #值編碼,支持json、kv-commas、v-commas;默認(rèn)為json
    #value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
    
    #rocketmq相關(guān)
    rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認(rèn)使用表名稱

示例一

RocketMQ中創(chuàng)建名稱為transfer_test_topic的topic,topic名稱一定要和rule中rocketmq_topic配置的一致

t_user表,數(shù)據(jù)如下:

同步到RocketMQ的數(shù)據(jù)如下:

insert事件消息
update事件消息
delete事件消息

示例二

t_user表、topic 同實例一

使用如下配置:

rule:
  -
    schema: eseap #數(shù)據(jù)庫名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段,存量數(shù)據(jù)同步時不能為空
    column_lower_case: true #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
    #column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
    #column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
    # 包含的列,多值逗號分隔,如:id,name,age,area_id  為空時表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id  默認(rèn)為空
    column_mappings: USER_NAME=account    #列名稱映射,多個映射關(guān)系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
    default_column_values: area_name=合肥  #默認(rèn)的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認(rèn)yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
    value_encoder: json  #值編碼,支持json、kv-commas、v-commas;默認(rèn)為json
    #value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值

    #rocketmq相關(guān)
    rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認(rèn)使用表名稱

column_mappings配置項表示對列名稱進(jìn)行重新映射

同步到RocketMQ的數(shù)據(jù)如下:

insert事件消息

示例三

t_user表、topic 同實例一

使用如下配置:

rule:
  -
    schema: eseap #數(shù)據(jù)庫名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段,存量數(shù)據(jù)同步時不能為空
    column_lower_case: true #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
    #column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
    #column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
    # 包含的列,多值逗號分隔,如:id,name,age,area_id  為空時表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id  默認(rèn)為空
    column_mappings: USER_NAME=account    #列名稱映射,多個映射關(guān)系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
    default_column_values: area_name=合肥  #默認(rèn)的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認(rèn)yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
    value_encoder: v-commas  #值編碼,支持json、kv-commas、v-commas;默認(rèn)為json
    #value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值

    #rocketmq相關(guān)
    rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認(rèn)使用表名稱

column_mappings配置項表示對列名稱進(jìn)行重新映射

value_encoder配置項表示消息編碼方式

同步到RocketMQ的數(shù)據(jù)如下:

insert事件消息

四、Lua腳本

使用Lua腳本可以實現(xiàn)更復(fù)雜的數(shù)據(jù)處理邏輯,go-mysql-transfer支持Lua5.1語法。

示例一

RocketMQ中創(chuàng)建名稱為transfer_test_topic的topic

t_user表,數(shù)據(jù)如下:

引入Lua腳本:

rule:
  -
    schema: eseap #數(shù)據(jù)庫名稱
    table: t_user #表名稱
    lua_file_path: lua/t_user_rocket.lua   #lua腳本文件

Lua腳本:

local json = require("json")   -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊

local row = ops.rawRow()  --當(dāng)前數(shù)據(jù)庫的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫事件,包括:insert、updare、delete

local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值

local result = {}  -- 定義一個table,作為結(jié)果
result["id"] = id
result["action"] = action

if action == "delete" -- 刪除事件
then
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("transfer_test_topic",val) -- 發(fā)送消息,第一個參數(shù)為topic(string類型),第二個參數(shù)為消息內(nèi)容
else 
    result["userName"] = userName
    result["password"] = password
    result["createTime"] = createTime
    result["source"] = "binlog" -- 數(shù)據(jù)來源
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("transfer_test_topic",val) -- 發(fā)送消息,第一個參數(shù)為topic(string類型),第二個參數(shù)為消息內(nèi)容
end 

同步到RocketMQ的數(shù)據(jù)如下:

insert事件消息
update事件消息
delete事件消息

示例二

t_user表、topic 同實例一

使用如下腳本:

local ops = require("mqOps") --加載mq操作模塊

local row = ops.rawRow()  --當(dāng)前數(shù)據(jù)庫的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫事件,包括:insert、updare、delete

local userName = row["USER_NAME"] --獲取USER_NAME列的值

if action == "insert" then -- 只監(jiān)聽添加事件
    local str = string.format("恭喜您:%s  注冊成功",userName)  
    ops.SEND("transfer_test_topic",str) -- 發(fā)送消息,第一個參數(shù)為topic(string類型),第二個參數(shù)為消息內(nèi)容
end 

同步到RocketMQ的數(shù)據(jù)如下:

mqOps模塊提供的方法如下:

  1. SEND: 發(fā)送操作,如:ops.SEND(topic,result)。參數(shù)topic為字符串類型;參數(shù)result為要發(fā)送的消息
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容