kafka中的數據,平常大多是json、csv、String,這類數據使用SimpleStringSchema就可以輕松對數據進行序列化和反序列化。
但有時候又需要特殊結構的數據進行傳輸(DTO),需要根據實際的數據結構進行序列化和反序列化。
下面以Student的自定義結構體為例,使用FlinkSerializationSchema進行序列化和反序列化進行演示。
一、創建spring boot工程
pom.xml文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.qogo8</groupId>
    <artifactId>flink-kfk-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>flink-kfk-demo</name>
    <description>flink consume/produce demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.5</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <!-- FIX: flink-shaded-jackson 2.15.3-19.0 belongs to flink-shaded 19.0
             (paired with Flink 1.19.x) and mixes incompatible shaded classes into
             a Flink 1.13.5 build. Flink 1.13.x pairs with flink-shaded 13.0,
             i.e. flink-shaded-jackson 2.12.1-13.0. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-jackson</artifactId>
            <version>2.12.1-13.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <!-- provided: supplied by the Flink runtime on the cluster. When running
             from the IDE, add the provided dependencies to the run classpath
             (or temporarily switch the scope to compile). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Build a runnable fat jar (all dependencies bundled). -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
二、創建自定義數據結構的類
1、創建Scores類
package com.qogo8.flink_kfk_demo.Student;
/**
 * Plain DTO holding a single subject/score pair for a student.
 */
public class Scores {

    /** Name of the subject, e.g. "Chinese". */
    private String subject;

    /** Points achieved in that subject. */
    private int scores;

    /** No-arg constructor (needed by frameworks and for field-by-field population). */
    public Scores() {
    }

    /**
     * Creates a fully populated score record.
     *
     * @param scores  points achieved
     * @param subject subject name
     */
    public Scores(int scores, String subject) {
        this.scores = scores;
        this.subject = subject;
    }

    public int getScores() {
        return scores;
    }

    public void setScores(int scores) {
        this.scores = scores;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }
}
2、創建Student類
package com.qogo8.flink_kfk_demo.Student;
/**
 * Student DTO transported through Kafka by the custom (de)serialization schema.
 */
public class Student {

    private String name;   // student name
    private int age;       // student age
    private int grade;     // school grade / year
    private int Sclass;    // class number (capitalized field name kept for getter/setter compatibility)
    private String hobby;  // hobby / interest
    private Scores scores; // score record (subject + points); may be null for partially built instances

    public void setGrade(int grade) {
        this.grade = grade;
    }

    public int getGrade() {
        return this.grade;
    }

    public void setSclass(int Sclass) {
        this.Sclass = Sclass;
    }

    public int getSclass() {
        return this.Sclass;
    }

    public void setHobby(String hobby) {
        this.hobby = hobby;
    }

    public String getHobby() {
        return this.hobby;
    }

    public Scores getScores() {
        return this.scores;
    }

