- InfluxDB官網(wǎng)文檔是很多的參考
In-memory indexing and the Time-Structured Merge Tree (TSM)
Time Series Index (TSI) overview
Time Series Index (TSI) details
LogFile解析
- 作用: LogFile操作新寫(xiě)入的Series在內(nèi)存中的索引和持久化到WAL的;
- 下面我們先來(lái)看一下用到的相關(guān)數(shù)據(jù)結(jié)構(gòu)
logTagValue
- 定義:代表一個(gè)TagKey對(duì)應(yīng)的一個(gè)TagValue
// logTagValue represents one tag value under a single tag key, together
// with the set of series IDs that carry this key/value pair.
type logTagValue struct {
name []byte // the raw tag value bytes
deleted bool // tombstone flag: true once this tag value has been deleted
series map[uint64]struct{} // small-set representation of owning series IDs (used while count <= 25)
seriesSet *tsdb.SeriesIDSet // roaring-bitmap representation, used once the small set exceeds 25 entries; see addSeriesID
}
series和seriesSet其實(shí)都是用來(lái)存儲(chǔ)SeriesID, 當(dāng)SeriesID數(shù)量小于等于25個(gè)時(shí),存到series里,反之存到seriesSet這個(gè)roaring bitmap里;
- 添加SeriesID:
// addSeriesID records series ID x against this tag value.
//
// IDs live in the small map tv.series until there are more than 25 of
// them; at that point every ID is migrated into a roaring bitmap
// (tv.seriesSet) and the map is dropped. Once the bitmap exists, all
// subsequent IDs go straight into it.
func (tv *logTagValue) addSeriesID(x uint64) {
	// Already promoted to a bitmap: add there and we are done.
	if tv.seriesSet != nil {
		tv.seriesSet.AddNoLock(x)
		return
	}

	// Still using the small-map representation.
	tv.series[x] = struct{}{}
	if len(tv.series) <= 25 {
		return
	}

	// Crossed the threshold: migrate every ID into a roaring bitmap
	// and release the map.
	set := tsdb.NewSeriesIDSet()
	for id := range tv.series {
		set.AddNoLock(id)
	}
	tv.seriesSet, tv.series = set, nil
}
- 刪除SeriesID: removeSeriesID,邏輯與addSeriesID相反,這里不再展開(kāi);
- 獲取SeriesID的基數(shù),所謂的基數(shù)就是不相同的SeriesID的個(gè)數(shù):
// cardinality returns the number of distinct series IDs referencing this
// tag value, regardless of which representation currently holds them.
func (tv *logTagValue) cardinality() int64 {
	if set := tv.seriesSet; set != nil {
		return int64(set.Cardinality())
	}
	return int64(len(tv.series))
}
logTagKey
- 定義:代表一個(gè)TagKey, 包含其對(duì)應(yīng)的所有的tag value
// logTagKey represents a single tag key together with every tag value
// recorded under it.
type logTagKey struct {
name []byte // the raw tag key bytes
deleted bool // tombstone flag: true once this tag key has been deleted
tagValues map[string]logTagValue // tag value string -> per-value index entry
}
- 添加TagValue
// createTagValueIfNotExists returns the logTagValue for value, building a
// fresh, empty entry when none is stored yet. Note the returned struct is
// a copy; the caller must write it back into tk.tagValues to persist any
// mutation (the map key is the tag value string itself).
func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue {
	if tv, ok := tk.tagValues[string(value)]; ok {
		return tv
	}
	return logTagValue{name: value, series: make(map[uint64]struct{})}
}
- 生成TagValueIterator, 用來(lái)遍歷這個(gè)TagKey對(duì)應(yīng)的所有的TagValue
// TagValueIterator returns an iterator that walks every tag value stored
// under this tag key (map iteration order; the iterator implementation
// owns any sorting).
func (tk *logTagKey) TagValueIterator() TagValueIterator {
	values := make([]logTagValue, 0, len(tk.tagValues))
	for _, tv := range tk.tagValues {
		values = append(values, tv)
	}
	return newLogTagValueIterator(values)
}
logMeasurement
- 定義: 包含了一個(gè)measurement所有的tag key和series id
// logMeasurement is the in-memory index entry for one measurement: all of
// its tag keys plus the set of series IDs belonging to it.
type logMeasurement struct {
name []byte // measurement name
tagSet map[string]logTagKey // tag key string -> per-key index entry
deleted bool // tombstone flag: true once this measurement has been deleted
series map[uint64]struct{} // small-set representation of series IDs (used while count <= 25)
seriesSet *tsdb.SeriesIDSet // roaring-bitmap representation once the small set is outgrown
}
其中series和seriesSet其實(shí)都是用來(lái)存儲(chǔ)SeriesID, 當(dāng)SeriesID數(shù)量小于等于25個(gè)時(shí),存到series里,反之存到seriesSet這個(gè)roaring bitmap里;
- 添加新的tagkey
// createTagSetIfNotExists returns the logTagKey for key, building a fresh,
// empty entry when none is stored yet. The returned struct is a copy; the
// caller must write it back into m.tagSet to persist any mutation.
//
// Fix: the excerpt declared a logTagKey return type but was missing the
// return statement entirely, which does not compile — `return ts` added.
func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey {
	ts, ok := m.tagSet[string(key)]
	if !ok {
		ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)}
	}
	return ts
}
- 針對(duì)一個(gè)query請(qǐng)求,比如
select * from measurement1 where tag1=tv1 and tag2=tv2, 根據(jù)measurement1我們可以確定到一個(gè)具體的logMeasurement, 然后根據(jù)tag1=tv1和tag2=tv2中的tagkey我們可以在logMeasurement.tagSet中鎖定logTagKey, 在logTagKey中我們根據(jù)tagvalue就可以取到對(duì)應(yīng)的一系列series id啦~注意,這里就是所謂的純內(nèi)存的倒排索引 -
logMeasurement的管理,包括創(chuàng)建、查詢等操作,都是在LogFile對(duì)象里完成,我們稍后會(huì)介紹.
LogEntry
- 定義: 寫(xiě)入到WAL文件中的數(shù)據(jù)格式,實(shí)際上是寫(xiě)入 dbname/rp/id/index/[id]/Lx-xxxxxxxx.tsl文件
// LogEntry is the on-disk record format written to the index WAL
// (dbname/rp/id/index/[id]/Lx-xxxxxxxx.tsl files).
type LogEntry struct {
Flag byte // flag
SeriesID uint64 // series id
Name []byte // measurement name
Key []byte // tag key
Value []byte // tag value
Checksum uint32 // checksum of flag/name/tags.
Size int // total size of record, in bytes.
// The exported fields above are the serialized content of the entry;
// the unexported fields below are in-memory bookkeeping only.
cached bool // Hint to LogFile that series data is already parsed
name []byte // series name, this is a cached copy of the parsed measurement name
tags models.Tags // series tags, this is a cached copy of the parsed tags
batchidx int // position of entry in batch.
}
- 提供了序列化和反序列化方法:
appendLogEntry 和 UnmarshalBinary,都比較簡(jiǎn)單,不贅述;
LogFile
- 定義:提供了內(nèi)存的倒排索引和索引WAL的寫(xiě)入
// LogFile provides the in-memory inverted index for newly written series
// and persists every index mutation to an index WAL (.tsl) file.
// NOTE(review): methods shown later lock f.mu, but no mutex field appears
// in this excerpt — presumably elided; confirm against the full type.
type LogFile struct {
id int // file sequence identifier
data []byte // mmap
file *os.File // writer
w *bufio.Writer // buffered writer
bufferSize int // The size of the buffer used by the buffered writer
nosync bool // Disables buffer flushing and file syncing. Useful for offline tooling.
buf []byte // marshaling buffer
keyBuf []byte // scratch buffer for building series keys (see execSeriesEntry)
sfile *tsdb.SeriesFile // series lookup
size int64 // tracks current file size
modTime time.Time // tracks last time write occurred
// In-memory series existence/tombstone sets.
seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet
// In-memory index.
mms logMeasurements
// Filepath to the log file.
path string
}
- 添加新的
logMeasurement, 添加到LogFile.mms, 它的類(lèi)型是type logMeasurements map[string]*logMeasurement,key就是measurement name
// createMeasurementIfNotExists returns the in-memory index entry for the
// named measurement, lazily creating and registering it in f.mms (keyed by
// measurement name) on first use.
func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
	if mm := f.mms[string(name)]; mm != nil {
		return mm
	}
	mm := &logMeasurement{
		name:   name,
		tagSet: make(map[string]logTagKey),
		series: make(map[uint64]struct{}),
	}
	f.mms[string(name)] = mm
	return mm
}
- open操作
// open prepares the log file for use: it opens the file for appending,
// memory-maps the existing contents, replays every valid LogEntry to
// rebuild the in-memory index, and positions the write offset just past
// the last valid entry. (Excerpt — some lines elided at "...".)
func (f *LogFile) open() error {
// Open the file for appending new entries.
file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return err
}
f.file = file
...
f.w = bufio.NewWriterSize(f.file, f.bufferSize)
// Memory-map the existing file contents for replay.
data, err := mmap.Map(f.Path(), 0)
if err != nil {
return err
}
f.data = data
// Parse every LogEntry in the file, rebuilding the in-memory index.
var n int64
for buf := f.data; len(buf) > 0; {
// Read next entry. Truncate partial writes.
var e LogEntry
// Deserialize into a LogEntry; a short buffer or checksum mismatch
// marks the tail of a partial write, so replay stops there.
if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch {
break
} else if err != nil {
return err
}
// Apply the entry to the in-memory index.
f.execEntry(&e)
// Move buffer forward.
n += int64(e.Size)
buf = buf[e.Size:]
}
// Record the valid size and seek there, ready to append new data.
f.size = n
_, err = file.Seek(n, io.SeekStart)
return err
}
- 啟動(dòng)時(shí)處理tsl文件中的每條LogEntry, 構(gòu)建內(nèi)存索引 或者運(yùn)行時(shí)更新內(nèi)存索引
// execEntry applies a single decoded LogEntry to the in-memory index by
// dispatching on its flag. The three tombstone flags delete a measurement,
// tag key, or tag value respectively; every other flag is treated as a
// series insert/delete entry. Called both during WAL replay (open) and on
// live writes.
func (f *LogFile) execEntry(e *LogEntry) {
	switch e.Flag {
	case LogEntryMeasurementTombstoneFlag:
		// Whole-measurement deletion.
		f.execDeleteMeasurementEntry(e)
	case LogEntryTagKeyTombstoneFlag:
		// Single tag-key deletion.
		f.execDeleteTagKeyEntry(e)
	case LogEntryTagValueTombstoneFlag:
		// Single tag-value deletion.
		f.execDeleteTagValueEntry(e)
	default:
		// Series insert or series tombstone.
		f.execSeriesEntry(e)
	}
}
這里LogEntryMeasurementTombstoneFlag LogEntryTagKeyTombstoneFlag LogEntryTagValueTombstoneFlag都是創(chuàng)建用于delete的logMeasurement對(duì)象,已經(jīng)存在則更新相應(yīng)的字段
- 處理單條Series操作
execSeriesEntry,看著代碼多,其實(shí)很簡(jiǎn)單
// execSeriesEntry applies a series insert or series-tombstone entry to the
// in-memory index: it resolves the series key, then adds (or removes) the
// series ID on the measurement, on every tag key/value pair, and in the
// file-level existence/tombstone sets.
func (f *LogFile) execSeriesEntry(e *LogEntry) {
var seriesKey []byte
if e.cached {
// Entry carries pre-parsed name/tags: build the key into f.keyBuf,
// growing the scratch buffer first so it can hold the full key.
sz := tsdb.SeriesKeySize(e.name, e.tags)
if len(f.keyBuf) < sz {
f.keyBuf = make([]byte, 0, sz)
}
seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
} else {
// Otherwise look the series key up in the series file by ID.
seriesKey = f.sfile.SeriesKey(e.SeriesID)
}
// Series keys can be removed if the series has been deleted from
// the entire database and the server is restarted. This would cause
// the log to replay its insert but the key cannot be found.
//
// https://github.com/influxdata/influxdb/issues/9444
if seriesKey == nil {
return
}
// Everything below parses the series key into measurement name,
// tag keys and tag values, and updates the inverted index.
// Check if deleted.
deleted := e.Flag == LogEntrySeriesTombstoneFlag
// Read key size.
_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)
// Read measurement name.
name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
mm := f.createMeasurementIfNotExists(name)
mm.deleted = false
if !deleted {
mm.addSeriesID(e.SeriesID)
} else {
mm.removeSeriesID(e.SeriesID)
}
// Read tag count.
tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)
// Save tags.
var k, v []byte
for i := 0; i < tagN; i++ {
k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
ts := mm.createTagSetIfNotExists(k)
tv := ts.createTagValueIfNotExists(v)
// Add/remove a reference to the series on the tag value.
if !deleted {
tv.addSeriesID(e.SeriesID)
} else {
tv.removeSeriesID(e.SeriesID)
}
// ts and tv are map-value copies, so write them back to persist.
ts.tagValues[string(v)] = tv
mm.tagSet[string(k)] = ts
}
// Add/remove from appropriate series id sets.
if !deleted {
f.seriesIDSet.Add(e.SeriesID)
f.tombstoneSeriesIDSet.Remove(e.SeriesID)
} else {
f.seriesIDSet.Remove(e.SeriesID)
f.tombstoneSeriesIDSet.Add(e.SeriesID)
}
}
- 刪除整個(gè)Measurement相關(guān)的索引, 先appEntry到tsl文件,成功后再更新內(nèi)存索引
// DeleteMeasurement tombstones an entire measurement. The tombstone entry
// is appended to the .tsl (WAL) file first; only on success is it replayed
// against the in-memory index, and finally the write buffer is flushed and
// synced to disk.
func (f *LogFile) DeleteMeasurement(name []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	entry := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}
	if err := f.appendEntry(&entry); err != nil {
		return err
	}
	f.execEntry(&entry)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
