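Preface
In "Learning Flink from 0 to 1: Introduction to Data Sources", I introduced Flink data sources and briefly touched on custom data sources. This article covers custom sources in more detail and walks through a demo to make them easier to understand.
Let's first look at a demo in which Flink reads data from a Kafka topic. You need to have Flink and Kafka installed first.
Start Flink, ZooKeeper, and Kafka.

OK, everything is up and running!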
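Add the following dependencies to pom.xml:
<!-- flink java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- logging -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
</dependency>
<!-- flink kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- alibaba fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.51</version>
</dependency>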
Test: sending data to a Kafka topic
Entity class, Metric.java:
package com.zhisheng.flink.model;

import java.util.Map;

/**
 * Desc: a metric record with a name, a timestamp, fields, and tags
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Metric {
    public String name;
    public long timestamp;
    public Map<String, Object> fields;
    public Map<String, String> tags;

    public Metric() {
    }

    public Metric(String name, long timestamp, Map<String, Object> fields, Map<String, String> tags) {
        this.name = name;
        this.timestamp = timestamp;
        this.fields = fields;
        this.tags = tags;
    }

    @Override
    public String toString() {
        return "Metric{" +
                "name='" + name + '\'' +
                ", timestamp='" + timestamp + '\'' +
                ", fields=" + fields +
                ", tags=" + tags +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public Map<String, Object> getFields() {
        return fields;
    }

    public void setFields(Map<String, Object> fields) {
        this.fields = fields;
    }

    public Map<String, String> getTags() {
        return tags;
    }

    public void setTags(Map<String, String> tags) {
        this.tags = tags;
    }
}
Utility class for writing data to Kafka, KafkaUtils.java:
import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Writes data to Kafka.
 * Run the main method to try it out.
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class KafkaUtils {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "metric"; // Kafka topic; the Flink program must use the same one

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Build one sample memory metric
        Metric metric = new Metric();
        metric.setTimestamp(System.currentTimeMillis());
        metric.setName("mem");
        Map<String, String> tags = new HashMap<>();
        Map<String, Object> fields = new HashMap<>();

        tags.put("cluster", "zhisheng");
        tags.put("host_ip", "101.147.022.106");

        fields.put("used_percent", 90d);
        fields.put("max", 27244873d);
        fields.put("used", 17244873d);
        fields.put("init", 27244873d);

        metric.setTags(tags);
        metric.setFields(fields);

        // Send the metric as a JSON string, with no explicit partition or key
        String json = JSON.toJSONString(metric);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, json);
        producer.send(record);
        System.out.println("Sent data: " + json);

        producer.flush();
        producer.close(); // a new producer is created on every call, so release it here
    }

    public static void main(String[] args) throws InterruptedException {
        // Send one record roughly every 300 ms until the process is stopped
        while (true) {
            Thread.sleep(300);
            writeToKafka();
        }
    }
}
Run it:

If the console keeps printing the records being sent, as marked in the screenshot, data is being written to Kafka continuously.
Main.java
package com.zhisheng.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * Desc: reads the "metric" Kafka topic and prints every record
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
                "metric", // Kafka topic
                new SimpleStringSchema(), // deserializes each record as a String
                props)).setParallelism(1);

        dataStreamSource.print(); // print the data read from Kafka to the console

        env.execute("Flink add data source");
    }
}
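The broker address and topic appear in both KafkaUtils.java and Main.java, and the two must stay in sync. One way to avoid drift is to keep them in a single helper. The sketch below is only an illustration: the class KafkaSettings and its method names are hypothetical and not part of the original project. It uses nothing but java.util.Properties and covers only a subset of the settings, so the consumer properties it returns would still have to be passed to FlinkKafkaConsumer011 as in Main.java.

```java
import java.util.Properties;

// Hypothetical helper (not part of the original project): keeps the broker
// list, topic, and group id in one place so the producer and the Flink job agree.
final class KafkaSettings {
    static final String BROKER_LIST = "localhost:9092";
    static final String TOPIC = "metric";
    static final String GROUP_ID = "metric-group";

    private KafkaSettings() {
    }

    // Consumer-side settings: a subset of the values used in Main.java.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("group.id", GROUP_ID);
        props.put("auto.offset.reset", "latest");
        return props;
    }

    // Producer-side settings: the same values as in KafkaUtils.java.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

With a helper like this, both programs would refer to KafkaSettings.TOPIC instead of repeating the string "metric".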
Run it:

As you can see, the Flink program's console keeps printing data.
That was the Kafka source that ships with Flink. Next, let's follow the same idea and write a source that reads data from MySQL.
First, add the MySQL dependency to pom.xml:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
</dependency>
Create the database table as follows:
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Insert some data:
INSERT INTO `student` VALUES ('1', 'zhisheng01', '123456', '18'), ('2', 'zhisheng02', '123', '17'), ('3', 'zhisheng03', '1234', '18'), ('4', 'zhisheng04', '12345', '16');
COMMIT;
Create the entity class, Student.java:
package com.zhisheng.flink.model;

/**
 * Desc: one row of the student table
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                ", age=" + age +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
Create the source class SourceFromMySQL.java. It extends RichSourceFunction and implements its open, close, run, and cancel methods:
package com.zhisheng.flink.source;

import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Desc: a custom source that reads the student table from MySQL
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class SourceFromMySQL extends RichSourceFunction<Student> {

    PreparedStatement ps;
    private Connection connection;

    /**
     * The connection is established in open(), so it does not have to be
     * created and released every time data is fetched.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        // The table was created as `student`; table names can be case-sensitive in MySQL
        String sql = "select * from student;";
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * Once the program has finished, close the connection and release resources.
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        // Close the statement before the connection that created it
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * The DataStream calls run() once to fetch the data.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Student student = new Student(
                    resultSet.getInt("id"),
                    resultSet.getString("name").trim(),
                    resultSet.getString("password").trim(),
                    resultSet.getInt("age"));
            ctx.collect(student);
        }
    }

    @Override
    public void cancel() {
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
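Note that cancel() above is empty. That is fine for a bounded query like this one, but a source that keeps looping usually needs a way to stop. A common pattern is a volatile flag that run() checks and cancel() clears. The sketch below shows only that pattern in plain Java: CancellableRowSource is a hypothetical stand-in, and Consumer<T> takes the place of Flink's SourceContext, so this is not Flink API code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the emitting loop of a source.
// Consumer<T> plays the role of Flink's SourceContext; it is NOT the Flink API.
final class CancellableRowSource<T> {
    // volatile: cancel() may be called from a different thread than run()
    private volatile boolean running = true;
    private final Iterable<T> rows;

    CancellableRowSource(Iterable<T> rows) {
        this.rows = rows;
    }

    // Emits rows until they are exhausted or cancel() has been called.
    void run(Consumer<T> out) {
        Iterator<T> it = rows.iterator();
        while (running && it.hasNext()) {
            out.accept(it.next());
        }
    }

    void cancel() {
        running = false;
    }
}

final class CancellableRowSourceDemo {
    public static void main(String[] args) {
        CancellableRowSource<String> source = new CancellableRowSource<>(
                Arrays.asList("zhisheng01", "zhisheng02", "zhisheng03", "zhisheng04"));
        List<String> seen = new ArrayList<>();
        source.run(row -> {
            seen.add(row);
            if (seen.size() == 2) {
                source.cancel(); // simulate a cancel request arriving mid-run
            }
        });
        System.out.println(seen); // prints [zhisheng01, zhisheng02]
    }
}
```

In SourceFromMySQL the same idea would mean adding a flag field, checking it in the while loop of run(), and clearing it in cancel().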
The Flink program:
package com.zhisheng.flink;

import com.zhisheng.flink.source.SourceFromMySQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Desc: prints every Student read by the custom MySQL source
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main2 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceFromMySQL()).print();

        env.execute("Flink add data source");
    }
}
Run the Flink program, and you will see the student records printed in the console log.
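If the job instead fails with a NullPointerException in open(), a likely cause is that getConnection() in SourceFromMySQL caught the real error, printed it, and returned null. A fail-fast variant surfaces the original problem directly. The sketch below is an assumption-laden illustration, not the author's code: FailFastConnections is a hypothetical helper name, and it relies only on java.sql.DriverManager from the JDK.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper, not part of the original project.
final class FailFastConnections {
    private FailFastConnections() {
    }

    // Opens a JDBC connection or throws; it never returns null.
    static Connection open(String url, String user, String password) {
        try {
            return DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            // Keep the original exception as the cause so the real reason stays visible
            throw new IllegalStateException("Cannot connect to " + url, e);
        }
    }
}
```

Called from open() with the JDBC URL and credentials used above, a wrong password or an unreachable server would then show up as the root cause of the job failure.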

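As the custom source above shows, the class we extend is RichSourceFunction, so let's take a closer look at it:

It is an abstract class that extends AbstractRichFunction and provides the basic capabilities needed to implement a rich SourceFunction. It has three subclasses: two are abstract classes that provide more specific implementations on top of it, and the third is ContinuousFileMonitoringFunction.

MessageAcknowledgingSourceBase: targets scenarios where the data source is a message queue, and provides an ID-based acknowledgement mechanism.
MultipleIdsMessageAcknowledgingSourceBase: builds on MessageAcknowledgingSourceBase with finer-grained handling of ID acknowledgement, supporting two ID models: session id and unique message id.
ContinuousFileMonitoringFunction: a single (non-parallel) monitoring task. It takes a FileInputFormat and, depending on the FileProcessingMode and the FilePathFilter, monitors the user-provided path, decides which files should be read and processed, creates the FileInputSplits corresponding to those files, and assigns them to downstream tasks for further processing.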
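What makes a source "rich" in practice is the lifecycle around run(): open() is called before it and close() after it, which is why SourceFromMySQL can set up and release its JDBC connection there. The sketch below imitates that call order in plain Java. LifecycleSource, LifecycleDriver, and LifecycleDemo are hypothetical names with simplified signatures, not Flink's real classes, and the driver only approximates what the runtime does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for a "rich" source. Names and signatures are
// illustrative only and do not match Flink's real classes.
abstract class LifecycleSource<T> {
    void open() {
    }

    abstract void run(Consumer<T> out);

    void close() {
    }
}

final class LifecycleDriver {
    private LifecycleDriver() {
    }

    // Calls open() once, then run(), and always close(), even if run() throws.
    static <T> void execute(LifecycleSource<T> source, Consumer<T> out) {
        source.open();
        try {
            source.run(out);
        } finally {
            source.close();
        }
    }
}

final class LifecycleDemo {
    private LifecycleDemo() {
    }

    // Records the order of lifecycle calls and emitted values.
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        LifecycleSource<String> source = new LifecycleSource<String>() {
            @Override
            void open() {
                events.add("open"); // e.g. create the JDBC connection here
            }

            @Override
            void run(Consumer<String> out) {
                events.add("run");
                out.accept("zhisheng01");
            }

            @Override
            void close() {
                events.add("close"); // e.g. release the JDBC connection here
            }
        };
        LifecycleDriver.execute(source, value -> events.add("emit:" + value));
        return events;
    }
}
```

Calling LifecycleDemo.trace() returns the events in the order open, run, emit:zhisheng01, close.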
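This article showed how to use the Kafka source in Flink and provided a demo of a custom source that reads data from MySQL. You can of course read from other systems as well and implement your own data source. Real-world work may be more complex than this, so adapt as needed!
If you repost this article, please cite the original address: http://www.54tianzhisheng.cn/2018/10/30/flink-create-source/
WeChat official account: zhisheng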