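Overview
WAL (write-ahead logging) is a technique that relational databases use to provide transactional guarantees and durability through a log. Before an operation is carried out, it is first recorded in the log, so that the data can later be rolled back or the operation retried, which keeps the data reliable.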
The write ahead log operates in segments that are numbered and sequential, e.g. 000000, 000001, 000002, etc., and are limited to 128MB by default. A segment is written to in pages of 32KB. Only the last page of the most recent segment may be partial. A WAL record is an opaque byte slice that gets split up into sub-records should it exceed the remaining space of the current page. Records are never split across segment boundaries. If a single record exceeds the default segment size, a segment with a larger size will be created. The encoding of pages is largely borrowed from LevelDB's/RocksDB's write ahead log.
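Prometheus introduced the WAL so that monitoring data held in memory and not yet written to disk is not lost. The WAL is split into file segments with a default size of 128 MB (earlier versions defaulted to 256 MB). Segments are named by number, as a zero-padded eight-digit integer. The WAL is written in units of pages, each 32 KB in size, so the segment size must be an integer multiple of the page size. A "pages used" count (apparently the donePages field of the WAL struct, which tracks the active segment) records how many pages of the segment have already been allocated. If a single write needs more pages than the segment has free, a new segment is created to hold them, which guarantees that the pages of one write are never stored across segments.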
const (
DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB
pageSize = 32 * 1024 // 32KB
recordHeaderSize = 7
)
This article builds on an earlier source-reading note on wal.LogSeries in the Prometheus tsdb code. The WAL interface and its related methods defined in tsdb/wal.go are deprecated, as shown below, so this article analyzes tsdb/wal/wal.go instead.
// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
//
// DEPRECATED: use wal pkg combined with the record codex instead.
type WAL interface {
Reader() WALReader
LogSeries([]RefSeries) error
LogSamples([]RefSample) error
LogDeletes([]Stone) error
Truncate(mint int64, keep func(uint64) bool) error
Close() error
}
Prometheus storage hierarchy
(ENV) /Users/xsky/go/src/github.com/microyahoo/prometheus/data $ tree -h
.
├── [ 192]  01DPE8T5XPQ9ZYHSNJYBJBKGR6
│   ├── [  96]  chunks
│   │   └── [7.1K]  000001
│   ├── [ 22K]  index
│   ├── [ 272]  meta.json
│   └── [   9]  tombstones
├── [   0]  lock
├── [ 20K]  queries.active
└── [ 256]  wal
    ├── [   0]  00000050
    ├── [   0]  00000051
    ├── [   0]  00000052
    ├── [   0]  00000053
    ├── [ 10K]  00000054
    └── [  96]  checkpoint.000049
        └── [ 32K]  00000000

4 directories, 12 files
As the directory layout above shows, the wal directory mainly contains segment files and a checkpoint directory.
// WAL is a write ahead log that stores records in segment files.
// It must be read from start to end once before logging new data.
// If an error occurs during read, the repair procedure must be called
// before it's safe to do further writes.
//
// Segments are written to in pages of 32KB, with records possibly split
// across page boundaries.
// Records are never split across segments to allow full segments to be
// safely truncated. It also ensures that torn writes never corrupt records
// beyond the most recent segment.
type WAL struct {
dir string
logger log.Logger
segmentSize int
mtx sync.RWMutex
segment *Segment // Active segment.
donePages int // Pages written to the segment.
page *page // Active page.
stopc chan chan struct{}
actorc chan func()
closed bool // To allow calling Close() more than once without blocking.
compress bool
snappyBuf []byte
fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter
pageCompletions prometheus.Counter
truncateFail prometheus.Counter
truncateTotal prometheus.Counter
currentSegment prometheus.Gauge
}
// Segment represents a segment file.
type Segment struct {
*os.File
dir string
i int
}

Create Segment
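Segments are mainly created through the New or NewSize method. NewSize lets the caller specify the segment size, while New passes the default segment size of 128 MB. When the DB is loaded with the WAL enabled, NewSize is called to create a new segment, as shown below. The rest of this section walks through NewSize.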
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
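- First, check that the segment size is an integer multiple of the page size.
- Create the wal directory if it does not exist.
- Create the WAL data structure. Because the WAL is tied to its active segment, the WAL's segmentSize is simply the segmentSize passed in.
- Find all segments currently in the WAL. Segments are named with increasing numbers, so the name is also the index. Internally each one is represented as a segmentRef, where name is the file name (the index as a zero-padded eight-digit integer) and index is its integer value.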
type segmentRef struct {
name string
index int
}
- Once the previous step has found the last segment, the next segment to write to has index last+1; create that new segment.
- Set the new segment as the WAL's active segment, set the WAL's donePages from the segment file size, and set the WAL's currentSegment to the index of the new segment.
- Start a new goroutine that runs the funcs sent over the actorc channel. It is mainly used for time-consuming operations, such as fsyncing a segment's data to disk.
WAL Repair
Repair attempts to repair the WAL based on the error.
It discards all data after the corruption.
Note that Repair can only repair a CorruptionErr. A CorruptionErr specifies the directory, the segment, the offset, and the underlying error.
// CorruptionErr is an error that's returned when corruption is encountered.
type CorruptionErr struct {
Dir string
Segment int
Offset int64
Err error
}
- First, get all segments in the wal directory.
- Delete every segment whose index is greater than CorruptionErr.Segment.
- Close the WAL's active segment.
- Rename the segment at index CorruptionErr.Segment by appending the .repair suffix.
- Create a new segment file with index CorruptionErr.Segment. That is, after the corrupted segment has been renamed, a fresh segment file is created in its place and the WAL's active segment is pointed at it.
- Copy the data from the corrupted segment into the newly created segment file, up to CorruptionErr.Offset.
  - Iterate over the records in the corrupted segment and write them into the new segment.
  - Flush the active page to the file.
  - Close the corrupted segment file and delete it.
- Create a new segment file with index CorruptionErr.Segment+1 and point the active segment at it.
The following excerpt from Repair in tsdb/wal/wal.go is analyzed below.
// Open the corrupted segment file and read its records.
r := NewReader(bufio.NewReader(f))
// Explained in detail in the WAL Reader section below.
for r.Next() {
// Add records only up to the where the error was.
// Read only up to the point where the error occurred.
if r.Offset() >= cerr.Offset {
break
}
// This method is covered in detail below.
if err := w.Log(r.Record()); err != nil {
return errors.Wrap(err, "insert record")
}
}
// We expect an error here from r.Err(), so nothing to handle.
// We need to pad to the end of the last page in the repaired segment
if err := w.flushPage(true); err != nil {
return errors.Wrap(err, "flush page in repair")
}
// We explicitly close even when there is a defer for Windows to be
// able to delete it. The defer is in place to close it in-case there
// are errors above.
if err := f.Close(); err != nil {
return errors.Wrap(err, "close corrupted file")
}
// Delete the .repair file.
if err := os.Remove(tmpfn); err != nil {
return errors.Wrap(err, "delete corrupted segment")
}
// Explicitly close the segment we just repaired to avoid issues with Windows.
s.Close()
// We always want to start writing to a new Segment rather than an existing
// Segment, which is handled by NewSize, but earlier in Repair we're deleting
// all segments that come after the corrupted Segment. Recreate a new Segment here.
// Create a new segment file and point the active segment at it.
s, err = CreateSegment(w.dir, cerr.Segment+1)
if err != nil {
return err
}
if err := w.setSegment(s); err != nil {
return err
}
return nil
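The core of Repair is to write the data from the corrupted segment file into a new segment and then delete the corrupted segment file. The write path is described in detail below; it mainly calls the WAL's log method.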
// log writes rec to the log and forces a flush of the current page if:
// - the final record of a batch
// - the record is bigger than the page size
// - the current page is full.
func (w *WAL) log(rec []byte, final bool) error {
// When the last page flush failed the page will remain full.
// When the page is full, need to flush it before trying to add more records to it.
// If the WAL's active page is full, flush it to the file and reset the page.
if w.page.full() {
if err := w.flushPage(true); err != nil {
return err
}
}
// If the record is too big to fit within the active page in the current
// segment, terminate the active segment and advance to the next one.
// This ensures that records do not cross segment boundaries.
left := w.page.remaining() - recordHeaderSize // Free space in the active page.
left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages in the active segment.
// If the record is longer than the space left in the active segment, create a new segment.
if len(rec) > left {
// 1. If the active page has a non-zero allocated size, flush the page to the file.
// 2. Create a new segment file.
// 3. fsync the data of the previous segment file to disk.
if err := w.nextSegment(); err != nil {
return err
}
}
compressed := false
if w.compress && len(rec) > 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
// If WAL compression is enabled, snappy-encode the record into snappyBuf and,
// when the result is smaller, point rec at the compressed data in snappyBuf.
w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)]
w.snappyBuf = snappy.Encode(w.snappyBuf, rec)
if len(w.snappyBuf) < len(rec) {
rec = w.snappyBuf
compressed = true
}
}
// Populate as many pages as necessary to fit the record.
// Be careful to always do one pass to ensure we write zero-length records.
for i := 0; i == 0 || len(rec) > 0; i++ {
p := w.page
// Find how much of the record we can fit into the page.
var (
l = min(len(rec), (pageSize-p.alloc)-recordHeaderSize)
part = rec[:l]
// buf points into page.buf at offset page.alloc.
buf = p.buf[p.alloc:]
typ recType
)
switch {
case i == 0 && len(part) == len(rec):
typ = recFull
case len(part) == len(rec):
typ = recLast
case i == 0:
typ = recFirst
default:
typ = recMiddle
}
if compressed {
typ |= snappyMask
}
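// Combine the record type with the compression flag and write it to the first byte of buf.
// Then write the 2-byte length, which is min(len(rec), (pageSize-p.alloc)-recordHeaderSize).
// In other words, a record may be longer than what the active page can still hold,
// in which case it spans pages, but it can never span segments.
// Finally, write the 4-byte CRC.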
buf[0] = byte(typ)
crc := crc32.Checksum(part, castagnoliTable)
binary.BigEndian.PutUint16(buf[1:], uint16(len(part)))
binary.BigEndian.PutUint32(buf[3:], crc)
// Write the record data right after the header; this may be only part of the record.
copy(buf[recordHeaderSize:], part)
p.alloc += len(part) + recordHeaderSize
// If the page is full, flush it to the file.
if w.page.full() {
if err := w.flushPage(true); err != nil {
return err
}
}
rec = rec[l:]
}
// If it's the final record of the batch and the page is not empty, flush it.
if final && w.page.alloc > 0 {
if err := w.flushPage(false); err != nil {
return err
}
}
return nil
}
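To make the fragment loop in log easier to follow, here is a small simulation that tracks only sizes and fragment types. It is a sketch under stated assumptions: split and fragment are hypothetical names, the record is assumed to fit in the current segment, the starting page is assumed not to be full, and a page is treated as full when fewer than recordHeaderSize bytes remain (the page.full implementation is not shown in this article, so that condition is an assumption).

```go
package main

import "fmt"

const (
	pageSize         = 32 * 1024 // 32 KB, as in the constants earlier
	recordHeaderSize = 7
)

// fragment describes one piece of a record as it would land in a page.
type fragment struct {
	typ  string // "full", "first", "middle" or "last"
	size int    // payload bytes carried by this fragment
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// split simulates how a record of recLen bytes is cut into fragments when
// the active page already has alloc bytes in use. It requires
// alloc <= pageSize-recordHeaderSize, i.e. the page is not full.
func split(recLen, alloc int) []fragment {
	var out []fragment
	// Always do one pass so that zero-length records still produce a fragment.
	for i := 0; i == 0 || recLen > 0; i++ {
		l := minInt(recLen, (pageSize-alloc)-recordHeaderSize)
		var typ string
		switch {
		case i == 0 && l == recLen:
			typ = "full"
		case l == recLen:
			typ = "last"
		case i == 0:
			typ = "first"
		default:
			typ = "middle"
		}
		out = append(out, fragment{typ: typ, size: l})
		alloc += l + recordHeaderSize
		// Assumed "page full" condition: no room left for another header.
		if pageSize-alloc < recordHeaderSize {
			alloc = 0 // the page would be flushed and reset here
		}
		recLen -= l
	}
	return out
}

func main() {
	fmt.Println(split(100, 0))   // [{full 100}]
	fmt.Println(split(40000, 0)) // [{first 32761} {last 7239}]
}
```

Each page carries at most pageSize-recordHeaderSize = 32761 payload bytes per fragment, so a 40000-byte record written to an empty page becomes a first fragment of 32761 bytes and a last fragment of 7239 bytes.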
WAL Reader
// Reader reads WAL records from an io.Reader.
type Reader struct {
rdr io.Reader
err error
rec []byte
snappyBuf []byte
// buf is a byte array of length pageSize.
buf [pageSize]byte
total int64 // Total bytes processed.
curRecTyp recType // Used for checking that the last record is not torn.
}
The Reader reads records from rdr, so it is effectively a wrapper around an io.Reader; buf is a byte array of size pageSize.
+--------------------------------------------------+
|                   ≤ Reader.buf                   |
+-----------------------------------+--------------+
|                hdr                |     buf      |
+-----------+----------+------------+--------------+
| type <1b> | len <2b> | CRC32 <4b> | data <bytes> |
+-----------+----------+------------+--------------+
The record fragment is encoded as:
+-----------+----------+------------+--------------+
| type <1b> | len <2b> | CRC32 <4b> | data <bytes> |
+-----------+----------+------------+--------------+
The type flag has the following states:
- 0: rest of page will be empty
- 1: a full record encoded in a single fragment
- 2: first fragment of a record
- 3: middle fragment of a record
- 4: final fragment of a record
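The type occupies one byte. Because the type flag has only five states, 3 bits are enough to represent it. The first 4 bits are unallocated, the 5th bit indicates whether the data is compressed, and the last three bits hold the record type.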
+--------------------+-------------------------------+------------------------+
| 4 bits unallocated | 1 bit snappy compression flag | 3 bit record type |
+--------------------+-------------------------------+------------------------+
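The header layout can be exercised with a short round trip. The sketch below is illustrative only: encodeFragment and decodeFragment are hypothetical helpers, and the mask values are derived from the bit layout above (compression flag in bit 3, record type in the low three bits). It uses big-endian integers and the Castagnoli CRC32 table, as in the log and next functions shown in this article. The compression flag is only set in the header here; the payload is not actually snappy-encoded.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

type recType uint8

const (
	recPageTerm recType = 0 // rest of page is empty
	recFull     recType = 1
	recFirst    recType = 2
	recMiddle   recType = 3
	recLast     recType = 4
)

const (
	snappyMask       = 1 << 3         // compression flag, per the layout above
	recTypeMask      = snappyMask - 1 // low three bits hold the record type
	recordHeaderSize = 7              // 1 byte type + 2 bytes length + 4 bytes CRC32
)

var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)

// encodeFragment builds header plus payload. data must be at most 65535
// bytes because the length field is two bytes wide.
func encodeFragment(typ recType, compressed bool, data []byte) []byte {
	buf := make([]byte, recordHeaderSize+len(data))
	b := byte(typ)
	if compressed {
		b |= snappyMask
	}
	buf[0] = b
	binary.BigEndian.PutUint16(buf[1:], uint16(len(data)))
	binary.BigEndian.PutUint32(buf[3:], crc32.Checksum(data, castagnoliTable))
	copy(buf[recordHeaderSize:], data)
	return buf
}

// decodeFragment parses one fragment and verifies its checksum.
func decodeFragment(buf []byte) (recType, bool, []byte, error) {
	if len(buf) < recordHeaderSize {
		return 0, false, nil, errors.New("short header")
	}
	typ := recType(buf[0] & recTypeMask)
	compressed := buf[0]&snappyMask != 0
	length := int(binary.BigEndian.Uint16(buf[1:]))
	crc := binary.BigEndian.Uint32(buf[3:])
	if len(buf) < recordHeaderSize+length {
		return 0, false, nil, errors.New("short payload")
	}
	data := buf[recordHeaderSize : recordHeaderSize+length]
	if crc32.Checksum(data, castagnoliTable) != crc {
		return 0, false, nil, errors.New("checksum mismatch")
	}
	return typ, compressed, data, nil
}

func main() {
	frag := encodeFragment(recFull, true, []byte("hello"))
	fmt.Printf("%08b\n", frag[0]) // 00001001
	typ, compressed, data, err := decodeFragment(frag)
	fmt.Println(typ, compressed, string(data), err) // 1 true hello <nil>
}
```

The first byte 00001001 reads, from the left, as four unallocated bits, the compression bit set, and record type 001 (a full record).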
// Next advances the reader to the next records and returns true if it exists.
// It must not be called again after it returned false.
func (r *Reader) Next() bool {
err := r.next()
if errors.Cause(err) == io.EOF {
// The last WAL segment record shouldn't be torn(should be full or last).
// The last record would be torn after a crash just before
// the last record part could be persisted to disk.
if r.curRecTyp == recFirst || r.curRecTyp == recMiddle {
r.err = errors.New("last record is torn")
}
return false
}
r.err = err
return r.err == nil
}
func (r *Reader) next() (err error) {
// We have to use r.buf since allocating byte arrays here fails escape
// analysis and ends up on the heap, even though it seemingly should not.
// The first 7 bytes are the header; the rest is data.
hdr := r.buf[:recordHeaderSize]
buf := r.buf[recordHeaderSize:]
r.rec = r.rec[:0]
r.snappyBuf = r.snappyBuf[:0]
i := 0
for {
// First read a single byte from rdr.
if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
return errors.Wrap(err, "read first header byte")
}
r.total++
// Get the record type and check whether the data is compressed.
r.curRecTyp = recTypeFromHeader(hdr[0])
compressed := hdr[0]&snappyMask != 0
// Gobble up zero bytes.
// If the record type is recPageTerm, the rest of the page is zero padding:
// the first byte is the type and everything after it is 0.
if r.curRecTyp == recPageTerm {
// recPageTerm is a single byte that indicates the rest of the page is padded.
// If it's the first byte in a page, buf is too small and
// needs to be resized to fit pageSize-1 bytes.
buf = r.buf[1:]
// We are pedantic and check whether the zeros are actually up
// to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (r.total % pageSize)
if k == pageSize {
continue // Initial 0 byte was last page byte.
}
n, err := io.ReadFull(r.rdr, buf[:k])
if err != nil {
return errors.Wrap(err, "read remaining zeros")
}
r.total += int64(n)
for _, c := range buf[:k] {
if c != 0 {
return errors.New("unexpected non-zero byte in padded page")
}
}
continue
}
// Read the remaining 6 bytes, which hold the length and the CRC.
n, err := io.ReadFull(r.rdr, hdr[1:])
if err != nil {
return errors.Wrap(err, "read remaining header")
}
r.total += int64(n)
var (
length = binary.BigEndian.Uint16(hdr[1:])
crc = binary.BigEndian.Uint32(hdr[3:])
)
if length > pageSize-recordHeaderSize {
return errors.Errorf("invalid record size %d", length)
}
// Read length bytes of data and check that their CRC matches the CRC in the header.
n, err = io.ReadFull(r.rdr, buf[:length])
if err != nil {
return err
}
r.total += int64(n)
if n != int(length) {
return errors.Errorf("invalid size: expected %d, got %d", length, n)
}
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
// If the data is compressed, keep it in snappyBuf and decode it once the whole record has been read.
if compressed {
r.snappyBuf = append(r.snappyBuf, buf[:length]...)
} else {
r.rec = append(r.rec, buf[:length]...)
}
if err := validateRecord(r.curRecTyp, i); err != nil {
return err
}
if r.curRecTyp == recLast || r.curRecTyp == recFull {
if compressed && len(r.snappyBuf) > 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
// Decode the compressed data.
r.rec = r.rec[:cap(r.rec)]
r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
return err
}
return nil
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
i++
}
}
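The Next method reads a record from rdr and returns true if one exists. Each read takes a data fragment of at most pageSize-recordHeaderSize bytes, and reading continues until the record is complete. If the data is compressed it must be decoded. The final record is stored in the Reader.rec field.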
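The next function calls validateRecord, which is not shown in this article. A plausible sketch of such a check, based only on the fragment types and on the counter i in next, might look like the following. Both the body of validateRecord and the validateSequence helper are assumptions made for illustration, not the Prometheus source.

```go
package main

import (
	"errors"
	"fmt"
)

type recType uint8

const (
	recPageTerm recType = 0
	recFull     recType = 1
	recFirst    recType = 2
	recMiddle   recType = 3
	recLast     recType = 4
)

// validateRecord checks that fragment type typ is legal as the i-th fragment
// (counting from 0) of the record currently being assembled.
// (Assumed behavior; the real function is not shown in this article.)
func validateRecord(typ recType, i int) error {
	switch typ {
	case recFull, recFirst:
		if i != 0 {
			return errors.New("unexpected start of a record inside another record")
		}
		return nil
	case recMiddle, recLast:
		if i == 0 {
			return errors.New("unexpected continuation without a first fragment")
		}
		return nil
	default:
		return fmt.Errorf("unexpected record type %d", typ)
	}
}

// validateSequence runs the check over a whole stream of fragment types and
// reports a torn record if the stream ends in the middle of a record.
// (Hypothetical helper for illustration.)
func validateSequence(types []recType) error {
	i := 0
	for _, t := range types {
		if err := validateRecord(t, i); err != nil {
			return err
		}
		if t == recFull || t == recLast {
			i = 0 // record complete, start counting again
			continue
		}
		i++
	}
	if i != 0 {
		return errors.New("last record is torn")
	}
	return nil
}

func main() {
	fmt.Println(validateSequence([]recType{recFirst, recMiddle, recLast, recFull})) // <nil>
	fmt.Println(validateSequence([]recType{recFirst, recMiddle}))                   // last record is torn
}
```

The torn-record case corresponds to the check in Next, where a stream that hits EOF after a first or middle fragment is reported as an error.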
Open questions
- Why is the recPageTerm type needed? Doesn't it waste an entire page's worth of space?
- What does a record represent?
- Why can length not be greater than pageSize-recordHeaderSize?
- Can a record be larger than a segment?
snappy.Decode