區(qū)塊的持久化之BoltDB(四)

在上一篇文章《區(qū)塊的持久化之BoltDB(三)》中,我們分析了在db.Update()的回調(diào)函數(shù)中通過可讀寫Transaction創(chuàng)建Bucket及向Bucket中寫入K/V的過程,回調(diào)函數(shù)正常返回后,db.Update()最后會調(diào)用Transaction的Commit()方法將對數(shù)據(jù)庫的修改提交并最終寫入磁盤文件。本文將通過分析Commit()的代碼來了解修改DB時將執(zhí)行哪些步驟。

我們提到過,在Commit()中會發(fā)生B+Tree的分裂與再平衡,為了便于大家直觀理解這兩個過程,我們用上一篇文章中的BoltDB畫像圖中的BucketAA為例子,給大家展示分裂與再平衡的過程。為了便于說明問題,我們調(diào)整BucketAA的B+Tree參數(shù)為(3, 3, 2, 100%),即節(jié)點上Key的個數(shù)至少為2,否則會觸發(fā)再平衡過程,如圖中所示刪除Key 8后節(jié)點會再平衡。

由于篇幅所限,本文不打算詳細介紹B+Tree節(jié)點的旋轉(zhuǎn)和分裂過程,不了解的讀者可以通過這里來演練B+Tree的插入、查找、刪除等操作。我們接下來分析Commit()的代碼:

//boltdb/bolt/tx.go

// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() error {
    
    ......

    // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

    // Rebalance nodes which have had deletions.
    var startTime = time.Now()
    tx.root.rebalance()                                                               (1)
    ......

    // spill data onto dirty pages.
    startTime = time.Now()
    if err := tx.root.spill(); err != nil {                                           (2)
        tx.rollback()
        return err
    }
    tx.stats.SpillTime += time.Since(startTime)

    // Free the old root bucket.
    tx.meta.root.root = tx.root.root                                                  (3)

    opgid := tx.meta.pgid                                                             (4)

    // Free the freelist and allocate new pages for it. This will overestimate
    // the size of the freelist but not underestimate the size (which would be bad).
    tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
    p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)               (5)
    if err != nil {
        tx.rollback()
        return err
    }
    if err := tx.db.freelist.write(p); err != nil {                                   (6)
        tx.rollback()
        return err
    }
    tx.meta.freelist = p.id

    // If the high water mark has moved up then attempt to grow the database.
    if tx.meta.pgid > opgid {                                                         (7)
        if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {      (8)
            tx.rollback()
            return err
        }
    }                                                                                 

    // Write dirty pages to disk.
    startTime = time.Now()
    if err := tx.write(); err != nil {                                                (9)
        tx.rollback()
        return err
    }

    ......

    // Write meta to disk.
    if err := tx.writeMeta(); err != nil {                                            (10)
        tx.rollback()
        return err
    }
    tx.stats.WriteTime += time.Since(startTime)

    // Finalize the transaction.
    tx.close()                                                                        (11)

    // Execute commit handlers now that the locks have been removed.
    for _, fn := range tx.commitHandlers {                                           
        fn()                                                                          (12)
    }

    return nil
}

