05、Kafka的索引機(jī)制

Kafka中有三大類(lèi)索引:位移索引、時(shí)間戳索引和已中止事務(wù)索引。分別對(duì)應(yīng)了.index、.timeindex、.txnindex文件。

與之相關(guān)的源碼如下:

  1. AbstractIndex.scala:抽象類(lèi),封裝了所有索引的公共操作
  2. OffsetIndex.scala:位移索引,保存了位移值和對(duì)應(yīng)磁盤(pán)物理位置的關(guān)系
  3. TimeIndex.scala:時(shí)間戳索引,保存了時(shí)間戳和對(duì)應(yīng)位移值的關(guān)系
  4. TransactionIndex.scala:事務(wù)索引,啟用Kafka事務(wù)之后才會(huì)出現(xiàn)這個(gè)索引

先來(lái)看看AbstractIndex的定義


AbstractIndex的定義在代碼里已經(jīng)注釋了,成員變量里面還有個(gè)entrySize。這個(gè)變量其實(shí)是每個(gè)索引項(xiàng)的大小,每個(gè)索引項(xiàng)的大小是固定的。

entrySize

在OffsetIndex位移索引中是override def entrySize = 8,8個(gè)字節(jié)。
在TimeIndex時(shí)間戳索引中是override def entrySize = 12,12個(gè)字節(jié)。

為什么要這么麻煩,還要存?zhèn)€差值?
這其實(shí)和MySQL InnoDB 為何建議主鍵不宜過(guò)長(zhǎng)一樣。每個(gè)輔助索引都會(huì)存儲(chǔ)主鍵的值,主鍵越長(zhǎng),每條索引項(xiàng)占用的內(nèi)存就越大,緩存頁(yè)一次從磁盤(pán)獲取的索引數(shù)就越少,一次查詢(xún)需要訪(fǎng)問(wèn)磁盤(pán)次數(shù)就可能變多。而磁盤(pán)訪(fǎng)問(wèn)我們都知道,很慢。

位移索引

不同索引類(lèi)型保存不同的<Key , Value>對(duì),對(duì)OffsetIndex位移索引而言,Key就是消息的相對(duì)位移,Value保存該消息的日志段文件中該消息第一個(gè)字節(jié)的物理文件位置。

為何是8 ?

在OffsetIndex位移索引中是override def entrySize = 8,8個(gè)字節(jié)。

相對(duì)位移是一個(gè)整型,占用4個(gè)字節(jié),物理文件位置也是一個(gè)整型,同樣占用4個(gè)字節(jié),因此總共8個(gè)字節(jié)。

我們知道,Kafka中的消息位移值是一個(gè)長(zhǎng)整型,應(yīng)該占用8個(gè)字節(jié)才對(duì),在保存OffsetIndex<Key , Value>對(duì),Kafka做了一些優(yōu)化,每個(gè)OffsetIndex對(duì)象在創(chuàng)建時(shí),都已經(jīng)保存了對(duì)應(yīng)日志段對(duì)象的起始位移,因此保存與起始位移的差值就夠了。

  1. 為了節(jié)省空間,一個(gè)索引項(xiàng)節(jié)省了4字節(jié),想想那些日消息處理數(shù)萬(wàn)億的公司。
  2. 因?yàn)閮?nèi)存資源是很寶貴的,索引項(xiàng)越短,內(nèi)存中能存儲(chǔ)的索引項(xiàng)就越多,索引項(xiàng)多了直接命中的概率就高了。

寫(xiě)入索引項(xiàng)

寫(xiě)入索引項(xiàng)append方法的實(shí)現(xiàn)

 def append(offset: Long, position: Int): Unit = {
    inLock(lock) {
     // 索引文件如果已經(jīng)寫(xiě)滿(mǎn),直接拋出異常
      require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
    // 要保證待寫(xiě)入的位移offset比當(dāng)前索引文件中所存的位移值要大
  // 這主要是為了維護(hù)索引的單調(diào)性
      if (_entries == 0 || offset > _lastOffset) {
        trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
        mmap.putInt(relativeOffset(offset))//向mmap寫(xiě)入相對(duì)位移值
        mmap.putInt(position)//向mmap寫(xiě)入物理文件位置
        _entries += 1//更新索引項(xiàng)個(gè)數(shù)
        _lastOffset = offset//更新當(dāng)前索引文件最大位移值
      // 確保寫(xiě)入索引項(xiàng)格式符合要求
        require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
      } else {
        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
          s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
      }
    }
  }
image.png

時(shí)間戳索引

TimeIndex保存的是<時(shí)間戳,相對(duì)位移值>,時(shí)間戳需要長(zhǎng)整型來(lái)保存,相對(duì)位移值使用Integer來(lái)保存。因此TimeIndex單個(gè)索引項(xiàng)需要占用12個(gè)字節(jié)。

寫(xiě)入索引項(xiàng)

