Kafka生產(chǎn)者:寫(xiě)消息到Kafka

本章我們將會(huì)討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的。Kafka項(xiàng)目有一個(gè)生產(chǎn)者客戶端,我們可以通過(guò)這個(gè)客戶端的API來(lái)發(fā)送消息。

概要

當(dāng)我們發(fā)送消息之前,先問(wèn)幾個(gè)問(wèn)題:每條消息都是很關(guān)鍵且不能容忍丟失么?偶爾重復(fù)消息可以么?我們關(guān)注的是消息延遲還是寫(xiě)入消息的吞吐量?
舉個(gè)例子,有一個(gè)信用卡交易處理系統(tǒng),當(dāng)交易發(fā)生時(shí)會(huì)發(fā)送一條消息到Kafka,另一個(gè)服務(wù)來(lái)讀取消息并根據(jù)規(guī)則引擎來(lái)檢查交易是否通過(guò),將結(jié)果通過(guò)Kafka返回。對(duì)于這樣的業(yè)務(wù),消息既不能丟失也不能重復(fù),由于交易量大因此吞吐量需要盡可能大,延遲可以稍微高一點(diǎn)。
再舉個(gè)例子,假如我們需要收集用戶在網(wǎng)頁(yè)上的點(diǎn)擊數(shù)據(jù),對(duì)于這樣的場(chǎng)景,少量消息丟失或者重復(fù)是可以容忍的,延遲多大都不重要只要不影響用戶體驗(yàn),吞吐則根據(jù)實(shí)時(shí)用戶數(shù)來(lái)決定。
不同的業(yè)務(wù)需要使用不同的寫(xiě)入方式和配置。后面我們將會(huì)討論這些API,現(xiàn)在先看下生產(chǎn)者寫(xiě)消息的基本流程:


image.png

流程如下:

  1. 首先,我們需要?jiǎng)?chuàng)建一個(gè)ProducerRecord,這個(gè)對(duì)象需要包含消息的主題(topic)和值(value),可以選擇性指定一個(gè)鍵值(key)或者分區(qū)(partition)。
  2. 發(fā)送消息時(shí),生產(chǎn)者會(huì)對(duì)鍵值和值序列化成字節(jié)數(shù)組,然后發(fā)送到分配器(partitioner)。
  3. 如果我們指定了分區(qū),那么分配器返回該分區(qū)即可;否則,分配器將會(huì)基于鍵值來(lái)選擇一個(gè)分區(qū)并返回。
  4. 選擇完分區(qū)后,生產(chǎn)者知道了消息所屬的主題和分區(qū),它將這條記錄添加到相同主題和分區(qū)的批量消息中,另一個(gè)線程負(fù)責(zé)發(fā)送這些批量消息到對(duì)應(yīng)的Kafka broker。
  5. 當(dāng)broker接收到消息后,如果成功寫(xiě)入則返回一個(gè)包含消息的主題、分區(qū)及位移的RecordMetadata對(duì)象,否則返回異常。
  6. 生產(chǎn)者接收到結(jié)果后,對(duì)于異常可能會(huì)進(jìn)行重試。

創(chuàng)建Kafka生產(chǎn)者

創(chuàng)建Kafka生產(chǎn)者有三個(gè)基本屬性:

  • bootstrap.servers:屬性值是一個(gè)host:port的broker列表。這個(gè)屬性指定了生產(chǎn)者建立初始連接的broker列表,這個(gè)列表不需要包含所有的broker,因?yàn)樯a(chǎn)者建立初始連接后會(huì)從相應(yīng)的broker獲取到集群信息。但建議指定至少包含兩個(gè)broker,這樣一個(gè)broker宕機(jī)后生產(chǎn)者可以連接到另一個(gè)broker。
  • key.serializer:屬性值是類的名稱。這個(gè)屬性指定了用來(lái)序列化鍵值(key)的類。Kafka broker只接受字節(jié)數(shù)組,但生產(chǎn)者的發(fā)送消息接口允許發(fā)送任何的Java對(duì)象,因此需要將這些對(duì)象序列化成字節(jié)數(shù)組。key.serializer指定的類需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer接口,Kafka客戶端包中包含了幾個(gè)默認(rèn)實(shí)現(xiàn),例如ByteArraySerializer、StringSerializer和IntegerSerializer。
  • value.serializer:屬性值是類的名稱。這個(gè)屬性指定了用來(lái)序列化消息記錄的類,與key.serializer差不多。

Maven依賴

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>

