15-Flink實(shí)戰(zhàn)項(xiàng)目之實(shí)時(shí)熱銷排行

戳更多文章:

1-Flink入門

2-本地環(huán)境搭建&構(gòu)建第一個(gè)Flink應(yīng)用

3-DataSet API

4-DataSteam API

5-集群部署

6-分布式緩存

7-重啟策略

8-Flink中的窗口

9-Flink中的Time

Flink時(shí)間戳和水印

Broadcast廣播變量

FlinkTable&SQL

Flink實(shí)戰(zhàn)項(xiàng)目實(shí)時(shí)熱銷排行

Flink寫入RedisSink

17-Flink消費(fèi)Kafka寫入Mysql

需求

某個(gè)圖書網(wǎng)站,希望看到雙十一秒殺期間實(shí)時(shí)的熱銷排行榜單。我們可以將“實(shí)時(shí)熱門商品”翻譯成程序員更好理解的需求:每隔5秒鐘輸出最近一小時(shí)內(nèi)點(diǎn)擊量最多的前 N 個(gè)商品/圖書.

需求分解

將這個(gè)需求進(jìn)行分解我們大概要做這么幾件事情:

  • 告訴 Flink 框架基于時(shí)間做窗口,我們這里用processingTime,不用自帶時(shí)間戳
  • 過(guò)濾出圖書點(diǎn)擊行為數(shù)據(jù)
  • 按一小時(shí)的窗口大小,每5秒鐘統(tǒng)計(jì)一次,做滑動(dòng)窗口聚合(Sliding Window)
  • 聚合,輸出窗口中點(diǎn)擊量前N名的商品

代碼實(shí)現(xiàn)

向Kafka發(fā)消息模擬購(gòu)買事件

public class KafkaProducer {


    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("topn",new SimpleStringSchema(),properties);
/*
        //event-timestamp事件的發(fā)生時(shí)間
        producer.setWriteTimestampToKafka(true);
*/
        text.addSink(producer);
        env.execute();
    }
}//

其中的:MyNoParalleSource 是作者自己實(shí)現(xiàn)的一個(gè)并行度為1的發(fā)送器,用來(lái)向kafka發(fā)送數(shù)據(jù):

public class MyNoParalleSource implements SourceFunction<String> {//1

    //private long count = 1L;
    private boolean isRunning = true;

    /**
     * 主要的方法
     * 啟動(dòng)一個(gè)source
     * 大部分情況下,都需要在這個(gè)run方法中實(shí)現(xiàn)一個(gè)循環(huán),這樣就可以循環(huán)產(chǎn)生數(shù)據(jù)了
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while(isRunning){
            //圖書的排行榜
            List<String> books = new ArrayList<>();
            books.add("Pyhton從入門到放棄");//10
            books.add("Java從入門到放棄");//8
            books.add("Php從入門到放棄");//5
            books.add("C++從入門到放棄");//3
            books.add("Scala從入門到放棄");//0-4
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));

            //每1秒產(chǎn)生一條數(shù)據(jù)
            Thread.sleep(1000);
        }
    }
    //取消一個(gè)cancel的時(shí)候會(huì)調(diào)用的方法
    @Override
    public void cancel() {
        isRunning = false;
    }
}

可見(jiàn),我們每過(guò)1秒向Kafka的topn這個(gè)topic隨機(jī)發(fā)送一本書的名字用來(lái)模擬購(gòu)買行為。

整體實(shí)現(xiàn)代碼如下:

public class TopN {

    public static void main(String[] args) throws Exception{

        /**
         *
         *  書1 書2 書3
         *  (書1,1) (書2,1) (書3,1)
         *
         *
         */
        //每隔5秒鐘 計(jì)算過(guò)去1小時(shí) 的 Top 3 商品
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作為時(shí)間語(yǔ)義


        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("topn", new SimpleStringSchema(), properties);

        //從最早開(kāi)始消費(fèi) 位點(diǎn)
        input.setStartFromEarliest();


        DataStream<String> stream = env
                .addSource(input);

        DataStream<Tuple2<String, Integer>> ds = stream
                .flatMap(new LineSplitter()); //將輸入語(yǔ)句split成一個(gè)一個(gè)單詞并初始化count值為1的Tuple2<String, Integer>類型


