課程目標
- Kafka 產(chǎn)生的背景
- Kafka 的架構(gòu)
- Kafka 的安裝部署和集群部署
- Kafka 的基本操作
- Kafka 的應用
Kafka 的簡介
- 高性能
- 高吞吐量
什么是 Kafka
Kafka 是一款分布式消息發(fā)布和訂閱系統(tǒng),具有高性能、高吞吐量的特點而被廣泛應用于大數(shù)據(jù)傳輸場景。它是由 LinkedIn公 司開發(fā),使用 Scala 語言編寫,之后成為 Apache 基金會的一個頂級項目。 kafka 提供了類似 JMS 的特性,但是在設計和實現(xiàn)上是完全不同的,而且他也不是 JMS 規(guī)范的實現(xiàn)。
Kafka 產(chǎn)生的背景
kafka作為一個消息系統(tǒng),早期設計的目的是用作 LinkedIn 的活動流(Activity Stream)和運營數(shù)據(jù)處理管道(Pipeline)?;顒恿鲾?shù)據(jù)是所有的網(wǎng)站對用戶的使用情況做分析的時候要用到的最常規(guī)的部分,活動數(shù)據(jù)包括頁面的訪問量(PV)、被查看內(nèi)容方面的信息以及搜索內(nèi)容。這種數(shù)據(jù)通常的處理方式是先把各種活動以日志的形式寫入某種文件,然后周期性地對這些文件進行統(tǒng)計分析。運營數(shù)據(jù)指的是服務器的性能數(shù)據(jù)(CPU、IO使用率、請求時間、服務日志等)。
Kafka 的應用場景
內(nèi)置分區(qū)
-
實現(xiàn)集群
`spring cloud stream` 也有 Kafka 的實現(xiàn)。由于 kafka 具有更好的吞吐量、內(nèi)置分區(qū)、冗余及容錯性的優(yōu)點 ( kafka 每秒可以處理幾十萬消息 ) ,讓 kafka 成為了一個很好的大規(guī)模消息處理應用的解決方案。所以在企業(yè)級應用中,主要會應用于如下幾個方面:
行為跟蹤:kafka可以用于跟蹤用戶瀏覽頁面、搜索及其他行為。通過發(fā)布-訂閱模式實時記錄到對應的topic中,通過后端大數(shù)據(jù)平臺接入處理分析,并做更進一步的實時處理和監(jiān)控。
日志收集:在日志收集方面,有很多比較優(yōu)秀的產(chǎn)品,比如 Apache Flume,很多公司使用kafka 代理日志聚合。日志聚合表示從服務器上收集日志文件,然后放到一個集中的平臺(文件服務器)進行處理。在實際應用開發(fā)中,我們應用程序的 log 都會輸出到本地的磁盤上, 排查問題的話通過 linux 命令來搞定,如果應用程序組成了負載均衡集群,并且集群的機器有幾十臺以上,那么想通過日志快速定位到問題,就是很麻煩的事情了。所以一般都會做一個日志統(tǒng)一收集平臺管理 log 日志用來快速查詢重要應用的問題。所以很多公司的套路都是把應用日志集中到 kafka 上,然后分別導入到 es 和 hdfs 上,用來做實時檢索分析和離線統(tǒng)計數(shù)據(jù)備份等。而另一方面,kafka 本身又提供了很好的 api 來集成日志并且做日志收集。

Kafka 本身的架構(gòu)
一個典型的 kafka 集群包含若干 Producer(可以是應用節(jié)點產(chǎn)生的消息,也可以是通過Flume 收集日志產(chǎn)生的事件),若干個 Broker(kafka 支持水平擴展)、若干個 Consumer 、Group,以及一個 zookeeper 集群。kafka 通過 zookeeper 管理集群配置及服務協(xié)同。
Producer 使用 push 模式將消息發(fā)布到 broker,consumer 通過監(jiān)聽使用 pull 模式從broker 訂閱并消費消息。
多個 broker 協(xié)同工作, producer 和 consumer 部署在各個業(yè)務邏輯中。三者通過zookeeper 管理協(xié)調(diào)請求和轉(zhuǎn)發(fā)。這樣就組成了一個高性能的分布式消息發(fā)布和訂閱系統(tǒng)。圖上有一個細節(jié)是和其他 mq 中間件不同的點。producer 發(fā)送消息到 broker 的過程是 push,而 consumer 從 broker 消費消息的過程是 pull,主動去拉數(shù)據(jù)。而不是 broker 把數(shù)據(jù)主動發(fā)送給 consumer 。
- Topic 主題
- partion 數(shù)據(jù)分區(qū)
- group

