消息中間件—簡(jiǎn)談Kafka中的NIO網(wǎng)絡(luò)通信模型

文章摘要:很多人喜歡把RocketMQ與Kafka做對(duì)比,其實(shí)這兩款消息隊(duì)列的網(wǎng)絡(luò)通信層還是比較相似的,本文就為大家簡(jiǎn)要地介紹下Kafka的NIO網(wǎng)絡(luò)通信模型

前面寫的兩篇RocketMQ源碼研究筆記系列:
(1)消息中間件—RocketMQ的RPC通信(一)
(2)消息中間件—RocketMQ的RPC通信(二)
基本上已經(jīng)較為詳細(xì)地將RocketMQ這款分布式消息隊(duì)列的RPC通信部分的協(xié)議格式、消息編解碼、通信方式(同步/異步/單向)、消息收發(fā)流程和Netty的Reactor多線程分離處理架構(gòu)講了一遍。同時(shí),聯(lián)想業(yè)界大名鼎鼎的另一款開源分布式消息隊(duì)列—Kafka,具備高吞吐量和高并發(fā)的特性,其網(wǎng)絡(luò)通信層是如何做到消息的高效傳輸?shù)哪??為了解開自己心中的疑慮,就查閱了Kafka的Network通信模塊的源碼,乘機(jī)會(huì)寫本篇文章。
本文主要通過(guò)對(duì)Kafka源碼的分析來(lái)簡(jiǎn)述其Reactor的多線程網(wǎng)絡(luò)通信模型和總體框架結(jié)構(gòu),同時(shí)簡(jiǎn)要介紹Kafka網(wǎng)絡(luò)通信層的設(shè)計(jì)與具體實(shí)現(xiàn)。

一、Kafka網(wǎng)絡(luò)通信模型的整體框架概述

Kafka的網(wǎng)絡(luò)通信模型是基于NIO的Reactor多線程模型來(lái)設(shè)計(jì)的。這里先引用Kafka源碼中注釋的一段話:

An NIO socket server. The threading model is
1 Acceptor thread that handles new connections.
Acceptor has N Processor threads that each have their own selector and read requests from sockets.
M Handler threads that handle requests and produce responses back to the processor threads for writing.

相信大家看了上面的這段引文注釋后,大致可以了解到Kafka的網(wǎng)絡(luò)通信層模型,主要采用了1(1個(gè)Acceptor線程)+N(N個(gè)Processor線程)+M(M個(gè)業(yè)務(wù)處理線程)。下面的表格簡(jiǎn)要的列舉了下(這里先簡(jiǎn)單的看下后面還會(huì)詳細(xì)說(shuō)明):

線程數(shù) 線程名 線程具體說(shuō)明
1 kafka-socket-acceptor_%x Acceptor線程,負(fù)責(zé)監(jiān)聽Client端發(fā)起的請(qǐng)求
N kafka-network-thread_%d Processor線程,負(fù)責(zé)對(duì)Socket進(jìn)行讀寫
M kafka-request-handler-_%d Worker線程,處理具體的業(yè)務(wù)邏輯并生成Response返回

Kafka網(wǎng)絡(luò)通信層的完整框架圖如下圖所示:


Kafka消息隊(duì)列的通信層模型—1+N+M模型.png

