Redis-Shake【二】 Sync功能實現(xiàn)簡介

Redis-Shake【一】簡要介紹
Redis-Shake【二】 Sync功能實現(xiàn)簡介

上一節(jié)簡要介紹了一下Redis-Shake的使用,接下來詳細說一下Redis-Shake的sync模式。

sync模式的基本原理是讓Redis-Shake模擬成一個redis slave,直接對源redis實例執(zhí)行sync/psync命令,該模式主要包含兩部分:全量同步(full)和增量同步(increment)。


Redis-Shake sync模式原理圖.png

sync模式的入口函數(shù),遍歷所有的AddressList,創(chuàng)建SyncNode對象,使用多線程的方式批量執(zhí)行dbSyncer的sync方法,開始同步邏輯

// 主要代碼在redis-shake/sync.go文件中,入口函數(shù)是Main()
func (cmd *CmdSync) Main() {
    type syncNode struct {
        id             int
        source         string
        sourcePassword string
        target         []string
        targetPassword string
    }

    // source redis number
    total := utils.GetTotalLink()
    syncChan := make(chan syncNode, total)
    cmd.dbSyncers = make([]*dbSyncer, total)
        // 遍歷SourceAddress ,每一個SourceAddress對應一個syncNode對象,
    for i, source := range conf.Options.SourceAddressList {
        var target []string
        if conf.Options.TargetType == conf.RedisTypeCluster {
            target = conf.Options.TargetAddressList
        } else {
            // round-robin pick
            pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList))
            target = []string{conf.Options.TargetAddressList[pick]}
        }

        nd := syncNode{
            id:             i,
            source:         source,
            sourcePassword: conf.Options.SourcePasswordRaw,
            target:         target,
            targetPassword: conf.Options.TargetPasswordRaw,
        }
        syncChan <- nd
    }

    var wg sync.WaitGroup
    wg.Add(len(conf.Options.SourceAddressList))
    // SourceRdbParallel用來設置多個數(shù)據(jù)源時,同時最多可以處理多少個Redis數(shù)據(jù)源
    for i := 0; i < int(conf.Options.SourceRdbParallel); i++ {
        go func() {
            for {
                nd, ok := <-syncChan
                if !ok {
                    break
                }

                ds := NewDbSyncer(nd.id, nd.source, nd.sourcePassword, nd.target, nd.targetPassword,
                    conf.Options.HttpProfile+i)
                cmd.dbSyncers[nd.id] = ds
                log.Infof("routine[%v] starts syncing data from %v to %v with http[%v]",
                    ds.id, ds.source, ds.target, ds.httpProfilePort)
                // run in routine
                go ds.sync()

                // wait full sync done 全量階段處理完時會close waitFull這個channel
                <-ds.waitFull

                wg.Done()
            }
        }()
    }

    wg.Wait()
    close(syncChan)

    // never quit because increment syncing is still running
    select {}
}

dbSyncer.sync 對源Redis執(zhí)行Sync/Psync命令,并依次執(zhí)行全量和增量同步

func (ds *dbSyncer) sync() {
    var sockfile *os.File
    if len(conf.Options.SockFileName) != 0 {
        sockfile = utils.OpenReadWriteFile(conf.Options.SockFileName)
        defer sockfile.Close()
    }
        
    // base.Status用于標示sync所處的階段,waitfull full incr
    base.Status = "waitfull"
    var input io.ReadCloser
    var nsize int64
    //執(zhí)行sync/psync命令,獲取連接
    if conf.Options.Psync {
        input, nsize = ds.sendPSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, conf.Options.SourceTLSEnable)
    } else {
        input, nsize = ds.sendSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, conf.Options.SourceTLSEnable)
    }
    defer input.Close()

    ...

    reader := bufio.NewReaderSize(input, utils.ReaderBufferSize)

    // sync rdb 全量同步階段
    base.Status = "full"
    ds.syncRDBFile(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, nsize, conf.Options.TargetTLSEnable)

    // sync increment 增量同步階段
    base.Status = "incr"
    close(ds.waitFull)
    ds.syncCommand(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, conf.Options.TargetTLSEnable)
}

全量同步階段支持并發(fā)寫入目標Redis,通過對目標Redis執(zhí)行restore命令來實現(xiàn)key數(shù)據(jù)的寫入

