紹圣--kafka之生產(chǎn)者(五)

在看很多講kafka的文章里面都會說:kafka只保證單個partition的有序性,那么kafka是怎么保證有序的喃?

使用RecordAccumulator的mutePartition和unmutePartition方法來配合實現(xiàn)有序性

//記錄tp是否還有未完成的RecordBatch,保證一個tp的順序性,當一個tp對應的RecordBatch要開始發(fā)送時,就將此tp加入到muted中,tp對應的RecordBatch發(fā)送完成后,刪除muted中的tp

private final Set muted;

public void mutePartition(TopicPartition tp) { muted.add(tp); }

public void unmutePartition(TopicPartition tp) { muted.remove(tp); }

RecordAccumulator.ready方法中進行判斷(偽代碼)

public ReadyCheckResult ready(Cluster cluster, long nowMs) {

if (!readyNodes.contains(leader) && !muted.contains(part)) {}

}

if (!readyNodes.contains(leader) && !muted.contains(part)),如果muted中包含了這個tp,那么即使這個tp對應的leader存在,RecordBatch可以發(fā)送也不會去發(fā)送它,因為它上一個RecordBatch還沒有處理完成。

RecordAccumulator.drain方法中進行判斷(偽代碼)

public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) {

if (!muted.contains(tp)){}

}

if (!muted.contains(tp))在對RecordAccumulator中的記錄進行重新組裝的時候,依舊會判斷對應的tp是否在muted中。在muted中的依舊不會選擇出來發(fā)送。

在Sender中的變量:guaranteeMessageOrder:是否保持單個partition的有序性

在KafkaProducer的構(gòu)造中

this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);

public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, boolean guaranteeMessageOrder, int maxRequestSize, short acks, int retries, Metrics metrics, Time time, String clientId, int requestTimeout) { this.client = client; this.accumulator = accumulator; this.metadata = metadata; this.guaranteeMessageOrder = guaranteeMessageOrder; this.maxRequestSize = maxRequestSize; this.running = true; this.acks = acks; this.retries = retries; this.time = time; this.clientId = clientId; this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; }

guaranteeMessageOrder=config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1

我們可以在使用的時候設(shè)置max.in.flight.requests.per.connection來設(shè)置guaranteeMessageOrder的值。

mutePartition和unmutePartition方法都是在Sender中進行調(diào)用

mutePartition在Sender.run中調(diào)用

if (guaranteeMessageOrder) {

// 記錄將要發(fā)送的topicPartition到mute中

for (List batchList : batches.values()) {

for (RecordBatch batch : batchList)

this.accumulator.mutePartition(batch.topicPartition);

}

}

發(fā)送的時候,把將要提交的RecordBatch的tp加到muted中。下次再需要發(fā)送tp里的RecordBatch的時候,如果muted里面包含了此tp,就不會選擇出來發(fā)送。

在處理服務端響應的時候,清除muted中的tp

if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition);

總結(jié):要保證單partition的有序性,需要配置max.in.flight.requests.per.connection=1。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評論 19 139
  • 學習kafka有一段時間了。關(guān)于它里面的知識還是需要總結(jié)一下,一來是能讓自己對kafka能有一個比較成型的理解,二...
    紹圣閱讀 1,200評論 0 3
  • kafka版本為0.10.1.0 大體流程 1:初始化,讀取配置,配置metrics等 2:創(chuàng)建 RecordAc...
    xcardata閱讀 697評論 0 0
  • 話說上回中,KafkaProducer已經(jīng)將生產(chǎn)的記錄追加到了RecordAccumulator中。那么接下來的事...
    紹圣閱讀 998評論 2 1
  • 前面三回在分析生產(chǎn)者時,重點在發(fā)送的主流程上:怎么生產(chǎn),怎么發(fā)送,怎么調(diào)度。略過了一個重要的環(huán)節(jié):Metadata...
    紹圣閱讀 807評論 0 0

友情鏈接更多精彩內(nèi)容