Kafka源碼分析(六)消息發(fā)送可靠性——acks

接下去幾篇關(guān)于可靠性的文章全部只討論一個經(jīng)典問題:
Kafka怎么樣才能不丟消息?

怎么樣的情況叫做丟消息?客戶端調(diào)用future = send(msg, callback),但是中途報錯了,這種情況不叫丟消息。真正丟消息的場景是,客戶端調(diào)用了future = send(msg, callback)后,Broker已經(jīng)明確告知客戶端,這條消息已經(jīng)發(fā)送成功了(future.isDone為true,或者callback的onSuccess被調(diào)用),但是消費者缺永久性消費不到這條數(shù)據(jù)。

在生產(chǎn)者上,有一個參數(shù)叫做acks,
如果acks=0,代表消息一旦被發(fā)送到Socket buffer中,就已經(jīng)可以考慮消息發(fā)送成功,這個顯然是不安全的,不做討論;
如果acks=1,代表消息只要在1ISR中被持久化成功后,Broker就可以告訴生產(chǎn)者,消息已經(jīng)發(fā)送成功了。
如果acks=all,代表消息需要在所有ISR都被持久化成功后,Broker才可以告訴生產(chǎn)者,消息已經(jīng)發(fā)送成功了。

假如Broker關(guān)于測試Topic的Replic設(shè)置為3,也就是說正常情況下ISR為3。
首先將生產(chǎn)者的acks配置為1(acks=1)
消息被發(fā)送到Broker后,是由該TopicPartition的Leader處理。Leader會調(diào)用appendToLocalLog將消息持久化在本地。

val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = isFromClient, entriesPerPartition, requiredAcks)

持久化成功后,如果Leader立刻用reponse通知生產(chǎn)者,說,消息已經(jīng)發(fā)送成功了,萬一這時Leader掛了,那么消息就丟失了,消費者將沒有辦法消費到這條數(shù)據(jù)。

將生產(chǎn)者的acks配置為all(acks=-1/all)
Leader調(diào)用appendToLocalLog將消息持久化在本地后,不會立馬給生產(chǎn)者返回,而是啟動一個DelayedProduce(延時發(fā)送任務(wù))。

if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
    // create delayed produce operation
    val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
    val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

    // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
    val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

    // try to complete the request immediately, otherwise put it into the purgatory
    // this is because while the delayed produce operation is being created, new
    // requests may arrive and hence make this operation completable.
    delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else {
    // we can respond immediately
    val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
    responseCallback(produceResponseStatus)
}

// If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
//
// 1. required acks = -1
// 2. there is data to append
// 3. at least one partition append was successful (fewer errors than partitions)
private def delayedProduceRequestRequired(requiredAcks: Short,
                                        entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                        localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
    requiredAcks == -1 &&
    entriesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}

這時對于生產(chǎn)者而言,它還沒有被通知消息已經(jīng)發(fā)送成功了。即使這個時候這個Leader掛了,也不能算是消息丟失,只是生產(chǎn)者需要重新發(fā)送下就好。

問題還沒有結(jié)束,對于那個Leader而言,剛剛說到它只是創(chuàng)建了一個DelayedProduce,它什么時候才會給生產(chǎn)者回復(fù)呢。問題就到了這個DelayedProduce身上,延時是不可能無休止的,查看到DelayedProduce的tryComplete方法,只要滿足了下面的這個條件,DelayedProduce這個延時任務(wù)就需要開始執(zhí)行。

// kafka.server.DelayedProduce#tryComplete
override def tryComplete(): Boolean = {
    // check for each partition if it still has pending acks
    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
        if (status.acksPending) {
        val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
            // code 
            // 返回false的情況很多,但是我們目前只關(guān)注返回true的情況,所以這里需要跟進去
            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
            // code
        }
        // code return false
    }

    // ..
}

def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
    leaderReplicaIfLocal match {
        // code

        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
        // 足夠數(shù)量的ISR同步到了待發(fā)送的這條消息
        if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
            /*
            * The topic may be configured not to accept messages if there are not enough replicas in ISR
            * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
            */
            if (minIsr <= curInSyncReplicas.size)
            (true, Errors.NONE)
            else
            (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
        } else
            (false, Errors.NONE)
        case None =>
        (false, Errors.NOT_LEADER_FOR_PARTITION)
    }
}

簡述這段邏輯就是,當(dāng)足夠數(shù)量的ISR同步到了待發(fā)送的這條消息,DelayedProduce會主動給生產(chǎn)者發(fā)送成功的響應(yīng),也就是下面這段邏輯。

// kafka.server.DelayedProduce#onComplete
override def onComplete() {
    val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
    responseCallback(responseStatus)
}

生產(chǎn)者順利收到Broker的響應(yīng)后,消息就成功發(fā)送。這時,萬一Leader掛了,就不怕了,剩下存活著的ISR中的某一個會被選為新的Leader(這個邏輯之后再聊),消費者照樣還是能消費到這條消息的。

問題解決了么?還沒有,請看下篇分解。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容