Kafka 工作流程分析
1、Kafka生產(chǎn)過(guò)程分析
(1)寫入方式
producer采用推(push)模式將消息發(fā)布到broker,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高,保障kafka吞吐率)
(2)partition
說(shuō)明:
- 消息發(fā)送時(shí)都被發(fā)送到一個(gè)topic,其本質(zhì)就是一個(gè)目錄,而topic是由一些Partition Logs(分區(qū)日志)組成.
- 每個(gè)Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個(gè)消息都被賦予了一個(gè)唯一的offset值。
分區(qū)原因:
- 提升拓展性:每個(gè)Partition可以通過(guò)調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了
- 提高吞吐能力:在進(jìn)行數(shù)據(jù)寫入時(shí)以 Partition 為單位進(jìn)行寫入。
分區(qū)依據(jù):
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// (1) 指定了patition,則直接使用該 Partition
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
- 對(duì)于已經(jīng)指定了 partition 的,則直接使用該partition;
- 未指定patition但指定key,通過(guò)對(duì)key的value進(jìn)行hash出一個(gè)patition;
- patition和key都未指定,使用輪詢選出一個(gè)patition;
(3)Replica(副本)

Replica流程
(4)寫入流程
流程圖:

producer寫入流程
流程描述:
- producer先從zookeeper的 "/brokers/.../state"節(jié)點(diǎn)找到該partition的leader
- producer將消息發(fā)送給該leader
- leader將消息寫入本地log
- followers從leader pull消息
- Follower將 pull到的消息寫入本地log
- Follower 寫入成功值后向leader發(fā)送ACK
- leader收到所有ISR中的replication的ACK后,增加HW
- 向producer發(fā)送ACK
2、 Broker 保存消息
(1)存儲(chǔ)說(shuō)明
- 物理上把topic分成一個(gè)或多個(gè)patition,每個(gè)patition物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該patition的所有消息和索引文件)
- Kafka讀取特定消息的時(shí)間復(fù)雜度為O(1);
- 消息數(shù)據(jù)是存儲(chǔ)在partition文件夾下的*.log文件中的;
- 消息存儲(chǔ)時(shí)常有兩個(gè)策略,分別為:
基于時(shí)間存儲(chǔ)策略:默認(rèn)保留168小時(shí)(log.retention.hours=168)
基于大小保留策略:默認(rèn)保留 1G(log.retention.bytes=1073741824)
(2)Zk存儲(chǔ)結(jié)構(gòu)

Zk存儲(chǔ)結(jié)構(gòu)
3、consumer flow
(1) 高級(jí)API與低級(jí)API
- kafka提供了兩套consumer API:高級(jí)Consumer API和低級(jí)Consumer API。
- 高級(jí)API不需要自行去管理offset,partition replica等,系統(tǒng)通過(guò)Zk自行管理。(低級(jí) API反之)
(2)Consumer Group(消費(fèi)者組)
流程圖:

Consumer Group
描述說(shuō)明:
- Consumer Group 由多個(gè)Consumer 組成,同時(shí)一個(gè)Consumer只有屬于一個(gè)Consumer Group。
- Consumer Group 保證了其訂閱的Topic partition 會(huì)被該Consumer Group 中的Consumer消費(fèi)。對(duì)于多個(gè)Consumer Group訂閱了同一個(gè)Topic,每個(gè)Consumer Group之間互不影響。
- 如果要實(shí)現(xiàn)一個(gè)消息被多個(gè) consumer 消費(fèi),則可以將當(dāng)consumer 單獨(dú)添加到單獨(dú)的Consumer Group中(反之,如果要實(shí)現(xiàn)一個(gè)消息 被一個(gè) consumer 消費(fèi),則可以將當(dāng)consumer 添加到同一個(gè)Consumer Group中)