下面是一個(gè)樣例代碼:

private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);

創(chuàng)建完生產(chǎn)者后,我們可以發(fā)送消息。Kafka中有三種發(fā)送消息的方式:

  • 只發(fā)不管結(jié)果(fire-and-forget):只調(diào)用接口發(fā)送消息到Kafka服務(wù)器,但不管成功寫(xiě)入與否。由于Kafka是高可用的,因此大部分情況下消息都會(huì)寫(xiě)入,但在異常情況下會(huì)丟消息。
  • 同步發(fā)送(Synchronous send):調(diào)用send()方法返回一個(gè)Future對(duì)象,我們可以使用它的get()方法來(lái)判斷消息發(fā)送成功與否。
  • 異步發(fā)送(Asynchronous send):調(diào)用send()時(shí)提供一個(gè)回調(diào)方法,當(dāng)接收到broker結(jié)果后回調(diào)此方法。

本章的例子都是單線程發(fā)送的,但生產(chǎn)者對(duì)象是線程安全的,它支持多線程發(fā)送消息來(lái)提高吞吐。需要的話,我們可以使用多個(gè)生產(chǎn)者對(duì)象來(lái)進(jìn)一步提高吞吐。

發(fā)送消息到Kafka

最簡(jiǎn)單的發(fā)送消息方式如下:

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

try {
  producer.send(record);
} catch (Exception e) {
  e.printStackTrace();
}

這里做了如下幾件事:

  1. 我們創(chuàng)建了一個(gè)ProducerRecord,并且指定了主題以及消息的key/value。主題總是字符串類型的,但key/value則可以是任意類型,在本例中也是字符串。需要注意的是,這里的key/value的類型需要與serializer和生產(chǎn)者的類型匹配。
  2. 使用send()方法來(lái)發(fā)送消息,該方法會(huì)返回一個(gè)RecordMetadata的Future對(duì)象,但由于我們沒(méi)有跟蹤Future對(duì)象,因此并不知道發(fā)送結(jié)果。如前所述,這種方式可能會(huì)丟失消息。
  3. 雖然我們忽略了發(fā)送消息到broker的異常,但是我們調(diào)用send()方法時(shí)仍然可能會(huì)遇到一些異常,例如序列化異常、發(fā)送緩沖區(qū)溢出異常等等。

同步發(fā)送消息

同步發(fā)送方式可以簡(jiǎn)單修改如下:

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

try {
  producer.send(record).get();
} catch (Exception e) {
  e.printStackTrace();
}

注意到,這里使用了Future.get()來(lái)獲取發(fā)送結(jié)果,如果發(fā)送消息失敗則會(huì)拋出異常,否則返回一個(gè)RecordMetadata對(duì)象。發(fā)送失敗異常包含:1)broker返回不可恢復(fù)異常,生產(chǎn)者直接拋出該異常;2)對(duì)于broker其他異常,生產(chǎn)者會(huì)進(jìn)行重試,如果重試超過(guò)一定次數(shù)仍不成功則拋出異常。

可恢復(fù)異常指的是,如果生產(chǎn)者進(jìn)行重試可能會(huì)成功,例如連接異常;不可恢復(fù)異常則是進(jìn)行重試也不會(huì)成功的異常,例如消息內(nèi)容過(guò)大。

異步發(fā)送消息

首先了解下什么場(chǎng)景下需要異步發(fā)送消息。假如生產(chǎn)者與broker之間的網(wǎng)絡(luò)延時(shí)為10ms,我們發(fā)送100條消息,發(fā)送每條消息都等待結(jié)果,那么需要1秒的時(shí)間。而如果我們采用異步的方式,幾乎沒(méi)有任何耗時(shí),而且我們還可以通過(guò)回調(diào)知道消息的發(fā)送結(jié)果。

異步發(fā)送消息的樣例如下:

public class DemoProducerCallback implements Callback {
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      e.printStackTrace();
    }
  }
}

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

producer.send(record, new DemoProducerCallback());

異步回調(diào)的類需要實(shí)現(xiàn)org.apache.kafka.clients.producer.Callback接口,這個(gè)接口只有一個(gè)onCompletion方法。當(dāng)Kafka返回異常時(shí),異常值不為null,代碼中只是簡(jiǎn)單的打印,但我們可以采取其他處理方式。

