Flink中實(shí)用的小知識(shí)點(diǎn)整理

目錄

1、Flink使用WaterMark處理亂序事件
2、累加器和計(jì)數(shù)器
3、Window使用
4、流的切分和合并
5、任務(wù)鏈
6、Flink消費(fèi)kafka數(shù)據(jù)起始o(jì)ffset配置
7、Flink消費(fèi)kafka數(shù)據(jù),消費(fèi)者offset提交配置
8、數(shù)據(jù)源
9、數(shù)據(jù)存放
10、運(yùn)行時(shí)環(huán)境的區(qū)別
11、keyedStream中進(jìn)行聚合操作

一.Flink使用WaterMark處理亂序事件[1]

watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。

流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過(guò)程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說(shuō)late element)

但是對(duì)于late element,我們又不能無(wú)限期的等下去,必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了。這個(gè)特別的機(jī)制,就是watermark。

關(guān)于水位線機(jī)制這篇博客講的比較通俗易懂,Flink流計(jì)算編程--watermark(水位線)簡(jiǎn)介

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//將TimeCharactersistic設(shè)置為EventTime。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val text = env.socketTextStream("localhost", 9999)

// 指定watemark的實(shí)現(xiàn)
text.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator ) 

自定義的實(shí)現(xiàn)AssignerWithPeriodicWatermarks接口的類(lèi)

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
    //最大允許的亂序時(shí)間是10s,告訴Flink希望消息最多有10s的延遲,每個(gè)窗口僅在waterMark通過(guò)時(shí)被處理
    val maxOutOfOrderness = 10000L;
    var currentMaxTimestamp: Long; 

    //extractTimestamp方法是從數(shù)據(jù)本身中提取EventTime
    override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp;
    }
   //getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness來(lái)獲取的
    override def getCurrentWatermark(): Watermark = {
        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
使用WaterMark是,window的觸發(fā)時(shí)機(jī)?

1.首先
window的設(shè)定無(wú)關(guān)數(shù)據(jù)本身,而是系統(tǒng)定義好了的。如果window大小是3秒,那么1分鐘內(nèi)會(huì)把window劃分為如下的形式:

[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)

2.其次
輸入的數(shù)據(jù)中,根據(jù)自身的Event Time,將數(shù)據(jù)劃分到不同的window中,如果window中有數(shù)據(jù),則當(dāng)watermark時(shí)間>=Event Time時(shí),就符合了window觸發(fā)的條件了,最終決定window觸發(fā),還是由數(shù)據(jù)本身的Event Time所屬的window中的window_end_time決定。

3.最終
window的觸發(fā)要符合以下幾個(gè)條件:

  • watermark時(shí)間 >= window_end_time
  • [window_start_time,window_end_time) 中有數(shù)據(jù)存在

4.注意

  • watermark是一個(gè)全局的值,不是某一個(gè)key下的值,所以即使不是同一個(gè)key的數(shù)據(jù),其warmark也會(huì)增加。
  • 對(duì)于late element太多的數(shù)據(jù)而言,由于Event Time < watermark時(shí)間,所以來(lái)一條就觸發(fā)一個(gè)window。
  • 也可以使用AllowedLateness功能設(shè)置消息的最大允許時(shí)間來(lái)解決處理延遲的消息。

二.累加器和計(jì)數(shù)器[2]

Flink和Spark一樣,都提供了累加器供我們使用,它們大多用于一些計(jì)數(shù),計(jì)算一些指標(biāo)的場(chǎng)景。
計(jì)數(shù)器也是一種累加器,它是最簡(jiǎn)單的累加器,作計(jì)數(shù)功能使用。在Flink類(lèi)部,內(nèi)置了很多計(jì)數(shù)器,比如IntCounter,LongCounter和DoubleCounter等等。

那么如何使用累加器呢?主要分為下面的幾部:

  • 第一步:在自定義的轉(zhuǎn)換操作里創(chuàng)建累加器對(duì)象:private IntCounter numLines = new IntCounter();
  • 第二步:注冊(cè)累加器對(duì)象,通常是在rich function的open()方法中。這里你還需要定義累加器的名字getRuntimeContext().addAccumulator(“num-lines”, this.numLines);
  • 第三步:在operator函數(shù)的任何地方使用累加器,包括在open()和close()方法中this.numLines.add(1);
  • 第四步:結(jié)果存儲(chǔ)在JobExecutionResult里:JobExecutionResult jobExecutionResult =env.execute("Accumulator"); jobExecutionResult .getAccumulatorResult("num-lines");

如果需要可以選擇實(shí)現(xiàn)Accumulator或者SimpleAccumulator來(lái)自定義累加器。

  • Accumulator<V, R>是最靈活的:它定義了需要進(jìn)行累加的值的類(lèi)型V以及最后結(jié)果的類(lèi)型R
  • SimpleAccumulator則是在進(jìn)行累計(jì)數(shù)據(jù)類(lèi)型和返回的數(shù)據(jù)類(lèi)型一致的情況下使用的,例如計(jì)數(shù)器。
/**
  * Flink的累加器使用
  */
object flinkBatch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements("Hello Jason What are you doing Hello world")
    val counts = text
      .flatMap(_.toLowerCase.split(" "))
      .map(new RichMapFunction[String, String] {
        //創(chuàng)建累加器
        val acc = new IntCounter()
        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          //注冊(cè)累加器
          getRuntimeContext.addAccumulator("accumulator", acc)
        }
        override def map(in: String): String = {
          //使用累加器
          this.acc.add(1)
          in
        }
      }).map((_,1))
      .groupBy(0)
      .sum(1)
    counts.writeAsText("d:/test.txt/").setParallelism(1)
    val res = env.execute("Accumulator Test")
    //獲取累加器的結(jié)果
    val num = res.getAccumulatorResult[Int]("accumulator")
    println(num)
  }
}

