版本選擇
- 0.7版本:
只有基礎(chǔ)消息隊(duì)列功能,無(wú)副本;打死也不使用 - 0.8版本:
增加了副本機(jī)制,新的producer API;建議使用0.8.2.2版本;不建議使用0.8.2.0之后的producer API - 0.9版本:
增加權(quán)限和認(rèn)證,新的consumer API,Kafka Connect功能;不建議使用consumer API; - 0.10版本:
引入Kafka Streams功能,bug修復(fù);建議版本0.10.2.2;建議使用新版consumer API - 0.11版本:
producer API冪等,事物API,消息格式重構(gòu);建議版本0.11.0.3;謹(jǐn)慎對(duì)待消息格式變化 - 1.0和2.0版本:
Kafka Streams改進(jìn);建議版本2.0;
部署需要注意

重要參數(shù)
- Broker 端參數(shù)
- log.dirs:這是非常重要的參數(shù),指定了 Broker 需要使用的若干個(gè)文件目錄路徑。要知道這個(gè)參數(shù)是沒(méi)有默認(rèn)值的,這說(shuō)明什么?這說(shuō)明它必須由你親自指定。log.dir:注意這是 dir,結(jié)尾沒(méi)有 s,說(shuō)明它只能表示單個(gè)路徑,它是補(bǔ)充上一個(gè)參數(shù)用的。
提升讀寫(xiě)性能:比起單塊磁盤(pán),多塊物理磁盤(pán)同時(shí)讀寫(xiě)數(shù)據(jù)有更高的吞吐量
Zookeeper
zookeeper.connect:zk1:2181,zk2:2181,zk3:2181/kafka1listeners:學(xué)名叫監(jiān)聽(tīng)器
其實(shí)就是告訴外部連接者要通過(guò)什么協(xié)議訪問(wèn)指定主機(jī)名和端口開(kāi)放的 Kafka 服務(wù)。advertised.listeners:和 listeners 相比多了個(gè) advertised。Advertised 的含義表示宣稱的、公布的,就是說(shuō)這組監(jiān)聽(tīng)器是 Broker 用于對(duì)外發(fā)布的。host.name/port:列出這兩個(gè)參數(shù)就是想說(shuō)你把它們忘掉吧,壓根不要為它們指定值,畢竟都是過(guò)期的參數(shù)了。
最好全部使用主機(jī)名,即 Broker 端和 Client 端應(yīng)用配置中全部填寫(xiě)主機(jī)名
- topic:
- auto.create.topics.enable:是否允許自動(dòng)創(chuàng)建 Topic。推薦設(shè)置為false
規(guī)避線上自動(dòng)創(chuàng)建topic問(wèn)題 - unclean.leader.election.enable:是否允許 Unclean Leader 選舉。推薦為false
規(guī)避落后的副本自動(dòng)選為leader,導(dǎo)致數(shù)據(jù)丟失. - auto.leader.rebalance.enable:是否允許定期進(jìn)行 Leader 選舉。推薦false
規(guī)避自動(dòng)切換leader造成不必要的性能開(kāi)銷
- 消息保存三兄弟
- log.retention.{hours|minutes|ms}:這是個(gè)“三兄弟”,都是控制一條消息數(shù)據(jù)被保存多長(zhǎng)時(shí)間。從優(yōu)先級(jí)上來(lái)說(shuō) ms 設(shè)置最高、minutes 次之、hours 最低
- log.retention.bytes:這是指定 Broker 為消息保存的總磁盤(pán)容量大小。
- message.max.bytes:控制 Broker 能夠接收的最大消息大小。
- 創(chuàng)建topic時(shí)帶參數(shù)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
- 修改topic參數(shù)
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
Kafka jvm設(shè)置推薦參數(shù)
- KAFKA_HEAP_OPTS:指定堆大小。
- KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 參數(shù)。
kafka 零拷貝
- 數(shù)據(jù)在磁盤(pán)和網(wǎng)絡(luò)進(jìn)行傳輸時(shí)避免昂貴的內(nèi)核態(tài)數(shù)據(jù)拷貝,從而實(shí)現(xiàn)快速的數(shù)據(jù)傳輸
分區(qū)策略
- 輪詢策略
生產(chǎn)者程序會(huì)按照輪詢的方式在主題的所有分區(qū)間均勻地“碼放”消息。
輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一 - 隨機(jī)策略
我們隨意地將消息放置到任意一個(gè)分區(qū)上
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
- 按消息鍵保序策略[官方?jīng)]有該策略]
Kafka 允許為每條消息定義消息鍵,簡(jiǎn)稱為 Key。這個(gè) Key 的作用非常大,它可以是一個(gè)有著明確業(yè)務(wù)含義的字符串,比如客戶代碼、部門(mén)編號(hào)或是業(yè)務(wù) ID 等;也可以用來(lái)表征消息元數(shù)據(jù)。特別是在 Kafka 不支持時(shí)間戳的年代,在一些場(chǎng)景中,工程師們都是直接將消息創(chuàng)建時(shí)間封裝進(jìn) Key 里面的。一旦消息被定義了 Key,那么你就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
kafka生產(chǎn)者優(yōu)化
- 開(kāi)啟壓縮
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 開(kāi)啟GZIP壓縮
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
-
壓縮參數(shù)
image.png
結(jié)論:
- 吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
- 壓縮比方面: zstd > LZ4 > GZIP > snappy
kafka消息不丟失保證策略
- 生產(chǎn)者 選擇合適的api
Producer 永遠(yuǎn)要使用帶有回調(diào)通知的發(fā)送 API,也就是說(shuō)不要使用 producer.send(msg),而要使用 producer.send(msg, callback) - 消費(fèi)者維護(hù)好偏移量
維持先消費(fèi)消息(閱讀),再更新位移(書(shū)簽)的順序
如果是多線程異步處理消費(fèi)消息,Consumer 程序不要開(kāi)啟自動(dòng)提交位移,而是要應(yīng)用程序手動(dòng)提交位移
3.最佳實(shí)踐
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。記住,一定要使用帶有回調(diào)通知的 send 方法。
- 設(shè)置 acks = all。acks 是 Producer 的一個(gè)參數(shù),代表了你對(duì)“已提交”消息的定義。如果設(shè)置成 all,則表明所有副本 Broker 都要接收到消息,該消息才算是“已提交”。這是最高等級(jí)的“已提交”定義。
- 設(shè)置 retries 為一個(gè)較大的值。這里的 retries 同樣是 Producer 的參數(shù),對(duì)應(yīng)前面提到的 Producer 自動(dòng)重試。當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時(shí)抖動(dòng)時(shí),消息發(fā)送可能會(huì)失敗,此時(shí)配置了 retries > 0 的 Producer 能夠自動(dòng)重試消息發(fā)送,避免消息丟失。
- 設(shè)置 unclean.leader.election.enable = false。這是 Broker 端的參數(shù),它控制的是哪些 Broker 有資格競(jìng)選分區(qū)的 Leader。如果一個(gè) Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會(huì)造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false,即不允許這種情況的發(fā)生。
- 設(shè)置 replication.factor >= 3。這也是 Broker 端的參數(shù)。其實(shí)這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機(jī)制就是冗余。
- 設(shè)置 min.insync.replicas > 1。這依然是 Broker 端參數(shù),控制的是消息至少要被寫(xiě)入到多少個(gè)副本才算是“已提交”。設(shè)置成大于 1 可以提升消息持久性。在實(shí)際環(huán)境中千萬(wàn)不要使用默認(rèn)值 1。
- 確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個(gè)副本掛機(jī),整個(gè)分區(qū)就無(wú)法正常工作了。我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成。
- 推薦設(shè)置成 replication.factor = min.insync.replicas + 1。確保消息消費(fèi)完成再提交。
- Consumer 端有個(gè)參數(shù) enable.auto.commit,最好把它設(shè)置成 false,并采用手動(dòng)提交位移的方式。就像前面說(shuō)的,這對(duì)于單 Consumer 多線程處理的場(chǎng)景而言是至關(guān)重要的。