在Commit()中:

  1. 代碼(1)處對根Bucket進行再平衡,這里的根Bucket也是整個DB的根Bucket,然而從根Bucket進行再平衡并不是要對DB中所有節(jié)點進行操作,而且對當前讀寫Transaction訪問過的Bucket中的有刪除操作的節(jié)點進行再平衡;
  2. 代碼(2)處對根Bucket進行溢出操作,同樣地,也是對訪問過的子Bucket進行溢出操作,而且只有當節(jié)點中Key數(shù)量確實超限時才會分裂節(jié)點;
  3. 進行再旋轉(zhuǎn)與分裂后,根Bucket的根節(jié)點可能發(fā)生了變化,因此代碼(3)處將根Bucket的根節(jié)點的頁號更新,且最終會寫入DB的meta page;
  4. 代碼(5)~(6)處更新DB的freeList page,這里需要解釋一下為什么對freelist作了先釋放后重新分配頁框并寫入的操作,這是因為在代碼(9)處寫磁盤時,只會向磁盤寫入由當前Transaction分配并寫入過的頁(臟頁),由于freeList page最初是在db初始化過程中分配的頁,如果不在Transaction內(nèi)釋放并重新分配,那么freeList page將沒有機會被更新到DB文件中,這里的實現(xiàn)并不很優(yōu)雅,讀者可以想一想更好的實現(xiàn)方式;
  5. 代碼(4)、(7)和(8)是為了實現(xiàn)這樣的邏輯: 只有當映射入內(nèi)存的頁數(shù)增加時,才調(diào)用db.grow()來刷新磁盤文件的元數(shù)據(jù),以及時更新文件大小信息。這里需要解釋一下: 我們前面介紹過,windows平臺下db.mmap()調(diào)用會通過ftruncate系統(tǒng)調(diào)用來增加文件大小,而linux平臺則沒有,但linux平臺會在db.grow()中調(diào)用ftruncate更新文件大小。我們前面介紹過,BoltDB寫數(shù)據(jù)時不是通過mmap內(nèi)存映射寫文件的,而是直接通過fwrite和fdatesync系統(tǒng)調(diào)用 向文件系統(tǒng)寫文件。當向文件寫入數(shù)據(jù)時,文件系統(tǒng)上該文件結(jié)點的元數(shù)據(jù)可能不會立即刷新,導致文件的size不會立即更新,當進程crash時,可能會出現(xiàn)寫文件結(jié)束但文件大小沒有更新的情況,所以為了防止這種情況出現(xiàn),在寫DB文件之前,無論是windows還是linux平臺,都會通過ftruncate系統(tǒng)調(diào)用來增加文件大??;但是linux平臺為什么不在每次mmap的時候調(diào)用ftruncate來更新文件大小呢?這里是一個優(yōu)化措施,因為頻繁地ftruncate系統(tǒng)調(diào)用會影響性能,這里的優(yōu)化方式是: 只有當:1) 重新分配freeListPage時,沒有空閑頁,這里大家可能會有疑惑,freeListPage不是剛剛通過freelist的free()方法釋放過它所占用的頁嗎,還會有重新分配時沒有空閑頁的情況嗎?實際上,free()過并不是真正釋放頁,而是將頁標記為pending,要等創(chuàng)建下一次讀寫Transaction時才會被真正回收(大家可以查看freeist的free()和release()以及DB的beginRWTx()方法中最后一節(jié)代碼了解具體邏輯);2) remmap的長度大于文件實際大小時,才會調(diào)用ftruncate來增加文件大小,且當映射文件大小大于16M后,每次增加文件大小時會比實際需要的文件大小多增加16M。這里的優(yōu)化比較隱晦,實現(xiàn)上并不優(yōu)雅,讀者也可以思考一下更好的優(yōu)化方式;
  6. 代碼(9)處將當前transaction分配的臟頁寫入磁盤;
  7. 代碼(10)處將當前transaction的meta寫入DB的meta頁,因為進行讀寫操作后,meta中的txid已經(jīng)改變,root、freelist和pgid也有可能已經(jīng)更新了;
  8. 代碼(11)處關(guān)閉當前transaction,清空相關(guān)字段;
  9. 代碼(12)處回調(diào)commit handlers;

接下來,我們先來看看Bucket的rebalance()方法:

//boltdb/bolt/bucket.go

// rebalance attempts to balance all nodes.
func (b *Bucket) rebalance() {
    for _, n := range b.nodes {
        n.rebalance()
    }
    for _, child := range b.buckets {
        child.rebalance()
    }
}

它先對Bucket中緩存的node進行再平衡操作,然后對所有子Bucket遞歸調(diào)用rebalance()。node的rebalance():

//boltdb/bolt/node.go

// rebalance attempts to combine the node with sibling nodes if the node fill
// size is below a threshold or if there are not enough keys.
func (n *node) rebalance() {
    if !n.unbalanced {                                                            (1)
        return
    }
    n.unbalanced = false

    ......

    // Ignore if node is above threshold (25%) and has enough keys.
    var threshold = n.bucket.tx.db.pageSize / 4
    if n.size() > threshold && len(n.inodes) > n.minKeys() {                      (2)
        return
    }

    // Root node has special handling.
    if n.parent == nil {
        // If root node is a branch and only has one node then collapse it.
        if !n.isLeaf && len(n.inodes) == 1 {                                      (3)
            // Move root's child up.
            child := n.bucket.node(n.inodes[0].pgid, n)
            n.isLeaf = child.isLeaf
            n.inodes = child.inodes[:]
            n.children = child.children

            // Reparent all child nodes being moved.
            for _, inode := range n.inodes {
                if child, ok := n.bucket.nodes[inode.pgid]; ok {
                    child.parent = n
                }
            }

            // Remove old child.
            child.parent = nil
            delete(n.bucket.nodes, child.pgid)
            child.free()
        }

        return
    }

    // If node has no keys then just remove it.
    if n.numChildren() == 0 {                                                     (4)
        n.parent.del(n.key)
        n.parent.removeChild(n)
        delete(n.bucket.nodes, n.pgid)
        n.free()
        n.parent.rebalance()
        return
    }

    _assert(n.parent.numChildren() > 1, "parent must have at least 2 children")

    // Destination node is right sibling if idx == 0, otherwise left sibling.
    var target *node
    var useNextSibling = (n.parent.childIndex(n) == 0)                            (5)
    if useNextSibling {
        target = n.nextSibling()
    } else {
        target = n.prevSibling()
    }

    // If both this node and the target node are too small then merge them.
    if useNextSibling {                                                           (6)
        // Reparent all child nodes being moved.
        for _, inode := range target.inodes {
            if child, ok := n.bucket.nodes[inode.pgid]; ok {
                child.parent.removeChild(child)
                child.parent = n
                child.parent.children = append(child.parent.children, child)
            }
        }

        // Copy over inodes from target and remove target.
        n.inodes = append(n.inodes, target.inodes...)
        n.parent.del(target.key)
        n.parent.removeChild(target)
        delete(n.bucket.nodes, target.pgid)
        target.free()
    } else {                                                                      (7)
        // Reparent all child nodes being moved.
        for _, inode := range n.inodes {
            if child, ok := n.bucket.nodes[inode.pgid]; ok {
                child.parent.removeChild(child)
                child.parent = target
                child.parent.children = append(child.parent.children, child)
            }
        }

        // Copy over inodes to target and remove node.
        target.inodes = append(target.inodes, n.inodes...)
        n.parent.del(n.key)
        n.parent.removeChild(n)
        delete(n.bucket.nodes, n.pgid)
        n.free()
    }

    // Either this node or the target node was deleted from the parent so rebalance it.
    n.parent.rebalance()                                                         (8)
}