kafka生產(chǎn)者 配置

  1. ackstimeout.ms
  • timeout.ms(0.9.0.0版本中就被棄用)
    指定了 broker 等待同步副本返回消息確認(rèn)的時(shí)間,與 asks 的配置相匹配——如果在指定時(shí)間內(nèi)沒(méi)有收到同步副本的確認(rèn),那么 broker 就會(huì)返回一個(gè)錯(cuò)誤。

  • acks = 1
    指定了必須要有多少個(gè)分區(qū)副本收到消息,生產(chǎn)者才會(huì)認(rèn)為消息寫(xiě)入是成功的。這個(gè)參數(shù)對(duì)消
    息丟失的可能性有重要影響。該參數(shù)有如下選項(xiàng):

    • acks=0,生產(chǎn)者在成功寫(xiě)入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)。也就是說(shuō),如果當(dāng)中
      出現(xiàn)了問(wèn)題,導(dǎo)致服務(wù)器沒(méi)有收到消息,那么生產(chǎn)者就無(wú)從得知,消息也就丟失了。不過(guò),因?yàn)?br> 生產(chǎn)者不需要等待服務(wù)器的響應(yīng),所以它可以以網(wǎng)絡(luò)能夠支持的最大速度發(fā)送消息,從而達(dá)到很
      高的吞吐量。

    • acks=1,只要集群的 Leader 節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)。如果消息無(wú)法到達(dá) Leader 節(jié)點(diǎn)(比如首領(lǐng)節(jié)點(diǎn)崩潰,新的 Leader 還沒(méi)有被選舉出來(lái)),生產(chǎn)者會(huì)收到一個(gè)錯(cuò)誤響應(yīng),為了避免數(shù)據(jù)丟失,生產(chǎn)者會(huì)重發(fā)消息。不過(guò),如果一個(gè)沒(méi)有收到消息的節(jié)點(diǎn)成為新Leader,消息還是會(huì)丟失。這個(gè)時(shí)候的吞吐量取決于使用的是同步發(fā)送還是異步發(fā)送。如果讓發(fā)送客戶端等待服務(wù)器的響應(yīng)(通過(guò)調(diào)用 Future 對(duì)象的 get() 方法),顯然會(huì)增加延遲(在網(wǎng)絡(luò)上傳輸一個(gè)來(lái)回的延遲)。如果客戶端使用回調(diào),延遲問(wèn)題就可以得到緩解,不過(guò)吞吐量還是會(huì)受發(fā)送中消息數(shù)量的限制(比如,生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息)。

    • 如果 acks=all,只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)。這種模式是最安全的,它可以保證不止一個(gè)服務(wù)器收到消息,就算有服務(wù)器發(fā)生崩潰,整個(gè)集群仍然可以運(yùn)行。不過(guò),它的延遲比 acks=1 時(shí)更高,因?yàn)槲覀円却恢灰粋€(gè)服務(wù)器節(jié)點(diǎn)接收消息。

  1. buffer.memory=33554432
    該參數(shù)用來(lái)設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息。如果生產(chǎn)消息的速度超過(guò)發(fā)送的速度,會(huì)導(dǎo)致生產(chǎn)者空間不足。這個(gè)時(shí)候, send()方法調(diào)用要么被阻塞,要么拋出異常,取決于如何設(shè)置 block.on.buffer.full 參數(shù)(在 0.9.0.0 版本里被替換成了max.block.ms,表示在拋出異常之前可以阻塞一段時(shí)間)

  2. compression.type=none
    默認(rèn)情況下,消息發(fā)送時(shí)不會(huì)被壓縮。該參數(shù)可以設(shè)置為 snappy、gziplz4,它指定了消息被發(fā)送給 broker 之前使用哪一種壓縮算法進(jìn)行壓縮。

  • snappy 壓縮算法由 Google 發(fā)明,占用較少的 CPU,卻能提供較好的性能和相當(dāng)可觀的壓縮比,如果比較關(guān)注性能和網(wǎng)絡(luò)帶寬,可以使用這種算法。
  • gzip 壓縮算法一般會(huì)占用較多的 CPU,但會(huì)提供更高的壓縮比,所以如果網(wǎng)絡(luò)帶寬比較有限,可以使用這種算法。