        DataStream<Tuple2<String, Integer>> wcount = ds
                .keyBy(0)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(5)))
                //key之后的元素進(jìn)入一個(gè)總時(shí)間長(zhǎng)度為600s,每5s向后滑動(dòng)一次的滑動(dòng)窗口
                .sum(1);// 將相同的key的元素第二個(gè)count值相加

        wcount
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(shu1, xx) (shu2,xx)....
                //所有key元素進(jìn)入一個(gè)5s長(zhǎng)的窗口(選5秒是因?yàn)樯嫌未翱诿?s計(jì)算一輪數(shù)據(jù),topN窗口一次計(jì)算只統(tǒng)計(jì)一個(gè)窗口時(shí)間內(nèi)的變化)
                .process(new TopNAllFunction(3))
                .print();
//redis sink  redis -> 接口

        env.execute();
    }//





    private static final class LineSplitter implements
            FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            //String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            /*for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }*/

            //(書1,1) (書2,1) (書3,1)
            out.collect(new Tuple2<String, Integer>(value, 1));
        }
    }

    private static class TopNAllFunction
            extends
            ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> {

        private int topSize = 3;

        public TopNAllFunction(int topSize) {

            this.topSize = topSize;
        }

        public void process(

                ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0,
                Iterable<Tuple2<String, Integer>> input,
                Collector<String> out) throws Exception {

            TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
                    new Comparator<Integer>() {

                        @Override
                        public int compare(Integer y, Integer x) {
                            return (x < y) ? -1 : 1;
                        }

                    }); //treemap按照key降序排列,相同count值不覆蓋

            for (Tuple2<String, Integer> element : input) {
                treemap.put(element.f1, element);
                if (treemap.size() > topSize) { //只保留前面TopN個(gè)元素
                    treemap.pollLastEntry();
                }
            }


            for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap
                    .entrySet()) {
                out.collect("=================\n熱銷圖書列表:\n"+ new Timestamp(System.currentTimeMillis()) +  treemap.toString() + "\n===============\n");
            }

        }

    }


}//

查看輸出:

=================
熱銷圖書列表:
2019-03-05 22:32:40.004{8=(Java從入門到放棄,8), 7=(C++從入門到放棄,7), 5=(Php從入門到放棄,5)}
===============
=================
熱銷圖書列表:
2019-03-05 22:32:45.004{8=(Java從入門到放棄,8), 7=(C++從入門到放棄,7), 5=(Php從入門到放棄,5)}
===============

所有代碼,我放在了我的公眾號(hào),回復(fù)Flink可以下載

  • 海量【java和大數(shù)據(jù)的面試題+視頻資料】整理在公眾號(hào),關(guān)注后可以下載~
  • 更多大數(shù)據(jù)技術(shù)歡迎和作者一起探討~

關(guān)注我的公眾號(hào),后臺(tái)回復(fù)【JAVAPDF】獲取200頁(yè)面試題!
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路,不來(lái)了解一下嗎?
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路,真的不來(lái)了解一下嗎?
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路,確定真的不來(lái)了解一下嗎?

歡迎您關(guān)注《大數(shù)據(jù)成神之路》
image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Hello,大家好!先自我介紹下,我叫....還是不說(shuō)真名吧低調(diào).....董小姐,傳說(shuō)中的......Mu...
    i董小姐閱讀 3,244評(píng)論 3 3
  • 豬年大吉,上一個(gè)豬年好像才剛剛過(guò)去,又好像早已遠(yuǎn)去。 我寫的東西好像都挺悲觀消極的,可有什么辦法呢,這是生活帶給我...
    快樂(lè)的小星星閱讀 193評(píng)論 2 6
  • 早晨,我迫不及待的問(wèn)老爸:“什么時(shí)候釣魚?!崩习终f(shuō):作業(yè)完成,下午三點(diǎn)。聽(tīng)完這句話,我立刻行動(dòng)起來(lái)了,我寫啊寫,寫...
    帥云吞1閱讀 407評(píng)論 0 1
  • 想通過(guò)一場(chǎng)會(huì)銷現(xiàn)場(chǎng)收款600——1000萬(wàn)[紅包][紅包][紅包]嗎? 想通過(guò)卡模式一次活動(dòng)現(xiàn)場(chǎng)儲(chǔ)值80——100...
    富民之路閱讀 1,061評(píng)論 0 1
  • mns區(qū)塊鏈商城到底是一個(gè)什么樣的商城平臺(tái)?和其它商城有什么區(qū)別呢?MNS商城是建立在區(qū)塊鏈的技術(shù)和算法的全球流通...
    波波談古論今閱讀 685評(píng)論 0 0

友情鏈接更多精彩內(nèi)容