TSI索引解析之TSL文件

LogFile解析

  • 作用: LogFile操作新寫(xiě)入的Series在內(nèi)存中的索引和持久化到WAL的;
  • 下面我們先來(lái)看一下用到的相關(guān)數(shù)據(jù)結(jié)構(gòu)
logTagValue
  • 定義:代表一個(gè)TagKey對(duì)應(yīng)的一個(gè)TagValue
type logTagValue struct {
    name      []byte  // tag value的值 
    deleted   bool //是否已經(jīng)被刪除
    series    map[uint64]struct{} // 屬于哪些series id
    seriesSet *tsdb.SeriesIDSet
}

seriesseriesSet其實(shí)都是用來(lái)存儲(chǔ)SeriesID, 當(dāng)SeriesID數(shù)量小于等于25個(gè)時(shí),存到series里,反之存到seriesSet這個(gè)roaring bitmap里;

  • 添加SeriesID:
func (tv *logTagValue) addSeriesID(x uint64) {
    if tv.seriesSet != nil {
        tv.seriesSet.AddNoLock(x)
        return
    }

    //數(shù)據(jù)量小就存在series map里
    tv.series[x] = struct{}{}

    //數(shù)據(jù)量大存在series這個(gè)roaring bitmap里
    if len(tv.series) > 25 {
        tv.seriesSet = tsdb.NewSeriesIDSet()
        for id := range tv.series {
            tv.seriesSet.AddNoLock(id)
        }
        tv.series = nil
    }
}
  • 刪除SeriesID: removeSeriesID
  • 獲取SeriesID的基數(shù),所謂的基數(shù)就是不相同的SeriesID的個(gè)數(shù)
func (tv *logTagValue) cardinality() int64 {
    if tv.seriesSet != nil {
        return int64(tv.seriesSet.Cardinality())
    }
    return int64(len(tv.series))
}
logTagKey
  • 定義:代表一個(gè)TagKey, 包含其對(duì)應(yīng)的所的的tag value
type logTagKey struct {
    name      []byte
    deleted   bool
    tagValues map[string]logTagValue
}
  • 添加TagValue
func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue {
    //這個(gè)value就是tag value的具體值,它作為key
    tv, ok := tk.tagValues[string(value)]
    if !ok {
        tv = logTagValue{name: value, series: make(map[uint64]struct{})}
    }
    return tv
}
  • 生成TagValueIterator, 用來(lái)遍歷這個(gè)TagKey對(duì)應(yīng)的所有的TagValue
func (tk *logTagKey) TagValueIterator() TagValueIterator {
    a := make([]logTagValue, 0, len(tk.tagValues))
    for _, v := range tk.tagValues {
        a = append(a, v)
    }
    return newLogTagValueIterator(a)
}
logMeasurement
  • 定義: 包含了一個(gè)measurement所有的tag key和series id
type logMeasurement struct {
    name      []byte  // measurement名字
    tagSet    map[string]logTagKey // tagkey的集合
    deleted   bool
    series    map[uint64]struct{} 
    seriesSet *tsdb.SeriesIDSet
}

其中seriesseriesSet其實(shí)都是用來(lái)存儲(chǔ)SeriesID, 當(dāng)SeriesID數(shù)量小于等于25個(gè)時(shí),存到series里,反之存到seriesSet這個(gè)roaring bitmap里;

  • 添加新的tagkey
func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey {
    ts, ok := m.tagSet[string(key)]
    if !ok {
        ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)}
    }
}
  • 針對(duì)一個(gè)query請(qǐng)求,比如select * from measurement1 where tag1=tv1 and tag2=tv2, 根據(jù)measurement1我們可以確定到一個(gè)具體的logMeasurement, 然后根據(jù)tag1=tv1tag2=tv2中的tagkey我們可以在logMeasurement.tagSet中鎖定logTagKey, 在logTagKey中我們根據(jù)tagvalue就可以取到對(duì)應(yīng)的一系列series id啦~注意,這里就是所謂的純內(nèi)存的倒排索引
  • logMeasureMent的管理,包括創(chuàng)建,查詢到操作是在logFile對(duì)象里完成,我們稍后會(huì)介紹.
