SeriesFile Analysis
What is SeriesFile
- SeriesFile would be more aptly named SeriesKeyFile: it stores every series key in the current DB;
- A series key = (measurement + tag set)
SeriesFile persistence
- It corresponds to a set of files on disk. Each database has its own SeriesFile, located at: [influxdb data path]/[database]/_series
- The layout of the _series directory looks like this:
./_series/
├── 00
│ └── 0000
├── 01
│ └── 0000
├── 02
│ └── 0000
├── 03
│ └── 0000
├── 04
│ └── 0000
├── 05
│ └── 0000
├── 06
│ └── 0000
└── 07
  └── 0000
- Each DB's series file is split into 8 partitions. Each partition is in turn split into multiple segments, and each partition has its own in-memory index
influxdb_series_file.png
SeriesSegment
- Definition: the log of series entries is stored as a file on disk, and this type is responsible for reading and writing that file
type SeriesSegment struct {
id uint16
path string
data []byte // mmap file
file *os.File // write file handle
w *bufio.Writer // buffered file handle
size uint32 // current file size
}
- On-disk format of a SeriesSegment file:
influxdb_series_file_format.png
The flag takes one of two values:
SeriesEntryInsertFlag: the series key written in this entry is valid;
SeriesEntryTombstoneFlag: a tombstone marking the series as deleted.
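As a rough illustration of the entry layout (a 1-byte flag, an 8-byte big-endian series id, then the key for insert entries), here is a hedged sketch of an encoder. The numeric flag values and the simple length-prefixed key are assumptions made for this example; the real series key has its own binary encoding of the measurement and tags.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Flag values are assumptions for this sketch; the text only names them.
const (
	insertFlag      uint8 = 0x01
	tombstoneFlag   uint8 = 0x02
	entryHeaderSize       = 1 + 8 // flag + series id
)

// appendEntry appends the flag, the big-endian id and, for insert entries,
// a uvarint-length-prefixed key to dst.
func appendEntry(dst []byte, flag uint8, id uint64, key []byte) []byte {
	var hdr [entryHeaderSize]byte
	hdr[0] = flag
	binary.BigEndian.PutUint64(hdr[1:], id)
	dst = append(dst, hdr[:]...)
	if flag == insertFlag {
		var lenBuf [binary.MaxVarintLen64]byte
		n := binary.PutUvarint(lenBuf[:], uint64(len(key)))
		dst = append(dst, lenBuf[:n]...)
		dst = append(dst, key...)
	}
	return dst
}

func main() {
	buf := appendEntry(nil, insertFlag, 2, []byte("cpu,host=a"))
	buf = appendEntry(buf, tombstoneFlag, 2, nil) // tombstones carry no key
	fmt.Printf("% x\n", buf)
}
```

A tombstone entry is just the 9-byte header, which is why deleting a series costs very little log space.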
- Creating a SeriesSegment:
CreateSeriesSegment
func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) {
// Create a temporary file with the .initializing suffix first
f, err := os.Create(path + ".initializing")
if err != nil {
return nil, err
}
defer f.Close()
// Write the header first, including Magic and Version
hdr := NewSeriesSegmentHeader()
if _, err := hdr.WriteTo(f); err != nil {
return nil, err
// A segment file is preallocated to its full size: at least 4 MB, at most 256 MB
} else if err := f.Truncate(int64(SeriesSegmentSize(id))); err != nil {
return nil, err
} else if err := f.Close(); err != nil {
return nil, err
}
// Swap with target path.
if err := os.Rename(f.Name(), path); err != nil {
return nil, err
}
// Open segment at new location.
segment := NewSeriesSegment(id, path)
// Open the new segment; Open is covered separately below
if err := segment.Open(); err != nil {
return nil, err
}
return segment, nil
}
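The preallocation size depends on the segment id. The sketch below shows one way SeriesSegmentSize could behave given the 4 MB minimum and 256 MB maximum mentioned in the comment above. That the size doubles with each segment id is an assumption of this sketch, and segmentSize is a hypothetical name.

```go
package main

import "fmt"

// segmentSize returns the preallocated size for a segment id, assuming the
// size doubles per segment from 4 MB up to a 256 MB cap.
func segmentSize(id uint16) uint32 {
	const minShift, maxShift = 22, 28 // 1<<22 = 4 MB, 1<<28 = 256 MB
	shift := uint32(id) + minShift
	if shift > maxShift {
		shift = maxShift
	}
	return 1 << shift
}

func main() {
	for id := uint16(0); id < 8; id++ {
		fmt.Printf("segment %04x: %d MB\n", id, segmentSize(id)>>20)
	}
}
```

Growing the segments this way keeps small databases cheap on disk while bounding the number of files for large ones.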
- Opening a SeriesSegment maps the file into memory with mmap:
Open
func (s *SeriesSegment) Open() error {
if err := func() (err error) {
// Map the file into memory
if s.data, err = mmap.Map(s.path, int64(SeriesSegmentSize(s.id))); err != nil {
return err
}
// Read the header and validate the version
hdr, err := ReadSeriesSegmentHeader(s.data)
if err != nil {
return err
} else if hdr.Version != SeriesSegmentVersion {
return ErrInvalidSeriesSegmentVersion
}
return nil
}(); err != nil {
s.Close()
return err
}
return nil
}
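- Initializing for writes
InitForWrite: reads the segment from the beginning to compute the size of the valid data, doing a basic validity check on each entry along the way, then opens the file and positions the write cursor at the end of the valid data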
func (s *SeriesSegment) InitForWrite() (err error) {
// Only calculate segment data size if writing.
for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); {
flag, _, _, sz := ReadSeriesEntry(s.data[s.size:])
if !IsValidSeriesEntryFlag(flag) {
break
}
s.size += uint32(sz)
}
// Open file handler for writing & seek to end of data.
if s.file, err = os.OpenFile(s.path, os.O_WRONLY|os.O_CREATE, 0666); err != nil {
return err
} else if _, err := s.file.Seek(int64(s.size), io.SeekStart); err != nil {
return err
}
s.w = bufio.NewWriterSize(s.file, 32*1024)
return nil
}
- Writing a log entry to the segment file
WriteLogEntry:
func (s *SeriesSegment) WriteLogEntry(data []byte) (offset int64, err error) {
if !s.CanWrite(data) {
return 0, ErrSeriesSegmentNotWritable
}
offset = JoinSeriesOffset(s.id, s.size)
if _, err := s.w.Write(data); err != nil {
return 0, err
}
s.size += uint32(len(data))
return offset, nil
}
The offset returned by this method is very useful. It combines the segment id with the position inside the segment:
func JoinSeriesOffset(segmentID uint16, pos uint32) int64 {
return (int64(segmentID) << 32) | int64(pos)
}
The segment id tells you which segment file was written, and the position tells you where in that file the entry starts
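The inverse operation, which the text refers to as SplitSeriesOffset, is not shown in the post. The following is a sketch of how joining and splitting could round-trip. The lowercase function names are local to this example, and the split body is an assumption derived from the bit layout of JoinSeriesOffset.

```go
package main

import "fmt"

// joinSeriesOffset packs the segment id into the high bits and the
// position into the low 32 bits, as JoinSeriesOffset does.
func joinSeriesOffset(segmentID uint16, pos uint32) int64 {
	return (int64(segmentID) << 32) | int64(pos)
}

// splitSeriesOffset is the assumed inverse: it recovers the segment id
// and the position from a packed offset.
func splitSeriesOffset(offset int64) (segmentID uint16, pos uint32) {
	return uint16(offset >> 32), uint32(offset)
}

func main() {
	off := joinSeriesOffset(3, 4096)
	id, pos := splitSeriesOffset(off)
	fmt.Println(off, id, pos)
}
```

Because the segment id occupies the high bits, packed offsets sort in write order across segments, so a plain integer comparison such as offset <= maxOffset is enough to tell whether an entry has already been indexed.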
- Iterating over a segment file
ForEachEntry: reads each SeriesEntry in turn and invokes the supplied callback
func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id uint64, offset int64, key []byte) error) error {
for pos := uint32(SeriesSegmentHeaderSize); pos < uint32(len(s.data)); {
flag, id, key, sz := ReadSeriesEntry(s.data[pos:])
if !IsValidSeriesEntryFlag(flag) {
break
}
offset := JoinSeriesOffset(s.id, pos)
if err := fn(flag, id, offset, key); err != nil {
return err
}
pos += uint32(sz)
}
return nil
}
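Both InitForWrite and ForEachEntry rely on the same idea: the file is preallocated and zero-filled, so the scan stops at the first byte that is not a valid flag. Here is a self-contained sketch of that loop over an in-memory buffer. The flag values, the 5-byte header with the "SSEG" magic, and the length-prefixed key are assumptions of this example, and the helper names are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	insertFlag    = 0x01 // assumed value
	tombstoneFlag = 0x02 // assumed value
	headerSize    = 5    // assumed: 4-byte magic + 1-byte version
	entryHdrSize  = 9    // 1-byte flag + 8-byte id
)

// readEntry decodes one entry and returns its flag, id, key and size.
// A zero flag means there are no more entries.
func readEntry(data []byte) (flag uint8, id uint64, key []byte, sz int) {
	if len(data) < entryHdrSize || (data[0] != insertFlag && data[0] != tombstoneFlag) {
		return 0, 0, nil, 1
	}
	flag = data[0]
	id = binary.BigEndian.Uint64(data[1:entryHdrSize])
	sz = entryHdrSize
	if flag == insertFlag {
		n, w := binary.Uvarint(data[entryHdrSize:]) // length prefix of the key
		key = data[entryHdrSize+w : entryHdrSize+w+int(n)]
		sz += w + int(n)
	}
	return flag, id, key, sz
}

// scan walks the entries after the header, calls fn for each one, and
// returns the position where valid data ends.
func scan(data []byte, fn func(flag uint8, id uint64, key []byte)) int {
	pos := headerSize
	for pos < len(data) {
		flag, id, key, sz := readEntry(data[pos:])
		if flag == 0 {
			break
		}
		fn(flag, id, key)
		pos += sz
	}
	return pos
}

// buildSegment returns a zero-filled buffer holding a header, one insert
// entry for id 2 with key "cpu", and one tombstone for id 2.
func buildSegment() []byte {
	data := make([]byte, 64)
	copy(data, "SSEG\x01") // assumed magic and version
	pos := headerSize
	pos += copy(data[pos:], []byte{insertFlag, 0, 0, 0, 0, 0, 0, 0, 2, 3, 'c', 'p', 'u'})
	copy(data[pos:], []byte{tombstoneFlag, 0, 0, 0, 0, 0, 0, 0, 2})
	return data
}

func main() {
	end := scan(buildSegment(), func(flag uint8, id uint64, key []byte) {
		fmt.Printf("flag=%d id=%d key=%q\n", flag, id, key)
	})
	fmt.Println("end of valid data:", end)
}
```

The value returned by scan plays the role of s.size in InitForWrite: it is where the next entry would be appended.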
- Reading a series key by offset
ReadSeriesKeyFromSegments
func ReadSeriesKeyFromSegments(a []*SeriesSegment, offset int64) []byte {
// Split the offset into segment id and pos
segmentID, pos := SplitSeriesOffset(offset)
segment := FindSegment(a, segmentID)
if segment == nil {
return nil
}
buf := segment.Slice(pos)
key, _ := ReadSeriesKey(buf)
return key
}
- Reading a SeriesEntry
ReadSeriesEntry: reads flag, id and key byte by byte, following the segment format
func ReadSeriesEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) {
// If flag byte is zero then no more entries exist.
flag, data = uint8(data[0]), data[1:]
if !IsValidSeriesEntryFlag(flag) {
return 0, 0, nil, 1
}
id, data = binary.BigEndian.Uint64(data), data[8:]
switch flag {
case SeriesEntryInsertFlag:
key, _ = ReadSeriesKey(data)
}
return flag, id, key, int64(SeriesEntryHeaderSize + len(key))
}
SeriesIndex
- Definition: SeriesIndex is the in-memory index over all segment files in a partition. Its core is two maps: series key to series id, and series id to offset;
When the number of in-memory index entries exceeds a threshold, a call to CreateSeriesListIfNotExists compacts them to a disk file. When a SeriesIndex is initialized it reads the index back from that file through mmap; entries in the file are placed by hash. Lookups check the in-memory maps first and the on-disk file second
type SeriesIndex struct {
path string
count uint64
capacity int64
mask int64
maxSeriesID uint64
maxOffset int64
// The following three fields map the on-disk index into memory
data []byte // mmap data
keyIDData []byte // key/id mmap data
idOffsetData []byte // id/offset mmap data
// In-memory data since rebuild.
keyIDMap *rhh.HashMap // series key to series id
idOffsetMap map[uint64]int64 // series id to offset
tombstones map[uint64]struct{}
}
- The structure of the on-disk index
- First comes the header:
SeriesIndexHeaderSize = 0 +
4 + 1 + // magic + version
8 + 8 + // max series + max offset
8 + 8 + // count + capacity
8 + 8 + // key/id map offset & size
8 + 8 + // id/offset map offset & size
0
- The body consists of two hash maps, serieskey -> seriesid and seriesid -> seriesoffset. Their start positions and sizes in the file can both be read from the header;
- In the serieskey -> seriesid hash map, the key is the series key and the stored value is the offset and id
- In the seriesid -> seriesoffset hash map, the key is the series id and the stored value is the id and offset
- The Open operation
func (idx *SeriesIndex) Open() (err error) {
// Map data file, if it exists.
if err := func() error {
if _, err := os.Stat(idx.path); err != nil && !os.IsNotExist(err) {
return err
} else if err == nil {
// Map the on-disk index file into idx.data
if idx.data, err = mmap.Map(idx.path, 0); err != nil {
return err
}
// Read the file and build the header
hdr, err := ReadSeriesIndexHeader(idx.data)
if err != nil {
return err
}
idx.count, idx.capacity, idx.mask = hdr.Count, hdr.Capacity, hdr.Capacity-1
idx.maxSeriesID, idx.maxOffset = hdr.MaxSeriesID, hdr.MaxOffset
// Slice out the two maps using the header information
idx.keyIDData = idx.data[hdr.KeyIDMap.Offset : hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size]
idx.idOffsetData = idx.data[hdr.IDOffsetMap.Offset : hdr.IDOffsetMap.Offset+hdr.IDOffsetMap.Size]
}
return nil
}(); err != nil {
idx.Close()
return err
}
idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions)
idx.idOffsetMap = make(map[uint64]int64)
idx.tombstones = make(map[uint64]struct{})
return nil
}
- Building the index in memory
Recover
func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error {
// Allocate new in-memory maps.
idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions)
idx.idOffsetMap = make(map[uint64]int64)
idx.tombstones = make(map[uint64]struct{})
// Process all entries since the maximum offset in the on-disk index.
minSegmentID, _ := SplitSeriesOffset(idx.maxOffset)
// Iterate over every segment
for _, segment := range segments {
if segment.ID() < minSegmentID {
continue
}
// Iterate over every SeriesEntry in the segment
if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
if offset <= idx.maxOffset {
return nil
}
// Apply each SeriesEntry with idx.execEntry
idx.execEntry(flag, id, offset, key)
return nil
}); err != nil {
return err
}
}
return nil
}
- Applying a single entry
func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byte) {
switch flag {
// Update both maps
case SeriesEntryInsertFlag:
idx.keyIDMap.Put(key, id)
idx.idOffsetMap[id] = offset
if id > idx.maxSeriesID {
idx.maxSeriesID = id
}
if offset > idx.maxOffset {
idx.maxOffset = offset
}
case SeriesEntryTombstoneFlag:
idx.tombstones[id] = struct{}{}
default:
panic("unreachable")
}
}
- The lookup methods all check the in-memory map first, then the map built from the mmap'd disk file
func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64
func (idx *SeriesIndex) FindIDByNameTags(segments []*SeriesSegment, name []byte, tags models.Tags, buf []byte) uint64
func (idx *SeriesIndex) FindIDListByNameTags(segments []*SeriesSegment, names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, ok bool)
func (idx *SeriesIndex) FindOffsetByID(id uint64) int64
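The two-tier lookup can be sketched with plain Go maps. This is a toy model only: ordinary maps stand in for both the rhh.HashMap and the mmap'd on-disk hash table, the type and method names are hypothetical, and treating a tombstoned id as "not found" is an assumption of this sketch.

```go
package main

import "fmt"

// index is a toy stand-in for SeriesIndex.
type index struct {
	memKeyID   map[string]uint64   // entries added since the last compaction
	diskKeyID  map[string]uint64   // stand-in for the mmap'd key/id table
	tombstones map[uint64]struct{} // deleted series ids
}

// findID checks the in-memory map first, then the on-disk table, and
// reports 0 for missing or deleted series.
func (idx *index) findID(key string) uint64 {
	id, ok := idx.memKeyID[key]
	if !ok {
		id, ok = idx.diskKeyID[key]
	}
	if !ok {
		return 0
	}
	if _, deleted := idx.tombstones[id]; deleted {
		return 0
	}
	return id
}

// newExampleIndex builds a small index with one in-memory entry, two
// on-disk entries and one tombstone.
func newExampleIndex() *index {
	return &index{
		memKeyID:   map[string]uint64{"cpu,host=a": 2},
		diskKeyID:  map[string]uint64{"cpu,host=b": 10, "cpu,host=c": 18},
		tombstones: map[uint64]struct{}{18: {}},
	}
}

func main() {
	idx := newExampleIndex()
	for _, k := range []string{"cpu,host=a", "cpu,host=b", "cpu,host=c", "mem,host=a"} {
		fmt.Printf("%s -> %d\n", k, idx.findID(k))
	}
}
```

Since 0 doubles as the "not found" value, series ids must start at 1, which matches the id generation rule described for SeriesFile.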
SeriesPartition
- Manages all the SeriesSegments under it, together with their index
- Open: walks every segment file in the directory, initializes the newest segment for writing, and builds the in-memory index
func (p *SeriesPartition) Open() error {
// Create path if it doesn't exist.
if err := os.MkdirAll(filepath.Join(p.path), 0777); err != nil {
return err
}
// Open components.
if err := func() (err error) {
// Open all segments
if err := p.openSegments(); err != nil {
return err
}
// Init last segment for writes.
if err := p.activeSegment().InitForWrite(); err != nil {
return err
}
// Build the in-memory index
p.index = NewSeriesIndex(p.IndexPath())
if err := p.index.Open(); err != nil {
return err
} else if err = p.index.Recover(p.segments); err != nil {
return err
}
return nil
}(); err != nil {
p.Close()
return err
}
return nil
}
- Given a list of series keys, returns the corresponding series ids. A key that has no id yet is inserted into the partition (that is, written to the active segment)
func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitionIDs []int, ids []uint64) error {
var writeRequired bool
p.mu.RLock()
// Use the index for a fast lookup and note whether any key is missing
for i := range keys {
if keyPartitionIDs[i] != p.id {
continue
}
id := p.index.FindIDBySeriesKey(p.segments, keys[i])
if id == 0 {
writeRequired = true
continue
}
ids[i] = id
}
p.mu.RUnlock()
// Exit if all series for this partition already exist.
if !writeRequired {
return nil
}
type keyRange struct {
id uint64
offset int64
}
newKeyRanges := make([]keyRange, 0, len(keys))
// Obtain write lock to create new series.
p.mu.Lock()
defer p.mu.Unlock()
// Track offsets of duplicate series.
newIDs := make(map[string]uint64, len(ids))
for i := range keys {
// Skip series that don't belong to the partition or have already been created.
if keyPartitionIDs[i] != p.id || ids[i] != 0 {
continue
}
// Re-attempt lookup under write lock.
key := keys[i]
if ids[i] = newIDs[string(key)]; ids[i] != 0 {
continue
} else if ids[i] = p.index.FindIDBySeriesKey(p.segments, key); ids[i] != 0 {
continue
}
// Write to series log and save offset.
// Write the log entry
id, offset, err := p.insert(key)
if err != nil {
return err
}
// Append new key to be added to hash map after flush.
ids[i] = id
newIDs[string(key)] = id
newKeyRanges = append(newKeyRanges, keyRange{id, offset})
}
// Flush active segment writes so we can access data in mmap.
// Flush to the file immediately
if segment := p.activeSegment(); segment != nil {
if err := segment.Flush(); err != nil {
return err
}
}
// Add keys to hash map(s).
// Update the in-memory index
for _, keyRange := range newKeyRanges {
p.index.Insert(p.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset)
}
// Compaction to the disk file is triggered here (elided); it is described in detail below
return nil
}
- Getting a series key by id
func (p *SeriesPartition) SeriesKey(id uint64) []byte {
if id == 0 {
return nil
}
p.mu.RLock()
// Look up the offset by id in the index, then read the key at that offset
key := p.seriesKeyByOffset(p.index.FindOffsetByID(id))
p.mu.RUnlock()
return key
}
- Getting an id by series key
func (p *SeriesPartition) FindIDBySeriesKey(key []byte) uint64 {
p.mu.RLock()
if p.closed {
p.mu.RUnlock()
return 0
}
id := p.index.FindIDBySeriesKey(p.segments, key)
p.mu.RUnlock()
return id
}
- Inserting a series key
func (p *SeriesPartition) insert(key []byte) (id uint64, offset int64, err error) {
// The id is p.seq; each insert advances p.seq by SeriesFilePartitionN
id = p.seq
offset, err = p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryInsertFlag, id, key))
if err != nil {
return 0, 0, err
}
p.seq += SeriesFilePartitionN
return id, offset, nil
}
- Compacting the index to disk
func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
hdr := NewSeriesIndexHeader()
hdr.Count = seriesN
hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)
// Allocate the two hash maps; the code below fills them in and then writes them to disk
keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
// Reindex all partitions.
var entryN int
// Iterate over all segments
for _, segment := range segments {
errDone := errors.New("done")
// Iterate over every series entry in the segment
if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
// Make sure we don't go past the offset where the compaction began.
if offset > index.maxOffset {
return errDone
}
// Check for cancellation periodically.
// Every 1000 entries, check whether the compaction has been cancelled
if entryN++; entryN%1000 == 0 {
select {
case <-c.cancel:
return ErrSeriesPartitionCompactionCancelled
default:
}
}
// Only process insert entries.
switch flag {
case SeriesEntryInsertFlag: // no-op here: processing continues below the switch
case SeriesEntryTombstoneFlag: // skip tombstone entries
return nil
default:
return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
}
// Save max series identifier processed.
hdr.MaxSeriesID, hdr.MaxOffset = id, offset
// Ignore entry if tombstoned.
// Skip the id if it has been marked as deleted
if index.IsDeleted(id) {
return nil
}
// Insert into maps.
// Locate the slot in the in-memory map and write the element there
c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
}); err == errDone {
break
} else if err != nil {
return err
}
}
// Open file handler.
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
// Calculate map positions.
hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap))
hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap))
// Write header.
if _, err := hdr.WriteTo(f); err != nil {
return err
}
// Write maps.
if _, err := f.Write(keyIDMap); err != nil {
return err
} else if _, err := f.Write(idOffsetMap); err != nil {
return err
}
// Sync & close.
if err := f.Sync(); err != nil {
return err
} else if err := f.Close(); err != nil {
return err
}
return nil
}
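The sizing at the top of compactIndexTo can be worked through in isolation. In the sketch below, the load factor of 90 percent and the 16-byte element size are assumed values for SeriesIndexLoadFactor and SeriesIndexElemSize, pow2 is assumed to round up to the next power of two, and tableLayout is a hypothetical helper.

```go
package main

import "fmt"

const (
	loadFactor = 90 // assumed value of SeriesIndexLoadFactor (percent)
	elemSize   = 16 // assumed value of SeriesIndexElemSize (two 8-byte fields)
)

// pow2 returns the smallest power of two that is >= v, and at least 2.
func pow2(v int64) int64 {
	for i := int64(2); i < 1<<62; i *= 2 {
		if i >= v {
			return i
		}
	}
	panic("unreachable")
}

// tableLayout returns the slot count, the slot mask and the byte size of
// one on-disk hash map holding count series.
func tableLayout(count uint64) (capacity, mask, bytes int64) {
	capacity = pow2(int64(count) * 100 / loadFactor)
	return capacity, capacity - 1, capacity * elemSize
}

func main() {
	capacity, mask, bytes := tableLayout(1000)
	fmt.Println(capacity, mask, bytes)
}
```

A power-of-two capacity is what lets Open derive mask as Capacity-1: a slot can then be chosen with hash & mask instead of a modulo.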
SeriesFile
- Definition: manages all the SeriesPartitions of the current DB and provides the public interface for working with series, hiding SeriesPartition and SeriesSegment from callers;
- How series ids are generated
- InfluxDB fixes the number of partitions at 8, so every series key lands in one of these 8 buckets
- Which bucket? Hash the series key and take the result modulo the number of partitions:
int(xxhash.Sum64(key) % SeriesFilePartitionN)
- The partitions have ids 0 to 7. Each partition keeps a sequence number seq, initialized to partition id + 1. seq is the id given to the next series key placed in the partition, and it advances by 8 each time. For partition 1, for example, the first series id is 2 and the second is 10
- With this rule, the partition a series id belongs to follows directly from the id:
int((id - 1) % SeriesFilePartitionN)
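The id scheme is easy to check with a few lines of code. This sketch models only the sequence counter; the partition type and function names are hypothetical and nothing is persisted.

```go
package main

import "fmt"

// partitionN mirrors SeriesFilePartitionN from the text.
const partitionN = 8

// partition models only the id sequence of a SeriesPartition.
type partition struct {
	id  int
	seq uint64
}

// newPartition starts the sequence at partition id + 1.
func newPartition(id int) *partition {
	return &partition{id: id, seq: uint64(id) + 1}
}

// nextID hands out the current sequence number and advances it by the
// number of partitions, so ids from different partitions never collide.
func (p *partition) nextID() uint64 {
	id := p.seq
	p.seq += partitionN
	return id
}

// partitionOf recovers the partition id from a series id.
func partitionOf(id uint64) int {
	return int((id - 1) % partitionN)
}

func main() {
	p := newPartition(1)
	for i := 0; i < 3; i++ {
		id := p.nextID()
		fmt.Printf("series id %d belongs to partition %d\n", id, partitionOf(id))
	}
}
```

Starting at partition id + 1 rather than partition id keeps 0 free as the "no such series" value.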
- Writing a list of series keys to their partitions. The target partition is chosen by hashing the series key and taking the result modulo the number of partitions
int(xxhash.Sum64(key) % SeriesFilePartitionN)
func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
// Build the slice of series keys
keys := GenerateSeriesKeys(names, tagsSlice)
// Build the slice of partition ids
keyPartitionIDs := f.SeriesKeysPartitionIDs(keys)
ids := make([]uint64, len(keys))
var g errgroup.Group
// Write to the partitions in parallel with goroutines
for i := range f.partitions {
p := f.partitions[i]
g.Go(func() error {
return p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids)
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return ids, nil
}
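The fan-out works without extra locking around ids because each partition only writes the slots whose key hashes to it. Here is a standard-library-only sketch of that pattern. It makes several substitutions that should be read as assumptions: FNV-1a stands in for xxhash, sync.WaitGroup stands in for errgroup.Group, and each goroutine uses a fresh local sequence instead of a persistent partition.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const partitionN = 8

// keyPartition maps a key to a partition. FNV-1a stands in for xxhash so
// the sketch needs only the standard library.
func keyPartition(key []byte) int {
	h := fnv.New64a()
	h.Write(key)
	return int(h.Sum64() % partitionN)
}

// assignIDs starts one goroutine per partition. Each goroutine writes only
// the ids[i] slots whose key hashes to its partition, so the shared slice
// is never written concurrently at the same index.
func assignIDs(keys [][]byte) []uint64 {
	ids := make([]uint64, len(keys))
	partIDs := make([]int, len(keys))
	for i, k := range keys {
		partIDs[i] = keyPartition(k)
	}
	var wg sync.WaitGroup
	for p := 0; p < partitionN; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			seq := uint64(p) + 1       // fresh sequence, as for an empty partition
			seen := map[string]uint64{} // de-duplicates repeated keys
			for i, k := range keys {
				if partIDs[i] != p {
					continue
				}
				id, ok := seen[string(k)]
				if !ok {
					id = seq
					seq += partitionN
					seen[string(k)] = id
				}
				ids[i] = id
			}
		}(p)
	}
	wg.Wait()
	return ids
}

func main() {
	keys := [][]byte{[]byte("cpu,host=a"), []byte("cpu,host=b"), []byte("cpu,host=a")}
	fmt.Println(assignIDs(keys))
}
```

Every goroutine receives the full keys slice and simply skips entries that belong elsewhere, which mirrors the keyPartitionIDs[i] != p.id check in the partition code.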
- Queries generally locate the partition first and then delegate to it; the partition in turn uses its index and segments to answer. The details are omitted here
SeriesIDSet
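- Series IDs stored in a bitmap. A roaring bitmap is an exact set rather than a Bloom filter, but because ids are truncated to 32 bits when they are added (see NewSeriesIDSet below), lookups behave like one: if the set reports an id as absent it is definitely absent, while an id reported as present may not really be there;
- Definition: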
type SeriesIDSet struct {
sync.RWMutex
bitmap *roaring.Bitmap
}
- The bitmap is implemented with roaring.Bitmap
- A quick look at the implementation of the constructor
func NewSeriesIDSet(a ...uint64) *SeriesIDSet {
ss := &SeriesIDSet{bitmap: roaring.NewBitmap()}
if len(a) > 0 {
a32 := make([]uint32, len(a))
for i := range a {
a32[i] = uint32(a[i])
}
ss.bitmap.AddMany(a32)
}
return ss
}
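The key point is that the SeriesID arguments are uint64 but are cast to uint32 when stored in the bitmap, so only the low 32 bits of each SeriesID are kept. Membership queries therefore also use only the low 32 bits. A hit has two possible explanations: either the stored id is exactly this uint32 value, or a stored id merely shares these low 32 bits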
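The effect of the truncation can be demonstrated without the roaring library. In this sketch a Go map keyed by uint32 stands in for roaring.Bitmap, and the idSet type and its methods are hypothetical; only the uint32 cast is taken from the code above.

```go
package main

import "fmt"

// idSet is a toy stand-in for SeriesIDSet: a map keyed by uint32 replaces
// roaring.Bitmap, but ids are truncated the same way as in NewSeriesIDSet.
type idSet struct {
	low map[uint32]struct{}
}

// newIDSet stores only the low 32 bits of each id.
func newIDSet(ids ...uint64) *idSet {
	s := &idSet{low: make(map[uint32]struct{})}
	for _, id := range ids {
		s.low[uint32(id)] = struct{}{}
	}
	return s
}

// contains looks up only the low 32 bits of id.
func (s *idSet) contains(id uint64) bool {
	_, ok := s.low[uint32(id)]
	return ok
}

func main() {
	s := newIDSet(1)
	fmt.Println(s.contains(1))         // the id that was actually added
	fmt.Println(s.contains(1<<32 + 1)) // different id, same low 32 bits
	fmt.Println(s.contains(2))         // low 32 bits never added
}
```

The second lookup is the false positive described above; it can only occur once series ids grow past 32 bits.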

