2021-02-04-Flink-39(Flink 電商用戶行為分析案例 一)

1.實時熱門商品統計

日志字段有用戶id 商品id 商品類別 行為類型(pv等) 時間戳(秒)
要求統計某段時間內的topN熱門商品以及其點擊數量
思路:典型的滑動窗口案例
首先按商品id分組獲取其數量特徵
自定義aggregate獲取聚合後要的字段:商品id,窗口結束時間,熱門度
然後按窗口結束時間來分組
使用ListState狀態存儲該窗口內各商品的id和數量
使用定時器(windowEnd + 1)等待同一窗口的所有聚合結果到齊後再統一處理
裝入list中排序獲取topN

讀取文件(實際開發中一般讀取Kafka,這裡懶得起集群了).....

#示例數據
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,1511658000
470572,3760258,1299190,pv,1511658001
543789,3110556,4558987,cart,1511658001
354759,2191348,4756105,pv,1511658001
382009,2123538,4801426,pv,1511658001
677046,1598945,4145813,pv,1511658001
/**
 * One user-behaviour record parsed from {@code UserBehavior.csv}
 * (columns: userId, itemId, category, behavior, timestamp).
 *
 * <p>Written in POJO form (public class, public no-arg constructor, a getter/setter per field),
 * presumably so that Flink field expressions such as {@code keyBy("itemId")} can address it.
 */
public class UserBehavior {

    private Long userId;
    private Long itemId;
    private Long category;
    /** Behaviour type as written in the file, e.g. "pv", "buy", "cart". */
    private String behavior;
    /** Event time in seconds as read from the file; the jobs multiply it by 1000 for ms. */
    private Long timestamp;

    /** Empty constructor; all fields stay null until set. */
    public UserBehavior() {
    }

    /** Creates a fully populated record. */
    public UserBehavior(Long userId, Long itemId, Long category, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.category = category;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public Long getCategory() {
        return category;
    }

    public String getBehavior() {
        return behavior;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public void setCategory(Long category) {
        this.category = category;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UserBehavior{");
        text.append("userId=").append(userId)
                .append(", itemId=").append(itemId)
                .append(", category=").append(category)
                .append(", behavior='").append(behavior).append('\'')
                .append(", timestamp=").append(timestamp)
                .append('}');
        return text.toString();
    }
}


/**
 * Count of events for one item in one window, identified by the window's end timestamp (ms).
 *
 * <p>Also reused by the PV job as a generic (id, windowEnd, count) holder.
 */
public class ItemViewCount {

    private Long itemId;
    private Long windowEnd;
    private Long count;

    /** Empty constructor; all fields stay null until set. */
    public ItemViewCount() {
    }

    /** Creates a fully populated count record. */
    public ItemViewCount(Long itemId, Long windowEnd, Long count) {
        this.itemId = itemId;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Long getWindowEnd() {
        return windowEnd;
    }

    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return String.format("ItemViewCount{itemId=%s, windowEnd=%s, count=%s}", itemId, windowEnd, count);
    }
}


/**
 * Real-time hot-items job: every 5 minutes of event time, prints the top {@value #TOP_N} items
 * by "pv" count over the preceding hour.
 *
 * <p>Pipeline: parse CSV lines into {@link UserBehavior} -> keep "pv" events -> count per itemId
 * in a 1h/5min sliding window -> re-key the per-item counts by window end -> buffer them in
 * ListState -> once the watermark passes windowEnd, sort them and print the top entries.
 */
public class HotItem {

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\UserBehavior.csv";

    /** Number of items printed per window. */
    private static final int TOP_N = 3;

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the input CSV path
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> source = env.readTextFile(inputPath);

        // CSV columns: userId,itemId,category,behavior,timestamp(seconds).
        SingleOutputStreamOperator<UserBehavior> map = source.map(new RichMapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Long.parseLong(split[2]), split[3], Long.parseLong(split[4]));
            }
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
            // Assumes the file is ordered by time (as the sample is); seconds -> milliseconds.
            @Override
            public long extractAscendingTimestamp(UserBehavior element) {
                return element.getTimestamp() * 1000L;
            }
        });

