1. 實時熱門商品統計
日誌字段:用戶 ID、商品 ID、商品類別 ID、行為類型(pv/buy/cart 等)、時間戳
要求統計某段時間內瀏覽量(pv)最高的 topN 商品及其數量
思路:典型的滑動窗口案例
首先按商品 ID 分組(keyBy),在滑動窗口內統計每個商品的 pv 數
自定義 aggregate(增量聚合 + 窗口函數)輸出聚合後需要的字段:商品 ID、窗口結束時間、熱門度
然後按窗口結束時間重新分組(keyBy)
使用 ListState 狀態存儲該窗口內各商品及其數量
使用定時器(windowEnd + 1)等待該窗口的所有聚合結果到齊後再觸發計算
將狀態裝入 list 中排序,獲取 topN
數據源:這裡讀取本地文件;實際開發中應使用 Kafka(此處沒有搭建集群)
#示例數據
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,1511658000
470572,3760258,1299190,pv,1511658001
543789,3110556,4558987,cart,1511658001
354759,2191348,4756105,pv,1511658001
382009,2123538,4801426,pv,1511658001
677046,1598945,4145813,pv,1511658001
/**
 * One record of UserBehavior.csv, columns in order:
 * {@code userId, itemId, category, behavior, timestamp}.
 *
 * <p>Shaped as a Flink POJO: public class, public no-arg constructor and a getter/setter pair
 * for every private field. The jobs in this file treat {@code timestamp} as seconds and
 * multiply it by 1000 to obtain event-time milliseconds.
 */
public class UserBehavior {

    private Long userId;
    private Long itemId;
    private Long category;
    private String behavior;
    private Long timestamp;

    /** Creates an empty record; every field starts out {@code null}. */
    public UserBehavior() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param userId    id of the acting user
     * @param itemId    id of the item
     * @param category  category id of the item
     * @param behavior  behavior type, e.g. "pv", "buy", "cart"
     * @param timestamp event time in seconds
     */
    public UserBehavior(Long userId, Long itemId, Long category, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.category = category;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }

    public Long getItemId() { return itemId; }

    public Long getCategory() { return category; }

    public String getBehavior() { return behavior; }

    public Long getTimestamp() { return timestamp; }

    public void setUserId(Long userId) { this.userId = userId; }

    public void setItemId(Long itemId) { this.itemId = itemId; }

    public void setCategory(Long category) { this.category = category; }

    public void setBehavior(String behavior) { this.behavior = behavior; }

    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return new StringBuilder("UserBehavior{")
                .append("userId=").append(userId)
                .append(", itemId=").append(itemId)
                .append(", category=").append(category)
                .append(", behavior='").append(behavior).append('\'')
                .append(", timestamp=").append(timestamp)
                .append('}')
                .toString();
    }
}
/**
 * Result of a windowed count: a key ({@code itemId}), the end timestamp of the window the
 * count belongs to, and the number of events counted for that key in that window.
 *
 * <p>Shaped as a Flink POJO (public no-arg constructor plus getters/setters) so that it can be
 * keyed by field name, e.g. {@code keyBy("windowEnd")}.
 */
public class ItemViewCount {

    private Long itemId;
    private Long windowEnd;
    private Long count;

    /** Creates an instance whose fields are all {@code null}. */
    public ItemViewCount() {
    }