三.Window使用[3]

在KeyedStream中進(jìn)行使用,根據(jù)某個(gè)特征針對(duì)每個(gè)key用windows進(jìn)行分組

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));

dataStream.keyBy(0).timeWindow(Time.seconds(5),Time.seconds(2));

四.流的切分和合并[4]

1.將流分割成多個(gè)流

SplitStream split = dataStream.split(new OutputSelector() {
    @Override
    public Iterable select(Integer value) {
        List output = new ArrayList();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

從split stream中選擇一個(gè)流

// SplitStream split;
DataStream even = split.select("even");
DataStream odd = split.select("odd");
DataStream all = split.select("even","odd");

2.合并多個(gè)數(shù)據(jù)流成一個(gè)新的數(shù)據(jù)流

dataStream.union(otherStream1, otherStream2, ...);

五.任務(wù)鏈[5]

把很多轉(zhuǎn)化操作的任務(wù)鏈接在一起放到同一個(gè)thread中執(zhí)行,可以獲得更好的性能。使用 StreamExecutionEnvironment.disableOperatorChaining()可以在整個(gè)job中去除某個(gè)鏈節(jié)點(diǎn)。
(1)Start new chain

omeStream.filter(...).map(...).startNewChain().map(...);

(2)Disable chaining

someStream.map(...).disableChaining();

(3)Set slot sharing group

someStream.filter(...).slotSharingGroup("name");

六.Flink消費(fèi)kafka數(shù)據(jù)起始o(jì)ffset配置[6]

Flink讀取Kafka數(shù)據(jù)確定開(kāi)始位置有以下幾種設(shè)置方式:

1.從topic的最早offset位置開(kāi)始處理數(shù)據(jù),如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略。
flinkKafkaConsumer.setStartFromEarliest()

2.從topic的最新offset位置開(kāi)始處理數(shù)據(jù),如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略。
flinkKafkaConsumer.setStartFromLatest()

3.從指定的時(shí)間戳(毫秒)開(kāi)始消費(fèi)數(shù)據(jù),Kafka中每個(gè)分區(qū)中數(shù)據(jù)大于等于設(shè)置的時(shí)間戳的數(shù)據(jù)位置將被當(dāng)做開(kāi)始消費(fèi)的位置。如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略。
flinkKafkaConsumer.setStartFromTimestamp(…)

4.默認(rèn)的設(shè)置。根據(jù)代碼中設(shè)置的group.id設(shè)置的消費(fèi)者組,去kafka中或者zookeeper中找到對(duì)應(yīng)的消費(fèi)者
offset位置消費(fèi)數(shù)據(jù)。如果沒(méi)有找到對(duì)應(yīng)的消費(fèi)者組的位置,那么將按照auto.offset.reset設(shè)置的策略讀取offset。
flinkKafkaConsumer.setStartFromGroupOffsets()

七.Flink消費(fèi)kafka數(shù)據(jù),消費(fèi)者offset提交配置[7]

Flink提供了消費(fèi)kafka數(shù)據(jù)的offset如何提交給Kafka的配置。注意,F(xiàn)link并不依賴提交給Kafka或者zookeeper中的offset來(lái)保證容錯(cuò)。提交的offset只是為了外部來(lái)查詢監(jiān)視kafka數(shù)據(jù)消費(fèi)的情況。配置offset的提交方式取決于是否為job設(shè)置開(kāi)啟checkpoint??梢允褂?code>env.enableCheckpointing(5000)來(lái)設(shè)置開(kāi)啟checkpoint。

1.關(guān)閉checkpoint
如果禁用了checkpoint,那么offset位置的提交取決于Flink讀取kafka客戶端的配置,例如enable.auto.commit 配置是否開(kāi)啟自動(dòng)提交offset, auto.commit.interval.ms決定自動(dòng)提交offset的周期。

2.開(kāi)啟checkpoint
如果開(kāi)啟了checkpoint,那么當(dāng)checkpoint保存狀態(tài)完成后,將checkpoint中保存的offset位置提交到kafka。這樣保證了Kafka中保存的offset和checkpoint中保存的offset一致,可以通過(guò)配置setCommitOffsetsOnCheckpoints(true/false)來(lái)配置是否將checkpoint中的offset提交到kafka中(默認(rèn)是true)。如果使用這種方式,那么properties中配置的kafka offset自動(dòng)提交參數(shù)enable.auto.commit和周期提交參數(shù)auto.commit.interval.ms參數(shù)將被忽略。

八.數(shù)據(jù)源[8]

StreamExecutionEnvironment提供的一些訪問(wèn)數(shù)據(jù)源的接口
1.基于文件的數(shù)據(jù)源

readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

2.基于Socket的數(shù)據(jù)源

socketTextStream  //Linux中啟動(dòng)Socket端口 nc -lk 9999

3.基于Collection的數(shù)據(jù)源

fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)

4.addSource

添加新的源函數(shù),例如從kafka中讀取數(shù)據(jù),

// SimpleStringSchema 指定key的格式
FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>(
            "flink-topic",
            new SimpleStringSchema(), 
            props);

env.addSource(consumer);

九.數(shù)據(jù)存放[9]

writeAsText() / 以字符串的形式逐行寫(xiě)入文件
writeAsCsv(...) / 將元組寫(xiě)出以逗號(hào)分隔的csv文件。注意:只能作用到元組數(shù)據(jù)上。
print() / printToErr() / 控制臺(tái)直接輸出結(jié)果,調(diào)用對(duì)象的toString()方法得到輸出結(jié)果
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink  / 自定義接收函數(shù)或使用FlinkKafkaProducer將數(shù)據(jù)寫(xiě)入到Kafka

十.運(yùn)行時(shí)環(huán)境的區(qū)別[10]

創(chuàng)建StreamExecutionEnvironment 對(duì)象有多種方式,如下所示。但是通常用默認(rèn)方式就可以,它可以根據(jù)所處環(huán)境自動(dòng)做出正確的選擇。

//默認(rèn)
StreamExecutionEnvironment.getExecutionEnvironment();
//從本地環(huán)境創(chuàng)建
StreamExecutionEnvironment.createLocalEnvironment();
//遠(yuǎn)程創(chuàng)建
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);

十一.keyedStream中進(jìn)行聚合操作[11]

對(duì)于Key的指定方式可以參考我的這篇文章 Flink中指定Key的幾種方式

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

這個(gè)腳注怎么去掉???


  1. Flink使用WaterMark處理亂序事件 ?

  2. 累加器和計(jì)數(shù)器 ?

  3. Window使用 ?

  4. 流的切分和合并 ?

  5. 任務(wù)鏈 ?

  6. Flink消費(fèi)kafka數(shù)據(jù)起始o(jì)ffset配置 ?

  7. Flink消費(fèi)kafka數(shù)據(jù),消費(fèi)者offset提交配置 ?

  8. 數(shù)據(jù)源 ?

  9. 數(shù)據(jù)存放 ?

  10. 運(yùn)行時(shí)環(huán)境的區(qū)別 ?

  11. keyedStream中進(jìn)行聚合操作 ?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容