Kafka 認證機制
自 0.9.0.0 版本開始,Kafka 正式引入了認證機制,用于實現(xiàn)基礎(chǔ)的安全用戶認證,這是將 Kafka 上云或進行多租戶管理的必要步驟。截止到當前最新的 2.3 版本,Kafka 支持基于 SSL 和基于 SASL 的安全認證機制。
基于 SSL 的認證主要是指 Broker 和客戶端的雙路認證(2-way authentication)。通常來說,SSL 加密(Encryption)已經(jīng)啟用了單向認證,即客戶端認證 Broker 的證書(Certificate)。如果要做 SSL 認證,那么我們要啟用雙路認證,也就是說 Broker 也要認證客戶端的證書。
kafka 還支持通過 SASL 做客戶端認證。SASL 是提供認證和數(shù)據(jù)安全服務(wù)的框架。Kafka 支持的 SASL 機制有 5 種,它們分別是在不同版本中被引入的,你需要根據(jù)你自己使用的 Kafka 版本,來選擇該版本所支持的認證機制。

建議:你可以使用 SSL 來做通信加密,使用 SASL 來做 Kafka 的認證實現(xiàn)。
SASL/SCRAM-SHA-256 配置實例
第 1 步:創(chuàng)建用戶
配置 SASL/SCRAM 的第一步,是創(chuàng)建能否連接 Kafka 集群的用戶。在本次測試中,我會創(chuàng)建 3 個用戶,分別是 admin 用戶、writer 用戶和 reader 用戶。admin 用戶用于實現(xiàn) Broker 間通信,writer 用戶用于生產(chǎn)消息,reader 用戶用于消費消息。
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=writer],SCRAM-SHA-512=[password=writer]' --entity-type users --entity-name writer
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader],SCRAM-SHA-512=[password=reader]' --entity-type users --entity-name reader
查看用戶:
kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name writer
刪除用戶:
kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name writer
第 2 步:創(chuàng)建 JAAS 文件
配置了用戶之后,我們需要為每個 Broker 創(chuàng)建一個對應(yīng)的 JAAS 文件。因為本例中的只有一個 Broker 在一臺機器上,所以我只創(chuàng)建了一份 JAAS 文件。但是你要切記,在實際場景中,你需要為每臺單獨的物理 Broker 機器都創(chuàng)建一份 JAAS 文件。
JAAS 的文件內(nèi)容如下:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
關(guān)于這個文件內(nèi)容,你需要注意以下兩點:
- 不要忘記最后一行和倒數(shù)第二行結(jié)尾處的分號;
- JAAS 文件中不需要任何空格鍵。
這里,我們使用 admin 用戶實現(xiàn) Broker 之間的通信。接下來,我們來配置 Broker 的 server.properties 文件,下面這些內(nèi)容,是需要單獨配置的:
# 認證配置
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://localhost:9092
第 1 項內(nèi)容表明開啟 SCRAM 認證機制,并啟用 SHA-256 算法;
第 2 項的意思是為 Broker 間通信也開啟 SCRAM 認證,同樣使用 SHA-256 算法;
第 3 項表示 Broker 間通信不配置 SSL,本例中我們不演示 SSL 的配置;
最后 1 項是設(shè)置 listeners 使用 SASL_PLAINTEXT,依然是不使用 SSL。
第 3 步:啟動 Broker
修改 /usr/local/kafka/bin/kafka-server-start.sh
# exec $base_dir/kafka-run-class.sh $EXTRA_ARGS com.cloudera.kafka.wrap.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf com.cloudera.kafka.wrap.Kafka "$@"
第 4 步:發(fā)送消息
先創(chuàng)建一個topic:
kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
在創(chuàng)建好測試主題之后,我們使用 kafka-console-producer 腳本來嘗試發(fā)送消息。由于啟用了認證,客戶端需要做一些相應(yīng)的配置。我們創(chuàng)建一個名為 producer.conf 的配置文件,內(nèi)容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer";
之后運行 Console Producer 程序:
kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config <your_path>/producer.conf
>hello, world
>
第 5 步:消費消息
接下來,我們使用 Console Consumer 程序來消費一下剛剛生產(chǎn)的消息。同樣地,我們需要為 kafka-console-consumer 腳本創(chuàng)建一個名為 consumer.conf 的腳本,內(nèi)容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";
之后運行 Console Consumer 程序:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config <your_path>/consumer.conf
hello, world
第 6 步:動態(tài)增減用戶
最后,我們來演示 SASL/SCRAM 動態(tài)增減用戶的場景。假設(shè)我刪除了 writer 用戶,同時又添加了一個新用戶:new_writer,那么,我們需要執(zhí)行的命令如下:
kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.
kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=new_writer]' --entity-type users --entity-name new_writer
Completed Updating config for entity: user-principal 'new_writer'.