Kafka Source Code Walkthrough: The Broker Request Handling Flow

Kafka's design makes heavy use of the Selector + Channel + Buffer pattern, so before diving in, here is a brief review of these three Java NIO building blocks.

Java NIO: Selector + Channel + Buffer

Buffers
A Buffer in Java NIO is used to exchange data with NIO channels.
A buffer is essentially a block of memory into which data can be written and from which that data can then be read. The memory block is wrapped in an NIO Buffer object, which provides a set of methods for accessing it conveniently.
Standard IO operates on byte streams and character streams, whereas NIO operates on channels and buffers: data is always read from a channel into a buffer, or written from a buffer into a channel.
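As a quick illustration of the buffer's write → flip → read cycle, here is a minimal sketch (not taken from the Kafka code base):

```java
import java.nio.ByteBuffer;

public class BufferDemo {
    // Write bytes into a buffer, flip it, and read them back out.
    static String roundTrip(String s) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(s.getBytes());   // write mode: position advances as data is put in
        buf.flip();              // switch to read mode: limit = position, position = 0
        byte[] out = new byte[buf.remaining()];
        buf.get(out);            // drain exactly the bytes that were written
        return new String(out);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("kafka"));  // prints "kafka"
    }
}
```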

Channels
Java NIO channels are similar to streams, with a few differences:
You can both read from and write to a channel, while streams are typically one-way.
Channels can be read and written asynchronously.

The diagram below shows how Buffers and Channels interact:


(figure: data flowing between a Channel and a Buffer)

Selectors

A selector monitors events on multiple channels (for example: connection opened, data arrived). A Selector is the Java NIO component that can examine one or more NIO channels and determine which of them are ready for events such as reading or writing. This lets a single thread manage multiple channels, and therefore multiple network connections.
Below is a diagram of a single thread using one Selector to handle three Channels:


(figure: one Selector in a single thread handling three Channels)
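A minimal runnable sketch of this pattern, using an in-process Pipe instead of real network sockets (the names and buffer sizes are illustrative):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectorDemo {
    // Register a channel for OP_READ, make it readable, and let one
    // select() call report it; returns the number of bytes read.
    static int selectOnce() {
        try {
            Selector selector = Selector.open();
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);              // only non-blocking channels can be registered
            pipe.source().register(selector, SelectionKey.OP_READ);

            pipe.sink().write(ByteBuffer.wrap("hi".getBytes())); // makes the source channel readable

            selector.select();                                   // blocks until at least one channel is ready
            int bytesRead = 0;
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(16);
                    bytesRead = pipe.source().read(buf);
                }
            }
            selector.close();
            pipe.sink().close();
            pipe.source().close();
            return bytesRead;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes ready: " + selectOnce());  // bytes ready: 2
    }
}
```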

Non-blocking IO
While a thread asks a channel to read data into a buffer, the thread can keep doing other work. Once data has been read into the buffer, the thread can resume processing it. The same applies to writing from a buffer into a channel.

The Broker Request Handling Flow

The following walks through the key parts of the source code (Kafka 2.3) to trace the whole request handling process.

  • KafkaServer: the entry point into Kafka's network layer is SocketServer.
    kafka.Kafka is the entry class of a Kafka broker, and kafka.Kafka.main() is the broker's startup entry point. Following the call stack kafka.Kafka.main() -> KafkaServerStartable.startup() -> KafkaServer.startup(), we can trace from main() all the way to the creation of the SocketServer, the network-layer object. In other words, a Kafka server initializes and starts a SocketServer during startup.
  def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
      // partially omitted ...
      kafkaServerStartable.startup()
      kafkaServerStartable.awaitShutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Exiting Kafka due to fatal exception", e)
        Exit.exit(1)
    }
    Exit.exit(0)
  }

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
...
  def startup() {
    try server.startup()
    catch {
     ...
    }
  }
}
  • SocketServer handles new connections, requests, and responses to and from brokers.
    Kafka supports two planes of requests:

  • Data plane: handles requests from clients and from other brokers in the cluster.
    The threading model is one Acceptor thread per listener, responsible for new connections. Multiple listening ports can be configured by giving "listeners" in KafkaConfig a comma-separated list of endpoints.
    Each Acceptor has N processor threads (each with its own selector, reading requests from sockets) and M handler threads (which process the requests and hand the responses back to the processor threads to be written).

  • Control plane: handles requests from the controller. This plane is optional and is enabled by setting "control.plane.listener.name"; if it is not configured, controller requests are handled by the data plane.
    The threading model is one Acceptor thread for new connections, with one processor thread (with its own selector, reading requests from the socket) and one handler thread (which processes the requests and hands the responses back to the processor thread to be written).
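The queue-based handoff between processor and handler threads described above can be sketched as follows. This is a simplified, single-threaded model; all names are illustrative, and the non-blocking offer/poll calls stand in for the blocking put/take used by the real code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class PlaneSketch {
    // Shared request queue: processors put requests in, handlers take them out
    // (RequestChannel.requestQueue plays this role in Kafka).
    static final BlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(16);
    // Per-processor response queue: handlers put responses in, the owning processor drains it.
    static final BlockingQueue<String> responseQueue = new LinkedBlockingDeque<>();

    // Processor side: a fully read request goes into the shared queue.
    static void processorReceives(String request) {
        requestQueue.offer(request);
    }

    // Handler side: take one request, do the work, route the response
    // back to the processor that owns the connection.
    static void handlerLoopOnce() {
        String request = requestQueue.poll();
        if (request != null) {
            responseQueue.offer("response-for-" + request);
        }
    }

    // Processor side again: drain a response and write it to the socket.
    static String processorSendsNext() {
        return responseQueue.poll();
    }

    public static void main(String[] args) {
        processorReceives("PRODUCE-1");
        handlerLoopOnce();
        System.out.println(processorSendsNext());  // response-for-PRODUCE-1
    }
}
```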

  • SocketServer.startup() creates the Acceptor and Processor threads for both the control plane and the data plane, and starts all the processor threads.

 def startup(startupProcessors: Boolean = true) {
   this.synchronized {
     connectionQuotas = new ConnectionQuotas(config, time)
     // control plane
     createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
     // data plane
     createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
     if (startupProcessors) {
       // start the control-plane processor thread
       startControlPlaneProcessor()
       // start the data-plane processor threads
       startDataPlaneProcessors()
     }
   }
 }

 private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                   endpoints: Seq[EndPoint]): Unit = synchronized {
   endpoints.foreach { endpoint =>
     connectionQuotas.addListener(config, endpoint.listenerName)
     // for each endpoint: create one Acceptor, plus several Processors that go into the processor array
     val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
     addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
   }
 }
  • The Acceptor's constructor first calls openServerSocket() to open the socket of the EndPoint it is responsible for, i.e. it binds the port and starts listening.
    The Acceptor then constructs and manages an ArrayBuffer of Processors; each Processor is an independent thread.
  • The Acceptor thread's run() method continuously listens for connection requests (ACCEPT) on its ServerChannel; when a new connection arrives, the channel is assigned to a Processor in round-robin fashion.
    The actual handoff of a new connection to a Processor happens in the assignNewConnection() method.
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  private val processors = new ArrayBuffer[Processor]()

  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    // register the ServerChannel with the selector and listen for ACCEPT events
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessorIndex = 0
      while (isRunning) {
        try {

          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()

                if (key.isAcceptable) {
                  accept(key).foreach { socketChannel =>

                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      processor = synchronized {
             
                        processors(currentProcessorIndex)
                      }
                      currentProcessorIndex += 1
                    // assignNewConnection is called here
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
        // ...
        }
      }
    } 
  //...
  }

  • assignNewConnection() calls processor.accept(), which puts the SocketChannel into the new-connection queue that each processor maintains; the processor later takes connections off this queue for further handling.
  private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
    // delegate to processor.accept
    if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
     // ...
      true
    } else
      false
  }
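The semantics of processor.accept() — try to enqueue without blocking, and only block for room when this is the last processor the acceptor can try — can be sketched like this (queue capacity and names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AcceptSketch {
    // Mirrors Processor.accept(): offer the new channel to the processor's bounded
    // newConnections queue; if the queue is full, block for room only when this was
    // the last processor the acceptor could try (mayBlock == true).
    static boolean accept(BlockingQueue<String> newConnections, String channel, boolean mayBlock) {
        if (newConnections.offer(channel)) {
            return true;                       // there was room: connection queued
        }
        if (mayBlock) {
            try {
                newConnections.put(channel);   // queue full, no processor left: wait for room
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;                          // queue full: the acceptor retries the next processor
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        System.out.println(accept(queue, "conn-1", false)); // true
        System.out.println(accept(queue, "conn-2", false)); // true
        System.out.println(accept(queue, "conn-3", false)); // false: full, try the next processor
    }
}
```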
  • Every Processor maintains its own Selector, which monitors only the channels belonging to that Processor. This maximizes parallelism and isolation between processor threads. Each processor also maintains a responseQueue, used to exchange responses with the KafkaRequestHandler threads; this comes up again later in the flow.
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               failedAuthenticationDelayMs: Int,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider,
                               memoryPool: MemoryPool,
                               logContext: LogContext,
                               connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  // queue of pending new connections, drained in the run() loop
  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  // each processor maintains its own responseQueue
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

  // each processor also maintains its own Selector
 private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol,
      config,
      credentialProvider.credentialCache,
      credentialProvider.tokenCache,
      time))
  // Visible to override for testing
  protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
    channelBuilder match {
      case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
      case _ =>
    }
    new KSelector(
      maxRequestSize,
      connectionsMaxIdleMs,
      failedAuthenticationDelayMs,
      metrics,
      time,
      "socket-server",
      metricTags,
      false,
      true,
      channelBuilder,
      memoryPool,
      logContext)
  }

  override def run() {
     // startupComplete() counts down a latch to signal that initialization is finished and this Processor is now running normally
    startupComplete() 
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          configureNewConnections()
          // register any new responses for writing
          // drain the response queue: these responses were produced by the handler threads and routed back to this processor via the RequestChannel; sending a response also unmutes the channel so it can accept requests again
          processNewResponses() 
          // KSelector.poll() performs the actual network reads and writes
          poll()
          // processCompletedReceives() also calls Selector.mute: no further reads are accepted from a connection until its response has been sent
          processCompletedReceives()
          processCompletedSends()
          processDisconnected()
          closeExcessConnections()
        } catch {
         // ...
        }
      }
    } finally {
      // ...
    }
  }
  }
  • In run(), configureNewConnections() is where the processor drains its own newConnections queue: for each newly accepted SocketChannel it calls register(), which registers the channel with the processor's Selector and listens for OP_READ events, so that when a read event fires, the corresponding request can be taken off the channel for further processing.
 private def configureNewConnections() {
    var connectionsProcessed = 0
    while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
        // take a pending SocketChannel off the queue
      val channel = newConnections.poll()
      try {
     
        // register the SocketChannel with this processor's selector
        selector.register(connectionId(channel.socket), channel)
        connectionsProcessed += 1
      } catch {
       
        case e: Throwable =>
          // ...
      }
    }
  }

 public void register(String id, SocketChannel socketChannel) throws IOException {
        ensureNotRegistered(id);
        registerChannel(id, socketChannel, SelectionKey.OP_READ);
        this.sensors.connectionCreated.record();
    }
  • RequestChannel moves requests from the network layer to the business layer, and hands the business layer's results back to the network layer to be returned to clients. Each SocketServer has exactly one RequestChannel, constructed in SocketServer. The RequestChannel's constructor initializes requestQueue, which holds requests received by the network layer that are about to be handed to the business layer; it also registers every Processor, so that each response can be routed to the response queue of the Processor that owns the connection and returned to the client from there.
class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
  import RequestChannel._
  val metrics = new RequestChannel.Metrics
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  private val processors = new ConcurrentHashMap[Int, Processor]()

  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

  // ...
}

  • Processor.processCompletedReceives() iterates over completedReceives; each fully received piece of data is parsed, wrapped into a request, and handed to the RequestChannel, which passes it on to the business layer. Concretely, RequestChannel.sendRequest() is called, which puts the request into requestQueue for the request handler threads to pick up.
  private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        openOrClosingChannel(receive.source) match {
          case Some(channel) =>
            // ... header parsing and authentication checks elided ...
            val nowNanos = time.nanoseconds()
            if (channel.serverAuthenticationSessionExpired(nowNanos)) {
              // ...
            } else {
              // hand the request to the handler threads via RequestChannel.requestQueue
              requestChannel.sendRequest(req)
              // no more reads from this connection until its response has been sent
              selector.mute(connectionId)
              handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
            }
          case None =>
            // ...
        }
      } catch {
        // ...
      }
    }
  }
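The mute/unmute dance preserves ordering per connection: while a request is in flight, no further bytes are read from that connection. A toy model of that state machine (all names are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

public class MuteSketch {
    // Connection ids currently muted because a request is being processed.
    static final Set<String> muted = new HashSet<>();

    // A complete request was received: mute the connection so no further
    // requests are read until its response has been written.
    static void onRequestReceived(String connectionId) { muted.add(connectionId); }

    // The response was written back: the connection may be read again.
    static void onResponseSent(String connectionId) { muted.remove(connectionId); }

    static boolean canRead(String connectionId) { return !muted.contains(connectionId); }

    public static void main(String[] args) {
        onRequestReceived("conn-1");
        System.out.println(canRead("conn-1"));  // false: response not sent yet
        onResponseSent("conn-1");
        System.out.println(canRead("conn-1"));  // true
    }
}
```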

  • The KafkaRequestHandler request-processing thread and the KafkaRequestHandlerPool thread pool.
    The interesting part of KafkaRequestHandler is its run() method: it takes the requests previously put on the RequestChannel by the processors and calls KafkaApis to handle each request according to its type.
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: AtomicInteger,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis,
                          time: Time) extends Runnable with Logging {
  def run() {
    while (!stopped) {
    
      // take a request from RequestChannel.requestQueue
      val req = requestChannel.receiveRequest(300)
      req match {
        case RequestChannel.ShutdownRequest =>
          shutdownComplete.countDown()
          return

        case request: RequestChannel.Request =>
          try {
            // hand the request to the business layer via KafkaApis.handle()
            apis.handle(request)
          } catch {
          // exception handling ...
          } finally {
            request.releaseBuffer()
          }

        case null => // continue
      }
    }
    shutdownComplete.countDown()
  }
}

  • The KafkaRequestHandlerPool constructor creates and starts several KafkaRequestHandler threads; the pool size is set by the configuration property num.io.threads.
    All KafkaRequestHandlers in the pool compete for requests from RequestChannel.requestQueue. Since requestQueue is an ArrayBlockingQueue, requests are taken off by calling ArrayBlockingQueue.poll().
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {

  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)

  // initialize the array of KafkaRequestHandler threads
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }
  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }
// ...
}
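The competing-consumer behaviour of the handler pool — N threads all polling the same bounded queue with a timeout — can be sketched as follows (thread count, timeout, and names are illustrative, and unlike the real handler loop, these workers exit on an empty poll so the sketch terminates):

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class HandlerPoolSketch {
    // Drain the queue with `numThreads` competing workers, each polling with a
    // timeout the way KafkaRequestHandler.run() calls receiveRequest(300).
    static Set<String> runPool(BlockingQueue<String> requestQueue, int numThreads) {
        Set<String> handled = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = new Thread(() -> {
                while (true) {
                    String req;
                    try {
                        req = requestQueue.poll(300, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        return;
                    }
                    if (req == null) return;           // idle: stop (the real loop keeps polling)
                    handled.add("handled-" + req);     // stands in for apis.handle(request)
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        for (int i = 0; i < 5; i++) queue.offer("req-" + i);
        System.out.println(runPool(queue, 3).size());  // 5
    }
}
```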
  • KafkaApis is something like a utility class that parses a request and hands it to the business layer; you can think of it as Kafka's API layer. As KafkaRequestHandler.run() above shows, this happens through KafkaApis.handle().
  def handle(request: RequestChannel.Request) {
      request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        // other ApiKeys elided
        // exception handling elided
      }
  
  }
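The dispatch itself is a plain match on the request's API key; in Java terms it amounts to a switch over an enum. A sketch with a reduced set of keys (the enum and return values here are illustrative):

```java
public class ApiDispatchSketch {
    enum ApiKey { PRODUCE, FETCH, METADATA, OTHER }

    // Route a request to its handler by API key, the way KafkaApis.handle()
    // matches on request.header.apiKey.
    static String handle(ApiKey key) {
        switch (key) {
            case PRODUCE:  return "handleProduceRequest";
            case FETCH:    return "handleFetchRequest";
            case METADATA: return "handleTopicMetadataRequest";
            default:       return "unsupported";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(ApiKey.PRODUCE));  // handleProduceRequest
    }
}
```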
  • Let's follow the ApiKeys.PRODUCE path. handleProduceRequest() contains two important pieces: sendResponseCallback() and replicaManager.appendRecords(). The sendResponseCallback callback calls requestChannel.sendResponse() to hand the response back to the RequestChannel.
 def handleProduceRequest(request: RequestChannel.Request) {
    val produceRequest = request.body[ProduceRequest]

    // callback: delivers the business layer's final result to the response queue of the owning processor
    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
    
      // Send the response immediately. In case of throttling, the channel has already been muted.
      if (produceRequest.acks == 0) {
        // sendResponse() hands the response to the RequestChannel, which routes it into the processor's response queue
        sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
      }
    }
      // appendRecords() contains the actual record-append logic
      replicaManager.appendRecords(
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = true,
        entriesPerPartition = authorizedRequestInfo,
        responseCallback = sendResponseCallback,
        recordConversionStatsCallback = processingStatsCallback)
    // ... 
  }
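The callback pattern above — the business layer is handed a responseCallback and invokes it when the work completes — can be sketched as follows (names and payloads are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CallbackSketch {
    // Stands in for replicaManager.appendRecords(entries, responseCallback):
    // the business layer calls the callback once the append has finished.
    static void appendRecords(List<String> records, Consumer<String> responseCallback) {
        // ... appending the records to the log is elided ...
        responseCallback.accept("appended " + records.size() + " records");
    }

    public static void main(String[] args) {
        // Stands in for the owning processor's responseQueue.
        List<String> responseQueue = new ArrayList<>();
        // sendResponseCallback's job is exactly this: push the result onto the queue.
        appendRecords(List.of("r1", "r2"), responseQueue::add);
        System.out.println(responseQueue.get(0));  // appended 2 records
    }
}
```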
  • Finally, as mentioned above when discussing the Processor, Processor.processNewResponses() takes the responses belonging to its own connections off the response queue and prepares to write them back to the clients.

A picture is worth a thousand words, so to wrap up, here is a diagram recapping the entire broker request handling flow:


(figure: the end-to-end broker request handling flow)

References

https://blog.csdn.net/zhanyuanlin/article/details/76906583
http://ifeve.com/channels/