它的邏輯稍微有些復雜,主要包含:

  1. 代碼(1)處限制只有unbalanced為true時才進行節(jié)點再平衡,只有當節(jié)點中有過刪除操作時,unbalanced才為true;
  2. 代碼(2)處作再平衡條件檢查,只有當節(jié)點存的K/V總大小小于頁大小的25%且節(jié)點中Key的數(shù)量少于設(shè)定的每節(jié)點Key數(shù)量最小值時,才會進行旋轉(zhuǎn);
  3. 代碼(3)處處理當根節(jié)點只有一個子節(jié)點的情形(可以回顧上圖中最后一步): 將子節(jié)點上的inodes拷貝到根節(jié)點上,并將子節(jié)點的所有孩子移交給根節(jié)點,并將孩子節(jié)點的父節(jié)點更新為根節(jié)點;如果子節(jié)點是一個葉子節(jié)點,則將子節(jié)點的inodes全部拷貝到根節(jié)點上后,根節(jié)點也是一個葉子節(jié)點;最后,將子節(jié)點刪除;
  4. 如果節(jié)點變成一個空節(jié)點,則將它從B+Tree中刪除,并把父節(jié)點上的Key和Pointer刪除,由于父節(jié)點上有刪除,得對父節(jié)點進行再平衡;
  5. 代碼(5)~(7)合并兄弟節(jié)點與當前節(jié)點中的記錄集合。代碼(3)處決定是合并左節(jié)點還是右節(jié)點:如果當前節(jié)點是父節(jié)點的第一個孩子,則將右節(jié)點中的記錄合并到當前節(jié)點中;如果當前節(jié)點是父節(jié)點的第二個或以上的節(jié)點,則將當前節(jié)點中的記錄合并到左節(jié)點中;
  6. 代碼(6)將右節(jié)點中記錄合并到當前節(jié)點:首先將右節(jié)點的孩子節(jié)點全部變成當前節(jié)點的孩子,右節(jié)點將所有孩子移除;隨后,將右節(jié)點中的記錄全部拷貝到當前節(jié)點;最后,將右節(jié)點從B+Tree中移除,并將父節(jié)點中與右節(jié)點對應的記錄刪除;
  7. 代碼(7)將當前節(jié)點中的記錄合并到左節(jié)點,過程與代碼(6)處類似;
  8. 合并兄弟節(jié)點與當前節(jié)點時,會移除一個節(jié)點并從父節(jié)點中刪除一個記錄,所以需要對父節(jié)點進行再平衡,如代碼(8)處所示,所以節(jié)點的rebalance也是一個遞歸的過程,它會從當前結(jié)點一直進行到根節(jié)點處;

在Bucket rebalance過程中,節(jié)點合并后其大小可能超過頁大小,但在接下來的spill過程中,超過頁大小的節(jié)點會進行分裂。接下來,我們來看看Bucket的spill()方法:

//boltdb/bolt/bucket.go