kafka中的攔截器
- 參數(shù)設(shè)置
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
- 生產(chǎn)者
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略Jedis初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<java.lang.String, ?> configs) {
}
- 消費(fèi)者
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; //省略Jedis初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
java生產(chǎn)者是如何管理tcp連接的?
Apache Kafka的所有通信都是基于TCP的,而不是于HTTP或其他協(xié)議的
1 為什采用TCP?
(1)TCP擁有一些高級(jí)功能,如多路復(fù)用請(qǐng)求和同時(shí)輪詢多個(gè)連接的能力。
(2)很多編程語(yǔ)言的HTTP庫(kù)功能相對(duì)的比較簡(jiǎn)陋。
名詞解釋:
多路復(fù)用請(qǐng)求:multiplexing request,是將兩個(gè)或多個(gè)數(shù)據(jù)合并到底層—物理連接中的過(guò)程。TCP的多路復(fù)用請(qǐng)求會(huì)在一條物理連接上創(chuàng)建若干個(gè)虛擬連接,每個(gè)虛擬連接負(fù)責(zé)流轉(zhuǎn)各自對(duì)應(yīng)的數(shù)據(jù)流。嚴(yán)格講:TCP并不能多路復(fù)用,只是提供可靠的消息交付語(yǔ)義保證,如自動(dòng)重傳丟失的報(bào)文。
2 何時(shí)創(chuàng)建TCP連接?
(1)在創(chuàng)建KafkaProducer實(shí)例時(shí),
A:生產(chǎn)者應(yīng)用會(huì)在后臺(tái)創(chuàng)建并啟動(dòng)一個(gè)名為Sender的線程,該Sender線程開(kāi)始運(yùn)行時(shí),首先會(huì)創(chuàng)建與Broker的連接。
B:此時(shí)不知道要連接哪個(gè)Broker,kafka會(huì)通過(guò)METADATA請(qǐng)求獲取集群的元數(shù)據(jù),連接所有的Broker。
(2)還可能在更新元數(shù)據(jù)后,或在消息發(fā)送時(shí)
3 何時(shí)關(guān)閉TCP連接
(1)Producer端關(guān)閉TCP連接的方式有兩種:用戶主動(dòng)關(guān)閉,或kafka自動(dòng)關(guān)閉。
A:用戶主動(dòng)關(guān)閉,通過(guò)調(diào)用producer.close()方關(guān)閉,也包括kill -9暴力關(guān)閉。
B:Kafka自動(dòng)關(guān)閉,這與Producer端參數(shù)connection.max.idles.ms的值有關(guān),默認(rèn)為9分鐘,9分鐘內(nèi)沒(méi)有任何請(qǐng)求流過(guò),就會(huì)被自動(dòng)關(guān)閉。這個(gè)參數(shù)可以調(diào)整。
C:第二種方式中,TCP連接是在Broker端被關(guān)閉的,但這個(gè)連接請(qǐng)求是客戶端發(fā)起的,對(duì)TCP而言這是被動(dòng)的關(guān)閉,被動(dòng)關(guān)閉會(huì)產(chǎn)生大量的CLOSE_WAIT連接。
代碼實(shí)現(xiàn)冪等性
props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
代碼開(kāi)啟生產(chǎn)者事務(wù)
開(kāi)啟 enable.idempotence = true。
設(shè)置 Producer 端參數(shù) transactional. id。最好為其設(shè)置一個(gè)有意義的名字
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
消費(fèi)組平衡問(wèn)題
- 組成員數(shù)量發(fā)生變化。
- 訂閱主題數(shù)量發(fā)生變化。
- 訂閱主題的分區(qū)數(shù)發(fā)生變化。

