Kafka - 偏移量提交

Kafka - 偏移量提交


一、偏移量提交

消費(fèi)者提交偏移量的主要是消費(fèi)者往一個名為_consumer_offset的特殊主題發(fā)送消息,消息中包含每個分區(qū)的偏移量。

如果消費(fèi)者一直運(yùn)行,偏移量的提交并不會產(chǎn)生任何影響。但是如果有消費(fèi)者發(fā)生崩潰,或者有新的消費(fèi)者加入消費(fèi)者群組的時候,會觸發(fā) Kafka 的再均衡。這使得 Kafka 完成再均衡之后,每個消費(fèi)者可能被會分到新分區(qū)中。為了能夠繼續(xù)之前的工作,消費(fèi)者就需要讀取每一個分區(qū)的最后一次提交的偏移量,然后從偏移量指定的地方繼續(xù)處理。

但是這樣可能會出現(xiàn)如下的問題。

1.1 提交偏移量小于客戶端處理的偏移量

<center>
偏移量情況1.png-26.2kB
偏移量情況1.png-26.2kB

</center>

如果提交的偏移量小于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息就會被重復(fù)處理。

1.2 提交偏移量大于客戶端處理的偏移量

<center>
偏移量情況2.png-25.7kB
偏移量情況2.png-25.7kB

</center>

如果提交的偏移量大于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失。

因此,如果處理偏移量,會對客戶端處理數(shù)據(jù)產(chǎn)生影響。KafkaConsumer API 提供了很多種方式來提交偏移量。

二、自動提交

自動提交是 Kafka 處理偏移量最簡單的方式。

當(dāng) enable.auto.commit 屬性被設(shè)為 true,那么每過 5s,消費(fèi)者會自動把從 poll()方法接收到的最大偏移量提交上去。這是因為提交時間間隔由 auto.commit.interval.ms 控制,默認(rèn)值是 5s。與消費(fèi)者里的其他東西一樣,自動提交也是在輪詢里進(jìn)行的。消費(fèi)者每次在進(jìn)行輪詢時會檢查是否該提交偏移量了,如果是,那么就會提交從上一次輪詢返回的偏移量。

但是使用這種方式,容易出現(xiàn)提交的偏移量小于客戶端處理的最后一個消息的偏移量這種情況的問題。假設(shè)我們?nèi)匀皇褂媚J(rèn)的 5s 提交時間間隔,在最近一次提交之后的 3s 發(fā)生了再均衡,再均衡之后,消費(fèi)者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經(jīng)落后了 3s(因為沒有達(dá)到5s的時限,并沒有提交偏移量),所以在這 3s 的數(shù)據(jù)將會被重復(fù)處理。

雖然可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現(xiàn)重復(fù)消息的時間窗的時間跨度,不過這種情況是無法完全避免的。

在使用自動提交時,每次調(diào)用輪詢方法都會把上一次調(diào)用返回的偏移量提交上去,它并不知道具體哪些消息已經(jīng)被處理了,所以在再次調(diào)用之前最好確保所有當(dāng)前調(diào)用返回的消息都已經(jīng)處理完畢(在調(diào)用 close() 方法之前也會進(jìn)行自動提交)。一般情況下不會有什么問題,不過在處理異常或提前退出輪詢時要格外小心。

三、手動提交

大部分開發(fā)者通過控制偏移量提交時間來消除丟失消息的可能性,并在發(fā)生再均衡時減少重復(fù)消息的數(shù)量。消費(fèi)者 API 提供了另一種提交偏移量的方式,開發(fā)者可以在必要的時候提交當(dāng)前偏移量,而不是基于時間間隔。

這是我們需要把把 auto.commit.offset 設(shè)為 false,讓應(yīng)用程序決定何時提交偏移量。

3.1 同步提交

使用 commitSync() 提交偏移量最簡單也最可靠。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。

代碼示例如下:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset =
          %d, customer = %s, country = %s\n",
             record.topic(), record.partition(),
                  record.offset(), record.key(), record.value()); 
        }
        try {
          consumer.commitSync(); 
        } catch (CommitFailedException e) {
            log.error("commit failed", e) 
        }
}