剛開始看到上面的這個(gè)框架圖可能會(huì)有一些不太理解,并不要緊,這里可以先對(duì)Kafka的網(wǎng)絡(luò)通信層框架結(jié)構(gòu)有一個(gè)大致了解。本文后面會(huì)結(jié)合Kafka的部分重要源碼來(lái)詳細(xì)闡述上面的過(guò)程。這里可以簡(jiǎn)單總結(jié)一下其網(wǎng)絡(luò)通信模型中的幾個(gè)重要概念:
(1),Acceptor:1個(gè)接收線程,負(fù)責(zé)監(jiān)聽新的連接請(qǐng)求,同時(shí)注冊(cè)O(shè)P_ACCEPT 事件,將新的連接按照"round robin"方式交給對(duì)應(yīng)的 Processor 線程處理;
(2),Processor:N個(gè)處理器線程,其中每個(gè) Processor 都有自己的 selector,它會(huì)向 Acceptor 分配的 SocketChannel 注冊(cè)相應(yīng)的 OP_READ 事件,N 的大小由“num.networker.threads”決定;
(3),KafkaRequestHandler:M個(gè)請(qǐng)求處理線程,包含在線程池—KafkaRequestHandlerPool內(nèi)部,從RequestChannel的全局請(qǐng)求隊(duì)列—requestQueue中獲取請(qǐng)求數(shù)據(jù)并交給KafkaApis處理,M的大小由“num.io.threads”決定;
(4),RequestChannel:其為Kafka服務(wù)端的請(qǐng)求通道,該數(shù)據(jù)結(jié)構(gòu)中包含了一個(gè)全局的請(qǐng)求隊(duì)列 requestQueue和多個(gè)與Processor處理器相對(duì)應(yīng)的響應(yīng)隊(duì)列responseQueue,提供給Processor與請(qǐng)求處理線程KafkaRequestHandler和KafkaApis交換數(shù)據(jù)的地方。
(5),NetworkClient:其底層是對(duì) Java NIO 進(jìn)行相應(yīng)的封裝,位于Kafka的網(wǎng)絡(luò)接口層。Kafka消息生產(chǎn)者對(duì)象—KafkaProducer的send方法主要調(diào)用NetworkClient完成消息發(fā)送;
(6),SocketServer:其是一個(gè)NIO的服務(wù),它同時(shí)啟動(dòng)一個(gè)Acceptor接收線程和多個(gè)Processor處理器線程。提供了一種典型的Reactor多線程模式,將接收客戶端請(qǐng)求和處理請(qǐng)求相分離;
(7),KafkaServer:代表了一個(gè)Kafka Broker的實(shí)例;其startup方法為實(shí)例啟動(dòng)的入口;
(8),KafkaApis:Kafka的業(yè)務(wù)邏輯處理Api,負(fù)責(zé)處理不同類型的請(qǐng)求;比如“發(fā)送消息”“獲取消息偏移量—offset”“處理心跳請(qǐng)求”等;

二、Kafka網(wǎng)絡(luò)通信層的設(shè)計(jì)與具體實(shí)現(xiàn)

這一節(jié)將結(jié)合Kafka網(wǎng)絡(luò)通信層的源碼來(lái)分析其設(shè)計(jì)與實(shí)現(xiàn),這里主要詳細(xì)介紹網(wǎng)絡(luò)通信層的幾個(gè)重要元素—SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的源碼部分均基于Kafka的0.11.0版本。

1、SocketServer

SocketServer是接收客戶端Socket請(qǐng)求連接、處理請(qǐng)求并返回處理結(jié)果的核心類,Acceptor及Processor的初始化、處理邏輯都是在這里實(shí)現(xiàn)的。在KafkaServer實(shí)例啟動(dòng)時(shí)會(huì)調(diào)用其startup的初始化方法,會(huì)初始化1個(gè) Acceptor和N個(gè)Processor線程(每個(gè)EndPoint都會(huì)初始化,一般來(lái)說(shuō)一個(gè)Server只會(huì)設(shè)置一個(gè)端口),其實(shí)現(xiàn)如下:

def startup() {
    this.synchronized {

      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      val brokerId = config.brokerId

      var processorBeginIndex = 0
      // 一個(gè)broker一般只設(shè)置一個(gè)端口
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        val securityProtocol = endpoint.securityProtocol
        val processorEndIndex = processorBeginIndex + numProcessorThreads
        //N 個(gè) processor
        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
        //1個(gè) Acceptor
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }

2、Acceptor

Acceptor是一個(gè)繼承自抽象類AbstractServerThread的線程類。Acceptor的主要任務(wù)是監(jiān)聽并且接收客戶端的請(qǐng)求,同時(shí)建立數(shù)據(jù)傳輸通道—SocketChannel,然后以輪詢的方式交給一個(gè)后端的Processor線程處理(具體的方式是添加socketChannel至并發(fā)隊(duì)列并喚醒Processor線程處理)。
在該線程類中主要可以關(guān)注以下兩個(gè)重要的變量:
(1),nioSelector:通過(guò)NSelector.open()方法創(chuàng)建的變量,封裝了JAVA NIO Selector的相關(guān)操作;
(2),serverChannel:用于監(jiān)聽端口的服務(wù)端Socket套接字對(duì)象;
下面來(lái)看下Acceptor主要的run方法的源碼:

def run() {
    //首先注冊(cè)O(shè)P_ACCEPT事件
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      //以輪詢方式查詢并等待關(guān)注的事件發(fā)生
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  //如果事件發(fā)生則調(diào)用accept方法對(duì)OP_ACCEPT事件處理
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                //輪詢算法
                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
       //代碼省略
  }

  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)

      processor.accept(socketChannel)
    } catch {
        //省略部分代碼
    }
  }

  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