// spill writes all the nodes for this bucket to dirty pages.
func (b *Bucket) spill() error {
    // Spill all child buckets first.
    for name, child := range b.buckets {
        // If the child bucket is small enough and it has no child buckets then
        // write it inline into the parent bucket's page. Otherwise spill it
        // like a normal bucket and make the parent value a pointer to the page.
        var value []byte
        if child.inlineable() {                                                        (1)
            child.free()
            value = child.write()
        } else {
            if err := child.spill(); err != nil {                                      (2)
                return err
            }

            // Update the child bucket header in this bucket.
            value = make([]byte, unsafe.Sizeof(bucket{}))                              (3)
            var bucket = (*bucket)(unsafe.Pointer(&value[0]))                          (4)
            *bucket = *child.bucket                                                    (5)
        }

        // Skip writing the bucket if there are no materialized nodes.
        if child.rootNode == nil {
            continue
        }

        // Update parent node.
        var c = b.Cursor()
        k, _, flags := c.seek([]byte(name))
        
        ......
        
        c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag)            (6)
    }

    // Ignore if there's not a materialized root node.
    if b.rootNode == nil {
        return nil
    }

    // Spill nodes.
    if err := b.rootNode.spill(); err != nil {                                        (7)
        return err
    }
    b.rootNode = b.rootNode.root()                                                    (8)

    // Update the root node for this bucket.
    if b.rootNode.pgid >= b.tx.meta.pgid {
        panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid))
    }
    b.root = b.rootNode.pgid                                                          (9)

    return nil
}

在spill()中:

  1. 首先,對Bucket樹的子Bucket進行深度優(yōu)先訪問并遞歸調(diào)用spill();
  2. 在代碼(2)處,子Bucket不滿足inlineable()條件時,如果子Bucket原來是一個內(nèi)置Bucket,則它將通過spill()變成一個普通的Bucket,即它的B+Tree有一個根節(jié)點和至少兩個葉子節(jié)點;如果子Bucket原本是一個普通Bucket,則spill()可能會更新它的根節(jié)點。從代碼(3)~(5)可以看出,一個普通的子Bucket的Value只保存了Bucket的頭部。相反地,在代碼(1)處,如果一個普通的子Bucket由于K/V記錄減少而滿足了inlineable()條件時,它將變成一個內(nèi)置Bucket,即它的B+Tree只有一個根節(jié)點,并將根節(jié)點上的所有inodes作為Value寫入父Bucket;
  3. 代碼(6)處將子Bucket的新的Value更新到父Bucket中;
  4. 更新完子Bucket后,就開始spill自己,代碼(7)處從當前Bucket的根節(jié)點處開始spill。在遞歸的最內(nèi)層調(diào)用中,訪問到了Bucket樹的某個(邏輯)葉子Bucket,由于它沒有子Bucket,將直接從其根節(jié)開始spill;
  5. Bucket spill完后,其根節(jié)點可能有變化,所以代碼(8)處更新根節(jié)點引用;
  6. 最后,代碼(9)處更新Bucket頭中的根節(jié)點頁號;

我們先來通過inlineable()的代碼了解成為內(nèi)置Bucket的條件:

//boltdb/bolt/bucket.go

// inlineable returns true if a bucket is small enough to be written inline
// and if it contains no subbuckets. Otherwise returns false.
func (b *Bucket) inlineable() bool {
    var n = b.rootNode

    // Bucket must only contain a single leaf node.
    if n == nil || !n.isLeaf {
        return false
    }

    // Bucket is not inlineable if it contains subbuckets or if it goes beyond
    // our threshold for inline bucket size.
    var size = pageHeaderSize
    for _, inode := range n.inodes {
        size += leafPageElementSize + len(inode.key) + len(inode.value)

        if inode.flags&bucketLeafFlag != 0 {
            return false
        } else if size > b.maxInlineBucketSize() {
            return false
        }
    }

    return true
}

......

// Returns the maximum total size of a bucket to make it a candidate for inlining.
func (b *Bucket) maxInlineBucketSize() int {
    return b.tx.db.pageSize / 4
}

可以看出,只有當Bucket只有一個葉子節(jié)點(即其根節(jié)點)且它序列化后的大小小于頁大小的25%時才能成為內(nèi)置Bucket。接下來,我們開始分析node的spill過程:

//boltdb/bolt/node.go

