不管是把 Kafka 作為消息隊列、消息、總線還是數(shù)據(jù)存儲平臺來使用 ,總是需要有一個可以往 Kafka 寫入數(shù)據(jù)的生產者和一個可以從 Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角 色的應用程序。
例如,在一個信用卡事務處理系統(tǒng)里,有一個客戶端應用程序,它可能是一個在線商店, 每當有支付行為發(fā)生時,它負責把事務發(fā)送到 Kafka上。另一個應用程序根據(jù)規(guī)則引擎檢 查這個事務,決定是批準還是拒絕。 批準或拒絕的響應消息被寫回 Kafka,然后發(fā)送給發(fā)起事務的在線商店。第三個應用程序從 Kafka上讀取事務和審核狀態(tài),把它們保存到數(shù)據(jù) 庫, 隨后分析師可以對這些結果進行分析,或許還能借此改進規(guī)則引擎 。
開發(fā)者們可以使用 Kafka 內置的客戶端 API開發(fā) Kafka應用程序。
在這一章,我們將從 Kafra生產者的設計和組件講起,學習如何使用 Kafka生產者。我們將展示如何創(chuàng)建 KafkaProducer和 ProducerRecords對象、如何將記錄發(fā)送給 Kafka,以及如何處理從 Kafka 返回的錯誤,然后介紹用干控制生產者行為的重要配置選項,最后深入 探討如何使用不同的分區(qū)方法和序列化器,以及如何自定義序列化器和分區(qū)器 。
在下一章,我們將會介紹 Kafra的悄費者客戶端,以及如何從 Kafka讀取消息。
生產者概覽
一個應用程序在很多情況下需要往 Kafka 寫入消息 : 記錄用戶的活動(用于審計和分析 )、 記錄度量指標、保存日志、消息、記錄智能家電的信息、與其他應用程序進行異步通信、 緩沖即將寫入到數(shù)據(jù)庫的數(shù)據(jù),等等。
多樣的使用場景意味著多樣的需求:是否每個消息都很重要?是否允許丟失 一 小部分消息?偶爾出現(xiàn)重復消息是否可以接受?是否有嚴格的延遲和吞吐量要求?
在之前提到的信用卡事務處理系統(tǒng)里,消息丟失或消息重復是不允許的,可以接受的延遲最大為 500ms,對吞吐量要求較高,我們希望每秒鐘可以處理一百萬個消息。
保存網站的點擊信息是另 一種使用場景。在這個場景里,允許丟失少量的消息或出現(xiàn)少量 的消息重復,延遲可以高一些,只要不影響用戶體驗就行。換句話說,只要用戶點擊鏈接 后可以馬上加載頁面,那么我們并不介意消息要在幾秒鐘之后才能到達 Kafka 服務器。 吞 吐量則取決于網站用戶使用網站的頻度。
不同的使用場景對生產者 API 的使用和配置會有直接的影響。
盡管生產者 API 使用起來很簡單 ,但消息的發(fā)送過程還是有點復雜的。下圖展示 了向Kafka 發(fā)送消息的主要步驟。
Kafka 生產者組件圖
我們從創(chuàng)建 一個 ProducerRecord 對象開始, ProducerRecord 對象需要包含目標主題和要發(fā)送的內容。我們還可以指定鍵或分區(qū)。在發(fā)送 ProducerRecord對象時,生產者要先把鍵和 值對象序列化成字節(jié)數(shù)組,這樣它們才能夠在網絡上傳輸 。
接下來,數(shù)據(jù)被傳給分區(qū)器。如果之前在 ProducerRecord對象里指定了分區(qū),那么分區(qū)器就不會再做任何事情,直接把指定的分區(qū)返回。如果沒有指定分區(qū) ,那么分區(qū)器會根據(jù) ProducerRecord對象的鍵來選擇一個分區(qū) 。選好分區(qū)以后 ,生產者就知道該往哪個主題和分區(qū)發(fā)送這條記錄了。緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有消息會被發(fā)送到相同的主題和分區(qū)上。有一個獨立的線程負責把這些記錄批次發(fā)送到相應的 broker 上。
服務器在收到這些消息時會返回一個響應。如果消息成功寫入 Kafka,就返回 一 個 RecordMetaData 對象,它包含了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量。如果寫入 失敗, 就會返回 一個錯誤 。生產者在收到錯誤之后會嘗試重新發(fā)送消息,幾次之后如果還是失敗,就返回錯誤信息。
創(chuàng)建Kafka生產者
要往 Kafka寫入消息,首先要創(chuàng)建一個生產者對象,井設置一些屬性。
下面的代碼片段展示了如何創(chuàng)建一個新的生產者,這里只指定了必要的屬性,其他使用默認設置。
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
Kafka生產者有 3個必選的屬性
bootstrap.servers
該屬性指定 broker 的地址清單,地址的格式為 host:port。清單里不需要包含所有的broker地址,生產者會從給定的 broker里查找到其他 broker的信息。不過建議至少要提供兩個 broker的信息, 一旦其中一個宕機,生產者仍然能夠連接到集群上。
key.serializer
broker希望接收到的消息的鍵和值都是字節(jié)數(shù)組。生產者接口允許使用參數(shù)化類型,因此可以把 Java對象作為鍵和值發(fā)送給 broker。這樣的代碼具有良好的可讀性,不過生產者需要知道如何把這些 Java對象轉換成字節(jié)數(shù)組。 key.serializer必須被設置為一個實現(xiàn)了org.apache.kafka.common.serialization.Serializer接口的類,生產者會使用這個類把鍵對象序列化成字節(jié)數(shù)組。 Kafka 客戶端默認提供了ByteArraySerializer(這個只做很少的事情)、 StringSerializer和 IntegerSerializer,因此,如果你只使用常見的幾種 Java對象類型,那么就沒必要實現(xiàn)自己的序列化器 。要注意, key.serializer是必須設置的,就算你打算只發(fā)送值內容。
value.serializer
與 key.serializer一樣, value.serializer指定的類會將值序列化。如果鍵和值都是字符串,可以使用與 key.serializer 一樣的序列化器。如果鍵是整數(shù)類型而值是字符扇 , 那么需要使用不同的序列化器。
發(fā)送消息主要有3種方式:
1、發(fā)送并忘記( fire-and-forget):我們把消息發(fā)送給服務器,但井不關心它是否正常到達。大多數(shù)情況下,消息會正常到達,因為 Kafka是高可用的,而且生產者會自動嘗試重發(fā)。不過,使用這種方式有時候也會丟失一些消息。
2、同步發(fā)送:我們使用send()方怯發(fā)送消息, 它會返回一個Future對象,調用get()方法進行等待, 就可以知道悄息是否發(fā)送成功。
3、異步發(fā)送:我們調用 send() 方怯,并指定一個回調函數(shù), 服務器在返回響應時調用該函數(shù)。
在下面的幾個例子中 , 我們會介紹如何使用上述幾種方式來發(fā)送消息,以及如何處理可能 發(fā)生的異常情況。
本章的所有例子都使用單線程,但其實生產者是可以使用多線程來發(fā)送消息的。剛開始的 時候可以使用單個消費者和單個線程。如果需要更高的吞吐量,可以在生產者數(shù)量不變的 前提下增加線程數(shù)量。如果這樣做還不夠 , 可以增加生產者數(shù)量。
發(fā)送消息到Kafka
最簡單的同步發(fā)送消息方式如下所示 :
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record);
} catch(Exception e) {
e.printStack();
}
生產者的 send() 方住將 ProducerRecord對象作為參數(shù),它需要目標主題的名字和要發(fā)送的鍵和值對象,它們都是字符串。鍵和值對象的類型必須與序列化器和生產者對象相匹配。
我們使用生產者的 send() 方越發(fā)送 ProducerRecord對象。從生產者的架構圖里可以看到,消息先是被放進緩沖區(qū),然后使用單獨的線程發(fā)送到服務器端。 send() 方法會返回一個包含 RecordMetadata 的 Future對象,不過因為我們會忽略返回值,所以無法知道消息是否發(fā)送成功。如果不關心發(fā)送結果,那么可以使用這種發(fā)送方式。比如,記錄 Twitter 消息日志,或記錄不太重要的應用程序日志。
我們可以忽略發(fā)送消息時可能發(fā)生的錯誤或在服務器端可能發(fā)生的錯誤,但在發(fā)送消息之前,生產者還是有可能發(fā)生其他的異常。這些異常有可能是 SerializationException (說明序列化消息失敗)、 BufferExhaustedException 或 TimeoutException (說明緩沖區(qū)已滿),又或者是 InterruptException (說明發(fā)送線程被中斷)。
同步發(fā)送消息
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record).get();
} catch(Exception e) {
e.printStack();
}
在這里, producer.send() 方住先返回一個 Future對象,然后調用 Future對象的 get() 方法等待 Kafka 響應。如果服務器返回錯誤, get()方怯會拋出異常。如果沒有發(fā)生錯誤,我們會得到一個 RecordMetadata對象,可以用它獲取消息的偏移量。如果在發(fā)送數(shù)據(jù)之前或者在發(fā)送過程中發(fā)生了任何錯誤 ,比如 broker返回 了一個不允許重發(fā)消息的異?;蛘咭呀洺^了重發(fā)的次數(shù) ,那么就會拋出異常。我們只是簡單地把異常信息打印出來。
如何處理從Kafka生產者返回的錯誤
KafkaProducer一般會發(fā)生兩類錯誤。其中一類是可重試錯誤 ,這類錯誤可以通過重發(fā)消息來解決。比如對于連接錯誤,可以通過再次建立連接來解決,“無主(noleader)” 錯誤則可 以通過重新為分區(qū)選舉首領來解決。 KafkaProducer可以被配置成自動重試,如果在多次重試后仍無能解決問題,應用程序會收到一個重試異常。另一類錯誤無出通過重試解決 ,比如“消息太大”異常。對于這類錯誤, KafkaProducer不會進行任何重試,直接拋出異常。
異步發(fā)送消息
假設消息在應用程序和 Kafka集群之間一個來回需要 10ms。如果在發(fā)送完每個消息后都等待回應,那么發(fā)送 100個消息需要 1秒。但如果只發(fā)送消息而不等待響應,那么發(fā)送100個消息所需要的時間會少很多。大多數(shù)時候,我們并不需要等待響應——盡管 Kafka 會把目標主題、分區(qū)信息和消息的偏移量發(fā)送回來,但對于發(fā)送端的應用程序來說不是必需的。不過在遇到消息發(fā)送失敗時,我們需要拋出異常、記錄錯誤日志,或者把消息寫入 “錯誤消息”文件以便日后分析。
為了在異步發(fā)送消息的同時能夠對異常情況進行處理,生產者提供了回調支持 。下面是使用異步發(fā)送消息、回調的一個例子。
生產者的配置
到目前為止 , 我們只介紹了生產者的幾個必要配置參數(shù)——bootstrap.servers API 以及序列化器。
生產者還有很多可配置的參數(shù),在 Kafka文檔里都有說明,它們大部分都有合理的默認值 , 所以沒有必要去修改它們 。不過有幾個參數(shù)在內存使用、性能和可靠性方面對生產者影響比較大,接下來我們會一一說明。
1. acks
acks 參數(shù)指定了必須要有多少個分區(qū)副本收到消息,生產者才會認為消息寫入是成功的。
這個參數(shù)對消息丟失的可能性有重要影響。 該參數(shù)有如下選項。
? 如果 acks=0, 生產者在成功寫入悄息之前不會等待任何來自服務器的響應。也就是說, 如果當中出現(xiàn)了問題 , 導致服務器沒有收到消息,那么生產者就無從得知,消息也就丟 失了。不過,因為生產者不需要等待服務器的響應,所以它可以以網絡能夠支持的最大 速度發(fā)送消息,從而達到很高的吞吐量。
? 如果 acks=1,只要集群的首領節(jié)點收到消息,生產者就會收到 一個來自服務器的成功 響應。如果消息無撞到達首領節(jié)點(比如首領節(jié)點崩憤,新的首領還沒有被選舉出來), 生產者會收到一個錯誤響應,為了避免數(shù)據(jù)丟失,生產者會重發(fā)消息。不過,如果一個 沒有收到消息的節(jié)點成為新首領,消息還是會丟失。這個時候的吞吐量取決于使用的是 同步發(fā)送還是異步發(fā)送。如果讓發(fā)送客戶端等待服務器的響應(通過調用 Future對象 的 get()方法),顯然會增加延遲(在網絡上傳輸一個來回的延遲)。如果客戶端使用異步回調,延遲問題就可以得到緩解,不過吞吐量還是會受發(fā)送中消息數(shù)量的限制(比如,生 產者在收到服務器響應之前可以發(fā)送多少個消息)。
? 如果 acks=all,只有當所有參與復制的節(jié)點全部收到消息時,生產者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發(fā)生崩潰,整個集群仍然可以運行(第 5 章將討論更多的細節(jié))。不過,它的延遲比 acks=1時更高,因為我們要等待不只一個服務器節(jié)點接收消息。
2. buffer.memory
該參數(shù)用來設置生產者內存緩沖區(qū)的大小,生產者用它緩沖要發(fā)送到服務器的消息。如果 應用程序發(fā)送消息的速度超過發(fā)送到服務器的速度,會導致生產者空間不足。這個時候, send()方法調用要么被阻塞,要么拋出異常,取決于如何設置 block.on.buffe.full 參數(shù) (在0.9.0.0版本里被替換成了max.block.ms,表示在拋出異常之前可以阻塞一段時間)。
3. compression.type
默認情況下,消息發(fā)送時不會被壓縮。該參數(shù)可以設置為 snappy、 gzip 或 lz4,它指定了消息被發(fā)送給 broker之前使用哪一種壓縮算法進行壓縮。 snappy 壓縮算怯由 Google巳發(fā)明, 它占用較少 的 CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網絡帶寬,可以使用這種算法。 gzip壓縮算法一般會占用較多的 CPU,但會提供更高的壓縮比,所以如果網絡帶寬比較有限,可以使用這種算法。使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往是向 Kafka發(fā)送消息的瓶頸所在。
4. retries
生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區(qū)找不到首領)。在這種情況下, retries參數(shù)的值決定了生產者可以重發(fā)消息的次數(shù),如果達到這個次數(shù),生產者會放棄重試并返回錯誤。默認情況下,生產者會在每次重試之間等待 1OOms,不過可以通過 retries.backoff.ms 參數(shù)來改變這個時間間隔。建議在設置重試次數(shù)和重試時間間隔之前, 先測試一下恢復一個崩潰節(jié)點需要多少時間(比如所有分區(qū)選舉出首領需要多長時間), 讓總的重試時間比 Kafka集群從崩潰中恢復的時間長,否則生產者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦怯通過重試來解決(比如“悄息太大”錯誤)。一般情 況下,因為生產者會自動進行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯誤。 你只需要處理那些不可重試的錯誤或重試次數(shù)超出上限的情況。
5. batch.size
當有多個消息需要被發(fā)送到同一個分區(qū)時,生產者會把它們放在放一個批次里。該參數(shù)指定了一個批次可以使用的內存大小,按照字節(jié)數(shù)計算(而不是消息個數(shù))。當批次被填滿,批次里的所有消息會被發(fā)送出去。不過生產者井不一定都會等到批次被填滿才發(fā)送,半捕 的批次,甚至只包含一個消息的批次也有可能被發(fā)送。所以就算把批次大小設置得很大, 也不會造成延遲,只是會占用更多的內存而已。但如果設置得太小,因為生產者需要更頻繁地發(fā)送消息,會增加一些額外的開銷。
6. linger.ms
該參數(shù)指定了生產者在發(fā)送批次之前等待更多消息加入批次的時間。 KafkaProducer 會在批次填滿或 linger.ms達到上限時把批次發(fā)送出去。默認情況下,只要有可用的線程, 生產者就會把消息發(fā)送出去,就算批次里只有一個消息。把 linger.ms設置成比0大的數(shù), 讓生產者在發(fā)送批次之前等待一會兒,使更多的消息加入到這個批次 。雖然這樣會增加延遲,但也會提升吞吐量(因為一次性發(fā)送更多的消息,每個消息的開銷就變小了)。
7. client.id
該參數(shù)可以是任意的字符串,服務器會用它來識別消息的來源,還可以用在日志和配額指標里。
8. max.in.flight.requests.per.connection
該參數(shù)指定了生產者在收到服務器晌應之前可以發(fā)送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。 把它設為 1 可以保證消息是按照發(fā)送的順序寫入服務器的,即使發(fā)生了重試。
9. timeout.ms、 request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms指定了生產者在發(fā)送數(shù)據(jù)時等待服務器返回響應的時間,metadata.fetch.timeout.ms指定了生產者在獲取元數(shù)據(jù)(比如目標分區(qū)的首領是誰)時等待服務器返回響應的時間。如果等待響應超時,那么生產者要么重試發(fā)送數(shù)據(jù),要么返回 一個錯誤 (拋出異常或執(zhí)行回調)。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配一一如果在指定時間內沒有收到同步副本的確認,那么 broker就會返回 一個錯誤 。
10. max.block.ms
該參數(shù)指定了在調用 send() 方法或使用 parttitionFor() 方能獲取元數(shù)據(jù)時生產者的阻塞 時間。當生產者的發(fā)送緩沖區(qū)已捕,或者沒有可用的元數(shù)據(jù)時,這些方屈就會阻塞。在阻塞時間達到 max.block.ms時,生產者會拋出超時異常。
11 . max.request.size
該參數(shù)用于控制生產者發(fā)送的請求大小。它可以指能發(fā)送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1MB,那么可以發(fā)送的單個最大消息為 1MB,或者生產者可以在單個請求里發(fā)送一個批次,該批次包含了 1000個消息,每個消息大小為 1KB 。另外, broker對可接收的消息最大值也有自己的限制( message.max.bytes),所以兩邊的配置最好可以匹配,避免生產者發(fā)送的消息被 broker拒絕 。
12. receive.buffer.bytes 和 send.buffer.bytes
這兩個參數(shù)分別指定了 TCP socket接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小 。 如果它們被設為 -1 , 就使用操作系統(tǒng)的默認值。如果生產者或消費者與 broker處于不同的數(shù)據(jù)中心,那么可以適當增大這些值,因為跨數(shù)據(jù)中心的網絡一般都有比較高的延遲和比較低的帶寬。
順序保證
Kafka可以保證同一個分區(qū)里的消息是有序的。也就是說,如果生產者按照一定的順序發(fā)送消息, broker就會按照這個順序把它們寫入分區(qū),消費者也會按照同樣的順序讀取它們。在某些情況下 , 順序是非常重要的。如果把retries 設為非零整數(shù),同時把 max.in.flight.requests.per.connection 設為比 1大的數(shù),那么,如果第一個批次消息寫入失敗,而第二個批次寫入成功, broker會重試寫入第一個批次。如果此時第一個批次也寫入成功,那 么兩個批次的順序就反過來了。
一般來說,如果某些場景要求消息是有序的,那么消息是否寫入成功也是 很關鍵的,所以不建議把順序是非常重要的。如果把retries 設為 0。可以把 max.in.flight.requests.per.connection設為 1,這樣在生產者嘗試發(fā)送第一批消息時,就不會有其他的消息發(fā)送給 broker。不過這樣會嚴重影響生產者的吞吐量 ,所以只有在 對消息的順序有嚴格要求的情況下才能這么做。