LevelDB作為一個Key-Value的NoSQL數(shù)據(jù)庫,其最基本的操作就是Put,即插入一對<key, value>記錄, 本文將以源碼走讀的方式解析數(shù)據(jù)記錄插入數(shù)據(jù)庫的基本流程
- 如下代碼所示是Put的函數(shù)實現(xiàn),其和delete操作共用putRec函數(shù)。
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
return db.putRec(keyTypeVal, key, value, wo)
}
2.putRec函數(shù)是實際的Put插入實現(xiàn)函數(shù)單看putRec函數(shù)會發(fā)現(xiàn)該函數(shù)非常難理解。在這里作者使用writeMergeC、writeMergedC、writeAckC和writeLockC共同控制多線程的數(shù)據(jù)插入和合并操作。這四個channel的定義如下,注意其中writeLockC的channel可以緩存一個對象。
writeMergeC: make(chan writeMerge),
writeMergedC: make(chan bool),
writeLockC: make(chan struct{}, 1),
writeAckC: make(chan error),
其中writeMergeC的channel可以攜帶插入的數(shù)據(jù),其工作原理如下圖所示, 假設同時有c1,c2和c3的線程嘗試插入數(shù)據(jù),這時他們會在writeMergeC和writeLockC上競爭,由于其定義的不同。第一個進入的線程必然會獲取writeLockC,也即獲取寫鎖我們假設是圖中的C1。此時C1線程會將<key,value>數(shù)據(jù)線寫入batch然后執(zhí)行writeLock函數(shù)。而c2和c3則繼續(xù)競爭,由于此時寫鎖被writeLockC獨占。c2和c3只能競爭writeMergeC的寫入權限。
3.在writeLocked函數(shù)內(nèi)部,會對寫入記錄執(zhí)行merge操作,也就是會從writeMergeC中讀取當前堵塞的寫入并在空間夠用的情況下merge成一次寫操作。我們假設c2和c3依次獲取寫入writeMergeC的操作,則其數(shù)據(jù)最終會形成圖中Step2的狀態(tài)。此時c2 和c3堵塞在如下代碼處。
if <-db.writeMergedC {
return <-db.writeAckC
}

- putRec函數(shù)解析
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
if err := db.ok(); err != nil {
return err
}
//merge 和sync 以數(shù)據(jù)庫的初始化配置為主
merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
sync := wo.GetSync() && !db.s.o.GetNoSync()
// Acquire write lock
if merge {
select {
case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
//如果能向writeMergeC 寫入新插入的key value 數(shù)據(jù)
//則等待新的key value與老的數(shù)據(jù)進行merge操作
if <-db.writeMergedC { //等待writeMerge的結(jié)果,如果merge失敗則繼續(xù)獲取寫鎖
//merge成功等待merge之后的數(shù)據(jù)寫入結(jié)果
// Write is merged.
return <-db.writeAckC
}
// Write is not merged, the write lock is handed to us. Continue.
case db.writeLockC <- struct{}{}: //嘗試獲取寫鎖
// Write lock acquired.
case err := <-db.compPerErrC:
// Compaction error.
return err
case <-db.closeC:
// Closed
return ErrClosed
}
} else {
//沒有merge的情況直接嘗試獲取寫入鎖
select {
case db.writeLockC <- struct{}{}:
// Write lock acquired.
case err := <-db.compPerErrC:
// Compaction error.
return err
case <-db.closeC:
// Closed
return ErrClosed
}
}
batch := db.batchPool.Get().(*Batch)
batch.Reset()
batch.appendRec(kt, key, value)
return db.writeLocked(batch, batch, merge, sync)
}
4.writeLocked 函數(shù)解析
writeLocked函數(shù)是c1的真正執(zhí)行寫入的函數(shù),其寫入的主要流程包括:
- 獲取內(nèi)存數(shù)據(jù)庫memDB,如果空間不足則擴容;
- 如果任由merge空間和數(shù)據(jù),執(zhí)行merge邏輯;
- 寫日志信息;
- 數(shù)據(jù)寫入內(nèi)存;
- 釋放相關所,以及通知給等待線程執(zhí)行結(jié)果(如圖中c2和c3)
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
// Try to flush memdb. This method would also trying to throttle writes
// if it is too fast and compaction cannot catch-up.
//1.嘗試flush db的數(shù)據(jù) 如果有需要
// 返回DB的mdb以及mdb的剩余空間,如果mdbFree不夠則會對mdb進行擴容操作
mdb, mdbFree, err := db.flush(batch.internalLen)
if err != nil {
db.unlockWrite(false, 0, err)
return err
}
defer mdb.decref() //釋放當前引用數(shù)量
var (
overflow bool
merged int
batches = []*Batch{batch}
)
if merge { // 需要merge的情況
// Merge limit.
var mergeLimit int
//控制merge的數(shù)量不是特別大
if batch.internalLen > 128<<10 {
mergeLimit = (1 << 20) - batch.internalLen
} else {
mergeLimit = 128 << 10
}
mergeCap := mdbFree - batch.internalLen
if mergeLimit > mergeCap {
mergeLimit = mergeCap
}
//控制最大能夠merge的量
merge:
for mergeLimit > 0 {
select {
case incoming := <-db.writeMergeC:
if incoming.batch != nil { //writeMergeC 中存儲的是batch的情況
// Merge batch.
if incoming.batch.internalLen > mergeLimit {
overflow = true
break merge
}
batches = append(batches, incoming.batch)
mergeLimit -= incoming.batch.internalLen
} else {
// Merge put.
internalLen := len(incoming.key) + len(incoming.value) + 8
if internalLen > mergeLimit {
overflow = true
break merge
}
if ourBatch == nil {
ourBatch = db.batchPool.Get().(*Batch)
ourBatch.Reset()
batches = append(batches, ourBatch)
}
// We can use same batch since concurrent write doesn't
// guarantee write order.
ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
mergeLimit -= internalLen
}
sync = sync || incoming.sync // 同步的情況需要通知寫入的等待線程寫入完畢
merged++
db.writeMergedC <- true
default:
break merge
}
}
}
// Seq number.
seq := db.seq + 1 //seq是實際batch的數(shù)量編號, 此時db的實際seq并未更新
// Write journal.
// 2. batch 信息寫入日志
if err := db.writeJournal(batches, seq, sync); err != nil {
db.unlockWrite(overflow, merged, err)
return err
}
// Put batches.
// 3. batch 數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)庫 mendb
for _, batch := range batches {
if err := batch.putMem(seq, mdb.DB); err != nil {
panic(err)
}
seq += uint64(batch.Len())
}
// Incr seq number.
db.addSeq(uint64(batchesLen(batches))) //更新db的seq
// Rotate memdb if it's reach the threshold.
if batch.internalLen >= mdbFree { //防止下次不夠?
db.rotateMem(0, false)
}
db.unlockWrite(overflow, merged, nil)
return nil
}