        // Keep page views only.
        SingleOutputStreamOperator<UserBehavior> filter = map.filter(new FilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior userBehavior) throws Exception {
                return "pv".equals(userBehavior.getBehavior());
            }
        });

        // Group by item and open a 1h window sliding every 5 minutes.
        KeyedStream<UserBehavior, Tuple> keyBy = filter.keyBy("itemId");
        WindowedStream<UserBehavior, Tuple, TimeWindow> window = keyBy.timeWindow(Time.hours(1), Time.minutes(5));

        // Incremental count plus a window function that attaches itemId and windowEnd.
        SingleOutputStreamOperator<ItemViewCount> aggregate = window.aggregate(new ItemAgg(), new WindowItemResult());

        // Re-key by window end so all per-item counts of one window meet in one place.
        SingleOutputStreamOperator<String> process = aggregate.keyBy("windowEnd").process(new KeyedProcessFunction<Tuple, ItemViewCount, String>() {

            /** Per-item counts collected for the current key (one windowEnd). */
            ListState<ItemViewCount> listState;

            @Override
            public void open(Configuration parameters) throws Exception {
                listState = getRuntimeContext()
                        .getListState(new ListStateDescriptor<>("lst-state", ItemViewCount.class));
            }

            @Override
            public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
                listState.add(value);
                // Every element of this key registers the same timestamp, so only one timer exists;
                // it fires once the watermark has passed the window end.
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                ArrayList<ItemViewCount> viewCounts = Lists.newArrayList(listState.get().iterator());
                // The window has been ranked; without this its state would be kept forever.
                listState.clear();

                // Highest count first.
                viewCounts.sort(Comparator.comparing(ItemViewCount::getCount).reversed());

                StringBuilder buffer = new StringBuilder();
                buffer.append("窗口結束時間").append(new Timestamp(timestamp - 1)).append("\n");
                for (int i = 0; i < Math.min(TOP_N, viewCounts.size()); i++) {
                    ItemViewCount viewCount = viewCounts.get(i);
                    buffer.append("Nom").append(i + 1).append("類型").append(viewCount.getItemId())
                            .append("熱門度").append(viewCount.getCount()).append("\n");
                }
                buffer.append("\n");
                buffer.append("=======");
                buffer.append("\n");

                // Demo-only throttle so each window's ranking can be read in the console.
                // NOTE(review): this blocks the task thread; remove it outside local demos.
                Thread.sleep(1000L);
                out.collect(buffer.toString());
            }
        });
        process.print();
        env.execute("job");
    }

    /** Counts the events of one key/window: accumulator and result are the running count. */
    public static class ItemAgg implements AggregateFunction<UserBehavior, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior userBehavior, Long aLong) {
            return aLong + 1L;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        /** Combines two partial counts (previously returned null). */
        @Override
        public Long merge(Long aLong, Long acc1) {
            return aLong + acc1;
        }
    }

    /** Wraps the pre-aggregated count with the item id (the single key field) and the window end. */
    public static class WindowItemResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
            Long itemId = tuple.getField(0);
            Long windowEnd = window.getEnd();
            // With a pre-aggregating AggregateFunction the iterable holds exactly one value.
            Long aLong = input.iterator().next();
            out.collect(new ItemViewCount(itemId, windowEnd, aLong));
        }
    }

}

2.實時熱門頁面統計

同上一個案例相同的思路
用到的其他知識點:
設置水位線的延遲時間,容忍亂序數據的遲到
正則匹配過濾出符合條件的日志記錄
處理遲到數據的三重保證:
1.水位線
2.允許遲到數據 .allowedLateness(Time.minutes(1L));窗口觸發計算後暫不關閉,繼續等待遲到數據
3.側輸出流:超過允許遲到時間的數據輸出到側輸出流
問題:允許遲到的數據到達後,會更新之前仍在等待的窗口的聚合結果,並向下游再發出一條新的聚合記錄。此時下游註冊的windowEnd+1定時器對應的時間已經過去,但過時的定時器同樣會在下一次水位線更新時觸發。由於我們使用的是ListState,每來一條聚合結果就會追加到list狀態中,同一個url會出現多條記錄,導致刷榜;而如果在定時器觸發時直接clear,又會把狀態全部清除,之後的遲到數據只能看到自己那一條記錄,顯然不符合題意。
對問題的進一步處理:
使用MapState(以url為key,新的聚合結果覆蓋舊值),並註冊另一個定時器(windowEnd + 允許遲到時間)來真正關閉窗口並清除狀態

