《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source 轉(zhuǎn)自微信公眾號:zhisheng

前言

在?《從0到1學(xué)習(xí)Flink》—— Data Source 介紹?文章中,我給大家介紹了 Flink Data Source 以及簡短的介紹了一下自定義 Data Source,這篇文章更詳細(xì)的介紹下,并寫一個 demo 出來讓大家理解。

Flink Kafka source

準(zhǔn)備工作

我們先來看下 Flink 從 Kafka topic 中獲取數(shù)據(jù)的 demo,首先你需要安裝好了 FLink 和 Kafka 。

運(yùn)行啟動 Flink、Zookepeer、Kafka,

好了,都啟動了!

maven 依賴

<!--flink java-->

org.apache.flink

flink-java

${flink.version}

provided

org.apache.flink

flink-streaming-java_${scala.binary.version}

${flink.version}

provided

<!--日志-->

org.slf4j

slf4j-log4j12

1.7.7

runtime

log4j

log4j

1.2.17

runtime

<!--flink kafka connector-->

org.apache.flink

flink-connector-kafka-0.11_${scala.binary.version}

${flink.version}

<!--alibaba fastjson-->

com.alibaba

fastjson

1.2.51

測試發(fā)送數(shù)據(jù)到 kafka topic

實體類,Metric.java

packagecom.zhisheng.flink.model;

importjava.util.Map;

/**

* Desc:

* weixi: zhisheng_tian

* blog: http://www.54tianzhisheng.cn/

*/

publicclassMetric{

publicString name;

publiclongtimestamp;

publicMap fields;

publicMap tags;

publicMetric(){

? ? }

publicMetric(String name,longtimestamp, Map fields, Map tags){

this.name = name;

this.timestamp = timestamp;

this.fields = fields;

this.tags = tags;

? ? }

@Override

publicStringtoString(){

return"Metric{"+

"name='"+ name +'\''+

", timestamp='"+ timestamp +'\''+

", fields="+ fields +

", tags="+ tags +

'}';

? ? }

publicStringgetName(){

returnname;

? ? }

publicvoidsetName(String name){

this.name = name;

? ? }

publiclonggetTimestamp(){

returntimestamp;

? ? }

publicvoidsetTimestamp(longtimestamp){

this.timestamp = timestamp;

? ? }

publicMapgetFields(){

returnfields;

? ? }

publicvoidsetFields(Map<String, Object> fields){

this.fields = fields;

? ? }

publicMapgetTags(){

returntags;

? ? }

publicvoidsetTags(Map<String, String> tags){

this.tags = tags;

? ? }

}

往 kafka 中寫數(shù)據(jù)工具類:KafkaUtils.java

importcom.alibaba.fastjson.JSON;

importcom.zhisheng.flink.model.Metric;

importorg.apache.kafka.clients.producer.KafkaProducer;

importorg.apache.kafka.clients.producer.ProducerRecord;

importjava.util.HashMap;

importjava.util.Map;

importjava.util.Properties;

/**

* 往kafka中寫數(shù)據(jù)

* 可以使用這個main函數(shù)進(jìn)行測試一下

* weixin: zhisheng_tian

* blog: http://www.54tianzhisheng.cn/

*/

publicclassKafkaUtils{

publicstaticfinalString broker_list ="localhost:9092";

publicstaticfinalString topic ="metric";// kafka topic,F(xiàn)link 程序中需要和這個統(tǒng)一

publicstaticvoidwriteToKafka()throwsInterruptedException{

Properties props =newProperties();

props.put("bootstrap.servers", broker_list);

props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");//key 序列化

props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");//value 序列化

KafkaProducer producer =newKafkaProducer(props);

Metric metric =newMetric();

? ? ? ? metric.setTimestamp(System.currentTimeMillis());

metric.setName("mem");

Map tags =newHashMap<>();

Map fields =newHashMap<>();

tags.put("cluster","zhisheng");

tags.put("host_ip","101.147.022.106");

fields.put("used_percent",90d);

fields.put("max",27244873d);

fields.put("used",17244873d);

fields.put("init",27244873d);

? ? ? ? metric.setTags(tags);

? ? ? ? metric.setFields(fields);

ProducerRecord record =newProducerRecord(topic,null,null, JSON.toJSONString(metric));

? ? ? ? producer.send(record);

System.out.println("發(fā)送數(shù)據(jù): "+ JSON.toJSONString(metric));

? ? ? ? producer.flush();

? ? }

publicstaticvoidmain(String[] args)throwsInterruptedException{

while(true) {

Thread.sleep(300);

? ? ? ? ? ? writeToKafka();

? ? ? ? }

? ? }

}

