Flink in Practice: Connecting to a Kerberos-Enabled Kafka Cluster

WeChat public account: 大數(shù)據(jù)開發(fā)運(yùn)維架構(gòu) (Big Data Development, Operations and Architecture)



Once Kafka has Kerberos authentication enabled, how do you produce or consume data with Flink? Essentially, you add the authentication-related configuration (jaas.conf, the keytab, and so on) to your producer/consumer code. Let's go straight to the code.
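As an aside, Flink can also perform the Kerberos login cluster-wide instead of per job. A minimal sketch of that alternative, using Flink's standard `security.kerberos.login.*` options (the keytab path below is illustrative; the principal is the one from this article's example jaas.conf):

```yaml
# Illustrative flink-conf.yaml fragment; adjust path and principal to your cluster
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /etc/security/keytabs/kafka.service.keytab
security.kerberos.login.principal: kafka/salver32.hadoop.unicom@CHINAUNICOM
# Expose the credentials to the JAAS contexts the Kafka client (and ZooKeeper) use
security.kerberos.login.contexts: Client,KafkaClient
```

With this in place, the `System.setProperty` calls in the code below become unnecessary when the job runs on the cluster; they remain handy for local IDE runs.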

Version information:

flink 1.9.0

kafka 0.10.0

A reminder: if the dependency versions do not match, you will get an error like the one below, so make sure the versions correspond:

java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread

1. Connecting to a Kerberos-secured cluster is actually quite simple; you need the following three files:

1) The Kerberos server configuration file krb5.conf, which tells the program which KDC to authenticate against:

```
[libdefaults]
  udp_preference_limit = 1
  renew_lifetime = 3650d
  forwardable = true
  default_realm = CHINAUNICOM
  ticket_lifetime = 3650d
  dns_lookup_realm = false
  dns_lookup_kdc = false
  default_ccache_name = /tmp/krb5cc_%{uid}
  # default_tgs_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
  # default_tkt_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5

[domain_realm]
  .CHINAUNICOM = CHINAUNICOM

[logging]
  default = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log
  kdc = FILE:/var/log/krb5kdc.log

[realms]
  CHINAUNICOM = {
    admin_server = master98.hadoop.ljs
    kdc = master98.hadoop.ljs
  }
```

2) Authentication requires specifying a login mechanism, which is done in a jaas.conf file; one is usually found in the cluster's conf directory:

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="D:\\kafkaSSL\\kafka.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka/salver32.hadoop.unicom@CHINAUNICOM"
  serviceName=kafka;
};
```

3) The user's login ticket and keytab file; the ticket and keytab themselves are not shown here.
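Before wiring the keytab into Flink, it is worth confirming that it actually authenticates. A minimal check, assuming a Linux client with the MIT krb5 tools installed (the keytab filename and principal below are the illustrative ones from the jaas.conf above):

```shell
# Show which principals the keytab contains
klist -kt kafka.service.keytab

# Obtain a ticket non-interactively using the keytab
kinit -kt kafka.service.keytab kafka/salver32.hadoop.unicom@CHINAUNICOM

# Verify that a ticket was granted
klist
```

If `kinit` fails here, the Flink job will fail the same way, so this is the cheapest place to debug realm or keytab problems.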

2. To save you dependency errors, here are the pom.xml dependencies; there may be some redundancy, which you can trim yourself:

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-fs</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>${httpclient.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
  <version>1.9.0</version>
  <scope>compile</scope>
</dependency>
```

3. Flink receives messages on a socket and sends them to Kafka; code example:

```java
package com.hadoop.ljs.flink.streaming;

import com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company: ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosProducer {

    public static final String topic = "topic1";
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String bootstrapServers = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";
    public static final String hostname = "localhost";
    public static final int port = 9000;

    public static void main(String[] args) throws Exception {
        // Set the JAAS/krb5 configuration on Windows; these can also be passed as -D JVM options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);
        /* Get the Flink streaming execution environment */
        final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from the socket */
        DataStream<String> dataSource = senv.socketTextStream(hostname, port, "\n");
        /* Transform the data here as your use case requires */
        /* The received data can go through arbitrarily complex processing before being sent to Kafka */
        dataSource.addSink(new FlinkKafkaProducer010(topic, new CustomKeyedSerializationSchema(), getProducerProperties()));
        /* Start the job */
        senv.execute("FlinkKafkaProducer");
    }

    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
```
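The `CustomKeyedSerializationSchema` used above is the author's own class and is not shown in the article. Since the producer is configured with `ByteArraySerializer` for both key and value, such a schema just has to turn each record into key/value byte arrays. Purely for illustration (class and method names here are hypothetical, not the author's), the byte-level logic looks like this:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the serialization logic a KeyedSerializationSchema<String>
// implementation would delegate to; the real CustomKeyedSerializationSchema is not shown
// in the article and may differ.
public class KeyedSerializationSketch {

    // Serialize the record key as UTF-8 bytes (here the record itself doubles as the key)
    static byte[] serializeKey(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    // Serialize the record value as UTF-8 bytes
    static byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = serializeValue("hello kafka");
        // Decoding the bytes round-trips back to the original string
        System.out.println(new String(value, StandardCharsets.UTF_8));
    }
}
```

In the real schema, `serializeKey`/`serializeValue` would be the overridden interface methods, and `getTargetTopic` can return `null` to fall back to the topic passed to the producer constructor.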

4. Flink connects to Kafka and consumes messages; code example:

```java
package com.hadoop.ljs.flink.streaming;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company: ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosConsumer {

    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String topic = "topic1";
    public static final String consumerGroup = "test_topic1";
    public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

    public static void main(String[] args) throws Exception {
        // Set the JAAS/krb5 configuration on Windows; these can also be passed as -D JVM options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        FlinkKafkaConsumer010<String> consumer010 =
                new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), getConsumerProperties());
        consumer010.setStartFromEarliest();
        // Use Kafka as the source
        DataStream<String> dataStream = env.addSource(consumer010);
        dataStream.print();
        try {
            env.execute();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
```

