由于公司業(yè)務(wù)需要: 需將 PostgreSQL 數(shù)據(jù)庫中的9百萬條數(shù)據(jù) 遷移到 MySQL.
現(xiàn)將遷移腳本的開發(fā)過程記錄如下:
安裝驅(qū)動(dòng)庫
go get -u gorm.io/gorm
go get -u gorm.io/driver/postgres
go get -u gorm.io/driver/mysql
初始化數(shù)據(jù)庫連接
import (
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
var MySQLClientBI *gorm.DB
var PostgreSQLClient *gorm.DB
func init() {
dsnPg := "host=localhost user=gorm password=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"
pgDB, err := gorm.Open(postgres.Open(dsnPg), &gorm.Config{})
if err != nil {
fmt.Println(err)
}
PostgreSQLClient = pgDB
dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
fmt.Println(err)
}
MySQLClientBI = db
}
批量插入
GORM 的 CreateInBatches 方法可以用于批量插入數(shù)據(jù),這確實(shí)有助于提高大量數(shù)據(jù)的插入效率。但是,有幾點(diǎn)需要注意:
- 內(nèi)存使用:即使使用 CreateInBatches,如果你首先從一個(gè)數(shù)據(jù)庫中提取900萬條記錄并嘗試將其存儲(chǔ)在內(nèi)存中,那么可能會(huì)出現(xiàn)內(nèi)存使用過高的問題。你應(yīng)該在查詢數(shù)據(jù)時(shí)考慮分頁或限制提取的記錄數(shù)。
- 批次大小:為了達(dá)到最佳性能和避免潛在的問題,您需要確定合適的批次大小。例如,一次插入1000或5000條記錄,而不是所有900萬條記錄。
- MySQL的限制:MySQL有一個(gè)max_allowed_packet參數(shù),它定義了單個(gè)客戶端發(fā)送到MySQL服務(wù)器的數(shù)據(jù)包的最大大小。批量插入時(shí)可能會(huì)觸及此限制,導(dǎo)致錯(cuò)誤。
基于以上考慮,建議以下方法:
- 分批從 PostgreSQL 中讀取數(shù)據(jù),例如每次讀取5000條。
- 使用 CreateInBatches 將每批數(shù)據(jù)插入到 MySQL 中。
這樣可以確保不會(huì)因?yàn)橐淮涡蕴幚泶罅繑?shù)據(jù)而耗盡內(nèi)存,并且可以在必要時(shí)輕松調(diào)整批次大小。
簡(jiǎn)化代碼如下:
const batchSize = 5000
var offset = 0
for {
// 從pg查,一次查 5000
var cfgEventParamsValue []EventParamsValue
result := pgDB.Table("data_cfg.cfg_event_params_value").Limit(batchSize).Offset(offset).Find(&cfgEventParamsValue)
if result.Error != nil {
log.Fatalf("Error fetching from PostgreSQL: %v", result.Error)
}
if len(cfgEventParamsValue) == 0 {
break
}
// 入mysql,一次入 5000
err := mysqlDB.Table("cfg_event_params_value").CreateInBatches(cfgEventParamsValue, batchSize).Error
if err != nil {
log.Fatalf("Error inserting batch into MySQL: %v", err)
}
offset += batchSize
}
開啟協(xié)程
使用 Go 的協(xié)程可以極大地提高數(shù)據(jù)遷移的效率,特別是當(dāng)涉及到網(wǎng)絡(luò)IO或數(shù)據(jù)庫IO操作時(shí)。但請(qǐng)注意,太多的并發(fā)可能會(huì)對(duì)數(shù)據(jù)庫造成壓力,導(dǎo)致性能下降或其他問題,所以需要平衡。
代碼如下:
package main
import (
"fmt"
"log"
"sync"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
type EventParamsValue struct {
ID uint `gorm:"primaryKey"`
// 根據(jù)實(shí)際字段和類型調(diào)整以下內(nèi)容
FieldName string
}
var pgDB *gorm.DB
var mysqlDB *gorm.DB
var wg sync.WaitGroup // 使用 sync.WaitGroup 來確保主程序等待所有的協(xié)程完成
const batchSize = 5000
func migrateBatch(offset int) {
defer wg.Done()
// 從pg查,一次查 5000
var cfgEventParamsValue []EventParamsValue
result := pgDB.Table("data_cfg.cfg_event_params_value").Limit(batchSize).Offset(offset).Find(&cfgEventParamsValue)
if result.Error != nil {
log.Printf("Error fetching from PostgreSQL: %v", result.Error)
return
}
if len(cfgEventParamsValue) == 0 {
return
}
// 入mysql,一次入 5000
err := mysqlDB.Table("cfg_event_params_value").CreateInBatches(cfgEventParamsValue, batchSize).Error
if err != nil {
log.Printf("Error inserting batch into MySQL: %v", err)
}
}
func main() {
// PostgreSQL 連接 (省略了代碼...)
// MySQL 連接 (省略了代碼...)
// 獲取總記錄數(shù)
var totalRecords int64
pgDB.Table("data_cfg.cfg_event_params_value").Count(&totalRecords)
for offset := 0; offset < int(totalRecords); offset += batchSize {
wg.Add(1)
go migrateBatch(offset) // 使用協(xié)程執(zhí)行數(shù)據(jù)遷移
}
wg.Wait() // 等待所有協(xié)程完成
fmt.Println("Migration complete!")
}
協(xié)程非常快,所以可能會(huì)很快地打開很多協(xié)程。如果發(fā)現(xiàn)數(shù)據(jù)庫響應(yīng)緩慢或有其他問題,可能需要引入一個(gè)限制并發(fā)數(shù)量的機(jī)制,例如使用通道 (channel) 或第三方庫,如 semaphore。
在執(zhí)行此程序之前,務(wù)必先在非生產(chǎn)環(huán)境中測(cè)試,確保其行為如預(yù)期,并確保它不會(huì)對(duì)您的數(shù)據(jù)庫產(chǎn)生不良影響。
上述這段代碼根據(jù)數(shù)據(jù)總量和每批處理的數(shù)據(jù)量(batchSize)來決定開啟多少個(gè)協(xié)程。
這里是決定開啟協(xié)程數(shù)量的關(guān)鍵部分:
for offset := 0; offset < int(totalRecords); offset += batchSize {
wg.Add(1)
go migrateBatch(offset) // 使用協(xié)程執(zhí)行數(shù)據(jù)遷移
}
每次迭代中,我們都會(huì)開啟一個(gè)新的協(xié)程。迭代的次數(shù)由總記錄數(shù)(totalRecords)和每批的大小(batchSize)決定。
計(jì)算開啟的協(xié)程數(shù)量的公式為:
numGoroutines = ceil(totalRecords / batchSize)
其中 ceil 是向上取整函數(shù)。例如,如果您有 9,000,000 條記錄,并且每批大小是 5,000,那么您會(huì)開啟 1,800 個(gè)協(xié)程。
需要注意的是,盡管協(xié)程在 Go 中非常輕量,但同時(shí)開啟太多協(xié)程可能會(huì)導(dǎo)致數(shù)據(jù)庫連接的問題,尤其是當(dāng)數(shù)據(jù)庫的連接池大小有限時(shí)。您可能需要考慮增加數(shù)據(jù)庫的連接數(shù)限制或使用信號(hào)量來限制同時(shí)運(yùn)行的協(xié)程數(shù)量,以保護(hù)數(shù)據(jù)庫不被過度壓迫。
增加信號(hào)量模式
為了手動(dòng)控制協(xié)程數(shù)量,可以使用Go中的信號(hào)量模式,利用chan struct{}來達(dá)到這個(gè)目的。
package main
import (
"fmt"
"log"
"sync"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
type EventParamsValue struct {
ID uint `gorm:"primaryKey"`
// 根據(jù)實(shí)際字段和類型調(diào)整以下內(nèi)容
FieldName string
}
var pgDB *gorm.DB
var mysqlDB *gorm.DB
var wg sync.WaitGroup
const batchSize = 5000
const maxGoroutines = 10 // 手動(dòng)設(shè)置最大并發(fā)協(xié)程數(shù)量
var sem = make(chan struct{}, maxGoroutines)
func migrateBatch(offset int) {
defer wg.Done()
defer func() { <-sem }() // 釋放一個(gè)協(xié)程位
var cfgEventParamsValue []EventParamsValue
result := pgDB.Table("data_cfg.cfg_event_params_value").Limit(batchSize).Offset(offset).Find(&cfgEventParamsValue)
if result.Error != nil {
log.Printf("Error fetching from PostgreSQL: %v", result.Error)
return
}
// 如果該批次沒有數(shù)據(jù),直接返回
if len(cfgEventParamsValue) == 0 {
return
}
log.Printf("Migrating records from offset %d to %d", offset, offset+batchSize-1) // 記錄每批次遷移數(shù)據(jù)的起止
err := mysqlDB.Table("cfg_event_params_value").CreateInBatches(cfgEventParamsValue, batchSize).Error
if err != nil {
log.Printf("Error inserting batch into MySQL from offset %d to %d: %v", offset, offset+batchSize-1, err)
} else {
log.Printf("Successfully migrated records from offset %d to %d", offset, offset+batchSize-1)
}
}
func main() {
// PostgreSQL 連接 (省略了代碼...)
// MySQL 連接 (省略了代碼...)
// 獲取總記錄數(shù)
var totalRecords int64
pgDB.Table("data_cfg.cfg_event_params_value").Count(&totalRecords)
for offset := 0; offset < int(totalRecords); offset += batchSize {
sem <- struct{}{} // 獲取一個(gè)協(xié)程位
wg.Add(1)
go migrateBatch(offset) // 使用協(xié)程執(zhí)行數(shù)據(jù)遷移
}
wg.Wait() // 等待所有協(xié)程完成
fmt.Println("Migration complete!")
}
在上面的代碼中:
- sem 是一個(gè)有限容量的通道,用于控制并發(fā)的協(xié)程數(shù)量。
- sem <- struct{}{} 嘗試向通道發(fā)送一個(gè)空結(jié)構(gòu),如果通道已滿,這一行將會(huì)阻塞,直到有其他協(xié)程完成并釋放一個(gè)位置。
- defer func() { <-sem }() 保證當(dāng)協(xié)程結(jié)束時(shí),從sem通道中移除一個(gè)空結(jié)構(gòu),從而釋放一個(gè)協(xié)程位置。
這樣,一次只有maxGoroutines數(shù)量的協(xié)程能夠并發(fā)運(yùn)行。您可以根據(jù)需要調(diào)整maxGoroutines的值來控制并發(fā)數(shù)量。