// spill writes the nodes to dirty pages and splits nodes as it goes.
// Returns an error if dirty pages cannot be allocated.
func (n *node) spill() error {
    var tx = n.bucket.tx
    if n.spilled {                                                                    (1)
        return nil
    }

    // Spill child nodes first. Child nodes can materialize sibling nodes in
    // the case of split-merge so we cannot use a range loop. We have to check
    // the children size on every loop iteration.
    sort.Sort(n.children)
    for i := 0; i < len(n.children); i++ {                                            (2)
        if err := n.children[i].spill(); err != nil {
            return err
        }
    }

    // We no longer need the child list because it's only used for spill tracking.
    n.children = nil                                                                 (3)

    // Split nodes into appropriate sizes. The first node will always be n.
    var nodes = n.split(tx.db.pageSize)                                              (4)
    for _, node := range nodes {
        // Add node's page to the freelist if it's not new.
        if node.pgid > 0 {
            tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))                    (5)
            node.pgid = 0
        }

        // Allocate contiguous space for the node.
        p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)                    (6)
        if err != nil {
            return err
        }

        // Write the node.
        if p.id >= tx.meta.pgid {
            panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid))
        }
        node.pgid = p.id                                                             (7)
        node.write(p)                                                                (8)
        node.spilled = true                                                          (9)

        // Insert into parent inodes.
        if node.parent != nil {
            var key = node.key
            if key == nil {
                key = node.inodes[0].key                                             (10)     
            }

            node.parent.put(key, node.inodes[0].key, nil, node.pgid, 0)              (11)
            node.key = node.inodes[0].key                                            (12)
            _assert(len(node.key) > 0, "spill: zero-length node key")
        }

        // Update the statistics.
        tx.stats.Spill++
    }

    // If the root node split and created a new root then we need to spill that
    // as well. We'll clear out the children to make sure it doesn't try to respill.
    if n.parent != nil && n.parent.pgid == 0 {
        n.children = nil                                                            (13)
        return n.parent.spill()                                                     (14)
    }

    return nil
}

node的spill過程與rebalance過程不同,rebalance是從當前節(jié)點到根節(jié)點遞歸,而spill是從根節(jié)點到葉子節(jié)點進行遞歸,不過它們最終都要處理根節(jié)點的rebalance或者spill。在spill中,如果根節(jié)點需要分裂(如上圖中的第二步),則需要對其遞歸調(diào)用spill,但是為了防止循環(huán),調(diào)用父節(jié)點的spill之前,會將父node中緩存的子節(jié)點引用集合children置空,以防止向下遞歸。我們來看看它的具體實現(xiàn):

  1. 代碼(1)處檢測當前node是否已經(jīng)spill過,如果spill過了則無需spill;
  2. 代碼(2)處對子節(jié)點進行深度優(yōu)先訪問并遞歸調(diào)用spill(),需要注意的是,子節(jié)點可能會分裂成多個節(jié)點,分裂出來的新節(jié)點也是當前節(jié)點的子節(jié)點,n.children這個slice的size會在循環(huán)中變化,幫不能使用rang的方式循環(huán)訪問;同時,分裂出來的新節(jié)點會在代碼(9)處被設(shè)為spilled,所以在代碼(2)的下一次循環(huán)訪問到新的子節(jié)點時不會重新spill,這也是代碼(1)處對spilled進行檢查的原因;
  3. 當所有子節(jié)點spill完成后,代碼(3)處將子節(jié)點引用集合children置為空,以防向上遞歸調(diào)用spill的時候形成回路;
  4. 代碼(4)處調(diào)用node的split()方法按頁大小將node分裂出若干新node,新node與當前node共享同一個父node,返回的nodes中包含當前node;
  5. 隨后代碼(5)~(12)處理分裂后產(chǎn)生的node。代碼(5)處為釋放當前node的所占頁,因為隨后要為它分配新的頁,我們前面說過transaction commit是只會向磁盤寫入當前transaction分配的臟頁,所以這里要對當前node重新分配頁;
  6. 代碼(6)處調(diào)用Tx的allocate()方法為分裂產(chǎn)生的node分配頁緩存,請注意,通過splite()方法分裂node后,node的大小為頁大小 * 填充率,默認填充率為50%,而且一般地它的值小于100%,所以這里為每個node實際上是分配一個頁框;
  7. 代碼(7)處將新node的頁號設(shè)為分配給他的頁框的頁號,同時,代碼(8)處將新node序列化并寫入剛剛分配的頁緩存;
  8. 代碼(9)處將spilled設(shè)為true,我們剛剛介紹過它的作用;
  9. 代碼(10)~(12)處向父節(jié)點更新或添加Key和Pointer,以指向分裂產(chǎn)生的新node。代碼(10)將父node的key設(shè)為第一個子node的第一個key;
  10. 代碼(11)處向父node寫入Key和Pointer,其中Key是子結(jié)點的第一個key,Pointer是子節(jié)點的pgid;
  11. 代碼(12)處將分裂產(chǎn)生的node的key設(shè)為其中的第一個key;
  12. 從根節(jié)點處遞歸完所有子節(jié)點的spill過程后,若根節(jié)點需要分裂,則它分裂后將產(chǎn)生新的根節(jié)點,代碼(13)和(14)對新產(chǎn)生的根節(jié)點進行spill;