def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
    inLock(lock) {
      if (!skipFullCheck)
  // 索引文件如果已經(jīng)寫(xiě)滿(mǎn),直接拋出異常
        require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
    // 這主要是為了維護(hù)索引的單調(diào)性
      if (_entries != 0 && offset < lastEntry.offset)
        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
          s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
// 這主要是為了維護(hù)索引的單調(diào)性
      if (_entries != 0 && timestamp < lastEntry.timestamp)
        throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
          s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")

      if (timestamp > lastEntry.timestamp) {
        trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
        mmap.putLong(timestamp)//向mmap寫(xiě)入時(shí)間戳
        mmap.putInt(relativeOffset(offset))//向mmap寫(xiě)入相對(duì)位移值
        _entries += 1
        _lastEntry = TimestampOffset(timestamp, offset)
        require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
      }
    }
  }

位移索引和時(shí)間戳索引的區(qū)別是什么?

image.png

改進(jìn)的二分查找

就Kafka而言,索引是在文件末尾追加的寫(xiě)入的,并且一般寫(xiě)入的數(shù)據(jù)立馬就會(huì)被讀取。所以數(shù)據(jù)的熱點(diǎn)集中在尾部。并且操作系統(tǒng)基本上都是用頁(yè)為單位緩存和管理內(nèi)存的,內(nèi)存又是有限的,因此會(huì)通過(guò)類(lèi)LRU機(jī)制淘汰內(nèi)存。

看起來(lái)LRU非常適合Kafka的場(chǎng)景,但是使用標(biāo)準(zhǔn)的二分查找會(huì)有缺頁(yè)中斷的情況,畢竟二分是跳著訪(fǎng)問(wèn)的。

簡(jiǎn)單的來(lái)講,假設(shè)某索引占page cache 13頁(yè),此時(shí)數(shù)據(jù)已經(jīng)寫(xiě)到了12頁(yè)。按照kafka訪(fǎng)問(wèn)的特性,此時(shí)訪(fǎng)問(wèn)的數(shù)據(jù)都在第12頁(yè),因此二分查找的特性,此時(shí)緩存頁(yè)的訪(fǎng)問(wèn)順序依次是0,6,9,11,12。因?yàn)轭l繁被訪(fǎng)問(wèn),所以這幾頁(yè)一定存在page cache中。


當(dāng)?shù)?2頁(yè)不斷被填充,滿(mǎn)了之后會(huì)申請(qǐng)新頁(yè)第13頁(yè)保存索引項(xiàng),而按照二分查找的特性,此時(shí)緩存頁(yè)的訪(fǎng)問(wèn)順序依次是:0,7,10,12。這7和10很久沒(méi)被訪(fǎng)問(wèn)到了,很可能已經(jīng)不再緩存中了,然后需要從磁盤(pán)上讀取數(shù)據(jù)。注釋說(shuō):在他們的測(cè)試中,這會(huì)導(dǎo)致至少會(huì)產(chǎn)生從幾毫秒跳到1秒的延遲。

基于以上問(wèn)題,Kafka使用了改進(jìn)版的二分查找,改的不是二分查找的內(nèi)部,而且把所有索引項(xiàng)分為熱區(qū)和冷區(qū)
這個(gè)改進(jìn)可以讓查詢(xún)熱數(shù)據(jù)部分時(shí),遍歷的Page永遠(yuǎn)是固定的,這樣能避免缺頁(yè)中斷。
看到這里其實(shí)我想到了一致性hash,一致性hash相對(duì)于普通的hash不就是在node新增的時(shí)候緩存的訪(fǎng)問(wèn)固定,或者只需要遷移少部分?jǐn)?shù)據(jù)。

Kafka為什么不采用InnoDB的索引機(jī)制

InnoDB中維護(hù)索引的代價(jià)比Kafka中的要高。Kafka中當(dāng)有新的索引文件建立的時(shí)候ConcurrentSkipListMap才會(huì)更新,而不是每次有數(shù)據(jù)寫(xiě)入時(shí)就會(huì)更新,這塊的維護(hù)量基本可以忽略,B+樹(shù)中數(shù)據(jù)有插入、更新、刪除的時(shí)候都需要更新索引,還會(huì)引來(lái)“頁(yè)分裂”等相對(duì)耗時(shí)的操作。Kafka中的索引文件也是順序追加文件的操作,和B+樹(shù)比起來(lái)工作量要小很多。

說(shuō)到底還是應(yīng)用場(chǎng)景不同所決定的。MySQL中需要頻繁地執(zhí)行CRUD的操作,CRUD是MySQL的主要工作內(nèi)容,而為了支撐這個(gè)操作需要使用維護(hù)量大很多的B+樹(shù)去支撐。Kafka中的消息一般都是順序?qū)懭氪疟P(pán),再到從磁盤(pán)順序讀出(不深入探討page cache等),他的主要工作內(nèi)容就是:寫(xiě)入+讀取,很少有檢索查詢(xún)的操作,換句話(huà)說(shuō),檢索查詢(xún)只是Kafka的一個(gè)輔助功能,不需要為了這個(gè)功能而去花費(fèi)特別太的代價(jià)去維護(hù)一個(gè)高level的索引。前面也說(shuō)過(guò),Kafka中的這種方式是在磁盤(pán)空間、內(nèi)存空間、查找時(shí)間等多方面之間的一個(gè)折中。?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容