nsq數(shù)據(jù)持久化

diskqueue結(jié)構(gòu)體

type diskQueue struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms

    // run-time state (also persisted to disk)
    // 數(shù)據(jù)文件相關(guān)信息
    readPos      int64 // 記錄數(shù)據(jù)文件讀的位置
    writePos     int64 // 記錄數(shù)據(jù)文件寫的位置
    readFileNum  int64 // 當(dāng)前讀文件的編號(hào)
    writeFileNum int64 // 當(dāng)前寫文件的編號(hào)
    depth        int64 // 隊(duì)列中消息的數(shù)量

    sync.RWMutex

    // instantiation time metadata
    // 元數(shù)據(jù)相關(guān)
    name            string        // 元數(shù)據(jù)名稱
    dataPath        string        // 元數(shù)據(jù)的數(shù)據(jù)目錄
    maxBytesPerFile int64         // 每個(gè)文件的最大字節(jié)數(shù),默認(rèn)100M // currently this cannot change once created
    minMsgSize      int32         // 一條消息的最小長(zhǎng)度
    maxMsgSize      int32         // 一條消息的最大長(zhǎng)度
    syncEvery       int64         // 當(dāng)寫入的消息達(dá)到syncEvery時(shí)則執(zhí)行sync操作 // number of writes per fsync
    syncTimeout     time.Duration // 每隔syncTimeout時(shí)間執(zhí)行同步一次          // duration of time per fsync
    exitFlag        int32         // 隊(duì)列退出標(biāo)識(shí)。比如當(dāng)刪除隊(duì)列時(shí)會(huì)將該隊(duì)列標(biāo)記為1,阻止其他線程操作該隊(duì)列
    needSync        bool          // 是否需要同步

    // keeps track of the position where we have read
    // (but not yet sent over readChan)
    // 讀操作是為了投遞消息給客戶端,如果投遞失敗則繼續(xù)使用當(dāng)前的讀取位置再次嘗試投遞消息
    nextReadPos     int64 // 記錄正在投遞的消息的位置
    nextReadFileNum int64 // 記錄正在投遞的消息的文件編號(hào)

    readFile  *os.File      // 讀文件句柄
    writeFile *os.File      // 寫文件句柄
    reader    *bufio.Reader // 讀文件操作的緩存區(qū)
    writeBuf  bytes.Buffer  // 寫文件操作的緩存區(qū)

    // exposed via ReadChan()
    readChan chan []byte // 獲取消息的channel

    // internal channels
    writeChan         chan []byte // 寫入消息的channel
    writeResponseChan chan error  // 返回寫入消息的狀態(tài)
    emptyChan         chan int    // 清空消息的channel
    emptyResponseChan chan error  // 返回清空隊(duì)列的狀態(tài)
    exitChan          chan int    // 隊(duì)列退出的channel
    exitSyncChan      chan int    // 隊(duì)列退出的同步channel,確保ioLoop先退出

    logger Logger
}

nsq在創(chuàng)建topic和channel時(shí)會(huì)創(chuàng)建一個(gè)diskqueue,它負(fù)責(zé)像磁盤讀寫文件。
當(dāng)memoryMsgChan內(nèi)存隊(duì)列寫滿了,就會(huì)向diskqueue寫入消息。
當(dāng)memoryMsgChan讀取空了,就會(huì)從diskqueue讀取消息。

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        // 將消息寫入diskqueue
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

func (t *Topic) messagePump() {
   ...//參見上文代碼
    for {
        //從memoryMsgChan及DiskQueue.ReadChan中取消息
        select {
            case msg = <-memoryMsgChan:
            case buf = <- t.backend.ReadChan():
                msg, _ = decodeMessage(buf)
            case <-t.exitChan:
               return
        }
     ... //將msg復(fù)制N份,發(fā)送到topic下的N個(gè)Channel中
    }
}

生成一個(gè)diskQueue時(shí)會(huì)做兩件事情

  1. d.retrieveMetaData()加載元數(shù)據(jù)信息

  2. d.ioLoop()讀寫磁盤文件

func New(name string, dataPath string, maxBytesPerFile int64,
    minMsgSize int32, maxMsgSize int32,
    syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
    d := diskQueue{
        name:              name,
        dataPath:          dataPath,
        maxBytesPerFile:   maxBytesPerFile,
        minMsgSize:        minMsgSize,
        maxMsgSize:        maxMsgSize,
        readChan:          make(chan []byte),
        writeChan:         make(chan []byte),
        writeResponseChan: make(chan error),
        emptyChan:         make(chan int),
        emptyResponseChan: make(chan error),
        exitChan:          make(chan int),
        exitSyncChan:      make(chan int),
        syncEvery:         syncEvery,
        syncTimeout:       syncTimeout,
        logf:              logf,
    }

    // no need to lock here, nothing else could possibly be touching this instance
    err := d.retrieveMetaData()
    if err != nil && !os.IsNotExist(err) {
        d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
    }

    go d.ioLoop()
    return &d
}