kafka位移
Committing Offsets:Consumer 需要向 Kafka 匯報(bào)自己的位移數(shù)據(jù),這個(gè)匯報(bào)過(guò)程被稱為提交位移
- 自動(dòng)提交位移
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 開(kāi)啟自動(dòng)提交位移
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
- 手動(dòng)提交
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
commitAysnc(); // 使用異步提交規(guī)避阻塞
}
} catch(Exception e) {
handle(e); // 處理異常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
- 精細(xì)提交位移
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 處理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回調(diào)處理邏輯是null
count++;
}
}
對(duì)于一次要處理很多消息的 Consumer 而言,它會(huì)關(guān)心社區(qū)有沒(méi)有方法允許它在消費(fèi)的中間進(jìn)行位移提交。比如前面這個(gè) 5000 條消息的例子,你可能希望每處理完 100 條消息就提交一次位移,這樣能夠避免大批量的消息重新消費(fèi)。

CommitFailedException異常怎么處理?
- 場(chǎng)景一
當(dāng)消息處理的總時(shí)間超過(guò)預(yù)設(shè)的 max.poll.interval.ms 參數(shù)值時(shí),Kafka Consumer 端會(huì)拋出 CommitFailedException 異常
四種處理方式
- 縮短單條消息處理的時(shí)間
- 增加 Consumer 端允許下游系統(tǒng)消費(fèi)一批消息的最大時(shí)長(zhǎng)
- 減少下游系統(tǒng)一次性消費(fèi)的消息總數(shù)
-
下游系統(tǒng)使用多線程來(lái)加速消費(fèi)
image.png
- 場(chǎng)景二
Kafka Java Consumer 端還提供了一個(gè)名為 Standalone Consumer 的獨(dú)立消費(fèi)者。它沒(méi)有消費(fèi)者組的概念,每個(gè)消費(fèi)者實(shí)例都是獨(dú)立工作的,彼此之間毫無(wú)聯(lián)系。不過(guò),你需要注意的是,獨(dú)立消費(fèi)者的位移提交機(jī)制和消費(fèi)者組是一樣的,因此獨(dú)立消費(fèi)者的位移提交也必須遵守之前說(shuō)的那些規(guī)定,比如獨(dú)立消費(fèi)者也要指定 group.id 參數(shù)才能提交位移.

