Kafka實(shí)戰(zhàn):集群SSL加密認(rèn)證和配置(最新版kafka-2.4.0)

微信公眾號(hào):大數(shù)據(jù)開(kāi)發(fā)運(yùn)維架構(gòu)

關(guān)注可了解更多大數(shù)據(jù)相關(guān)的資訊。問(wèn)題或建議,請(qǐng)公眾號(hào)留言;

如果您覺(jué)得“大數(shù)據(jù)開(kāi)發(fā)運(yùn)維架構(gòu)”對(duì)你有幫助,歡迎轉(zhuǎn)發(fā)朋友圈

從微信公眾號(hào)拷貝過(guò)來(lái),格式有些錯(cuò)亂,建議直接去公眾號(hào)閱讀


一、概述:

Kafka集群的安裝配置請(qǐng)參考我的上一篇文章:Kafka入門:集群安裝部署(最新版kafka-2.4.0)。從Kafka0.9.0.0開(kāi)始,為提高集群的安全性,Kafka社區(qū)增加了許多功能;Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三種認(rèn)證機(jī)制。

目前支持以下安全措施:

1.clients 與 brokers 認(rèn)證

????2.brokers 與 zookeeper認(rèn)證

????3.數(shù)據(jù)傳輸加密? between? brokers and clients, between brokers, or between brokers and tools using SSL

????4.授權(quán)clients read/write

認(rèn)證版本支持:

????1.SASL/GSSAPI (Kerberos) - 從0.9.0.0開(kāi)始支持

????2.SASL/PLAIN - 從?0.10.0.0開(kāi)始支持

??3.SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 -從0.10.2.0開(kāi)始支持

SSL相關(guān)知識(shí):

????1.JavaSSL認(rèn)證

? ? ? ?SSL(Secure Socket Layer安全套接層),及其繼任者傳輸層安全(Transport ;ayer Security,TLS)是為網(wǎng)絡(luò)通信提供安全及數(shù)據(jù)完整性的一種安全協(xié)議。TLS與SSL在傳輸層對(duì)網(wǎng)絡(luò)連接進(jìn)行加密。

????2.Kerberos認(rèn)證 + ACL鑒權(quán)

? ? ? Kerberos是一種網(wǎng)絡(luò)認(rèn)證協(xié)議,其設(shè)計(jì)目標(biāo)是通過(guò)密鑰系統(tǒng)為客戶機(jī)/服務(wù)器應(yīng)用程序提供強(qiáng)大的認(rèn)證服務(wù)。ACL則是在Kerberos的基礎(chǔ)上進(jìn)行的鑒權(quán)措施,一般Kerberos認(rèn)證就夠使用了。

二、SSL證書生成

??? Apache的Kafka允許client通過(guò)SSL連接。SSL默認(rèn)情況下被禁止,但可以根據(jù)需要開(kāi)啟:

? ??您可以使用Java的keytool工具來(lái)完成,Keytool 是一個(gè)Java 數(shù)據(jù)證書的管理工具 ,Keytool 將密鑰(key)和證書(certificates)存在一個(gè)稱為keystore的文件中 在keystore里,包含兩種數(shù)據(jù):

1)..密鑰實(shí)體(Key entity)——密鑰(secret key)又或者是私鑰和配對(duì)公鑰(采用非對(duì)稱加密)

????2).可信任的證書實(shí)體(trusted certificate entries)——只包含公鑰

keytool相關(guān)指令說(shuō)明:

名稱說(shuō)明

-alias別名,可自定義,這里叫kafka240

-keystore指定密鑰庫(kù)的名稱(就像數(shù)據(jù)庫(kù)一樣的證書庫(kù),可以有很多個(gè)證書,cacerts這個(gè)文件是jre自帶的, 也可以使用其它文件名字,如果沒(méi)有這個(gè)文件名字,它會(huì)創(chuàng)建這樣一個(gè))

-storepass指定密鑰庫(kù)的密碼

-keypass指定別名條目的密碼

-list顯示密鑰庫(kù)中的證書信息

-export將別名指定的證書導(dǎo)出到文件

-file參數(shù)指定導(dǎo)出到文件的文件名

-import將已簽名數(shù)字證書導(dǎo)入密鑰庫(kù)

-keypasswd修改密鑰庫(kù)中指定條目口令

-dname指定證書擁有者信息。