在node的spill()過程中,除了通過遞歸來保證整個樹結(jié)構(gòu)被spill外,比較重要的地方是spite如何分裂節(jié)點,我們來看看node的split()方法:

//boltdb/bolt/node.go

// split breaks up a node into multiple smaller nodes, if appropriate.
// This should only be called from the spill() function.
func (n *node) split(pageSize int) []*node {
    var nodes []*node

    node := n
    for {
        // Split node into two.
        a, b := node.splitTwo(pageSize)
        nodes = append(nodes, a)

        // If we can't split then exit the loop.
        if b == nil {
            break
        }

        // Set node to b so it gets split on the next iteration.
        node = b
    }

    return nodes
}

可以看到,split()實際上就是把node分成兩段,其中一段滿足node要求的大小,另一段再進一步按相同規(guī)則分成兩段,一直到不能再分為止。我們來看看其中的splitTwo():

//boltdb/bolt/node.go

// splitTwo breaks up a node into two smaller nodes, if appropriate.
// This should only be called from the split() function.
func (n *node) splitTwo(pageSize int) (*node, *node) {
    // Ignore the split if the page doesn't have at least enough nodes for
    // two pages or if the nodes can fit in a single page.
    if len(n.inodes) <= (minKeysPerPage*2) || n.sizeLessThan(pageSize) {   (1)
        return n, nil
    }

    // Determine the threshold before starting a new node.
    var fillPercent = n.bucket.FillPercent
    if fillPercent < minFillPercent {
        fillPercent = minFillPercent
    } else if fillPercent > maxFillPercent {
        fillPercent = maxFillPercent
    }
    threshold := int(float64(pageSize) * fillPercent)                     (2)

    // Determine split position and sizes of the two pages.
    splitIndex, _ := n.splitIndex(threshold)                              (3)

    // Split node into two separate nodes.
    // If there's no parent then we'll need to create one.
    if n.parent == nil {
        n.parent = &node{bucket: n.bucket, children: []*node{n}}          (4)
    }

    // Create a new node and add it to the parent.
    next := &node{bucket: n.bucket, isLeaf: n.isLeaf, parent: n.parent}   (5)
    n.parent.children = append(n.parent.children, next)

    // Split inodes across two nodes.
    next.inodes = n.inodes[splitIndex:]                                   (6)
    n.inodes = n.inodes[:splitIndex]                                      (7)

    ......

    return n, next
}

可以看到:

  1. 代碼(1)處決定了節(jié)點分裂的條件: 1) 節(jié)點大小超過了頁大小, 且2) 節(jié)點Key個數(shù)大于每節(jié)點Key數(shù)量最小值的兩倍,這是為了保證分裂出的兩個節(jié)點中的Key數(shù)量都大于每節(jié)點Key數(shù)量的最小值;
  2. 代碼(2)處決定分裂的門限值,即頁大小 x 填充率;
  3. 代碼(3)處調(diào)用splitIndex()方法根據(jù)門限值計算分裂的位置;
  4. 代碼(4)如果要分裂的節(jié)點沒有父節(jié)點(可能是根節(jié)點),則應該新建一個父node,同時將當前節(jié)點設(shè)為它的子node;
  5. 代碼(5)創(chuàng)建了一個新node,并將當前node的父節(jié)點設(shè)為它的父節(jié)點;
  6. 代碼(6)處將當前node的從分裂位置開始的右半部分記錄拷貝給新node;
  7. 代碼(7)處將當前node的記錄更新為原記錄集合從分裂位置開始的左半部分,從而實現(xiàn)了將當前node一分為二;

我們來看看splitIndex()是如何決定分裂位置的:

//boltdb/bolt/node.go

// splitIndex finds the position where a page will fill a given threshold.
// It returns the index as well as the size of the first page.
// This is only be called from split().
func (n *node) splitIndex(threshold int) (index, sz int) {
    sz = pageHeaderSize

    // Loop until we only have the minimum number of keys required for the second page.
    for i := 0; i < len(n.inodes)-minKeysPerPage; i++ {
        index = i
        inode := n.inodes[i]
        elsize := n.pageElementSize() + len(inode.key) + len(inode.value)

        // If we have at least the minimum number of keys and adding another
        // node would put us over the threshold then exit and return.
        if i >= minKeysPerPage && sz+elsize > threshold {
            break
        }

        // Add the element size to the total size.
        sz += elsize
    }

    return
}

可以看出分裂位置要同時保證:

  1. 前半部分的節(jié)點數(shù)量大于每節(jié)點Key數(shù)量最小值(minKeysPerPage);
  2. 后半部分的節(jié)點數(shù)量大于每節(jié)點Key數(shù)量最小值(minKeysPerPage);
  3. 分裂后半部分node的大小是不超過門限值的最小值,即前半部分的size要在門限范圍內(nèi)盡量大;

