golang使用信號(hào)量模式并發(fā)安全的遷移千萬條數(shù)據(jù)

由于公司業(yè)務(wù)需要: 需將 PostgreSQL 數(shù)據(jù)庫中的9百萬條數(shù)據(jù) 遷移到 MySQL.

現(xiàn)將遷移腳本的開發(fā)過程記錄如下:

安裝驅(qū)動(dòng)庫

go get -u gorm.io/gorm
go get -u gorm.io/driver/postgres
go get -u gorm.io/driver/mysql

初始化數(shù)據(jù)庫連接

import (
  "gorm.io/driver/postgres"
  "gorm.io/gorm"
)

var MySQLClientBI *gorm.DB
var PostgreSQLClient *gorm.DB

func init() {
    dsnPg := "host=localhost user=gorm password=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"
    pgDB, err := gorm.Open(postgres.Open(dsnPg), &gorm.Config{})
    if err != nil {
        fmt.Println(err)
    }
    PostgreSQLClient = pgDB
    
    dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
    db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})  
    if err != nil {
        fmt.Println(err)
    }
    MySQLClientBI = db
}

批量插入

GORM 的 CreateInBatches 方法可以用于批量插入數(shù)據(jù),這確實(shí)有助于提高大量數(shù)據(jù)的插入效率。但是,有幾點(diǎn)需要注意:

  1. 內(nèi)存使用:即使使用 CreateInBatches,如果你首先從一個(gè)數(shù)據(jù)庫中提取900萬條記錄并嘗試將其存儲(chǔ)在內(nèi)存中,那么可能會(huì)出現(xiàn)內(nèi)存使用過高的問題。你應(yīng)該在查詢數(shù)據(jù)時(shí)考慮分頁或限制提取的記錄數(shù)。
  2. 批次大小:為了達(dá)到最佳性能和避免潛在的問題,您需要確定合適的批次大小。例如,一次插入1000或5000條記錄,而不是所有900萬條記錄。
  3. MySQL的限制:MySQL有一個(gè)max_allowed_packet參數(shù),它定義了單個(gè)客戶端發(fā)送到MySQL服務(wù)器的數(shù)據(jù)包的最大大小。批量插入時(shí)可能會(huì)觸及此限制,導(dǎo)致錯(cuò)誤。

基于以上考慮,建議以下方法:

  1. 分批從 PostgreSQL 中讀取數(shù)據(jù),例如每次讀取5000條。
  2. 使用 CreateInBatches 將每批數(shù)據(jù)插入到 MySQL 中。

這樣可以確保不會(huì)因?yàn)橐淮涡蕴幚泶罅繑?shù)據(jù)而耗盡內(nèi)存,并且可以在必要時(shí)輕松調(diào)整批次大小。

簡(jiǎn)化代碼如下:

const batchSize = 5000
var offset = 0
for {
    // 從pg查,一次查 5000
    var cfgEventParamsValue []EventParamsValue
    result := pgDB.Table("data_cfg.cfg_event_params_value").Limit(batchSize).Offset(offset).Find(&cfgEventParamsValue)
    if result.Error != nil {
        log.Fatalf("Error fetching from PostgreSQL: %v", result.Error)
    }
    if len(cfgEventParamsValue) == 0 {
        break
    }
    // 入mysql,一次入 5000
    err := mysqlDB.Table("cfg_event_params_value").CreateInBatches(cfgEventParamsValue, batchSize).Error
    if err != nil {
        log.Fatalf("Error inserting batch into MySQL: %v", err)
    }
    offset += batchSize
}

開啟協(xié)程

使用 Go 的協(xié)程可以極大地提高數(shù)據(jù)遷移的效率,特別是當(dāng)涉及到網(wǎng)絡(luò)IO或數(shù)據(jù)庫IO操作時(shí)。但請(qǐng)注意,太多的并發(fā)可能會(huì)對(duì)數(shù)據(jù)庫造成壓力,導(dǎo)致性能下降或其他問題,所以需要平衡。

代碼如下:

package main

import (
    "fmt"
    "log"
    "sync"

    "gorm.io/driver/mysql"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
)

type EventParamsValue struct {
    ID    uint   `gorm:"primaryKey"`
    // 根據(jù)實(shí)際字段和類型調(diào)整以下內(nèi)容
    FieldName string
}

var pgDB *gorm.DB
var mysqlDB *gorm.DB
var wg sync.WaitGroup   // 使用 sync.WaitGroup 來確保主程序等待所有的協(xié)程完成

const batchSize = 5000

func migrateBatch(offset int) {
    defer wg.Done()

     // 從pg查,一次查 5000
    var cfgEventParamsValue []EventParamsValue
    result := pgDB.Table("data_cfg.cfg_event_params_value").Limit(batchSize).Offset(offset).Find(&cfgEventParamsValue)
    if result.Error != nil {
        log.Printf("Error fetching from PostgreSQL: %v", result.Error)
        return
    }

    if len(cfgEventParamsValue) == 0 {
        return
    }
    
    // 入mysql,一次入 5000
    err := mysqlDB.Table("cfg_event_params_value").CreateInBatches(cfgEventParamsValue, batchSize).Error
    if err != nil {
        log.Printf("Error inserting batch into MySQL: %v", err)
    }
}

func main() {
    // PostgreSQL 連接 (省略了代碼...)

    // MySQL 連接 (省略了代碼...)

    // 獲取總記錄數(shù)
    var totalRecords int64
    pgDB.Table("data_cfg.cfg_event_params_value").Count(&totalRecords)

    for offset := 0; offset < int(totalRecords); offset += batchSize {
        wg.Add(1)
        go migrateBatch(offset)  // 使用協(xié)程執(zhí)行數(shù)據(jù)遷移
    }

    wg.Wait() // 等待所有協(xié)程完成
    fmt.Println("Migration complete!")
}

