[LevelDB/源碼]Put操作的流程源碼分析

LevelDB作為一個Key-Value的NoSQL數(shù)據(jù)庫,其最基本的操作就是Put,即插入一對<key, value>記錄, 本文將以源碼走讀的方式解析數(shù)據(jù)記錄插入數(shù)據(jù)庫的基本流程

  1. 如下代碼所示是Put的函數(shù)實現(xiàn),其和delete操作共用putRec函數(shù)。
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
    return db.putRec(keyTypeVal, key, value, wo)
}

2.putRec函數(shù)是實際的Put插入實現(xiàn)函數(shù)單看putRec函數(shù)會發(fā)現(xiàn)該函數(shù)非常難理解。在這里作者使用writeMergeC、writeMergedC、writeAckC和writeLockC共同控制多線程的數(shù)據(jù)插入和合并操作。這四個channel的定義如下,注意其中writeLockC的channel可以緩存一個對象。

writeMergeC:  make(chan writeMerge),
writeMergedC: make(chan bool),
writeLockC:   make(chan struct{}, 1),
writeAckC:    make(chan error),

其中writeMergeC的channel可以攜帶插入的數(shù)據(jù),其工作原理如下圖所示, 假設同時有c1,c2和c3的線程嘗試插入數(shù)據(jù),這時他們會在writeMergeC和writeLockC上競爭,由于其定義的不同。第一個進入的線程必然會獲取writeLockC,也即獲取寫鎖我們假設是圖中的C1。此時C1線程會將<key,value>數(shù)據(jù)線寫入batch然后執(zhí)行writeLock函數(shù)。而c2和c3則繼續(xù)競爭,由于此時寫鎖被writeLockC獨占。c2和c3只能競爭writeMergeC的寫入權限。

3.在writeLocked函數(shù)內(nèi)部,會對寫入記錄執(zhí)行merge操作,也就是會從writeMergeC中讀取當前堵塞的寫入并在空間夠用的情況下merge成一次寫操作。我們假設c2和c3依次獲取寫入writeMergeC的操作,則其數(shù)據(jù)最終會形成圖中Step2的狀態(tài)。此時c2 和c3堵塞在如下代碼處。

       if <-db.writeMergedC {
        return <-db.writeAckC
      }
階段1和階段2.png
  1. putRec函數(shù)解析
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
    if err := db.ok(); err != nil {
        return err
    }
    //merge 和sync 以數(shù)據(jù)庫的初始化配置為主
    merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
    sync := wo.GetSync() && !db.s.o.GetNoSync()

    // Acquire write lock
    if merge {
        select {
        case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
            //如果能向writeMergeC 寫入新插入的key value 數(shù)據(jù)
            //則等待新的key value與老的數(shù)據(jù)進行merge操作
            if <-db.writeMergedC { //等待writeMerge的結(jié)果,如果merge失敗則繼續(xù)獲取寫鎖

                //merge成功等待merge之后的數(shù)據(jù)寫入結(jié)果
                // Write is merged.
                return <-db.writeAckC
            }
            // Write is not merged, the write lock is handed to us. Continue.
        case db.writeLockC <- struct{}{}: //嘗試獲取寫鎖
            // Write lock acquired.
        case err := <-db.compPerErrC:
            // Compaction error.
            return err
        case <-db.closeC:
            // Closed
            return ErrClosed
        }
    } else {
        //沒有merge的情況直接嘗試獲取寫入鎖
        select {
        case db.writeLockC <- struct{}{}:
            // Write lock acquired.
        case err := <-db.compPerErrC:
            // Compaction error.
            return err
        case <-db.closeC:
            // Closed
            return ErrClosed
        }
    }

    batch := db.batchPool.Get().(*Batch)
    batch.Reset()
    batch.appendRec(kt, key, value)
    return db.writeLocked(batch, batch, merge, sync)
}

4.writeLocked 函數(shù)解析

writeLocked函數(shù)是c1的真正執(zhí)行寫入的函數(shù),其寫入的主要流程包括:

  • 獲取內(nèi)存數(shù)據(jù)庫memDB,如果空間不足則擴容;
  • 如果任由merge空間和數(shù)據(jù),執(zhí)行merge邏輯;
  • 寫日志信息;
  • 數(shù)據(jù)寫入內(nèi)存;
  • 釋放相關所,以及通知給等待線程執(zhí)行結(jié)果(如圖中c2和c3)
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
    // Try to flush memdb. This method would also trying to throttle writes
    // if it is too fast and compaction cannot catch-up.

    //1.嘗試flush db的數(shù)據(jù) 如果有需要
    // 返回DB的mdb以及mdb的剩余空間,如果mdbFree不夠則會對mdb進行擴容操作
    mdb, mdbFree, err := db.flush(batch.internalLen)

    if err != nil {
        db.unlockWrite(false, 0, err)
        return err
    }
    defer mdb.decref() //釋放當前引用數(shù)量

    var (
        overflow bool
        merged   int
        batches  = []*Batch{batch}
    )

    if merge { // 需要merge的情況
        // Merge limit.
        var mergeLimit int
        //控制merge的數(shù)量不是特別大
        if batch.internalLen > 128<<10 {
            mergeLimit = (1 << 20) - batch.internalLen
        } else {
            mergeLimit = 128 << 10
        }
        mergeCap := mdbFree - batch.internalLen
        if mergeLimit > mergeCap {
            mergeLimit = mergeCap
        }
        //控制最大能夠merge的量

    merge:
        for mergeLimit > 0 {
            select {
            case incoming := <-db.writeMergeC:
                if incoming.batch != nil { //writeMergeC 中存儲的是batch的情況
                    // Merge batch.
                    if incoming.batch.internalLen > mergeLimit {
                        overflow = true
                        break merge
                    }
                    batches = append(batches, incoming.batch)
                    mergeLimit -= incoming.batch.internalLen
                } else {
                    // Merge put.
                    internalLen := len(incoming.key) + len(incoming.value) + 8
                    if internalLen > mergeLimit {
                        overflow = true
                        break merge
                    }
                    if ourBatch == nil {
                        ourBatch = db.batchPool.Get().(*Batch)
                        ourBatch.Reset()
                        batches = append(batches, ourBatch)
                    }
                    // We can use same batch since concurrent write doesn't
                    // guarantee write order.
                    ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
                    mergeLimit -= internalLen
                }
                sync = sync || incoming.sync // 同步的情況需要通知寫入的等待線程寫入完畢
                merged++
                db.writeMergedC <- true

            default:
                break merge
            }
        }
    }

    // Seq number.
    seq := db.seq + 1 //seq是實際batch的數(shù)量編號, 此時db的實際seq并未更新

    // Write journal.
    // 2. batch 信息寫入日志
    if err := db.writeJournal(batches, seq, sync); err != nil {
        db.unlockWrite(overflow, merged, err)
        return err
    }

    // Put batches.
    // 3. batch 數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)庫 mendb
    for _, batch := range batches {
        if err := batch.putMem(seq, mdb.DB); err != nil {
            panic(err)
        }
        seq += uint64(batch.Len())
    }

    // Incr seq number.
    db.addSeq(uint64(batchesLen(batches))) //更新db的seq

    // Rotate memdb if it's reach the threshold.
    if batch.internalLen >= mdbFree { //防止下次不夠?
        db.rotateMem(0, false)
    }

    db.unlockWrite(overflow, merged, nil)
    return nil
}
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容