1.前言
本論壇發(fā)送的所有內(nèi)容,都是筆者在自己的個(gè)人的筆記上優(yōu)化后謄寫而來,希望自己所擁有的知識(shí)能夠幫助更多有學(xué)習(xí)欲望的人。但值得一提的是,由于本人接觸行業(yè)時(shí)間有限,可能會(huì)出現(xiàn)一些技術(shù)上的紕漏,如果有問題歡迎私信、評(píng)論指出,大家共同進(jìn)步。
2.簡單介紹
在Flink對(duì)數(shù)據(jù)進(jìn)行計(jì)算的時(shí)候,一般會(huì)按照階段的不同,將處理過程分為 source->transform->sink
借此來完成數(shù)據(jù)從讀取到計(jì)算再到寫出的全過程。
本章節(jié)當(dāng)中要介紹的FlinkJDBC其實(shí)就是Sink階段的成員之一,它能夠幫助Flink達(dá)成從數(shù)據(jù)流到存儲(chǔ)介質(zhì)保存的全過程(存儲(chǔ)介質(zhì)需要支持JDBC)。如果SINK方的這個(gè)存儲(chǔ)介質(zhì)支持XA事務(wù)的話,那么FlinkJDBC還能夠?qū)ζ涮峁┚珳?zhǔn)一次性語義。
3.FlinkJDBC使用
3.1 引入依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.15.0</version>
</dependency>
3.2 直接指定JDBCSink即可
這里以ClickHouse為例->本代碼可直接粘貼使用,因?yàn)槭窃谖臋n中手寫的,沒有用編輯器,所以可能會(huì)有錯(cuò)別單詞.
//1.聲明靜態(tài)方法
public static <T> SinkFunction<T> getJdbcSink(String sql){
return JdbcSink.<T>sink(
sql,
new JdbcStatementBuilder<T>(){
//這個(gè)方法主要是完成對(duì)sql語句中的數(shù)據(jù)內(nèi)容對(duì)PreparedStatement對(duì)象中占位符的賦值
@Overwrite
public void accept(PreparedStatement preparedStatement,T obj) throws SQLException{
//通過反射來完成賦值,本段代碼結(jié)束之后有關(guān)于這部分內(nèi)容反射相關(guān)知識(shí)的介紹
Field[] declaredFields = obj.getClass().getDeclaredFields();
for(int i=0; i<declaredFields.length; i++){
Filed declaredField = declaredFields[i];
declaredField.setAccessible(true);
try{
Object value = declaredField.get(obj);
preparedStatement.setObject(i,value);
}catch(IllegalAccessException e){
e.printStackTrace();
}
}
}
},
JdbcExecutionOptions.bulider()
.withBatchIntervalMs(5000L) //指定多長時(shí)間發(fā)送一次
.withBatchSize(5) //指定攢夠多少條數(shù)發(fā)送一次
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver") //Drvier
.withUrl("jdbc:clickhouse://hadoop102:8123/table") //url
.build()
);
}
4.反射相關(guān)知識(shí)的描述
反射機(jī)制對(duì)于我來說,實(shí)際上就是一種能夠?yàn)槭褂谜咛峁┽槍?duì)未知對(duì)象或者未知類來進(jìn)行內(nèi)容讀取的一個(gè)功能。這是我個(gè)人對(duì)于反射的淺顯理解,如果有錯(cuò)誤歡迎指正。接下來我就要用我所理解的內(nèi)容,編寫一個(gè)簡單的例子,來解釋3.2程序段中accept方法是如何完成實(shí)體類對(duì)占位符進(jìn)行賦值的過程。
4.1 反射的小例子
思路:主程序想要通過對(duì)一個(gè)方法傳入不同的實(shí)體類,來獲得所有實(shí)體類中的所有屬性的字段信息。
準(zhǔn)備:主程序(用來調(diào)用方法)、兩個(gè)不同的實(shí)體類(用來對(duì)公共方法做驗(yàn)證)、泛型方法(輸出實(shí)體類中的字段信息)
//實(shí)體類1
@Data
@AllArgsConstructor
public class Student {
//用來表示學(xué)生信息
private String name;
private String banji;
private String score;
}
//實(shí)體類2
@Data
@AllArgsConstructor
public class Teacher {
//用來表示老師信息
private String dept;
private String classHeader;
}
//泛型方法
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
public class ReadInfoMethod {
//通過傳入?yún)?shù),然后循環(huán)的將作為參數(shù)的對(duì)象中的所有屬性的值信息添加到數(shù)組中
public static <T> List<T> getInfo(T t){
Class<?> aClass = t.getClass();
Field[] declaredFields = aClass.getDeclaredFields();
ArrayList<T> result = new ArrayList<T>();
for (Field declaredField : declaredFields) {
int num = 0;
declaredField.setAccessible(true);
try {
T o = (T)declaredField.get(t);
result.add(num,o);
num++;
}catch (IllegalAccessException e){
e.printStackTrace();
}
}
return result;
}
}
//主程序
public class test {
public static void main(String[] args) {
Student student = new Student("弗林克", "三年二班", "95");
Teacher teacher = new Teacher("辦公室部門", "三年二班班主任");
//調(diào)用泛型方法,獲得傳入對(duì)象的所有屬性字段的值信息的列表
List<Student> info = ReadInfoMethod.getInfo(student);
List<Teacher> teachers = ReadInfoMethod.getInfo(teacher);
System.out.println(info);
System.out.println(teachers);
}
}
泛型方法中調(diào)用的方法的方式,與FlinkJDBC中的accept方法中的內(nèi)容如出一轍。二者在表現(xiàn)形式上的區(qū)別就是accept方法在對(duì)占位符進(jìn)行賦值的時(shí)候,需要指定對(duì)應(yīng)字段的索引位置,因此 才有了preparedStatement.setObject(i,value);的這種方式。
Flink官網(wǎng)中針對(duì)這部分內(nèi)容進(jìn)行描述的地址是:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/jdbc/