MySQL 配置
[mysqld]
log-bin=mysql-bin
server-id=1
binlog-format=ROW
使用 go-mysql 讀取 binlog
go get github.com/siddontang/go-mysql
代碼實現(xiàn) demo
package main
import (
"log"
"github.com/siddontang/go-mysql/canal"
"github.com/go-redis/redis/v8"
"context"
)
// Redis 配置
var ctx = context.Background()
var redisClient = redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 地址
Password: "", // Redis 密碼
DB: 0, // 默認DB
})
func main() {
cfg := canal.NewDefaultConfig()
cfg.Addr = "127.0.0.1:3306" // MySQL 地址
cfg.User = "root"
cfg.Password = "yourpassword"
cfg.Flavor = "mysql" // 數(shù)據(jù)庫類型,默認為 MySQL
c, err := canal.NewCanal(cfg)
if err != nil {
log.Fatal(err)
}
// 注冊回調(diào)函數(shù),監(jiān)聽 binlog 事件
c.SetEventHandler(&handler{})
// 開始同步 binlog
c.Run()
}
type handler struct{}
// OnRow 是處理數(shù)據(jù)變動的回調(diào)函數(shù)
func (h *handler) OnRow(e *canal.RowsEvent) error {
// 解析 binlog 事件并處理
for _, row := range e.Rows {
log.Printf("table: %s, action: %s, data: %v", e.Table.Name, e.Action, row)
// 在這里進行同步到 Redis
// 例如:
if e.Action == "insert" || e.Action == "update" {
redisKey := e.Table.Name + ":id:" + string(row[0].([]byte)) // 主鍵作為 Redis key
redisClient.Set(ctx, redisKey, row, 0).Err()
} else if e.Action == "delete" {
redisKey := e.Table.Name + ":id:" + string(row[0].([]byte))
redisClient.Del(ctx, redisKey).Err()
}
}
return nil
}
func (h *handler) String() string {
return "MyEventHandler"
}
實現(xiàn)邏輯
- 連接 MySQL 并讀取 binlog:通過 go-mysql 庫監(jiān)聽 MySQL 的 binlog 事件(插入、更新、刪除等)。
- 處理 binlog 事件:通過 OnRow 回調(diào)函數(shù)處理每一個變動的行數(shù)據(jù)。
- 更新 Redis:根據(jù)不同的事件類型(插入、更新、刪除),更新或刪除 Redis 中對應(yīng)的鍵值對。