LogEntry
  • 定義: 寫(xiě)入到WAL文件中的數(shù)據(jù)格式,實(shí)際上是寫(xiě)入 dbname/rp/id/index/[id]/Lx-xxxxxxxx.tsl文件
type LogEntry struct {
    Flag     byte   // flag
    SeriesID uint64 // series id
    Name     []byte // measurement name
    Key      []byte // tag key
    Value    []byte // tag value
    Checksum uint32 // checksum of flag/name/tags.
    Size     int    // total size of record, in bytes.
    //以上部分是LogEntry的真正的內(nèi)容

    cached   bool        // Hint to LogFile that series data is already parsed
    name     []byte      // series naem, this is a cached copy of the parsed measurement name
    tags     models.Tags // series tags, this is a cached copied of the parsed tags
    batchidx int         // position of entry in batch.
}
  • 提供了序列化和反序列化方法: appendLogEntry UnmarshalBinary ,都比較簡(jiǎn)單,不累述;
LogFile
  • 定義:提供了內(nèi)存的倒排索引和索引WAL的寫(xiě)入
type LogFile struct {
    id         int            // file sequence identifier
    data       []byte         // mmap
    file       *os.File       // writer
    w          *bufio.Writer  // buffered writer
    bufferSize int            // The size of the buffer used by the buffered writer
    nosync     bool           // Disables buffer flushing and file syncing. Useful for offline tooling.
    buf        []byte         // marshaling buffer
    keyBuf     []byte

    sfile   *tsdb.SeriesFile // series lookup
    size    int64            // tracks current file size
    modTime time.Time        // tracks last time write occurred

    // In-memory series existence/tombstone sets.
    seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet

    // In-memory index.
    mms logMeasurements

    // Filepath to the log file.
    path string
}
  • 添加新的logMeasurement, 添加到LogFile.mms, 它的類(lèi)型是type logMeasurements map[string]*logMeasurement,key就是measurement name
func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
    mm := f.mms[string(name)]
    if mm == nil {
        mm = &logMeasurement{
            name:   name,
            tagSet: make(map[string]logTagKey),
            series: make(map[uint64]struct{}),
        }
        f.mms[string(name)] = mm
    }
    return mm
}
  • open操作
func (f *LogFile) open() error {
    //打開(kāi)文件,為append操作準(zhǔn)備
    file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666)
    if err != nil {
        return err
    }
    f.file = file

    ...
    
    f.w = bufio.NewWriterSize(f.file, f.bufferSize)

    // 使用mmap映射現(xiàn)有文件內(nèi)容到內(nèi)存
    data, err := mmap.Map(f.Path(), 0)
    if err != nil {
        return err
    }
    f.data = data

    // 解析文件中的每一條logEntry,同時(shí)創(chuàng)建內(nèi)存索引
    var n int64
    for buf := f.data; len(buf) > 0; {
        // Read next entry. Truncate partial writes.
        var e LogEntry
        // 反序列化成LogEntry對(duì)象
        if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch {
            break
        } else if err != nil {
            return err
        }

        // 真正干事兒的都在這里
        f.execEntry(&e)

        // Move buffer forward.
        n += int64(e.Size)
        buf = buf[e.Size:]
    }

    //移動(dòng)文件指針到末尾,準(zhǔn)備寫(xiě)新數(shù)據(jù)
    f.size = n
    _, err = file.Seek(n, io.SeekStart)
    return err
}
  • 啟動(dòng)時(shí)處理tsl文件中的每條LogEntry, 構(gòu)建內(nèi)存索引 或者運(yùn)行時(shí)更新內(nèi)存索引