#示例數據
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/1983_delorean_dmc-12-pic-38289.jpeg
83.149.9.216 - - 17/05/2015:10:05:54 +0000 GET /presentations/logstash-monitorama-2013/images/simple-inputs-filters-outputs.jpg
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/tiered-outputs-to-inputs.jpg
83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
24.236.252.67 - - 17/05/2015:10:05:40 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /articles/dynamic-dns-with-dhcp/
93.114.45.13 - - 17/05/2015:10:05:04 +0000 GET /reset.css
93.114.45.13 - - 17/05/2015:10:05:45 +0000 GET /style2.css
/**
 * One parsed access-log line: client ip, user id field, event time (ms), HTTP method and URL.
 */
public class LogEvent {

    private String ip;
    private String userId;
    /** Event time in epoch milliseconds. */
    private Long timestamp;
    private String method;
    private String url;

    /** Empty constructor; all fields stay null until set. */
    public LogEvent() {
    }

    /** Creates a fully populated log event. */
    public LogEvent(String ip, String userId, Long timestamp, String method, String url) {
        this.ip = ip;
        this.userId = userId;
        this.timestamp = timestamp;
        this.method = method;
        this.url = url;
    }

    public String getIp() {
        return ip;
    }

    public String getUserId() {
        return userId;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public String getMethod() {
        return method;
    }

    public String getUrl() {
        return url;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    @Override
    public String toString() {
        return String.format("LogEvent{ip='%s', userId='%s', timestamp=%s, method='%s', url='%s'}",
                ip, userId, timestamp, method, url);
    }
}


/**
 * Count for one URL (or a fixed label such as "uv") in one window, identified by the window's
 * end timestamp (ms).
 */
public class PageViewCount {

    private String url;
    private Long windowEnd;
    private Long count;

    /** Empty constructor; all fields stay null until set. */
    public PageViewCount() {
    }

    /** Creates a fully populated count record. */
    public PageViewCount(String url, Long windowEnd, Long count) {
        this.url = url;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public Long getWindowEnd() {
        return windowEnd;
    }

    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PageViewCount{");
        text.append("url='").append(url).append('\'')
                .append(", windowEnd=").append(windowEnd)
                .append(", count=").append(count)
                .append('}');
        return text.toString();
    }
}


public class HotPages {
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-02-08
     * Time: 21:36
     * 83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //URL url = HotPages.class.getResource("/apache.log");
        DataStreamSource<String> source = env.readTextFile("C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\apache.log");
        SingleOutputStreamOperator<LogEvent> watermarks = source.map(new MapFunction<String, LogEvent>() {
            @Override
            public LogEvent map(String s) throws Exception {
                String[] split = s.split(" ");
                SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                Long time = sdf.parse(split[3]).getTime();
                return new LogEvent(split[0], split[1], time, split[5], split[6]);

            }
            //對亂序數(shù)據(jù)的處理,設置水位線,并延遲1分鐘
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LogEvent>(Time.minutes(1L)) {
            @Override
            public long extractTimestamp(LogEvent element) {
                return element.getTimestamp();
            }
        });
        //過濾出類型是get的log
        SingleOutputStreamOperator<LogEvent> filter1 = watermarks.filter(x -> "GET".equals(x.getMethod()));

        //正則
        SingleOutputStreamOperator<LogEvent> filter = filter1.filter(new FilterFunction<LogEvent>() {
            @Override
            public boolean filter(LogEvent logEvent) throws Exception {
                String regex = "^((?!\\.(css|js|ico|png)$).)*$";
                // return Pattern.matches(regex, logEvent.getMethod());
                return  logEvent.getUrl().matches(regex);
            }
        });
        //分組,滑動窗口
        WindowedStream<LogEvent, String, TimeWindow> timeWindow = filter.keyBy(LogEvent::getUrl).timeWindow(Time.minutes(10L), Time.seconds(5));

        SingleOutputStreamOperator<PageViewCount> aggregate = timeWindow.aggregate(new AggregateFunction<LogEvent, Long, Long>() {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long add(LogEvent logEvent, Long aLong) {
                return aLong + 1;
            }

            @Override
            public Long getResult(Long aLong) {
                return aLong;
            }

            @Override
            public Long merge(Long aLong, Long acc1) {
                return null;
            }
        }, new WindowFunction<Long, PageViewCount, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
                Long aLong = input.iterator().next();
                out.collect(new PageViewCount(s, window.getEnd(), aLong));
            }
        });

        aggregate.keyBy(PageViewCount::getWindowEnd).process(new KeyedProcessFunction<Long, PageViewCount, String>() {

            ListState<PageViewCount> listState ;
            @Override
            public void open(Configuration parameters) throws Exception {
                listState = getRuntimeContext().getListState(new ListStateDescriptor<>("page-count", PageViewCount.class));
            }

            @Override
            public void processElement(PageViewCount value, Context ctx, Collector<String> out) throws Exception {
                     listState.add(value);
                     ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);

            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                ArrayList<PageViewCount> counts = Lists.newArrayList(listState.get().iterator());
                counts.sort((x1,x2)-> - Long.compare(x1.getCount(),x2.getCount()));

                StringBuffer buffer = new StringBuffer();
                buffer.append("窗口結(jié)束時間").append(new Timestamp(timestamp - 1)).append("\n");
                for (int i = 0; i < Math.min(3,counts.size()); i++) {
                    PageViewCount pageViewCount = counts.get(i);
                    buffer.append("NUMBER ").append(1+i).append("url=").append(pageViewCount.getUrl())
                            .append("熱門度").append(pageViewCount.getCount()).append("\n");
                }
                buffer.append("====="+"\n");
                //Thread.sleep(1000L);
                out.collect(buffer.toString());

            }
        }).print();

        env.execute("job");
    }
}