運(yùn)行:

如果出現(xiàn)如上圖標(biāo)記的,即代表能夠不斷的往 kafka 發(fā)送數(shù)據(jù)的。

Flink 程序

Main.java

packagecom.zhisheng.flink;

importorg.apache.flink.api.common.serialization.SimpleStringSchema;

importorg.apache.flink.streaming.api.datastream.DataStreamSource;

importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

importjava.util.Properties;

/**

* Desc:

* weixi: zhisheng_tian

* blog: http://www.54tianzhisheng.cn/

*/

publicclassMain{

publicstaticvoidmain(String[] args)throwsException{

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props =newProperties();

props.put("bootstrap.servers","localhost:9092");

props.put("zookeeper.connect","localhost:2181");

props.put("group.id","metric-group");

props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//key 反序列化

props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

props.put("auto.offset.reset","latest");//value 反序列化

DataStreamSource dataStreamSource = env.addSource(newFlinkKafkaConsumer011<>(

"metric",//kafka topic

newSimpleStringSchema(),// String 序列化

props)).setParallelism(1);

dataStreamSource.print();//把從 kafka 讀取到的數(shù)據(jù)打印在控制臺

env.execute("Flink add data source");

? ? }

}

運(yùn)行起來:

看到?jīng)]程序,F(xiàn)link 程序控制臺能夠源源不斷的打印數(shù)據(jù)呢。

自定義 Source

上面就是 Flink 自帶的 Kafka source,那么接下來就模仿著寫一個從 MySQL 中讀取數(shù)據(jù)的 Source。

首先 pom.xml 中添加 MySQL 依賴

mysql

mysql-connector-java

5.1.34

數(shù)據(jù)庫建表如下:

DROPTABLEIFEXISTS`student`;

CREATETABLE`student`(

`id`int(11)unsignedNOTNULLAUTO_INCREMENT,

`name`varchar(25)COLLATEutf8_binDEFAULTNULL,

`password`varchar(25)COLLATEutf8_binDEFAULTNULL,

`age`int(10)DEFAULTNULL,

PRIMARYKEY(`id`)

)ENGINE=InnoDBAUTO_INCREMENT=5DEFAULTCHARSET=utf8COLLATE=utf8_bin;

插入數(shù)據(jù)


INSERTINTO`student`VALUES('1','zhisheng01','123456','18'), ('2','zhisheng02','123','17'), ('3','zhisheng03','1234','18'), ('4','zhisheng04','12345','16');

COMMIT;

新建實體類:Student.java

packagecom.zhisheng.flink.model;

/**

* Desc:

* weixi: zhisheng_tian

* blog: http://www.54tianzhisheng.cn/

*/

publicclassStudent{

publicintid;

publicString name;

publicString password;

publicintage;

publicStudent(){

? ? }

publicStudent(intid, String name, String password,intage){

this.id = id;

this.name = name;

this.password = password;

this.age = age;

? ? }

@Override

publicStringtoString(){

return"Student{"+

"id="+ id +

", name='"+ name +'\''+

", password='"+ password +'\''+

", age="+ age +

'}';

? ? }

publicintgetId(){

returnid;

? ? }

publicvoidsetId(intid){

this.id = id;

? ? }

publicStringgetName(){

returnname;

? ? }

publicvoidsetName(String name){

this.name = name;

? ? }

publicStringgetPassword(){

returnpassword;

? ? }

publicvoidsetPassword(String password){

this.password = password;

? ? }

publicintgetAge(){

returnage;

? ? }

publicvoidsetAge(intage){

this.age = age;

? ? }

}

新建 Source 類?SourceFromMySQL.java,該類繼承 RichSourceFunction ,實現(xiàn)里面的 open、close、run、cancel 方法:


packagecom.zhisheng.flink.source;

importcom.zhisheng.flink.model.Student;

importorg.apache.flink.configuration.Configuration;

importorg.apache.flink.streaming.api.functions.source.RichSourceFunction;

importjava.sql.Connection;

importjava.sql.DriverManager;

importjava.sql.PreparedStatement;

importjava.sql.ResultSet;