    /**
     * @param itemId    the counted key
     * @param windowEnd end timestamp of the window
     * @param count     number of events counted
     */
    public ItemViewCount(Long itemId, Long windowEnd, Long count) {
        this.itemId = itemId;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public Long getItemId() { return itemId; }

    public Long getWindowEnd() { return windowEnd; }

    public Long getCount() { return count; }

    public void setItemId(Long itemId) { this.itemId = itemId; }

    public void setWindowEnd(Long windowEnd) { this.windowEnd = windowEnd; }

    public void setCount(Long count) { this.count = count; }

    @Override
    public String toString() {
        return new StringBuilder("ItemViewCount{")
                .append("itemId=").append(itemId)
                .append(", windowEnd=").append(windowEnd)
                .append(", count=").append(count)
                .append('}')
                .toString();
    }
}
public class HotItem {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-02-06
* Time: 23:02
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> source = env.readTextFile("C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\UserBehavior.csv");
SingleOutputStreamOperator<UserBehavior> map = source.map(new RichMapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String s) throws Exception {
String[] split = s.split(",");
return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Long.parseLong(split[2]), split[3], Long.parseLong(split[4]));
}
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior element) {
return element.getTimestamp() * 1000L;
}
});
//過濾
SingleOutputStreamOperator<UserBehavior> filter = map.filter(new FilterFunction<UserBehavior>() {
@Override
public boolean filter(UserBehavior userBehavior) throws Exception {
return "pv".equals(userBehavior.getBehavior());
}
});
//分組開窗
KeyedStream<UserBehavior, Tuple> keyBy = filter.keyBy("itemId");
WindowedStream<UserBehavior, Tuple, TimeWindow> window = keyBy.timeWindow(Time.hours(1), Time.minutes(5));
// WindowedStream<UserBehavior, Tuple, TimeWindow> window = keyBy.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)));
//注意不要導錯包
SingleOutputStreamOperator<ItemViewCount> aggregate = window.aggregate(new ItemAgg(), new WindowItemResult());
//按照windowend重新keyby
SingleOutputStreamOperator<String> process = aggregate.keyBy("windowEnd").process(new KeyedProcessFunction<Tuple, ItemViewCount, String>() {
ListState<ItemViewCount> listState;
@Override
public void open(Configuration parameters) throws Exception {
listState = getRuntimeContext()
.getListState(new ListStateDescriptor<>("lst-state", ItemViewCount.class));
}
@Override
public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
//每來一條數(shù)據(jù)執(zhí)行一次,并且注冊定時器,由于是按時間來劃分窗口的所以只有一個定時器
listState.add(value);
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
ArrayList<ItemViewCount> viewCounts = Lists.newArrayList(listState.get().iterator());
viewCounts.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount t0, ItemViewCount t1) {
if (t0.getCount() > t1.getCount()) {
return -1;
} else if (t0.getCount().equals(t1.getCount())) {
return 0;
} else {
return 1;
}
}
});
StringBuffer buffer = new StringBuffer();
buffer.append("窗口結(jié)束時間").append(new Timestamp(timestamp - 1)).append("\n");
//獲取topN
for (int i = 0; i < Math.min(3, viewCounts.size()); i++) {
ItemViewCount viewCount = viewCounts.get(i);
buffer.append("Nom").append(i + 1).append("類型").append(viewCount.getItemId()).append("熱門度").append(viewCount.getCount()).append("\n");;
}
buffer.append("\n");
buffer.append("=======");
buffer.append("\n");
Thread.sleep(1000L);
out.collect(buffer.toString());
}
});
process.print();
env.execute("job");
}
public static class ItemAgg implements AggregateFunction<UserBehavior, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(UserBehavior userBehavior, Long aLong) {
return aLong + 1L;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return null;
}
}
public static class WindowItemResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
Long itemId = tuple.getField(0);
Long windowEnd = window.getEnd();
Long aLong = input.iterator().next();
out.collect(new ItemViewCount(itemId, windowEnd, aLong));
}
}
}
2. 實時熱門頁面統計
思路與上一個案例相同。
用到的其他知識點:
設置水位線延遲(BoundedOutOfOrderness),容忍一定程度的亂序數據
用正則匹配過濾出符合條件的日誌記錄(排除以 .css/.js/.ico/.png 結尾的靜態資源請求)
處理遲到數據的三重保證:
1. 水位線延遲
2. 允許遲到數據 .allowedLateness(Time.minutes(1L)):窗口觸發計算後暫不關閉,繼續等待遲到數據並更新結果
3. 側輸出流(sideOutputLateData):窗口真正關閉後才到達的數據放入側輸出流
存在的問題:允許遲到的數據到達時,會更新仍在等待中(尚未真正關閉)的窗口的聚合結果,並再次輸出。此時 windowEnd + 1 的定時器時間已經過去,但過時的定時器並不會丟失,而是在下一次水位線更新時觸發。由於我們使用的是 ListState,每來一條聚合結果就追加到列表中,同一個 URL 會出現多條記錄,導致「刷榜」。如果在定時器觸發時直接 clear 狀態,又會把之後更新排名還需要的結果全部清除,顯然不符合要求。
對問題的進一步處理:
使用 MapState 代替 ListState:以 url 為 key,遲到數據帶來的新結果直接覆蓋舊值。另外註冊一個定時器(windowEnd + 允許遲到時間),到時才真正關閉窗口並清除狀態。
#示例數據
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/1983_delorean_dmc-12-pic-38289.jpeg
83.149.9.216 - - 17/05/2015:10:05:54 +0000 GET /presentations/logstash-monitorama-2013/images/simple-inputs-filters-outputs.jpg
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/tiered-outputs-to-inputs.jpg
83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
24.236.252.67 - - 17/05/2015:10:05:40 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /articles/dynamic-dns-with-dhcp/
93.114.45.13 - - 17/05/2015:10:05:04 +0000 GET /reset.css
93.114.45.13 - - 17/05/2015:10:05:45 +0000 GET /style2.css
/**
 * One parsed line of an Apache access log.
 *
 * <p>Carries the client ip, the user id column, the request time in epoch milliseconds, the
 * HTTP method and the requested url. Shaped as a Flink POJO: public no-arg constructor and
 * getter/setter pairs.
 */