MapState版本:以url為key存儲聚合結果,並註冊另一個定時器來真正關閉窗口、清除狀態


/**
 * Real-time hot-pages job (late-data version): every 5 seconds of event time, prints the top
 * {@value #TOP_N} URLs by GET-request count over the preceding 10 minutes.
 *
 * <p>Late-data handling:
 * <ol>
 *   <li>the watermark lags the max timestamp by 1 second;</li>
 *   <li>windows accept late events for another {@link #ALLOWED_LATENESS_MS} ms, each late event
 *       re-emitting an updated count for its window;</li>
 *   <li>anything later goes to the "latelog" side output, which is printed with the label "late".</li>
 * </ol>
 * Downstream, counts are kept in MapState keyed by URL, so an updated count overwrites the old
 * one instead of duplicating it. A second timer at windowEnd + allowed lateness clears the state.
 */
public class HotPages {

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\apache.log";

    /** Number of URLs printed per window. */
    private static final int TOP_N = 3;

    /**
     * How long windows keep accepting late data. The cleanup timer uses the same value, so the
     * state is only cleared once no further update for the window can arrive.
     */
    private static final long ALLOWED_LATENESS_MS = 60 * 1000L;

    /** Log time plus its UTC offset, e.g. "17/05/2015:10:05:24 +0000". */
    private static final java.time.format.DateTimeFormatter LOG_TIME_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("dd/MM/yyyy:HH:mm:ss Z", java.util.Locale.ROOT);

    /** Matches URLs that do not end in .css/.js/.ico/.png; compiled once instead of per record. */
    private static final java.util.regex.Pattern PAGE_URL =
            java.util.regex.Pattern.compile("^((?!\\.(css|js|ico|png)$).)*$");

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the input log path
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> source = env.readTextFile(inputPath);

