最近在HDP2.6的環(huán)境里嘗試了Kerberos,在各組件運(yùn)行正常的情況下最終成功運(yùn)行spark-streaming應(yīng)用,總結(jié)一下就是一葉障目,不見(jiàn)泰山,坑多梯子少。尤其在國(guó)內(nèi),關(guān)于Kerberos的資料較少,但在生產(chǎn)環(huán)境中,Kerberos又是如鯁在喉,無(wú)法忽視。
因此分享這篇文章,希望能給還在苦苦爬坑的小伙伴們一點(diǎn)幫助。
- 我們的HDP為單用戶(hù)ocsp安裝,多用戶(hù)需要根據(jù)以下步驟進(jìn)行細(xì)微修改
確認(rèn)OCSP各組件的Kerberos工作正常
1. Kafka
使用kafka-topics.sh創(chuàng)建topic
使用kafka producer和consumer需要先kinit
kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/host-10-1-236-122@ASIAINFO.COM-
使用producer發(fā)送消息,consumer消費(fèi)消息
kafka producer
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-producer.sh --topic kerin --broker-list host-10-1-236-122:6667 --security-protocol PLAINTEXTSASLkafka consumer
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-consumer.sh --topic kerin --security-protocol PLAINTEXTSASL --bootstrap-server host-10-1-236-122:6667
FAQ:
- 使用kafka producer和consumer需要先kinit kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/<hostname>@ASIAINFO.COM
- 否則:
-
kafka producer 報(bào)錯(cuò):
[2017-07-19 10:44:56,582] WARN Error while fetching metadata with correlation id 0 : {kertest=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
-
kafka consumer 報(bào)錯(cuò):
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
-
2. Hive
- kinit
- 使用beeline登錄
3. Phoenix
- 使用sqlline與principal,keytab登錄
進(jìn)行Spark,Kafka針對(duì)Kerberos相關(guān)配置
1. 先放上最后提交任務(wù)的命令
spark-submit --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal ocsp-yg@ASIAINFO.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar
--principal與--keytab這兩個(gè)參數(shù)為spark需要的Kerberos認(rèn)證信息
--driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf"為driver連接kafka用到的認(rèn)證信息,因此使用本地絕對(duì)路徑
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf"為executor連接kafka用到的Kerberos認(rèn)證信息,因此使用container中的相對(duì)路徑./
jaas文件中定義了principal與keytab,由于我們使用了yarn-client模式,driver需要的文件在本地文件系統(tǒng),executor需要的文件需要我們使用--files的方式上傳,即--files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab"
有的文檔中說(shuō)--files中傳keytab文件會(huì)與spark本身的--keytab 沖突,其實(shí)是因?yàn)樗麄儗?duì)spark和kafka使用了相同的principal和keytab,在上述命令中我為了清晰起見(jiàn),讓spark使用了principal ocsp-yg@ASIAINFO.COM,keytab hdfs.headless.keytab,讓spark連接kafka時(shí)使用了principal ocsp/ASIAINFO.COM(principal其實(shí)是在jaas文件中指定的,3中詳細(xì)講jaas文件) keytab ocsp.keytab,當(dāng)spark提交任務(wù)時(shí),yarn會(huì)將--keytab后面的keytab文件與--files里的文件先后上傳,即 hdfs.headless.keytab與ocsp.keytab均會(huì)被上傳,spark與kafka各取所需,即可正常工作。當(dāng)spark與kafka要使用相同的keytab文件時(shí),比如都用ocsp.keytab,那么yarn會(huì)先后上傳兩次ocsp.keytab,在spark正使用的時(shí)候更新了keytab,造成異常退出
因此如果spark與kafka需要使用相同的keytab文件,我們只需要在--files里不要上傳keytab即可避免沖突
spark-submit --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal ocsp@ASIAINFO.COM --keytab /etc/security/keytabs/ocsp.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar
- 還有一個(gè)問(wèn)題是本例中drvier和executor使用了相同的kafka_client_jaas.conf,這也會(huì)造成一些問(wèn)題,3中會(huì)詳細(xì)說(shuō)明
2. 生成keytab和principal
- 在KDC Server上執(zhí)行
kadmin -p admin/admin@ASIAINFO.COM - 生成principal,principal最好使用ocsp的用戶(hù)名+domain
addprinc -randkey ocsp/ASIAINFO.COM - 生成keytab
ktadd -k /data/ocsp.keytab ocsp/ASIAINFO.COM - 將keytab文件copy到spark driver所在的機(jī)器(因?yàn)镺CSP默認(rèn)使用yarn-client模式)
3. 創(chuàng)建spark讀取kafka的jaas配置文件
- 配置文件kafka_client_jaas.conf樣例如下:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
useKeyTab=true
principal="ocsp@ASIAINFO.COM"
keyTab="./ocsp.keytab"
renewTicket=true
storeKey=true
serviceName="ocsp";
};
其中useTicketCache指從系統(tǒng)的cash中讀取credential信息,useKeyTab指從指定的keyTab文件讀取credential
-
principal和keytab用第二步生成的principal與keytab,注意:k?eytab的路徑
- 如果這個(gè)conf文件是給driver讀取,則我們要用keytab文件在本地的絕對(duì)路徑
- 如果這個(gè)conf文件是executor讀取,則我們要用keytab文件在container中的相對(duì)路徑,即./ocsp.keytab
- 如果為了方便起見(jiàn),drvier與executor要使用相同的jaas文件,路徑配置為./ocsp.keytab,我們需要將keytab文件copy到運(yùn)行spark-submit的當(dāng)前路徑
- 如果driver和executor要使用不同的jaas文件,則driver的jaas文件中,keytab應(yīng)為本地絕對(duì)路徑,executor的jaas文件中,keytab應(yīng)為相對(duì)路徑./
4. 配置spark1.6+kafka0.10 jar包
- 在我們的應(yīng)用中有兩部分需要修改,一個(gè)是處理之前從kafka讀取數(shù)據(jù),一個(gè)是處理結(jié)束后向kafka寫(xiě)數(shù)據(jù)
- 由于kafka0.10版本后才支持Kerberos,而官方Spark2.* 之后才適配kafka0.10,但我們目前使用HDP2.6中spark1.6與2.* 雙版本,Spark 1.6 + Kafka 0.10就需要使用HDP提供的spark-kafka-0-10-connector包,官網(wǎng)說(shuō)明如下:https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.0/bk_spark-component-guide/content/using-spark-streaming.html#spark-streaming-jar
- 通過(guò)官方maven的方式?jīng)]有添加成功,可能是網(wǎng)絡(luò)原因,因此我是從https://github.com/hortonworks-spark/skc下載源碼,本地編譯,然后添加本地jar包進(jìn)我們的項(xiàng)目
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>spark-kafka-0-10-connector-main_2.10</artifactId>
<version>1.0.1</version>
<scope>system</scope>
<systemPath>${project.basedir}/../lib/spark-kafka-0-10-connector_2.10-1.0.1.jar</systemPath>
</dependency>
5. 修改Spark讀取Kafka部分
- 需要import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
- 我們使用的DirectApi讀取kafka
KafkaUtils.createDirectStream[String, String](
SSC,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](TopicsSet, KafkaParams))
KafkaParams配置如下:
val KafkaParams = Map[String, Object]("auto.offset.reset" -> "latest"
, "key.deserializer" -> classOf[StringDeserializer]
, "value.deserializer" -> classOf[StringDeserializer]
, "security.protocol" -> "SASL_PLAINTEXT"
, "bootstrap.servers" -> "kafka-server1:6667"
, "group.id" -> "test")
6. 修改Spark寫(xiě)Kafka部分
- 寫(xiě)kafka調(diào)用的是kafka官方的庫(kù)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
</dependency>
- 代碼中需要import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
val props = new Properties()
props.put("bootstrap.servers", dsConf.get("metadata.broker.list", ""))
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
if (MainFrameConf.KERBEROS_ENABLE == "true"){
props.put("security.protocol","SASL_PLAINTEXT")
}
new KafkaProducer[String, String](props)