func (f *LogFile) execEntry(e *LogEntry) {
    switch e.Flag {
    case LogEntryMeasurementTombstoneFlag:
        f.execDeleteMeasurementEntry(e)
    case LogEntryTagKeyTombstoneFlag:
        f.execDeleteTagKeyEntry(e)
    case LogEntryTagValueTombstoneFlag:
        f.execDeleteTagValueEntry(e)
    default:
        f.execSeriesEntry(e)
    }
}

這里LogEntryMeasurementTombstoneFlag LogEntryTagKeyTombstoneFlag LogEntryTagValueTombstoneFlag都是創(chuàng)建用于delete的logMeasurement對(duì)象,已經(jīng)存在則更新相應(yīng)的字段

  • 處理單條Series操作 execSeriesEntry,看著代碼多,其實(shí)很簡(jiǎn)單
func (f *LogFile) execSeriesEntry(e *LogEntry) {
    var seriesKey []byte
    if e.cached {
        //將f.keyBuf更新為可以容納最長(zhǎng)的series key
        sz := tsdb.SeriesKeySize(e.name, e.tags)
        if len(f.keyBuf) < sz {
            f.keyBuf = make([]byte, 0, sz)
        }
        seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
    } else {
        // 從 series file里獲取SeriesKey
        seriesKey = f.sfile.SeriesKey(e.SeriesID)
    }

    // Series keys can be removed if the series has been deleted from
    // the entire database and the server is restarted. This would cause
    // the log to replay its insert but the key cannot be found.
    //
    // https://github.com/influxdata/influxdb/issues/9444
    if seriesKey == nil {
        return
    }

    // 下面就都是解析這個(gè) SeriesKey, 得到measurement, tag key , tag value
    // Check if deleted.
    deleted := e.Flag == LogEntrySeriesTombstoneFlag

    // Read key size.
    _, remainder := tsdb.ReadSeriesKeyLen(seriesKey)

    // Read measurement name.
    name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
    mm := f.createMeasurementIfNotExists(name)
    mm.deleted = false
    if !deleted {
        mm.addSeriesID(e.SeriesID)
    } else {
        mm.removeSeriesID(e.SeriesID)
    }

    // Read tag count.
    tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)

    // Save tags.
    var k, v []byte
    for i := 0; i < tagN; i++ {
        k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
        ts := mm.createTagSetIfNotExists(k)
        tv := ts.createTagValueIfNotExists(v)

        // Add/remove a reference to the series on the tag value.
        if !deleted {
            tv.addSeriesID(e.SeriesID)
        } else {
            tv.removeSeriesID(e.SeriesID)
        }

        ts.tagValues[string(v)] = tv
        mm.tagSet[string(k)] = ts
    }

    // Add/remove from appropriate series id sets.
    if !deleted {
        f.seriesIDSet.Add(e.SeriesID)
        f.tombstoneSeriesIDSet.Remove(e.SeriesID)
    } else {
        f.seriesIDSet.Remove(e.SeriesID)
        f.tombstoneSeriesIDSet.Add(e.SeriesID)
    }
}
  • 刪除整個(gè)Measurement相關(guān)的索引, 先appEntry到tsl文件,成功后再更新內(nèi)存索引
func (f *LogFile) DeleteMeasurement(name []byte) error {
    f.mu.Lock()
    defer f.mu.Unlock()

    e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}
    if err := f.appendEntry(&e); err != nil {
        return err
    }
    f.execEntry(&e)

    // Flush buffer and sync to disk.
    return f.FlushAndSync()
}

