Go消息中間件Nsq系列(七)------go-diskqueue 文件隊(duì)列實(shí)現(xiàn)

上一篇: Go消息中間件Nsq系列(六)------Message結(jié)構(gòu)

通過此次源碼閱讀, 可以學(xué)習(xí)到

  1. 結(jié)合go select io多路復(fù)用 實(shí)現(xiàn)文件隊(duì)列的思路

1. go-diskqueue

  1. 它是一個(gè)提供文件系統(tǒng)支持的FIFO(先進(jìn)先出)隊(duì)列庫(kù)
  2. 在nsq中,當(dāng)channel緩沖區(qū)超過mem_queue_size = 10000,其他消息往文件隊(duì)列里面寫入

2. 注意go中channel select的特性

  1. 當(dāng)channel 為nil的時(shí)候, 將跳過select操作
  2. 當(dāng)close channel的時(shí)候, select也會(huì)響應(yīng),避免錯(cuò)誤,可以判斷是否close

3. diskqueue源碼分析

3.1 diskQueue的定義

diskQueue 實(shí)現(xiàn)了 BackendQueue 接口,

// BackendQueue 接口
type Interface interface {
    // 加鎖檢測(cè) 標(biāo)志位是否退出, 如果否 則繼續(xù)往文件寫入數(shù)據(jù)并等待結(jié)果
    Put([]byte) error
    // 讀取文件數(shù)據(jù) 返回chan 可用于多消費(fèi)者并發(fā)讀取
    ReadChan() chan []byte
    // 等待ioloop結(jié)束, 正常關(guān)閉 并保存元數(shù)據(jù)
    Close() error
    // 等待ioloop結(jié)束, 直接關(guān)閉io流
    Delete() error
    // 未讀消息積壓量
    Depth() int64
    // 清空消息, 刪除文件
    Empty() error
}

type diskQueue struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms

    // run-time state (also persisted to disk)
       // 運(yùn)行時(shí)的數(shù)據(jù)保存, 也會(huì)保存到文件
    readPos      int64 // 文件讀取的位置
    writePos     int64 // 文件寫入的位置
    readFileNum  int64 // 讀取文件編號(hào)
    writeFileNum int64 // 寫入文件編號(hào)
    depth        int64 // 未讀消息積壓量

    sync.RWMutex // 讀寫鎖

    // instantiation time metadata
    name            string // 隊(duì)列實(shí)例名稱
    dataPath        string // 數(shù)據(jù)文件存儲(chǔ)路徑
    maxBytesPerFile int64  // 文件最大長(zhǎng)度為 100 * 1024 * 1024
    minMsgSize      int32  // 最小消息長(zhǎng)度 MsgIDLength + 8 + 2 =26//Id + Timestamp + Attempts
    maxMsgSize      int32  // 最大消息長(zhǎng)度  1024 * 1024
    syncEvery       int64         // 文件同步 count 累計(jì)間隔 2500 出發(fā)
    syncTimeout     time.Duration // 同步定時(shí)2s觸發(fā)
    exitFlag        int32  // 退出標(biāo)志位
    needSync        bool    // 需要同步

    // keeps track of the position where we have read
    // (but not yet sent over readChan)
    nextReadPos     int64 // 下一次讀取的位置
    nextReadFileNum int64 // 下一次讀取對(duì)應(yīng)的文件編號(hào)

    readFile  *os.File  // 讀取的文件
    writeFile *os.File  // 寫入的文件
    reader    *bufio.Reader  // 緩沖讀取
    writeBuf  bytes.Buffer // 緩沖寫入

    // exposed via ReadChan()
    readChan chan []byte // 讀取的數(shù)據(jù),可以多消費(fèi)者進(jìn)行通信消費(fèi)

    // internal channels
    writeChan         chan []byte // 寫入通道
    writeResponseChan chan error // 寫入結(jié)果反饋通道
    emptyChan         chan int // 清空隊(duì)列通道
    emptyResponseChan chan error // 清空反饋通道
    exitChan          chan int // 結(jié)束信號(hào)通道
    exitSyncChan      chan int // 退出同步通道

    logf AppLogFunc  // 日志記錄封裝
}

3.2 diskqueue 初始化流程

diskQueue在New初始化的時(shí)候會(huì)取回之前持久化的元數(shù)據(jù)

      // ... 前面一堆根據(jù)參數(shù)初始化diskqueue
     // 取回之前持久化的元數(shù)據(jù)
     err := d.retrieveMetaData()
      // .... 省略
    go d.ioLoop()

其保存的元數(shù)據(jù)格式為實(shí)例名稱.diskqueue.meta.dat, 保存的數(shù)據(jù)為