對Bucket進行rebalance和spill后,Bucket及其子Bucket對應的B+Tree將處于平衡狀態(tài),隨后各node將被寫入DB文件。這兩個過程在樹結(jié)構(gòu)上進行遞歸,可能不太好理解,讀者可以對照本文開頭給出的示例圖推演。node的spill()調(diào)用中,還涉及到了Tx的allocate()方法,我們將在介紹完Tx的Commit()后再來分析它。Commit()中接下來比較重要的步驟例是調(diào)用tx.write()和tx.writeMeta()來寫DB文件了。我們先來看看tx.write():

//boltdb/bolt/tx.go

// write writes any dirty pages to disk.
func (tx *Tx) write() error {
    // Sort pages by id.
    pages := make(pages, 0, len(tx.pages))
    for _, p := range tx.pages {
        pages = append(pages, p)                                             (1)
    }
    // Clear out page cache early.
    tx.pages = make(map[pgid]*page)                                          (2)
    sort.Sort(pages)                                                         (3)

    // Write pages to disk in order.
    for _, p := range pages {                                                (4)
        size := (int(p.overflow) + 1) * tx.db.pageSize
        offset := int64(p.id) * int64(tx.db.pageSize)

        // Write out page in "max allocation" sized chunks.
        ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
        for {
            // Limit our write to our max allocation size.
            sz := size
            if sz > maxAllocSize-1 {
                sz = maxAllocSize - 1
            }

            // Write chunk to disk.
            buf := ptr[:sz]
            if _, err := tx.db.ops.writeAt(buf, offset); err != nil {        (5)
                return err
            }

            ......

            // Exit inner for loop if we've written all the chunks.
            size -= sz
            if size == 0 {
                break
            }

            // Otherwise move offset forward and move pointer to next chunk.
            offset += int64(sz)
            ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
        }
    }

    // Ignore file sync if flag is set on DB.
    if !tx.db.NoSync || IgnoreNoSync {
        if err := fdatasync(tx.db); err != nil {                          (6)
            return err
        }
    }

    ......

    return nil
}

tx.write()的主要步驟是:

  1. 首先將當前tx中的臟頁的引用保存到本地slice變量中,并釋放原來的引用。請注意,Tx對象并不是線程安全的,而接下來的寫文件操作會比較耗時,此時應該避免tx.pages被修改;
  2. 代碼(3)處對頁按其pgid排序,保證在隨后按頁順序?qū)懳募?,一定程度上提高寫文件效?
  3. 代碼(4)處開始將各頁循環(huán)寫入文件,循環(huán)體中代碼(5)處通過fwrite系統(tǒng)調(diào)用寫文件;
  4. 代碼(6)處通過fdatasync將磁盤緩沖寫入磁盤。

在將臟頁寫入磁盤后,tx.Commit()隨后將meta寫入磁盤:

//boltdb/bolt/tx.go

// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
    // Create a temporary buffer for the meta page.
    buf := make([]byte, tx.db.pageSize)
    p := tx.db.pageInBuffer(buf, 0)
    tx.meta.write(p)

    // Write the meta page to file.
    if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
        return err
    }
    if !tx.db.NoSync || IgnoreNoSync {
        if err := fdatasync(tx.db); err != nil {
            return err
        }
    }

    ......

    return nil
}

writeMeta()的實現(xiàn)比較簡單,先向臨時分配的頁緩存寫入序列化后的meta頁,然后通過fwrite和fdatesync系統(tǒng)調(diào)用將其寫入DB的meta page。

到此,我們就完整地了解了Transaction Commit的全部過程,主要包括:

  1. 從根Bucket開始,對訪問過的Bucket進行轉(zhuǎn)換與分裂,讓進行過插入與刪除操作的B+Tree重新達到平衡狀態(tài);
  2. 更新freeList頁;
  3. 將由當前transaction分配的頁緩存寫入磁盤,需要分配頁緩存的地方有: 1)節(jié)點分裂時產(chǎn)生新的節(jié)點, 2) freeList頁重新分配;
  4. 將meta頁寫入磁盤;

最后,我們來看一看tx是如何通過allocate()方法來分配頁緩存的:

//boltdb/bolt/tx.go

// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) {
    p, err := tx.db.allocate(count)
    if err != nil {
        return nil, err
    }

    // Save to our page cache.
    tx.pages[p.id] = p

    ......

    return p, nil
}

可以看出,它實際上是通過DB的allocate()方法來做實際分配的;同時,這里分配的頁緩存將被加入到tx的pages字段中,也就是我們前面提到的臟頁集合。

//boltdb/bolt/db.go

