Learning Kafka with Me: The NIO Communication Mechanism

It has been a while since my last technical post, so today I'm taking some free time to write an article about Kafka's communication layer for us to study together.

1. Overall Structure of Kafka's Communication Mechanism


This diagram follows the SEDA multi-threaded model mentioned previously; see:
http://www.itdecent.cn/p/e184fdc0ade4
1. For the broker, the number of client connections is limited and large numbers of new connections are not created frequently, so a single Acceptor thread is more than enough to handle new connections.
2. Kafka's high throughput requires the broker to receive and send data quickly, so a pool of processor threads handles the network reads and writes, handing data read from clients off to a buffer so that client requests do not pile up.
3. Kafka performs frequent disk operations that block or wait on I/O, so the number of IO threads is typically set to twice the number of processor threads, and can be tuned for the runtime environment.

2. Overall Design Sequence Diagram of SocketServer

[Figure: Kafka communication sequence diagram]

Notes:

Kafka's SocketServer is built on Java NIO and follows the Reactor pattern: one Acceptor thread accepts client connections, N Processor threads perform the network reads and writes, and M Handler threads process the business logic. Queues buffer requests between the Acceptor and the Processors, and between the Processors and the Handlers.

Below we walk through the source code of each of these parts, following the overall design above.

2.1 Startup and Initialization

def startup() {
    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    for(i <- 0 until numProcessorThreads) {
      processors(i) = new Processor(i, 
                                    time, 
                                    maxRequestSize, 
                                    aggregateIdleMeter,
                                    newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
                                    numProcessorThreads, 
                                    requestChannel,
                                    quotas,
                                    connectionsMaxIdleMs)
      Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
    }

    newGauge("ResponsesBeingSent", new Gauge[Int] {
      def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
    })

    // register the processor threads for notification of responses
    requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
   
    // start accepting connections
    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
    Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
    acceptor.awaitStartup
    info("Started")
  }

Notes:

A ConnectionQuotas object enforces the per-IP connection limit. startup() creates one Acceptor listener thread and initializes N Processor threads; processors is an array of threads that effectively serves as a thread pool, with a default size of three. The Acceptor thread and each of the N Processor threads independently creates its own multiplexer via Selector.open(). The relevant configuration code is:

val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue));

val serverChannel = openServerSocket(host, port);

The value may range from 1 to Int.MaxValue.
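Since the article's snippets are Scala but the underlying mechanism is plain Java NIO, here is a minimal Java sketch of this initialization step: a range-checked thread count (intInRange is a hypothetical stand-in for Kafka's getIntInRange), a non-blocking server socket, and one private Selector per thread.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class SelectorPerThread {
    // Stand-in for props.getIntInRange: validate a configured value against a range.
    // (Hypothetical helper; Kafka's real implementation lives in its config utilities.)
    static int intInRange(int value, int min, int max) {
        if (value < min || value > max)
            throw new IllegalArgumentException(value + " not in [" + min + ", " + max + "]");
        return value;
    }

    public static void main(String[] args) throws IOException {
        int numNetworkThreads = intInRange(3, 1, Integer.MAX_VALUE); // "num.network.threads", default 3

        // The acceptor's listening socket: opened once, non-blocking.
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(0)); // ephemeral port, just for the sketch

        // One private Selector per thread: the acceptor plus each processor
        // opens its own multiplexer, so threads never share a selector.
        Selector[] selectors = new Selector[1 + numNetworkThreads];
        for (int i = 0; i < selectors.length; i++)
            selectors[i] = Selector.open();

        System.out.println(selectors.length); // 1 acceptor + 3 processors = 4
        for (Selector s : selectors) s.close();
        serverChannel.close();
    }
}
```

Giving every thread its own selector avoids any cross-thread contention on the multiplexer; coordination happens only through queues, as the following sections show.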

2.2 The Acceptor Thread

def run() {
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    startupComplete()
    var currentProcessor = 0
    while(isRunning) {
      val ready = selector.select(500)
      if(ready > 0) {
        val keys = selector.selectedKeys()
        val iter = keys.iterator()
        while(iter.hasNext && isRunning) {
          var key: SelectionKey = null
          try {
            key = iter.next
            iter.remove()
            if(key.isAcceptable)
               accept(key, processors(currentProcessor))
            else
               throw new IllegalStateException("Unrecognized key state for acceptor thread.")

            // round robin to the next processor thread
            currentProcessor = (currentProcessor + 1) % processors.length
          } catch {
            case e: Throwable => error("Error while accepting connection", e)
          }
        }
      }
    }
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(selector.close())
    shutdownComplete()
  }

2.2.1 Registering the OP_ACCEPT Event

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

2.2.2 Internal Logic

This is synchronous non-blocking I/O: the selector is polled with a 500 ms timeout (see http://www.itdecent.cn/p/e9c6690c0737 for background on synchronous non-blocking I/O).
When a connection request arrives, a Processor thread is chosen round-robin to handle it:

currentProcessor = (currentProcessor + 1) % processors.length

The accepted channel is then added to that processor's newConnections queue, and the method returns:

def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup()
}

// newConnections is a thread-safe queue holding SocketChannel instances
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
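The handoff between the Acceptor and a Processor can be sketched in plain Java (the class names below are illustrative stand-ins, not Kafka's actual classes): one thread-safe ConcurrentLinkedQueue per processor, a wakeup to interrupt the processor's select(), and round-robin dispatch.

```java
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative stand-in for a Processor: owns a lock-free queue of new connections.
class ProcessorSketch {
    final ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();

    void accept(SocketChannel channel) {
        newConnections.add(channel);   // thread-safe enqueue, no locking needed
        wakeup();                      // selector.wakeup() in the real code,
    }                                  // so the processor notices the new channel promptly

    void wakeup() { /* no-op in this sketch */ }
}

// Illustrative stand-in for the Acceptor's dispatch logic.
class AcceptorSketch {
    final ProcessorSketch[] processors;
    int currentProcessor = 0;

    AcceptorSketch(ProcessorSketch[] processors) { this.processors = processors; }

    // Called once per accepted connection: hand off, then advance round-robin.
    void dispatch(SocketChannel channel) {
        processors[currentProcessor].accept(channel);
        currentProcessor = (currentProcessor + 1) % processors.length;
    }
}
```

The acceptor never touches a processor's selector directly; it only enqueues and wakes, which is what keeps the accept path cheap.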

2.3 kafka.network.Processor

override def run() {
    startupComplete()
    while(isRunning) {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing
      processNewResponses()
      val startSelectTime = SystemTime.nanoseconds
      val ready = selector.select(300)
      currentTimeNanos = SystemTime.nanoseconds
      val idleTime = currentTimeNanos - startSelectTime
      idleMeter.mark(idleTime)
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      aggregateIdleMeter.mark(idleTime / totalProcessorThreads)

      trace("Processor id " + id + " selection time = " + idleTime + " ns")
      if(ready > 0) {
        val keys = selector.selectedKeys()
        val iter = keys.iterator()
        while(iter.hasNext && isRunning) {
          var key: SelectionKey = null
          try {
            key = iter.next
            iter.remove()
            if(key.isReadable)
              read(key)
            else if(key.isWritable)
              write(key)
            else if(!key.isValid)
              close(key)
            else
              throw new IllegalStateException("Unrecognized key state for processor thread.")
          } catch {
            case e: EOFException => {
              info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
              close(key)
            } case e: InvalidRequestException => {
              info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
              close(key)
            } case e: Throwable => {
              error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
              close(key)
            }
          }
        }
      }
      maybeCloseOldestConnection
    }
    debug("Closing selector.")
    closeAll()
    swallowError(selector.close())
    shutdownComplete()
  }

Let's first focus on the configureNewConnections method:

private def configureNewConnections() {
    while(newConnections.size() > 0) {
      val channel = newConnections.poll()
      debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
      channel.register(selector, SelectionKey.OP_READ)
    }
  }

This loops while newConnections is non-empty, polling each channel off the queue and registering it with the processor's selector for the OP_READ event.
Back in the main loop, let's look at the read method.

def read(key: SelectionKey) {
    lruConnections.put(key, currentTimeNanos)
    val socketChannel = channelFor(key)
    var receive = key.attachment.asInstanceOf[Receive]
    if(key.attachment == null) {
      receive = new BoundedByteBufferReceive(maxRequestSize)
      key.attach(receive)
    }
    val read = receive.readFrom(socketChannel)
    val address = socketChannel.socket.getRemoteSocketAddress();
    trace(read + " bytes read from " + address)
    if(read < 0) {
      close(key)
    } else if(receive.complete) {
      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
      requestChannel.sendRequest(req)
      key.attach(null)
      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
    } else {
      // more reading to be done
      trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
      wakeup()
    }
  }

Notes

1. The current SelectionKey and the event-loop timestamp are put into an LRU map (lruConnections), so that idle connections can later be detected and their resources reclaimed.
2. A BoundedByteBufferReceive object is created and attached to the key; its readFrom method performs the actual read and returns the number of bytes read.

  • If the read is complete (receive.complete), the data is wrapped into a Request object and handed to the RequestQueue via requestChannel.sendRequest(req), and OP_READ interest is cleared on the key.
  • If the read is not yet complete, the selector keeps listening for OP_READ on this key.
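The bounded, size-prefixed read can be sketched in plain Java. BoundedReceiveSketch below is an illustrative stand-in for BoundedByteBufferReceive, assuming a 4-byte size header in front of the payload (which is how Kafka frames requests on the wire):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Illustrative stand-in for BoundedByteBufferReceive:
// first read a 4-byte size header, then the payload, rejecting oversized requests.
class BoundedReceiveSketch {
    private final int maxSize;
    private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    private ByteBuffer contentBuffer;          // allocated once the size is known

    BoundedReceiveSketch(int maxSize) { this.maxSize = maxSize; }

    boolean complete() {
        return contentBuffer != null && !contentBuffer.hasRemaining();
    }

    // Returns bytes read this call, or -1 on EOF; may need several calls
    // before complete() turns true (the "more reading to be done" branch).
    int readFrom(ReadableByteChannel channel) throws IOException {
        int read = 0;
        if (sizeBuffer.hasRemaining()) {
            int r = channel.read(sizeBuffer);
            if (r < 0) return -1;
            read += r;
            if (!sizeBuffer.hasRemaining()) {
                sizeBuffer.flip();
                int size = sizeBuffer.getInt();
                if (size <= 0 || size > maxSize)   // the "bounded" part: enforce maxRequestSize
                    throw new IOException("Invalid request size " + size);
                contentBuffer = ByteBuffer.allocate(size);
            }
        }
        if (contentBuffer != null && contentBuffer.hasRemaining()) {
            int r = channel.read(contentBuffer);
            if (r < 0) return -1;
            read += r;
        }
        return read;
    }

    // Valid once complete(): the fully-read payload, flipped for consumption.
    ByteBuffer buffer() { contentBuffer.flip(); return contentBuffer; }
}
```

Because the object rides on the key's attachment between select() rounds, a slow client that delivers a request over many packets never blocks the processor thread.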

2.4 kafka.server.KafkaRequestHandler

def run() {
    while(true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = SystemTime.nanoseconds
          req = requestChannel.receiveRequest(300)
          val idleTime = SystemTime.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if(req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          return
        }
        req.requestDequeueTimeMs = SystemTime.milliseconds
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

Notes

KafkaRequestHandler is also an event-processing thread. It loops continuously, reading Request objects off the requestQueue with a 300 ms receive timeout, and dispatches each request to apis.handle for processing; the resulting response is placed on the responseQueue.
The dispatch code is as follows:

try{
      trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
      request.requestId match {
        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {
      case e: Throwable =>
        request.requestObj.handleError(e, requestChannel, request)
        error("error when handling request %s".format(request.requestObj), e)
    } finally
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }

The request keys map as follows:

Key                                 Description                      Request class
RequestKeys.ProduceKey              producer request                 ProducerRequest
RequestKeys.FetchKey                consumer fetch request           FetchRequest
RequestKeys.OffsetsKey              topic offset request             OffsetRequest
RequestKeys.MetadataKey             topic metadata request           TopicMetadataRequest
RequestKeys.LeaderAndIsrKey         leader and ISR update request    LeaderAndIsrRequest
RequestKeys.StopReplicaKey          stop-replica request             StopReplicaRequest
RequestKeys.UpdateMetadataKey       update-metadata request          UpdateMetadataRequest
RequestKeys.ControlledShutdownKey   controlled-shutdown request      ControlledShutdownRequest
RequestKeys.OffsetCommitKey         offset-commit request            OffsetCommitRequest
RequestKeys.OffsetFetchKey          consumer offset fetch request    OffsetFetchRequest
RequestKeys.ConsumerMetadataKey     consumer metadata request        ConsumerMetadataRequest
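The handler loop above reduces to a standard blocking-queue consumer. A minimal Java sketch, with a generic Object sentinel standing in for RequestChannel.AllDone and a counter standing in for apis.handle:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for KafkaRequestHandler: poll the request queue with a
// 300 ms timeout, exit on a shared shutdown sentinel, otherwise dispatch.
class HandlerLoopSketch implements Runnable {
    static final Object ALL_DONE = new Object();   // stand-in for RequestChannel.AllDone
    final BlockingQueue<Object> requestQueue;
    volatile int handled = 0;                      // stand-in for apis.handle(req)

    HandlerLoopSketch(BlockingQueue<Object> requestQueue) {
        this.requestQueue = requestQueue;
    }

    @Override public void run() {
        while (true) {
            try {
                Object req = null;
                while (req == null)                // keep polling until a request arrives;
                    req = requestQueue.poll(300, TimeUnit.MILLISECONDS);
                if (req == ALL_DONE) return;       // shutdown command received
                handled++;                         // real code: apis.handle(req)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

The timeout matters: a handler blocked forever on an empty queue could never be measured for idle time, whereas the 300 ms poll lets the loop record idle intervals on the aggregate meter between requests.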

2.5 Processor Response Handling

private def processNewResponses() {
  var curr = requestChannel.receiveResponse(id)
  while(curr != null) {
    val key = curr.request.requestKey.asInstanceOf[SelectionKey]
    curr.responseAction match {
      case RequestChannel.SendAction => {
        key.interestOps(SelectionKey.OP_WRITE)
        key.attach(curr)
      }
    }
    curr = requestChannel.receiveResponse(id)
  }
}

Back in the Processor thread class: once a Handler has produced a Response for a client, processNewResponses() takes each Response off this processor's responseQueue in requestChannel, registers the OP_WRITE event on the corresponding SelectionKey, and attaches the response to the key so that the data can be written back to the client on the next select() round.
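That response path can be sketched with plain Java NIO, using a Pipe as a stand-in for the client socket (names are illustrative; in Kafka the write itself happens in write(key) on the next loop iteration):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class ResponseFlipSketch {
    // Attach the pending response to the key and switch its interest set to
    // OP_WRITE, then let select() report writability and drain the buffer.
    // Returns true if the whole response was flushed.
    static boolean sendResponse(byte[] data) throws IOException {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();                      // stand-in for a client SocketChannel
        try {
            pipe.sink().configureBlocking(false);
            SelectionKey key = pipe.sink().register(selector, 0); // no interest yet

            ByteBuffer response = ByteBuffer.wrap(data);
            key.attach(response);                     // the response rides on the key
            key.interestOps(SelectionKey.OP_WRITE);   // what processNewResponses does

            if (selector.select(100) > 0 && key.isWritable()) {
                ByteBuffer buf = (ByteBuffer) key.attachment();
                pipe.sink().write(buf);               // what write(key) does
                if (!buf.hasRemaining())
                    key.interestOps(0);               // done: stop watching OP_WRITE
            }
            return !response.hasRemaining();
        } finally {
            selector.close();
            pipe.sink().close();
            pipe.source().close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(sendResponse("ok".getBytes()));
    }
}
```

Registering OP_WRITE only while a response is pending, and clearing it once the buffer drains, is the standard NIO idiom: leaving OP_WRITE set permanently would make select() spin, since sockets are writable almost all the time.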
