繼續(xù)接續(xù)前一次的分析,地址:點(diǎn)這里
C1 tree -- tsm/tsm1/engine.go
engine就是實(shí)際的引擎類了,真正的聚合文件操作在compact(wg *sync.WaitGroup)這個(gè)方法中,根據(jù)是否是全量和合并的層級調(diào)用compact.go中的實(shí)際聚合方法.
C1 tree -- tsm/tsm1/compact.go
再來是聚合的類。
我個(gè)人理解,時(shí)序數(shù)據(jù)庫因?yàn)槠鋺?yīng)用業(yè)務(wù)的特點(diǎn):寫入量通常大,讀的時(shí)候一般按時(shí)間順序取一段時(shí)間的數(shù)據(jù),所以在Cache層是可以有亂序的,但是在文件這一層為了讀的效率和準(zhǔn)確性數(shù)據(jù)必須是有序的。這個(gè)聚合的過程除了將小文件聚合為大文件以外,還會將墓碑?dāng)?shù)據(jù)移除,墓碑?dāng)?shù)據(jù)就是在內(nèi)存中寫入但是后續(xù)被刪除的數(shù)據(jù)。
先來看一下聚合定義的操作:
// CompactionPlanner determines what TSM files and WAL segments to include in a
// given compaction run. 這個(gè)類里的方法只負(fù)責(zé)找出哪些是策略對應(yīng)需要合并的文件
type CompactionPlanner interface {
//全量觸發(fā)所有l(wèi)evel的文件合并
Plan(lastWrite time.Time) []CompactionGroup
//觸發(fā)特定level的文件合并
PlanLevel(level int) []CompactionGroup
//觸發(fā)level4的文件合并
PlanOptimize() []CompactionGroup
Release(group []CompactionGroup)
//判斷是否文件都已經(jīng)全部整合完畢了 實(shí)際是條件是層次<=1并且文件中沒有墓碑,統(tǒng)計(jì)時(shí)會跳過正在合并的文件
FullyCompacted() bool
//修改forcefull布爾變量 下一次Plan()會強(qiáng)制全量
ForceFull()
SetFileStore(fs *FileStore)
}
// tsmGeneration represents the TSM files within a generation.
// 000001-01.tsm, 000001-02.tsm would be in the same generation
// 000001 each with different sequence numbers.
// 這里的官方注釋很清楚不多說了
type tsmGeneration struct {
id int
files []FileStat
parseFileName ParseFileNameFunc
}
func (l compactionLevel) String() string {
switch l {
case 0:
return "snapshot"
case 1, 2, 3:
return fmt.Sprint(int(l))
case 4:
//叫優(yōu)化的原因大概是這是最后一層了
return "optimize"
case 5:
return "full"
default:
panic("unsupported compaction level")
}
}
文件的讀取
先理解一下文件的結(jié)構(gòu)。
源碼的tsm1/design.md里可以看到tsm實(shí)際的文件結(jié)構(gòu),我去掉了前面一些相對不重要的結(jié)構(gòu)只看最核心的:
┌────────┬────────────────────────────────────┬─────────────┬──────────────┐
│ Header │ Blocks │ Index │ Footer │
│5 bytes │ N bytes │ N bytes │ 4 bytes │
└────────┴────────────────────────────────────┴─────────────┴──────────────┘
┌────────────────────────────────────────────────────────────────────────────┐
│ Index │
├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤
│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │
└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘
可以看到最后合并整理完的有序的index文件,文件頭里都包含了這一塊文件對應(yīng)的時(shí)間段。
而在讀取時(shí)我們肯定不會一個(gè)個(gè)文件地遍歷過去查找,這樣速度太慢了而且耗時(shí)是未知的,讀文件這里又用到了mmap的技術(shù),底層的原理比較深?yuàn)W,就看一下大致讀取的流程吧。
// 這是tsmreader根據(jù)時(shí)間戳和tags找實(shí)際數(shù)據(jù)的方法
func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) {
t.mu.RLock()
v, err := t.accessor.read(key, timestamp)
t.mu.RUnlock()
return v, err
}
func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) {
//先判斷是否在某一個(gè)文件塊中
entry := m.index.Entry(key, timestamp)
if entry == nil {
return nil, nil
}
//這里實(shí)際返回的是timestamp所在文件對應(yīng)entry的所有數(shù)據(jù)而不是一個(gè)時(shí)間點(diǎn)的數(shù)據(jù)
return m.readBlock(entry, nil)
}
//entries是內(nèi)存對文件索引的一個(gè)映射
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
entries, err := d.ReadEntries(key, nil)
if err != nil {
d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key)))
return nil
}
for _, entry := range entries {
//這個(gè)contains就是判讀時(shí)間戳是不是大于entries當(dāng)中的最小時(shí)間戳且小于最大時(shí)間戳
if entry.Contains(timestamp) {
return &entry
}
}
return nil
}
和內(nèi)存一樣,文件也有墓碑的概念,只是是單獨(dú)一個(gè)文件tombstone,結(jié)構(gòu)和正常的文件類似,不贅述了。刪除數(shù)據(jù)的api實(shí)際上就是往這個(gè)文件里寫數(shù)據(jù)。
influxdb的幾個(gè)小技巧
文件索引怎么壓縮
表名,索引,值
cpu,host=server1 value=1
cpu,host=server2 value=2
memory,host=server1 value=3
這一段數(shù)據(jù)你會怎么壓縮?
.
.
.
influxdb做了一些映射,利用一個(gè)編碼字典完成了壓縮
tags是一個(gè)map
先把表名映射為cpu = 1, memory = 2
索引字段映射為host = 1
索引枚舉映射為server1 = 1, server2 = 2
實(shí)際值存儲的字段映射為value = 1
最終索引就映射為一個(gè)數(shù)組了
cpu,host=server1 value=1 --> 1,1,1,1
cpu,host=server2 value=2 --> 1,1,2,1
memory,host=server1 value=3 --> 3,1,2,1
基本上influxdb核心的數(shù)據(jù)處理過程都在這兩篇隨筆中了,大家可以參照著看看源碼,希望大家都有收獲。