多線程開(kāi)發(fā)消費(fèi)者實(shí)例
- 消費(fèi)者程序啟動(dòng)多個(gè)線程,每個(gè)線程維護(hù)專屬的 KafkaConsumer 實(shí)例,負(fù)責(zé)完整的消息獲取、消息處理流程
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(10000));
// 執(zhí)行消息處理邏輯
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
- 2.消費(fèi)者程序使用單或多線程獲取消息,同時(shí)創(chuàng)建多個(gè)消費(fèi)線程執(zhí)行消息處理邏
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord record : records) {
executors.submit(new Worker(record));
}
}
..

kafka副本機(jī)制

kafka如何控制請(qǐng)求?
https://www.processon.com/view/link/5d481e6be4b07c4cf3031755
消費(fèi)組重平衡
- heartbeat.interval.ms
這個(gè)參數(shù)的真正作用是控制重平衡通知的頻率。如果你想要消費(fèi)者實(shí)例更迅速地得到通知,那么就可以給這個(gè)參數(shù)設(shè)置一個(gè)非常小的值,這樣消費(fèi)者就能更快地感知到重平衡已經(jīng)開(kāi)啟了
費(fèi)者組狀態(tài)機(jī)


Broker 端重平衡場(chǎng)景剖析
場(chǎng)景一:新成員入組。

場(chǎng)景二:組成員主動(dòng)離組

場(chǎng)景四:重平衡時(shí)協(xié)調(diào)者對(duì)組內(nèi)成員提交位移的處理。