public class LogEvent {

    private String ip;
    private String userId;
    private Long timestamp;
    private String method;
    private String url;

    /** Creates an empty event; every field starts out {@code null}. */
    public LogEvent() {
    }

    /**
     * @param ip        client address
     * @param userId    user id column of the log line
     * @param timestamp request time in epoch milliseconds
     * @param method    HTTP method, e.g. "GET"
     * @param url       requested path
     */
    public LogEvent(String ip, String userId, Long timestamp, String method, String url) {
        this.ip = ip;
        this.userId = userId;
        this.timestamp = timestamp;
        this.method = method;
        this.url = url;
    }

    public String getIp() { return ip; }

    public String getUserId() { return userId; }

    public Long getTimestamp() { return timestamp; }

    public String getMethod() { return method; }

    public String getUrl() { return url; }

    public void setIp(String ip) { this.ip = ip; }

    public void setUserId(String userId) { this.userId = userId; }

    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    public void setMethod(String method) { this.method = method; }

    public void setUrl(String url) { this.url = url; }

    @Override
    public String toString() {
        return new StringBuilder("LogEvent{")
                .append("ip='").append(ip).append('\'')
                .append(", userId='").append(userId).append('\'')
                .append(", timestamp=").append(timestamp)
                .append(", method='").append(method).append('\'')
                .append(", url='").append(url).append('\'')
                .append('}')
                .toString();
    }
}
/**
 * Windowed count for a url: the url, the end timestamp of the window and the number of
 * requests counted in it. Shaped as a Flink POJO (public no-arg constructor, getters/setters).
 */
public class PageViewCount {

    private String url;
    private Long windowEnd;
    private Long count;

    /** Creates an instance whose fields are all {@code null}. */
    public PageViewCount() {
    }