        SingleOutputStreamOperator<LogEvent> watermarks = source.map(new MapFunction<String, LogEvent>() {
            @Override
            public LogEvent map(String s) throws Exception {
                // Fields: ip, -, -, time, utc offset, method, url (split[1] is stored as userId).
                String[] split = s.split(" ");
                // Use the offset written in the log rather than the JVM's default time zone.
                long time = java.time.OffsetDateTime.parse(split[3] + " " + split[4], LOG_TIME_FORMAT)
                        .toInstant().toEpochMilli();
                return new LogEvent(split[0], split[1], time, split[5], split[6]);
            }
            // Out-of-order events: the watermark lags the max timestamp by 1 second.
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LogEvent>(Time.seconds(1L)) {
            @Override
            public long extractTimestamp(LogEvent element) {
                return element.getTimestamp();
            }
        });

        // Keep GET requests only.
        SingleOutputStreamOperator<LogEvent> filter1 = watermarks.filter(x -> "GET".equals(x.getMethod()));

        // Drop static resources by URL suffix.
        SingleOutputStreamOperator<LogEvent> filter = filter1.filter(new FilterFunction<LogEvent>() {
            @Override
            public boolean filter(LogEvent logEvent) throws Exception {
                return PAGE_URL.matcher(logEvent.getUrl()).matches();
            }
        });

        // Side-output tag for data later than the allowed lateness; the {} makes an anonymous
        // subclass so the element type survives erasure.
        OutputTag<LogEvent> latelog = new OutputTag<LogEvent>("latelog") {
        };

        // Group by URL; 10-minute window sliding every 5 seconds, with allowed lateness.
        WindowedStream<LogEvent, String, TimeWindow> timeWindow = filter.keyBy(LogEvent::getUrl).timeWindow(Time.minutes(10L), Time.seconds(5))
                .allowedLateness(Time.milliseconds(ALLOWED_LATENESS_MS))
                .sideOutputLateData(latelog);

        SingleOutputStreamOperator<PageViewCount> aggregate = timeWindow.aggregate(new AggregateFunction<LogEvent, Long, Long>() {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long add(LogEvent logEvent, Long aLong) {
                return aLong + 1;
            }

            @Override
            public Long getResult(Long aLong) {
                return aLong;
            }

            // Combines two partial counts (previously returned null).
            @Override
            public Long merge(Long aLong, Long acc1) {
                return aLong + acc1;
            }
        }, new WindowFunction<Long, PageViewCount, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
                // The pre-aggregated count is the single element of the iterable.
                Long aLong = input.iterator().next();
                out.collect(new PageViewCount(s, window.getEnd(), aLong));
            }
        });

        // Events that missed even the allowed lateness; previously fetched but never used.
        DataStream<LogEvent> sideOutput = aggregate.getSideOutput(latelog);
        sideOutput.print("late");

        aggregate.keyBy(PageViewCount::getWindowEnd).process(new KeyedProcessFunction<Long, PageViewCount, String>() {

            /** Latest count per URL for this windowEnd; late updates overwrite earlier values. */
            MapState<String, Long> mapState;

            @Override
            public void open(Configuration parameters) throws Exception {
                mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("PAGE-COUNT", String.class, Long.class));
            }

            @Override
            public void processElement(PageViewCount value, Context ctx, Collector<String> out) throws Exception {
                mapState.put(value.getUrl(), value.getCount());
                // Ranking timer: fires when the watermark passes windowEnd; a late update
                // re-registers it in the past, so it fires again on the next watermark.
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
                // Cleanup timer: after the allowed lateness no update can arrive for this window.
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + ALLOWED_LATENESS_MS);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                // The current key is the window end, so this identifies the cleanup timer.
                if (timestamp == ctx.getCurrentKey() + ALLOWED_LATENESS_MS) {
                    mapState.clear();
                    return;
                }
                ArrayList<Map.Entry<String, Long>> counts = Lists.newArrayList(mapState.entries().iterator());
                // Highest count first.
                counts.sort((x1, x2) -> Long.compare(x2.getValue(), x1.getValue()));

                StringBuilder buffer = new StringBuilder();
                buffer.append("窗口結束時間").append(new Timestamp(timestamp - 1)).append("\n");
                for (int i = 0; i < Math.min(TOP_N, counts.size()); i++) {
                    Map.Entry<String, Long> longEntry = counts.get(i);
                    buffer.append("NUMBER ").append(1 + i).append("url=").append(longEntry.getKey())
                            .append("熱門度").append(longEntry.getValue()).append("\n");
                }
                buffer.append("=====" + "\n");
                // Demo-only throttle so each ranking can be read in the console.
                // NOTE(review): this blocks the task thread; remove it outside local demos.
                Thread.sleep(1000L);
                out.collect(buffer.toString());
            }
        }).print();

        env.execute("job");
    }
}

