一、go-mysql-transfer
go-mysql-transfer是使用Go語言實現(xiàn)的MySQL數(shù)據(jù)庫實時增量同步工具。能夠?qū)崟r監(jiān)聽MySQL二進(jìn)制日志(binlog)的變動,將變更內(nèi)容形成指定格式的消息,發(fā)送到接收端。在數(shù)據(jù)庫和接收端之間形成一個高性能、低延遲的增量數(shù)據(jù)(Binlog)同步管道, 具有如下特點:
go-mysql-transfer具有如下特點:
1、不依賴其它組件,一鍵部署
2、集成多種接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再編寫客戶端,開箱即用
3、內(nèi)置豐富的數(shù)據(jù)解析、消息生成規(guī)則;支持Lua腳本,以處理更復(fù)雜的數(shù)據(jù)邏輯
4、支持監(jiān)控告警,集成Prometheus客戶端
5、高可用集群部署
6、數(shù)據(jù)同步失敗重試
7、全量數(shù)據(jù)初始化
詳情及安裝說明 請參見: MySQL Binlog 增量同步工具go-mysql-transfer實現(xiàn)詳解
項目開源地址:
gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer
如果此工具對你有幫助,請Star支持下
如果此工具對你有幫助,請Star支持下
二、配置
# app.yml
target: rocketmq #目標(biāo)類型
#rocketmq連接配置
rocketmq_name_servers: 127.0.0.1:9876 #rocketmq命名服務(wù)地址,多個用逗號分隔
#rocketmq_group_name: transfer_test_group #rocketmq group name,默認(rèn)為空
#rocketmq_instance_name: transfer_test_group_ins #rocketmq instance name,默認(rèn)為空
#rocketmq_access_key: RocketMQ #訪問控制 accessKey,默認(rèn)為空
#rocketmq_secret_key: 12345678 #訪問控制 secretKey,默認(rèn)為空
三、數(shù)據(jù)轉(zhuǎn)換規(guī)則
相關(guān)配置如下:
rule:
-
schema: eseap #數(shù)據(jù)庫名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數(shù)據(jù)同步時不能為空
#column_lower_case:false #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
#column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
# 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認(rèn)為空
#column_mappings: CARD_NO=sfz #列名稱映射,多個映射關(guān)系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
#default_column_values: source=binlog,area_name=合肥 #默認(rèn)的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認(rèn)yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
value_encoder: json #值編碼,支持json、kv-commas、v-commas;默認(rèn)為json
#value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
#rocketmq相關(guān)
rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認(rèn)使用表名稱
示例一
RocketMQ中創(chuàng)建名稱為transfer_test_topic的topic,topic名稱一定要和rule中rocketmq_topic配置的一致
t_user表,數(shù)據(jù)如下:

同步到RocketMQ的數(shù)據(jù)如下:



示例二
t_user表、topic 同實例一
使用如下配置:
rule:
-
schema: eseap #數(shù)據(jù)庫名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數(shù)據(jù)同步時不能為空
column_lower_case: true #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
#column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
#column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
# 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認(rèn)為空
column_mappings: USER_NAME=account #列名稱映射,多個映射關(guān)系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
default_column_values: area_name=合肥 #默認(rèn)的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認(rèn)yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
value_encoder: json #值編碼,支持json、kv-commas、v-commas;默認(rèn)為json
#value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
#rocketmq相關(guān)
rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認(rèn)使用表名稱
column_mappings配置項表示對列名稱進(jìn)行重新映射
同步到RocketMQ的數(shù)據(jù)如下:

示例三
t_user表、topic 同實例一
使用如下配置:
rule:
-
schema: eseap #數(shù)據(jù)庫名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數(shù)據(jù)同步時不能為空
column_lower_case: true #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
#column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
#column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
# 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認(rèn)為空
column_mappings: USER_NAME=account #列名稱映射,多個映射關(guān)系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
default_column_values: area_name=合肥 #默認(rèn)的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認(rèn)yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
value_encoder: v-commas #值編碼,支持json、kv-commas、v-commas;默認(rèn)為json
#value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
#rocketmq相關(guān)
rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認(rèn)使用表名稱
column_mappings配置項表示對列名稱進(jìn)行重新映射
value_encoder配置項表示消息編碼方式
同步到RocketMQ的數(shù)據(jù)如下:

四、Lua腳本
使用Lua腳本可以實現(xiàn)更復(fù)雜的數(shù)據(jù)處理邏輯,go-mysql-transfer支持Lua5.1語法。
示例一
RocketMQ中創(chuàng)建名稱為transfer_test_topic的topic
t_user表,數(shù)據(jù)如下:

引入Lua腳本:
rule:
-
schema: eseap #數(shù)據(jù)庫名稱
table: t_user #表名稱
lua_file_path: lua/t_user_rocket.lua #lua腳本文件
Lua腳本:
local json = require("json") -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫事件,包括:insert、updare、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {} -- 定義一個table,作為結(jié)果
result["id"] = id
result["action"] = action
if action == "delete" -- 刪除事件
then
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("transfer_test_topic",val) -- 發(fā)送消息,第一個參數(shù)為topic(string類型),第二個參數(shù)為消息內(nèi)容
else
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["source"] = "binlog" -- 數(shù)據(jù)來源
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("transfer_test_topic",val) -- 發(fā)送消息,第一個參數(shù)為topic(string類型),第二個參數(shù)為消息內(nèi)容
end
同步到RocketMQ的數(shù)據(jù)如下:



示例二
t_user表、topic 同實例一
使用如下腳本:
local ops = require("mqOps") --加載mq操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫事件,包括:insert、updare、delete
local userName = row["USER_NAME"] --獲取USER_NAME列的值
if action == "insert" then -- 只監(jiān)聽添加事件
local str = string.format("恭喜您:%s 注冊成功",userName)
ops.SEND("transfer_test_topic",str) -- 發(fā)送消息,第一個參數(shù)為topic(string類型),第二個參數(shù)為消息內(nèi)容
end
同步到RocketMQ的數(shù)據(jù)如下:

mqOps模塊提供的方法如下:
- SEND: 發(fā)送操作,如:ops.SEND(topic,result)。參數(shù)topic為字符串類型;參數(shù)result為要發(fā)送的消息