func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64, tlsEnable bool) {
    // pipe是從源Redis接收、解析RDB文件的管道
    pipe := utils.NewRDBLoader(reader, &ds.rbytes, base.RDBPipeSize)
    wait := make(chan struct{})
    // 全量階段寫入目標Redis的操作是可以并發(fā)執(zhí)行的,可以通過Parallel設置并發(fā)數(shù)
    go func() {
        // 全部寫入Redis成功之后通過wait channel
        defer close(wait)
        var wg sync.WaitGroup
        wg.Add(conf.Options.Parallel)
        for i := 0; i < conf.Options.Parallel; i++ {
            go func() {
                defer wg.Done()
                c := utils.OpenRedisConn(target, auth_type, passwd, conf.Options.TargetType == conf.RedisTypeCluster,
                    tlsEnable)
                defer c.Close()
                var lastdb uint32 = 0
                // 獲取源redis解析到數(shù)據(jù)
                for e := range pipe {
                    //執(zhí)行過濾DB的邏輯,對應配置文件中的FilterDBBlacklist、FilterDBWhitelist
                    if filter.FilterDB(int(e.DB)) {
                        // db filter
                        ds.ignore.Incr()
                    } else {
                        ds.nentry.Incr()

                        ...
                        // 執(zhí)行過濾Key邏輯,對應FilterKeyBlacklist、FilterKeyWhitelist配置
                        if filter.FilterKey(string(e.Key)) == true {
                            // 1. judge if not pass filter key
                            ds.ignore.Incr()
                            continue
                        } else {
                            slot := int(utils.KeyToSlot(string(e.Key)))
                            if filter.FilterSlot(slot) == true {
                                // 2. judge if not pass filter slot
                                ds.ignore.Incr()
                                continue
                            }
                        }

                        log.Debugf("dbSyncer[%v] start restoring key[%s] with value length[%v]", ds.id, e.Key, len(e.Value))
                        // 對目標Redis執(zhí)行Restore命令把當前key寫入到目標redis
                        utils.RestoreRdbEntry(c, e)
                        log.Debugf("dbSyncer[%v] restore key[%s] ok", ds.id, e.Key)
                    }
                }
            }()
        }

        wg.Wait()
    }()

    var stat *syncerStat
        
    for done := false; !done; {
        select {
        case <-wait:
            done = true
        case <-time.After(time.Second):
        }
        stat = ds.Stat()
        var b bytes.Buffer
        // fmt.Fprintf(&b, "dbSyncer[%v] total=%s - %12d [%3d%%]  entry=%-12d",
        fmt.Fprintf(&b, "dbSyncer[%v] total = %s - %12s [%3d%%]  entry=%-12d",
            ds.id, utils.GetMetric(nsize), utils.GetMetric(stat.rbytes), 100*stat.rbytes/nsize, stat.nentry)
        if stat.ignore != 0 {
            fmt.Fprintf(&b, "  ignore=%-12d", stat.ignore)
        }
        log.Info(b.String())
        metric.GetMetric(ds.id).SetFullSyncProgress(ds.id, uint64(100*stat.rbytes/nsize))
    }
    log.Infof("dbSyncer[%v] sync rdb done", ds.id)
}

增量寫入階段

func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type, passwd string, tlsEnable bool) {
    readeTimeout := time.Duration(10) * time.Minute
    writeTimeout := time.Duration(10) * time.Minute
    isCluster := conf.Options.TargetType == conf.RedisTypeCluster
// 打開目標Redis的連接
    c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, readeTimeout, writeTimeout, isCluster, tlsEnable)
    defer c.Close()

    ds.sendBuf = make(chan cmdDetail, conf.Options.SenderCount)
    ds.delayChannel = make(chan *delayNode, conf.Options.SenderDelayChannelSize)
    var sendId, recvId, sendMarkId atomic2.Int64 // sendMarkId is also used as mark the sendId in sender routine
  // 開啟一個協(xié)程,定期從Source中獲取當前redis-shake模擬的slave的SlaveOffset值
    go func() {
        ...
        srcConn := utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType, ds.sourcePassword,
            readeTimeout, writeTimeout, false, conf.Options.SourceTLSEnable)
        ticker := time.NewTicker(10 * time.Second)
        for range ticker.C {
            offset, err := utils.GetFakeSlaveOffset(srcConn)
            if err != nil {
                ...
            } else {
                // 更新sourceOffset metric數(shù)據(jù)
                if ds.sourceOffset, err = strconv.ParseInt(offset, 10, 64); err != nil {
                    log.Errorf("dbSyncer[%v] Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s",
                        ds.id, conf.Options.Id, err.Error())
                }
            }
        }
    }()
    // 開啟一個協(xié)程,用于處理目標Redis返回的寫入成功的命令,主要用于統(tǒng)計successCount、successCountTotal、delay
    go func() {
        var node *delayNode
        for {
            reply, err := c.Receive()

            recvId.Incr()
            id := recvId.Get() // receive id

            // print debug log of receive reply
            log.Debugf("receive reply-id[%v]: [%v], error:[%v]", id, reply, err)

            if conf.Options.Metric == false {
                continue
            }

            if err == nil {
                metric.GetMetric(ds.id).AddSuccessCmdCount(ds.id, 1)
            } else {
                metric.GetMetric(ds.id).AddFailCmdCount(ds.id, 1)
                if utils.CheckHandleNetError(err) {
                    log.Panicf("dbSyncer[%v] Event:NetErrorWhileReceive\tId:%s\tError:%s",
                        ds.id, conf.Options.Id, err.Error())
                } else {
                    log.Panicf("dbSyncer[%v] Event:ErrorReply\tId:%s\tCommand: [unknown]\tError: %s",
                        ds.id, conf.Options.Id, err.Error())
                }
            }

            if node == nil {
                // non-blocking read from delay channel
                select {
                case node = <-ds.delayChannel:
                default:
                    // it's ok, channel is empty
                }
            }

            if node != nil {
                if node.id == id {
                    metric.GetMetric(ds.id).AddDelay(uint64(time.Now().Sub(node.t).Nanoseconds()) / 1000000) // ms
                    node = nil
                } else if node.id < id {
                    log.Panicf("dbSyncer[%v] receive id invalid: node-id[%v] < receive-id[%v]",
                        ds.id, node.id, id)
                }
            }
        }
    }()