commitSync() 將會提交由 poll() 返回的最新偏移量,所以在處理完所有記錄后要確保調(diào)用了 commitSync(),否則還是會有丟失消息的風(fēng)險。如果發(fā)生了再均衡,從最近一批消息到發(fā)生再均衡之間的所有消息都將被重復(fù)處理。

同時在這個程序中,只要沒有發(fā)生不可恢復(fù)的錯誤,commitSync() 方法會一直嘗試直至提交成功。如果提交失敗,我們也只能把異常記錄到錯誤日志里。

3.2 異步提交

同步提交有一個不足之處,在 broker 對提交請求作出回應(yīng)之前,應(yīng)用程序會一直阻塞,這樣會限制應(yīng)用程序的吞吐量。我們可以通過降低提交頻率來提升吞吐量,但如果發(fā)生了再均衡,會增加重復(fù)消息的數(shù)量。

這個時候可以使用異步提交 API。我們只管發(fā)送提交請求,無需等待 broker 的響應(yīng)。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(); 
}

在成功提交或碰到無法恢復(fù)的錯誤之前,commitSync() 會一直重試,但是 commitAsync() 不會,這也是 commitAsync() 不好的一個地方。

它之所以不進(jìn)行重試,是因為在它收到服務(wù)器響應(yīng)的時候,可能有一個更大的偏移量已經(jīng)提交成功。假設(shè)我們發(fā)出一個請求用于提交偏移量 2000,這個時候發(fā)生了短暫的通信問題,服務(wù)器收不到請求,自然也不會作出任何響應(yīng)。與此同時,我們處理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新嘗試提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。這個時候如果發(fā)生再均衡,就會出現(xiàn)重復(fù)消息。

commitAsync() 也支持回調(diào),在 broker 作出響應(yīng)時會執(zhí)行回調(diào)。回調(diào)經(jīng)常被用于記錄提交錯誤或生成度量指標(biāo)。如果要用它來進(jìn)行重試,則一定要注意提交的順序。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition,
        OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        }
      }); 
}

3.3 同步和異步混合提交

一般情況下,針對偶爾出現(xiàn)的提交失敗,不進(jìn)行重試不會有太大問題,因為如果提交失敗是因為臨時問題導(dǎo)致的,那么后續(xù)的提交總會有成功的。

但如果這是發(fā)生在關(guān)閉消費(fèi)者或再均衡前的最后一次提交,就要確保能夠提交成功。因此在這種情況下,我們應(yīng)該考慮使用混合提交的方法:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("topic = %s, partition = %s, offset = %d,
            customer = %s, country = %s\n",
            record.topic(), record.partition(),
            record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(); 
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(); 
    } finally {
        consumer.close();
    }
}
  1. 在程序正常運(yùn)行過程中,我們使用 commitAsync 方法來進(jìn)行提交,這樣的運(yùn)行速度更快,而且就算當(dāng)前提交失敗,下次提交成功也可以。
  2. 如果直接關(guān)閉消費(fèi)者,就沒有所謂的“下一次提交”了,因為不會再調(diào)用poll()方法。使用 commitSync() 方法會一直重試,直到提交成功或發(fā)生無法恢復(fù)的錯誤。

3.4 提交特定的偏移量

如果 poll() 方法返回一大批數(shù)據(jù),為了避免因再均衡引起的重復(fù)處理整批消息,想要在批次中間提交偏移量該怎么辦?這種情況無法通過調(diào)用 commitSync() 或 commitAsync() 來實(shí)現(xiàn),因為它們只會提交最后一個偏移量,而此時該批次里的消息還沒有處理完。

這時候需要使用一下的兩個方法:

/**
 * Commit the specified offsets for the specified list of topics and partitions.
 */
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)


/**
 * Commit the specified offsets for the specified list of topics and partitions to Kafka.
 */
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

消費(fèi)者 API 允許在調(diào)用 commitSync() 和 commitAsync() 方法時傳進(jìn)去希望提交的分區(qū)和偏移量的 map。

假設(shè)處理了半個批次的消息,最后一個來自主題“customers”分區(qū) 3 的消息的偏移量是 5000,你可以調(diào)用 commitSync() 方法來提交它。不過,因為消費(fèi)者可能不只讀取一個分區(qū),你需要跟蹤所有分區(qū)的偏移量,所以在這個層面上控制偏移量的提交會讓代碼變復(fù)雜。