    public void setScores(Scores scores) {
        this.scores = scores;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return this.age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Convenience constructor that fills in a default score record.
     * NOTE(review): the hard-coded "Chinese"/22 default looks demo-only — confirm
     * before reusing outside this tutorial.
     */
    public Student(String name, int age, int grade, int Sclass, String hobby) {
        this.name = name;
        this.age = age;
        this.grade = grade;
        this.Sclass = Sclass;
        this.hobby = hobby;
        this.scores = new Scores();
        this.scores.setSubject("Chinese");
        this.scores.setScores(22);
    }

    /** Fully specified constructor; builds the nested {@link Scores} from subject/scores. */
    public Student(String name, int age, int grade, int Sclass, String hobby, String subject, int scores) {
        this.name = name;
        this.age = age;
        this.grade = grade;
        this.Sclass = Sclass;
        this.hobby = hobby;
        this.scores = new Scores();
        this.scores.setSubject(subject);
        this.scores.setScores(scores);
    }

    /** No-arg constructor; leaves {@code scores} null. */
    public Student() {
    }

    public Student(String name, int age, Scores scores) {
        this.name = name;
        this.age = age;
        this.scores = scores;
    }

    /**
     * FIX: null-safe toString. The original dereferenced {@code this.scores}
     * unconditionally and threw NullPointerException for instances created via
     * the no-arg constructor (or any path that leaves scores null). Output is
     * unchanged whenever scores is non-null.
     */
    @Override
    public String toString() {
        String subject = (this.scores == null) ? null : this.scores.getSubject();
        Object points = (this.scores == null) ? null : (Integer) this.scores.getScores();
        return "Student [name=" + this.name + ",age=" + this.age + ",grade=" + this.grade
                + ",Sclass=" + this.Sclass + ",hobby=" + this.hobby
                + ",subject=" + subject + ",scores=" + points + "]";
    }
}
三、定義序列化和反序列化類FlinkSerializationSchema
package com.qogo8.flink_kfk_demo.Serialization;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.commons.lang3.SerializationException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.qogo8.flink_kfk_demo.Student.Student;
// 使用Flink 1.13.5版本序列化數據(因為本公司是使用的這個Flink版本^_^)
public class FlinkSerializationSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {
/**
* 將Student對象序列化為字節(jié)數組
*
* @param o 要序列化的Student對象
* @return 序列化后的字節(jié)數組
* @throws SerializationException 如果序列化過程中出現錯誤,則拋出此異常
*/
@Override
public byte[] serialize(Student o) {
try {
// 將傳入的對象強制轉換為Student類型
Student data = (Student) o;
if (data == null) {
// 如果對象為null,則返回null
return null;
} else {
// 定義用于存儲序列化長度的變量
int serializedLength;
// 定義用于存儲序列化后名稱的字節(jié)數組
byte[] serializedName;
// 定義用于存儲序列化后愛好的字節(jié)數組
byte[] serializedHobby;
// 定義用于存儲序列化后科目的字節(jié)數組
byte[] serializedSubject;
// 如果學生名稱不為null
if (data.getName() != null) {
// 將學生名稱轉換為UTF-8編碼的字節(jié)數組,并獲取長度
serializedName = data.getName().getBytes("UTF-8");
serializedLength = serializedName.length;
} else {
// 如果學生名稱為null,則初始化為長度為0的字節(jié)數組,長度也為0
serializedName = new byte[0];
serializedLength = 0;
}
// 將學生愛好轉換為UTF-8編碼的字節(jié)數組
serializedHobby = data.getHobby().getBytes("UTF-8");
// 將學生科目轉換為UTF-8編碼的字節(jié)數組
serializedSubject = data.getScores().getSubject().getBytes("UTF-8");
// 分配一個ByteBuffer,大小為固定值28加上各個序列化字段的長度之和
//一個int是4個字節(jié)
ByteBuffer buffer = ByteBuffer.allocate(28 + serializedLength + serializedHobby.length + serializedSubject.length);
// 依次將學生年齡、年級、班級、名稱長度、名稱、愛好長度、愛好、成績、科目長度、科目序列化到ByteBuffer中
buffer.putInt(serializedLength);
buffer.put(serializedName);
buffer.putInt(data.getAge());
buffer.putInt(data.getGrade());
buffer.putInt(data.getSclass());
buffer.putInt(serializedHobby.length);
buffer.put(serializedHobby);
buffer.putInt(serializedSubject.length);
buffer.put(serializedSubject);
buffer.putInt(data.getScores().getScores());
// 返回ByteBuffer的字節(jié)數組
return buffer.array();
}
} catch (Exception e) {
// 捕獲異常并拋出SerializationException異常
throw new SerializationException("error when serializing..." + e);
}
}
/**
* 從字節(jié)數組反序列化一個學生對象
*
* @param data 包含學生對象序列化數據的字節(jié)數組
* @return 反序列化后的學生對象
* @throws IOException 如果發(fā)生IO異常
*/
@Override
public Student deserialize(byte[] data) throws IOException {
try {
// 如果數據為空,則返回null
if (data == null) {
return null;
} else if (data.length < 28) {
// 如果數據長度小于28,則拋出異常
throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected...");
} else {
// 使用ByteBuffer包裝數據
ByteBuffer buffer = ByteBuffer.wrap(data);
// 讀取名字長度
int nameLength = buffer.getInt();
// 根據名字長度創(chuàng)建字節(jié)數組
byte[] nameBytes = new byte[nameLength];
// 從緩沖區(qū)讀取名字字節(jié)數據
buffer.get(nameBytes);
// 將名字字節(jié)數據轉換為字符串
String name = new String(nameBytes, "UTF-8");
// 讀取年齡
int age = buffer.getInt();
// 讀取年級
int grade = buffer.getInt();
// 讀取班級
int Sclass = buffer.getInt();
// 讀取愛好長度
int hobbyLength = buffer.getInt();
// 根據愛好長度創(chuàng)建字節(jié)數組
byte[] hobbyBytes = new byte[hobbyLength];
// 從緩沖區(qū)讀取愛好字節(jié)數據
buffer.get(hobbyBytes);
// 將愛好字節(jié)數據轉換為字符串
String hobby = new String(hobbyBytes, "UTF-8");
// 讀取科目長度
int subjectLength = buffer.getInt();
// 根據科目長度創(chuàng)建字節(jié)數組
byte[] subjectBytes = new byte[subjectLength];
// 從緩沖區(qū)讀取科目字節(jié)數據
buffer.get(subjectBytes);
// 將科目字節(jié)數據轉換為字符串
String subject = new String(subjectBytes, "UTF-8");
// 讀取分數
int scores = buffer.getInt();
// 創(chuàng)建并返回學生對象
return new Student(name, age, grade, Sclass, hobby, subject, scores);
}
} catch (Exception e) {
// 如果反序列化過程中發(fā)生異常,則拋出異常
throw new SerializationException("error when deserializing..." + e);
// return new Student("name", 20, 1, 3, "good");
}
}
@Override
public boolean isEndOfStream(Student o) {
return false;
}
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(Student.class);
}
public Class<?> getDTOClass() {
return Student.class;
}
}
四、定義kafka生產者
上面已經定義好序列化和反序列化的類了,下面就來使用序列化類和反序列化類
定義FlinkKafkaProducer_DTO_Serializer類,用于序列化數據,同時往kafka里寫數據
package com.qogo8.flink_kfk_demo.client;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.CommonClientConfigs;
import com.qogo8.flink_kfk_demo.Serialization.FlinkSerializationSchema;
import com.qogo8.flink_kfk_demo.Student.Student;
/**
 * Demo job: generates random Student records and writes them to Kafka using the
 * custom {@link FlinkSerializationSchema}.
 *
 * Required program argument: --broker host:port  --topic name
 */
