驗(yàn)證本文需要具備Docker及Docker-composer,作者使用的環(huán)境為Mac + Docker
Docker啟動(dòng)Flink集群
首先下載Flink的鏡像docker pull flink,我下載的是1.9.0版本。
然后編寫(xiě) docker-composer.yml
version: "2.1"
services:
jobmanager:
image: flink
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
示例代碼
本代碼完成的功能是從SOCKET端口中讀取文本信息,分詞后在統(tǒng)計(jì)周期內(nèi)計(jì)算每個(gè)單詞出現(xiàn)的次數(shù)。這里只是列出關(guān)鍵代碼,全部工程代碼可以參考我的Github
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
final int port;
final String host;
port = 9008;
host = "192.168.65.2";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream(host, port, "\n");
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>(){
@Override
public void flatMap(String value, Collector<WordWithCount> out){
for(String word : value.split("\\s")){
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>(){
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b){
return new WordWithCount(a.word, a.count+b.count);
}
});
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word, long count){
this.word = word;
this.count = count;
}
@Override
public String toString(){
return word + ":" + count;
}
}
運(yùn)行示例
首先將flink運(yùn)行起來(lái),在docker-compose.yml所在目錄下執(zhí)行
$ docker-compose up -d
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
dc54c9cf6304 flink "/docker-entrypoint.…" 3 days ago Up 4 seconds 6121-6123/tcp, 8081/tcp flink_taskmanager_1
2eab6b0fd0f1 flink "/docker-entrypoint.…" 3 days ago Up 3 seconds 6123/tcp, 0.0.0.0:8081->8081/tcp flink_jobmanager_1
可以看到兩個(gè)實(shí)例已經(jīng)啟動(dòng)了,然后新開(kāi)一個(gè)終端窗口,運(yùn)行nc監(jiān)聽(tīng)程序。
$ nc -l 9008
打開(kāi)Flink界面,選擇Submit New Job,上傳編譯好的jar包。

image.png
提交后可以看到運(yùn)行的app已經(jīng)收到了數(shù)據(jù)

image.png
使用docker logs -f命令,然后在nc窗口中輸入一些單詞,你就能夠在docker窗口下看到統(tǒng)計(jì)結(jié)果輸出了。

image.png