// allocate returns a contiguous block of memory starting at a given page.
func (db *DB) allocate(count int) (*page, error) {
    // Allocate a temporary buffer for the page.
    var buf []byte
    if count == 1 {
        buf = db.pagePool.Get().([]byte)                                 (1)
    } else {
        buf = make([]byte, count*db.pageSize)
    }
    p := (*page)(unsafe.Pointer(&buf[0]))
    p.overflow = uint32(count - 1)

    // Use pages from the freelist if they are available.
    if p.id = db.freelist.allocate(count); p.id != 0 {                   (2)
        return p, nil
    }

    // Resize mmap() if we're at the end.
    p.id = db.rwtx.meta.pgid                                             (3)
    var minsz = int((p.id+pgid(count))+1) * db.pageSize                  (4)
    if minsz >= db.datasz {                                  
        if err := db.mmap(minsz); err != nil {                           (5)
            return nil, fmt.Errorf("mmap allocate error: %s", err)
        }
    }

    // Move the page id high water mark.
    db.rwtx.meta.pgid += pgid(count)                                     (6)

    return p, nil
}

可以看出,實際分配頁緩存的過程是:

  1. 首先分配所需的緩存。這里有一個優(yōu)化措施: 如果只需要一頁緩存的話,并不直接進行內(nèi)存分配,而是通過Go中的Pool緩沖池來分配,以減小分配內(nèi)存帶來的時間開銷。tx.write()中向磁盤寫入臟頁后,會將所有只占一個頁框的臟頁清空,并放入Pool緩沖池;
  2. 從freeList查看有沒有可用的頁號,如果有則分配給剛剛申請到的頁緩存,并返回;如果freeList中沒有可用的頁號,則說明當前映射入內(nèi)存的文件段沒有空閑頁,需要增大文件映射范圍;
  3. 代碼(3)處將新申請的頁緩存的頁號設(shè)為文件內(nèi)容結(jié)尾處的頁號; 請注意,我們所說的文件內(nèi)容結(jié)尾處并不是指文件結(jié)尾處,如大小為32K 的文件,只寫入了4頁(頁大小為4K),則文件內(nèi)容結(jié)尾處為16K處,結(jié)尾處的頁號是4。我們在《區(qū)塊的持久化之BoltDB(一)》中介紹過meta.pgid簡單理解為文件總頁數(shù),實際上并不準確,我們說簡單理解為文件總頁數(shù),是假設(shè)文件被寫滿(如剛創(chuàng)建DB文件時)的情況?,F(xiàn)在我們知道,當映射文件大小大于16M時,文件實際大小會大于文件內(nèi)容長度。實際上,BoltDB允許在Open()的時候指定初始的文件映射長度,并可以超過文件大小,在linux平臺上,在讀寫transaction commit之前,映射區(qū)長度均會大于文件實際大小,但meta.pgid總是記錄文件內(nèi)容所占的最大頁號加1;
  4. 代碼(4)處計算需要的總頁數(shù);
  5. 在代碼(5)處,如果需要的頁數(shù)大于已經(jīng)映射到內(nèi)存的文件總頁數(shù),則觸發(fā)remmap,將映射區(qū)域擴大到寫入文件后新的文件內(nèi)容結(jié)尾處。我們前面介紹db.mmaplock的時候說過,讀寫transaction在remmap時,需要等待所有已經(jīng)open的只讀transaction結(jié)束,從這里我們知道,如果打開DB文件時,設(shè)定的初始文件映射長度足夠長,可以減少讀寫transaction需要remmap的概率,從而降低讀寫transaction被阻塞的概率,提高讀寫并發(fā);
  6. 代碼(6)讓meta.pgid指向新的文件內(nèi)容結(jié)尾處;

到這里,我們就了解了通過db.Update()來讀寫B(tài)oltDB的全過程。它涉及到了創(chuàng)建Transaction、Bucket,在Bucket中查找、插入、刪除K/V,以及在最后Commit讀寫transaction時發(fā)生的B+Tree的旋轉(zhuǎn)、分裂等過程,其中B+Tree節(jié)點的旋轉(zhuǎn)與分裂過程又涉及到頁緩存的分配、remmap、調(diào)整文件大小及寫文件等等。從整個過程來看,我們并沒有發(fā)現(xiàn)讀寫數(shù)據(jù)庫時真正用到鎖的地方,從我們前面的介紹中,大家已經(jīng)知道BoltDB通過meta的txid進行了版本管理,并且它肯定也有MVCC的機制,那么,BoltDB究竟是如何實現(xiàn)MVCC的呢?我們將在下篇文章《區(qū)塊的持久化之BoltDB(五)》中介紹完db.View()后來分析其MVCC機制。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容