3.PV統計

為什麼不用windowAll:因為windowAll會把所有數據發送到同一個分區(並行度只能為1)計算
改用keyBy並以隨機數作為key,把數據打散到多個分區分別預聚合,避免數據傾斜,再按windowEnd分組把各分區的結果累加

/**
 * Counts page views ("pv" events) per 1-hour tumbling event-time window.
 *
 * <p>Instead of {@code timeWindowAll} (which runs with parallelism 1), each event is assigned
 * one of {@value #BUCKETS} random keys. Counts are pre-aggregated per key and window, then the
 * partial counts of each window end are summed. The result is emitted as an
 * {@link ItemViewCount} with id 1.
 */
class PageView {

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\UserBehavior.csv";

    /** Number of random keys the pv events are spread over. */
    private static final int BUCKETS = 10;

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the input CSV path
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> source = env.readTextFile(inputPath);

        // CSV columns: userId,itemId,category,behavior,timestamp(seconds).
        SingleOutputStreamOperator<UserBehavior> map = source.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Long.parseLong(split[2]), split[3], Long.parseLong(split[4]));
            }
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
            @Override
            public long extractAscendingTimestamp(UserBehavior element) {
                return element.getTimestamp() * 1000L;
            }
        });
        SingleOutputStreamOperator<UserBehavior> filter = map.filter(data -> "pv".equals(data.getBehavior()));

        // Spread events over random keys to avoid funnelling everything into one partition.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> map1 = filter.map(new MapFunction<UserBehavior, Tuple2<Integer, Long>>() {
            // One generator per function instance instead of a new one per record.
            private final Random random = new Random();

            @Override
            public Tuple2<Integer, Long> map(UserBehavior userBehavior) throws Exception {
                return Tuple2.of(random.nextInt(BUCKETS), 1L);
            }
        });

        // Partial count per random key and 1h tumbling window.
        SingleOutputStreamOperator<ItemViewCount> aggregate = map1.keyBy(data -> data.f0).timeWindow(Time.hours(1L)).aggregate(new AggregateFunction<Tuple2<Integer, Long>, Long, Long>() {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long add(Tuple2<Integer, Long> integerLongTuple2, Long aLong) {
                return aLong + 1;
            }

            @Override
            public Long getResult(Long aLong) {
                return aLong;
            }

            // Combines two partial counts (previously returned null).
            @Override
            public Long merge(Long aLong, Long acc1) {
                return aLong + acc1;
            }
        }, new WindowFunction<Long, ItemViewCount, Integer, TimeWindow>() {
            @Override
            public void apply(Integer integer, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
                out.collect(new ItemViewCount(integer.longValue(), window.getEnd(), input.iterator().next()));
            }
        });

        // Sum the partial counts of each window end and emit once the window is complete.
        aggregate.keyBy(ItemViewCount::getWindowEnd)
                .process(new KeyedProcessFunction<Long, ItemViewCount, ItemViewCount>() {

                    /** Running total for the current windowEnd (descriptor default 0). */
                    ValueState<Long> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("long", Long.class, 0L));
                    }

                    @Override
                    public void processElement(ItemViewCount value, Context ctx, Collector<ItemViewCount> out) throws Exception {
                        valueState.update(valueState.value() + value.getCount());
                        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ItemViewCount> out) throws Exception {
                        out.collect(new ItemViewCount(1L, ctx.getCurrentKey(), valueState.value()));
                        valueState.clear();
                    }
                }).print();

        env.execute("job");
    }
}

