戳更多文章:
2-本地環(huán)境搭建&構(gòu)建第一個(gè)Flink應(yīng)用
Flink實(shí)戰(zhàn)項(xiàng)目實(shí)時(shí)熱銷排行
需求
某個(gè)圖書網(wǎng)站,希望看到雙十一秒殺期間實(shí)時(shí)的熱銷排行榜單。我們可以將“實(shí)時(shí)熱門商品”翻譯成程序員更好理解的需求:每隔5秒鐘輸出最近一小時(shí)內(nèi)點(diǎn)擊量最多的前 N 個(gè)商品/圖書.
需求分解
將這個(gè)需求進(jìn)行分解我們大概要做這么幾件事情:
- 告訴 Flink 框架基于時(shí)間做窗口,我們這里用processingTime,不用自帶時(shí)間戳
- 過(guò)濾出圖書點(diǎn)擊行為數(shù)據(jù)
- 按一小時(shí)的窗口大小,每5秒鐘統(tǒng)計(jì)一次,做滑動(dòng)窗口聚合(Sliding Window)
- 聚合,輸出窗口中點(diǎn)擊量前N名的商品
代碼實(shí)現(xiàn)
向Kafka發(fā)消息模擬購(gòu)買事件
public class KafkaProducer {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
//new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("topn",new SimpleStringSchema(),properties);
/*
//event-timestamp事件的發(fā)生時(shí)間
producer.setWriteTimestampToKafka(true);
*/
text.addSink(producer);
env.execute();
}
}//
其中的:MyNoParalleSource 是作者自己實(shí)現(xiàn)的一個(gè)并行度為1的發(fā)送器,用來(lái)向kafka發(fā)送數(shù)據(jù):
public class MyNoParalleSource implements SourceFunction<String> {//1
//private long count = 1L;
private boolean isRunning = true;
/**
* 主要的方法
* 啟動(dòng)一個(gè)source
* 大部分情況下,都需要在這個(gè)run方法中實(shí)現(xiàn)一個(gè)循環(huán),這樣就可以循環(huán)產(chǎn)生數(shù)據(jù)了
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
while(isRunning){
//圖書的排行榜
List<String> books = new ArrayList<>();
books.add("Pyhton從入門到放棄");//10
books.add("Java從入門到放棄");//8
books.add("Php從入門到放棄");//5
books.add("C++從入門到放棄");//3
books.add("Scala從入門到放棄");//0-4
int i = new Random().nextInt(5);
ctx.collect(books.get(i));
//每1秒產(chǎn)生一條數(shù)據(jù)
Thread.sleep(1000);
}
}
//取消一個(gè)cancel的時(shí)候會(huì)調(diào)用的方法
@Override
public void cancel() {
isRunning = false;
}
}
可見(jiàn),我們每過(guò)1秒向Kafka的topn這個(gè)topic隨機(jī)發(fā)送一本書的名字用來(lái)模擬購(gòu)買行為。
整體實(shí)現(xiàn)代碼如下:
public class TopN {
public static void main(String[] args) throws Exception{
/**
*
* 書1 書2 書3
* (書1,1) (書2,1) (書3,1)
*
*
*/
//每隔5秒鐘 計(jì)算過(guò)去1小時(shí) 的 Top 3 商品
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作為時(shí)間語(yǔ)義
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("topn", new SimpleStringSchema(), properties);
//從最早開(kāi)始消費(fèi) 位點(diǎn)
input.setStartFromEarliest();
DataStream<String> stream = env
.addSource(input);
DataStream<Tuple2<String, Integer>> ds = stream
.flatMap(new LineSplitter()); //將輸入語(yǔ)句split成一個(gè)一個(gè)單詞并初始化count值為1的Tuple2<String, Integer>類型
DataStream<Tuple2<String, Integer>> wcount = ds
.keyBy(0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(5)))
//key之后的元素進(jìn)入一個(gè)總時(shí)間長(zhǎng)度為600s,每5s向后滑動(dòng)一次的滑動(dòng)窗口
.sum(1);// 將相同的key的元素第二個(gè)count值相加
wcount
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(shu1, xx) (shu2,xx)....
//所有key元素進(jìn)入一個(gè)5s長(zhǎng)的窗口(選5秒是因?yàn)樯嫌未翱诿?s計(jì)算一輪數(shù)據(jù),topN窗口一次計(jì)算只統(tǒng)計(jì)一個(gè)窗口時(shí)間內(nèi)的變化)
.process(new TopNAllFunction(3))
.print();
//redis sink redis -> 接口
env.execute();
}//
private static final class LineSplitter implements
FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
//String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
/*for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}*/
//(書1,1) (書2,1) (書3,1)
out.collect(new Tuple2<String, Integer>(value, 1));
}
}
private static class TopNAllFunction
extends
ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> {
private int topSize = 3;
public TopNAllFunction(int topSize) {
this.topSize = topSize;
}
public void process(
ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0,
Iterable<Tuple2<String, Integer>> input,
Collector<String> out) throws Exception {
TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
new Comparator<Integer>() {
@Override
public int compare(Integer y, Integer x) {
return (x < y) ? -1 : 1;
}
}); //treemap按照key降序排列,相同count值不覆蓋
for (Tuple2<String, Integer> element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) { //只保留前面TopN個(gè)元素
treemap.pollLastEntry();
}
}
for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap
.entrySet()) {
out.collect("=================\n熱銷圖書列表:\n"+ new Timestamp(System.currentTimeMillis()) + treemap.toString() + "\n===============\n");
}
}
}
}//
查看輸出:
=================
熱銷圖書列表:
2019-03-05 22:32:40.004{8=(Java從入門到放棄,8), 7=(C++從入門到放棄,7), 5=(Php從入門到放棄,5)}
===============
=================
熱銷圖書列表:
2019-03-05 22:32:45.004{8=(Java從入門到放棄,8), 7=(C++從入門到放棄,7), 5=(Php從入門到放棄,5)}
===============
所有代碼,我放在了我的公眾號(hào),回復(fù)Flink可以下載
- 海量【java和大數(shù)據(jù)的面試題+視頻資料】整理在公眾號(hào),關(guān)注后可以下載~
- 更多大數(shù)據(jù)技術(shù)歡迎和作者一起探討~
關(guān)注我的公眾號(hào),后臺(tái)回復(fù)【JAVAPDF】獲取200頁(yè)面試題!
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路,不來(lái)了解一下嗎?
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路,真的不來(lái)了解一下嗎?
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路,確定真的不來(lái)了解一下嗎?
歡迎您關(guān)注《大數(shù)據(jù)成神之路》

image.png