使用壓縮可以降低網(wǎng)絡(luò)傳輸開(kāi)銷和存儲(chǔ)開(kāi)銷,而這往往是向 Kafka 發(fā)送消息的瓶頸所在。

  1. retriesretry.backoff.ms
  • retries=0
    生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到 Leader)。在這種情況下,retries
    參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達(dá)到這個(gè)次數(shù),生產(chǎn)者會(huì)放棄重試并返回錯(cuò)誤。

  • retry.backoff.ms=100
    默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待 100ms,不過(guò)可以通過(guò) retry.backoff.ms 參數(shù)來(lái)改變這個(gè)時(shí)間間隔。建議在設(shè)置重試次數(shù)和重試時(shí)間間隔之前,先測(cè)試一下恢復(fù)一個(gè)崩潰節(jié)點(diǎn)需要多少時(shí)間(比如所有分區(qū)選舉出 Leader 需要多長(zhǎng)時(shí)間),讓總的重試時(shí)間比 Kafka 集群從崩潰中恢復(fù)的時(shí)間長(zhǎng),否則生產(chǎn)者會(huì)過(guò)早地放棄重試。

  • 不過(guò)有些錯(cuò)誤不是臨時(shí)性錯(cuò)誤,沒(méi)辦法通過(guò)重試來(lái)解決(比如“消息太大”錯(cuò)誤)。一般情況下,因?yàn)樯a(chǎn)者會(huì)自動(dòng)進(jìn)行重試,所以就沒(méi)必要在代碼邏輯里處理那些可重試的錯(cuò)誤。你只需要處理那些不可重試的錯(cuò)誤和重試次數(shù)超出上限的情況。

  1. batch.sizelinger.ms
  • batch.size:=16384
    當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算(而不是消息個(gè)數(shù))。
  • linger.ms:=0
    指定了生產(chǎn)者在每次發(fā)送消息的時(shí)間間隔

當(dāng)批次被填滿 或者 等待時(shí)間達(dá)到 linger.ms設(shè)置的間隔時(shí)間,批次里的所有消息會(huì)被發(fā)送出去,哪怕此時(shí)該批次只有一條消息。
所以就算把批次大小設(shè)置得很大,也不會(huì)造成延遲,只是會(huì)占用更多的內(nèi)存而已。但如果設(shè)置得太小,因?yàn)樯a(chǎn)者需要更頻繁地發(fā)送消息,會(huì)增加一些額外的開(kāi)銷。

  1. client.id=''
    該參數(shù)可以是任意的字符串,服務(wù)器會(huì)用它來(lái)識(shí)別消息的來(lái)源

  2. max.in.flight.requests.per.connection=5
    該參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息。它的值越高,就會(huì)占用越多的內(nèi)存,不過(guò)也會(huì)提升吞吐量。把它設(shè)為 1 可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的,即使發(fā)生了重試。

如何保證順序性:如果把 retries 設(shè)為非零整數(shù),同時(shí)把 max.in.flight.requests.per.connection 設(shè)為比 1 大的數(shù),那么,如果第一個(gè)批次消息寫(xiě)入失敗,而第二個(gè)批次寫(xiě)入成功,broker 會(huì)重試寫(xiě)入第一個(gè)批次。如果此時(shí)第一個(gè)批次也寫(xiě)入成功,那么兩個(gè)批次的順序就反過(guò)來(lái)了。

一般來(lái)說(shuō),如果某些場(chǎng)景要求消息是有序的,那么消息是否寫(xiě)入成功也是很關(guān)鍵的,所以不建議把retries設(shè)為 0。可以把 max.in.flight.requests.per.connection 設(shè)為 1,這樣在生產(chǎn)者嘗試發(fā)送第一批消息時(shí),就不會(huì)有其他的消息發(fā)送給broker。不過(guò)這樣會(huì)嚴(yán)重影響生產(chǎn)者的吞吐量,所以只有在對(duì)消息的順序有嚴(yán)格要求的情況下才能這么做。

  1. request.timeout.msmetadata.fetch.timeout.ms
  • request.timeout.ms=305000
    指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間
  • metadata.fetch.timeout.ms (0.9.0.0版本中就被棄用)
    指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標(biāo)分區(qū)的 Leader 是誰(shuí))時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間。如果等待響應(yīng)超時(shí),那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù),要么返回一個(gè)錯(cuò)誤(拋出異?;驁?zhí)行回調(diào))。
  1. max.request.size=1048576
    該參數(shù)用于控制生產(chǎn)者發(fā)送的請(qǐng)求大小。它可以指能發(fā)送的單個(gè)消息的最大值,也可以指單個(gè)請(qǐng)求里所有消息總的大小。例如,假設(shè)這個(gè)值為 1MB,那么可以發(fā)送的單個(gè)最大消息為 1MB,或者生產(chǎn)者可以在單個(gè)請(qǐng)求里發(fā)送一個(gè)批次,該批次包含了 1000 個(gè)消息,每個(gè)消息大小為 1KB。另外,broker 對(duì)可接收的消息最大值也有自己的限制(message.max.bytes),所以兩邊的配置最好可以匹配,避免生產(chǎn)者發(fā)送的消息被 broker 拒絕。