    /**
     * @param url       the counted url (or a label such as "uv")
     * @param windowEnd end timestamp of the window
     * @param count     number of events counted
     */
    public PageViewCount(String url, Long windowEnd, Long count) {
        this.url = url;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public String getUrl() { return url; }

    public Long getWindowEnd() { return windowEnd; }

    public Long getCount() { return count; }

    public void setUrl(String url) { this.url = url; }

    public void setWindowEnd(Long windowEnd) { this.windowEnd = windowEnd; }

    public void setCount(Long count) { this.count = count; }

    @Override
    public String toString() {
        return new StringBuilder("PageViewCount{")
                .append("url='").append(url).append('\'')
                .append(", windowEnd=").append(windowEnd)
                .append(", count=").append(count)
                .append('}')
                .toString();
    }
}
public class HotPages {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-02-08
* Time: 21:36
* 83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//URL url = HotPages.class.getResource("/apache.log");
DataStreamSource<String> source = env.readTextFile("C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\apache.log");
SingleOutputStreamOperator<LogEvent> watermarks = source.map(new MapFunction<String, LogEvent>() {
@Override
public LogEvent map(String s) throws Exception {
String[] split = s.split(" ");
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
Long time = sdf.parse(split[3]).getTime();
return new LogEvent(split[0], split[1], time, split[5], split[6]);
}
//對亂序數(shù)據(jù)的處理,設置水位線,并延遲1分鐘
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LogEvent>(Time.minutes(1L)) {
@Override
public long extractTimestamp(LogEvent element) {
return element.getTimestamp();
}
});
//過濾出類型是get的log
SingleOutputStreamOperator<LogEvent> filter1 = watermarks.filter(x -> "GET".equals(x.getMethod()));
//正則
SingleOutputStreamOperator<LogEvent> filter = filter1.filter(new FilterFunction<LogEvent>() {
@Override
public boolean filter(LogEvent logEvent) throws Exception {
String regex = "^((?!\\.(css|js|ico|png)$).)*$";
// return Pattern.matches(regex, logEvent.getMethod());
return logEvent.getUrl().matches(regex);
}
});
//分組,滑動窗口
WindowedStream<LogEvent, String, TimeWindow> timeWindow = filter.keyBy(LogEvent::getUrl).timeWindow(Time.minutes(10L), Time.seconds(5));
SingleOutputStreamOperator<PageViewCount> aggregate = timeWindow.aggregate(new AggregateFunction<LogEvent, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(LogEvent logEvent, Long aLong) {
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return null;
}
}, new WindowFunction<Long, PageViewCount, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
Long aLong = input.iterator().next();
out.collect(new PageViewCount(s, window.getEnd(), aLong));
}
});
aggregate.keyBy(PageViewCount::getWindowEnd).process(new KeyedProcessFunction<Long, PageViewCount, String>() {
ListState<PageViewCount> listState ;
@Override
public void open(Configuration parameters) throws Exception {
listState = getRuntimeContext().getListState(new ListStateDescriptor<>("page-count", PageViewCount.class));
}
@Override
public void processElement(PageViewCount value, Context ctx, Collector<String> out) throws Exception {
listState.add(value);
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
ArrayList<PageViewCount> counts = Lists.newArrayList(listState.get().iterator());
counts.sort((x1,x2)-> - Long.compare(x1.getCount(),x2.getCount()));
StringBuffer buffer = new StringBuffer();
buffer.append("窗口結(jié)束時間").append(new Timestamp(timestamp - 1)).append("\n");
for (int i = 0; i < Math.min(3,counts.size()); i++) {
PageViewCount pageViewCount = counts.get(i);
buffer.append("NUMBER ").append(1+i).append("url=").append(pageViewCount.getUrl())
.append("熱門度").append(pageViewCount.getCount()).append("\n");
}
buffer.append("====="+"\n");
//Thread.sleep(1000L);
out.collect(buffer.toString());
}
}).print();
env.execute("job");
}
}
改進版:使用 MapState,並註冊另一個定時器來真正關閉窗口、清除狀態
/**
 * Real-time hot pages with late-data handling. It counts GET requests per url in 10-minute
 * event-time windows sliding every 5 seconds, and prints the {@value #TOP_N} most requested urls.
 *
 * <p>Late data is handled at three levels:
 * <ol>
 *   <li>the watermark trails the maximum timestamp by 1 second;</li>
 *   <li>windows stay open for {@link #ALLOWED_LATENESS_MS} after firing and re-fire on late
 *       events;</li>
 *   <li>anything later still goes to the "late" side output, which is printed.</li>
 * </ol>
 * Rankings are kept in a MapState keyed by url, so a late re-firing overwrites a url's count
 * instead of adding a duplicate entry. A second timer drops the state once the window can no
 * longer change.
 */
public class HotPages {

    /** Number of urls printed per window. */
    private static final int TOP_N = 3;

    /** How long a window accepts late events after it first fires, in milliseconds. */
    private static final long ALLOWED_LATENESS_MS = 60 * 1000L;

