攔截器
- 客戶端kafka攔截器 攔截器類似于spring等切面,可以對生產者和消費者收發(fā)消息的過程進行自動增強;分為生產者攔截器和消費者攔截器
- 生產者攔截器允許你在發(fā)送消息前以及消息提交成功后植入你的攔截器邏輯;而消費者攔截器支持在消費消息前以及提交位移后編寫特定邏輯。這兩種攔截器都支持鏈的方式,即你可以將一組攔截器串連成一個大的攔截器,Kafka 會按照添加順序依次執(zhí)行攔截器邏輯;
- 具體的代碼配置
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
// 攔截器 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor");
// 攔截器 2
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
- 生產者攔截器類需要繼承 org.apache.kafka.clients.producer.ProducerInterceptor 類并實現(xiàn)其中的方法,其中OnSend是在消息發(fā)送之前執(zhí)行的; onAcknowledgement是在消息提交成功或者消息提交失敗的時候處理的,onAcknowledgement 的調用要早于 callback 的調用,處理的線程和callback不是一個線程;
- 消費者攔截器類需要繼承的 org.apache.kafka.clients.consumer.ConsumerInterceptor ,并實現(xiàn)其中的OnConsumer 和 onCommit分別代表消息在消費之間進行了一側過濾,onConsumer是是在提交位移之后處理的。
- kafka攔截器是一個低頻使用的功能,但是可以作為一種監(jiān)控等使用;
Kafka TCP連接的那些事
- kafka生產者創(chuàng)建tcp連接
- kafka在消息傳輸協(xié)議上使用的是TCP連接,關于生產者在什么時候創(chuàng)建tcp連接以及什么時候關閉tcp連接的
- 生產者在Producer被初始化的時候,創(chuàng)建tcp連接,這是最早進行的tcp連接;在使用的過程中如果嘗試給一個不存在(當前存在的tcp連接中)topic發(fā)送消息的時候,或者如果設置定期更新tcp連接集群信息的時候,回你創(chuàng)建tcp連接(此時創(chuàng)建多少個?)
- 生產者什么時候關閉tcp連接:一種是用戶主動關閉,一般是應用程序的關閉或者是調用producer.close進行關閉;一種是 Kafka 自動關閉,Producer 端參數(shù) connections.max.idle.ms 的值有關。默認情況下該參數(shù)值是 9 分鐘,即如果在 9 分鐘內沒有任何請求“流過”某個 TCP 連接,那么 Kafka 會主動幫你把該 TCP 連接關閉。如果設置成-1的話,會產生很多僵尸連接;
- 因此在創(chuàng)建生產者的時候,不需要指定整個集群的所有地址,如果指定的話,在初始化的時候就會創(chuàng)建很多tcp連接,造成資源的不必要浪費
- kafka消費者創(chuàng)建tcp連接與關閉連接
- 消費者也是使用tcp協(xié)議進行的,但是消費端不是在初始化的時候就創(chuàng)建tcp連接了,而是在poll的時候才會創(chuàng)建tcp連接,具體的過程分為以下三個步驟
- 創(chuàng)建連接 來 find coordinator
- 根據(jù)find coordinator獲取到的真正broker的信息,創(chuàng)建與 coordinator 的tcp連接,coordinator負責對consumer的注冊,管理,位移等操作
- 在消費不同分區(qū)的消息的時候,創(chuàng)建與該分區(qū)副本leader的broker建立連接,比如說現(xiàn)在有五個分區(qū)消息需要消費,但是只有兩個broker的話,最多會創(chuàng)建 2 個tcp連接;這個連接創(chuàng)建之后,第一個連接會被后臺在某個時間段進行關閉
- 什么時候關閉連接
- 主動關閉,直接kill或者調用close()方法進行關閉
- 被動關閉,消費者端參數(shù) connection.max.idle.ms控制的,該參數(shù)現(xiàn)在的默認值是 9 分鐘,即如果某個 Socket 連接上連續(xù) 9 分鐘都沒有任何請求“過境”的話,那么消費者會強行“殺掉”這個 Socket 連接,如果寫了個程序,用循環(huán)poll的話,建立起一個偽長連接的話,socket一直不會斷開
- 消費者也是使用tcp協(xié)議進行的,但是消費端不是在初始化的時候就創(chuàng)建tcp連接了,而是在poll的時候才會創(chuàng)建tcp連接,具體的過程分為以下三個步驟
- 關于消費者連接的問題,在第一個連接創(chuàng)建之后,到第三種連接創(chuàng)建的時候,需要把第一個連接關閉,因為第一個連接不知道具體的節(jié)點信息,沒法復用,導致資源的浪費,需要重新創(chuàng)建新的連接,這個問題是否能夠進行改善?
kafka的冪等生產者和事務生產者
- kafka消息交付的可靠性保障目前來說主要支持至少一次的發(fā)送,也就是如果生產者網(wǎng)絡波動的話,允許多次發(fā)送,這樣的話,可能會造成消息的重復;kafka因此還支持精確一次的發(fā)送,即消息既不會重復發(fā)送也不會丟失;
- 精確一次發(fā)送的主要支持是 冪等和事務型發(fā)送,冪等的話,直接在配置中設置enable.idempotence = true;如果是事務型的話,除了設置enable.idempotence = true之外還需要設置 transctional. id;冪等消息的話,只能夠保障某個主題的分區(qū)一致性,不能夠跨區(qū),如果producer重新啟動的話,喪失對上一次的冪等約束,這也就要求所有需要保障冪等消息的數(shù)據(jù)必須落在一個分區(qū)上;如果想在所有分區(qū)上都保持冪等,且不受重新啟動的影響的話,就需要使用事務性生產者,能夠保障一批消息同時成功和同時失敗,且在消費端可以控制對整個事務性消息的隔離級別isolation.level = read_uncommitted/read_committed。具體的生產端代碼
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
- 不管是冪等還是事務性發(fā)送者,對整個kafka的性能是影響的,而且也不能夠保證消費端一定只送達一次,在異常情況下消費端可能會重復消費某些消息,因此在使用的過程中還是傾向于消費端進行冪等處理,這樣既不會降低性能,也不會影響整個業(yè)務邏輯的處理;
kafka的消費組概念
- 消費組作為一個所有消費者的代理,用于管理訂閱主題的所有分區(qū),然后來根據(jù)consumer的數(shù)量來進行負載均衡,不同的消費組之間沒有關系,每個分區(qū)只能由同一個消費者組內的一個 Consumer 實例來消費;一個group中只能有一個consumer去消費某條消息,不能同時有多個進行消費,這個類似于傳統(tǒng)的點對點發(fā)送的模型,只不過將粒度縮小,減小所有consumer的搶占;
- 同一個group下的consumer如何進行分配消費消息?
- kafka的group如何與topic進行關聯(lián),是在消費端監(jiān)聽的時候進行設置?如果在消費端進行設置的話,如何能夠控制消費實例與分區(qū)數(shù)量的關系?
- consumer實例和分區(qū)數(shù)量:消費組訂閱主題的分區(qū)數(shù)大于等于消費實例的數(shù)量,不會造成資源的浪費;比如如果有十個consumer實例,只有3個分區(qū),那么有7個consumer是空閑狀態(tài)
- 空閑狀態(tài)的consumer不會進行消費,也就是說所有消息不會進入到這個consumer中?
- 理想情況下,Consumer 實例的數(shù)量應該等于該 Group 訂閱主題的分區(qū)總數(shù)。但是在微服務盛行的階段,服務擴容很正常,會造成consumer實例的不定期變更,會導致重新的負載均衡,kafka在重新給consumer重新分配分區(qū)的時候,會進行類似于垃圾回收的Stop the world操作,可能有停頓,如果設置分區(qū)數(shù)量過大的話,kafka有一個對分區(qū)備份的功能,也就是如果有十個分區(qū)的話,消息會備份十份的大小,最后進行統(tǒng)一的發(fā)送,分區(qū)數(shù)量過大的話會導致內存占用過大,同時如果想提高吞吐量的話,分區(qū)數(shù)量和consumer client數(shù)量保持一致,這樣的話又會增加實例的連接數(shù)量?這個問題怎么處理
- Rebalance: 本質上是一種協(xié)議,規(guī)定了一個 Consumer Group 下的所有 Consumer 如何達成一致,來分配訂閱 Topic 的每個分區(qū);既然是平衡consumer和topic分區(qū)之間負載均衡的話,那么在以下三種情況下會發(fā)生變化
- consumer實例的數(shù)量發(fā)生了變化,重新分配; 那么consumer的集群重啟過程中豈不是要不斷的進行rebalance?
- group訂閱的topic數(shù)量發(fā)生了變化,歸根結底是自己管理的topic分區(qū)數(shù)量發(fā)生了變化,以及要剔除某些不適合的consumer而導致負載的變動,這個是運維操作,一般的話,可以進行避免
- 主題的分區(qū)數(shù)量發(fā)生了變動,導致consumer重新分配,這個是運維操作,一般的話,可以進行避免
- 那么如何避免reblance呢,大部分情況下是盡量的避免第一種情況,也就是實例數(shù)量的變更,實例數(shù)量的變更主要分以下幾種情況
- (真的需要變化)實例真的需要增加或者減少,這種情況下無法避免
- (kafka認為你變化了,但是你沒有變化,只不過是有些異常導致kafka的監(jiān)控認為你掛了)心跳監(jiān)控,通過所有consumer實例通過向Coordinator 發(fā)送心跳來實時的報告自己的存活狀態(tài),主要的參數(shù)
- session.timeout.ms 來控制,超過這個時間沒有發(fā)送心跳,認為死亡) ;還需要設置
- heartbeat.interval.ms用來控制心跳上報的頻率,也就是上個參數(shù)內,自己需要上報多少次心跳,這個需要根據(jù)自己的網(wǎng)絡帶寬來考慮,
- max.poll.interval.ms 來控制,Consumer 端應用程序兩次調用 poll 方法的最大時間間隔。它的默認值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內無法消費完 poll 方法返回的消息,那么 Consumer 會主動發(fā)起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance
- 推薦的參數(shù)
設置 session.timeout.ms = 6s。
設置 heartbeat.interval.ms = 2s。
- 推薦的參數(shù)
- 關于kafka如何管理consumer的
- coordinator 作為一個對consumer的注冊,管理中心,是在broker創(chuàng)建的時候創(chuàng)建,作為broker的組件存在的,其主要作用是協(xié)助consumer group來管理負載均衡,位移,以及成員情況的管理,每個consumer啟動的時候,都會向coordinator報備,然后被其注冊,管理;consumer group是如何和coordinator進行關聯(lián)的呢?一般是通過位移主題來確定的 gruopId的hashCode / 分區(qū)總數(shù) 獲取到位移主題的分區(qū)id,然后根據(jù)這個id獲取到當前分區(qū)的leader的副本所在的broker的coordiantor
Kafka 會計算該 Group 的 group.id 參數(shù)的哈希值。比如你有個 Group 的 group.id 設置成了“test-group”,那么它的 hashCode 值就應該是 627841412。其次,Kafka 會計算 __consumer_offsets 的分區(qū)數(shù),通常是 50 個分區(qū),之后將剛才那個哈希值對分區(qū)數(shù)進行取模加求絕對值計算,即 abs(627841412 % 50) = 12。此時,我們就知道了位移主題的分區(qū) 12 負責保存這個 Group 的數(shù)據(jù)。有了分區(qū)號,算法的第 2 步就變得很簡單了,我們只需要找出位移主題分區(qū) 12 的 Leader 副本在哪個 Broker 上就可以了。這個 Broker,就是我們要找的 Coordinator
kafka位移主題
- Consumer 的位移數(shù)據(jù)作為一條條普通的 Kafka 消息,提交到 __consumer_offsets 中??梢赃@么說,__consumer_offsets 的主要作用是保存 Kafka 消費者的位移信息
- kafka如何記錄某個consumer group中consumer消費那些分區(qū)的消息的位移或者說作為標示,用于consumer之后消費的起始點呢,在老版本的kafka中是將這些數(shù)據(jù)保存到zk中的,但是zk不適合高頻的讀寫等操作,因此在0.8.x版本之后,kafka自行進行管理這個記錄,在內部自己創(chuàng)建一個消息主題,來進行維護;
- 消息主題的具體格式:消息主題的主要作用是記錄定位consumer的消費記錄,由于consumer group下的consumer可能會進行負載均衡,最好的辦法就是一group為維度,來進行記錄,那么首先記錄 gourp-id, 然后是 topic 名稱 ,最后是 分區(qū)號,這樣的話,在使用的時候,不管group下的哪個consumer來消費的話,都可以直接獲取現(xiàn)在應該消費哪個區(qū)域
- kafka在默認情況下創(chuàng)建50個分區(qū),3個備份副本;可以手動指定分區(qū)數(shù)量;如果自動創(chuàng)建分區(qū)的數(shù)量的話,那么在日志中可能會有 _consumer_offsets-xxx 這個就是向位移主題中寫的消息記錄
- 關于設置自動提交和手動提交對位移主題的影響: 如果設置手動的話,那么每一次提交的話,會進行寫入一個位移主題消息,但是如果使用自動的話,只要 Consumer 一直啟動著,它就會無限期地向位移主題寫入消息,因為會定時的進行提交,即使沒有消息的話,也會不斷的進行提交,這樣會導致消息隊列占用的內存不斷增加最終撐爆內存,這就需要在kafka上配置Compact(整理整個消息隊列,保留某個消息中的最新的消息,進行壓縮數(shù)據(jù)) 策略來刪除位移主題中的過期消息,避免該主題無限期膨脹;
- Kafka 提供了專門的后臺線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除數(shù)據(jù)。這個后臺線程叫 Log Cleaner。很多實際生產環(huán)境中都出現(xiàn)過位移主題無限膨脹占用過多磁盤空間的問題,如果你的環(huán)境中也有這個問題,我建議你去檢查一下 Log Cleaner 線程的狀態(tài),通常都是這個線程掛掉了導致的
kafka 位移提交
記錄了 Consumer 要消費的下一條消息的位移。這可能和你以前了解的有些出入,不過切記是下一條消息的位移,而不是目前最新消費消息的位移;
Consumer 需要向 Kafka 匯報自己的位移數(shù)據(jù),這個匯報過程被稱為提交位移,提交位移主要是為了表征 Consumer 的消費進度,這樣當 Consumer 發(fā)生故障重啟之后,就能夠從 Kafka 中讀取之前提交的位移值,然后從相應的位移處繼續(xù)消費,從而避免整個消費過程重來一遍。
換句話說,位移提交是 Kafka 提供給你的一個工具或語義保障,你負責維持這個語義保障,即如果你提交了位移 X,那么 Kafka 會認為所有位移值小于 X 的消息你都已經(jīng)成功消費了
- 那位移主題的作用是作什么的,在什么情況下會被啟用?僅僅是用于找到coordinator么?
- kafka 分為自動提交和手動提交,kafka會無腦的認為你的提交是正確的,如果隨便提交的話,會導致最終消息紊亂
- 自動提交的話 auto.commit.interval.ms 默認值是5s,也就是kafka會每五秒進行一次批量的提交位移信息,如果在這5s期間發(fā)生了reblace的,可能沒有提交成功,導致重復消費的問題;自動提交的邏輯是 在調用poll的時候,會先上報上一次poll的消費位移結果,然后在進行poll;沒等到下次poll,就掛了,消息就要重復消費
- 手動提交的話,不僅僅是enable.auto.commit 為 false,還需要手動的進行代碼調用
//同步的處理,導致consumer進程阻塞,影響整個系統(tǒng)的吞吐量
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 處理提交失敗異常
}
}
//使用 異步消息的話,不會影響吞吐量,但是不知道是否上報成功,如果沒有成功的話,不會進行重試,
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
//終極代碼
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
commitAysnc(); // 使用異步提交規(guī)避阻塞
}
} catch (Exception e) {
handle(e); // 處理異常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
//但是如何解決每一次poll提交的消息數(shù)量過大問題,如果一次間隔處理了很多條消息,如果出錯了整體都要再次處理一次,對于系統(tǒng)不是很友好
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 處理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回調處理邏輯是 null
count++;
}
}
kafka 多線程消費問題
- Kafka 0.10.1.0 版本開始,KafkaConsumer 就變?yōu)榱穗p線程的設計,即用戶主線程和心跳線程,但是從用戶角度來看屬于單線程,心跳線程和消息消費線程是隔離開的;Consumer 設計了單線程 + 輪詢的機制。這種設計能夠較好地實現(xiàn)非阻塞式的消息獲取,用戶可以在接到消息之后 進行自定義消息的處理;
- kafka實現(xiàn)多線程消費的方案
- 一個應用中創(chuàng)建多個consumer實例,相當于同時存在多個consumer,進行消費,這樣的好處是業(yè)務上簡單,并且能夠在某個分區(qū)上實現(xiàn)消息的順序執(zhí)行;缺點資源浪費嚴重,容易造成reblance,影響整個kafka的吞吐量。
- 創(chuàng)建一個接受消息的程序,然后處理消息的時候,使用多線程進行消費,這樣的好處是對于kafka來說,始終就是一個consumer實例,即使有一個出現(xiàn)異常了,也不會引發(fā)reblance的操作,但是缺點是如何處理異步消息處理中的位移提交以及失敗重試問題
- 多線程版本的代碼:
kafka 消息的監(jiān)控指標以及參數(shù)
- Lag 或 Consumer Lag 用來描述當前消費者還有多少消息沒有消費完成,也就是消息的滯后型,Lag的單位是消息條數(shù),以分區(qū)為維度進行統(tǒng)計的,如果需要統(tǒng)計整個topic的話,需要自己進行求和計算
- Lead 這個是JMX監(jiān)控的特有指標,這個是表示當前消費的消息的位移和第一條消息之間的差值,其實就是總消息數(shù) - Lag 數(shù)的值,用這個指標來衡量的時候,更能夠看出來整個消息的延遲有多大
- 有三種方式進行監(jiān)控
-
使用kafka的腳本進行監(jiān)控 $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test,如果consumer-id, host,client-id 是空的話,表示consumer實例不存在,如果執(zhí)行完腳本根本沒有反應的話,可能是版本不支持
監(jiān)控值 - 使用java api進行程序上面上的調用監(jiān)控
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient client = AdminClient.create(props)) { //設置groupId ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); try { Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { //獲取訂閱分區(qū)的最新消息位移 Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); //執(zhí)行相應的減法操作,獲取 Lag 值并封裝進一個 Map 對象 return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 處理中斷異常 // ... return Collections.emptyMap(); } catch (ExecutionException e) { // 處理 ExecutionException // ... return Collections.emptyMap(); } catch (TimeoutException e) { throw new TimeoutException("Timed out when getting lag for consumer group " + groupID); } } }- 使用Kafka JMX 監(jiān)控指標
-
