—— 為什么 NSQ 的 DiskQueue 不用鎖也能做到高性能、強一致?
NSQ 的 diskqueue 是一個極其優(yōu)雅的磁盤持久化 FIFO 隊列實現(xiàn)。它的代碼不多,卻藏著許多工程上的巧思:
- 單調(diào)度協(xié)程保證線程安全
- 記錄格式簡潔高效(長度+數(shù)據(jù))
- 元數(shù)據(jù)持久化設(shè)計穩(wěn)健
- 文件滾動策略減少 IO 并保持性能
本文從設(shè)計思想切入,結(jié)合源碼逐塊解析 diskqueue 是如何實現(xiàn)一個可持久、線程安全、性能穩(wěn)定的磁盤隊列。
1. 單協(xié)程調(diào)度(ioLoop)保證線程安全:無需加鎖
DiskQueue 的最巧妙設(shè)計之一,就是所有寫入、讀取、同步、文件滾動等操作都在 一個 ioLoop 協(xié)程中完成。
即使 diskQueue 結(jié)構(gòu)體上存在 RWMutex,但核心變量(read/write position、file num、depth)都只在 ioLoop 中修改,因此可以不用鎖,天然線程安全。
創(chuàng)建隊列時就啟動這個協(xié)程:
// diskqueue.go: New()
go d.ioLoop()
來看 ioLoop 的結(jié)構(gòu):
// diskqueue.go: ioLoop()
func (d *diskQueue) ioLoop() {
for {
select {
case data := <-d.writeChan:
err := d.writeOne(data)
d.writeResponseChan <- err
case <-d.emptyChan:
err := d.deleteAllFiles()
d.emptyResponseChan <- err
case d.depthChan <- d.depth:
// return depth
// 其他分支:退出、同步等...
}
}
}
? 設(shè)計亮點
-
所有請求都通過 channel 串行進入 ioLoop
生產(chǎn)者調(diào)用Put()→ writeChan → ioLoop → writeOne() -
ioLoop 不需要任何互斥鎖
因為寫入操作永遠在單協(xié)程執(zhí)行,不存在并發(fā)修改。 - 讀通道 readChan 也在循環(huán)內(nèi)安全發(fā)送
這與 Redis 的單線程模型有異曲同工之妙:串行化換線程安全,避免鎖開銷。
2. 內(nèi)容數(shù)據(jù)按(長度+數(shù)據(jù))格式寫入磁盤
NSQ 為每條記錄寫入:
4字節(jié)長度(int32 大端序) + N 字節(jié)消息內(nèi)容
寫入邏輯在 writeOne():
// diskqueue.go: writeOne()
dataLen := int32(len(data))
totalBytes := int64(4 + dataLen)
// 寫長度
binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
// 寫內(nèi)容
d.writeBuf.Write(data)
// 一次性寫入文件
_, err = d.writeFile.Write(d.writeBuf.Bytes())
為什么采用“長度 + 內(nèi)容”?
- 讀取時無需掃描分隔符,直接按長度讀取 → 高性能順序讀
- 能快速跳過損壞數(shù)據(jù)(長度檢測) → 增強魯棒性
- 二進制格式緊湊 → 磁盤占用小
文件滿了會自動滾動
當 writePos + totalBytes > maxBytesPerFile:
if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
d.writeFileNum++
d.writePos = 0
d.sync()
d.writeFile.Close()
}
即:當前文件滿 → fsync → 開新文件繼續(xù)寫。
3. 元數(shù)據(jù)持久化:周期性記錄讀寫進度
為了保證崩潰后仍能從正確位置恢復(fù),DiskQueue 會定期將讀寫指針和 depth 寫入 metadata 文件。
元數(shù)據(jù)內(nèi)容示例:
depth
readFileNum,readPos
writeFileNum,writePos
對應(yīng)的寫邏輯在:
// diskqueue.go: persistMetaData()
fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
d.depth,
d.readFileNum, d.readPos,
d.writeFileNum, d.writePos)
采用安全的 tmp file + rename 原子替換,保證文件內(nèi)部的數(shù)據(jù)是完整:
tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
os.Rename(tmpFileName, fileName) // 原子提交
這樣即使在持久化過程中崩潰也不會破壞原有 metadata。
什么時候會同步?
兩種模式:
- 每寫 X 條消息(通過 syncEvery 配置)
- 定時 syncTimeout 觸發(fā) fsync
觸發(fā)邏輯在 ioLoop 中處理:
if d.needSync && (writes%syncEvery == 0 || time.Since(lastSync) > syncTimeout) {
d.sync()
}
4. 讀取記錄(一次取一條,同樣采用“長度+數(shù)據(jù)”格式)
讀取邏輯在 readOne():
var msgSize int32
binary.Read(d.reader, binary.BigEndian, &msgSize)
readBuf := make([]byte, msgSize)
io.ReadFull(d.reader, readBuf)
并計算下一次讀取的位置:
totalBytes := int64(4 + msgSize)
d.nextReadPos = d.readPos + totalBytes
文件滾動
讀取到文件末尾時:
if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
d.nextReadFileNum++
d.nextReadPos = 0
}
DiskQueue 會自動切換到下一個文件。
總結(jié):DiskQueue 以極小的代碼實現(xiàn)極穩(wěn)健的持久化隊列
| 設(shè)計點 | 帶來的價值 |
|---|---|
| ioLoop 單協(xié)程串行調(diào)度 | 無鎖、線程安全、高性能 |
| 長度+數(shù)據(jù) 格式 | 順序讀寫最高效、格式安全 |
| 文件滾動策略 | 提高磁盤局部性,避免巨型文件 |
| metadata 原子持久化 | 崩潰可恢復(fù),不產(chǎn)生部分寫入 |
NSQ 的 DiskQueue 是工程中小而美、高可靠、高性能磁盤隊列實現(xiàn)的典范,非常值得學(xué)習。
如果你正在實現(xiàn):
- 本地消息隊列
- 任務(wù)執(zhí)行器的持久化存儲
- 分布式系統(tǒng)的持久化 mailbox
- WAL / 順序?qū)懭罩?/li>
DiskQueue 都是一個極好的參考。
nsq的diskqueue源碼:https://github.com/nsqio/go-diskqueue/blob/master/diskqueue.go
我的小棧:https://itart.cn/blogs/2025/practice/nsq-diskqueue-persistent-queue-design-analysis.html