    /** Input used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT =
            "C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\apache.log";

    /**
     * Builds and runs the job. Example input line:
     * {@code 83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico}
     *
     * @param args optional; {@code args[0]} overrides the input log path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        DataStreamSource<String> source = env.readTextFile(input);

        SingleOutputStreamOperator<LogEvent> watermarks = source.map(new MapFunction<String, LogEvent>() {
            // One formatter per function instance (each parallel instance has its own copy).
            private final SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");

            @Override
            public LogEvent map(String s) throws Exception {
                // Space-separated: [0] ip, [1] user id, [3] time, [4] zone offset, [5] method, [6] url
                String[] split = s.split(" ");
                // NOTE(review): the zone offset in split[4] is ignored; the time is parsed in the
                // JVM default time zone. Confirm this is intended.
                Long time = sdf.parse(split[3]).getTime();
                return new LogEvent(split[0], split[1], time, split[5], split[6]);
            }
            // Out-of-order data: the watermark trails the maximum timestamp by 1 second.
            // Later events are handled by allowedLateness and the side output below.
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LogEvent>(Time.seconds(1L)) {
            @Override
            public long extractTimestamp(LogEvent element) {
                return element.getTimestamp();
            }
        });

        // Keep only GET requests.
        SingleOutputStreamOperator<LogEvent> filter1 = watermarks.filter(x -> "GET".equals(x.getMethod()));
        // Drop urls ending in .css/.js/.ico/.png (static resources).
        SingleOutputStreamOperator<LogEvent> filter = filter1.filter(new FilterFunction<LogEvent>() {
            @Override
            public boolean filter(LogEvent logEvent) throws Exception {
                String regex = "^((?!\\.(css|js|ico|png)$).)*$";
                return logEvent.getUrl().matches(regex);
            }
        });

        // Side output for events arriving after the window is finally closed. The trailing {}
        // makes an anonymous subclass so that the element type is kept for Flink.
        OutputTag<LogEvent> latelog = new OutputTag<LogEvent>("latelog") {
        };

        // Group by url; sliding window of 10 minutes every 5 seconds, kept open for late events.
        WindowedStream<LogEvent, String, TimeWindow> timeWindow = filter.keyBy(LogEvent::getUrl)
                .timeWindow(Time.minutes(10L), Time.seconds(5))
                .allowedLateness(Time.milliseconds(ALLOWED_LATENESS_MS))
                .sideOutputLateData(latelog);
        SingleOutputStreamOperator<PageViewCount> aggregate = timeWindow.aggregate(new AggregateFunction<LogEvent, Long, Long>() {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long add(LogEvent logEvent, Long aLong) {
                return aLong + 1;
            }

            @Override
            public Long getResult(Long aLong) {
                return aLong;
            }

            // Combine partial counts; returning null would break merging windows.
            @Override
            public Long merge(Long aLong, Long acc1) {
                return aLong + acc1;
            }
        }, new WindowFunction<Long, PageViewCount, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
                Long aLong = input.iterator().next();
                out.collect(new PageViewCount(s, window.getEnd(), aLong));
            }
        });

        // Events too late even for allowedLateness. Previously this stream was fetched but never
        // consumed, so such events were silently dropped; print them instead.
        DataStream<LogEvent> sideOutput = aggregate.getSideOutput(latelog);
        sideOutput.print("late");

        aggregate.keyBy(PageViewCount::getWindowEnd).process(new KeyedProcessFunction<Long, PageViewCount, String>() {
            // url -> latest count within this window. A late re-firing overwrites the old value.
            private MapState<String, Long> mapState;

            @Override
            public void open(Configuration parameters) throws Exception {
                mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("PAGE-COUNT", String.class, Long.class));
            }

            @Override
            public void processElement(PageViewCount value, Context ctx, Collector<String> out) throws Exception {
                mapState.put(value.getUrl(), value.getCount());
                // Ranking timer. It fires once the watermark passes the window end. After a late
                // update its time is already past, so it fires again on the next watermark.
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
                // Cleanup timer: after the allowed lateness the window can no longer change.
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + ALLOWED_LATENESS_MS);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                // Cleanup timer: the window is truly closed; drop its ranking state.
                if (timestamp == ctx.getCurrentKey() + ALLOWED_LATENESS_MS) {
                    mapState.clear();
                    return;
                }
                ArrayList<Map.Entry<String, Long>> counts = new ArrayList<>();
                for (Map.Entry<String, Long> entry : mapState.entries()) {
                    counts.add(entry);
                }
                // Highest count first.
                counts.sort((x1, x2) -> Long.compare(x2.getValue(), x1.getValue()));

                StringBuilder buffer = new StringBuilder();
                buffer.append("窗口結(jié)束時間").append(new Timestamp(timestamp - 1)).append("\n");
                for (int i = 0; i < Math.min(TOP_N, counts.size()); i++) {
                    Map.Entry<String, Long> longEntry = counts.get(i);
                    buffer.append("NUMBER ").append(1 + i).append("url=").append(longEntry.getKey())
                            .append("熱門度").append(longEntry.getValue()).append("\n");
                }
                buffer.append("=====" + "\n");
                // No Thread.sleep here: blocking the task thread stalls watermarks and checkpoints.
                out.collect(buffer.toString());
            }
        }).print();
        env.execute("job");
    }
}
3. PV 統計
為什麼不直接用 windowAll:windowAll 的所有數據都只會進入同一個分區(並行度只能是 1)。
改用 keyBy:給每條數據分配一個隨機數 key(0–9),把數據打散到多個分區並行預聚合,避免數據傾斜;最後再按窗口結束時間把各分區的計數累加起來。
/**
 * Hourly page-view (pv) count.
 *
 * <p>Instead of {@code timeWindowAll}, which funnels everything through one subtask, each pv
 * event gets a random key in [0, {@value #KEY_SPREAD}). Partial counts are computed per key in
 * 1-hour tumbling windows and then summed per window end.
 */
class PageView {

