kafka sals/scram 單機版安裝步驟

ubuntu版本

java安裝

  1. sudo apt-get update
  2. sudo apt-get install openjdk-8-jdk
  3. java --version

zookeeper安裝

  1. 下載
    wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

  2. 解壓縮
    tar -zxvf zookeeper-3.4.6.tar.gz

  3. 進入目錄 zookeeper-3.4.6/conf,將 zoo_sample.cfg重命名為 zoo.cfg
    cp zoo_sample.cfg zoo.cfg

  4. 啟動zk
    bin/zkServer.sh start

  5. 查看zk啟動狀態(tài)
    bin/zkServer.sh status

kafka安裝

  1. 下載
    wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz

  2. 解壓縮
    tar -zxvf kafka_2.11-2.4.1.tgz

  3. 進入 kafka_2.11-2.4.1 啟動kafka
    bin/kafka-server-start.sh config/server.properties
    后臺啟動:nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

  4. 正常kafka能啟動,如果kafka報錯如下

  Caused by: java.net.UnknownHostException: xxx: Name or service not known

修改 config/server.properties 加上配置 listeners=PLAINTEXT://10.0.0.3:8090

  1. 發(fā)送生產消息
    ./bin/kafka-console-producer.sh --broker-list 10.0.0.3:8090 --topic mytest --producer.config config/producer.properties

  2. 接收生產消息
    ./bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.3:8090 --topic mytest --from-beginning --consumer.config config/consumer.properties

  3. 遠程外網(wǎng)連接使用
    以上配置,用外網(wǎng)進行連接的時候,會報超時

Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic XXX_TOPIC not present in metadata after 30000 ms.

Connection to node 0 (/10.0.0.3:8090) could not be established. Broker may not be available.
[Producer clientId=producer-1] Connection to node 0 (/10.0.0.3:8090) could not be established. Broker may not be available.

增加以下配置即可: advertised.listeners=PLAINTEXT://xx_外網(wǎng)ip地址:8090

sals/scram配置

  1. 修改 kafka_2.11-2.4.1/config 下的 server.properties 文件
# 開啟 sasl認證,如果要開啟ACL,則要加上下面一段配置,是針對topic進行權限控制
listeners=SASL_PLAINTEXT://10.0.0.3:8090
advertised.listeners=SASL_PLAINTEXT://xx_外網(wǎng)ip地址:8090
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
# false 只有配置了用戶能訪問 ; true 所有用戶都能訪問,只有部分不能訪問,針對黑名單
allow.everyone.if.no.acl.found=false
super.users=User:admin


# ACL相關配置,配置了開啟針對topic/用戶 級別的讀寫控制,老版本
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  1. 在 config 目錄下增加 kafka_server_jaas.conf 文件(名字自定義即可),文件里面的配置的內容
KafkaServer{
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-sec"
};

KafkaServer的意思是指,kafka broker 之間通訊的賬號和密碼

  1. 將賬號信息,通過zookeeper的方式進行配置,2中 admin 賬號也是通過這種方式配置
# 配置admin
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin
# 配置生產者賬號 producer
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-512=[password=ptest]' --entity-type users --entity-name ptest
# 配置消費者賬號 consumer
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-512=[password=ctest]' --entity-type users --entity-name ctest



#如果開啟了acl認證,則需要針對topic進行授權,以下是對 ptest 賬號授權
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:ptest --operation Write --topic mytes
# ctest
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:ctest --operation Read --topic mytest
#授權消費組隊topic的讀權限
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:ctest --operation Read --group test-consumer-group
  1. 配置kafka啟動加載的賬號信息,修改 bin/kafka-server-start.sh文件,在配置文件中加上以下配置,在有效配置的最上面貼上就可以
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-2.4.1/config/kafka_server_jaas.conf"
  1. Java客戶端測試
#增加配置
Map<String, Object> props = new HashMap<>();
props.putxxx ...
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","SCRAM-SHA-512");
props.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username='ctest' password='ctest'");

  1. 如果要在服務器上測試,則需要做以下配置
# 生產者 - 修改配置 config/producer.properties 增加以下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="ptest" \
    password="ptest";

# 生產者啟動命令
./bin/kafka-console-producer.sh --broker-list 10.0.0.3:8090 --topic mytest --producer.config config/producer.properties

# 消費者 - 修改配置 config/consumer.properties 增加以下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="ctest" \
password="ctest";

# 消費者啟動命令
./bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.3:8090 --topic mytest --from-beginning --consumer.config config/consumer.properties

問題及概念理解

1 .listeners與 advertised.listeners 的區(qū)別
listeners: 定義了kafka服務器內部監(jiān)聽的地址和端口
advertised.listeners 定義了向客戶端公開的地址和端口,通過zk進行數(shù)據(jù)共享,它會保存在zk中 /brokers/ids/0 的endpoints中 。
(使用場景:當公網(wǎng)ip不是服務器網(wǎng)卡,而是通過代理綁定,無法通過listener進行綁定,只能通過0.0.0.0綁定,當外部需要訪問kafka集群時,通過zk拿所有的broker節(jié)點的公網(wǎng)地址進行訪問)

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容