協(xié)程非常快,所以可能會(huì)很快地打開很多協(xié)程。如果發(fā)現(xiàn)數(shù)據(jù)庫響應(yīng)緩慢或有其他問題,可能需要引入一個(gè)限制并發(fā)數(shù)量的機(jī)制,例如使用通道 (channel) 或第三方庫,如 semaphore

在執(zhí)行此程序之前,務(wù)必先在非生產(chǎn)環(huán)境中測(cè)試,確保其行為如預(yù)期,并確保它不會(huì)對(duì)您的數(shù)據(jù)庫產(chǎn)生不良影響。

上述這段代碼根據(jù)數(shù)據(jù)總量和每批處理的數(shù)據(jù)量(batchSize)來決定開啟多少個(gè)協(xié)程。

這里是決定開啟協(xié)程數(shù)量的關(guān)鍵部分:

for offset := 0; offset < int(totalRecords); offset += batchSize {
    wg.Add(1)
    go migrateBatch(offset)  // 使用協(xié)程執(zhí)行數(shù)據(jù)遷移
}

每次迭代中,我們都會(huì)開啟一個(gè)新的協(xié)程。迭代的次數(shù)由總記錄數(shù)(totalRecords)和每批的大小(batchSize)決定。

計(jì)算開啟的協(xié)程數(shù)量的公式為:

numGoroutines = ceil(totalRecords / batchSize)

其中 ceil 是向上取整函數(shù)。例如,如果您有 9,000,000 條記錄,并且每批大小是 5,000,那么您會(huì)開啟 1,800 個(gè)協(xié)程。

需要注意的是,盡管協(xié)程在 Go 中非常輕量,但同時(shí)開啟太多協(xié)程可能會(huì)導(dǎo)致數(shù)據(jù)庫連接的問題,尤其是當(dāng)數(shù)據(jù)庫的連接池大小有限時(shí)。您可能需要考慮增加數(shù)據(jù)庫的連接數(shù)限制或使用信號(hào)量來限制同時(shí)運(yùn)行的協(xié)程數(shù)量,以保護(hù)數(shù)據(jù)庫不被過度壓迫。

增加信號(hào)量模式

為了手動(dòng)控制協(xié)程數(shù)量,可以使用Go中的信號(hào)量模式,利用chan struct{}來達(dá)到這個(gè)目的。

package main

import (
    "fmt"
    "log"
    "sync"

    "gorm.io/driver/mysql"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
)

type EventParamsValue struct {
    ID    uint   `gorm:"primaryKey"`
    // 根據(jù)實(shí)際字段和類型調(diào)整以下內(nèi)容
    FieldName string
}

var pgDB *gorm.DB
var mysqlDB *gorm.DB
var wg sync.WaitGroup

const batchSize = 5000
const maxGoroutines = 10 // 手動(dòng)設(shè)置最大并發(fā)協(xié)程數(shù)量

var sem = make(chan struct{}, maxGoroutines)

func migrateBatch(offset int) {
    defer wg.Done()
    defer func() { <-sem }() // 釋放一個(gè)協(xié)程位

    var cfgEventParamsValue []EventParamsValue
    result := pgDB.Table("data_cfg.cfg_event_params_value").Limit(batchSize).Offset(offset).Find(&cfgEventParamsValue)
    if result.Error != nil {
        log.Printf("Error fetching from PostgreSQL: %v", result.Error)
        return
    }

    // 如果該批次沒有數(shù)據(jù),直接返回
    if len(cfgEventParamsValue) == 0 {
        return
    }

    log.Printf("Migrating records from offset %d to %d", offset, offset+batchSize-1) // 記錄每批次遷移數(shù)據(jù)的起止

    err := mysqlDB.Table("cfg_event_params_value").CreateInBatches(cfgEventParamsValue, batchSize).Error
    if err != nil {
        log.Printf("Error inserting batch into MySQL from offset %d to %d: %v", offset, offset+batchSize-1, err)
    } else {
        log.Printf("Successfully migrated records from offset %d to %d", offset, offset+batchSize-1)
    }
}

func main() {
    // PostgreSQL 連接 (省略了代碼...)

    // MySQL 連接 (省略了代碼...)

    // 獲取總記錄數(shù)
    var totalRecords int64
    pgDB.Table("data_cfg.cfg_event_params_value").Count(&totalRecords)

    for offset := 0; offset < int(totalRecords); offset += batchSize {
        sem <- struct{}{} // 獲取一個(gè)協(xié)程位
        wg.Add(1)
        go migrateBatch(offset)  // 使用協(xié)程執(zhí)行數(shù)據(jù)遷移
    }

    wg.Wait() // 等待所有協(xié)程完成
    fmt.Println("Migration complete!")
}

在上面的代碼中:

  • sem 是一個(gè)有限容量的通道,用于控制并發(fā)的協(xié)程數(shù)量。
  • sem <- struct{}{} 嘗試向通道發(fā)送一個(gè)空結(jié)構(gòu),如果通道已滿,這一行將會(huì)阻塞,直到有其他協(xié)程完成并釋放一個(gè)位置。
  • defer func() { <-sem }() 保證當(dāng)協(xié)程結(jié)束時(shí),從sem通道中移除一個(gè)空結(jié)構(gòu),從而釋放一個(gè)協(xié)程位置。

這樣,一次只有maxGoroutines數(shù)量的協(xié)程能夠并發(fā)運(yùn)行。您可以根據(jù)需要調(diào)整maxGoroutines的值來控制并發(fā)數(shù)量。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容