前言

環(huán)境所依賴的pom文件
?<dependencies>
????????<dependency>
????????????<groupId>org.apache.avro</groupId>
????????????<artifactId>avro</artifactId>
????????????<version>1.8.2</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.apache.flink</groupId>
????????????<artifactId>flink-scala_2.12</artifactId>
????????????<version>1.10.1</version>
????????</dependency>
????????<!--?https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala?-->
????????<dependency>
????????????<groupId>org.apache.flink</groupId>
????????????<artifactId>flink-streaming-scala_2.12</artifactId>
????????????<version>1.10.1</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.apache.flink</groupId>
????????????<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
????????????<version>1.10.1</version>
????????</dependency>
????????<!--?https://mvnrepository.com/artifact/org.apache.flink/flink-avro?-->
????????<dependency>
????????????<groupId>org.apache.flink</groupId>
????????????<artifactId>flink-avro</artifactId>
????????????<version>1.10.1</version>
????????</dependency>
????????<!--?https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients?-->
????????<dependency>
????????????<groupId>org.apache.kafka</groupId>
????????????<artifactId>kafka-clients</artifactId>
????????????<version>1.0.0</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.apache.kafka</groupId>
????????????<artifactId>kafka-streams</artifactId>
????????????<version>1.0.0</version>
????????</dependency>
????</dependencies>
????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.apache.avro</groupId>
????????????????<artifactId>avro-maven-plugin</artifactId>
????????????????<version>1.8.2</version>
????????????????<executions>
????????????????????<execution>
????????????????????????<phase>generate-sources</phase>
????????????????????????<goals>
????????????????????????????<goal>schema</goal>
????????????????????????</goals>
????????????????????????<configuration>
????????????????????????????<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
????????????????????????????<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
????????????????????????</configuration>
????????????????????</execution>
????????????????</executions>
????????????</plugin>
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-compiler-plugin</artifactId>
????????????????<configuration>
????????????????????<source>1.6</source>
????????????????????<target>1.6</target>
????????????????</configuration>
????????????</plugin>
????????</plugins>
    </build>
