diskqueue結(jié)構(gòu)體
type diskQueue struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
// run-time state (also persisted to disk)
// 數(shù)據(jù)文件相關(guān)信息
readPos int64 // 記錄數(shù)據(jù)文件讀的位置
writePos int64 // 記錄數(shù)據(jù)文件寫的位置
readFileNum int64 // 當(dāng)前讀文件的編號(hào)
writeFileNum int64 // 當(dāng)前寫文件的編號(hào)
depth int64 // 隊(duì)列中消息的數(shù)量
sync.RWMutex
// instantiation time metadata
// 元數(shù)據(jù)相關(guān)
name string // 元數(shù)據(jù)名稱
dataPath string // 元數(shù)據(jù)的數(shù)據(jù)目錄
maxBytesPerFile int64 // 每個(gè)文件的最大字節(jié)數(shù),默認(rèn)100M // currently this cannot change once created
minMsgSize int32 // 一條消息的最小長(zhǎng)度
maxMsgSize int32 // 一條消息的最大長(zhǎng)度
syncEvery int64 // 當(dāng)寫入的消息達(dá)到syncEvery時(shí)則執(zhí)行sync操作 // number of writes per fsync
syncTimeout time.Duration // 每隔syncTimeout時(shí)間執(zhí)行同步一次 // duration of time per fsync
exitFlag int32 // 隊(duì)列退出標(biāo)識(shí)。比如當(dāng)刪除隊(duì)列時(shí)會(huì)將該隊(duì)列標(biāo)記為1,阻止其他線程操作該隊(duì)列
needSync bool // 是否需要同步
// keeps track of the position where we have read
// (but not yet sent over readChan)
// 讀操作是為了投遞消息給客戶端,如果投遞失敗則繼續(xù)使用當(dāng)前的讀取位置再次嘗試投遞消息
nextReadPos int64 // 記錄正在投遞的消息的位置
nextReadFileNum int64 // 記錄正在投遞的消息的文件編號(hào)
readFile *os.File // 讀文件句柄
writeFile *os.File // 寫文件句柄
reader *bufio.Reader // 讀文件操作的緩存區(qū)
writeBuf bytes.Buffer // 寫文件操作的緩存區(qū)
// exposed via ReadChan()
readChan chan []byte // 獲取消息的channel
// internal channels
writeChan chan []byte // 寫入消息的channel
writeResponseChan chan error // 返回寫入消息的狀態(tài)
emptyChan chan int // 清空消息的channel
emptyResponseChan chan error // 返回清空隊(duì)列的狀態(tài)
exitChan chan int // 隊(duì)列退出的channel
exitSyncChan chan int // 隊(duì)列退出的同步channel,確保ioLoop先退出
logger Logger
}
nsq在創(chuàng)建topic和channel時(shí)會(huì)創(chuàng)建一個(gè)diskqueue,它負(fù)責(zé)像磁盤讀寫文件。
當(dāng)memoryMsgChan內(nèi)存隊(duì)列寫滿了,就會(huì)向diskqueue寫入消息。
當(dāng)memoryMsgChan讀取空了,就會(huì)從diskqueue讀取消息。
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
b := bufferPoolGet()
// 將消息寫入diskqueue
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
func (t *Topic) messagePump() {
...//參見上文代碼
for {
//從memoryMsgChan及DiskQueue.ReadChan中取消息
select {
case msg = <-memoryMsgChan:
case buf = <- t.backend.ReadChan():
msg, _ = decodeMessage(buf)
case <-t.exitChan:
return
}
... //將msg復(fù)制N份,發(fā)送到topic下的N個(gè)Channel中
}
}
生成一個(gè)diskQueue時(shí)會(huì)做兩件事情
d.retrieveMetaData()加載元數(shù)據(jù)信息
d.ioLoop()讀寫磁盤文件
func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
d := diskQueue{
name: name,
dataPath: dataPath,
maxBytesPerFile: maxBytesPerFile,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
emptyChan: make(chan int),
emptyResponseChan: make(chan error),
exitChan: make(chan int),
exitSyncChan: make(chan int),
syncEvery: syncEvery,
syncTimeout: syncTimeout,
logf: logf,
}
// no need to lock here, nothing else could possibly be touching this instance
err := d.retrieveMetaData()
if err != nil && !os.IsNotExist(err) {
d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
}
go d.ioLoop()
return &d
}
retrieveMetaData:從元數(shù)據(jù)文件中恢復(fù)隊(duì)列的狀態(tài)。如果元數(shù)據(jù)文件不存在返回err,如果存在則加載元數(shù)據(jù)文件中的內(nèi)容
top1.diskqueue.meta.dat 元數(shù)據(jù)文件內(nèi)容格式只有三行("%d\n%d,%d\n%d,%d\n")
2 # 隊(duì)列中消息的數(shù)量
0,0 # 讀文件的編號(hào),偏移位置
0,76 # 寫文件的編號(hào),偏移位置
func (d *diskQueue) retrieveMetaData() error {
var f *os.File
var err error
fileName := d.metaDataFileName()
f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
if err != nil {
return err
}
defer f.Close()
var depth int64
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&depth,
&d.readFileNum, &d.readPos,
&d.writeFileNum, &d.writePos)
if err != nil {
return err
}
atomic.StoreInt64(&d.depth, depth)
d.nextReadFileNum = d.readFileNum
d.nextReadPos = d.readPos
return nil
}
負(fù)責(zé)讀寫文件的循環(huán)
func (d *diskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64 // 計(jì)數(shù)器變量
var r chan []byte
syncTicker := time.NewTicker(d.syncTimeout)
for {
// dont sync all the time :)
// count計(jì)數(shù)器打到d.syncEvery的數(shù)量時(shí),設(shè)置d.needSync為true則執(zhí)行同步操作
if count == d.syncEvery {
d.needSync = true
}
// needSync變量控制是否需要同步,同步完成后該變量置為false
if d.needSync {
// 定時(shí)調(diào)用sync函數(shù)將內(nèi)存中的消息刷新到磁盤
err = d.sync()
if err != nil {
d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
}
count = 0
}
// 檢測(cè)當(dāng)前是否有數(shù)據(jù)需要被讀取
// 條件成立:執(zhí)行d.readOne()并將結(jié)果放入dataRead中,然后設(shè)置r為d.readChan
// 條件不成立:將r設(shè)置為nil
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
d.logf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
d.name, d.readPos, d.fileName(d.readFileNum), err)
d.handleReadError()
continue
}
}
r = d.readChan
} else {
r = nil
}
select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
// 在注釋中作者寫了這是一個(gè)Golang的特性
// 如果r不為空,則會(huì)將dataRead送入go channel。進(jìn)入d.readChan的消息通過ReadChan函數(shù)向外暴露,最終被Topic/Channel的消息循環(huán)讀取。
// 而如果r為空,則這個(gè)分支會(huì)被跳過。這個(gè)特性的使用統(tǒng)一了select的邏輯,簡(jiǎn)化了當(dāng)數(shù)據(jù)為空時(shí)的判斷。
case r <- dataRead:
// 消息投遞
count++
// moveForward sets needSync flag if a file is removed
// 消息投遞成功后的操作
d.moveForward()
case <-d.emptyChan:
// 執(zhí)行清空操作時(shí),文件全被刪除,count計(jì)數(shù)器重置為0
d.emptyResponseChan <- d.deleteAllFiles()
count = 0
case dataWrite := <-d.writeChan:
// 消息寫入則count計(jì)數(shù)器自增
count++
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
// 每隔syncTimeout時(shí)間同步一次
if count == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
// 退出ioLook
goto exit
}
}
exit:
d.logf("DISKQUEUE(%s): closing ... ioLoop", d.name)
syncTicker.Stop()
d.exitSyncChan <- 1
}
總結(jié)一下diskQueue
1、外部goroutine會(huì)在memoryMsgChan寫滿了或讀空了的時(shí)候,對(duì)diskQueue的writeChan或者readChan寫入消息。ioLoop則監(jiān)聽writeChan和readChan進(jìn)行磁盤的寫讀操作。
2、在對(duì)磁盤文件的讀寫時(shí),需要記錄文件編號(hào)和偏移位置。啟動(dòng)diskQueue時(shí)會(huì)從元數(shù)據(jù)文件恢復(fù)隊(duì)列數(shù)據(jù),關(guān)閉時(shí)會(huì)將最新的讀取位置記錄到元數(shù)據(jù)文件。
3、syncTimeout和syncEvery可以設(shè)置內(nèi)存數(shù)據(jù)同步到磁盤的頻率,syncTimeout是指每隔syncTimeout秒調(diào)用一次d.sync(),syncEvery是指每當(dāng)寫入syncEvery個(gè)消息后調(diào)用一次d.sync()。