其中,CN=名字與姓氏/域名,OU=組織單位名稱,O=組織名稱,L=城市或區(qū)域名稱,ST=州或省份名稱,C=單位的兩字母國(guó)家代碼

-keyalg指定密鑰的算法

-validity指定創(chuàng)建的證書有效期多少天

-keysize指定密鑰長(zhǎng)度

1.Kafka集群的每個(gè)broker節(jié)點(diǎn)生成SSL密鑰和證書(每個(gè)broker節(jié)執(zhí)行)

?每個(gè)節(jié)點(diǎn)執(zhí)行一次后,集群中的每一臺(tái)機(jī)器都有一個(gè)公私密鑰對(duì)、一個(gè)標(biāo)識(shí)該機(jī)器的證書,注意這里是所有的broker節(jié)點(diǎn)都要執(zhí)行這個(gè)命令。

keytool?-keystore?server.keystore.jks?-alias?kafka240?-validity?365?-genkey

執(zhí)行下面命令時(shí),需要輸入密碼,自己記住就行,下面會(huì)需要,有一個(gè)比較重要的地方,輸入first and last name,這里我理解的有點(diǎn)不夠透徹,這里最好輸入你的主機(jī)名,確保公用名(CN)與服務(wù)器的完全限定域名(FQDN)精確相匹配。client拿CN與DNS域名進(jìn)行比較以確保它確實(shí)連接到所需的服務(wù)器,而不是惡意的服務(wù)器。

31節(jié)點(diǎn)執(zhí)行,輸入31主機(jī)名,如圖:

32節(jié)點(diǎn)執(zhí)行,輸入32主機(jī)名,如圖:

2.生成CA認(rèn)證證書(為了保證整個(gè)證書的安全性,需要使用CA進(jìn)行證書的簽名保證)

????雖然第一步生成了證書,但是證書是無(wú)標(biāo)記的,意味著攻擊者可以通過(guò)創(chuàng)建相同的證書假裝任何機(jī)器。認(rèn)證機(jī)構(gòu)(CA)負(fù)責(zé)簽發(fā)證書。認(rèn)證機(jī)構(gòu)就像發(fā)行護(hù)照的政府,政府會(huì)對(duì)每張護(hù)照蓋章,使得護(hù)照很難被偽造。其它,政府核實(shí)印章,以保證此護(hù)照是真實(shí)的。類似的,CA簽署證書,密碼保證簽署的證書在計(jì)算上很難被偽造。因此,只要CA是一個(gè)真正值得信賴的權(quán)威機(jī)構(gòu),客戶就可以很高的保證他們正在連接到真實(shí)的機(jī)器。

openssl req -new -x509 -keyout ca-key -out ca-cert -days 36

上面這個(gè)命令,可隨機(jī)在任一broker節(jié)點(diǎn)執(zhí)行,只需要執(zhí)行一次,執(zhí)行完成后生成了兩個(gè)文件cat-key、ca-cert,將這兩個(gè)文件分別拷貝到所有broker節(jié)點(diǎn)上,這樣所有的broker都有了這兩個(gè)文件。


3.通過(guò)CA證書創(chuàng)建一個(gè)客戶端端信任證書(每個(gè)broker節(jié)點(diǎn)執(zhí)行)

keytool-keystoreclient.truststore.jks-aliasCAKafka240-import-fileca-cert

4.通過(guò)CA證書創(chuàng)建一個(gè)服務(wù)端器端信任證書(每個(gè)broker節(jié)點(diǎn)執(zhí)行)

keytool-keystoreserver.truststore.jks-aliasCAKafka240-import-fileca-cert

下面就是為證書簽名

5.從密鑰庫(kù)導(dǎo)出證書服務(wù)器端證書cert-file(每個(gè)broker節(jié)點(diǎn)執(zhí)行)

keytool-keystoreserver.keystore.jks-aliaskafka240-certreq-filecert-file

6.用CA給服務(wù)器端證書進(jìn)行簽名處理(每個(gè)broker節(jié)點(diǎn)執(zhí)行)

openssl x509 -req -CA ca-cert -CAkeyca-key -incert-file -outcert-signed-days365-CAcreateserial-passin pass:123456

7.將CA證書導(dǎo)入到服務(wù)器端keystore(每個(gè)broker節(jié)點(diǎn)執(zhí)行)

keytool-keystoreserver.keystore.jks-aliasCAKafka240-import-fileca-cert

8.將已簽名的服務(wù)器證書導(dǎo)入到服務(wù)器keystore(每個(gè)broker節(jié)點(diǎn)執(zhí)行)