public class FlinkKafkaProducer_DTO_Serializer {

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // FIX(comment): 600000 ms is a checkpoint every 10 MINUTES — the original
        // comment claimed "every 10 seconds". Value kept as-is.
        env.enableCheckpointing(600000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000); // a checkpoint must finish within 60 s
        env.setRestartStrategy(RestartStrategies.fallBackRestart());

        // Test source: emits one random Student every 3 seconds.
        DataStream<Student> ds = env.addSource(new SourceFunction<Student>() {
            // FIX: cancellation flag so the job can shut down cleanly — the original
            // looped on while(true) and its cancel() was a no-op, so the source could
            // never be stopped cooperatively.
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                int i = 1;
                SecureRandom rand = new SecureRandom();
                final List<String> INTERESTS = Arrays.asList(
                        "閱讀", "健身", "烹飪", "旅行", "攝影",
                        "編程", "繪畫", "音樂", "登山", "騎行",
                        "舞蹈", "桌游", "露營", "釣魚", "手工DIY"
                );
                final List<String> SubjectList = Arrays.asList(
                        "語文", "數學", "英語", "地理", "歷史",
                        "生物", "科學", "物理", "化學", "美術"
                );
                final String[] surnames = {
                        "趙", "錢", "孫", "李", "周", "吳", "鄭", "王", "馮", "陳",
                        "褚", "衛(wèi)", "蔣", "沈", "韓", "楊", "朱", "秦", "尤", "許"
                };
                final String[] givenNames = {
                        "偉", "芳", "娜", "秀英", "敏靜", "麗", "強", "軍", "磊", "超",
                        "杰", "婷婷", "鵬", "雪", "慧", "倩", "宇", "晨", "欣", "明"
                };
                while (running) {
                    String surname = surnames[rand.nextInt(surnames.length)];
                    String givenName = givenNames[rand.nextInt(givenNames.length)];
                    // age = i grows without bound; demo-only counter, preserved as-is.
                    // NOTE(review): rand.nextInt(155) + 1 yields scores 1..155 — confirm
                    // the intended range (100 would be more typical).
                    Student student = new Student(surname + givenName, i, 1, 3,
                            INTERESTS.get(rand.nextInt(INTERESTS.size())),
                            SubjectList.get(rand.nextInt(SubjectList.size())),
                            rand.nextInt(155) + 1);
                    ctx.collect(student);
                    System.out.println(student.toString());
                    i++;
                    TimeUnit.SECONDS.sleep(3);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).setParallelism(1).name("gen-data");

        Properties props = new Properties();
        String broker = parameterTool.get("broker");
        String topic = parameterTool.get("topic");
        if (broker != null) {
            props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, broker);
        } else {
            throw new RuntimeException("broker為空");
        }

        FlinkKafkaProducer<Student> producer = new FlinkKafkaProducer<Student>(topic,
                new FlinkSerializationSchema(),
                props);
        // FlinkFixedPartitioner is FlinkKafkaProducer's default partitioning strategy:
        // it pins each Flink subtask to a fixed Kafka partition, so every subtask
        // always writes to the same partition.
        // Optional explicit form (not needed — it is the default):
        // producer.setCustomPartitioner(new FlinkFixedPartitioner<>());

        // Assuming the topic has 4 partitions, parallelism 4 writes to all of them concurrently.
        ds.addSink(producer).setParallelism(4).name("kafka-producer");
        env.execute("flink_kfk_demo");
    }
}
定義kafka消費者
定義FlinkKafkaConsumer_DTO_DeSerializer類,用于反序列化數據,同時從kafka里讀數據
package com.qogo8.flink_kfk_demo.client;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import com.qogo8.flink_kfk_demo.Serialization.FlinkSerializationSchema;
import com.qogo8.flink_kfk_demo.Student.Student;
import java.util.Properties;
import org.apache.flink.util.Collector;
/**
* 該類用于測試自定義dto序列化的發(fā)送
*/
/**
 * Demo job: reads Student records from Kafka, decoding them with the custom
 * {@link FlinkSerializationSchema}, and prints each record to stdout.
 *
 * Required program arguments: --broker host:port  --topic name
 */
