從一個(gè)小需求開始---從LSM到influxdb源碼解讀(2)

繼續(xù)接續(xù)前一次的分析,地址:點(diǎn)這里

C1 tree -- tsm/tsm1/engine.go

engine就是實(shí)際的引擎類了,真正的聚合文件操作在compact(wg *sync.WaitGroup)這個(gè)方法中,根據(jù)是否是全量和合并的層級調(diào)用compact.go中的實(shí)際聚合方法.

C1 tree -- tsm/tsm1/compact.go

再來是聚合的類。
我個(gè)人理解,時(shí)序數(shù)據(jù)庫因?yàn)槠鋺?yīng)用業(yè)務(wù)的特點(diǎn):寫入量通常大,讀的時(shí)候一般按時(shí)間順序取一段時(shí)間的數(shù)據(jù),所以在Cache層是可以有亂序的,但是在文件這一層為了讀的效率和準(zhǔn)確性數(shù)據(jù)必須是有序的。這個(gè)聚合的過程除了將小文件聚合為大文件以外,還會將墓碑?dāng)?shù)據(jù)移除,墓碑?dāng)?shù)據(jù)就是在內(nèi)存中寫入但是后續(xù)被刪除的數(shù)據(jù)。
先來看一下聚合定義的操作:

// CompactionPlanner determines what TSM files and WAL segments to include in a
// given compaction run. 這個(gè)類里的方法只負(fù)責(zé)找出哪些是策略對應(yīng)需要合并的文件
type CompactionPlanner interface {
    //全量觸發(fā)所有l(wèi)evel的文件合并
    Plan(lastWrite time.Time) []CompactionGroup
    //觸發(fā)特定level的文件合并
    PlanLevel(level int) []CompactionGroup
    //觸發(fā)level4的文件合并
    PlanOptimize() []CompactionGroup
    Release(group []CompactionGroup)
    
    //判斷是否文件都已經(jīng)全部整合完畢了 實(shí)際是條件是層次<=1并且文件中沒有墓碑,統(tǒng)計(jì)時(shí)會跳過正在合并的文件
    FullyCompacted() bool

    //修改forcefull布爾變量  下一次Plan()會強(qiáng)制全量
    ForceFull()

    SetFileStore(fs *FileStore)
}


// tsmGeneration represents the TSM files within a generation.
// 000001-01.tsm, 000001-02.tsm would be in the same generation
// 000001 each with different sequence numbers.
// 這里的官方注釋很清楚不多說了
type tsmGeneration struct {
    id            int
    files         []FileStat
    parseFileName ParseFileNameFunc
}

func (l compactionLevel) String() string {
    switch l {
    case 0:
        return "snapshot"
    case 1, 2, 3:
        return fmt.Sprint(int(l))
    case 4:
        //叫優(yōu)化的原因大概是這是最后一層了
        return "optimize"
    case 5:
        return "full"
    default:
        panic("unsupported compaction level")
    }
}

文件的讀取

先理解一下文件的結(jié)構(gòu)。
源碼的tsm1/design.md里可以看到tsm實(shí)際的文件結(jié)構(gòu),我去掉了前面一些相對不重要的結(jié)構(gòu)只看最核心的:

┌────────┬────────────────────────────────────┬─────────────┬──────────────┐
│ Header │               Blocks               │    Index    │    Footer    │
│5 bytes │              N bytes               │   N bytes   │   4 bytes    │
└────────┴────────────────────────────────────┴─────────────┴──────────────┘


┌────────────────────────────────────────────────────────────────────────────┐
│                                   Index                                    │
├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤
│ Key Len │   Key   │ Type │ Count │Min Time │Max Time │ Offset │  Size  │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │   │
└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘

可以看到最后合并整理完的有序的index文件,文件頭里都包含了這一塊文件對應(yīng)的時(shí)間段。
而在讀取時(shí)我們肯定不會一個(gè)個(gè)文件地遍歷過去查找,這樣速度太慢了而且耗時(shí)是未知的,讀文件這里又用到了mmap的技術(shù),底層的原理比較深?yuàn)W,就看一下大致讀取的流程吧。

// 這是tsmreader根據(jù)時(shí)間戳和tags找實(shí)際數(shù)據(jù)的方法
func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) {
    t.mu.RLock()
    v, err := t.accessor.read(key, timestamp)
    t.mu.RUnlock()
    return v, err
}

func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) {
    //先判斷是否在某一個(gè)文件塊中
    entry := m.index.Entry(key, timestamp)
    if entry == nil {
        return nil, nil
    }
    //這里實(shí)際返回的是timestamp所在文件對應(yīng)entry的所有數(shù)據(jù)而不是一個(gè)時(shí)間點(diǎn)的數(shù)據(jù)  
    return m.readBlock(entry, nil)
}

//entries是內(nèi)存對文件索引的一個(gè)映射
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
    entries, err := d.ReadEntries(key, nil)
    if err != nil {
        d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key)))
        return nil
    }
    for _, entry := range entries {
        //這個(gè)contains就是判讀時(shí)間戳是不是大于entries當(dāng)中的最小時(shí)間戳且小于最大時(shí)間戳
        if entry.Contains(timestamp) {
            return &entry
        }
    }
    return nil
}

和內(nèi)存一樣,文件也有墓碑的概念,只是是單獨(dú)一個(gè)文件tombstone,結(jié)構(gòu)和正常的文件類似,不贅述了。刪除數(shù)據(jù)的api實(shí)際上就是往這個(gè)文件里寫數(shù)據(jù)。

influxdb的幾個(gè)小技巧

文件索引怎么壓縮
表名,索引,值
cpu,host=server1 value=1
cpu,host=server2 value=2
memory,host=server1 value=3

這一段數(shù)據(jù)你會怎么壓縮?
.
.
.
influxdb做了一些映射,利用一個(gè)編碼字典完成了壓縮
tags是一個(gè)map
先把表名映射為cpu = 1, memory = 2
索引字段映射為host = 1
索引枚舉映射為server1 = 1, server2 = 2
實(shí)際值存儲的字段映射為value = 1

最終索引就映射為一個(gè)數(shù)組了

cpu,host=server1 value=1 -->    1,1,1,1
cpu,host=server2 value=2 -->    1,1,2,1
memory,host=server1 value=3 --> 3,1,2,1

基本上influxdb核心的數(shù)據(jù)處理過程都在這兩篇隨筆中了,大家可以參照著看看源碼,希望大家都有收獲。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容