在上面源碼中可以看到,Acceptor線程啟動(dòng)后,首先會(huì)向用于監(jiān)聽端口的服務(wù)端套接字對(duì)象—ServerSocketChannel上注冊(cè)O(shè)P_ACCEPT 事件。然后以輪詢的方式等待所關(guān)注的事件發(fā)生。如果該事件發(fā)生,則調(diào)用accept()方法對(duì)OP_ACCEPT事件進(jìn)行處理。這里,Processor是通過(guò)round robin方法選擇的,這樣可以保證后面多個(gè)Processor線程的負(fù)載基本均勻。
Acceptor的accept()方法的作用主要如下:
(1)通過(guò)SelectionKey取得與之對(duì)應(yīng)的serverSocketChannel實(shí)例,并調(diào)用它的accept()方法與客戶端建立連接;
(2)調(diào)用connectionQuotas.inc()方法增加連接統(tǒng)計(jì)計(jì)數(shù);并同時(shí)設(shè)置第(1)步中創(chuàng)建返回的socketChannel屬性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)
(3)將socketChannel交給processor.accept()方法進(jìn)行處理。這里主要是將socketChannel加入Processor處理器的并發(fā)隊(duì)列newConnections隊(duì)列中,然后喚醒Processor線程從隊(duì)列中獲取socketChannel并處理。其中,newConnections會(huì)被Acceptor線程和Processor線程并發(fā)訪問(wèn)操作,所以newConnections是ConcurrentLinkedQueue隊(duì)列(一個(gè)基于鏈接節(jié)點(diǎn)的無(wú)界線程安全隊(duì)列)

3、Processor

Processor同Acceptor一樣,也是一個(gè)線程類,繼承了抽象類AbstractServerThread。其主要是從客戶端的請(qǐng)求中讀取數(shù)據(jù)和將KafkaRequestHandler處理完響應(yīng)結(jié)果返回給客戶端。在該線程類中主要關(guān)注以下幾個(gè)重要的變量:
(1),newConnections:在上面的Acceptor一節(jié)中已經(jīng)提到過(guò),它是一種ConcurrentLinkedQueue[SocketChannel]類型的隊(duì)列,用于保存新連接交由Processor處理的socketChannel;
(2),inflightResponses:是一個(gè)Map[String, RequestChannel.Response]類型的集合,用于記錄尚未發(fā)送的響應(yīng);
(3),selector:是一個(gè)類型為KSelector變量,用于管理網(wǎng)絡(luò)連接;
下面先給出Processor處理器線程run方法執(zhí)行的流程圖:

Kafk_Processor線程的處理流程圖.png

從上面的流程圖中能夠可以看出Processor處理器線程在其主流程中主要完成了這樣子幾步操作:
(1),處理newConnections隊(duì)列中的socketChannel。遍歷取出隊(duì)列中的每個(gè)socketChannel并將其在selector上注冊(cè)O(shè)P_READ事件;
(2),處理RequestChannel中與當(dāng)前Processor對(duì)應(yīng)響應(yīng)隊(duì)列中的Response。在這一步中會(huì)根據(jù)responseAction的類型(NoOpAction/SendAction/CloseConnectionAction)進(jìn)行判斷,若為“NoOpAction”,表示該連接對(duì)應(yīng)的請(qǐng)求無(wú)需響應(yīng);若為“SendAction”,表示該Response需要發(fā)送給客戶端,則會(huì)通過(guò)“selector.send”注冊(cè)O(shè)P_WRITE事件,并且將該Response從responseQueue響應(yīng)隊(duì)列中移至inflightResponses集合中;“CloseConnectionAction”,表示該連接是要關(guān)閉的;
(3),調(diào)用selector.poll()方法進(jìn)行處理。該方法底層即為調(diào)用nioSelector.select()方法進(jìn)行處理。
(4),處理已接受完成的數(shù)據(jù)包隊(duì)列—completedReceives。在processCompletedReceives方法中調(diào)用“requestChannel.sendRequest”方法將請(qǐng)求Request添加至requestChannel的全局請(qǐng)求隊(duì)列—requestQueue中,等待KafkaRequestHandler來(lái)處理。同時(shí),調(diào)用“selector.mute”方法取消與該請(qǐng)求對(duì)應(yīng)的連接通道上的OP_READ事件;
(5),處理已發(fā)送完的隊(duì)列—completedSends。當(dāng)已經(jīng)完成將response發(fā)送給客戶端,則將其從inflightResponses移除,同時(shí)通過(guò)調(diào)用“selector.unmute”方法為對(duì)應(yīng)的連接通道重新注冊(cè)O(shè)P_READ事件;
(6),處理斷開連接的隊(duì)列。將該response從inflightResponses集合中移除,同時(shí)將connectionQuotas統(tǒng)計(jì)計(jì)數(shù)減1;