65 // depth 未讀消息積壓量
52,0 // 52為當(dāng)前讀取文件編號(hào) , 0 為當(dāng)前讀取文件位置
53,0 // 53位當(dāng)前寫入文件編號(hào), 0 為當(dāng)前寫入文件位置

以下是對(duì)應(yīng)的方法d.retrieveMetaData()persistMetaData() 具體實(shí)現(xiàn),前者在初始化取回之前保存的state, 后者在sync同步的保存當(dāng)前的state

// 從本地文件取回元數(shù)據(jù)
func (d *diskQueue) retrieveMetaData() error {
    var f *os.File
    var err error

    // 存儲(chǔ)路徑.diskqueue.meta.dat
    fileName := d.metaDataFileName()
    f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
    if err != nil {
        return err
    }
    defer f.Close()

    var depth int64
    _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
        &depth, // 待讀取消息數(shù)量
        &d.readFileNum, // 待讀取文件編號(hào)
        &d.readPos, // 待讀取文件位置
        &d.writeFileNum, // 待寫入文件編號(hào)
        &d.writePos ,// 待寫入文件位置
    )
    if err != nil {
        return err
    }
    // 原子更新未讀消息
    atomic.StoreInt64(&d.depth, depth)
    // 更新讀取位置和文件編號(hào)
    d.nextReadFileNum = d.readFileNum
    d.nextReadPos = d.readPos

    return nil
}

// 同步元數(shù)據(jù)到本地文件
func (d *diskQueue) persistMetaData() error {
    var f *os.File
    var err error
    // metdat 臨時(shí)文件
    fileName := d.metaDataFileName()
    tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

    // write to tmp file
    f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
    if err != nil {
        return err
    }

    _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
        atomic.LoadInt64(&d.depth),
        d.readFileNum, d.readPos,
        d.writeFileNum, d.writePos)
    if err != nil {
        f.Close()
        return err
    }
    f.Sync()
    f.Close()
    // 成功往臨時(shí)文件寫入數(shù)據(jù), 在進(jìn)行替換源文件
    // atomically rename
    return os.Rename(tmpFileName, fileName)
}

接下來(lái)就是diskqueue的核心所在,也就是ioLoop() 函數(shù)

func (d *diskQueue) ioLoop() {
    var dataRead []byte
    var err error
    var count int64
    var r chan []byte
        // 定時(shí)器
    syncTicker := time.NewTicker(d.syncTimeout)

    for {
        // dont sync all the time :)
        // 累計(jì)讀取次數(shù)超過閾值 進(jìn)行同步
        if count == d.syncEvery {
            d.needSync = true
        }

        // 需要同步的時(shí)候同步, 并重置閾值
        if d.needSync {
            err = d.sync()
            if err != nil {
                d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
            }
            count = 0
        }
        // 如果可以讀, 下次要讀的消息位置等于已經(jīng)讀取的消息地址,才進(jìn)行讀取
        if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
            if d.nextReadPos == d.readPos {
                dataRead, err = d.readOne()
                if err != nil {
                    d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
                        d.name, d.readPos, d.fileName(d.readFileNum), err)
                    // 壞的數(shù)據(jù)文件
                    d.handleReadError()
                    continue
                }
            }
            // 賦值channel
            r = d.readChan
        } else {
            r = nil
        }

        select {
        // the Go channel spec dictates that nil channel operations (read or write)
        // in a select are skipped, we set r to d.readChan only when there is data to read
        // 如果channel 為nil 進(jìn)行讀寫, select會(huì)跳過
        case r <- dataRead:
            count++
            // moveForward sets needSync flag if a file is removed
            d.moveForward()
        case <-d.emptyChan:
            // 刪除所有文件響應(yīng)
            d.emptyResponseChan <- d.deleteAllFiles()
            count = 0
        case dataWrite := <-d.writeChan:
            count++
            // 寫入消息到磁盤
            d.writeResponseChan <- d.writeOne(dataWrite)
        case <-syncTicker.C:
            // 沒有活動(dòng)避免同步
            if count == 0 {
                // avoid sync when there's no activity
                continue
            }
            d.needSync = true
        case <-d.exitChan:
            // 退出程序
            goto exit
        }
    }

exit:
    d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
    syncTicker.Stop()
    d.exitSyncChan <- 1
}

