Flink序列化和反序列化DTO數據

kafka中的數據,平常大多是json、csv、String,這類數據使用SimpleStringSchema就可以輕松地進行序列化和反序列化。
但有時候又需要特殊結構的數據進行傳輸(DTO),需要根據實際的數據結構進行序列化和反序列化。
下面以自定義結構體Student為例,演示如何使用FlinkSerializationSchema進行序列化和反序列化。

一、創(chuàng)建Maven工程

pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Plain Maven project for the Flink/Kafka DTO serialization demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.qogo8</groupId>
    <artifactId>flink-kfk-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>flink-kfk-demo</name>
    <description>flink consume/produce demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Flink 1.13.x artifacts are suffixed with the Scala binary version (2.11). -->
        <flink.version>1.13.5</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <!-- NOTE(review): 2.15.3-19.0 is a flink-shaded-jackson build for much newer
             Flink releases; verify it is compatible with Flink 1.13.5 (the matching
             line for 1.13.x is 2.12.x-13.0). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-jackson</artifactId>
            <version>2.15.3-19.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <!-- NOTE(review): provided scope keeps this out of the fat jar; that is correct
             when submitting to a Flink cluster, but the jar will not run standalone.
             Confirm the target runtime supplies flink-streaming-java. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Build a runnable fat jar (jar-with-dependencies) during `package`. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

二、創(chuàng)建自定義數據結構的類

1、創(chuàng)建Scores類

package com.qogo8.flink_kfk_demo.Student;

public class Scores {
    private String subject;  //科目
    private int scores;  //科目分數

    public Scores() {
    }

    public Scores(int scores, String subject) {
        this.scores = scores;
        this.subject = subject;
    }

    public int getScores() {
        return this.scores;
    }

    public void setScores(int scores) {
        this.scores = scores;
    }

    public String getSubject() {
        return this.subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }
}

2、創(chuàng)建Student類

package com.qogo8.flink_kfk_demo.Student;

/**
 * Student DTO carried through the Flink/Kafka pipeline.
 * Holds basic profile fields plus a nested {@link Scores} record.
 */
public class Student {
    private String name;   // student name
    private int age;       // student age
    private int grade;     // grade (school year)
    private int Sclass;    // class number (name kept as-is: getSclass/setSclass are public API)
    private String hobby;  // hobby
    private Scores scores; // score record; may be null (no-arg or 3-arg constructor)

    /** No-arg constructor for serialization frameworks; leaves {@code scores} null. */
    public Student() {
    }

    /**
     * Builds a student with a placeholder score record
     * (subject "Chinese", score 22).
     */
    public Student(String name, int age, int grade, int Sclass, String hobby) {
        this(name, age, grade, Sclass, hobby, "Chinese", 22);
    }

    /** Builds a fully-populated student including a score record. */
    public Student(String name, int age, int grade, int Sclass, String hobby, String subject, int scores) {
        this.name = name;
        this.age = age;
        this.grade = grade;
        this.Sclass = Sclass;
        this.hobby = hobby;
        this.scores = new Scores();
        this.scores.setSubject(subject);
        this.scores.setScores(scores);
    }

    /** Builds a partial student; {@code scores} may be null. */
    public Student(String name, int age, Scores scores) {
        this.name = name;
        this.age = age;
        this.scores = scores;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return this.age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public void setGrade(int grade) {
        this.grade = grade;
    }

    public int getGrade() {
        return this.grade;
    }

    public void setSclass(int Sclass) {
        this.Sclass = Sclass;
    }

    public int getSclass() {
        return this.Sclass;
    }

    public void setHobby(String hobby) {
        this.hobby = hobby;
    }

    public String getHobby() {
        return this.hobby;
    }

    public Scores getScores() {
        return this.scores;
    }

    public void setScores(Scores scores) {
        this.scores = scores;
    }

    @Override
    public String toString() {
        // Guard against a null scores record (reachable via the no-arg and
        // 3-arg constructors) so toString() never throws NullPointerException.
        String subject = (this.scores == null) ? null : this.scores.getSubject();
        Object score = (this.scores == null) ? null : (Object) this.scores.getScores();
        return "Student [name=" + this.name + ",age=" + this.age + ",grade=" + this.grade
                + ",Sclass=" + this.Sclass + ",hobby=" + this.hobby
                + ",subject=" + subject + ",scores=" + score + "]";
    }
}

三、定義序列化和反序列化類FlinkSerializationSchema

package com.qogo8.flink_kfk_demo.Serialization;

import org.apache.commons.lang3.SerializationException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.qogo8.flink_kfk_demo.Student.Student;

import java.io.IOException;
import java.nio.ByteBuffer;

// 使用Flink 1.13.5版本序列化數據(因為本公司是使用的這個Flink版本^_^)
/**
 * Custom Flink (de)serialization schema for the Student DTO (Flink 1.13.5).
 *
 * Wire format (ByteBuffer default big-endian):
 *   int nameLen  | nameLen bytes   (UTF-8 name)
 *   int age      | int grade       | int Sclass
 *   int hobbyLen | hobbyLen bytes  (UTF-8 hobby)
 *   int subjLen  | subjLen bytes   (UTF-8 subject)
 *   int score
 * Fixed overhead: 7 ints = 28 bytes.
 */
public class FlinkSerializationSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {

    /** Encodes a possibly-null string as UTF-8; null maps to an empty byte array. */
    private static byte[] utf8OrEmpty(String value) throws IOException {
        return (value == null) ? new byte[0] : value.getBytes("UTF-8");
    }

    /** Reads an int length prefix followed by that many UTF-8 bytes. */
    private static String readUtf8(ByteBuffer buffer) throws IOException {
        byte[] bytes = new byte[buffer.getInt()];
        buffer.get(bytes);
        return new String(bytes, "UTF-8");
    }

    /**
     * Serializes a Student into the wire format documented on the class.
     *
     * @param data the Student to serialize; null yields null
     * @return the serialized bytes, or null when {@code data} is null
     * @throws SerializationException if any encoding step fails
     */
    @Override
    public byte[] serialize(Student data) {
        try {
            if (data == null) {
                return null;
            }
            // Null name/hobby/subject are written as zero-length strings instead
            // of failing with a NullPointerException.
            byte[] nameBytes = utf8OrEmpty(data.getName());
            byte[] hobbyBytes = utf8OrEmpty(data.getHobby());
            String subject = (data.getScores() == null) ? null : data.getScores().getSubject();
            int score = (data.getScores() == null) ? 0 : data.getScores().getScores();
            byte[] subjectBytes = utf8OrEmpty(subject);

            // 28 = 7 ints of fixed overhead; the rest is the variable string payload.
            ByteBuffer buffer = ByteBuffer.allocate(
                    28 + nameBytes.length + hobbyBytes.length + subjectBytes.length);
            // Order: nameLen, name, age, grade, Sclass, hobbyLen, hobby,
            //        subjectLen, subject, score — deserialize() reads the same order.
            buffer.putInt(nameBytes.length);
            buffer.put(nameBytes);
            buffer.putInt(data.getAge());
            buffer.putInt(data.getGrade());
            buffer.putInt(data.getSclass());
            buffer.putInt(hobbyBytes.length);
            buffer.put(hobbyBytes);
            buffer.putInt(subjectBytes.length);
            buffer.put(subjectBytes);
            buffer.putInt(score);

            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("error when serializing..." + e);
        }
    }