一、Avro提供的技術支持包括以下五個方面:
- 優(yōu)秀的數(shù)據(jù)結構;
- 一個緊湊的,快速的,二進制數(shù)據(jù)格式;
- 一個容器文件,用來存儲持久化數(shù)據(jù);
- RPC遠程過程調用;
- 集成最簡單的動態(tài)語言。讀取或者寫入數(shù)據(jù)文件,使用或實現(xiàn)RPC協(xié)議均不需要代碼實現(xiàn);對于靜態(tài)語言則需要生成代碼實現(xiàn);
二、Avro優(yōu)點
- 二進制消息,性能好/效率高
- 使用JSON描述模式
- 模式和數(shù)據(jù)統(tǒng)一存儲,消息自描述,不需要生成stub代碼(支持生成IDL)
- RPC調用在握手階段交換模式定義
- 包含完整的客戶端/服務端堆棧,可快速實現(xiàn)RPC
- 支持同步和異步通信
- 支持動態(tài)消息
- 模式定義允許定義數(shù)據(jù)的排序(序列化時會遵循這個順序)
- 提供了基于Jetty內核的服務和基于Netty的服務
三、Avro Json格式介紹
{
    "namespace": "com.avro.bean",
    "type": "record",
    "name": "UserBehavior",
    "fields": [
        {"name": "userId", "type": "long"},
        {"name": "itemId", "type": "long"},
        {"name": "categoryId", "type": "int"},
        {"name": "behavior", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}
- namespace : 要生成的目錄
- type :類型 avro 使用 record
- name : 會自動生成對應的對象
- fields : 要指定的字段
注意: 創(chuàng)建的文件后綴名一定要叫 avsc

四、使用Java自定義序列化到kafka
???????? 首先我們先使用 Java編寫Kafka客戶端寫入數(shù)據(jù)和消費數(shù)據(jù)。
4.1 準備測試數(shù)據(jù)
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
4.2 自定義Avro 序列化和反序列化
首先我們需要實現(xiàn) Serializer 和 Deserializer 這兩個接口,分別負責序列化和反序列化
package com.avro.AvroUtil;
import com.avro.bean.UserBehavior;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
/**
 * @author 大數(shù)據(jù)老哥
 * @version V1.0
 * @Package com.avro.AvroUtil
 * @File :SimpleAvroSchemaJava.java
 * @date 2021/1/8 20:02 */
/**
 * Custom Kafka serializer/deserializer that converts {@link UserBehavior}
 * records to and from raw Avro binary encoding (no schema registry, no
 * framing — both sides must use the same generated schema).
 */
public class SimpleAvroSchemaJava implements Serializer<UserBehavior>, Deserializer<UserBehavior> {

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // No configuration needed. (Original had "Map<String,  >" — the
        // wildcard was lost; the interface signature is Map<String, ?>.)
    }

    /**
     * Serializes one record into Avro binary bytes.
     *
     * @param topic        the destination topic (unused)
     * @param userBehavior record to encode; may be null per the Kafka contract
     * @return encoded bytes, or null when the record is null
     */
    @Override
    public byte[] serialize(String topic, UserBehavior userBehavior) {
        if (userBehavior == null) {
            return null; // Kafka convention: a null value stays null (tombstone)
        }
        // Datum writer bound to the record's generated schema
        SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
        // Buffer that collects the encoded bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Unbuffered binary encoder writing straight into the buffer
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            writer.write(userBehavior, encoder);
            encoder.flush();
        } catch (IOException e) {
            // Returning a truncated byte[] would silently corrupt the topic,
            // so fail loudly instead of printStackTrace-and-continue.
            throw new RuntimeException("Failed to serialize UserBehavior for topic " + topic, e);
        }
        return out.toByteArray();
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    /**
     * Decodes Avro binary bytes back into a {@link UserBehavior}.
     *
     * @param topic the source topic (unused)
     * @param bytes payload; may be null per the Kafka contract
     * @return decoded record, or null for a null payload
     */
    @Override
    public UserBehavior deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null; // tombstone / empty message — original threw NPE here
        }
        // Input stream over the raw payload
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        // Datum reader bound to the generated class's schema
        SpecificDatumReader<UserBehavior> reader = new SpecificDatumReader<UserBehavior>(UserBehavior.class);
        // Binary decoder reading straight from the stream
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
        try {
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize UserBehavior from topic " + topic, e);
        }
    }
}
4.3 創(chuàng)建序列化對象
package com.avro.kafka;
import com.avro.bean.UserBehavior;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * @author 大數(shù)據(jù)老哥
 * @version V1.0
 * @Package com.avro.kafka
 * @File :UserBehaviorProducerKafka.java
 * @date 2021/1/8 20:14 */
/**
 * Reads UserBehavior rows from a CSV file and publishes them to the
 * "UserBehaviorKafka" topic, Avro-encoded by SimpleAvroSchemaJava.
 */
public class UserBehaviorProducerKafka {
    public static void main(String[] args) throws InterruptedException {
        // Load the test records from disk.
        List<UserBehavior> data = getData();
        // Producer configuration: keys are plain strings, values go through
        // our custom Avro serializer.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");
        KafkaProducer<String, UserBehavior> producer = new KafkaProducer<String, UserBehavior>(props);
        try {
            for (UserBehavior userBehavior : data) {
                producer.send(new ProducerRecord<String, UserBehavior>("UserBehaviorKafka", userBehavior));
                // Log the record just sent (the original printed the whole
                // list on every iteration).
                System.out.println("數(shù)據(jù)寫入成功" + userBehavior);
                Thread.sleep(1000); // throttle so the demo consumer can follow along
            }
        } finally {
            // Flushes buffered records and releases network resources —
            // the original leaked the producer.
            producer.close();
        }
    }