代碼如下:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>(); 
int count = 0;

...

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d,
        customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value()); 
        currentOffsets.put(new TopicPartition(record.topic(),
        record.partition()), new
        OffsetAndMetadata(record.offset()+1, "no metadata")); 
        if (count % 1000 == 0) 
            consumer.commitAsync(currentOffsets,null); 
        count++;
    }
}

這里調(diào)用的是 commitAsync(),不過調(diào)用commitSync()也是完全可以的。在提交特定偏移量時,仍然要處理可能發(fā)生的錯誤。

四、監(jiān)聽再均衡

如果 Kafka 觸發(fā)了再均衡,我們需要在消費(fèi)者失去對一個分區(qū)的所有權(quán)之前提交最后一個已處理記錄的偏移量。如果消費(fèi)者準(zhǔn)備了一個緩沖區(qū)用于處理偶發(fā)的事件,那么在失去分區(qū)所有權(quán)之前,需要處理在緩沖區(qū)累積下來的記錄??赡苓€需要關(guān)閉文件句柄、數(shù)據(jù)庫連接等。

在為消費(fèi)者分配新分區(qū)或移除舊分區(qū)時,可以通過消費(fèi)者 API 執(zhí)行一些應(yīng)用程序代碼,在調(diào)用 subscribe() 方法時傳進(jìn)去一個 ConsumerRebalanceListener 實(shí)例就可以了。 ConsumerRebalanceListener 有兩個需要實(shí)現(xiàn)的方法。

  1. public void onPartitionsRevoked(Collection<TopicPartition> partitions) 方法會在再均衡開始之前和消費(fèi)者停止讀取消息之后被調(diào)用。如果在這里提交偏移量,下一個接管分區(qū)的消費(fèi)者就知道該從哪里開始讀取了。
  2. public void onPartitionsAssigned(Collection<TopicPartition> partitions) 方法會在重新分配分區(qū)之后和消費(fèi)者開始讀取消息之前被調(diào)用。

下面的例子將演示如何在失去分區(qū)所有權(quán)之前通過 onPartitionsRevoked() 方法來提交偏移量。

private Map<TopicPartition, OffsetAndMetadata> currentOffsets=
  new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener { 
    public void onPartitionsAssigned(Collection<TopicPartition>
      partitions) { 
    }

    public void onPartitionsRevoked(Collection<TopicPartition>
      partitions) {
        System.out.println("Lost partitions in rebalance.
          Committing current
        offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets); 
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance()); 

    while (true) {
        ConsumerRecords<String, String> records =
          consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            System.out.println("topic = %s, partition = %s, offset = %d,
             customer = %s, country = %s\n",
             record.topic(), record.partition(), record.offset(),
             record.key(), record.value());
             currentOffsets.put(new TopicPartition(record.topic(),
             record.partition()), new
             OffsetAndMetadata(record.offset()+1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // 忽略異常,正在關(guān)閉消費(fèi)者
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
}

如果發(fā)生再均衡,我們要在即將失去分區(qū)所有權(quán)時提交偏移量。要注意,提交的是最近處理過的偏移量,而不是批次中還在處理的最后一個偏移量。因為分區(qū)有可能在我們還在處理消息的時候被撤回。我們要提交所有分區(qū)的偏移量,而不只是那些即將失去所有權(quán)的分區(qū)的偏移量——因為提交的偏移量是已經(jīng)處理過的,所以不會有什么問題。調(diào)用 commitSync() 方法,確保在再均衡發(fā)生之前提交偏移量。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,607評論 19 139
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,900評論 13 425
  • kafka的定義:是一個分布式消息系統(tǒng),由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,539評論 1 15
  • #悅讀悅美,書香校園# 世上有兩樣?xùn)|西不可直視,一是太陽,二是人心?!獤|野圭吾《白夜行》 自《嫌疑人X的...
    陽光點(diǎn)Orz閱讀 2,362評論 31 244
  • 本來這樣的文章應(yīng)該出現(xiàn)在某一個外貿(mào)圈里,只是早上剛看了簡書出版的《想想》,就決定想在這兒提筆隨寫。 早上打開郵箱沒...
    7bc2291aac0c閱讀 202評論 0 1

友情鏈接更多精彩內(nèi)容