    /** Number of random keys the pv stream is spread over before the final sum. */
    private static final int KEY_SPREAD = 10;

    /** Input used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT =
            "C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\UserBehavior.csv";

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the input CSV path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment eve = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        eve.setParallelism(1);
        eve.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        DataStreamSource<String> source = eve.readTextFile(input);

        // Parse the CSV; timestamps are seconds and assumed to be ascending.
        SingleOutputStreamOperator<UserBehavior> map = source.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]),
                        Long.parseLong(split[2]), split[3], Long.parseLong(split[4]));
            }
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
            @Override
            public long extractAscendingTimestamp(UserBehavior element) {
                return element.getTimestamp() * 1000L;
            }
        });

        SingleOutputStreamOperator<UserBehavior> filter = map.filter(data -> "pv".equals(data.getBehavior()));

        // Attach a random key to each pv event so that counting is spread over several keys.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> map1 = filter.map(new MapFunction<UserBehavior, Tuple2<Integer, Long>>() {
            // One generator per function instance rather than a new one for every record.
            private final Random random = new Random();

            @Override
            public Tuple2<Integer, Long> map(UserBehavior userBehavior) throws Exception {
                return Tuple2.of(random.nextInt(KEY_SPREAD), 1L);
            }
        });

        // Per random key: count pv events in 1-hour tumbling windows.
        SingleOutputStreamOperator<ItemViewCount> aggregate = map1.keyBy(data -> data.f0).timeWindow(Time.hours(1L)).aggregate(new AggregateFunction<Tuple2<Integer, Long>, Long, Long>() {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long add(Tuple2<Integer, Long> integerLongTuple2, Long aLong) {
                return aLong + 1;
            }

            @Override
            public Long getResult(Long aLong) {
                return aLong;
            }

            // Combine partial counts; returning null would break merging windows.
            @Override
            public Long merge(Long aLong, Long acc1) {
                return aLong + acc1;
            }
        }, new WindowFunction<Long, ItemViewCount, Integer, TimeWindow>() {
            @Override
            public void apply(Integer integer, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
                out.collect(new ItemViewCount(integer.longValue(), window.getEnd(), input.iterator().next()));
            }
        });

        // Sum the partial counts of all random keys that belong to the same window.
        aggregate.keyBy(ItemViewCount::getWindowEnd)
                .process(new KeyedProcessFunction<Long, ItemViewCount, ItemViewCount>() {
                    private ValueState<Long> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("long", Long.class));
                    }

                    @Override
                    public void processElement(ItemViewCount value, Context ctx, Collector<ItemViewCount> out) throws Exception {
                        Long current = valueState.value();
                        valueState.update((current == null ? 0L : current) + value.getCount());
                        // Fires once every partial count of this window has arrived.
                        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ItemViewCount> out) throws Exception {
                        Long total = valueState.value();
                        // itemId is a constant placeholder; the result is a single total per window.
                        out.collect(new ItemViewCount(1L, ctx.getCurrentKey(), total == null ? 0L : total));
                        valueState.clear();
                    }
                }).print();
        eve.execute("job");
    }
}
統計結果
4> ItemViewCount{itemId=1, windowEnd=1511679600000, count=50838}
7> ItemViewCount{itemId=1, windowEnd=1511672400000, count=44499}
5> ItemViewCount{itemId=1, windowEnd=1511665200000, count=48022}
8> ItemViewCount{itemId=1, windowEnd=1511668800000, count=47298}
8> ItemViewCount{itemId=1, windowEnd=1511683200000, count=52296}
1> ItemViewCount{itemId=1, windowEnd=1511676000000, count=48649}
3> ItemViewCount{itemId=1, windowEnd=1511661600000, count=41890}
3> ItemViewCount{itemId=1, windowEnd=1511686800000, count=52552}
3> ItemViewCount{itemId=1, windowEnd=1511690400000, count=48292}
5> ItemViewCount{itemId=1, windowEnd=1511694000000, count=13}
4. UV 統計
方案一:用 HashSet 去重
/**
 * Hourly unique visitors (UV), exact version. All pv events of a 1-hour event-time window are
 * collected, and the distinct user ids are counted with a HashSet. Each window emits
 * {@code ("uv1", windowEnd, distinctUsers)}.
 */
public class UV1 {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment eve = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        eve.setParallelism(1);
        eve.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> lines = eve.readTextFile("C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\UserBehavior.csv");

