- 消費(fèi)者消費(fèi)后偏移量如果是使用自動(dòng)提交,在兩次提交之間如果發(fā)生了再平衡,則會(huì)造成上次提交后新加入的消息被重復(fù)處理。比如自動(dòng)提交間隔為5秒,在最近一次提交后3秒發(fā)生了在平衡,消費(fèi)者從最后一次提交的位置開始讀消息,在這3秒內(nèi)到達(dá)的消息會(huì)被重新讀取。
2.消費(fèi)者在訂閱主題時(shí)可以使用通配符,當(dāng)有新的主題被創(chuàng)建時(shí),消費(fèi)者能自動(dòng)從新的主題中讀取消息。
- 在只有一個(gè)消費(fèi)者的情況下,可以直接設(shè)置要讀取的分區(qū)和偏移量,而不用加入到消費(fèi)者組中,這樣可以不受再平衡的影響。代碼如下:
List<PartitionInfo> pinfos = null;
pinfos = consumer.partitionsFor(topic);
List<TopicPartition> partitions = new ArrayList<>();
if (pinfos!=null){
for (PartitionInfo pi : pinfos){
partitions.add(new TopicPartition(pi.topic(),pi.partition()));
}
}
consumer.assign(partitions);
注意:這種方式和訂閱分區(qū)不能同時(shí)使用,否則會(huì)報(bào)錯(cuò)。
不同步分區(qū)副本是否可以被選舉成為首領(lǐng),會(huì)對(duì)系統(tǒng)的可用性和可靠性造成影響。設(shè)置參數(shù)為unclean.leader.election.enable。該參數(shù)為true時(shí),不同步的分區(qū)副本成為首領(lǐng)時(shí)可能會(huì)造成消費(fèi)者訪問數(shù)據(jù)混亂和丟失部分?jǐn)?shù)據(jù)。 設(shè)為false時(shí),系統(tǒng)的可用性會(huì)受影響,系統(tǒng)必須等待原來發(fā)生錯(cuò)誤的broker恢復(fù)到可用狀態(tài)。需要針對(duì)不同情況進(jìn)行權(quán)衡。
只發(fā)送一次的實(shí)現(xiàn)方法。將數(shù)據(jù)中的唯一鍵或者kafka的主題+分區(qū)+偏移量組成的唯一鍵保存到一個(gè)鍵值數(shù)據(jù)庫中,每次發(fā)送時(shí)先檢查鍵值庫中是否存在相同的鍵,如果存在則不用再次發(fā)送。這種方法會(huì)降低處理性能,只有在嚴(yán)格要求發(fā)送一次的情況下才考慮使用。如果消息寫入的系統(tǒng)支持事務(wù),可以將消息和偏移量放到一個(gè)事務(wù)中保存到系統(tǒng),消費(fèi)者啟動(dòng)時(shí)從系統(tǒng)中讀出偏移量,用seek()從該偏移量的位置繼續(xù)讀取消息。
6.kafka監(jiān)控工具:推薦使用yahoo的kafka-manager,該工具可以管理多個(gè)cluster,對(duì)主題和分區(qū)的管理可以實(shí)現(xiàn)和系統(tǒng)腳本一樣的功能,都是在頁面上進(jìn)行操作,比命令行方便。啟動(dòng)腳本中打開JMX端口:export JMX_PORT="9999",這樣可以在kafka-manager中通過JMX看到kafka吞吐量情況。
分區(qū)副本重新分配。當(dāng)有broker停用或者增加新的broker后,需要重新分配分區(qū)副本,使集群平衡。命令行工具 kafka-reassign-partitions.sh
生產(chǎn)者在往kafka中寫入數(shù)據(jù)時(shí),為了可靠性和吞吐量的平衡,在設(shè)置acks=all時(shí),可設(shè)置min.insync.replicas 小于分區(qū)副本數(shù),比如分區(qū)副本是3,可以設(shè)置該參數(shù)為2.
主題創(chuàng)建后可以增加分區(qū)數(shù),但不能減少??梢栽黾踊驕p少分區(qū)副本數(shù)量。
安全和用戶認(rèn)證:kafka使用SSL或SASL來建立客戶端和broker之間的認(rèn)證連接??蛻舳伺cbroker之間以及broker之間可以用SSL來進(jìn)行加密傳輸??梢詫?duì)客戶端的讀/寫操作進(jìn)行授權(quán)(命令行工具為kafka-acls.sh)。