4、RequestChannel

在Kafka的網(wǎng)絡(luò)通信層中,RequestChannel為Processor處理器線程與KafkaRequestHandler線程之間的數(shù)據(jù)交換提供了一個(gè)數(shù)據(jù)緩沖區(qū),是通信過(guò)程中Request和Response緩存的地方。因此,其作用就是在通信中起到了一個(gè)數(shù)據(jù)緩沖隊(duì)列的作用。Processor線程將讀取到的請(qǐng)求添加至RequestChannel的全局請(qǐng)求隊(duì)列—requestQueue中;KafkaRequestHandler線程從請(qǐng)求隊(duì)列中獲取并處理,處理完以后將Response添加至RequestChannel的響應(yīng)隊(duì)列—responseQueue中,并通過(guò)responseListeners喚醒對(duì)應(yīng)的Processor線程,最后Processor線程從響應(yīng)隊(duì)列中取出后發(fā)送至客戶端。

5、KafkaRequestHandler

KafkaRequestHandler也是一種線程類,在KafkaServer實(shí)例啟動(dòng)時(shí)候會(huì)實(shí)例化一個(gè)線程池—KafkaRequestHandlerPool對(duì)象(包含了若干個(gè)KafkaRequestHandler線程),這些線程以守護(hù)線程的方式在后臺(tái)運(yùn)行。在KafkaRequestHandler的run方法中會(huì)循環(huán)地從RequestChannel中阻塞式讀取request,讀取后再交由KafkaApis來(lái)具體處理。

6、KafkaApis

KafkaApis是用于處理對(duì)通信網(wǎng)絡(luò)傳輸過(guò)來(lái)的業(yè)務(wù)消息請(qǐng)求的中心轉(zhuǎn)發(fā)組件。該組件反映出Kafka Broker Server可以提供哪些服務(wù)。

三、總結(jié)

仔細(xì)閱讀Kafka的NIO網(wǎng)絡(luò)通信層的源碼過(guò)程中還是可以收獲不少關(guān)于NIO網(wǎng)絡(luò)通信模塊的關(guān)鍵技術(shù)。Apache的任何一款開源中間件都有其設(shè)計(jì)獨(dú)到之處,值得借鑒和學(xué)習(xí)。對(duì)于任何一位使用Kafka這款分布式消息隊(duì)列的同學(xué)來(lái)說(shuō),如果能夠在一定實(shí)踐的基礎(chǔ)上,再通過(guò)閱讀其源碼能起到更為深入理解的效果,對(duì)于大規(guī)模Kafka集群的性能調(diào)優(yōu)和問(wèn)題定位都大有裨益。
對(duì)于剛接觸Kafka的同學(xué)來(lái)說(shuō),想要自己掌握其NIO網(wǎng)絡(luò)通信層模型的關(guān)鍵設(shè)計(jì),還需要不斷地使用本地環(huán)境進(jìn)行debug調(diào)試和閱讀源碼反復(fù)思考。限于筆者的才疏學(xué)淺,對(duì)本文內(nèi)容可能還有理解不到位的地方,如有闡述不合理之處還望留言一起探討。后續(xù)還會(huì)根據(jù)自己的實(shí)踐和研發(fā),陸續(xù)發(fā)布關(guān)于Kafka分布式消息隊(duì)列的其他相關(guān)技術(shù)文章,敬請(qǐng)關(guān)注。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容