Kafka Operations: Pitfalls and Fixes

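  • Scope: everything here applies only to Kafka 0.9.0.1;
  • The title says "operations", but the focus is really on troubleshooting;
  • Most of the solutions came from Google; I have mostly just gathered them in one place;
  • Some of these solutions are not necessarily general, so apply them to production with care;
  • Corrections for any omissions or mistakes are welcome;
  • Contents:
    1. Replica cannot sync messages from the leader
    2. Broker's connection to the zk cluster drops and reconnects from time to time
    3. Broker restart takes a very long time
    4. Broker forcibly halted because unclean leader election is disabled
    5. Replica syncs data from the wrong partition leader
    6. __consumer_offsets log cannot be cleaned
    7. GC problems
    8. zk and Kafka deployment
    9. Monitoring matters
    10. Flood of exceptions: Attempted to decrease connection count for address with no connections
    11. Newer SDK talking to an older Kafka sends requests that Kafka does not support
    12. Frequent Full GC
    13. Swap usage on the machines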

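Replica cannot sync messages from the leader
  • Symptom: a topic on the cluster originally had a single replica. After increasing it to two replicas, we found that some partitions never synced data from the leader, so the newly added replica never showed up in the ISR list;
  • Log analysis: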
[2017-09-20 19:37:05,265] ERROR Found invalid messages during fetch for partition [xxxx,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:05,458] ERROR Found invalid messages during fetch for partition [xxxx,75] offset 1501373 error Message found with corrupt size (0) in shallow iterator (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:07,455] ERROR [ReplicaFetcherThread-0-5], Error due to  (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: error processing data for partition [xxxx,87] offset 1503346
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:147)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:122)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:122)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:120)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:120)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:93)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.lang.RuntimeException: Offset mismatch: fetched offset = 1503346, log end offset = 1503297.
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:110)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:138)
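  • Solution:
    1. This is a bug in Kafka 0.9.0.1: ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message
    2. Upgrade Kafka, or apply the simple fix given by the issue's reporter to work around the problem;
  • Deeper dive:
    What actually triggers this bug is the following error:
    ERROR Found invalid messages during fetch for partition [qssnews_download,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
    When the bug was triggered, the disk on the corresponding broker machine happened to have developed several bad blocks, but I cannot be fully sure the CRC error is related to them. There is also a Kafka issue about this: Replication issues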
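To make the "stored crc vs computed crc" error concrete, here is a minimal Scala sketch of a CRC32 integrity check built on java.util.zip.CRC32. The object CrcCheck and its methods are hypothetical names, and the framing is simplified: as far as I know, the real 0.9 message format computes the CRC over the bytes that follow the CRC field (magic byte, attributes, key and value), not over a bare payload. A single flipped bit, such as one a bad disk block might produce, is enough to make the two values differ.

```scala
import java.util.zip.CRC32

// Hypothetical helper modelling the check behind
// "Message is corrupt (stored crc = ..., computed crc = ...)".
// Simplification: the CRC here covers the whole byte array; the real 0.9
// message format checksums the bytes after the CRC field.
object CrcCheck {
  /** CRC32 of the bytes, as an unsigned 32-bit value held in a Long. */
  def computeCrc(bytes: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(bytes, 0, bytes.length)
    crc.getValue()
  }

  /** None if the stored CRC matches, otherwise an error in the broker's wording. */
  def verify(storedCrc: Long, bytes: Array[Byte]): Option[String] = {
    val computed = computeCrc(bytes)
    if (computed == storedCrc) None
    else Some(s"Message is corrupt (stored crc = $storedCrc, computed crc = $computed)")
  }
}
```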
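Broker's connection to the zk cluster drops and reconnects from time to time
  • Symptom: from time to time the broker re-establishes its session with zk;
  • Log analysis: the broker log reports zk connection timeouts or failures to read any data from zk;
  • Solution: increase the broker's zk session timeout (zookeeper.session.timeout.ms). This does not fully solve the problem, but it helps a lot;
  • Deeper dive:
    1. Our Kafka cluster is fairly stable overall, but this zk timeout problem is truly baffling.
      On startup, a broker registers an ephemeral node on zk to announce that it is online. Once the session times out, that ephemeral node is deleted, which is equivalent to the broker going offline, and this inevitably causes churn across the whole cluster; see "KafkaController Analysis 8: Broker Down";
    2. Why zk times out: the root cause has not been pinned down. So far it appears to be related to many factors, such as disk IO, CPU load, and GC.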
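A rough model of why a longer session timeout helps but does not cure the problem. The assumption here is deliberately simplified and is not ZooKeeper's actual implementation: the zk server expires a session when it hears nothing from the client for longer than the session timeout, and an IO stall or stop-the-world pause on the broker shows up as a gap between heartbeats. SessionModel and its methods are hypothetical names.

```scala
// Simplified model (an assumption, not ZooKeeper's code): a session expires
// when the silence between two consecutive client heartbeats exceeds the
// session timeout. A broker-side pause appears as one long gap.
object SessionModel {
  /** Largest gap in ms between consecutive heartbeat timestamps (sorted ascending). */
  def maxGapMs(heartbeatsMs: Seq[Long]): Long =
    if (heartbeatsMs.size < 2) 0L
    else heartbeatsMs.zip(heartbeatsMs.tail).map { case (a, b) => b - a }.max

  /** True if any silence between heartbeats exceeds the session timeout. */
  def wouldExpire(heartbeatsMs: Seq[Long], sessionTimeoutMs: Long): Boolean =
    maxGapMs(heartbeatsMs) > sessionTimeoutMs
}
```

For example, heartbeats at 0, 2000, 4000, 13000 and 15000 ms contain a 9000 ms gap: that expires a 6000 ms session but not a 10000 ms one. Raising the timeout only hides pauses shorter than the new value; a longer stall still expires the session.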
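Broker restart takes a very long time
  • Symptom: restarting a broker is extremely slow;
  • Log analysis: on restart the broker loads all log segments and rebuilds their indexes;
  • Cause: most likely the broker was not shut down gracefully on stop but killed directly with kill -9;
  • Deeper dive:
    1. Stop the broker with the script Kafka provides (bin/kafka-server-stop.sh) so that it shuts down gracefully;
    2. When shutting down a broker, make sure the corresponding zk cluster is available; otherwise the broker may not be able to shut down gracefully.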
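Why kill -9 makes the next start slow can be sketched with a "clean shutdown marker". A graceful stop (SIGTERM, which as far as I know is what kafka-server-stop.sh sends) runs JVM shutdown hooks, which can write a marker file once logs are flushed; kill -9 runs no hooks, so on the next start the marker is missing and every log must be recovered, indexes included. The marker name mirrors Kafka's .kafka_cleanshutdown file, but the CleanShutdown object is a hypothetical illustration, not Kafka's LogManager code.

```scala
import java.nio.file.{Files, Path}

// Hypothetical illustration of the clean-shutdown-marker idea.
object CleanShutdown {
  val MarkerName = ".kafka_cleanshutdown"

  /** On startup: recovery is needed when the marker is absent. Consumes the marker. */
  def needsRecovery(logDir: Path): Boolean = {
    val marker = logDir.resolve(MarkerName)
    val clean = Files.exists(marker)
    if (clean) Files.delete(marker) // the marker only vouches for the previous run
    !clean
  }

  /** Called once all logs are flushed and closed. */
  def markClean(logDir: Path): Unit = {
    val marker = logDir.resolve(MarkerName)
    if (!Files.exists(marker)) Files.createFile(marker)
  }

  /** Shutdown hooks run on SIGTERM but never on kill -9 (SIGKILL). */
  def installHook(logDir: Path): Unit =
    sys.addShutdownHook { markClean(logDir) }
}
```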
Broker forcibly halted because unclean leader election is disabled
  • Symptom: monitoring showed that one broker in the cluster had died;
  • Log analysis:
    [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because log truncation is not allowed for topic test, Current leader 1's latest offset 0 is less than replica 2's latest offset 151 (kafka.server.ReplicaFetcherThread)
  • Cause: unclean.leader.election.enable=false was set, and the broker then hit the following logic in the code:
  if (leaderEndOffset < replica.logEndOffset.messageOffset) {
      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
      if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
        ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
        // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
        fatal("...")
        Runtime.getRuntime.halt(1)
      }
      // ... (rest of this branch omitted)
  }

Runtime.getRuntime.halt(1) is then called, which kills the process immediately without running shutdown hooks.
See the Kafka issue: Unclean leader election and "Halting because log truncation is not allowed"
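The branch in the excerpt boils down to a three-way decision. The sketch below restates it in Scala; UncleanCheck and its cases are hypothetical names, and the other out-of-range handling in the real ReplicaFetcherThread (for example when the follower is behind the leader's start offset) is deliberately left out. With the offsets from the log above (leader 0, follower 151) and unclean election disabled, it yields HaltBroker.

```scala
// Hypothetical restatement of the follower-side check in the excerpt above.
object UncleanCheck {
  sealed trait Action
  case object NoConflict extends Action                    // the excerpt's branch is not taken
  final case class TruncateTo(offset: Long) extends Action // accept data loss on this replica
  case object HaltBroker extends Action                    // 0.9 calls Runtime.getRuntime.halt(1)

  def decide(leaderEndOffset: Long, followerEndOffset: Long, uncleanEnabled: Boolean): Action =
    if (leaderEndOffset >= followerEndOffset) NoConflict
    else if (uncleanEnabled) TruncateTo(leaderEndOffset)
    else HaltBroker
}
```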

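Replica syncs data from the wrong partition leader
  • Symptom: several machines in the cluster raised disk-space alerts one after another. Investigation showed that Kafka's own log files were taking up a lot of disk space, and they were full of entries like these: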
WARN [Replica Manager on Broker 3]: While recording the replica LEO, the partition [orderservice.production,0] hasn't been created. (kafka.server.ReplicaManager)

ERROR [ReplicaFetcherThread-0-58], Error for partition [reptest,0] to broker 58:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
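  • Log analysis:
    Combining the logs above with the topic's current partition replica assignment and ISR, it was clear that replicas were syncing data from the wrong partition leader, which in theory should never happen. Here is what had been done beforehand:
    1. Earlier, one machine in a cluster died because of a hardware fault. I wanted to delete a partition that lived on it, but Kafka does not support deleting partitions, so I removed that partition's entry directly from the content of the /brokers/topics/[topic] node on zk. The Kafka controller, however, does not handle a decrease in the number of partitions; see "KafkaController Analysis";
    2. To trigger deletion of that partition, I then reassigned the topic's other partitions;
    3. I also deleted the /controller ephemeral node on zk;
    4. By the end I had confused even myself;
    5. Then the broken machine was repaired and brought back online, and the problem appeared;
  • Solution: restarted every broker once;
  • Deeper dive:
    1. The root cause was never fully confirmed; by the time the problem was noticed, the earlier Kafka debug logs had already been deleted;
    2. Kafka has a similar issue: can't create as many partitions as brokers exists
    3. Avoid manually editing Kafka's nodes on zk whenever possible;
    4. Consider adding a delete-partition feature to the Kafka source; it should not be too hard.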
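__consumer_offsets log cannot be cleaned
  • Symptom: several machines in the cluster raised disk-space alerts; on inspection, one partition of __consumer_offsets was using tens of GB;
  • Log analysis: the earlier logs had already been cleaned up, so no useful logs were left. To debug the problem, I packed up this partition's index and log files, copied them to a test cluster, and restarted the broker, which produced the following log: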
[2017-09-30 10:49:36,126] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: 138296566648 messages in segment __consumer_offsets-5/00000000000000000000.log but offset map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
        at scala.Predef$.require(Predef.scala:219)
        at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
        at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
        at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
        at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
        at kafka.log.Cleaner.clean(LogCleaner.scala:322)
        at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
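  • Problem analysis:
    From the LogCleaner source, for the segment 00000000000000000000.log, segment.nextOffset() - segment.baseOffset was larger than maxDesiredMapSize. The require check below then fails, which kills the log cleaner thread, so nothing gets cleaned any more. That should not happen!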
      val segmentSize = segment.nextOffset() - segment.baseOffset
      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
      if (map.size + segmentSize <= maxDesiredMapSize)
        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
      else
        full = true
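  • Solution: I could not think of anything better, so I brute-force deleted 00000000000000000000.log and 00000000000000000000.index, removed the related entry from cleaner-offset-checkpoint, and restarted the broker; log compaction then started running.
  • Deeper dive:
    Why segment.nextOffset() - segment.baseOffset exceeded maxDesiredMapSize: my guess is that some application was committing offsets to this partition manually without proper rate control, pushing as much as 8 to 9 MB per second.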
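A back-of-the-envelope sketch of where "offset map can fit only 5033164" comes from, and why the error's advice to increase log.cleaner.dedupe.buffer.size could not have helped here. Assumptions, based on my reading of the 0.9 LogCleaner: each cleaner thread gets dedupeBufferSize / threads bytes, capped at Int.MaxValue; each map slot costs a 16-byte MD5 hash plus an 8-byte offset; and only slots times the load factor (log.cleaner.io.buffer.load.factor, 0.9) are usable. With the default 128 MB buffer and one thread this reproduces 5033164 exactly, while the 138296566648-offset segment above would need roughly 3.7 × 10^12 bytes. CleanerMath is a hypothetical name.

```scala
// Hypothetical arithmetic helper; mirrors (not calls) the cleaner's sizing.
object CleanerMath {
  /** Usable offset-map entries per cleaner thread. */
  def maxDesiredMapSize(dedupeBufferSize: Long, threads: Int,
                        hashSize: Int = 16, loadFactor: Double = 0.9): Int = {
    val memory = math.min(dedupeBufferSize / threads, Int.MaxValue.toLong).toInt
    val slots = memory / (hashSize + 8) // bytes per entry: hash + 8-byte offset
    (slots * loadFactor).toInt
  }

  /** Buffer bytes one thread would need for a segment spanning `offsetSpan` offsets. */
  def bufferNeeded(offsetSpan: Long, hashSize: Int = 16, loadFactor: Double = 0.9): Long =
    math.ceil(offsetSpan / loadFactor).toLong * (hashSize + 8)
}
```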
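GC problems
  • Symptom: the cluster alerted that a broker was down, and the broker's registration node was missing from zk;
  • Log analysis:
    1. The broker log reported zk connection timeouts or failures to read any data from zk, the same symptom as in "Broker's connection to the zk cluster drops and reconnects from time to time" above;
    2. The broker's GC log showed very long GC pauses at the corresponding time. The resulting stop-the-world pause halts every broker thread, so naturally the broker cannot keep its heartbeat with zk;
  • Solution: none for now. GC is a big headache; I searched around online without finding an effective fix. My expertise is limited, so if anyone has a good approach, please leave me a comment. Thanks!
  • Addendum: on GC, I found this talk by Dr. Zhuang that is worth a look: "OS-caused long, atypical JVM GC pauses: in-depth analysis and solutions";
  • Slow GC and the resulting STW pauses cause many problems; we have also seen them lead to OOM and to a full listen (accept) queue.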
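To correlate zk timeouts with pauses, it helps to pull every stop-the-world pause longer than the session timeout out of the GC log. This sketch assumes the broker JVM runs with the JDK 8 flag -XX:+PrintGCApplicationStoppedTime, which prints lines containing "Total time for which application threads were stopped: N seconds"; PauseScanner is a hypothetical name.

```scala
// Hypothetical helper: find long stop-the-world pauses in a JDK 8 style GC log.
object PauseScanner {
  private val Stopped =
    """Total time for which application threads were stopped: ([0-9.]+) seconds""".r.unanchored

  /** Pause durations in milliseconds for every matching line. */
  def pausesMs(lines: Iterator[String]): Iterator[Double] =
    lines.collect { case Stopped(secs) => secs.toDouble * 1000.0 }

  /** Pauses longer than `thresholdMs`, e.g. the zk session timeout. */
  def longPauses(lines: Iterator[String], thresholdMs: Double): List[Double] =
    pausesMs(lines).filter(_ > thresholdMs).toList
}
```

Typical use is PauseScanner.longPauses(scala.io.Source.fromFile(path).getLines(), 6000.0), with the threshold set to whatever zookeeper.session.timeout.ms the broker uses.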
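zk and Kafka deployment
  • If zk and Kafka brokers are deployed on the same machine, put their respective data and log directories on different disks wherever possible to avoid disk IO contention;
  • Kafka is very sensitive to zk instability, so zk is best deployed on its own to keep it running stably;
  • Avoid heavy write load on zk; all zk writes are ultimately forwarded to the zk leader;
  • If zk and brokers are co-located and zk also receives heavy writes (for example from older versions of Storm, which commit offsets to zk), zk IO can get high. In that case you can start zk with zookeeper.forceSync=no to reduce disk write IO. This setting has side effects (transaction-log writes are no longer fsynced to disk), so be careful when using it in production;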
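Monitoring matters
  • Real-time monitoring: create a dedicated topic on the cluster and have a monitoring program write to it continuously; alert when writes fail or when write latency reaches a threshold. This real-time check works really well and has caught nearly every problem first;
  • Basic monitoring: CPU, disk IO, NIC traffic, file descriptors (FD), connection counts, and so on;
  • Topic traffic monitoring: monitor each topic's produce and consume traffic, especially sudden spikes, to quickly find the offender. The data can be obtained from Kafka's JMX metrics and displayed and alerted on with Grafana;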
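The alerting logic of the dedicated-topic check can be sketched independently of the client library. Here `send` stands in for a blocking produce call to the monitoring topic (with the Java client that would be something like producer.send(record).get()), and the clock is injectable so the logic can be exercised without a cluster. CanaryProbe and its result types are hypothetical names.

```scala
// Hypothetical canary probe: classify one write to the monitoring topic.
object CanaryProbe {
  sealed trait Result
  final case class Ok(latencyMs: Long) extends Result
  final case class Slow(latencyMs: Long) extends Result
  final case class Failed(error: String) extends Result

  def probe(send: () => Unit, thresholdMs: Long,
            nowMs: () => Long = () => System.currentTimeMillis()): Result = {
    val start = nowMs()
    try {
      send() // blocking write to the dedicated monitoring topic
      val elapsed = nowMs() - start
      if (elapsed >= thresholdMs) Slow(elapsed) else Ok(elapsed)
    } catch {
      case e: Exception => Failed(e.toString)
    }
  }

  /** Anything other than Ok should page someone. */
  def shouldAlert(r: Result): Boolean = r match {
    case Ok(_) => false
    case _     => true
  }
}
```

Running probe on a timer (every few seconds, say) and alerting on shouldAlert gives the "cannot write, or write too slow" signal described above.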
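Flood of exceptions: Attempted to decrease connection count for address with no connections
  • Symptom: the disk of one broker machine in the cluster raised an alert; server.log turned out to be huge;
  • Log analysis: the log was being flooded with entries like the following: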
[2016-10-13 00:00:00,495] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /xxx.xxx.xxx.xxx
        at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
        at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:59)
        at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
        at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
        at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.run(SocketServer.scala:445)
        at java.lang.Thread.run(Thread.java:745)
Newer SDK talking to an older Kafka sends requests that Kafka does not support
  • Symptom: the log contains many entries like this:
[2017-10-12 16:52:38,141] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
        at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
        at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
        at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
        at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.run(SocketServer.scala:421)
        at java.lang.Thread.run(Thread.java:745)
  • Analysis:
    1. We run Kafka 0.9.0.1, whose largest supported request id is 16; id 18 is the ApiVersions request from newer Kafka versions, hence this exception;
    2. Tracing the code, in SocketServer:
         try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req)
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              isClose = true
              close(selector, receive.source)
          }

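This exception is not handled where the request is built (the catch above only covers InvalidRequestException and SchemaException), so it propagates to the outer try...catch and processing jumps straight to the next round of selector.poll(300). Because selector.poll(300) clears all previously received requests, some requests may never be processed in this situation, which looks like a fairly serious problem.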

  • Solution:
    1. A simple fix:
selector.completedReceives.asScala.foreach { receive =>
          var isClose = false

          try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req)
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              isClose = true
              close(selector, receive.source)
            case e : ArrayIndexOutOfBoundsException =>
              error("NotSupport Request | Closing socket for " + receive.source + " because of error", e)
              isClose = true
              close(selector, receive.source)
          }
          if (!isClose) {
            selector.mute(receive.source)
          }
        }
    2. Kafka also has a related issue, "Broker does not disconnect client on unknown request"; its fix is considerably larger.
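Why id 18 blows up on a 0.9.0.1 broker, and what a defensive lookup could look like. The names below are, as far as I know, the 17 request types (ids 0 to 16) that 0.9 defines; the real ApiKeys.forId indexes an array sized to the largest known id, so any larger id throws ArrayIndexOutOfBoundsException. ApiKeyLookup is a hypothetical name; returning None lets the caller close the connection, which is what both fixes above effectively do.

```scala
// Hypothetical sketch: unchecked vs checked request-id lookup.
object ApiKeyLookup {
  private val known: Array[String] = Array(
    "Produce", "Fetch", "Offsets", "Metadata", "LeaderAndIsr", "StopReplica",
    "UpdateMetadata", "ControlledShutdown", "OffsetCommit", "OffsetFetch",
    "GroupCoordinator", "JoinGroup", "Heartbeat", "LeaveGroup", "SyncGroup",
    "DescribeGroups", "ListGroups")

  /** Direct array indexing: throws ArrayIndexOutOfBoundsException for id 18. */
  def forIdUnchecked(id: Int): String = known(id)

  /** Checked lookup: unknown ids become None so the caller can close the socket. */
  def forId(id: Int): Option[String] =
    if (id >= 0 && id < known.length) Some(known(id)) else None
}
```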
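Frequent Full GC
  • Symptom: the Kafka broker stopped working, printed no more logs, and the whole process hung;
  • Analysis: kafkaServer-gc.log showed repeated Full GCs that could not reclaim memory, which pointed to a memory leak.
    We found the issue "SocketServer inflightResponses collection leaks memory on client disconnect": inflightResponses caches responses that must be sent but have not finished sending, and each response also holds a reference to its request, so under heavy request load it takes up a lot of memory.
    In the 0.9.0.1 code, entries are removed from inflightResponses only in completedSends; nothing is removed in disconnected or close;
  • Fix:
    1. The brute-force option: remove the inflightResponses variable entirely, at the cost of a side effect on the Metrics statistics;
    2. The cleaner option: following the latest Kafka code, also remove entries in disconnected and close;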
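A toy model of the leak and of the second fix. Keys are connection ids and values stand in for responses that still pin their requests; with removeOnDisconnect = false the tracker behaves like 0.9.0.1 (entries removed only on completed send), and with true it behaves like the fix. InflightTracker is a hypothetical class, not Kafka's SocketServer.

```scala
import scala.collection.mutable

// Hypothetical model of the inflightResponses leak.
final class InflightTracker(removeOnDisconnect: Boolean) {
  private val inflight = mutable.Map.empty[String, Array[Byte]]

  def responseQueued(connectionId: String, payload: Array[Byte]): Unit =
    inflight(connectionId) = payload

  /** 0.9.0.1 removes entries only here. */
  def sendCompleted(connectionId: String): Unit =
    inflight.remove(connectionId)

  /** The fix also removes entries when the client disconnects. */
  def disconnected(connectionId: String): Unit =
    if (removeOnDisconnect) inflight.remove(connectionId)

  def size: Int = inflight.size
}
```

If half of 1000 clients disconnect before their response is fully sent, the first variant is left holding 500 entries that are never released, while the second ends empty.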
Swap usage on the machines
  • Use machines with plenty of memory, and disable swap.

Kafka Source Code Analysis: Index

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
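Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.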