kafka 的安裝部署
下載安裝包
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.11-1.1.0.tgz
kafka_2.11-1.1.0.tgx: http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
安裝過程
- tar -zxvf 解壓安裝包
[root@Darian1 software]# tar -zxvf kafka_2.11-2.0.0.tgz
kafka 目錄介紹
-
/bin操作 kafka 的可執(zhí)行腳本 -
/config配置文件 -
/libs依賴庫目錄 -
/logs日志數(shù)據(jù)目錄
啟動 /停止 kafka
- 需要先啟動 zookeeper ,如果沒有搭建 zookeeper 環(huán)境,可以直接運行kafka內(nèi)嵌的zookeeper
啟動命令: bin/zookeeper-server-start.sh config/zookeeper.properties & - 進入kafka目錄,運行 bin/kafka-server-start.sh {-daemon 后臺啟動} config/server.properties &
- 進入kafka目錄,運行bin/kafka-server-stop.sh config/server.properties
運行的外部的 Zookeeper 集群的 Kafka
[root@Darian1 bin]# vim ../config/server.properties
zookeeper.connect=168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181
[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties
如果超時時間連接較長可以延長時間。
[root@Darian1 bin]# vim ../config/server.properties zookeeper.connect=192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=30000
Kafka 的基本操作
前提:
首先需要啟動 zookeeper
啟動 kafka
[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties
后臺啟動kafka
[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties
創(chuàng)建一個 Topic
[root@Darian1 bin]# sh kafka-topics.sh --create --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --replication-factor 1 --partitions 1 --topic darianTest
Created topic "darianTest".
Replication-factor 表示該topic需要在不同的broker中保存幾份,這里設置成1,表示在兩個broker中保存兩份
Partitions 分區(qū)數(shù)
查看所有的 Topic
[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181
[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper localhost:2181
darianTest
查看 topic 屬性
[root@Darian1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --topic darianTest
Topic:darianTest PartitionCount:1 ReplicationFactor:1 Configs:
Topic: darianTest Partition: 0 Leader: 1 Replicas: 1 Isr: 1
創(chuàng)建一個控制臺發(fā)送端
Broker-list 不是 zookeeper
[root@Darian1 bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --topic darianTest
>hello
>helloworld
>dsfsdf
>sdfsd
>
創(chuàng)建一個控制臺接收端
[root@Darian1 bin]# sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic darianTest --from-beginning
hello
helloworld
dsfsdf
sdfsd
安裝集群環(huán)境
zookeeper 能夠完成 kafka 的集群
修改 server.properties 配置
修改server.properties. broker.id=0 / 1
修改server.properties 修改成本機IP
advertised.listeners=PLAINTEXT://192.168.11.153:9092
當 Kafka broker 啟動時,它會在ZK上注冊自己的IP和端口號,客戶端就通過這個IP和端口號來連接。Kafka 的 listeners 如果需要配置集群的話,需要把自己機器的 IP 配置上去。
192.168.40.128
[root@Darian1 bin]# vim ../config/server.properties
# 在集群里邊必須是唯一的,默認情況下 broker.id 都是 0 ,需要標識它的唯一性
broker.id=1 / 2 / 3
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.128:9092
192.168.40.129
# 在集群里邊必須是唯一的,默認情況下 broker.id 都是 0 ,需要標識它的唯一性
broker.id= 2
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.129:9092
192.168.40.131
# 在集群里邊必須是唯一的,默認情況下 broker.id 都是 0 ,需要標識它的唯一性
broker.id= 3
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.131:9092
[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties
Kafka 啟動不好的話,多啟動兩次。
啟動成功以后,看 zookeeper 節(jié)點的變化:
[root@Darian1 bin]# sh zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 12] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 15] get /controller
{"version":1,"brokerid":2,"timestamp":"1548340180496"}
cZxid = 0x100000147
ctime = Thu Jan 24 22:29:40 CST 2019
mZxid = 0x100000147
mtime = Thu Jan 24 22:29:40 CST 2019
pZxid = 0x100000147
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1687fbe81fe000a
dataLength = 54
numChildren = 0
每一個節(jié)點都有它相應的含義。我們的寫請求會落到 `Master` 節(jié)點上,讀請求可以走其他節(jié)點去讀。他們都是熱備可以工作的。他的選舉規(guī)則是最小的節(jié)點也就是最早注冊的節(jié)點。
中間件都是成熟的商業(yè)化的東西。
JAVA API 的使用
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
KafKaConsumer
public class KafkaConsumerDemo extends Thread {
private final KafkaConsumer kafkaConsumer;
public KafkaConsumerDemo(String topic) {
// 設置屬性
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
// 消費組
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 間隔時間
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 從最早的開始
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
}
@Override
public void run() {
while (true) {
ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000); // 超時時間
consumerRecords.forEach(consumerRecord -> {
System.err.println("[message receive]:" + consumerRecord.value());
kafkaConsumer.commitSync();
});
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("test").start();
}
}
KafkaProducer
public class KafkaProducerDemo extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final boolean isAsync;
public KafkaProducerDemo(String topic, boolean isAsync) {
// 設置屬性
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
this.isAsync = isAsync;
}
@Override
public void run() {
int num = 0;
while (num < 50) {
String message = "message_" + num;
System.err.println("[producer message]:" + message);
if (isAsync) { // 異步發(fā)送
producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
if (recordMetadata != null) {
System.err.println("[async-offset]:" + recordMetadata.offset() +
"->[partition]:" + recordMetadata.partition());
}
});
} else { // 同步發(fā)送 future / callable
try {
RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, message)).get();
System.err.println("[sync-offset]:" + recordMetadata.offset() +
"->[partition]:" + recordMetadata.partition());
} catch (Exception e) {
e.printStackTrace();
}
}
num++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new KafkaProducerDemo("test", true).start();
}
}
-
Kafka 有兩種 API
- 封裝得比較好的
- 非常靈活的
Kafka 的配置信息非常地詳細

配置信息分析
發(fā)送端的可選配置信息分析
ackacks
acks 配置表示 producer 發(fā)送消息到 broker 上以后的確認值。有三個可選項
- 0:表示 producer 不需要等待 broker 的消息確認。這個選項時延最小但同時風險最大(因為當 server 宕機時,數(shù)據(jù)將會丟失)。
- 1:表示 producer 只需要獲得 kafka 集群中的 leader 節(jié)點確認即可,這個選擇時延較小同時確保了 leader 節(jié)點確認接收成功。
- all(-1):需要 ISR 中所有的 Replica 給予接收確認(需要集群中的所有節(jié)點確認),速度最慢,安全性最高,但是由于 ISR 可能會縮小到僅包含一個 Replica ,所以設置參數(shù)為 all 并不能一定避免數(shù)據(jù)丟失。
batch.size
生產(chǎn)者發(fā)送多個消息到 `broker` 上的同一個 **分區(qū)** 時,為了減少網(wǎng)絡請求帶來的性能開銷,通過批量的方式來提交消息,可以通過這個參數(shù)來控制批量提交的字節(jié)數(shù)大小,默認大小是 16384byte , 也就是 16kb ,意味著當一批消息大小達到指定的 `batch.size` 的時候會統(tǒng)一發(fā)送。
linger.ms
**Producer** 默認會把兩次發(fā)送時間間隔內(nèi)收集到的所有 Requests 進行一次聚合然后再發(fā)送,以此提高吞吐量,而 `linger.ms` 就是為每次發(fā)送到 broker 的請求增加一些 delay,以此來聚合更多的 Message 請求。 這個有點像TCP里面的 Nagle 算法,在 TCP 協(xié)議的傳輸中,為了減少大量小數(shù)據(jù)包的發(fā)送,采用了 Nagle 算法,也就是基于小包的等-停協(xié)議。
-
batch.size和linger.ms這兩個參數(shù)是kafka性能優(yōu)化的關鍵參數(shù),很多同學會發(fā)現(xiàn) batch.size 和 linger.ms 這兩者的作用是一樣的,如果兩個都配置了,那么怎么工作的呢?實際上,當二者都配置的時候,只要滿足其中一個要求,就會發(fā)送請求到 broker 上。
max.request.size
設置請求的數(shù)據(jù)的最大字節(jié)數(shù),為了防止發(fā)生較大的數(shù)據(jù)包影響到吞吐量,默認值為1MB。
還有重試次數(shù)等
消息的同步發(fā)送和異步發(fā)送:
Kafka 1.0 以后,默認的發(fā)送方式都是異步發(fā)送消息。
我們的消息通過 **Kafka producer** 去 `send` 以后,這個消息實際上是放到一個后臺的發(fā)送隊列里邊,然后通過一個后臺的線程,通過不斷地從后代的發(fā)送隊列中不斷地取出消息進行發(fā)送。發(fā)送以后,進行調(diào)用回調(diào)方法。就是 `Callback` 方法進行執(zhí)行。
同步發(fā)送就是用的 future 的時候去阻塞。獲得發(fā)送信息是進行阻塞。
消費端的可選配置分析
group.id
`consumer group` 是 kafka 提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內(nèi)必然可以有多個消費者或消費者實例 ( consumer instance ) ,它們共享一個公共的 ID ,即 group ID 。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題 ( subscribed topics ) 的所有分區(qū) ( partition ) 。當然,每個分區(qū)只能由同一個消費組內(nèi)的一個 consumer 來消費。如下圖所示,分別有三個消費者,屬于兩個不同的 group ,那么對于 firstTopic 這個 topic 來說,這兩個組的消費者都能同時消費這個 topic 中的消息,對于此時的架構(gòu)來說,這個 firstTopic 就類似于 ActiveMQ 中的 topic 概念。如右圖所示,如果3個消費者都屬于同一個 group ,那么此時 firstTopic 就是一個 Queue 的概念。
- 不同的組可以同時消費同一條消息。

- 同一個組只能有一個 Consumer 能夠拿到消息

enable.auto.commit
ENABLE_AUTO_COMMIT_CONFIG 可以設置成 false 。
消費者消費消息以后自動提交,只有當消息提交以后,該消息才不會被再次接收到,還可以配合 `auto.commit.interval.ms` 控制自動提交的頻率。 每一段時間內(nèi)的消息批量地確認提交。
當然,我們也可以通過 `consumer.commitSync()` 的方式實現(xiàn)手動提交
{
// 異步 Commit
kafkaConsumer.commitAsync();
// 同步 commit
kafkaConsumer.commitSync();
//
}
// 可以選擇不同的 Topic
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
// ...
}
// 可以選擇不同的回調(diào)接口 callback 接口
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
// ...
}
auto.offset.reset
這個參數(shù)是針對新的 `groupid` 中的消費者而言的,當有新 `groupid` 的消費者來消費指定的 `topic` 時,對于該參數(shù)的配置,會有不同的語義
-
auto.offset.reset=latest情況下,新的消費者將會從其他消費者最后消費的 offset 處開始消費 Topic 下的消息。 -
auto.offset.reset= earliest情況下,新的消費者會從該 topic 最早的消息開始消費,(對于新的 groupId 來說,重置 offset )。 -
auto.offset.reset=none情況下,新的消費者加入以后,由于之前不存在offset,則會直接拋出異常。
沒有消費組,就會拋出異常。
max.poll.records
此設置限制每次調(diào)用 poll 返回的消息數(shù),這樣可以更容易地預測每次 poll 間隔要處理的最大值。通過調(diào)整此值,可以減少 poll 間隔。
Kafka 工具:

消息都會寫到磁盤上,只要磁盤上這條消息可以存在,那么我換不同的 `GroupId` ,那么就可以一直去消費這條消息。
Spring -kafka 集成
Spring 整合 Kafka 實現(xiàn)注冊成功以后去設置抽獎名額(贈送一個抽獎機會)。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
# 定義應用的名稱
spring.application.name=spring-cloud-stream-kafka
# 配置 Web 服務端口
server.port=8080
# 失效管理安全
management.security.enabled=false
# 配置需要的 kafka 主題
kafka.topic.test=test
kafka.topic.darian=darian
# 配置 kafka 的 zookeeper 的節(jié)點
#spring.cloud.stream.kafka.binder.zk-nodes=192.168.136.128:2181
spring.cloud.stream.kafka.streams.binder.configuration.zk-nodes=192.168.40.128:2181
# 配置 Spring Kafka 配置信息
spring.kafka.bootstrap-servers=192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092
# Kafka 生產(chǎn)者配置
spring.kafka.producer.bootstrap-servers=192.168.40.128:9092
spring.kafka.producer.key-deserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka 消費者配置
spring.kafka.consumer.group-id=darian-1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
KafkaConsumerListener
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "${kafka.topic.test}")
public void onMessageTest(String message) {
System.out.println("Kafka test 消費者監(jiān)聽器,接收到消息:" + message);
}
@KafkaListener(topics = "${kafka.topic.darian}")
public void onMessageDarian(String message) {
System.out.println("Kafka darian消費者監(jiān)聽器,接收到消息:" + message);
}
}
KafkaProducerController
@RequiredArgsConstructor
@RestController
public class KafkaProducerController {
private final KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/message/send/{topic}")
public TopicMessage sendMessage(
@PathVariable String topic,
@RequestParam String message) {
if ((!"darian".equals(topic)) && (!"test".equals(topic))) {
return new TopicMessage(topic, message, false, "topic【" + topic + "】 不存在");
}
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);
return new TopicMessage(topic, message, true, "success");
}
@AllArgsConstructor
@Data
public static class TopicMessage {
private String topic;
private String message;
private boolean send;
private String errorMessage;
}
}