統計結果

4> ItemViewCount{itemId=1, windowEnd=1511679600000, count=50838}
7> ItemViewCount{itemId=1, windowEnd=1511672400000, count=44499}
5> ItemViewCount{itemId=1, windowEnd=1511665200000, count=48022}
8> ItemViewCount{itemId=1, windowEnd=1511668800000, count=47298}
8> ItemViewCount{itemId=1, windowEnd=1511683200000, count=52296}
1> ItemViewCount{itemId=1, windowEnd=1511676000000, count=48649}
3> ItemViewCount{itemId=1, windowEnd=1511661600000, count=41890}
3> ItemViewCount{itemId=1, windowEnd=1511686800000, count=52552}
3> ItemViewCount{itemId=1, windowEnd=1511690400000, count=48292}
5> ItemViewCount{itemId=1, windowEnd=1511694000000, count=13}

4.UV統計

方案一 set去重版

/**
 * Unique visitors (UV) per 1-hour tumbling event-time window, de-duplicated with a
 * {@link HashSet} of user ids; emits {@code ("uv1", windowEnd, uvCount)}.
 *
 * <p>The full-window {@code apply} receives every pv event of the window at once, so memory
 * grows with the number of events per hour.
 */
public class UV1 {

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\UserBehavior.csv";

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the input CSV path
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> source = env.readTextFile(inputPath);

        // CSV columns: userId,itemId,category,behavior,timestamp(seconds).
        SingleOutputStreamOperator<UserBehavior> map = source.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Long.parseLong(split[2]), split[3], Long.parseLong(split[4]));
            }
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
            @Override
            public long extractAscendingTimestamp(UserBehavior element) {
                return element.getTimestamp() * 1000L;
            }
        });
        SingleOutputStreamOperator<UserBehavior> filter = map.filter(data -> "pv".equals(data.getBehavior()));

        filter.timeWindowAll(Time.hours(1L)).apply(new AllWindowFunction<UserBehavior, Tuple3<String, Long, Long>, TimeWindow>() {

            @Override
            public void apply(TimeWindow window, Iterable<UserBehavior> values, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                // Distinct user ids seen in this window.
                HashSet<Long> userIds = new HashSet<>();
                for (UserBehavior value : values) {
                    userIds.add(value.getUserId());
                }
                out.collect(Tuple3.of("uv1", window.getEnd(), (long) userIds.size()));
            }
        }).print();
        env.execute("job");
    }
}

方案二 布隆過濾器

/**
 * Unique visitors per 1-hour window, de-duplicated with a Redis bitmap instead of an in-memory set.
 *
 * <p>A custom trigger fires and purges the window for every element, so the window never buffers
 * events. Each user id is hashed to a bit offset in a Redis bitmap keyed by the window end. A
 * field of the Redis hash "uv_count" holds the running count. Connects to Redis at
 * localhost:6379.
 */
public class UvWithBloomFilter {

  /** Seed passed to {@link MyBloomFilter#hashCode(String, Integer)}. */
  private static final int HASH_SEED = 61;