ioLoop() 的實(shí)現(xiàn)是通過for 輪詢, 加 select 多路復(fù)用監(jiān)聽多個(gè)通道, 具體如下

  1. count的閾值是通過syncEvery 設(shè)定的, 默認(rèn)2500, count的值為每次讀取或?qū)懭霑r(shí)遞增, 達(dá)到閾值觸發(fā)
    如果觸發(fā)同步的, needSync為true的情況下,進(jìn)行同步,并重置count
    ticker定時(shí)時(shí)間為2s, 觸發(fā)同步
  2. (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) 也就是說(shuō) 當(dāng)前rn < wn 或者 rp<wp 的時(shí)候, 并且下次讀取的位置要等于當(dāng)前讀取的位置(代表上條數(shù)據(jù)已經(jīng)讀取了,設(shè)置的nextReadPos就是新的讀取位置), 接著調(diào)用readOne() 去讀取一條數(shù)據(jù), 如果發(fā)生錯(cuò)誤,在handleReadError() 將當(dāng)前讀取文件標(biāo)記為bad文件,重新修改讀寫位置
    , 有可讀取的數(shù)據(jù)時(shí) 會(huì)將readChan賦值, 否則置為nil, 這是本文開頭所說(shuō)的channel select特性,講跳過select case
  3. select多路復(fù)用監(jiān)聽了以下通道
    3.1 r <- dataRead 讀取到一條數(shù)據(jù)時(shí), moveForward() 消費(fèi)數(shù)據(jù)處理讀取位置,并檢測(cè)已經(jīng)讀取完的文件會(huì)做刪除處理
    3.2 <-d.emptyChan 刪除從讀取-寫入的文件編號(hào), 刪除元數(shù)據(jù)文件, 下次寫入將會(huì)創(chuàng)建新的讀寫文件, 不會(huì)停止程序
    3.3 dataWrite := <-d.writeChan 調(diào)用writeOne()寫入數(shù)據(jù)到文件
    3.4 <-syncTicker.C 定時(shí)同步
    3.5 <-d.exitChan: 程序結(jié)束, 關(guān)閉定時(shí)器,給 d.exitSyncChan <- 1發(fā)送通知i

readOne 讀取一條數(shù)據(jù)的具體實(shí)現(xiàn):

// readOne performs a low level filesystem read for a single []byte
// while advancing read positions and rolling files, if necessary
func (d *diskQueue) readOne() ([]byte, error) {
    var err error
    var msgSize int32
    // 如果沒有初始化 則先初始化
    if d.readFile == nil {
        curFileName := d.fileName(d.readFileNum)
        d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
        if err != nil {
            return nil, err
        }

        d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)
        // 恢復(fù)到上次讀取的位置
        if d.readPos > 0 {
            _, err = d.readFile.Seek(d.readPos, 0)
            if err != nil {
                d.readFile.Close()
                d.readFile = nil
                return nil, err
            }
        }
        // 使用緩沖區(qū)來(lái)讀取
        d.reader = bufio.NewReader(d.readFile)
    }
    // 使用大字節(jié)序方式讀取4個(gè)字節(jié)  的消息包大小
    err = binary.Read(d.reader, binary.BigEndian, &msgSize)
    if err != nil {
        d.readFile.Close()
        d.readFile = nil
        return nil, err
    }
    // 無(wú)效的 消息包大小
    if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
        // this file is corrupt and we have no reasonable guarantee on
        // where a new message should begin
        d.readFile.Close()
        d.readFile = nil
        return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
    }
    // 根據(jù)消息包大小 讀取消息內(nèi)容
    readBuf := make([]byte, msgSize)
    _, err = io.ReadFull(d.reader, readBuf)
    if err != nil {
        d.readFile.Close()
        d.readFile = nil
        return nil, err
    }
    // 總長(zhǎng)度 消息包大小+消息長(zhǎng)度
    totalBytes := int64(4 + msgSize)

    // we only advance next* because we have not yet sent this to consumers
    // (where readFileNum, readPos will actually be advanced)
    // 移動(dòng)位置
    d.nextReadPos = d.readPos + totalBytes
    d.nextReadFileNum = d.readFileNum

    // TODO: each data file should embed the maxBytesPerFile
    // as the first 8 bytes (at creation time) ensuring that
    // the value can change without affecting runtime
    // 文件大小超過設(shè)定閾值 則進(jìn)行自增
    if d.nextReadPos > d.maxBytesPerFile {
        if d.readFile != nil {
            d.readFile.Close()
            d.readFile = nil
        }

        d.nextReadFileNum++
        d.nextReadPos = 0
    }

    return readBuf, nil
}

writeOne() 寫入一條數(shù)據(jù)的具體實(shí)現(xiàn):

