Flink實戰(zhàn):FlinkSQL接收開啟Kerberos認證的Kafka集群數(shù)據(jù)存入MySQL

微信公眾號:大數(shù)據(jù)開發(fā)運維架構

關注可了解更多大數(shù)據(jù)相關的資訊。問題或建議,請公眾號留言;

如果您覺得“大數(shù)據(jù)開發(fā)運維架構”對你有幫助,歡迎轉發(fā)朋友圈

從微信公眾號拷貝過來,格式有些錯亂,建議直接去公眾號閱讀


上篇文章展示了Flink連接Kafka集群的代碼,平時我們做統(tǒng)計分析,經(jīng)常會用到FlinkSQL,這里就貼一下FlinkSQL消費Kafka數(shù)據(jù)存入Mysql的代碼實例,更多實戰(zhàn)內(nèi)容關注微信公眾號:“大數(shù)據(jù)開發(fā)運維架構”

版本信息:

flink1.9.0

kafka0.10.0

mysql 5.6.40

廢話不多說直接上實戰(zhàn)代碼:

1.這里mysql數(shù)據(jù)庫recommend中有一張表student,創(chuàng)建表語句:

-- Creates the sink table `student` in database `recommend`.
-- (Reconstructed: the article text had all whitespace stripped.)
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id`     int(64)      NULL DEFAULT NULL,
  `name`   varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `course` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `score`  double(128, 0) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

2.對應student表的實體類:

package com.hadoop.ljs.flink.sql;

/**
 * POJO mapped to the MySQL table {@code student} (columns: id, name, course, score).
 *
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-01 07:50
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.sql
 */
public class Student {
    /* unique id */
    int id;
    /* student name */
    String name;
    /* course name */
    String course;
    /* score */
    double score;

    /**
     * Builds a Student from the four fields of a parsed record
     * (parameter names mirror the Tuple4 fields f0..f3 it is built from).
     */
    public Student(Integer f0, String f1, String f2, Double f3) {
        id = f0;
        name = f1;
        course = f2;
        score = f3;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCourse() {
        return course;
    }

    public void setCourse(String course) {
        this.course = course;
    }

    public double getScore() {
        return score;
    }

    public void setScore(double score) {
        this.score = score;
    }
}

3.自定義Sink類,存數(shù)據(jù)到mysql中:

package com.hadoop.ljs.flink.sql;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Flink sink that upserts {@link Student} records into MySQL via {@code REPLACE INTO}.
 *
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-01 07:48
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.sql
 */
public class SinkStudent2MySQL extends RichSinkFunction<Student> {
    /* FIX: the original URL read "recommend??useUnicode" — the query string must
     * start with a single '?', otherwise the driver rejects the URL. */
    public static final String url =
            "jdbc:mysql://10.124.165.31:3306/recommend?useUnicode=true&characterEncoding=UTF-8";
    public static final String userName = "root";
    public static final String password = "123456a?";
    private static final long serialVersionUID = -4443175430371919407L;

    PreparedStatement ps;
    private Connection connection;

    /**
     * Called exactly once per sink instance: opens the JDBC connection and
     * prepares the upsert statement reused by every invoke().
     *
     * @param parameters Flink configuration
     * @throws Exception if the connection or statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "replace into student(id,name,course,score) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // FIX: close the statement before the connection (original order was reversed;
        // closing the connection first can invalidate the statement).
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Called once per record: binds the Student fields and executes the upsert.
     *
     * @param student record to persist
     * @param context sink context
     * @throws Exception on JDBC failure
     */
    @Override
    public void invoke(Student student, Context context) throws Exception {
        ps.setInt(1, student.getId()); // FIX: id is an int — setInt, not setLong
        ps.setString(2, student.getName());
        ps.setString(3, student.getCourse());
        ps.setDouble(4, student.getScore());
        ps.executeUpdate();
    }

    /** Opens a JDBC connection; returns null on failure (best-effort, logged to stdout). */
    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection(url, userName, password);
            System.out.println("mysql連接成功!");
        } catch (Exception e) {
            System.out.println("mysql連接失敗,錯誤信息" + e.getMessage());
        }
        return con;
    }
}

4.主函數(shù)類,從kafka接收消息,對每行數(shù)據(jù)進行拆分,注冊為臨時表,調(diào)用自定義SinkStudent2MySQL類,存入數(shù)據(jù)到student表中:

package com.hadoop.ljs.flink.sql;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Properties;

/**
 * Consumes CSV records ("id,name,course,score") from a Kerberos-secured Kafka topic,
 * registers them as a temporary table, runs a Flink SQL query, and sinks the result
 * into MySQL via {@link SinkStudent2MySQL}.
 *
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-01 07:47
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.sql
 */
public class FlinkKafkaKerberosSQLConsumer {
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String topic = "topic2";
    public static final String consumerGroup = "test_topic2";
    public static final String bootstrapServer =
            "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

    public static void main(String[] args) throws Exception {
        // Kerberos/JAAS setup on Windows; can also be passed with -D JVM options.
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);

        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setDefaultLocalParallelism(1);

        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, bsSettings);

        FlinkKafkaConsumer010<String> myConsumer =
                new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), getComsumerProperties());
        DataStream<String> stream = senv.addSource(myConsumer);

        // FIX: the original discarded the result of filter() and mapped the UNfiltered
        // stream, so malformed records reached the parser. Keep the filtered stream.
        DataStream<String> filtered = stream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // drop null lines and lines that do not have exactly 4 comma-separated fields
                if (null == value || value.split(",").length != 4) {
                    return false;
                }
                return true;
            }
        });

        DataStream<Tuple4<Integer, String, String, Double>> map =
                filtered.map(new MapFunction<String, Tuple4<Integer, String, String, Double>>() {
            private static final long serialVersionUID = 1471936326697828381L;

            @Override
            public Tuple4<Integer, String, String, Double> map(String value) throws Exception {
                String[] split = value.split(",");
                return new Tuple4<>(Integer.valueOf(split[0]), split[1], split[2],
                        Double.valueOf(split[3]));
            }
        });

        // Register the stream as a temporary table and name its fields.
        tableEnv.registerDataStream("student", map, "id,name,course,score");
        Table sqlQuery = tableEnv.sqlQuery("select id,name,course,score from student");
        DataStream<Tuple4<Integer, String, String, Double>> appendStream =
                tableEnv.toAppendStream(sqlQuery,
                        Types.TUPLE(Types.INT, Types.STRING, Types.STRING, Types.DOUBLE));
        appendStream.print();

        /* Convert each row to a Student entity and sink it to MySQL. */
        appendStream.map(new MapFunction<Tuple4<Integer, String, String, Double>, Student>() {
            private static final long serialVersionUID = -4770965496944515917L;

            @Override
            public Student map(Tuple4<Integer, String, String, Double> value) throws Exception {
                return new Student(value.f0, value.f1, value.f2, value.f3);
            }
        }).addSink(new SinkStudent2MySQL());

        senv.execute("FlinkKafkaKerberosSQLConsumer");
    }

    /* Kafka consumer configuration. */
    private static Properties getComsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        /* Kerberos-secured cluster: the following three settings are mandatory. */
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}

5.這里貼下pom.xml:

<!-- Reconstructed pom.xml fragment (all XML tags were stripped in the article text).
     Verify versions against your build before use. -->
<properties>
    <flink.version>1.9.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <!-- NOTE(review): the value "1.2.5" also appeared in the original text, but its
         property name was lost in extraction - confirm which dependency it versions. -->
    <kafka.version>0.10.1.0</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>

6.發(fā)送數(shù)據(jù)到kafka,每條記錄用逗號“,”拆分:

1001,name1,yuwen1,81
1002,name2,yuwen2,82
1003,name3,yuwen3,83

發(fā)送數(shù)據(jù)截圖:

最近一些文章都是根據(jù)粉絲留言進行編寫實戰(zhàn)代碼,如有其他需求直接給我公眾號留言即可,覺得有用,多給轉轉朋友圈,謝謝關注?。?!

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容