//開啟一個攜程,用于接收源Redis的增量Redis命令
    go func() {
        var (
            lastdb        int32 = 0
            bypass              = false
            isselect            = false
            scmd          string
            argv, newArgv [][]byte
            err           error
            reject        bool
        )

        decoder := redis.NewDecoder(reader)

        log.Infof("dbSyncer[%v] Event:IncrSyncStart\tId:%s\t", ds.id, conf.Options.Id)

        for {
            ignorecmd := false
            isselect = false
            resp := redis.MustDecodeOpt(decoder)

            if scmd, argv, err = redis.ParseArgs(resp); err != nil {
                log.PanicErrorf(err, "dbSyncer[%v] parse command arguments failed", ds.id)
            } else {
                metric.GetMetric(ds.id).AddPullCmdCount(ds.id, 1)

                ...

                if scmd != "ping" {
                    if strings.EqualFold(scmd, "select") {
                        if len(argv) != 1 {
                            log.Panicf("dbSyncer[%v] select command len(args) = %d", ds.id, len(argv))
                        }
                        s := string(argv[0])
                        n, err := strconv.Atoi(s)
                        if err != nil {
                            log.PanicErrorf(err, "dbSyncer[%v] parse db = %s failed", ds.id, s)
                        }
                        bypass = filter.FilterDB(n)
                        isselect = true
                    } else if filter.FilterCommands(scmd) {
                        ignorecmd = true
                    }
                    if bypass || ignorecmd {
                        ds.nbypass.Incr()
                        // ds.SyncStat.BypassCmdCount.Incr()
                        metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
                        log.Debugf("dbSyncer[%v] ignore command[%v]", ds.id, scmd)
                        continue
                    }
                }
                // 過濾出寫命令,同時基于FilterKeyWhitelist、FilterKeyBlacklist對key做過濾
                newArgv, reject = filter.HandleFilterKeyWithCommand(scmd, argv)
                if bypass || ignorecmd || reject {
                    ds.nbypass.Incr()
                    metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
                    log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd)
                    continue
                }
            }

            ...
            // 把過濾、解析之后的數(shù)據(jù)放入到sendBuf channel中
            ds.sendBuf <- cmdDetail{Cmd: scmd, Args: newArgv}
        }
    }()
// 開啟一個協(xié)程把sendBuf channel中的命令寫入到目標Redis中
    go func() {
        var noFlushCount uint
        var cachedSize uint64

        for item := range ds.sendBuf {
            length := len(item.Cmd)
            data := make([]interface{}, len(item.Args))
            for i := range item.Args {
                data[i] = item.Args[i]
                length += len(item.Args[i])
            }
            // 對目標Redis執(zhí)行命令
            err := c.Send(item.Cmd, data...)
            ...
        }
    }()
 // 阻塞當前協(xié)程。定時打印統(tǒng)計數(shù)據(jù)
    for lstat := ds.Stat(); ; {
        time.Sleep(time.Second)
        nstat := ds.Stat()
        var b bytes.Buffer
        fmt.Fprintf(&b, "dbSyncer[%v] sync: ", ds.id)
        fmt.Fprintf(&b, " +forwardCommands=%-6d", nstat.forward-lstat.forward)
        fmt.Fprintf(&b, " +filterCommands=%-6d", nstat.nbypass-lstat.nbypass)
        fmt.Fprintf(&b, " +writeBytes=%d", nstat.wbytes-lstat.wbytes)
        log.Info(b.String())
        lstat = nstat
    }
}
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 一、Redis主從復制 主從復制:主節(jié)點負責寫數(shù)據(jù),從節(jié)點負責讀數(shù)據(jù),主節(jié)點定期把數(shù)據(jù)同步到從節(jié)點保證數(shù)據(jù)的一致性...
    愛情小傻蛋閱讀 1,088評論 0 0
  • 1.1 資料 ,最好的入門小冊子,可以先于一切文檔之前看,免費。 作者Antirez的博客,Antirez維護的R...
    JefferyLcm閱讀 17,293評論 1 51
  • 本篇就一下方面展開分析 如何使用主從復制? 主從復制的原理(重點是全量復制和部分復制、以及心跳機制) 實際應用中需...
    lucode閱讀 1,059評論 0 5
  • 1.主從同步原理像MySQL一樣,Redis是支持主從同步的,而且也支持一主多從以及多級從結(jié)構。主從結(jié)構,一是為了...
    碼出高效閱讀 2,251評論 0 1
  • 今天,爸爸帶我們?nèi)ビ斡尽?游完泳,就是吃飯時間啦。 而且,是我最喜歡的水煮魚。 因為游泳,感覺特別餓。 這個時候,...
    HongVi閱讀 1,006評論 11 13

友情鏈接更多精彩內(nèi)容