配置kafka server端(每個broker)
編輯原有配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/server.properties
listeners=SASL_PLAINTEXT://192.168.43.209:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:root
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
創(chuàng)建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf
KafkaServer{
???????org.apache.kafka.common.security.plain.PlainLoginModule required
??????? username="kafka"
??????? password="kafkapswd"
??????? user_ kafkaa(用戶名)="kafkaapswd"(密碼)
??????? user_ kafkab(用戶名)=" kafkabpswd"(密碼)
user_ kafkac(用戶名)=" kafkacpswd"(密碼)
user_ kafkad(用戶名)=" kafkadpswd"(密碼);
};
修改執(zhí)行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh
if ["x$KAFKA_OPTS" ]; then
??? export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"
fi
修改執(zhí)行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-run-class.sh
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf'
if ["x$DAEMON_MODE" = "xtrue" ]; then
? nohup $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS?$KAFKA_SASL_OPTS? $KAFKA_JMX_OPTS$KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" >"$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
? exec $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS?$KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH$KAFKA_OPTS "$@"
fi
配置kafka client端
創(chuàng)建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf
KafkaClient{
???????org.apache.kafka.common.security.plain.PlainLoginModule required
??????? username=" kafkaa"
??????? password=" kafkaapswd";
};
修改執(zhí)行文件
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-producer.sh
if ["x$KAFKA_OPTS" ]; then
??? export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"
fi
Java客戶端消費
運行jar包的服務器的指定路徑下必須有kafka_client_jaas.conf文件
在程序中添加如下配置
System.setProperty("java.security.auth.login.config","xx/kafka_client_jaas.conf");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
部署過程中遇到的問題及解決方法
問題描述:發(fā)布消息、訂閱消息時,出現(xiàn)如下錯誤,WARN [Consumer clientId=consumer-1, groupId=console-consumer-20752]Error while fetching metadata with correlation id 2 :{test2=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
解決方法:各客戶端的用戶名設置為相同,多個客戶端同時管理會產(chǎn)生沖突。