After the Kafka server starts, it listens on its configured ports and begins accepting requests as part of its normal operation.
The components involved in request handling are SocketServer, KafkaApis, and KafkaRequestHandlerPool, all of which are initialized and started when the Kafka server boots. SocketServer is an NIO server built on an N+M threading model: N Acceptor threads plus M Processor threads, similar to Netty's network model. The N Acceptor threads are dedicated to listening for connection events; once a connection is established, it is handed to one of the M Processor threads, which then listens for read events on it. This threading model lets Kafka handle highly concurrent workloads with ease.
Kafka request processing architecture diagram
When the Kafka server starts, it calls SocketServer#startup(). This method initializes N Acceptors, which begin listening for OP_ACCEPT events and wait for client connections. The number of Acceptors depends on how many listeners the user has configured. As each Acceptor is initialized, M Processors are also created and assigned to it for handling connection events. The number of Processors is controlled by num.network.threads, which defaults to 3, meaning each Acceptor gets 3 Processors.
- When an Acceptor receives a new connection, it assigns the connection to one of the Processors it manages in round-robin fashion.
- When a Processor receives a connection, it starts listening for OP_READ events on it.
- When a Processor finds an incoming request, it puts the request into the request queue to await processing. The queue's capacity is controlled by queued.max.requests, which defaults to 500.
- When the Kafka server starts, it also initializes the KafkaRequestHandlerPool, which constructs and starts a number of KafkaRequestHandler threads. Their count is controlled by num.io.threads, which defaults to 8.
- Once started, each KafkaRequestHandler thread spins continuously, pulling requests from the request queue and handing them to KafkaApis, which processes each request according to its type.
- When KafkaApis finishes processing a request, it puts the result into the response queue of the corresponding Processor.
- A Processor is also a continuously spinning thread; on each iteration it checks its own response queue for new results, and if there are any, it takes them off the queue to send back to the client.
- The Processor writes the result back to the client through its NioChannel, which completes one round of communication.
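The queue-based handoff in the steps above can be sketched as follows. This is a hypothetical, much-simplified model for illustration only: Processors enqueue requests into one shared bounded queue (capacity = queued.max.requests), handler threads dequeue them, and each Processor owns its own response queue so results return to the thread that owns the client connection. The names MiniRequestChannel, Request, and the String responses are invented; they are not Kafka's real classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniRequestChannel {
    public static class Request {
        public final int processorId;   // which Processor read this request
        public final String payload;
        public Request(int processorId, String payload) {
            this.processorId = processorId;
            this.payload = payload;
        }
    }

    // shared bounded request queue, like queued.max.requests (default 500)
    public final BlockingQueue<Request> requestQueue;
    // one response queue per Processor, so each result goes back to the
    // Processor that owns the client connection
    public final List<BlockingQueue<String>> responseQueues = new ArrayList<>();

    public MiniRequestChannel(int numProcessors, int queueSize) {
        requestQueue = new ArrayBlockingQueue<>(queueSize);
        for (int i = 0; i < numProcessors; i++)
            responseQueues.add(new LinkedBlockingQueue<>());
    }

    // Processor side: enqueue a decoded request (false when the queue is full)
    public boolean sendRequest(Request req) {
        return requestQueue.offer(req);
    }

    // handler side: dequeue the next request, or null if none is waiting
    public Request receiveRequest() {
        return requestQueue.poll();
    }

    // handler side: route the result back to the owning Processor's queue
    public void sendResponse(int processorId, String response) {
        responseQueues.get(processorId).add(response);
    }
}
```

The request's processorId travels with it through the handler so the response can find its way back to the right Processor, which is why the real broker keeps one response queue per Processor rather than a single shared one.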
Starting the SocketServer
def startup() {
  this.synchronized {
    connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    val sendBufferSize = config.socketSendBufferBytes
    val recvBufferSize = config.socketReceiveBufferBytes
    val brokerId = config.brokerId

    var processorBeginIndex = 0
    config.listeners.foreach { endpoint =>
      val listenerName = endpoint.listenerName
      val securityProtocol = endpoint.securityProtocol
      val processorEndIndex = processorBeginIndex + numProcessorThreads

      // create num.network.threads Processors for this listener
      for (i <- processorBeginIndex until processorEndIndex)
        processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)

      // create an Acceptor for this listener and hand it its slice of Processors
      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
      acceptors.put(endpoint, acceptor)
      Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
      acceptor.awaitStartup()

      processorBeginIndex = processorEndIndex
    }
  }

  info("Started " + acceptors.size + " acceptor threads")
}
When the SocketServer starts, it initializes N Acceptors, assigns each one its share of Processors, and then starts the Acceptor threads.
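The index arithmetic in startup() partitions one global processor array among the listeners: with num.network.threads = M, the i-th listener gets the Processors with indices [i*M, (i+1)*M). A hypothetical sketch of just that slicing logic (assignProcessors and the int[]{begin, end} encoding are invented for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProcessorAssignment {
    // returns, per listener name, the half-open index range [begin, end)
    // of the processors assigned to that listener's Acceptor
    public static Map<String, int[]> assignProcessors(String[] listeners, int numNetworkThreads) {
        Map<String, int[]> assignment = new LinkedHashMap<>();
        int begin = 0;
        for (String listener : listeners) {
            int end = begin + numNetworkThreads;     // this listener's slice
            assignment.put(listener, new int[]{begin, end});
            begin = end;                             // next slice starts here
        }
        return assignment;
    }
}
```

With two listeners and the default num.network.threads = 3, the first listener's Acceptor owns processors 0-2 and the second owns 3-5, which matches the processorBeginIndex/processorEndIndex bookkeeping in the real code.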
Acceptor listening code:
def run() {
  // register for OP_ACCEPT events on the selector
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        // poll, blocking for at most 500ms
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              // hand each incoming connection to the current Processor
              if (key.isAcceptable)
                accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}
def accept(key: SelectionKey, processor: Processor) {
  // get the server channel behind this key and accept the connection
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

    // hand the channel over to the Processor
    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
      close(socketChannel)
  }
}
Once the Acceptor thread starts, it listens on its port for new connections, using NIO to accept them without blocking. When a connection arrives, it dispatches it to one of the Processor threads it manages.
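The Acceptor's dispatch policy is plain round robin: each new connection goes to processors(currentProcessor), and the index then advances modulo the number of Processors. A hypothetical sketch of just that policy (RoundRobinDispatcher is an invented name):

```java
public class RoundRobinDispatcher {
    private final int numProcessors;
    private int currentProcessor = 0;

    public RoundRobinDispatcher(int numProcessors) {
        this.numProcessors = numProcessors;
    }

    // returns the index of the Processor that should take the next connection;
    // only ever called from the single Acceptor thread, so no locking is needed
    public int next() {
        int chosen = currentProcessor;
        currentProcessor = (currentProcessor + 1) % numProcessors;
        return chosen;
    }
}
```

Because each Acceptor has exactly one thread, the unsynchronized counter is safe, and connections spread evenly across the Processors without any coordination.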
def accept(socketChannel: SocketChannel) {
  // enqueue the new connection; newConnections holds the connections
  // waiting to be registered with this Processor
  newConnections.add(socketChannel)
  wakeup()
}
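The handoff above is deliberately minimal: the Acceptor thread only enqueues the channel into newConnections (a thread-safe queue) and wakes the Processor's selector; the Processor itself drains the queue on its next loop iteration and registers OP_READ. A hypothetical sketch of that pattern (MiniProcessor and the String stand-in for SocketChannel are invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MiniProcessor {
    private final Queue<String> newConnections = new ConcurrentLinkedQueue<>();
    public final List<String> registeredForRead = new ArrayList<>();

    // called from the Acceptor thread: hand over a connection without blocking
    public void accept(String channel) {
        newConnections.add(channel);
        // a real Processor would also call selector.wakeup() here so the
        // Processor's select() returns promptly
    }

    // called at the top of the Processor's own run loop
    public void configureNewConnections() {
        String channel;
        while ((channel = newConnections.poll()) != null)
            registeredForRead.add(channel);   // stands in for registering OP_READ
    }
}
```

Keeping all selector registration on the Processor's own thread avoids cross-thread use of the Selector, which is why the Acceptor never touches the Processor's selector directly.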
override def run() {
  startupComplete()
  // the Processor thread spins continuously
  while (isRunning) {
    try {
      // take any newly accepted connections and register OP_READ for them
      configureNewConnections()
      // take responses off this Processor's response queue and act on them. A response
      // is not necessarily sent back to the client: some requests need no reply, and
      // some responses close the connection instead. Also, this method does not
      // actually transmit anything; when a response does need to go out, it only
      // registers OP_WRITE on the connection, and the actual send happens later,
      // when poll() runs
      processNewResponses()
      // block in select() for up to 300ms waiting for OP_READ and OP_WRITE events.
      // When OP_READ fires, new requests have arrived; their contents end up in
      // selector.completedReceives, i.e. a List<NetworkReceive>.
      // When OP_WRITE fires, a response is ready to go out and is sent to the client
      // here; the connection is then added to completedSends to mark it as finished
      poll()
      // process selector.completedReceives: each receive is wrapped in a
      // RequestChannel.Request and put on the request queue
      processCompletedReceives()
      // walk completedSends and remove finished connections from inflightResponses
      processCompletedSends()
      // remove disconnected connections from inflightResponses
      processDisconnected()
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }

  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}
After a Processor thread takes over a connection from the Acceptor, it listens for read events on that connection. It also handles much more in the same loop: sending responses, reading requests, closing connections, and so on.
KafkaRequestHandler thread code
When the Kafka server starts, it initializes the KafkaRequestHandlerPool, which constructs and starts a number of KafkaRequestHandler threads. Their count is controlled by num.io.threads, which defaults to 8.
Below is the KafkaRequestHandler thread's run method:
def run() {
  while(true) {
    try {
      var req : RequestChannel.Request = null
      while (req == null) {
        // poll the request queue, blocking for up to 300ms per attempt
        val startSelectTime = time.nanoseconds
        req = requestChannel.receiveRequest(300)
        val idleTime = time.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      if(req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(
          id, brokerId))
        return
      }
      req.requestDequeueTimeMs = time.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      // hand the request to KafkaApis for processing
      apis.handle(req)
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
The KafkaRequestHandler threads continuously take requests off the request queue; the actual processing of each request is delegated to KafkaApis.
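The loop's structure, stripped of metrics and logging, is: poll the queue with a timeout until something arrives, stop when a shutdown sentinel (RequestChannel.AllDone in the real code) is seen, and otherwise hand the request off. A hypothetical sketch of that skeleton (MiniHandler and ALL_DONE are invented names, with a Consumer standing in for apis.handle):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class MiniHandler {
    public static final String ALL_DONE = "ALL_DONE";   // shutdown sentinel

    private final BlockingQueue<String> requestQueue;
    private final Consumer<String> handler;             // stands in for apis.handle

    public MiniHandler(BlockingQueue<String> requestQueue, Consumer<String> handler) {
        this.requestQueue = requestQueue;
        this.handler = handler;
    }

    // one loop iteration: returns false once the shutdown sentinel is seen
    public boolean runOnce() {
        String req = null;
        while (req == null) {
            try {
                // like requestChannel.receiveRequest(300): wait up to 300ms
                req = requestQueue.poll(300, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        if (ALL_DONE.equals(req))
            return false;
        handler.accept(req);
        return true;
    }
}
```

Polling with a timeout rather than blocking indefinitely is what lets the real handler record idle time between requests and react promptly to the shutdown sentinel.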
KafkaApis code
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    // dispatch on the request type
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      if (request.requestObj != null) {
        request.requestObj.handleError(e, requestChannel, request)
        error("Error when handling request %s".format(request.requestObj), e)
      } else {
        val response = request.body.getErrorResponse(e)
        /* If request doesn't have a default error response, we just close the connection.
           For example, when produce request has acks set to 0 */
        if (response == null)
          requestChannel.closeConnection(request.processor, request)
        else
          requestChannel.sendResponse(new Response(request, response))
        error("Error when handling request %s".format(request.body), e)
      }
  } finally
    request.apiLocalCompleteTimeMs = time.milliseconds
}
KafkaApis executes a different operation for each request type.
In version 0.10.2, KafkaApis can handle 21 types of requests.
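The dispatch pattern in handle() can be reduced to: look up a handler by API key, invoke it, and reject unknown keys. A hypothetical sketch using a map in place of Scala's pattern match (MiniApiDispatcher, the enum, and the returned handler names are a small invented subset, not Kafka's real API):

```java
import java.util.HashMap;
import java.util.Map;

public class MiniApiDispatcher {
    public enum ApiKey { PRODUCE, FETCH, METADATA }

    private final Map<ApiKey, String> handlers = new HashMap<>();

    public MiniApiDispatcher() {
        // one entry per request type, mirroring the match arms in handle()
        handlers.put(ApiKey.PRODUCE, "handleProducerRequest");
        handlers.put(ApiKey.FETCH, "handleFetchRequest");
        handlers.put(ApiKey.METADATA, "handleTopicMetadataRequest");
    }

    // returns the name of the handler that would run for this key;
    // unknown keys fail loudly, like the KafkaException in the real code
    public String dispatch(ApiKey key) {
        String handler = handlers.get(key);
        if (handler == null)
            throw new IllegalArgumentException("Unknown api code " + key);
        return handler;
    }
}
```

Centralizing the key-to-handler mapping in one place is what makes it easy for later Kafka versions to add new request types without touching the networking layer at all.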