NSQ 磁盤持久隊列 DiskQueue 設(shè)計思想全解析

—— 為什么 NSQ 的 DiskQueue 不用鎖也能做到高性能、強一致?

NSQ 的 diskqueue 是一個極其優(yōu)雅的磁盤持久化 FIFO 隊列實現(xiàn)。它的代碼不多,卻藏著許多工程上的巧思:

  • 單調(diào)度協(xié)程保證線程安全
  • 記錄格式簡潔高效(長度+數(shù)據(jù))
  • 元數(shù)據(jù)持久化設(shè)計穩(wěn)健
  • 文件滾動策略減少 IO 并保持性能

本文從設(shè)計思想切入,結(jié)合源碼逐塊解析 diskqueue 是如何實現(xiàn)一個可持久、線程安全、性能穩(wěn)定的磁盤隊列。


1. 單協(xié)程調(diào)度(ioLoop)保證線程安全:無需加鎖

DiskQueue 的最巧妙設(shè)計之一,就是所有寫入、讀取、同步、文件滾動等操作都在 一個 ioLoop 協(xié)程中完成。

即使 diskQueue 結(jié)構(gòu)體上存在 RWMutex,但核心變量(read/write position、file num、depth)都只在 ioLoop 中修改,因此可以不用鎖,天然線程安全。

創(chuàng)建隊列時就啟動這個協(xié)程:

// diskqueue.go: New()
go d.ioLoop()

來看 ioLoop 的結(jié)構(gòu):

// diskqueue.go: ioLoop()
func (d *diskQueue) ioLoop() {
    for {
        select {
        case data := <-d.writeChan:
            err := d.writeOne(data)
            d.writeResponseChan <- err
        
        case <-d.emptyChan:
            err := d.deleteAllFiles()
            d.emptyResponseChan <- err

        case d.depthChan <- d.depth:
            // return depth

        // 其他分支:退出、同步等...
        }
    }
}

? 設(shè)計亮點

  • 所有請求都通過 channel 串行進入 ioLoop
    生產(chǎn)者調(diào)用 Put() → writeChan → ioLoop → writeOne()
  • ioLoop 不需要任何互斥鎖
    因為寫入操作永遠在單協(xié)程執(zhí)行,不存在并發(fā)修改。
  • 讀通道 readChan 也在循環(huán)內(nèi)安全發(fā)送

這與 Redis 的單線程模型有異曲同工之妙:串行化換線程安全,避免鎖開銷


2. 內(nèi)容數(shù)據(jù)按(長度+數(shù)據(jù))格式寫入磁盤

NSQ 為每條記錄寫入:

4字節(jié)長度(int32 大端序) + N 字節(jié)消息內(nèi)容

寫入邏輯在 writeOne()

// diskqueue.go: writeOne()
dataLen := int32(len(data))
totalBytes := int64(4 + dataLen)

// 寫長度
binary.Write(&d.writeBuf, binary.BigEndian, dataLen)

// 寫內(nèi)容
d.writeBuf.Write(data)

// 一次性寫入文件
_, err = d.writeFile.Write(d.writeBuf.Bytes())

為什么采用“長度 + 內(nèi)容”?

  • 讀取時無需掃描分隔符,直接按長度讀取 → 高性能順序讀
  • 能快速跳過損壞數(shù)據(jù)(長度檢測) → 增強魯棒性
  • 二進制格式緊湊 → 磁盤占用小

文件滿了會自動滾動

writePos + totalBytes > maxBytesPerFile

if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
    d.writeFileNum++
    d.writePos = 0
    d.sync()
    d.writeFile.Close()
}

即:當前文件滿 → fsync → 開新文件繼續(xù)寫


3. 元數(shù)據(jù)持久化:周期性記錄讀寫進度

為了保證崩潰后仍能從正確位置恢復(fù),DiskQueue 會定期將讀寫指針和 depth 寫入 metadata 文件。

元數(shù)據(jù)內(nèi)容示例:

depth
readFileNum,readPos
writeFileNum,writePos

對應(yīng)的寫邏輯在:

// diskqueue.go: persistMetaData()
fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
    d.depth,
    d.readFileNum, d.readPos,
    d.writeFileNum, d.writePos)

采用安全的 tmp file + rename 原子替換,保證文件內(nèi)部的數(shù)據(jù)是完整:

tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
os.Rename(tmpFileName, fileName)  // 原子提交

這樣即使在持久化過程中崩潰也不會破壞原有 metadata。

什么時候會同步?

兩種模式:

  1. 每寫 X 條消息(通過 syncEvery 配置)
  2. 定時 syncTimeout 觸發(fā) fsync

觸發(fā)邏輯在 ioLoop 中處理:

if d.needSync && (writes%syncEvery == 0 || time.Since(lastSync) > syncTimeout) {
    d.sync()
}

4. 讀取記錄(一次取一條,同樣采用“長度+數(shù)據(jù)”格式)

讀取邏輯在 readOne()

var msgSize int32
binary.Read(d.reader, binary.BigEndian, &msgSize)

readBuf := make([]byte, msgSize)
io.ReadFull(d.reader, readBuf)

并計算下一次讀取的位置:

totalBytes := int64(4 + msgSize)
d.nextReadPos = d.readPos + totalBytes

文件滾動

讀取到文件末尾時:

if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
    d.nextReadFileNum++
    d.nextReadPos = 0
}

DiskQueue 會自動切換到下一個文件。


總結(jié):DiskQueue 以極小的代碼實現(xiàn)極穩(wěn)健的持久化隊列

設(shè)計點 帶來的價值
ioLoop 單協(xié)程串行調(diào)度 無鎖、線程安全、高性能
長度+數(shù)據(jù) 格式 順序讀寫最高效、格式安全
文件滾動策略 提高磁盤局部性,避免巨型文件
metadata 原子持久化 崩潰可恢復(fù),不產(chǎn)生部分寫入

NSQ 的 DiskQueue 是工程中小而美、高可靠、高性能磁盤隊列實現(xiàn)的典范,非常值得學(xué)習。

如果你正在實現(xiàn):

  • 本地消息隊列
  • 任務(wù)執(zhí)行器的持久化存儲
  • 分布式系統(tǒng)的持久化 mailbox
  • WAL / 順序?qū)懭罩?/li>

DiskQueue 都是一個極好的參考。

nsq的diskqueue源碼:https://github.com/nsqio/go-diskqueue/blob/master/diskqueue.go

我的小棧:https://itart.cn/blogs/2025/practice/nsq-diskqueue-persistent-queue-design-analysis.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容