Spark-Streaming Kafka In Kerberos

最近在HDP2.6的環(huán)境里嘗試了Kerberos,在各組件運(yùn)行正常的情況下最終成功運(yùn)行spark-streaming應(yīng)用,總結(jié)一下就是一葉障目,不見(jiàn)泰山,坑多梯子少。尤其在國(guó)內(nèi),關(guān)于Kerberos的資料較少,但在生產(chǎn)環(huán)境中,Kerberos又是如鯁在喉,無(wú)法忽視。

因此分享這篇文章,希望能給還在苦苦爬坑的小伙伴們一點(diǎn)幫助。

  • 我們的HDP為單用戶(hù)ocsp安裝,多用戶(hù)需要根據(jù)以下步驟進(jìn)行細(xì)微修改

確認(rèn)OCSP各組件的Kerberos工作正常

1. Kafka

  • 使用kafka-topics.sh創(chuàng)建topic

  • 使用kafka producer和consumer需要先kinit
    kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/host-10-1-236-122@ASIAINFO.COM

  • 使用producer發(fā)送消息,consumer消費(fèi)消息

    • kafka producer
      /usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-producer.sh --topic kerin --broker-list host-10-1-236-122:6667 --security-protocol PLAINTEXTSASL

    • kafka consumer
      /usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-consumer.sh --topic kerin --security-protocol PLAINTEXTSASL --bootstrap-server host-10-1-236-122:6667

FAQ:

  • 使用kafka producer和consumer需要先kinit kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/<hostname>@ASIAINFO.COM
  • 否則:
    • kafka producer 報(bào)錯(cuò):

      [2017-07-19 10:44:56,582] WARN Error while fetching metadata with correlation id 0 : {kertest=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

    • kafka consumer 報(bào)錯(cuò):

      javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user


2. Hive

  • kinit
  • 使用beeline登錄

3. Phoenix

  • 使用sqlline與principal,keytab登錄

進(jìn)行Spark,Kafka針對(duì)Kerberos相關(guān)配置

1. 先放上最后提交任務(wù)的命令

spark-submit  --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default  --principal ocsp-yg@ASIAINFO.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar 
  • --principal與--keytab這兩個(gè)參數(shù)為spark需要的Kerberos認(rèn)證信息

  • --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf"為driver連接kafka用到的認(rèn)證信息,因此使用本地絕對(duì)路徑

  • --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf"為executor連接kafka用到的Kerberos認(rèn)證信息,因此使用container中的相對(duì)路徑./

  • jaas文件中定義了principal與keytab,由于我們使用了yarn-client模式,driver需要的文件在本地文件系統(tǒng),executor需要的文件需要我們使用--files的方式上傳,即--files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab"

  • 有的文檔中說(shuō)--files中傳keytab文件會(huì)與spark本身的--keytab 沖突,其實(shí)是因?yàn)樗麄儗?duì)spark和kafka使用了相同的principal和keytab,在上述命令中我為了清晰起見(jiàn),讓spark使用了principal ocsp-yg@ASIAINFO.COM,keytab hdfs.headless.keytab,讓spark連接kafka時(shí)使用了principal ocsp/ASIAINFO.COM(principal其實(shí)是在jaas文件中指定的,3中詳細(xì)講jaas文件) keytab ocsp.keytab,當(dāng)spark提交任務(wù)時(shí),yarn會(huì)將--keytab后面的keytab文件與--files里的文件先后上傳,即 hdfs.headless.keytab與ocsp.keytab均會(huì)被上傳,spark與kafka各取所需,即可正常工作。當(dāng)spark與kafka要使用相同的keytab文件時(shí),比如都用ocsp.keytab,那么yarn會(huì)先后上傳兩次ocsp.keytab,在spark正使用的時(shí)候更新了keytab,造成異常退出

  • 因此如果spark與kafka需要使用相同的keytab文件,我們只需要在--files里不要上傳keytab即可避免沖突

spark-submit  --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default  --principal ocsp@ASIAINFO.COM --keytab /etc/security/keytabs/ocsp.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar 
  • 還有一個(gè)問(wèn)題是本例中drvier和executor使用了相同的kafka_client_jaas.conf,這也會(huì)造成一些問(wèn)題,3中會(huì)詳細(xì)說(shuō)明

2. 生成keytab和principal

  • 在KDC Server上執(zhí)行
    kadmin -p admin/admin@ASIAINFO.COM
  • 生成principal,principal最好使用ocsp的用戶(hù)名+domain
    addprinc -randkey ocsp/ASIAINFO.COM
  • 生成keytab
    ktadd -k /data/ocsp.keytab ocsp/ASIAINFO.COM
  • 將keytab文件copy到spark driver所在的機(jī)器(因?yàn)镺CSP默認(rèn)使用yarn-client模式)

3. 創(chuàng)建spark讀取kafka的jaas配置文件

  • 配置文件kafka_client_jaas.conf樣例如下:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  principal="ocsp@ASIAINFO.COM"
  keyTab="./ocsp.keytab"
  renewTicket=true
  storeKey=true
  serviceName="ocsp";
};
  • 其中useTicketCache指從系統(tǒng)的cash中讀取credential信息,useKeyTab指從指定的keyTab文件讀取credential

  • principal和keytab用第二步生成的principal與keytab,注意:k?eytab的路徑

    • 如果這個(gè)conf文件是給driver讀取,則我們要用keytab文件在本地的絕對(duì)路徑
    • 如果這個(gè)conf文件是executor讀取,則我們要用keytab文件在container中的相對(duì)路徑,即./ocsp.keytab
    • 如果為了方便起見(jiàn),drvier與executor要使用相同的jaas文件,路徑配置為./ocsp.keytab,我們需要將keytab文件copy到運(yùn)行spark-submit的當(dāng)前路徑
    • 如果driver和executor要使用不同的jaas文件,則driver的jaas文件中,keytab應(yīng)為本地絕對(duì)路徑,executor的jaas文件中,keytab應(yīng)為相對(duì)路徑./

4. 配置spark1.6+kafka0.10 jar包

<dependency>
                    <groupId>com.hortonworks</groupId>
                    <artifactId>spark-kafka-0-10-connector-main_2.10</artifactId>
                    <version>1.0.1</version>
                    <scope>system</scope>
                    <systemPath>${project.basedir}/../lib/spark-kafka-0-10-connector_2.10-1.0.1.jar</systemPath>
</dependency>

5. 修改Spark讀取Kafka部分

  • 需要import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
  • 我們使用的DirectApi讀取kafka
KafkaUtils.createDirectStream[String, String](
        SSC,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](TopicsSet, KafkaParams))

KafkaParams配置如下:

val KafkaParams = Map[String, Object]("auto.offset.reset" -> "latest"
            , "key.deserializer" -> classOf[StringDeserializer]
            , "value.deserializer" -> classOf[StringDeserializer]
            , "security.protocol" -> "SASL_PLAINTEXT"
            , "bootstrap.servers" -> "kafka-server1:6667"
            , "group.id" -> "test")

6. 修改Spark寫(xiě)Kafka部分

  • 寫(xiě)kafka調(diào)用的是kafka官方的庫(kù)
<dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.10.1.1</version>
</dependency>
  • 代碼中需要import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
val props = new Properties()
      props.put("bootstrap.servers", dsConf.get("metadata.broker.list", ""))
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      if (MainFrameConf.KERBEROS_ENABLE == "true"){
        props.put("security.protocol","SASL_PLAINTEXT")
      }
new KafkaProducer[String, String](props)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容