keytool?-keystore?server.keystore.jks?-alias?kafka240?-import-file?cert-signed

經(jīng)過(guò)以上步驟,集群的每個(gè)broker節(jié)點(diǎn)都會(huì)有以下文件:

至此服務(wù)端證書生成完畢。下面需要給kafka集群配置SSL加密認(rèn)證

三、Kafka集群配置

????在每個(gè)broker節(jié)點(diǎn)上配置,config/server.properties文件,這里只修改紅框中的配置,其他配置項(xiàng)看我上一篇文章配置即可,如圖:


注:如果設(shè)置的內(nèi)部broker的通訊協(xié)議PLAINTEXT,那么監(jiān)聽(tīng)PLAINTEXT的時(shí)候就需要作相應(yīng)的配置

? ? ? ?listeners=PLAINTEXT://host.name:port,SSL://host.name:port。


如果配置SSL之前,存在Kafka數(shù)據(jù),那么建議重新?lián)Q一個(gè)位置來(lái)存放數(shù)據(jù);如果確保之前的數(shù)據(jù)已經(jīng)沒(méi)什么用了,也可以直接刪除,然后在各個(gè)broker節(jié)點(diǎn)執(zhí)行以下命令啟動(dòng)集群:

/home/kafka/kafka_2.12-2.4.0/bin/kafka-server-start.sh?/home/kafka/kafka_2.12-2.4.0/config/server.properties?&

用liunx自帶的openssl命令來(lái)驗(yàn)證,SSL配置是否正確:

openssls_client-debug-connectsalver32.hadoop.ljs:9093-tls1

返回如下結(jié)果,則證明配置成功:

四、客戶端連接配置

?1.配置了SSL認(rèn)證的集群,通過(guò)Kafka命令連接時(shí),需要配置ssl認(rèn)證進(jìn)行連接:

Producer消費(fèi)者發(fā)送消息,先新建文件producer.properties(文件名自定義):

bootstrap.servers=10.124.164.31:9093,10.124.165.32:9093security.protocol=SSLssl.endpoint.identification.algorithm=ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks

發(fā)送消息命令:

kafka-console-consumer.sh--bootstrap-server10.168.192.31:9093,10.168.192.32:9093--from-beginning--topictopic1--consumer.configconsum.properties

nsumer消費(fèi)者接受消息,先新建文件comsumer.properties(文件名自定義):

security.protocol=SSLssl.endpoint.identification.algorithm=group.id=group_topic1ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks

消費(fèi)消息命令:

kafka-console-producer.sh--broker-list10.168.192.31:9093,10.168.192.32:9093--topictopic1--producer.configproducer.properties

2.如果客戶端需要通過(guò)Java代碼連接kafka集群,需要先生成客戶端連接從證書,跟服務(wù)端SSL證書生成類似,依次執(zhí)行以下5行命令,這里我就不再一一細(xì)說(shuō)了,比較簡(jiǎn)單,命令如下:

客戶端: 導(dǎo)出客戶端證書 生成client.keystore.jks文件(即:生成客戶端的keystore文件)keytool-keystoreclient.keystore.jks-aliaskafka240-validity365-genkey將證書文件導(dǎo)入到客戶端keystorekeytool-keystoreclient.keystore.jks-aliaskafka240-certreq-fileclient.cert-file用CA給客戶端證書進(jìn)行簽名處理opensslx509-req-CAca-cert-CAkeyca-key-inclient.cert-file-outclient.cert-signed-days365-CAcreateserial-passinpass:123456將CA證書導(dǎo)入到客戶端keystorekeytool-keystoreclient.keystore.jks-aliasCAKafka240-import-fileca-cert將已簽名的證書導(dǎo)入到客戶端keystorekeytool-keystoreclient.keystore.jks-aliaskafka240-import-fileclient.cert-signed

執(zhí)行完成后,應(yīng)該會(huì)生成以下紅框中三個(gè)文件:

拷貝兩個(gè)文件client.keystore.jks、client.truststore.jks到本地:

Producer端代碼實(shí)例:

