Flink101-快速示例

驗(yàn)證本文需要具備Docker及Docker-composer,作者使用的環(huán)境為Mac + Docker

Docker啟動(dòng)Flink集群

首先下載Flink的鏡像docker pull flink,我下載的是1.9.0版本。

然后編寫(xiě) docker-composer.yml

version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

示例代碼

本代碼完成的功能是從SOCKET端口中讀取文本信息,分詞后在統(tǒng)計(jì)周期內(nèi)計(jì)算每個(gè)單詞出現(xiàn)的次數(shù)。這里只是列出關(guān)鍵代碼,全部工程代碼可以參考我的Github

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        final int port;
        final String host;
        port = 9008;
        host = "192.168.65.2";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.socketTextStream(host, port, "\n");

        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>(){
                @Override
                public void flatMap(String value, Collector<WordWithCount> out){
                    for(String word : value.split("\\s")){
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>(){
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b){
                    return new WordWithCount(a.word, a.count+b.count);
                }
            });

        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount(){}

        public WordWithCount(String word, long count){
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString(){
            return word + ":" + count;
        }
    }

運(yùn)行示例

首先將flink運(yùn)行起來(lái),在docker-compose.yml所在目錄下執(zhí)行

$ docker-compose up -d
$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                              NAMES
dc54c9cf6304        flink               "/docker-entrypoint.…"   3 days ago          Up 4 seconds        6121-6123/tcp, 8081/tcp            flink_taskmanager_1
2eab6b0fd0f1        flink               "/docker-entrypoint.…"   3 days ago          Up 3 seconds        6123/tcp, 0.0.0.0:8081->8081/tcp   flink_jobmanager_1

可以看到兩個(gè)實(shí)例已經(jīng)啟動(dòng)了,然后新開(kāi)一個(gè)終端窗口,運(yùn)行nc監(jiān)聽(tīng)程序。

$ nc -l 9008

打開(kāi)Flink界面,選擇Submit New Job,上傳編譯好的jar包。


image.png

提交后可以看到運(yùn)行的app已經(jīng)收到了數(shù)據(jù)


image.png

使用docker logs -f命令,然后在nc窗口中輸入一些單詞,你就能夠在docker窗口下看到統(tǒng)計(jì)結(jié)果輸出了。


image.png

參考資料

  1. Flink-基于Docker的開(kāi)發(fā)環(huán)境搭建
  2. Docker composer 快速入門(mén)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1.Flink架構(gòu)及特性分析 Flink是個(gè)相當(dāng)早的項(xiàng)目,開(kāi)始于2008年,但只在最近才得到注意。Flink是原生...
    生活的探路者閱讀 1,783評(píng)論 0 5
  • 金秋十月。這樣的夜晚已然帶著絲絲寒意襲來(lái),卻也正好清醒一下運(yùn)行幾個(gè)小時(shí)后沉悶的頭腦。 身后的教室里擁擠的人味,空氣...
    多瑞果閱讀 300評(píng)論 1 3
  • 01 一個(gè)人愛(ài)不愛(ài)你,其實(shí)很好證明,從他打開(kāi)你朋友圈的方式就看得出。 這句話不是我說(shuō)的,是朋友思思說(shuō)的。思思是個(gè)刷...
    御樓閱讀 551評(píng)論 0 0
  • 夢(mèng)里經(jīng)常夢(mèng)到廁所里的坑,很厭惡,佷惡心,胃部有些堵?;貞浶r(shí)候有過(guò)這種感受的場(chǎng)景,一次媽媽給我買(mǎi)的心愛(ài)的毛...
    心清如水2018閱讀 355評(píng)論 0 0
  • 鳥(niǎo)在籠中,恨關(guān)羽不能張飛;人活世上,要八戒更需悟空。 前日有個(gè)五年級(jí)的學(xué)生,拿著一張語(yǔ)文卷子向我請(qǐng)教一個(gè)問(wèn)題,內(nèi)容...
    蘭亭雋客閱讀 454評(píng)論 2 4

友情鏈接更多精彩內(nèi)容