    /**
     * Parses data/UserBehavior.csv (userId,itemId,categoryId,behavior,timestamp)
     * into a list of UserBehavior beans. Returns whatever was parsed before
     * the first failure (best-effort, as in the original).
     */
    public static List<UserBehavior> getData() {
        List<UserBehavior> userBehaviors = new ArrayList<UserBehavior>();
        BufferedReader br = null;
        try {
            // FileReader(String) — the original used `new File(...)` without
            // importing java.io.File, which does not compile.
            br = new BufferedReader(new FileReader("data/UserBehavior.csv"));
            String line;
            while ((line = br.readLine()) != null) {
                String[] split = line.split(",");
                userBehaviors.add(new UserBehavior(
                        Long.parseLong(split[0]),
                        Long.parseLong(split[1]),
                        Integer.parseInt(split[2]),
                        split[3],
                        Long.parseLong(split[4])));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (br != null) {
                try {
                    br.close(); // the original never closed the reader
                } catch (Exception ignored) {
                    // best-effort close
                }
            }
        }
        return userBehaviors;
    }
}
注意:value.serializer 一定要指定我們自己寫好的那個序列化類,否則會無效
4.4 創(chuàng)建反序列化對象
package com.avro.kafka;
import com.avro.bean.UserBehavior;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * @author 大數(shù)據(jù)老哥
 * @version V1.0
 * @Package com.avro.kafka
 * @File :UserBehaviorConsumer.java
 * @date 2021/1/8 20:58 */
/**
 * Console consumer: subscribes to the "UserBehaviorKafka" topic, decodes each
 * value with the custom Avro deserializer, and prints it. Runs until killed.
 */
public class UserBehaviorConsumer {
    public static void main(String[] args) {
        // Consumer configuration — string keys, Avro-decoded values.
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
        props.put("group.id", "UserBehavior");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer is our custom Avro schema class.
        props.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");

        KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer<String, UserBehavior>(props);
        consumer.subscribe(Arrays.asList("UserBehaviorKafka"));

        // Poll forever, printing every decoded record.
        while (true) {
            ConsumerRecords<String, UserBehavior> records = consumer.poll(1000);
            for (ConsumerRecord<String, UserBehavior> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
4.5 啟動運行
創(chuàng)建kafkaTopic 和啟動一個消費者
#?創(chuàng)建topic
./kafka-topics.sh?--create?--zookeeper?node01:2181,node02:2181,node03:2181?--replication-factor?2?--partitions?3?--topic?UserBehaviorKafka
#?模擬消費者
./kafka-console-consumer.sh --from-beginning --topic UserBehaviorKafka --zookeeper node01:2181,node02:2181,node03:2181
五、Flink 實現(xiàn)Avro自定義序列化到Kafka
         到這里好多小伙伴們就說:我用Java實現(xiàn)了,那Flink不就是改一下Consumer和Producer就完了嗎?
5.1 準備數(shù)據(jù)
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
5.2 創(chuàng)建Flink自定義Avro序列化和反序列化

package com.avro.AvroUtil;
import com.avro.bean.UserBehavior;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
 * @author 大數(shù)據(jù)老哥
 * @version V1.0
 * @Package com.avro.AvroUtil
 * @File :SimpleAvroSchemaFlink.java
 * @date 2021/1/8 20:02 */
/**
 * Flink SerializationSchema / DeserializationSchema that converts
 * {@link UserBehavior} records to and from raw Avro binary encoding,
 * for use with the Flink Kafka connector.
 *
 * <p>Note: the original snippet imported
 * com.typesafe.sslconfig.ssl.FakeChainedKeyStore (an accidental IDE
 * auto-import) plus unused Kafka Serializer/Deserializer and Map imports;
 * those are removed here.
 */
public class SimpleAvroSchemaFlink implements DeserializationSchema<UserBehavior>, SerializationSchema<UserBehavior> {

    /**
     * Encodes one record into Avro binary bytes.
     */
    @Override
    public byte[] serialize(UserBehavior userBehavior) {
        // Datum writer bound to the record's generated schema
        SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
        // Buffer that collects the encoded bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Unbuffered binary encoder writing straight into the buffer
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            writer.write(userBehavior, encoder);
            encoder.flush();
        } catch (IOException e) {
            // Emitting a truncated byte[] would corrupt the topic; fail loudly
            // instead of printStackTrace-and-continue.
            throw new RuntimeException("Failed to serialize UserBehavior", e);
        }
        return out.toByteArray();
    }

    /** Tells Flink the element type produced by this schema. */
    @Override
    public TypeInformation<UserBehavior> getProducedType() {
        return TypeInformation.of(UserBehavior.class);
    }

    /**
     * Decodes Avro binary bytes back into a {@link UserBehavior}.
     *
     * @throws IOException if the payload is not valid Avro for this schema —
     *         propagated to Flink (the original declared it but swallowed it
     *         and returned an empty record)
     */
    @Override
    public UserBehavior deserialize(byte[] bytes) throws IOException {
        // Datum reader bound to the generated class's schema
        SpecificDatumReader<UserBehavior> reader = new SpecificDatumReader<UserBehavior>(UserBehavior.class);
        // Binary decoder reading straight from the payload
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(bytes), null);
        return reader.read(null, decoder);
    }

    /** The Kafka stream is unbounded — never signal end of stream. */
    @Override
    public boolean isEndOfStream(UserBehavior userBehavior) {
        return false;
    }
}
5.3 創(chuàng)建Flink Consumer 反序列化
package com.avro.FlinkKafka
import com.avro.AvroUtil.SimpleAvroSchemaFlink
import com.avro.bean.UserBehavior
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import java.util.Properties
/**
 * @Package com.avro.FlinkKafka
 * @File :UserBehaviorConsumerFlink.java
 * @author 大數(shù)據(jù)老哥
 * @date 2021/1/8 21:18
 * @version V1.0 */
// Flink job: consumes Avro-encoded UserBehavior records from Kafka and prints them.
object UserBehaviorConsumerFlink {
  def main(args: Array[String]): Unit = {
    // 1. Build the streaming environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 keeps local test output readable

    // 2. Kafka connection settings.
    val prop = new Properties
    prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092")
    prop.put("group.id", "UserBehavior")
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Value deserializer is the custom Avro schema class.
    prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaFlink")

    // 3. Kafka connector. The topic must match the producer side, which
    //    writes to "UserBehaviorKafka" — the original read "UserBehavior"
    //    and therefore consumed nothing.
    val kafka: FlinkKafkaConsumer011[UserBehavior] =
      new FlinkKafkaConsumer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)

    // 4. Start from the latest offsets.
    kafka.setStartFromLatest()

    // 5. Build the source stream from Kafka.
    val data: DataStream[UserBehavior] = env.addSource(kafka)

    // 6. Print results and run.
    data.print()
    env.execute("UserBehaviorConsumerFlink")
  }
}
5.4 創(chuàng)建Flink Producer 序列化
package com.avro.FlinkKafka
import com.avro.AvroUtil.SimpleAvroSchemaFlink
import com.avro.bean.UserBehavior
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import java.util.Properties
/**
 * @Package com.avro.FlinkKafka
 * @File :UserBehaviorProducerFlink.java
 * @author 大數(shù)據(jù)老哥
 * @date 2021/1/8 21:38
 * @version V1.0 */
// Flink job: reads the UserBehavior CSV and writes each row, Avro-encoded,
// to the "UserBehaviorKafka" topic.
object UserBehaviorProducerFlink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Parse each CSV line (userId,itemId,categoryId,behavior,timestamp)
    // into a UserBehavior bean.
    val lines = env.readTextFile("./data/UserBehavior.csv")
    val behaviors: DataStream[UserBehavior] = lines.map { row =>
      val fields = row.split(",")
      val bean = new UserBehavior()
      bean.setUserId(fields(0).toLong)
      bean.setItemId(fields(1).toLong)
      bean.setCategoryId(fields(2).toInt)
      bean.setBehavior(fields(3))
      bean.setTimestamp(fields(4).toLong)
      bean
    }

