Flink 程序看起來跟普通程序沒什么區(qū)別,都是處理數(shù)據(jù)流。每個(gè)程序都有幾個(gè)相同的基礎(chǔ)部分組成:
1、申請執(zhí)行環(huán)境
2、加載/創(chuàng)造原始數(shù)據(jù)
3、聲明對于這些數(shù)據(jù)的轉(zhuǎn)換過程
4、聲明存儲(chǔ)這些轉(zhuǎn)換后數(shù)據(jù)的目標(biāo)地址
5、觸發(fā)程序結(jié)束
接下來會(huì)對每個(gè)步驟做一個(gè)概述,想要了解細(xì)節(jié)的話請參考各自的部分。
Java DataStream API 所有的核心類都可以在這里找到。
1、申請執(zhí)行環(huán)境
StreamExecutionEnvironment是所有 Flink 程序的基礎(chǔ),通過調(diào)用它的以下這幾個(gè)靜態(tài)方法可以得到一個(gè)執(zhí)行環(huán)境:
StreamExecutionEnvironment.getExecutionEnvironment()
StreamExecutionEnvironment.createLocalEnvironment()
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)
如官方例子的main方法中第一行程序:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
一般來說,直接調(diào)用
getExecutionEnvironment()即可,他會(huì)根據(jù)上下文自動(dòng)處理。如果你像平時(shí)一樣是在本地 IDE 執(zhí)行程序,他會(huì)創(chuàng)建一個(gè)本地環(huán)境在你自己的機(jī)器上執(zhí)行代碼。如果你將代碼打包成一個(gè)jar包,并且通過命令行(flink run XXX.jar)的方式提交給 Flink 集群,F(xiàn)link 會(huì)為你的程序生成一個(gè)執(zhí)行環(huán)境,讓他在集群內(nèi)完成執(zhí)行。
2、加載/創(chuàng)造原始數(shù)據(jù)
談到數(shù)據(jù)源的聲明,以基于文件讀取型的場景為例,F(xiàn)link 的執(zhí)行環(huán)境支持了多種方法讓程序從文件讀?。?/p>
- 逐行讀取
- 讀取為 CSV 文件
- 使用其他提供的源
下面以從txt文件逐行讀取為例,講解代碼實(shí)現(xiàn):
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
首先,依然是申請執(zhí)行環(huán)境,之后調(diào)用該執(zhí)行環(huán)境對象的env.readTextFile方法即可得到一個(gè)數(shù)據(jù)流。
除了readTestFile()方法,F(xiàn)link 還支持別的方式,同時(shí)也支持其他場景的數(shù)據(jù)源加載,見官方文檔,這里僅做列表,不詳細(xì)解釋:
2.1 文件讀取型數(shù)據(jù)源
-
readTextFile(path):從 text 文件中逐行讀取,讀取進(jìn)來后的事件是 string 型的 -
readFile(fileInputFormat, path):讀取fileInputFormat類型的文檔 -
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):前面兩個(gè)方法的內(nèi)部調(diào)用實(shí)現(xiàn)
2.2 Socket 型數(shù)據(jù)源
-
socketTextStream:從 socket 中讀取,各元素以分隔符分開
2.3 集合型
-
fromCollection(Collection):從Java.util.Collection中讀取數(shù)據(jù)生成數(shù)據(jù)流。集合中所有元素必須是同類型。 -
fromCollection(Iterator, Class):從迭代器中讀取數(shù)據(jù)生成數(shù)據(jù)流。參數(shù)Class用以聲明元素類型 -
fromElements(T ...):從數(shù)組中讀取對象數(shù)據(jù)生成數(shù)據(jù)流,所有對象必須是同類型。 -
fromParallelCollection(SplittableIterator, Class):Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator. -
generateSequence(from, to):在from~to之間并行生成數(shù)據(jù)返回為一個(gè)數(shù)組。
2.4 定制
-
addSource:添加一個(gè)新的數(shù)據(jù)源生成方法。如:從 Kafka 中讀取時(shí),可以這么寫:addSource(new FlinkKafkaConsumer<>(...))。具體語法參考connector。
3、聲明對于這些數(shù)據(jù)的轉(zhuǎn)換過程
得到源數(shù)據(jù)流后,接下來就是調(diào)用 Flink 的各個(gè) Operators 來對數(shù)據(jù)流中的數(shù)據(jù)做“變身”動(dòng)作了。以下面的代碼為例:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
這段代碼調(diào)用了map這個(gè)算子,將input流中的所有string型數(shù)據(jù)都轉(zhuǎn)換成int型,生成一個(gè)新的數(shù)據(jù)流賦值給parsed。
當(dāng)然,你可以做更復(fù)雜的“變身”,更多官方支持的算子及其語法介紹點(diǎn)我了解。
4、聲明存儲(chǔ)這些轉(zhuǎn)換后數(shù)據(jù)的目標(biāo)地址
當(dāng)你得到一個(gè)最終狀態(tài)的數(shù)據(jù)流后,接下來要考慮的就是如何把這個(gè)數(shù)據(jù)流寫道外部系統(tǒng)了,也就是創(chuàng)建一個(gè) sink。最常見的就是打印到控制臺(tái),可以這么實(shí)現(xiàn):
parsed.print();
Data sinks 消費(fèi)數(shù)據(jù)流,并且把他們寫入文件、sockets、外部系統(tǒng),或者直接打印他們。Flink 內(nèi)置了許多輸出格式。
-
writeAsText() / TextOutputFormat:以string形式按行輸出元素。該string是通過調(diào)用元素的toString()方法得到的。 -
writeAsCsv(...) / CsvOutputFormat:每個(gè)元素轉(zhuǎn)換成一個(gè)逗號隔開的二元數(shù)組。行于列的分隔符可以配置。列的值同樣是調(diào)用元素的toString()方法得到的。 -
print() / printToErr():直接打印到標(biāo)準(zhǔn)控制臺(tái)。還可以傳一個(gè)前綴參數(shù)加載字符串之前,用以區(qū)分不同的print()調(diào)用。 當(dāng)集群開啟并發(fā)模式且大于1的時(shí)候,還會(huì)把產(chǎn)生該數(shù)據(jù)的task的標(biāo)識(shí)符作為前綴加到輸出之前。 -
writeUsingOutputFormat() / FileOutputFormat:自定義文件類型,需要支持自定義類型與bytes類型的轉(zhuǎn)換。 -
writeToSocket:根據(jù)序列化規(guī)則輸出到socket。 -
addSink:調(diào)用外部sink方法??梢圆榭?a target="_blank">connector了解更多。
一般來說,
write*()方法都是用來掉使用的,這些方法輸出的元素不參與 Flink 的 checkpoint 機(jī)制。產(chǎn)生的直接效果就是,這些方法輸出的文檔可能會(huì)有丟失或者延遲,而我們無法通過 Flink 的checkpoint 機(jī)制來保證。
因此,為了保證流數(shù)據(jù)的精確送達(dá),建議使用 StreamingFileSink。同時(shí),通過.addSink(...)接口調(diào)用的外部實(shí)現(xiàn)也可以參與 Flink 的checkpoint機(jī)制,從而保證唯一性送達(dá)。
5、觸發(fā)程序結(jié)束
完成上述所有步驟后,接下來就是觸發(fā)程序執(zhí)行,方法就是調(diào)用當(dāng)前執(zhí)行環(huán)境的execute()方法。根據(jù)執(zhí)行環(huán)境的不同,F(xiàn)link會(huì)自動(dòng)選擇是在本地執(zhí)行,還是把代碼提交給集群執(zhí)行。
調(diào)用env.execute()方法后,F(xiàn)link 會(huì)等待程序執(zhí)行完成,并根據(jù)執(zhí)行模式不同做不同的后續(xù)處理。
如果你不想等待job執(zhí)行完成,你可以異步觸發(fā)程序執(zhí)行,這可以通過調(diào)用當(dāng)前執(zhí)行環(huán)境的executeAysnc()方法實(shí)現(xiàn)。此時(shí),他會(huì)返回給你一個(gè) JobClient,之后,你可以通過與 JobClient 交互來得到你提交的 job 的執(zhí)行情況,如:
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
最后這一步在幫助理解 Flink 算子的執(zhí)行時(shí)機(jī)以及方法極其重要。Flink 程序都是懶執(zhí)行的,即:當(dāng)程序的main()方法被執(zhí)行時(shí),數(shù)據(jù)的加載以及轉(zhuǎn)換工作并不會(huì)馬上開始,F(xiàn)link 會(huì)首先創(chuàng)建一個(gè) dataflow graph,并把他們添加進(jìn)去。只有當(dāng)execute()方法被觸發(fā)時(shí),算子們的處理才真正開始,這與執(zhí)行環(huán)境類型無關(guān),不管你是在本地執(zhí)行,還是在集群中執(zhí)行,都是如此。
6、一個(gè)完整的例子
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
// 申請執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 聲明數(shù)據(jù)源:從套接字localhost:9999讀取
// 聲明數(shù)據(jù)源的處理過程:.flat.Map().keyBy().window().sum()
// 生成最終的結(jié)果數(shù)據(jù)源
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
// 配置sink,將結(jié)果集打印到控制臺(tái)
dataStream.print();
// 觸發(fā)執(zhí)行
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
編碼完成后,本地啟動(dòng)socket輸入端:
nc -lk 9999
然后敲入一些單詞,之后啟動(dòng) job,查看 job 的控制臺(tái)輸出,是你輸入的這些么?