/**

* Desc:

* weixi: zhisheng_tian

* blog: http://www.54tianzhisheng.cn/

*/

publicclassSourceFromMySQLextendsRichSourceFunction{

? ? PreparedStatement ps;

privateConnection connection;

/**

? ? * open() 方法中建立連接,這樣不用每次 invoke 的時候都要建立連接和釋放連接。

? ? *

*@paramparameters

*@throwsException

? ? */

@Override

publicvoidopen(Configuration parameters)throwsException{

super.open(parameters);

? ? ? ? connection = getConnection();

String sql ="select * from Student;";

ps =this.connection.prepareStatement(sql);

? ? }

/**

? ? * 程序執(zhí)行完畢就可以進(jìn)行,關(guān)閉連接和釋放資源的動作了

? ? *

*@throwsException

? ? */

@Override

publicvoidclose()throwsException{

super.close();

if(connection !=null) {//關(guān)閉連接和釋放資源

? ? ? ? ? ? connection.close();

? ? ? ? }

if(ps !=null) {

? ? ? ? ? ? ps.close();

? ? ? ? }

? ? }

/**

? ? * DataStream 調(diào)用一次 run() 方法用來獲取數(shù)據(jù)

? ? *

*@paramctx

*@throwsException

? ? */

@Override

publicvoidrun(SourceContext<Student> ctx)throwsException{

? ? ? ? ResultSet resultSet = ps.executeQuery();

while(resultSet.next()) {

Student student =newStudent(

resultSet.getInt("id"),

resultSet.getString("name").trim(),

resultSet.getString("password").trim(),

resultSet.getInt("age"));

? ? ? ? ? ? ctx.collect(student);

? ? ? ? }

? ? }

@Override

publicvoidcancel(){

? ? }

privatestaticConnectiongetConnection(){

Connection con =null;

try{

Class.forName("com.mysql.jdbc.Driver");

con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8","root","root123456");

}catch(Exception e) {

System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());

? ? ? ? ? ? }

returncon;

? ? }

}

Flink 程序


packagecom.zhisheng.flink;

importcom.zhisheng.flink.source.SourceFromMySQL;

importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**

* Desc:

* weixi: zhisheng_tian

* blog: http://www.54tianzhisheng.cn/

*/

publicclassMain2{

publicstaticvoidmain(String[] args)throwsException{

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(newSourceFromMySQL()).print();

env.execute("Flink add data sourc");

? ? }

}

運(yùn)行 Flink 程序,控制臺日志中可以看見打印的 student 信息。

RichSourceFunction

從上面自定義的 Source 可以看到我們繼承的就是這個 RichSourceFunction 類,那么來了解一下:

一個抽象類,繼承自 AbstractRichFunction。為實現(xiàn)一個 Rich SourceFunction 提供基礎(chǔ)能力。該類的子類有三個,兩個是抽象類,在此基礎(chǔ)上提供了更具體的實現(xiàn),另一個是 ContinuousFileMonitoringFunction。

MessageAcknowledgingSourceBase :它針對的是數(shù)據(jù)源是消息隊列的場景并且提供了基于 ID 的應(yīng)答機(jī)制。

MultipleIdsMessageAcknowledgingSourceBase : 在 MessageAcknowledgingSourceBase 的基礎(chǔ)上針對 ID 應(yīng)答機(jī)制進(jìn)行了更為細(xì)分的處理,支持兩種 ID 應(yīng)答模型:session id 和 unique message id。

ContinuousFileMonitoringFunction:這是單個(非并行)監(jiān)視任務(wù),它接受 FileInputFormat,并且根據(jù) FileProcessingMode 和 FilePathFilter,它負(fù)責(zé)監(jiān)視用戶提供的路徑;決定應(yīng)該進(jìn)一步讀取和處理哪些文件;創(chuàng)建與這些文件對應(yīng)的 FileInputSplit 拆分,將它們分配給下游任務(wù)以進(jìn)行進(jìn)一步處理。

最后

本文主要講了下 Flink 使用 Kafka Source 的使用,并提供了一個 demo 教大家如何自定義 Source,從 MySQL 中讀取數(shù)據(jù),當(dāng)然你也可以從其他地方讀取,實現(xiàn)自己的數(shù)據(jù)源 source??赡芷綍r工作會比這個更復(fù)雜,需要大家靈活應(yīng)對!

關(guān)注我

轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/10/30/flink-create-source/

微信公眾號:zhisheng

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容