Reading the Prometheus WAL Source Code

Overview

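WAL (write-ahead logging) is a technique used in relational databases to provide transactional atomicity and durability through a log. Before an operation is carried out, it is first recorded, so that the data can later be rolled back or the operation retried. This keeps the data reliable.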

The write ahead log operates in segments that are numbered and sequential, e.g. 000000, 000001, 000002, etc., and are limited to 128MB by default. A segment is written to in pages of 32KB. Only the last page of the most recent segment may be partial. A WAL record is an opaque byte slice that gets split up into sub-records should it exceed the remaining space of the current page. Records are never split across segment boundaries. If a single record exceeds the default segment size, a segment with a larger size will be created. The encoding of pages is largely borrowed from LevelDB's/RocksDB's write ahead log.

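Prometheus introduced a WAL to avoid losing monitoring data that is held in memory but not yet written to disk. The WAL is split into segment files with a default size of 128MB (earlier versions defaulted to 256MB). Segment files are named by number, as 8-digit integers. The unit of writing is the page, which is 32KB, so the segment size must be a whole multiple of the page size. The WAL tracks how many pages have already been written to the active segment (the donePages field). If a record needs more space than the free pages left in the current segment, a new segment is created to hold it, so that a single record is never stored across segments.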

const (
     DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB
     pageSize           = 32 * 1024         // 32KB
     recordHeaderSize   = 7
)
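Because a segment is written in whole 32KB pages, its size must divide evenly into pages. The minimal Go sketch below shows that arithmetic. The helpers `validSegmentSize` and `pagesPerSegment` are illustrative only. The real WAL has a `pagesPerSegment` method on `*WAL` (used in `log` later on), and NewSize performs a similar modulo check. The extra `size > 0` guard is an assumption of this sketch.

```go
package main

const (
	DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB
	pageSize           = 32 * 1024         // 32KB
)

// validSegmentSize reports whether size is a positive whole number of pages
// (hypothetical helper; the > 0 guard is an addition of this sketch).
func validSegmentSize(size int) bool {
	return size > 0 && size%pageSize == 0
}

// pagesPerSegment returns how many 32KB pages fit into a segment.
func pagesPerSegment(segmentSize int) int {
	return segmentSize / pageSize
}
```

For example, `pagesPerSegment(DefaultSegmentSize)` yields 4096 (134217728 / 32768), and a size such as `pageSize+1` is rejected.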

This article builds on the earlier post "Reading Parts of the Prometheus tsdb Source: wal.LogSeries". The WAL interface and related methods defined in tsdb/wal.go are deprecated, as shown below, so this article analyzes tsdb/wal/wal.go instead.

// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
//
// DEPRECATED: use wal pkg combined with the record codex instead.
type WAL interface {
    Reader() WALReader
    LogSeries([]RefSeries) error
    LogSamples([]RefSample) error
    LogDeletes([]Stone) error
    Truncate(mint int64, keep func(uint64) bool) error
    Close() error
}

Prometheus storage hierarchy

(ENV) /Users/xsky/go/src/github.com/microyahoo/prometheus/data $ tree -h
.
├── [ 192]  01DPE8T5XPQ9ZYHSNJYBJBKGR6
│   ├── [  96]  chunks
│   │   └── [7.1K]  000001
│   ├── [ 22K]  index
│   ├── [ 272]  meta.json
│   └── [   9]  tombstones
├── [   0]  lock
├── [ 20K]  queries.active
└── [ 256]  wal
    ├── [   0]  00000050
    ├── [   0]  00000051
    ├── [   0]  00000052
    ├── [   0]  00000053
    ├── [ 10K]  00000054
    └── [  96]  checkpoint.000049
        └── [ 32K]  00000000

4 directories, 12 files

As the tree above shows, the wal directory mainly contains segment files and checkpoint directories.

// WAL is a write ahead log that stores records in segment files.
// It must be read from start to end once before logging new data.
// If an error occurs during read, the repair procedure must be called
// before it's safe to do further writes.
//
// Segments are written to in pages of 32KB, with records possibly split
// across page boundaries.
// Records are never split across segments to allow full segments to be
// safely truncated. It also ensures that torn writes never corrupt records
// beyond the most recent segment.
type WAL struct {
    dir         string
    logger      log.Logger
    segmentSize int
    mtx         sync.RWMutex
    segment     *Segment // Active segment.
    donePages   int      // Pages written to the segment.
    page        *page    // Active page.
    stopc       chan chan struct{}
    actorc      chan func()
    closed      bool // To allow calling Close() more than once without blocking.
    compress    bool
    snappyBuf   []byte

    fsyncDuration   prometheus.Summary
    pageFlushes     prometheus.Counter
    pageCompletions prometheus.Counter
    truncateFail    prometheus.Counter
    truncateTotal   prometheus.Counter
    currentSegment  prometheus.Gauge
}

// Segment represents a segment file.
type Segment struct {
    *os.File
    dir string
    i   int
}

Create Segment

Segments are created mainly through New or NewSize. NewSize lets the caller choose the segment size, while New passes the default size of 128MB. When the DB is opened with the WAL enabled, NewSize is called to create a new segment, as shown below. The steps of NewSize are then walked through.

wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
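  • First, check that the segment size is a whole multiple of the page size.
  • Create the wal directory if it does not exist.
  • Create the WAL struct. Because a WAL is tied to its active segment, the WAL's segmentSize is simply the segmentSize passed in.
  • Find all existing segments in the WAL. Segments are named with increasing numbers, so the name doubles as the index. Internally each one is represented as a segmentRef, where name is the file name (an 8-digit, zero-padded integer) and index is its integer value.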
type segmentRef struct {
    name  string
    index int
}
  • Once the previous step has found the last segment, the index of the next segment to write is last+1. Create that new segment.
  • Make the newly created segment the WAL's active segment. Set the WAL's donePages from the segment file's size, and set the WAL's currentSegment gauge to the new segment's index.
  • Start a new goroutine that runs the funcs sent over the actorc channel. It is mainly used for expensive operations, such as fsyncing a segment's data to disk.

WAL Repair

Repair attempts to repair the WAL based on the error.
It discards all data after the corruption.

Note that Repair can only repair a CorruptionErr. A CorruptionErr records the directory, the segment, the offset, and the underlying error.

// CorruptionErr is an error that's returned when corruption is encountered.
type CorruptionErr struct {
    Dir     string
    Segment int
    Offset  int64
    Err     error
}
  • First, list all segments in the wal directory.
  • Delete every segment whose index is greater than CorruptionErr.Segment.
  • Close the WAL's active segment.
  • Rename the segment at index CorruptionErr.Segment by appending a .repair suffix.
  • Create a new segment file with index CorruptionErr.Segment, i.e. a fresh file that replaces the renamed corrupted segment, and point the WAL's active segment at it.
  • Copy the data from the corrupted segment into the new segment, stopping at CorruptionErr.Offset.
    • Iterate over the records in the corrupted segment and write each one to the new segment.
  • Flush the active page to the file.
  • Close the corrupted segment file and delete it.
  • Create a new segment file with index CorruptionErr.Segment+1 and point the active segment at it.

Below is an annotated excerpt of Repair from tsdb/wal/wal.go.

    // Open the corrupted segment file and read its records.
    r := NewReader(bufio.NewReader(f))

    // Explained in detail in the WAL Reader section below.
    for r.Next() {
        // Add records only up to the where the error was.
        if r.Offset() >= cerr.Offset {
            break
        }
        // Log is examined in detail below.
        if err := w.Log(r.Record()); err != nil {
            return errors.Wrap(err, "insert record")
        }
    }
    // We expect an error here from r.Err(), so nothing to handle.

    // We need to pad to the end of the last page in the repaired segment
    if err := w.flushPage(true); err != nil {
        return errors.Wrap(err, "flush page in repair")
    }

    // We explicitly close even when there is a defer for Windows to be
    // able to delete it. The defer is in place to close it in-case there
    // are errors above.
    if err := f.Close(); err != nil {
        return errors.Wrap(err, "close corrupted file")
    }
    // Delete the .repair file.
    if err := os.Remove(tmpfn); err != nil {
        return errors.Wrap(err, "delete corrupted segment")
    }

    // Explicitly close the segment we just repaired to avoid issues with Windows.
    s.Close()
    
    // We always want to start writing to a new Segment rather than an existing
    // Segment, which is handled by NewSize, but earlier in Repair we're deleting
    // all segments that come after the corrupted Segment. Recreate a new Segment here.
    // Create a new segment file and make it the active segment.
    s, err = CreateSegment(w.dir, cerr.Segment+1)
    if err != nil {
        return err
    }
    if err := w.setSegment(s); err != nil {
        return err
    }
    return nil

The core of Repair is to copy the data of the corrupted segment file into a new segment and then delete the corrupted file. The write path is examined below. It goes mainly through the WAL's log method.

// log writes rec to the log and forces a flush of the current page if:
// - the final record of a batch
// - the record is bigger than the page size
// - the current page is full.
func (w *WAL) log(rec []byte, final bool) error {
    // When the last page flush failed the page will remain full.
    // When the page is full, need to flush it before trying to add more records to it.
    // If the WAL's active page is full, flush it to the file and reset the page.
    if w.page.full() {
        if err := w.flushPage(true); err != nil {
            return err
        }
    }
    // If the record is too big to fit within the active page in the current
    // segment, terminate the active segment and advance to the next one.
    // This ensures that records do not cross segment boundaries.
    left := w.page.remaining() - recordHeaderSize                                   // Free space in the active page.
    left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages in the active segment.

    // If the record is larger than the free space left in the active segment, start a new segment.
    if len(rec) > left {
        // 1. If the active page has any allocated bytes, flush it to the file.
        // 2. Create a new segment file.
        // 3. fsync the previous segment file's data to disk.
        if err := w.nextSegment(); err != nil {
            return err
        }
    }

    compressed := false
    if w.compress && len(rec) > 0 {
        // The snappy library uses `len` to calculate if we need a new buffer.
        // In order to allocate as few buffers as possible make the length
        // equal to the capacity.
        // If WAL compression is enabled, snappy-encode the record into snappyBuf
        // and point rec at the compressed bytes, but only if they are shorter.
        w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)]
        w.snappyBuf = snappy.Encode(w.snappyBuf, rec)
        if len(w.snappyBuf) < len(rec) {
            rec = w.snappyBuf
            compressed = true
        }
    }

    // Populate as many pages as necessary to fit the record.
    // Be careful to always do one pass to ensure we write zero-length records.
    for i := 0; i == 0 || len(rec) > 0; i++ {
        p := w.page

        // Find how much of the record we can fit into the page.
        var (
            l    = min(len(rec), (pageSize-p.alloc)-recordHeaderSize)
            part = rec[:l]
            // buf is p.buf starting at offset p.alloc.
            buf  = p.buf[p.alloc:]
            typ  recType
        )

        switch {
        case i == 0 && len(part) == len(rec):
            typ = recFull
        case len(part) == len(rec):
            typ = recLast
        case i == 0:
            typ = recFirst
        default:
            typ = recMiddle
        }
        if compressed {
            typ |= snappyMask
        }
        // Write the record type, including the compression flag, into the first byte of buf.
        // Then write the 2-byte length, min(len(rec), (pageSize-p.alloc)-recordHeaderSize).
        // The record can therefore be longer than what the active page can hold,
        // in which case it spans pages, but never segments.
        // Finally, write the 4-byte CRC.
        buf[0] = byte(typ)
        crc := crc32.Checksum(part, castagnoliTable)
        binary.BigEndian.PutUint16(buf[1:], uint16(len(part)))
        binary.BigEndian.PutUint32(buf[3:], crc)

        // After the header, copy the record data (possibly only part of it).
        copy(buf[recordHeaderSize:], part)
        p.alloc += len(part) + recordHeaderSize

        // If the page is full, flush it to the file.
        if w.page.full() {
            if err := w.flushPage(true); err != nil {
                return err
            }
        }
        rec = rec[l:]
    }
    
    // If it's the final record of the batch and the page is not empty, flush it.
    if final && w.page.alloc > 0 {
        if err := w.flushPage(false); err != nil {
            return err
        }
    }

    return nil
}

WAL Reader

// Reader reads WAL records from an io.Reader.
type Reader struct {
    rdr       io.Reader
    err       error
    rec       []byte
    snappyBuf []byte
    // buf is a byte array of length pageSize.
    buf       [pageSize]byte
    total     int64   // Total bytes processed.
    curRecTyp recType // Used for checking that the last record is not torn.
}

Reader reads records from rdr, so it is essentially a thin wrapper around an io.Reader. buf is a byte array of size pageSize.

+--------------------------------------------------+
|                   ≤ Reader.buf                   |
+-----------------------------------+--------------+
|               hdr                 |     buf      |
+-----------+----------+------------+--------------+
| type <1b> | len <2b> | CRC32 <4b> | data <bytes> |
+-----------+----------+------------+--------------+

The record fragment is encoded as:

+-----------+----------+------------+--------------+
| type <1b> | len <2b> | CRC32 <4b> | data <bytes> |
+-----------+----------+------------+--------------+

The type flag has the following states:

  • 0: rest of page will be empty
  • 1: a full record encoded in a single fragment
  • 2: first fragment of a record
  • 3: middle fragment of a record
  • 4: final fragment of a record

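type occupies one byte. The type flag has only five states, so 3 bits are enough to represent it. The top 4 bits are unallocated, the 5th bit indicates whether the record is snappy-compressed, and the low 3 bits hold the record type.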

+--------------------+-------------------------------+------------------------+
| 4 bits unallocated | 1 bit snappy compression flag |   3 bit record type    |
+--------------------+-------------------------------+------------------------+

// Next advances the reader to the next records and returns true if it exists.
// It must not be called again after it returned false.
func (r *Reader) Next() bool {
    err := r.next()
    if errors.Cause(err) == io.EOF {
        // The last WAL segment record shouldn't be torn(should be full or last).
        // The last record would be torn after a crash just before
        // the last record part could be persisted to disk.
        if r.curRecTyp == recFirst || r.curRecTyp == recMiddle {
            r.err = errors.New("last record is torn")
        }
        return false
    }
    r.err = err
    return r.err == nil
}

func (r *Reader) next() (err error) {
    // We have to use r.buf since allocating byte arrays here fails escape
    // analysis and ends up on the heap, even though it seemingly should not.
    // The first 7 bytes hold the header; the rest holds data.
    hdr := r.buf[:recordHeaderSize]
    buf := r.buf[recordHeaderSize:]

    r.rec = r.rec[:0]
    r.snappyBuf = r.snappyBuf[:0]

    i := 0
    for {
        // First, read a single byte from rdr.
        if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
            return errors.Wrap(err, "read first header byte")
        }
        r.total++
        // Get the record type and check whether it is compressed.
        r.curRecTyp = recTypeFromHeader(hdr[0])
        compressed := hdr[0]&snappyMask != 0

        // Gobble up zero bytes.
        // recPageTerm means the rest of the page is zero padding: the first byte is the type and every byte after it is 0.
        if r.curRecTyp == recPageTerm {
            // recPageTerm is a single byte that indicates the rest of the page is padded.
            // If it's the first byte in a page, buf is too small and
            // needs to be resized to fit pageSize-1 bytes.
            buf = r.buf[1:]

            // We are pedantic and check whether the zeros are actually up
            // to a page boundary.
            // It's not strictly necessary but may catch sketchy state early.
            k := pageSize - (r.total % pageSize)
            if k == pageSize {
                continue // Initial 0 byte was last page byte.
            }
            n, err := io.ReadFull(r.rdr, buf[:k])
            if err != nil {
                return errors.Wrap(err, "read remaining zeros")
            }
            r.total += int64(n)

            for _, c := range buf[:k] {
                if c != 0 {
                    return errors.New("unexpected non-zero byte in padded page")
                }
            }
            continue
        }
        // Read the remaining 6 bytes: the length and the CRC.
        n, err := io.ReadFull(r.rdr, hdr[1:])
        if err != nil {
            return errors.Wrap(err, "read remaining header")
        }
        r.total += int64(n)

        var (
            length = binary.BigEndian.Uint16(hdr[1:])
            crc    = binary.BigEndian.Uint32(hdr[3:])
        )

        if length > pageSize-recordHeaderSize {
            return errors.Errorf("invalid record size %d", length)
        }
        // Read length bytes of data and check that their CRC matches the one in the header.
        n, err = io.ReadFull(r.rdr, buf[:length])
        if err != nil {
            return err
        }
        r.total += int64(n)

        if n != int(length) {
            return errors.Errorf("invalid size: expected %d, got %d", length, n)
        }
        if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
            return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
        }
        
        // Compressed fragments are collected in snappyBuf and decoded once the full record has been read.
        if compressed {
            r.snappyBuf = append(r.snappyBuf, buf[:length]...)
        } else {
            r.rec = append(r.rec, buf[:length]...)
        }

        if err := validateRecord(r.curRecTyp, i); err != nil {
            return err
        }
        if r.curRecTyp == recLast || r.curRecTyp == recFull {
            if compressed && len(r.snappyBuf) > 0 {
                // The snappy library uses `len` to calculate if we need a new buffer.
                // In order to allocate as few buffers as possible make the length
                // equal to the capacity.
                // Decode the compressed data.
                r.rec = r.rec[:cap(r.rec)]
                r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
                return err
            }
            return nil
        }
        // Only increment i for non-zero records since we use it
        // to determine valid content record sequences.
        i++
    }
}

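Next reads the next record from rdr and returns true if one exists. Each iteration reads a fragment of at most pageSize-recordHeaderSize bytes, until a complete record has been assembled. If the data is compressed, it is decoded. The resulting record is stored in Reader.rec.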

Open Questions

  1. Why is the recPageTerm type needed? Doesn't it waste up to a whole page of space?
  2. What exactly does a record represent?
  3. Why can't length exceed pageSize-recordHeaderSize?
  4. Can a record be larger than a segment?
  5. snappy.Decode