public class FlinkKafkaConsumer_DTO_DeSerializer {

    public FlinkKafkaConsumer_DTO_DeSerializer() {
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool params = ParameterTool.fromArgs(args);
        String broker = params.get("broker");
        String topic = params.get("topic");

        // Guard clause: a broker address is mandatory.
        if (broker == null) {
            throw new RuntimeException("broker為空");
        }
        Properties consumerProps = new Properties();
        consumerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, broker);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "student-consumer-group1");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Kafka source that deserializes each record straight into a Student.
        FlinkKafkaConsumer<Student> kafkaSource =
                new FlinkKafkaConsumer<Student>(topic, new FlinkSerializationSchema(), consumerProps);
        DataStream<Student> students = env.addSource(kafkaSource);

        // Render each Student as its toString() form and run the demo side-effect hook.
        DataStream<String> rendered = students.flatMap(new FlatMapFunction<Student, String>() {
            @Override
            public void flatMap(Student value, Collector<String> out) {
                out.collect(value.toString());
                // Placeholder for real business logic.
                processStudent(value);
            }
        });

        rendered.print();
        env.execute("Flink Kafka Consumer Job");
    }

    /** Demo business-logic hook: logs the student's name. */
    private static void processStudent(Student student) {
        System.out.println("學生: " + student.getName());
    }
}