  public static void main(String[] args) throws Exception {
    // 1. Execution environment with parallelism 1.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 2. Read the CSV from the classpath (previously referenced an undefined UniqueVisitor class).
    URL resource = UvWithBloomFilter.class.getResource("/UserBehavior.csv");
    if (resource == null) {
      throw new IllegalStateException("/UserBehavior.csv not found on the classpath");
    }
    DataStream<String> inputStream = env.readTextFile(resource.getPath());

    // 3. Parse into POJOs; timestamps are seconds -> ms, watermark tolerates 200 ms of disorder.
    //    category is a Long field, so it is parsed as long (new Integer(...) did not compile).
    DataStream<UserBehavior> dataStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new UserBehavior(Long.parseLong(fields[0]), Long.parseLong(fields[1]), Long.parseLong(fields[2]), fields[3], Long.parseLong(fields[4]));
    }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
      new BoundedOutOfOrdernessTimestampExtractor<UserBehavior>(Time.of(200, TimeUnit.MILLISECONDS)) {
        @Override
        public long extractTimestamp(UserBehavior element) {
          return element.getTimestamp() * 1000L;
        }
      }
    ));

    // 4. Window the pv events and count UV per window.
    SingleOutputStreamOperator<PageViewCount> uvStream = dataStream
      .filter(data -> "pv".equals(data.getBehavior()))
      .timeWindowAll(Time.hours(1))
      .trigger(new MyTrigger())
      .process(new UvCountResultWithBloomFliter());

    uvStream.print();

    env.execute("uv count with bloom filter job");
  }

  /** Trigger that fires the window and purges its contents on every element; timers are ignored. */
  public static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
    @Override
    public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
      return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
      // No timers or trigger state are registered, so there is nothing to clean up.
    }
  }

  /**
   * Minimal hashed bitmap used as a one-hash-function Bloom filter: maps a string to a bit
   * offset in {@code [0, cap)}. With a single hash, distinct ids can collide, so UV may be
   * slightly under-counted.
   */
  public static class MyBloomFilter {
    /** Bitmap size in bits; must be a power of two because offsets are masked with {@code cap - 1}. */
    private final Integer cap;

    public MyBloomFilter(Integer cap) {
      this.cap = cap;
    }

    /** Polynomial string hash ({@code h = h * seed + c}, wrapping on overflow) masked into {@code [0, cap)}. */
    public Long hashCode(String value, Integer seed) {
      long result = 0L;
      for (int i = 0; i < value.length(); i++) {
        result = result * seed + value.charAt(i);
      }
      return result & (cap - 1);
    }
  }

  /** Per-element UV update against Redis; emits the new count only when a user id is new. */
  public static class UvCountResultWithBloomFliter extends ProcessAllWindowFunction<UserBehavior, PageViewCount, TimeWindow> {
    Jedis jedis;
    MyBloomFilter myBloomFilter;

    @Override
    public void open(Configuration parameters) throws Exception {
      jedis = new Jedis("localhost", 6379);
      // 2^29 bits = 64 MB bitmap, sized for roughly 100 million ids.
      myBloomFilter = new MyBloomFilter(1 << 29);
    }

    @Override
    public void process(Context context, Iterable<UserBehavior> elements, Collector<PageViewCount> out) throws Exception {
      // The window end names both the bitmap key and the count field in the hash.
      Long windowEnd = context.window().getEnd();
      String bitmapKey = windowEnd.toString();
      String countHashName = "uv_count";
      String countKey = windowEnd.toString();

      // MyTrigger fires and purges per element, so the window holds just the newest element.
      Long userId = elements.iterator().next().getUserId();
      Long offset = myBloomFilter.hashCode(userId.toString(), HASH_SEED);

      // SETBIT returns the previous bit, giving test-and-set in one atomic command.
      Boolean alreadySeen = jedis.setbit(bitmapKey, offset, true);
      if (!Boolean.TRUE.equals(alreadySeen)) {
        // HINCRBY treats a missing field as 0 and returns the incremented value.
        long uvCount = jedis.hincrBy(countHashName, countKey, 1L);
        out.collect(new PageViewCount("uv", windowEnd, uvCount));
      }
    }

    @Override
    public void close() throws Exception {
      if (jedis != null) {
        jedis.close();
      }
    }
  }
}

統計結果

PageViewCount{url='uv', windowEnd=1511661600000, count=7469}
PageViewCount{url='uv', windowEnd=1511661600000, count=7470}
PageViewCount{url='uv', windowEnd=1511661600000, count=7471}
PageViewCount{url='uv', windowEnd=1511661600000, count=7472}
PageViewCount{url='uv', windowEnd=1511661600000, count=7473}
PageViewCount{url='uv', windowEnd=1511661600000, count=7474}
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容