        // CSV columns: userId,itemId,category,behavior,timestamp(seconds), assumed ascending.
        SingleOutputStreamOperator<UserBehavior> events = lines
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] cols = line.split(",");
                        Long user = Long.parseLong(cols[0]);
                        Long item = Long.parseLong(cols[1]);
                        Long category = Long.parseLong(cols[2]);
                        String behavior = cols[3];
                        Long seconds = Long.parseLong(cols[4]);
                        return new UserBehavior(user, item, category, behavior, seconds);
                    }
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        SingleOutputStreamOperator<UserBehavior> pvOnly = events.filter(e -> "pv".equals(e.getBehavior()));

        SingleOutputStreamOperator<Tuple3<String, Long, Long>> uv = pvOnly
                .timeWindowAll(Time.hours(1L))
                .apply(new AllWindowFunction<UserBehavior, Tuple3<String, Long, Long>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<UserBehavior> values, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                        // Distinct user ids seen in this window.
                        HashSet<Long> distinctUsers = new HashSet<>();
                        values.forEach(v -> distinctUsers.add(v.getUserId()));
                        long uvCount = distinctUsers.size();
                        out.collect(Tuple3.of("uv1", window.getEnd(), uvCount));
                    }
                });

        uv.print();
        eve.execute("job");
    }
}
方案二:布隆過濾器(基於 Redis 位圖去重)
/**
 * Hourly unique visitors (UV) using a Redis bitmap for de-duplication, instead of keeping
 * every user id in Flink state.
 *
 * <p>A custom trigger fires and purges the window on every element. Each user id is hashed to a
 * bit offset: if the bit is unset, it is set and the window's count stored in Redis is
 * incremented.
 */
public class UvWithBloomFilter {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Execution environment with parallelism 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read the CSV from the classpath. The lookup goes through this class; the previous
        //    reference to UniqueVisitor is not defined in this file.
        URL resource = UvWithBloomFilter.class.getResource("/UserBehavior.csv");
        if (resource == null) {
            throw new IllegalStateException("/UserBehavior.csv not found on the classpath");
        }
        DataStream<String> inputStream = env.readTextFile(resource.getPath());