package com.hadoop.ljs.kafka010;importorg.apache.kafka.clients.CommonClientConfigs;importorg.apache.kafka.clients.producer.Callback;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.apache.kafka.common.config.SslConfigs;importjava.util.Properties;importjava.util.Random;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-23 08:58 * @version: v1.0 * @description: com.hadoop.ljs.kafka010 */publicclassKafkaSslProducer?{publicstaticfinalStringtopic="topic1";publicstaticfinalStringbootstrap_server="10.168.192.31:9093,10.168.192.32:9093";publicstaticfinalStringclient_truststore="D:\\kafkaSSL\\client.truststore.jks";publicstaticfinalStringclient_keystore="D:\\kafkaSSL\\client.keystore.jks";publicstaticfinalStringclient_ssl_password="123456";????publicstaticvoidmain(String[] args){Properties?props?=newProperties();? ? ? ? props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);//configure the following three settings for SSL Encryptionprops.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");? ? ? ? props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore);? ? ? ? props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,? client_ssl_password);// configure the following three settings for SSL Authentication? ? ? ? props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore);? ? ? ? props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password);? ? ? ? props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password);props.put(ProducerConfig.ACKS_CONFIG,"all");props.put(ProducerConfig.RETRIES_CONFIG,0);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer producer =newKafkaProducer(props);TestCallback callback =newTestCallback();Random rnd =newRandom();for(long i =0; i <=2; i++) {Stringkey="lujisenKey-"+ i;Stringvalue="lujisenMessage------------"+i;System.out.println("Send Message: "+"Key:"+key+"Value:"+value);ProducerRecord data =newProducerRecord(? ? ? ? ? ? ? ? ? ? topic, key, value);? ? ? ? ? ? producer.send(data, callback);? ? ? ? }? ? ? ? producer.close();????}privatestaticclassTestCallbackimplementsCallback {@OverridepublicvoidonCompletion(RecordMetadata recordMetadata, Exception e) {if(e !=null) {System.out.println("Error while producing message to topic :"+ recordMetadata);? ? ? ? ? ? ? ? e.printStackTrace();}else{Stringmessage =String.format("sent message to topic:%s partition:%s? offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());? ? ? ? ? ? ? ? System.out.println(message);? ? ? ? ? ? }? ? ? ? }? ? }}

Consumer端代碼實(shí)例:

packagecom.hadoop.ljs.kafka010;importorg.apache.kafka.clients.CommonClientConfigs;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRebalanceListener;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.config.SslConfigs;importjava.util.Arrays;importjava.util.Collection;importjava.util.Collections;importjava.util.Properties;/***@author: Created By lujisen*@companyChinaUnicom Software JiNan*@date: 2020-02-23 08:58*@version: v1.0*@description: com.hadoop.ljs.kafka010 */publicclassKafkaSslConsumer{publicstaticfinalString topic="topic1";publicstaticfinalString?bootstrap_server="10.168.192.31:9093,10.168.192.32:9093";publicstaticfinalString client_truststore="D:\\kafkaSSL\\client.truststore.jks";publicstaticfinalString client_keystore="D:\\kafkaSSL\\client.keystore.jks";publicstaticfinalString client_ssl_password="123456";publicstaticfinalString consumer_group="group2_topic1";publicstaticvoidmain(String[] args){Properties props =newProperties();? ? ? ? props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);//configure the following three settings for SSL Encryptionprops.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");? ? ? ? props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore);? ? ? ? props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,? client_ssl_password);//configure the following three settings for SSL Authentication? ? ? ? props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore);? ? ? ? props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password);? ? ? ? props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password);? ? ? ? props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer_group);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer consumer =newKafkaConsumer<>(props);TestConsumerRebalanceListener rebalanceListener =newTestConsumerRebalanceListener();????????consumer.subscribe(Collections.singletonList(topic),?rebalanceListener);while(true) {ConsumerRecords records = consumer.poll(1000);for(ConsumerRecord record : records) {System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());????????????}??????????? consumer.commitSync();????????}????}privatestaticclassTestConsumerRebalanceListenerimplementsConsumerRebalanceListener{@OverridepublicvoidonPartitionsRevoked(Collection<TopicPartition> partitions){System.out.println("Called onPartitionsRevoked with partitions:"+ partitions);????????}@OverridepublicvoidonPartitionsAssigned(Collection<TopicPartition> partitions){System.out.println("Called onPartitionsAssigned with partitions:"+ partitions);? ? ? ? }? ? }}

????至此整個(gè)Kafka集群的SSL加密認(rèn)證配置完成,有些地方整理的比較粗,如果問(wèn)題及時(shí)給我在公眾號(hào)留言,看到后我會(huì)及時(shí)回復(fù)?。。?/p>

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容