// writeOne performs a low level filesystem write for a single []byte
// while advancing write positions and rolling files, if necessary
func (d *diskQueue) writeOne(data []byte) error {
    var err error

    if d.writeFile == nil {
        curFileName := d.fileName(d.writeFileNum)
        d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
        if err != nil {
            return err
        }

        d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
        // 有沒有上次寫入文件位置, 有著跳轉(zhuǎn)到之前的位置
        if d.writePos > 0 {
            _, err = d.writeFile.Seek(d.writePos, 0)
            if err != nil {
                d.writeFile.Close()
                d.writeFile = nil
                return err
            }
        }
    }

    // 消息包大小
    dataLen := int32(len(data))

    if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
        return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
    }
    // 重置buf 然后大字節(jié)序?qū)懭氪笮〉絙uf, 然后在寫入數(shù)據(jù)包
    d.writeBuf.Reset()
    err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
    if err != nil {
        return err
    }

    _, err = d.writeBuf.Write(data)
    if err != nil {
        return err
    }

    // only write to the file once
    // 寫入到文章
    _, err = d.writeFile.Write(d.writeBuf.Bytes())
    if err != nil {
        d.writeFile.Close()
        d.writeFile = nil
        return err
    }

    // 移動(dòng)讀取位置, 并消息量加1
    totalBytes := int64(4 + dataLen)
    d.writePos += totalBytes
    atomic.AddInt64(&d.depth, 1)

    // 寫入的文件大于切片大小, 則新建文件
    if d.writePos > d.maxBytesPerFile {
        d.writeFileNum++
        d.writePos = 0

        // sync every time we start writing to a new file
        // 將之前的文件同步到磁盤
        err = d.sync()
        if err != nil {
            d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
        }

        if d.writeFile != nil {
            d.writeFile.Close()
            d.writeFile = nil
        }
    }

    return err
}

其他的一些函數(shù)如下:

ReadChan() // 下面的例子 監(jiān)聽readChan多并發(fā)消費(fèi)
exit() // channel 特性 close 也會(huì)走select的選項(xiàng)
deleteAllFiles()
skipToNextRWFile() // 刪除當(dāng)前所有的文件,繼續(xù)新的開始
sync() // 同步數(shù)據(jù)(含元數(shù)據(jù))
metaDataFileName() // 元數(shù)據(jù)保存文件名
fileName() // 數(shù)據(jù)文件名
checkTailCorruption() // 檢查數(shù)據(jù)丟失之類的不規(guī)范操作
moveForward() // 消費(fèi)操作,判斷是否判斷舊文件
handleReadError() // 處理讀取數(shù)據(jù)異常

下面是使用的例子

func Logf( f string, args ...interface{}) {
    log.Output(3, fmt.Sprintf("diskqueue: "+f, args...))
}

func main(){
    dqLogf := func(level LogLevel, f string, args ...interface{}) {
        Logf( f, args...)
    }
    dq  := New(
        "test_dq",
        "D:/GoWorkspace/src/Examples/nsq_dq_test",
        1*1024,
        10,
        100,
        100,
        2 * time.Second,
        dqLogf,
    )
    go func() {
        var err error
        for   {
            // 110毫秒寫入一條數(shù)據(jù)
            err = dq.Put([]byte("測(cè)試數(shù)據(jù)"))
            if err == nil{
                log.Println("寫入完成后 消息積壓量",dq.Depth())
            }
            time.Sleep(time.Millisecond*110)
        }
    }()

    go func() {
        a := time.Tick(time.Millisecond*50)
        for   {
            select {
            case <-a:
                 msg :=  <- dq.ReadChan()
                    log.Println("讀取數(shù)據(jù):",string(msg),dq.Depth())
            }

        }
    }()
        // 多消費(fèi)者進(jìn)行消費(fèi)
    go func() {
        a := time.Tick(2*time.Second)
        for   {
            select {
            case <-a:
                msg :=  <- dq.ReadChan()
                log.Println("讀取數(shù)據(jù)2:",string(msg),dq.Depth())
            }

        }
    }()
    for   {

    }

}

// 執(zhí)行結(jié)果

2019/08/20 18:25:39 diskqueue: DISKQUEUE(test_dq): writeOne() opened D:/GoWorkspace/src/Examples/nsq_dq_test/test_dq.diskqueue.000000.dat
2019/08/20 18:25:39 寫入完成后 消息積壓量 1
2019/08/20 18:25:39 diskqueue: DISKQUEUE(test_dq): readOne() opened D:/GoWorkspace/src/Examples/nsq_dq_test/test_dq.diskqueue.000000.dat
2019/08/20 18:25:39 讀取數(shù)據(jù): 測(cè)試數(shù)據(jù) 1
2019/08/20 18:25:39 讀取數(shù)據(jù): 測(cè)試數(shù)據(jù) 0
2019/08/20 18:25:39 寫入完成后 消息積壓量 0
2019/08/20 18:25:39 讀取數(shù)據(jù): 測(cè)試數(shù)據(jù) 0
2019/08/20 18:25:39 寫入完成后 消息積壓量 0
2019/08/20 18:25:39 讀取數(shù)據(jù): 測(cè)試數(shù)據(jù) 0
2019/08/20 18:25:39 寫入完成后 消息積壓量 0
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容