1.一文搞定Flink自定義JDBC

1.前言

本論壇發(fā)送的所有內(nèi)容,都是筆者在自己的個(gè)人的筆記上優(yōu)化后謄寫而來,希望自己所擁有的知識(shí)能夠幫助更多有學(xué)習(xí)欲望的人。但值得一提的是,由于本人接觸行業(yè)時(shí)間有限,可能會(huì)出現(xiàn)一些技術(shù)上的紕漏,如果有問題歡迎私信、評(píng)論指出,大家共同進(jìn)步。


2.簡單介紹

在Flink對(duì)數(shù)據(jù)進(jìn)行計(jì)算的時(shí)候,一般會(huì)按照階段的不同,將處理過程分為 source->transform->sink
借此來完成數(shù)據(jù)從讀取到計(jì)算再到寫出的全過程。
本章節(jié)當(dāng)中要介紹的FlinkJDBC其實(shí)就是Sink階段的成員之一,它能夠幫助Flink達(dá)成從數(shù)據(jù)流到存儲(chǔ)介質(zhì)保存的全過程(存儲(chǔ)介質(zhì)需要支持JDBC)。如果SINK方的這個(gè)存儲(chǔ)介質(zhì)支持XA事務(wù)的話,那么FlinkJDBC還能夠?qū)ζ涮峁┚珳?zhǔn)一次性語義。


3.FlinkJDBC使用

3.1 引入依賴


<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-connector-jdbc</artifactId>

    <version>1.15.0</version>

</dependency>

3.2 直接指定JDBCSink即可

這里以ClickHouse為例->本代碼可直接粘貼使用,因?yàn)槭窃谖臋n中手寫的,沒有用編輯器,所以可能會(huì)有錯(cuò)別單詞.


//1.聲明靜態(tài)方法

public static <T> SinkFunction<T> getJdbcSink(String sql){

    return JdbcSink.<T>sink(

        sql,

        new JdbcStatementBuilder<T>(){

            //這個(gè)方法主要是完成對(duì)sql語句中的數(shù)據(jù)內(nèi)容對(duì)PreparedStatement對(duì)象中占位符的賦值

            @Overwrite

            public void accept(PreparedStatement preparedStatement,T obj) throws SQLException{

                    //通過反射來完成賦值,本段代碼結(jié)束之后有關(guān)于這部分內(nèi)容反射相關(guān)知識(shí)的介紹

                    Field[] declaredFields = obj.getClass().getDeclaredFields();

                    for(int i=0; i<declaredFields.length; i++){

                        Filed declaredField = declaredFields[i];

                        declaredField.setAccessible(true);

                        try{

                            Object value = declaredField.get(obj);

                            preparedStatement.setObject(i,value);

                            }catch(IllegalAccessException e){

                                e.printStackTrace();

                            }

                        }

                }

            },

        JdbcExecutionOptions.bulider()

        .withBatchIntervalMs(5000L) //指定多長時(shí)間發(fā)送一次

        .withBatchSize(5) //指定攢夠多少條數(shù)發(fā)送一次

        .build(),

        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") //Drvier

        .withUrl("jdbc:clickhouse://hadoop102:8123/table") //url

        .build()

    );   

}


4.反射相關(guān)知識(shí)的描述

反射機(jī)制對(duì)于我來說,實(shí)際上就是一種能夠?yàn)槭褂谜咛峁┽槍?duì)未知對(duì)象或者未知類來進(jìn)行內(nèi)容讀取的一個(gè)功能。這是我個(gè)人對(duì)于反射的淺顯理解,如果有錯(cuò)誤歡迎指正。接下來我就要用我所理解的內(nèi)容,編寫一個(gè)簡單的例子,來解釋3.2程序段中accept方法是如何完成實(shí)體類對(duì)占位符進(jìn)行賦值的過程。

4.1 反射的小例子

思路:主程序想要通過對(duì)一個(gè)方法傳入不同的實(shí)體類,來獲得所有實(shí)體類中的所有屬性的字段信息。

準(zhǔn)備:主程序(用來調(diào)用方法)、兩個(gè)不同的實(shí)體類(用來對(duì)公共方法做驗(yàn)證)、泛型方法(輸出實(shí)體類中的字段信息)


//實(shí)體類1

@Data

@AllArgsConstructor

public class Student {

//用來表示學(xué)生信息

    private String name;

    private String banji;

    private String score;

}

//實(shí)體類2

@Data

@AllArgsConstructor

public class Teacher {

    //用來表示老師信息

    private String dept;

    private String classHeader;

}


//泛型方法

import java.lang.reflect.Field;

import java.util.ArrayList;

import java.util.List;

public class ReadInfoMethod {

    //通過傳入?yún)?shù),然后循環(huán)的將作為參數(shù)的對(duì)象中的所有屬性的值信息添加到數(shù)組中

    public static <T> List<T> getInfo(T t){

        Class<?> aClass = t.getClass();

        Field[] declaredFields = aClass.getDeclaredFields();

        ArrayList<T> result = new ArrayList<T>();

        for (Field declaredField : declaredFields) {

            int num = 0;

            declaredField.setAccessible(true);

            try {

                T o = (T)declaredField.get(t);

                result.add(num,o);

                num++;

            }catch (IllegalAccessException e){

                e.printStackTrace();

            }

        }

        return  result;

    }

}


//主程序

public class test {

    public static void main(String[] args) {

        Student student = new Student("弗林克", "三年二班", "95");

        Teacher teacher = new Teacher("辦公室部門", "三年二班班主任");

        //調(diào)用泛型方法,獲得傳入對(duì)象的所有屬性字段的值信息的列表

        List<Student> info = ReadInfoMethod.getInfo(student);

        List<Teacher> teachers = ReadInfoMethod.getInfo(teacher);

        System.out.println(info);

        System.out.println(teachers);

    }

}

泛型方法中調(diào)用的方法的方式,與FlinkJDBC中的accept方法中的內(nèi)容如出一轍。二者在表現(xiàn)形式上的區(qū)別就是accept方法在對(duì)占位符進(jìn)行賦值的時(shí)候,需要指定對(duì)應(yīng)字段的索引位置,因此 才有了preparedStatement.setObject(i,value);的這種方式。

Flink官網(wǎng)中針對(duì)這部分內(nèi)容進(jìn)行描述的地址是:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/jdbc/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容