微信公眾號(hào):大數(shù)據(jù)開(kāi)發(fā)運(yùn)維架構(gòu)
關(guān)注可了解更多大數(shù)據(jù)相關(guān)的資訊。問(wèn)題或建議,請(qǐng)公眾號(hào)留言;
如果您覺(jué)得“大數(shù)據(jù)開(kāi)發(fā)運(yùn)維架構(gòu)”對(duì)你有幫助,歡迎轉(zhuǎn)發(fā)朋友圈
從微信公眾號(hào)拷貝過(guò)來(lái),格式有些錯(cuò)亂,建議直接去公眾號(hào)閱讀
一、概述:
Kafka集群的安裝配置請(qǐng)參考我的上一篇文章:Kafka入門:集群安裝部署(最新版kafka-2.4.0)。從Kafka0.9.0.0開(kāi)始,為提高集群的安全性,Kafka社區(qū)增加了許多功能;Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三種認(rèn)證機(jī)制。
目前支持以下安全措施:
1.clients 與 brokers 認(rèn)證
????2.brokers 與 zookeeper認(rèn)證
????3.數(shù)據(jù)傳輸加密? between? brokers and clients, between brokers, or between brokers and tools using SSL
????4.授權(quán)clients read/write
認(rèn)證版本支持:
????1.SASL/GSSAPI (Kerberos) - 從0.9.0.0開(kāi)始支持
????2.SASL/PLAIN - 從?0.10.0.0開(kāi)始支持
??3.SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 -從0.10.2.0開(kāi)始支持
SSL相關(guān)知識(shí):
????1.JavaSSL認(rèn)證
? ? ? ?SSL(Secure Socket Layer安全套接層),及其繼任者傳輸層安全(Transport ;ayer Security,TLS)是為網(wǎng)絡(luò)通信提供安全及數(shù)據(jù)完整性的一種安全協(xié)議。TLS與SSL在傳輸層對(duì)網(wǎng)絡(luò)連接進(jìn)行加密。
????2.Kerberos認(rèn)證 + ACL鑒權(quán)
? ? ? Kerberos是一種網(wǎng)絡(luò)認(rèn)證協(xié)議,其設(shè)計(jì)目標(biāo)是通過(guò)密鑰系統(tǒng)為客戶機(jī)/服務(wù)器應(yīng)用程序提供強(qiáng)大的認(rèn)證服務(wù)。ACL則是在Kerberos的基礎(chǔ)上進(jìn)行的鑒權(quán)措施,一般Kerberos認(rèn)證就夠使用了。
二、SSL證書生成
??? Apache的Kafka允許client通過(guò)SSL連接。SSL默認(rèn)情況下被禁止,但可以根據(jù)需要開(kāi)啟:
? ??您可以使用Java的keytool工具來(lái)完成,Keytool 是一個(gè)Java 數(shù)據(jù)證書的管理工具 ,Keytool 將密鑰(key)和證書(certificates)存在一個(gè)稱為keystore的文件中 在keystore里,包含兩種數(shù)據(jù):
1)..密鑰實(shí)體(Key entity)——密鑰(secret key)又或者是私鑰和配對(duì)公鑰(采用非對(duì)稱加密)
????2).可信任的證書實(shí)體(trusted certificate entries)——只包含公鑰
keytool相關(guān)指令說(shuō)明:
名稱說(shuō)明
-alias別名,可自定義,這里叫kafka240
-keystore指定密鑰庫(kù)的名稱(就像數(shù)據(jù)庫(kù)一樣的證書庫(kù),可以有很多個(gè)證書,cacerts這個(gè)文件是jre自帶的, 也可以使用其它文件名字,如果沒(méi)有這個(gè)文件名字,它會(huì)創(chuàng)建這樣一個(gè))
-storepass指定密鑰庫(kù)的密碼
-keypass指定別名條目的密碼
-list顯示密鑰庫(kù)中的證書信息
-export將別名指定的證書導(dǎo)出到文件
-file參數(shù)指定導(dǎo)出到文件的文件名
-import將已簽名數(shù)字證書導(dǎo)入密鑰庫(kù)
-keypasswd修改密鑰庫(kù)中指定條目口令
-dname指定證書擁有者信息。
其中,CN=名字與姓氏/域名,OU=組織單位名稱,O=組織名稱,L=城市或區(qū)域名稱,ST=州或省份名稱,C=單位的兩字母國(guó)家代碼
-keyalg指定密鑰的算法
-validity指定創(chuàng)建的證書有效期多少天
-keysize指定密鑰長(zhǎng)度
1.Kafka集群的每個(gè)broker節(jié)點(diǎn)生成SSL密鑰和證書(每個(gè)broker節(jié)執(zhí)行)
?每個(gè)節(jié)點(diǎn)執(zhí)行一次后,集群中的每一臺(tái)機(jī)器都有一個(gè)公私密鑰對(duì)、一個(gè)標(biāo)識(shí)該機(jī)器的證書,注意這里是所有的broker節(jié)點(diǎn)都要執(zhí)行這個(gè)命令。
keytool?-keystore?server.keystore.jks?-alias?kafka240?-validity?365?-genkey
執(zhí)行下面命令時(shí),需要輸入密碼,自己記住就行,下面會(huì)需要,有一個(gè)比較重要的地方,輸入first and last name,這里我理解的有點(diǎn)不夠透徹,這里最好輸入你的主機(jī)名,確保公用名(CN)與服務(wù)器的完全限定域名(FQDN)精確相匹配。client拿CN與DNS域名進(jìn)行比較以確保它確實(shí)連接到所需的服務(wù)器,而不是惡意的服務(wù)器。
31節(jié)點(diǎn)執(zhí)行,輸入31主機(jī)名,如圖:
32節(jié)點(diǎn)執(zhí)行,輸入32主機(jī)名,如圖:
2.生成CA認(rèn)證證書(為了保證整個(gè)證書的安全性,需要使用CA進(jìn)行證書的簽名保證)
????雖然第一步生成了證書,但是證書是無(wú)標(biāo)記的,意味著攻擊者可以通過(guò)創(chuàng)建相同的證書假裝任何機(jī)器。認(rèn)證機(jī)構(gòu)(CA)負(fù)責(zé)簽發(fā)證書。認(rèn)證機(jī)構(gòu)就像發(fā)行護(hù)照的政府,政府會(huì)對(duì)每張護(hù)照蓋章,使得護(hù)照很難被偽造。其它,政府核實(shí)印章,以保證此護(hù)照是真實(shí)的。類似的,CA簽署證書,密碼保證簽署的證書在計(jì)算上很難被偽造。因此,只要CA是一個(gè)真正值得信賴的權(quán)威機(jī)構(gòu),客戶就可以很高的保證他們正在連接到真實(shí)的機(jī)器。
openssl req -new -x509 -keyout ca-key -out ca-cert -days 36
上面這個(gè)命令,可隨機(jī)在任一broker節(jié)點(diǎn)執(zhí)行,只需要執(zhí)行一次,執(zhí)行完成后生成了兩個(gè)文件cat-key、ca-cert,將這兩個(gè)文件分別拷貝到所有broker節(jié)點(diǎn)上,這樣所有的broker都有了這兩個(gè)文件。
3.通過(guò)CA證書創(chuàng)建一個(gè)客戶端端信任證書(每個(gè)broker節(jié)點(diǎn)執(zhí)行)
keytool-keystoreclient.truststore.jks-aliasCAKafka240-import-fileca-cert
4.通過(guò)CA證書創(chuàng)建一個(gè)服務(wù)端器端信任證書(每個(gè)broker節(jié)點(diǎn)執(zhí)行)
keytool-keystoreserver.truststore.jks-aliasCAKafka240-import-fileca-cert
下面就是為證書簽名
5.從密鑰庫(kù)導(dǎo)出證書服務(wù)器端證書cert-file(每個(gè)broker節(jié)點(diǎn)執(zhí)行)
keytool-keystoreserver.keystore.jks-aliaskafka240-certreq-filecert-file
6.用CA給服務(wù)器端證書進(jìn)行簽名處理(每個(gè)broker節(jié)點(diǎn)執(zhí)行)
openssl x509 -req -CA ca-cert -CAkeyca-key -incert-file -outcert-signed-days365-CAcreateserial-passin pass:123456
7.將CA證書導(dǎo)入到服務(wù)器端keystore(每個(gè)broker節(jié)點(diǎn)執(zhí)行)
keytool-keystoreserver.keystore.jks-aliasCAKafka240-import-fileca-cert
8.將已簽名的服務(wù)器證書導(dǎo)入到服務(wù)器keystore(每個(gè)broker節(jié)點(diǎn)執(zhí)行)
keytool?-keystore?server.keystore.jks?-alias?kafka240?-import-file?cert-signed
經(jīng)過(guò)以上步驟,集群的每個(gè)broker節(jié)點(diǎn)都會(huì)有以下文件:
至此服務(wù)端證書生成完畢。下面需要給kafka集群配置SSL加密認(rèn)證
三、Kafka集群配置
????在每個(gè)broker節(jié)點(diǎn)上配置,config/server.properties文件,這里只修改紅框中的配置,其他配置項(xiàng)看我上一篇文章配置即可,如圖:
注:如果設(shè)置的內(nèi)部broker的通訊協(xié)議PLAINTEXT,那么監(jiān)聽(tīng)PLAINTEXT的時(shí)候就需要作相應(yīng)的配置
? ? ? ?listeners=PLAINTEXT://host.name:port,SSL://host.name:port。
如果配置SSL之前,存在Kafka數(shù)據(jù),那么建議重新?lián)Q一個(gè)位置來(lái)存放數(shù)據(jù);如果確保之前的數(shù)據(jù)已經(jīng)沒(méi)什么用了,也可以直接刪除,然后在各個(gè)broker節(jié)點(diǎn)執(zhí)行以下命令啟動(dòng)集群:
/home/kafka/kafka_2.12-2.4.0/bin/kafka-server-start.sh?/home/kafka/kafka_2.12-2.4.0/config/server.properties?&
用liunx自帶的openssl命令來(lái)驗(yàn)證,SSL配置是否正確:
openssls_client-debug-connectsalver32.hadoop.ljs:9093-tls1
返回如下結(jié)果,則證明配置成功:
四、客戶端連接配置
?1.配置了SSL認(rèn)證的集群,通過(guò)Kafka命令連接時(shí),需要配置ssl認(rèn)證進(jìn)行連接:
Producer消費(fèi)者發(fā)送消息,先新建文件producer.properties(文件名自定義):
bootstrap.servers=10.124.164.31:9093,10.124.165.32:9093security.protocol=SSLssl.endpoint.identification.algorithm=ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks
發(fā)送消息命令:
kafka-console-consumer.sh--bootstrap-server10.168.192.31:9093,10.168.192.32:9093--from-beginning--topictopic1--consumer.configconsum.properties
nsumer消費(fèi)者接受消息,先新建文件comsumer.properties(文件名自定義):
security.protocol=SSLssl.endpoint.identification.algorithm=group.id=group_topic1ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks
消費(fèi)消息命令:
kafka-console-producer.sh--broker-list10.168.192.31:9093,10.168.192.32:9093--topictopic1--producer.configproducer.properties
2.如果客戶端需要通過(guò)Java代碼連接kafka集群,需要先生成客戶端連接從證書,跟服務(wù)端SSL證書生成類似,依次執(zhí)行以下5行命令,這里我就不再一一細(xì)說(shuō)了,比較簡(jiǎn)單,命令如下:
客戶端: 導(dǎo)出客戶端證書 生成client.keystore.jks文件(即:生成客戶端的keystore文件)keytool-keystoreclient.keystore.jks-aliaskafka240-validity365-genkey將證書文件導(dǎo)入到客戶端keystorekeytool-keystoreclient.keystore.jks-aliaskafka240-certreq-fileclient.cert-file用CA給客戶端證書進(jìn)行簽名處理opensslx509-req-CAca-cert-CAkeyca-key-inclient.cert-file-outclient.cert-signed-days365-CAcreateserial-passinpass:123456將CA證書導(dǎo)入到客戶端keystorekeytool-keystoreclient.keystore.jks-aliasCAKafka240-import-fileca-cert將已簽名的證書導(dǎo)入到客戶端keystorekeytool-keystoreclient.keystore.jks-aliaskafka240-import-fileclient.cert-signed
執(zhí)行完成后,應(yīng)該會(huì)生成以下紅框中三個(gè)文件:
拷貝兩個(gè)文件client.keystore.jks、client.truststore.jks到本地:
Producer端代碼實(shí)例:
package com.hadoop.ljs.kafka010;importorg.apache.kafka.clients.CommonClientConfigs;importorg.apache.kafka.clients.producer.Callback;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.apache.kafka.common.config.SslConfigs;importjava.util.Properties;importjava.util.Random;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-23 08:58 * @version: v1.0 * @description: com.hadoop.ljs.kafka010 */publicclassKafkaSslProducer?{publicstaticfinalStringtopic="topic1";publicstaticfinalStringbootstrap_server="10.168.192.31:9093,10.168.192.32:9093";publicstaticfinalStringclient_truststore="D:\\kafkaSSL\\client.truststore.jks";publicstaticfinalStringclient_keystore="D:\\kafkaSSL\\client.keystore.jks";publicstaticfinalStringclient_ssl_password="123456";????publicstaticvoidmain(String[] args){Properties?props?=newProperties();? ? ? ? props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);//configure the following three settings for SSL Encryptionprops.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");? ? ? ? props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore);? ? ? ? props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,? client_ssl_password);// configure the following three settings for SSL Authentication? ? ? ? props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore);? ? ? ? props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password);? ? ? ? props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password);props.put(ProducerConfig.ACKS_CONFIG,"all");props.put(ProducerConfig.RETRIES_CONFIG,0);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer producer =newKafkaProducer(props);TestCallback callback =newTestCallback();Random rnd =newRandom();for(long i =0; i <=2; i++) {Stringkey="lujisenKey-"+ i;Stringvalue="lujisenMessage------------"+i;System.out.println("Send Message: "+"Key:"+key+"Value:"+value);ProducerRecord data =newProducerRecord(? ? ? ? ? ? ? ? ? ? topic, key, value);? ? ? ? ? ? producer.send(data, callback);? ? ? ? }? ? ? ? producer.close();????}privatestaticclassTestCallbackimplementsCallback {@OverridepublicvoidonCompletion(RecordMetadata recordMetadata, Exception e) {if(e !=null) {System.out.println("Error while producing message to topic :"+ recordMetadata);? ? ? ? ? ? ? ? e.printStackTrace();}else{Stringmessage =String.format("sent message to topic:%s partition:%s? offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());? ? ? ? ? ? ? ? System.out.println(message);? ? ? ? ? ? }? ? ? ? }? ? }}
Consumer端代碼實(shí)例:
packagecom.hadoop.ljs.kafka010;importorg.apache.kafka.clients.CommonClientConfigs;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRebalanceListener;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.config.SslConfigs;importjava.util.Arrays;importjava.util.Collection;importjava.util.Collections;importjava.util.Properties;/***@author: Created By lujisen*@companyChinaUnicom Software JiNan*@date: 2020-02-23 08:58*@version: v1.0*@description: com.hadoop.ljs.kafka010 */publicclassKafkaSslConsumer{publicstaticfinalString topic="topic1";publicstaticfinalString?bootstrap_server="10.168.192.31:9093,10.168.192.32:9093";publicstaticfinalString client_truststore="D:\\kafkaSSL\\client.truststore.jks";publicstaticfinalString client_keystore="D:\\kafkaSSL\\client.keystore.jks";publicstaticfinalString client_ssl_password="123456";publicstaticfinalString consumer_group="group2_topic1";publicstaticvoidmain(String[] args){Properties props =newProperties();? ? ? ? props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);//configure the following three settings for SSL Encryptionprops.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");? ? ? ? props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore);? ? ? ? props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,? client_ssl_password);//configure the following three settings for SSL Authentication? ? ? ? props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore);? ? ? ? props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password);? ? ? ? props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password);? ? ? ? props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer_group);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer consumer =newKafkaConsumer<>(props);TestConsumerRebalanceListener rebalanceListener =newTestConsumerRebalanceListener();????????consumer.subscribe(Collections.singletonList(topic),?rebalanceListener);while(true) {ConsumerRecords records = consumer.poll(1000);for(ConsumerRecord record : records) {System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());????????????}??????????? consumer.commitSync();????????}????}privatestaticclassTestConsumerRebalanceListenerimplementsConsumerRebalanceListener{@OverridepublicvoidonPartitionsRevoked(Collection<TopicPartition> partitions){System.out.println("Called onPartitionsRevoked with partitions:"+ partitions);????????}@OverridepublicvoidonPartitionsAssigned(Collection<TopicPartition> partitions){System.out.println("Called onPartitionsAssigned with partitions:"+ partitions);? ? ? ? }? ? }}
????至此整個(gè)Kafka集群的SSL加密認(rèn)證配置完成,有些地方整理的比較粗,如果問(wèn)題及時(shí)給我在公眾號(hào)留言,看到后我會(huì)及時(shí)回復(fù)?。。?/p>