控制器
1作用:
控制器組件(Controller),是Apache Kafka的核心組件。它的主要作用是Apache Zookeeper的幫助下管理和協(xié)調(diào)整個(gè)Kafka集群。
集群中任意一臺(tái)Broker都能充當(dāng)控制器的角色,但在運(yùn)行過(guò)程中,只能有一個(gè)Broker成為控制器。
2 特點(diǎn):控制器是重度依賴Zookeeper。
3 產(chǎn)生:
控制器是被選出來(lái)的,Broker在啟動(dòng)時(shí),會(huì)嘗試去Zookeeper中創(chuàng)建/controller節(jié)點(diǎn)。Kafka當(dāng)前選舉控制器的規(guī)則是:第一個(gè)成功創(chuàng)建/controller節(jié)點(diǎn)的Broker會(huì)被指定為控制器。
4 功能:
A :主題管理(創(chuàng)建,刪除,增加分區(qū))
當(dāng)執(zhí)行kafka-topics腳本時(shí),大部分的后臺(tái)工作都是控制器來(lái)完成的。
B :分區(qū)重分配
Kafka-reassign-partitions腳本提供的對(duì)已有主題分區(qū)進(jìn)行細(xì)粒度的分配功能。
C :Preferred領(lǐng)導(dǎo)者選舉
Preferred領(lǐng)導(dǎo)者選舉主要是Kafka為了避免部分Broker負(fù)載過(guò)重而提供的一種換Leade的方案。
D :集群成員管理(新增Broker,Broker主動(dòng)關(guān)閉,Broker宕機(jī))
控制器組件會(huì)利用watch機(jī)制檢查Zookeeper的/brokers/ids節(jié)點(diǎn)下的子節(jié)點(diǎn)數(shù)量變更。當(dāng)有新Broker啟動(dòng)后,它會(huì)在/brokers下創(chuàng)建專屬的znode節(jié)點(diǎn)。一旦創(chuàng)建完畢,Zookeeper會(huì)通過(guò)Watch機(jī)制將消息通知推送給控制器,這樣,控制器就能自動(dòng)地感知到這個(gè)變化。進(jìn)而開(kāi)啟后續(xù)新增Broker作業(yè)。
偵測(cè)Broker存活性則是依賴于剛剛提到的另一個(gè)機(jī)制:臨時(shí)節(jié)點(diǎn)。每個(gè)Broker啟動(dòng)后,會(huì)在/brokers/ids下創(chuàng)建一個(gè)臨時(shí)的znode。當(dāng)Broker宕機(jī)或主機(jī)關(guān)閉后,該Broker與Zookeeper的會(huì)話結(jié)束,這個(gè)znode會(huì)被自動(dòng)刪除。同理,Zookeeper的Watch機(jī)制將這一變更推送給控制器,這樣控制器就能知道有Broker關(guān)閉或宕機(jī)了,從而進(jìn)行善后。
E :數(shù)據(jù)服務(wù)
控制器上保存了最全的集群元數(shù)據(jù)信息,其他所有Broker會(huì)定期接收控制器發(fā)來(lái)的元數(shù)據(jù)更新請(qǐng)求,從而更新其內(nèi)存中的緩存數(shù)據(jù)。
5 控制器保存的數(shù)據(jù)
控制器中保存的這些數(shù)據(jù)在Zookeeper中也保存了一份。每當(dāng)控制器初始化時(shí),它都會(huì)從Zookeeper上讀取對(duì)應(yīng)的元數(shù)據(jù)并填充到自己的緩存中。
6 控制器故障轉(zhuǎn)移(Failover)
故障轉(zhuǎn)移是指:當(dāng)運(yùn)行中的控制器突然宕機(jī)或意外終止時(shí),Kafka能夠快速地感知到,并立即啟用備用控制器來(lái)替代之前失敗的控制器。
7 內(nèi)部設(shè)計(jì)原理
A :控制器的內(nèi)部設(shè)計(jì)相當(dāng)復(fù)雜
控制器是多線程的設(shè)計(jì),會(huì)在內(nèi)部創(chuàng)建很多線程。如:
(1)為每個(gè)Broker創(chuàng)建一個(gè)對(duì)應(yīng)的Socket連接,然后在創(chuàng)建一個(gè)專屬的線程,用于向這些Broker發(fā)送特定的請(qǐng)求。
(2)控制連接zookeeper,也會(huì)創(chuàng)建單獨(dú)的線程來(lái)處理Watch機(jī)制通知回調(diào)。
(3)控制器還會(huì)為主題刪除創(chuàng)建額外的I/O線程。
這些線程還會(huì)訪問(wèn)共享的控制器緩存數(shù)據(jù),為了維護(hù)數(shù)據(jù)安全性,控制在代碼中大量使用ReetrantLock同步機(jī)制,進(jìn)一步拖慢了整個(gè)控制器的處理速度。
B :在0.11版對(duì)控制器的低沉設(shè)計(jì)進(jìn)了重構(gòu)。
(1)最大的改進(jìn)是:把多線程的方案改成了單線程加事件對(duì)列的方案。
a. 單線程+隊(duì)列的實(shí)現(xiàn)方式:社區(qū)引入了一個(gè)事件處理線程,統(tǒng)一處理各種控制器事件,然后控制器將原來(lái)執(zhí)行的操作全部建模成一個(gè)個(gè)獨(dú)立的事件,發(fā)送到專屬的事件隊(duì)列中,供此線程消費(fèi)。
b. 單線程不代表之前提到的所有線程都被干掉了,控制器只是把緩存狀態(tài)變更方面的工作委托給了這個(gè)線程而已。
(2)第二個(gè)改進(jìn):將之前同步操作Zookeeper全部改為異步操作。
a. Zookeeper本身的API提供了同步寫(xiě)和異步寫(xiě)兩種方式。同步操作zk,在有大量主題分區(qū)發(fā)生變更時(shí),Zookeeper容易成為系統(tǒng)的瓶頸。
高水位的討論
Leader 副本保持同步條件
- 該遠(yuǎn)程 Follower 副本在 ISR 中。
- 該遠(yuǎn)程 Follower 副本 LEO 值落后于 Leader 副本 LEO 值的時(shí)間,不超過(guò) Broker 端參數(shù) replica.lag.time.max.ms 的值。如果使用默認(rèn)值的話,就是不超過(guò) 10 秒。

kafka調(diào)優(yōu)
1.操作系統(tǒng)調(diào)優(yōu)
系統(tǒng)時(shí)禁掉 atime 更新
至少選擇 ext4 或 XFS
ulimit -n 和 vm.max_map_count
操作系統(tǒng)頁(yè)緩存
2.JVM 層調(diào)優(yōu)
設(shè)置堆大小 6-8G
GC 收集器 G1
- Broker 端調(diào)優(yōu)
Producer -> Broker -> Consumer三端kafka版本要保持一致
4.應(yīng)用層調(diào)優(yōu)
不要頻繁地創(chuàng)建 Producer 和 Consumer 對(duì)象實(shí)例
用完及時(shí)關(guān)閉
合理利用多線程來(lái)改善性能

