Kafka Request Handling and RPC (Part 4)

After the Kafka server starts, it listens on a set of ports and begins accepting requests as part of its normal operation.
The components involved in request handling are SocketServer, KafkaApis, and KafkaRequestHandlerPool, all of which are initialized and started when the Kafka server boots. SocketServer is an NIO server built on an N+M thread model: N Acceptor threads plus M Processor threads, similar to Netty's network model. The N Acceptor threads are dedicated to listening for connection events; once a connection is established, it is handed to one of the M Processor threads, which then listens for read events on it. This thread model lets Kafka handle highly concurrent workloads with ease.

Kafka request-processing architecture diagram


  1. When the Kafka server starts, it calls SocketServer#startup(), which initializes N Acceptors that begin listening for OP_ACCEPT events and wait for client connections. The number of Acceptors equals the number of configured listeners. While initializing each Acceptor, it also creates M Processors and assigns them to that Acceptor for handling accepted connections. The number of Processors per Acceptor is controlled by num.network.threads, which defaults to 3.
  2. When an Acceptor receives a new connection, it assigns it to one of the Processors it manages in round-robin fashion.
  3. When a Processor receives a connection, it starts listening for OP_READ events on it.
  4. When the Processor sees an incoming request, it puts the request on the Request queue to await processing. The capacity of this queue is controlled by queued.max.requests, which defaults to 500.
  5. When the Kafka server starts, it also initializes the KafkaRequestHandlerPool, which constructs and starts a number of KafkaRequestHandler threads. The thread count is controlled by num.io.threads, which defaults to 8.
  6. Once started, each KafkaRequestHandler thread loops continuously, taking requests from the request queue and handing them to KafkaApis, which processes each request according to its type.
  7. When KafkaApis finishes processing, it puts the result on the response queue of the corresponding Processor, where it waits for that Processor to handle it.
  8. The Processor is also a continuously looping thread; on each iteration it checks its own response queue for new results, and if one is present it takes it off the queue to send back to the client.
  9. The Processor writes the result back to the client through the NioChannel, completing one round of communication.
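The queue hand-off in steps 4-8 can be sketched as follows. This is a minimal illustration, not Kafka's actual code: the class and queue names are made up, and plain strings stand in for real request and response objects.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the request/response channel: Processors put requests on a shared
// bounded queue (capacity = queued.max.requests); a handler thread consumes
// them and pushes results onto the owning Processor's response queue.
public class RequestChannelSketch {
    static final int QUEUED_MAX_REQUESTS = 500; // queued.max.requests default

    final BlockingQueue<String> requestQueue =
        new ArrayBlockingQueue<>(QUEUED_MAX_REQUESTS);
    // In Kafka each Processor has its own response queue; one suffices here.
    final BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();

    void startHandler() {
        Thread handler = new Thread(() -> {
            try {
                while (true) {
                    String req = requestQueue.take();        // block until a request arrives
                    if (req.equals("SHUTDOWN")) return;      // analogous to RequestChannel.AllDone
                    responseQueue.add("response-to-" + req); // hand the result back
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        handler.start();
    }

    public static void main(String[] args) throws Exception {
        RequestChannelSketch channel = new RequestChannelSketch();
        channel.startHandler();
        channel.requestQueue.put("produce-1");
        System.out.println(channel.responseQueue.take()); // prints response-to-produce-1
        channel.requestQueue.put("SHUTDOWN");
    }
}
```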

Starting the SocketServer

def startup() {
    this.synchronized {
      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      val brokerId = config.brokerId

      var processorBeginIndex = 0
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        val securityProtocol = endpoint.securityProtocol
        val processorEndIndex = processorBeginIndex + numProcessorThreads

        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)

        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }

    info("Started " + acceptors.size + " acceptor threads")
  }

When the SocketServer starts, it initializes N Acceptors, assigns each one its share of Processors, and then starts the Acceptor threads.
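For reference, here is an illustrative server.properties fragment using the defaults discussed above. With these settings and two listeners, SocketServer would start 2 Acceptor threads and 2 × 3 = 6 Processor threads:

```properties
# Illustrative values only; these are the config keys referenced in this article.
listeners=PLAINTEXT://:9092,SSL://:9093
num.network.threads=3      # Processors per Acceptor (default 3)
queued.max.requests=500    # capacity of the request queue (default 500)
num.io.threads=8           # KafkaRequestHandler threads (default 8)
```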

Acceptor listening code

def run() {
  //register interest in OP_ACCEPT events with the selector
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        //poll for ready events
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              //if a connection arrives, hand it to the designated Processor
              if (key.isAcceptable)
                accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}
def accept(key: SelectionKey, processor: Processor) {
  //get the server socket channel for this key
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
    //hand the channel to the Processor
    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
      close(socketChannel)
  }
}

Once the Acceptor thread starts, it listens on its port for new connections, using NIO for non-blocking accepts. Each accepted connection is dispatched to one of the Processor threads it manages.
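The accept loop can be sketched in plain Java NIO (the same API the Scala code above uses). This is a minimal illustration, not Kafka's code: it binds an ephemeral port, simulates a single client connection, and shows the round-robin hand-off.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Sketch of the Acceptor: a selector waits for OP_ACCEPT and accepted
// channels are assigned to processors in round-robin order.
public class AcceptorSketch {
    public static void main(String[] args) throws IOException {
        int numProcessors = 3;      // num.network.threads
        int currentProcessor = 0;

        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("localhost", 0)); // ephemeral port
        server.configureBlocking(false);
        Selector selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);

        // simulate one client connecting
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", port));

        if (selector.select(5000) > 0) {
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel accepted = server.accept();
                    accepted.configureBlocking(false);
                    // in Kafka: processors(currentProcessor).accept(accepted)
                    System.out.println("assigned to processor " + currentProcessor);
                    currentProcessor = (currentProcessor + 1) % numProcessors;
                    accepted.close();
                }
            }
        }
        client.close();
        server.close();
        selector.close();
    }
}
```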

def accept(socketChannel: SocketChannel) {
  //accept a new connection; the newConnections collection holds the connections managed by this Processor
  newConnections.add(socketChannel)
  wakeup()
}
override def run() {
  startupComplete()
  //the Processor thread loops continuously
  while (isRunning) {
    try {
      //take newly accepted connections and register OP_READ on them
      configureNewConnections()
      //take responses from this Processor's response queue and act on them. A response is not
      //necessarily sent to the client: some responses require no reply, and some close the connection.
      //Note that this method does not actually transmit anything either; when a response does need to
      //go out, it only registers OP_WRITE on the connection, and the later poll() call performs the actual send
      processNewResponses()
      //select() blocks for up to 300ms waiting for OP_READ and OP_WRITE events to fire.
      //When OP_READ fires, new requests have arrived; their payloads end up in the
      //selector.completedReceives collection, i.e. a List<NetworkReceive>.
      //When OP_WRITE fires, a response is ready to go out and is written to the client here;
      //the connection is then added to completedSends to mark the send as finished
      poll()
      //process selector.completedReceives: each receive is wrapped into a
      //RequestChannel.Request and put on the request queue
      processCompletedReceives()
      //walk completedSends and remove finished sends from the inflightResponses collection
      processCompletedSends()
      //remove disconnected connections from the inflightResponses collection
      processDisconnected()
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }

  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}

After the Processor takes a connection handed over by the Acceptor, it listens for read events on that connection. It also handles much more along the way: sending responses, reading requests, closing connections, and so on.
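The read side of the Processor loop can be sketched with a selectable Pipe standing in for a client connection. Again a minimal illustration, not Kafka's code: register OP_READ, wait in select(), then drain the bytes that would become a RequestChannel.Request.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Sketch of the Processor's read path: a channel registered for OP_READ is
// picked up by select() once data arrives, and the bytes are read out.
public class ProcessorReadSketch {
    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();                 // stand-in for a client connection
        pipe.source().configureBlocking(false);
        Selector selector = Selector.open();
        pipe.source().register(selector, SelectionKey.OP_READ);

        // the "client" sends a request
        pipe.sink().write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

        if (selector.select(5000) > 0) {         // poll(): wait for OP_READ
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(16);
                    ((Pipe.SourceChannel) key.channel()).read(buf);
                    buf.flip();
                    // in Kafka these bytes would be wrapped into a RequestChannel.Request
                    System.out.println(StandardCharsets.UTF_8.decode(buf).toString());
                }
            }
            selector.selectedKeys().clear();
        }
        selector.close();
        pipe.source().close();
        pipe.sink().close();
    }
}
```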

KafkaRequestHandler thread code

When the Kafka server starts, it initializes the KafkaRequestHandlerPool, which constructs and starts a number of KafkaRequestHandler threads. The thread count is controlled by num.io.threads, which defaults to 8.

Below is the run method of the KafkaRequestHandler thread.

def run() {
  while(true) {
    try {
      var req : RequestChannel.Request = null
      while (req == null) {
        //take a request from the request queue
        val startSelectTime = time.nanoseconds
        req = requestChannel.receiveRequest(300)
        val idleTime = time.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      if(req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(
          id, brokerId))
        return
      }
      req.requestDequeueTimeMs = time.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      //hand the request to KafkaApis for processing
      apis.handle(req)
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}

Each KafkaRequestHandler thread continuously takes requests off the request queue; the actual processing of each request is delegated to KafkaApis.

KafkaApis code

def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    //dispatch the request according to its type
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      if (request.requestObj != null) {
        request.requestObj.handleError(e, requestChannel, request)
        error("Error when handling request %s".format(request.requestObj), e)
      } else {
        val response = request.body.getErrorResponse(e)

        /* If request doesn't have a default error response, we just close the connection.
           For example, when produce request has acks set to 0 */
        if (response == null)
          requestChannel.closeConnection(request.processor, request)
        else
          requestChannel.sendResponse(new Response(request, response))

        error("Error when handling request %s".format(request.body), e)
      }
  } finally
    request.apiLocalCompleteTimeMs = time.milliseconds
}

KafkaApis performs a different operation depending on the request type.
In version 0.10.2, KafkaApis can handle 21 request types.
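The dispatch pattern boils down to a switch over the API key. A minimal sketch with hypothetical handler names and only three of the keys shown:

```java
// Sketch of dispatch-by-ApiKeys: each request id maps to a handler, and an
// unknown id raises an error, mirroring the structure of KafkaApis.handle.
public class ApiDispatchSketch {
    enum ApiKeys { PRODUCE, FETCH, METADATA }

    static String handle(ApiKeys key) {
        switch (key) {
            case PRODUCE:  return "handled produce";
            case FETCH:    return "handled fetch";
            case METADATA: return "handled metadata";
            default: throw new IllegalStateException("Unknown api code " + key);
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(ApiKeys.FETCH)); // prints handled fetch
    }
}
```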