類(lèi)似的操作還有 DeleteTagKey DeleteTagValue DeleteSeriesID

  • 獲取到SeriesIDIterator, 用于遍歷給定的tag key所對(duì)應(yīng)的所有的tag value所在的每一個(gè)series id
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
    f.mu.RLock()
    defer f.mu.RUnlock()

    mm, ok := f.mms[string(name)]
    if !ok {
        return nil
    }

    tk, ok := mm.tagSet[string(key)]
    if !ok {
        return nil
    }

    // Combine iterators across all tag keys.
    itrs := make([]tsdb.SeriesIDIterator, 0, len(tk.tagValues))
    for _, tv := range tk.tagValues {
        if tv.cardinality() == 0 {
            continue
        }
        if itr := tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()); itr != nil {
            itrs = append(itrs, itr)
        }
    }

    return tsdb.MergeSeriesIDIterators(itrs...)
}
  • 批量添加SeriesKey,對(duì)于已經(jīng)存在的就不處理,同時(shí)更新內(nèi)存索引和寫(xiě)入tsl文件
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
    // 寫(xiě)入series file文件,返回所有的series id 列表
    seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
    if err != nil {
        return nil, err
    }

    var writeRequired bool
    entries := make([]LogEntry, 0, len(names))
    seriesSet.RLock()
    for i := range names {
        // seriesSet是該函數(shù)傳進(jìn)來(lái)的第一個(gè)參數(shù),如果id已經(jīng)存在于這個(gè)給定的seriesSet中,就不處理當(dāng)前的id
        if seriesSet.ContainsNoLock(seriesIDs[i]) {
            // We don't need to allocate anything for this series.
            seriesIDs[i] = 0
            continue
        }
        writeRequired = true
        // 添充后面要使用的LogEntry列表
        entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true, batchidx: i})
    }
    seriesSet.RUnlock()

    // Exit if all series already exist.
    if !writeRequired {
        return seriesIDs, nil
    }

    f.mu.Lock()
    defer f.mu.Unlock()

    seriesSet.Lock()
    defer seriesSet.Unlock()

    for i := range entries { // NB - this doesn't evaluate all series ids returned from series file.
        entry := &entries[i]
        //  上面已經(jīng)過(guò)濾過(guò)一次了,這里還需要再過(guò)濾嗎?
        if seriesSet.ContainsNoLock(entry.SeriesID) {
            // We don't need to allocate anything for this series.
            seriesIDs[entry.batchidx] = 0
            continue
        }
        if err := f.appendEntry(entry); err != nil {
            return nil, err
        }
        f.execEntry(entry)
        seriesSet.AddNoLock(entry.SeriesID)
    }

    // Flush buffer and sync to disk.
    if err := f.FlushAndSync(); err != nil {
        return nil, err
    }
    return seriesIDs, nil
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 最近這段時(shí)間在使用influxdb,抽空翻譯了一下配置文件。有不足的地方請(qǐng)指正。因?yàn)楹?jiǎn)書(shū)默認(rèn)markdown編輯器...
    wangrui927閱讀 4,379評(píng)論 0 4
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 32,305評(píng)論 2 89
  • ORA-00001: 違反唯一約束條件 (.) 錯(cuò)誤說(shuō)明:當(dāng)在唯一索引所對(duì)應(yīng)的列上鍵入重復(fù)值時(shí),會(huì)觸發(fā)此異常。 O...
    我想起個(gè)好名字閱讀 5,972評(píng)論 0 9
  • 題目: 寫(xiě)一個(gè)程序打印1到100這些數(shù)字。但是遇到數(shù)字為3的倍數(shù)的時(shí)候,打印“Fizz”替代數(shù)字,5的倍數(shù)用“Bu...
    陳小陌丿閱讀 939評(píng)論 0 0
  • @咩了個(gè)喵 不會(huì)的功課可以隨時(shí)找到人問(wèn) 懶得寫(xiě)的作業(yè)也可以撒撒嬌交給他 一起上學(xué)放學(xué),過(guò)馬路沒(méi)看紅綠燈時(shí)會(huì)被拽著帽...
    喬文呀喬文閱讀 3,412評(píng)論 9 3

友情鏈接更多精彩內(nèi)容