    /**
     * Deserializes a Student from the wire format documented on the class.
     *
     * @param data the serialized bytes; null yields null
     * @return the reconstructed Student
     * @throws SerializationException if the payload is too short or malformed
     */
    @Override
    public Student deserialize(byte[] data) throws IOException {
        try {
            if (data == null) {
                return null;
            }
            if (data.length < 28) {
                // 28 bytes is the minimum payload: all 7 ints with every string empty.
                throw new SerializationException(
                        "Size of data received by FlinkSerializationSchema is shorter than expected...");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            String name = readUtf8(buffer);
            int age = buffer.getInt();
            int grade = buffer.getInt();
            int Sclass = buffer.getInt();
            String hobby = readUtf8(buffer);
            String subject = readUtf8(buffer);
            int scores = buffer.getInt();
            return new Student(name, age, grade, Sclass, hobby, subject, scores);
        } catch (Exception e) {
            throw new SerializationException("error when deserializing..." + e);
        }
    }

    /** The stream never terminates based on element content. */
    @Override
    public boolean isEndOfStream(Student nextElement) {
        return false;
    }

    /** Typed (non-raw) TypeInformation so Flink can plan the produced stream. */
    @Override
    public TypeInformation<Student> getProducedType() {
        return TypeInformation.of(Student.class);
    }

    /** Exposes the DTO class handled by this schema. */
    public Class<?> getDTOClass() {
        return Student.class;
    }
}

四、定義kafka生產者

上面已經定義好序列化和反序列化的類了,下面就來使用序列化類和反序列化類
定義FlinkKafkaProducer_DTO_Serializer類,用于序列化數據,同時往kafka里寫數據

package com.qogo8.flink_kfk_demo.client;

import java.security.SecureRandom;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.CommonClientConfigs;
import com.qogo8.flink_kfk_demo.Serialization.FlinkSerializationSchema;
import com.qogo8.flink_kfk_demo.Student.Student;

/**
 * Generates random Student records and writes them to Kafka using the custom
 * FlinkSerializationSchema.
 *
 * Required args: --broker <bootstrap servers> --topic <topic name>
 */
public class FlinkKafkaProducer_DTO_Serializer {
    public static void main(String[] args) throws Exception {

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(600000); // checkpoint every 10 minutes (600000 ms)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000); // checkpoint timeout: 60 s
        env.setRestartStrategy(RestartStrategies.fallBackRestart());

        DataStream<Student> ds = env.addSource(new SourceFunction<Student>() {

            // Cooperative-cancellation flag: cancel() must stop the run() loop,
            // otherwise the task can never shut down cleanly (SourceFunction contract).
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                int i = 1; // monotonically increasing counter, used as the demo "age"
                SecureRandom rand = new SecureRandom();

                final List<String> INTERESTS = Arrays.asList(
                    "閱讀", "健身", "烹飪", "旅行", "攝影",
                    "編程", "繪畫", "音樂", "登山", "騎行",
                    "舞蹈", "桌游", "露營", "釣魚", "手工DIY"
                );

                final List<String> SubjectList = Arrays.asList(
                    "語文", "數學", "英語", "地理", "歷史",
                    "生物", "科學", "物理", "化學", "美術"
                );

                final String[] surnames = {
                    "趙", "錢", "孫", "李", "周", "吳", "鄭", "王", "馮", "陳", 
                    "褚", "衛(wèi)", "蔣", "沈", "韓", "楊", "朱", "秦", "尤", "許"
                };

                final String[] givenNames = {
                    "偉", "芳", "娜", "秀英", "敏靜", "麗", "強", "軍", "磊", "超",
                    "杰", "婷婷", "鵬", "雪", "慧", "倩", "宇", "晨", "欣", "明"
                };

                // Emit one random Student every 3 seconds until cancelled.
                while (running) {
                    String surname = surnames[rand.nextInt(surnames.length)];
                    String givenName = givenNames[rand.nextInt(givenNames.length)];
                    Student student = new Student(surname + givenName, i, 1, 3,
                            INTERESTS.get(rand.nextInt(INTERESTS.size())),
                            SubjectList.get(rand.nextInt(SubjectList.size())),
                            rand.nextInt(155) + 1);
                    ctx.collect(student);
                    System.out.println(student.toString());
                    i++;
                    TimeUnit.SECONDS.sleep(3);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).setParallelism(1).name("gen-data");

        Properties props = new Properties();
        String broker = parameterTool.get("broker");
        String topic = parameterTool.get("topic");
        if (broker != null) {
            props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, broker);
        } else {
            throw new RuntimeException("broker為空");
        }

        FlinkKafkaProducer<Student> producer = new FlinkKafkaProducer<Student>(topic,
                new FlinkSerializationSchema(),
                props);
        // FlinkKafkaProducer uses FlinkFixedPartitioner by default: each Flink
        // subtask is mapped to a fixed Kafka partition.
        // Optional explicit form: producer.setCustomPartitioner(new FlinkFixedPartitioner<>());
        // Assuming the topic has 4 partitions, parallelism 4 writes them all concurrently.
        ds.addSink(producer).setParallelism(4).name("kafka-producer");
        env.execute("flink_kfk_demo");
    }
}

五、定義kafka消費者

定義FlinkKafkaConsumer_DTO_DeSerializer類,用于反序列化數據,同時從kafka里讀數據

package com.qogo8.flink_kfk_demo.client;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import com.qogo8.flink_kfk_demo.Serialization.FlinkSerializationSchema;
import com.qogo8.flink_kfk_demo.Student.Student;
import java.util.Properties;
import org.apache.flink.util.Collector;

/**
 * 該類用于測試自定義dto序列化的發(fā)送
 */
/**
 * Reads Student records from Kafka using the custom FlinkSerializationSchema
 * and prints each record to the console.
 *
 * Required args: --broker <bootstrap servers> --topic <topic name>
 */
public class FlinkKafkaConsumer_DTO_DeSerializer {
    public FlinkKafkaConsumer_DTO_DeSerializer() {
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String broker = parameterTool.get("broker");
        String topic = parameterTool.get("topic");

        // Fail fast when the mandatory broker address is missing.
        if (broker == null) {
            throw new RuntimeException("broker為空");
        }
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "student-consumer-group1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Kafka source that deserializes bytes back into Student objects.
        FlinkKafkaConsumer<Student> consumer =
                new FlinkKafkaConsumer<>(topic, new FlinkSerializationSchema(), props);

        DataStream<Student> students = env.addSource(consumer);

        // Render each Student as its toString() form and run the demo business hook.
        DataStream<String> rendered = students.flatMap(new FlatMapFunction<Student, String>() {
            @Override
            public void flatMap(Student value, Collector<String> out) {
                out.collect(value.toString());
                processStudent(value);
            }
        });

        // Print the rendered records to the console.
        rendered.print();

        // Launch the streaming job.
        env.execute("Flink Kafka Consumer Job");
    }

    /** Demo business hook: logs the student's name to stdout. */
    private static void processStudent(Student student) {
        System.out.println("學生: " + student.getName());
    }
}
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容