        // 3. Parse into POJOs and assign timestamps/watermarks. Event time is the CSV's seconds
        //    * 1000, with 200 ms of out-of-orderness.
        DataStream<UserBehavior> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            // UserBehavior takes Long for every numeric column, category included.
            return new UserBehavior(Long.valueOf(fields[0]), Long.valueOf(fields[1]), Long.valueOf(fields[2]),
                    fields[3], Long.valueOf(fields[4]));
        }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                new BoundedOutOfOrdernessTimestampExtractor<UserBehavior>(Time.of(200, TimeUnit.MILLISECONDS)) {
                    @Override
                    public long extractTimestamp(UserBehavior element) {
                        return element.getTimestamp() * 1000L;
                    }
                }
        ));

        // 4. Hourly tumbling window over all pv events, evaluated element by element.
        SingleOutputStreamOperator<PageViewCount> uvStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .timeWindowAll(Time.hours(1))
                .trigger(new MyTrigger())
                .process(new UvCountResultWithBloomFliter());

        uvStream.print();
        env.execute("uv count with bloom filter job");
    }

    /**
     * Fires and purges the window on every element. As a result, the window function sees exactly
     * one element per call and Flink buffers nothing. Event-time and processing-time timers are
     * ignored.
     */
    public static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
        @Override
        public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        }
    }

    /**
     * Maps a string to one bit offset in a bitmap of {@code cap} bits.
     *
     * <p>The caller applies a single hash function per value, so this behaves as a hashed
     * bitmap. Two users whose hashes collide are counted once, which means the UV count can be
     * slightly low.
     */
    public static class MyBloomFilter {
        // Bitmap size in bits. It must be a power of two because offsets are computed by masking
        // with cap - 1.
        private Integer cap;

        /**
         * @param cap bitmap size in bits
         * @throws IllegalArgumentException if {@code cap} is not a positive power of two
         */
        public MyBloomFilter(Integer cap) {
            if (cap == null || cap <= 0 || (cap & (cap - 1)) != 0) {
                throw new IllegalArgumentException("cap must be a positive power of two, got " + cap);
            }
            this.cap = cap;
        }

        /**
         * Polynomial string hash with the given multiplier, masked into {@code [0, cap)}.
         *
         * @param value string to hash
         * @param seed  multiplier of the polynomial hash
         * @return bit offset in {@code [0, cap)}
         */
        public Long hashCode(String value, Integer seed) {
            long result = 0L;
            for (int i = 0; i < value.length(); i++) {
                result = result * seed + value.charAt(i);
            }
            // Overflow wraps around; the non-negative mask keeps the offset in range.
            return result & (cap - 1);
        }
    }

    /**
     * De-duplicates user ids through a Redis bitmap and keeps a per-window UV count in the Redis
     * hash {@code uv_count}. Both the bitmap key and the hash field are the window end.
     *
     * <p>NOTE(review): the getbit/setbit and hget/hset pairs are not atomic. This is correct only
     * while a single instance writes a given window, as with timeWindowAll here.
     */
    public static class UvCountResultWithBloomFliter extends ProcessAllWindowFunction<UserBehavior, PageViewCount, TimeWindow> {
        private Jedis jedis;
        private MyBloomFilter myBloomFilter;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = new Jedis("localhost", 6379);
            // 2^29 bits = 64 MB bitmap, sized for about 100 million ids.
            myBloomFilter = new MyBloomFilter(1 << 29);
        }

        @Override
        public void process(Context context, Iterable<UserBehavior> elements, Collector<PageViewCount> out) throws Exception {
            // Keys in Redis are the window end.
            Long windowEnd = context.window().getEnd();
            String bitmapKey = windowEnd.toString();
            String countHashName = "uv_count";
            String countKey = windowEnd.toString();

            // MyTrigger fires per element, so there is exactly one element here.
            Long userId = elements.iterator().next().getUserId();
            Long offset = myBloomFilter.hashCode(userId.toString(), 61);

            Boolean isExist = jedis.getbit(bitmapKey, offset);
            if (!isExist) {
                // First time this offset is seen in the window: mark it and bump the count.
                jedis.setbit(bitmapKey, offset, true);
                Long uvCount = 0L;
                String uvCountString = jedis.hget(countHashName, countKey);
                if (uvCountString != null && !"".equals(uvCountString)) {
                    uvCount = Long.valueOf(uvCountString);
                }
                jedis.hset(countHashName, countKey, String.valueOf(uvCount + 1));
                out.collect(new PageViewCount("uv", windowEnd, uvCount + 1));
            }
        }

        @Override
        public void close() throws Exception {
            // open() may have failed before the connection was created.
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
統計結果
PageViewCount{url='uv', windowEnd=1511661600000, count=7469}
PageViewCount{url='uv', windowEnd=1511661600000, count=7470}
PageViewCount{url='uv', windowEnd=1511661600000, count=7471}
PageViewCount{url='uv', windowEnd=1511661600000, count=7472}
PageViewCount{url='uv', windowEnd=1511661600000, count=7473}
PageViewCount{url='uv', windowEnd=1511661600000, count=7474}