類(lèi)似的操作還有 DeleteTagKey DeleteTagValue DeleteSeriesID
- 獲取到SeriesIDIterator, 用于遍歷給定的tag key所對(duì)應(yīng)的所有的tag value所在的每一個(gè)series id
// TagKeySeriesIDIterator returns an iterator over every series ID referenced
// by any tag value of the given tag key on the given measurement. It returns
// nil when the measurement or the tag key is unknown.
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	meas, ok := f.mms[string(name)]
	if !ok {
		return nil
	}
	tagKey, ok := meas.tagSet[string(key)]
	if !ok {
		return nil
	}

	// Build one iterator per non-empty tag value, then merge them all.
	iters := make([]tsdb.SeriesIDIterator, 0, len(tagKey.tagValues))
	for _, val := range tagKey.tagValues {
		if val.cardinality() == 0 {
			continue
		}
		if it := tsdb.NewSeriesIDSetIterator(val.seriesIDSet()); it != nil {
			iters = append(iters, it)
		}
	}
	return tsdb.MergeSeriesIDIterators(iters...)
}
- 批量添加SeriesKey,對(duì)于已經(jīng)存在的就不處理,同時(shí)更新內(nèi)存索引和寫(xiě)入tsl文件
// AddSeriesList inserts a batch of series (parallel names/tags slices) into
// the index: new series are written to the series file, appended to the
// .tsl WAL, and applied to the in-memory index. Series already present in
// the caller-supplied seriesSet are skipped and reported as 0 in the
// returned ID slice.
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
// Write to the series file first; it returns one series ID per name.
seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
if err != nil {
return nil, err
}
var writeRequired bool
entries := make([]LogEntry, 0, len(names))
seriesSet.RLock()
// Phase 1 (read lock only): skip IDs already in seriesSet and queue a
// LogEntry for each genuinely new series.
for i := range names {
if seriesSet.ContainsNoLock(seriesIDs[i]) {
// We don't need to allocate anything for this series.
seriesIDs[i] = 0
continue
}
writeRequired = true
// Queue a LogEntry for the write phase below.
entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true, batchidx: i})
}
seriesSet.RUnlock()
// Exit if all series already exist.
if !writeRequired {
return seriesIDs, nil
}
// Phase 2: take the write locks and persist each queued entry.
f.mu.Lock()
defer f.mu.Unlock()
seriesSet.Lock()
defer seriesSet.Unlock()
for i := range entries { // NB - this doesn't evaluate all series ids returned from series file.
entry := &entries[i]
// Re-check under the write lock: the locks were released between the
// two phases, so another writer may have added this series meanwhile.
if seriesSet.ContainsNoLock(entry.SeriesID) {
// We don't need to allocate anything for this series.
seriesIDs[entry.batchidx] = 0
continue
}
if err := f.appendEntry(entry); err != nil {
return nil, err
}
f.execEntry(entry)
seriesSet.AddNoLock(entry.SeriesID)
}
// Flush buffer and sync to disk.
if err := f.FlushAndSync(); err != nil {
return nil, err
}
return seriesIDs, nil
}