    // Kafka connection settings.
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")

    // Sink: serialize with the custom Avro schema and publish to Kafka.
    val sink: FlinkKafkaProducer011[UserBehavior] =
      new FlinkKafkaProducer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)
    behaviors.addSink(sink)

    // Run the job.
    env.execute("UserBehaviorProducerFlink")
  }
}
5.5 啟動運行

需要源碼的請去GitHub 自行下載 ?https://github.com/lhh2002/Flink_Avro
小結
         其實我在實現(xiàn)這個功能的時候也是蒙的,但是不會難道就不學了嗎?肯定不是呀。我在5.2提出的那個問題其實是我自己親身經歷過的。遇到了問題不要想著怎么放棄,而是想想怎么解決,當時我的思路就是看源碼、看別人怎么寫的。最后經過不懈的努力也終于成功了。我在這里為大家提供了Flink面試題,需要的朋友可以去下面的GitHub下載。相信自己,努力和汗水總會得到回報的。我是大數(shù)據(jù)老哥,我們下期見~~~
資源獲取 獲取Flink面試題,Spark面試題,程序員必備軟件,hive面試題,Hadoop面試題,Docker面試題,簡歷模板等資源請去
GitHub自行下載 https://github.com/lhh2002/Framework-Of-BigData
Gitee 自行下載?https://gitee.com/li_hey_hey/Framework-Of-BigData