retrieveMetaData:從元數(shù)據(jù)文件中恢復(fù)隊(duì)列的狀態(tài)。如果元數(shù)據(jù)文件不存在返回err,如果存在則加載元數(shù)據(jù)文件中的內(nèi)容
top1.diskqueue.meta.dat 元數(shù)據(jù)文件內(nèi)容格式只有三行("%d\n%d,%d\n%d,%d\n")
2 # 隊(duì)列中消息的數(shù)量
0,0 # 讀文件的編號(hào),偏移位置
0,76 # 寫文件的編號(hào),偏移位置

func (d *diskQueue) retrieveMetaData() error {
    var f *os.File
    var err error

    fileName := d.metaDataFileName()
    f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
    if err != nil {
        return err
    }
    defer f.Close()

    var depth int64
    _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
        &depth,
        &d.readFileNum, &d.readPos,
        &d.writeFileNum, &d.writePos)
    if err != nil {
        return err
    }
    atomic.StoreInt64(&d.depth, depth)
    d.nextReadFileNum = d.readFileNum
    d.nextReadPos = d.readPos

    return nil
}

負(fù)責(zé)讀寫文件的循環(huán)

func (d *diskQueue) ioLoop() {
    var dataRead []byte
    var err error
    var count int64 // 計(jì)數(shù)器變量
    var r chan []byte

    syncTicker := time.NewTicker(d.syncTimeout)

    for {
        // dont sync all the time :)
        // count計(jì)數(shù)器打到d.syncEvery的數(shù)量時(shí),設(shè)置d.needSync為true則執(zhí)行同步操作
        if count == d.syncEvery {
            d.needSync = true
        }

        // needSync變量控制是否需要同步,同步完成后該變量置為false
        if d.needSync {
            // 定時(shí)調(diào)用sync函數(shù)將內(nèi)存中的消息刷新到磁盤
            err = d.sync()
            if err != nil {
                d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
            }
            count = 0
        }

        // 檢測(cè)當(dāng)前是否有數(shù)據(jù)需要被讀取
        // 條件成立:執(zhí)行d.readOne()并將結(jié)果放入dataRead中,然后設(shè)置r為d.readChan
        // 條件不成立:將r設(shè)置為nil
        if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
            if d.nextReadPos == d.readPos {
                dataRead, err = d.readOne()
                if err != nil {
                    d.logf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
                        d.name, d.readPos, d.fileName(d.readFileNum), err)
                    d.handleReadError()
                    continue
                }
            }
            r = d.readChan
        } else {
            r = nil
        }

        select {
        // the Go channel spec dictates that nil channel operations (read or write)
        // in a select are skipped, we set r to d.readChan only when there is data to read
        // 在注釋中作者寫了這是一個(gè)Golang的特性
        // 如果r不為空,則會(huì)將dataRead送入go channel。進(jìn)入d.readChan的消息通過ReadChan函數(shù)向外暴露,最終被Topic/Channel的消息循環(huán)讀取。
        // 而如果r為空,則這個(gè)分支會(huì)被跳過。這個(gè)特性的使用統(tǒng)一了select的邏輯,簡(jiǎn)化了當(dāng)數(shù)據(jù)為空時(shí)的判斷。
        case r <- dataRead:
            // 消息投遞
            count++
            // moveForward sets needSync flag if a file is removed
            // 消息投遞成功后的操作
            d.moveForward()
        case <-d.emptyChan:
            // 執(zhí)行清空操作時(shí),文件全被刪除,count計(jì)數(shù)器重置為0
            d.emptyResponseChan <- d.deleteAllFiles()
            count = 0
        case dataWrite := <-d.writeChan:
            // 消息寫入則count計(jì)數(shù)器自增
            count++
            d.writeResponseChan <- d.writeOne(dataWrite)
        case <-syncTicker.C:
            // 每隔syncTimeout時(shí)間同步一次
            if count == 0 {
                // avoid sync when there's no activity
                continue
            }
            d.needSync = true
        case <-d.exitChan:
            // 退出ioLook
            goto exit
        }
    }

exit:
    d.logf("DISKQUEUE(%s): closing ... ioLoop", d.name)
    syncTicker.Stop()
    d.exitSyncChan <- 1
}

總結(jié)一下diskQueue
1、外部goroutine會(huì)在memoryMsgChan寫滿了或讀空了的時(shí)候,對(duì)diskQueue的writeChan或者readChan寫入消息。ioLoop則監(jiān)聽writeChan和readChan進(jìn)行磁盤的寫讀操作。
2、在對(duì)磁盤文件的讀寫時(shí),需要記錄文件編號(hào)和偏移位置。啟動(dòng)diskQueue時(shí)會(huì)從元數(shù)據(jù)文件恢復(fù)隊(duì)列數(shù)據(jù),關(guān)閉時(shí)會(huì)將最新的讀取位置記錄到元數(shù)據(jù)文件。
3、syncTimeout和syncEvery可以設(shè)置內(nèi)存數(shù)據(jù)同步到磁盤的頻率,syncTimeout是指每隔syncTimeout秒調(diào)用一次d.sync(),syncEvery是指每當(dāng)寫入syncEvery個(gè)消息后調(diào)用一次d.sync()。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容