目錄
1、Flink使用WaterMark處理亂序事件
2、累加器和計(jì)數(shù)器
3、Window使用
4、流的切分和合并
5、任務(wù)鏈
6、Flink消費(fèi)kafka數(shù)據(jù)起始o(jì)ffset配置
7、Flink消費(fèi)kafka數(shù)據(jù),消費(fèi)者offset提交配置
8、數(shù)據(jù)源
9、數(shù)據(jù)存放
10、運(yùn)行時(shí)環(huán)境的區(qū)別
11、keyedStream中進(jìn)行聚合操作
一.Flink使用WaterMark處理亂序事件[1]
watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。
流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過(guò)程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說(shuō)late element)
但是對(duì)于late element,我們又不能無(wú)限期的等下去,必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了。這個(gè)特別的機(jī)制,就是watermark。
關(guān)于水位線機(jī)制這篇博客講的比較通俗易懂,Flink流計(jì)算編程--watermark(水位線)簡(jiǎn)介
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//將TimeCharactersistic設(shè)置為EventTime。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = env.socketTextStream("localhost", 9999)
// 指定watemark的實(shí)現(xiàn)
text.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator )
自定義的實(shí)現(xiàn)AssignerWithPeriodicWatermarks接口的類(lèi)
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
//最大允許的亂序時(shí)間是10s,告訴Flink希望消息最多有10s的延遲,每個(gè)窗口僅在waterMark通過(guò)時(shí)被處理
val maxOutOfOrderness = 10000L;
var currentMaxTimestamp: Long;
//extractTimestamp方法是從數(shù)據(jù)本身中提取EventTime
override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp;
}
//getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness來(lái)獲取的
override def getCurrentWatermark(): Watermark = {
new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
使用WaterMark是,window的觸發(fā)時(shí)機(jī)?
1.首先
window的設(shè)定無(wú)關(guān)數(shù)據(jù)本身,而是系統(tǒng)定義好了的。如果window大小是3秒,那么1分鐘內(nèi)會(huì)把window劃分為如下的形式:
[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)
2.其次
輸入的數(shù)據(jù)中,根據(jù)自身的Event Time,將數(shù)據(jù)劃分到不同的window中,如果window中有數(shù)據(jù),則當(dāng)watermark時(shí)間>=Event Time時(shí),就符合了window觸發(fā)的條件了,最終決定window觸發(fā),還是由數(shù)據(jù)本身的Event Time所屬的window中的window_end_time決定。
3.最終
window的觸發(fā)要符合以下幾個(gè)條件:
watermark時(shí)間 >= window_end_time- 在
[window_start_time,window_end_time)中有數(shù)據(jù)存在
4.注意
- watermark是一個(gè)全局的值,不是某一個(gè)key下的值,所以即使不是同一個(gè)key的數(shù)據(jù),其warmark也會(huì)增加。
- 對(duì)于late element太多的數(shù)據(jù)而言,由于Event Time < watermark時(shí)間,所以來(lái)一條就觸發(fā)一個(gè)window。
- 也可以使用AllowedLateness功能設(shè)置消息的最大允許時(shí)間來(lái)解決處理延遲的消息。
二.累加器和計(jì)數(shù)器[2]
Flink和Spark一樣,都提供了累加器供我們使用,它們大多用于一些計(jì)數(shù),計(jì)算一些指標(biāo)的場(chǎng)景。
計(jì)數(shù)器也是一種累加器,它是最簡(jiǎn)單的累加器,作計(jì)數(shù)功能使用。在Flink類(lèi)部,內(nèi)置了很多計(jì)數(shù)器,比如IntCounter,LongCounter和DoubleCounter等等。
那么如何使用累加器呢?主要分為下面的幾部:
- 第一步:在自定義的轉(zhuǎn)換操作里創(chuàng)建累加器對(duì)象:
private IntCounter numLines = new IntCounter(); - 第二步:注冊(cè)累加器對(duì)象,通常是在
rich function的open()方法中。這里你還需要定義累加器的名字getRuntimeContext().addAccumulator(“num-lines”, this.numLines); - 第三步:在operator函數(shù)的任何地方使用累加器,包括在open()和close()方法中this.numLines.add(1);
- 第四步:結(jié)果存儲(chǔ)在JobExecutionResult里:
JobExecutionResult jobExecutionResult =env.execute("Accumulator"); jobExecutionResult .getAccumulatorResult("num-lines");
如果需要可以選擇實(shí)現(xiàn)Accumulator或者SimpleAccumulator來(lái)自定義累加器。
- Accumulator<V, R>是最靈活的:它定義了需要進(jìn)行累加的值的類(lèi)型V以及最后結(jié)果的類(lèi)型R
- SimpleAccumulator則是在進(jìn)行累計(jì)數(shù)據(jù)類(lèi)型和返回的數(shù)據(jù)類(lèi)型一致的情況下使用的,例如計(jì)數(shù)器。
/**
* Flink的累加器使用
*/
object flinkBatch {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("Hello Jason What are you doing Hello world")
val counts = text
.flatMap(_.toLowerCase.split(" "))
.map(new RichMapFunction[String, String] {
//創(chuàng)建累加器
val acc = new IntCounter()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//注冊(cè)累加器
getRuntimeContext.addAccumulator("accumulator", acc)
}
override def map(in: String): String = {
//使用累加器
this.acc.add(1)
in
}
}).map((_,1))
.groupBy(0)
.sum(1)
counts.writeAsText("d:/test.txt/").setParallelism(1)
val res = env.execute("Accumulator Test")
//獲取累加器的結(jié)果
val num = res.getAccumulatorResult[Int]("accumulator")
println(num)
}
}
三.Window使用[3]
在KeyedStream中進(jìn)行使用,根據(jù)某個(gè)特征針對(duì)每個(gè)key用windows進(jìn)行分組
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
dataStream.keyBy(0).timeWindow(Time.seconds(5),Time.seconds(2));
四.流的切分和合并[4]
1.將流分割成多個(gè)流
SplitStream split = dataStream.split(new OutputSelector() {
@Override
public Iterable select(Integer value) {
List output = new ArrayList();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
從split stream中選擇一個(gè)流
// SplitStream split;
DataStream even = split.select("even");
DataStream odd = split.select("odd");
DataStream all = split.select("even","odd");
2.合并多個(gè)數(shù)據(jù)流成一個(gè)新的數(shù)據(jù)流
dataStream.union(otherStream1, otherStream2, ...);
五.任務(wù)鏈[5]
把很多轉(zhuǎn)化操作的任務(wù)鏈接在一起放到同一個(gè)thread中執(zhí)行,可以獲得更好的性能。使用 StreamExecutionEnvironment.disableOperatorChaining()可以在整個(gè)job中去除某個(gè)鏈節(jié)點(diǎn)。
(1)Start new chain
omeStream.filter(...).map(...).startNewChain().map(...);
(2)Disable chaining
someStream.map(...).disableChaining();
(3)Set slot sharing group
someStream.filter(...).slotSharingGroup("name");
六.Flink消費(fèi)kafka數(shù)據(jù)起始o(jì)ffset配置[6]
Flink讀取Kafka數(shù)據(jù)確定開(kāi)始位置有以下幾種設(shè)置方式:
1.從topic的最早offset位置開(kāi)始處理數(shù)據(jù),如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略。
flinkKafkaConsumer.setStartFromEarliest()
2.從topic的最新offset位置開(kāi)始處理數(shù)據(jù),如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略。
flinkKafkaConsumer.setStartFromLatest()
3.從指定的時(shí)間戳(毫秒)開(kāi)始消費(fèi)數(shù)據(jù),Kafka中每個(gè)分區(qū)中數(shù)據(jù)大于等于設(shè)置的時(shí)間戳的數(shù)據(jù)位置將被當(dāng)做開(kāi)始消費(fèi)的位置。如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略。
flinkKafkaConsumer.setStartFromTimestamp(…)
4.默認(rèn)的設(shè)置。根據(jù)代碼中設(shè)置的group.id設(shè)置的消費(fèi)者組,去kafka中或者zookeeper中找到對(duì)應(yīng)的消費(fèi)者
offset位置消費(fèi)數(shù)據(jù)。如果沒(méi)有找到對(duì)應(yīng)的消費(fèi)者組的位置,那么將按照auto.offset.reset設(shè)置的策略讀取offset。
flinkKafkaConsumer.setStartFromGroupOffsets()
七.Flink消費(fèi)kafka數(shù)據(jù),消費(fèi)者offset提交配置[7]
Flink提供了消費(fèi)kafka數(shù)據(jù)的offset如何提交給Kafka的配置。注意,F(xiàn)link并不依賴提交給Kafka或者zookeeper中的offset來(lái)保證容錯(cuò)。提交的offset只是為了外部來(lái)查詢監(jiān)視kafka數(shù)據(jù)消費(fèi)的情況。配置offset的提交方式取決于是否為job設(shè)置開(kāi)啟checkpoint??梢允褂?code>env.enableCheckpointing(5000)來(lái)設(shè)置開(kāi)啟checkpoint。
1.關(guān)閉checkpoint
如果禁用了checkpoint,那么offset位置的提交取決于Flink讀取kafka客戶端的配置,例如enable.auto.commit 配置是否開(kāi)啟自動(dòng)提交offset, auto.commit.interval.ms決定自動(dòng)提交offset的周期。
2.開(kāi)啟checkpoint
如果開(kāi)啟了checkpoint,那么當(dāng)checkpoint保存狀態(tài)完成后,將checkpoint中保存的offset位置提交到kafka。這樣保證了Kafka中保存的offset和checkpoint中保存的offset一致,可以通過(guò)配置setCommitOffsetsOnCheckpoints(true/false)來(lái)配置是否將checkpoint中的offset提交到kafka中(默認(rèn)是true)。如果使用這種方式,那么properties中配置的kafka offset自動(dòng)提交參數(shù)enable.auto.commit和周期提交參數(shù)auto.commit.interval.ms參數(shù)將被忽略。
八.數(shù)據(jù)源[8]
StreamExecutionEnvironment提供的一些訪問(wèn)數(shù)據(jù)源的接口
1.基于文件的數(shù)據(jù)源
readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
2.基于Socket的數(shù)據(jù)源
socketTextStream //Linux中啟動(dòng)Socket端口 nc -lk 9999
3.基于Collection的數(shù)據(jù)源
fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)
4.addSource
添加新的源函數(shù),例如從kafka中讀取數(shù)據(jù),
// SimpleStringSchema 指定key的格式
FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>(
"flink-topic",
new SimpleStringSchema(),
props);
env.addSource(consumer);
九.數(shù)據(jù)存放[9]
writeAsText() / 以字符串的形式逐行寫(xiě)入文件
writeAsCsv(...) / 將元組寫(xiě)出以逗號(hào)分隔的csv文件。注意:只能作用到元組數(shù)據(jù)上。
print() / printToErr() / 控制臺(tái)直接輸出結(jié)果,調(diào)用對(duì)象的toString()方法得到輸出結(jié)果
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink / 自定義接收函數(shù)或使用FlinkKafkaProducer將數(shù)據(jù)寫(xiě)入到Kafka
十.運(yùn)行時(shí)環(huán)境的區(qū)別[10]
創(chuàng)建StreamExecutionEnvironment 對(duì)象有多種方式,如下所示。但是通常用默認(rèn)方式就可以,它可以根據(jù)所處環(huán)境自動(dòng)做出正確的選擇。
//默認(rèn)
StreamExecutionEnvironment.getExecutionEnvironment();
//從本地環(huán)境創(chuàng)建
StreamExecutionEnvironment.createLocalEnvironment();
//遠(yuǎn)程創(chuàng)建
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
十一.keyedStream中進(jìn)行聚合操作[11]
對(duì)于Key的指定方式可以參考我的這篇文章 Flink中指定Key的幾種方式
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");