注意區(qū)分 batch.size只是針對(duì)一個(gè) topic 的 partition,而 max.request.size針對(duì)單次請(qǐng)求的。

  1. receive.buffer.bytes=32768 和 send.buffer.bytes=131072
    這兩個(gè)參數(shù)分別指定了 TCP socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小。如果它們被設(shè)為 -1,就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心,那么可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。

關(guān)于更多的配置信息,可以查看:http://kafka.apachecn.org/documentation.html#configuration

完整實(shí)例

package com.neuedu;

import java.util.Properties;
import org.apache.kafka.clients.producer.*;

public class Producer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "hadoop03:9092,hadoop05:9092,hadoop06:9092");//該地址是集群的子集,用來(lái)探測(cè)集群。
        props.put("acks", "all");// 記錄完整提交,最慢的但是最大可能的持久化
        props.put("retries", 3);// 請(qǐng)求失敗重試的次數(shù)
        props.put("batch.size", 16384);// batch的大小
        props.put("linger.ms", 1);// 默認(rèn)情況即使緩沖區(qū)有剩余的空間,也會(huì)立即發(fā)送請(qǐng)求,設(shè)置一段時(shí)間用來(lái)等待從而將緩沖區(qū)填的更多,單位為毫秒,producer發(fā)送數(shù)據(jù)會(huì)延遲1ms,可以減少發(fā)送到kafka服務(wù)器的請(qǐng)求數(shù)據(jù)
        props.put("buffer.memory", 33554432);// 提供給生產(chǎn)者緩沖內(nèi)存總量
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");// 序列化的方式,
        // ByteArraySerializer或者StringSerializer
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++)
        {
            producer.send(new ProducerRecord<String, String>("payment", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();

    }
}

通過(guò)上面的一些講解,應(yīng)該已經(jīng)可以比較友好的使用 kafka生產(chǎn)者了,接下來(lái)我們還剩下最后一個(gè)部分,kafka的分區(qū)

分區(qū)
我們創(chuàng)建消息的時(shí)候,必須要提供主題和消息的內(nèi)容,而消息的key是可選的,當(dāng)不指定key時(shí)默認(rèn)為null。消息的key有兩個(gè)重要的作用:1)提供描述消息的額外信息;2)用來(lái)決定消息寫(xiě)入到哪個(gè)分區(qū),所有具有相同key的消息會(huì)分配到同一個(gè)分區(qū)中。
如果key為null,那么生產(chǎn)者會(huì)使用默認(rèn)的分配器,該分配器使用輪詢(round-robin)算法來(lái)將消息均衡到所有分區(qū)。
如果key不為null而且使用的是默認(rèn)的分配器,那么生產(chǎn)者會(huì)對(duì)key進(jìn)行哈希并根據(jù)結(jié)果將消息分配到特定的分區(qū)。注意的是,在計(jì)算消息與分區(qū)的映射關(guān)系時(shí),使用的是全部的分區(qū)數(shù)而不僅僅是可用的分區(qū)數(shù)。這也意味著,如果某個(gè)分區(qū)不可用(雖然使用復(fù)制方案的話這極少發(fā)生),而消息剛好被分配到該分區(qū),那么將會(huì)寫(xiě)入失敗。另外,如果需要增加額外的分區(qū),那么消息與分區(qū)的映射關(guān)系將會(huì)發(fā)生改變,因此盡量避免這種情況。
自定義分配器
在kafka配置參數(shù)時(shí)設(shè)置分區(qū)器的類
//設(shè)置自定義分區(qū)
kafkaProps.put("partitioner.class", "com.chb.partitioner.MyPartitioner");

現(xiàn)在來(lái)看下如何自定義一個(gè)分配器,下面將key為Banana的消息單獨(dú)放在一個(gè)分區(qū),與其他的消息進(jìn)行分區(qū)隔離:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String)))
        throw new InvalidRecordException("We expect all messages to have customer name as key")
    if (((String) key).equals("Banana"))
        return numPartitions - 1; // Banana will always go to last partition
   
     // Other records will get hashed to the rest of the partitions
    return (Math.abs(Utils.murmur2(keyBytes)) % numPartitions)
    }
    
    public void close() {}
 
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容