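Preface
In Flink, processing stream data under different Time Notions leads to different semantics and different results. A figure from the official documentation shows clearly where Process Time, Event Time, and Ingestion Time each sit in the pipeline, as shown below: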

The three Time Notions are described in turn below.
ProcessTime
Flink abstracts the operations that process data as Transformation Operators, while the start and end of a Dataflow correspond to the Source Operator and the Sink Operator. All of these Operators run on the host nodes of the Flink cluster, so time-related processing under the ProcessTime Notion depends on the System Clock of the host node on which the Flink program runs.
Because what we care about here is the time at which data is processed (Process Time), a Time Window operation assigns elements to Windows using the system clock of the host node that runs the current Operator. In other words, whenever a Window is created, its start and end times are computed from Process Time and are unrelated to the event time of the incoming elements. Every Function subsequently applied to the Window therefore operates on a Window with Process Time semantics.
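The window assignment described above can be sketched in plain Java. This is a minimal illustration, not Flink's implementation: the class and method names are hypothetical, and it assumes tumbling windows aligned to the epoch with no offset.

```java
import java.util.Arrays;

final class ProcessingTimeWindowSketch {

    /** Start of the epoch-aligned tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /** Returns {start, end} of the window for a clock reading; end is exclusive. */
    static long[] assignWindow(long clockMillis, long windowSizeMillis) {
        long start = windowStart(clockMillis, windowSizeMillis);
        return new long[] {start, start + windowSizeMillis};
    }

    public static void main(String[] args) {
        long windowSizeMillis = 60_000L;
        // Processing time: the operator reads its own host clock;
        // the event time carried by the element plays no role.
        long now = System.currentTimeMillis();
        System.out.println(Arrays.toString(assignWindow(now, windowSizeMillis)));
    }
}
```

Because the only input is the local clock, two operators on hosts with skewed clocks could place the same element in different windows, which is the trade-off this notion accepts.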
As an example scenario for ProcessTime, suppose we need real-time statistics and monitoring of user behavior in an App. Users run different devices, so data may not arrive in real time (for example, a phone runs out of battery and uploads its behavior records in a batch two hours later). If we compute statistics at one-minute granularity, such records are far too late. Waiting for them before computing, with no way to predict when they will arrive, would mean that the result for a given minute might only be produced hours or even days later, which is not what we want. The processing system may also lack the capacity to hold that much data. Given the business requirement, we only need the records that enter the system within each minute: generate one-minute Windows from the processing system's Process Time, assign records to the matching Window, and compute the result. The event time carried by the elements themselves no longer needs to be considered.
EventTime
Elements in a data stream may carry an immutable event time (Event Time) attribute: once the behavior represented by the element has occurred, its event time never changes. The simplest case is also the easiest to understand: all stream data entering Flink is produced in external systems and receives its event time there, and after transmission it enters the Flink processing system. In theory (if all systems shared the same system clock) the event timestamp is earlier than the timestamp at which Flink processes the element. In practice, however, records arrive out of order or late, and this is very common.
Under the EventTime Notion, processing Progress depends on the data itself rather than on the system clock of the host node that runs the Operator. A mechanism is therefore needed to control progress. For example, once an event-time Time Window has been created, how do we determine that all elements belonging to it have arrived? Once that is established, the elements of the Window can be processed as required (aggregation, grouping, and so on). This is the job of the WaterMark mechanism, which measures processing progress (it expresses how complete the arrived data is).
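A WaterMark carries a timestamp, say X, and each element entering the system has an event time, say Y. A WaterMark with timestamp X declares that all elements with Y <= X are considered to have arrived, so any Window whose last timestamp is not after X can be computed and its result emitted. Put the other way round, which may be easier to follow: to trigger computation of the current Window, we must be sure that every element still to enter the system has event time Y > X. If event times are ordered, the first element whose event time lies past the end of the current Window shows that the Window is complete: the Window is computed, and that element is assigned to a new Window.
As you can see, the WaterMark mechanism makes event-time stream processing very flexible: components and processing strategies can be chosen to suit the business. For example, suppose the WaterMark passes the end of the current Window and triggers its computation at time t1. If the stream is out of order, then some time later, at t2, an element may arrive with event time Y <= X, meaning it belongs to the Window that has already fired. What then? Even if the Window from t1 no longer exists, we may still want this late element to be included in its computation. A custom Trigger can be implemented to cover such business scenarios.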
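The relationship between a watermark, window firing, and late elements can be sketched in plain Java. This is only an illustration with hypothetical names, not Flink's API. It assumes a window [start, end) fires once the watermark reaches end - 1, and that an element counts as late when the watermark has already reached its event time.

```java
final class WatermarkSketch {

    /** Last timestamp that still belongs to the window [start, end). */
    static long maxTimestamp(long windowEndMillis) {
        return windowEndMillis - 1;
    }

    /** A window may fire once the watermark has reached its last timestamp. */
    static boolean canFire(long watermark, long windowEndMillis) {
        return watermark >= maxTimestamp(windowEndMillis);
    }

    /** An element is late if the watermark has already reached its event time. */
    static boolean isLate(long eventTime, long watermark) {
        return eventTime <= watermark;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L; // the window [0, 60000)
        System.out.println(canFire(59_998L, windowEnd)); // false
        System.out.println(canFire(59_999L, windowEnd)); // true
        System.out.println(isLate(59_500L, 59_999L));    // true: its window already fired
        System.out.println(isLate(60_000L, 59_999L));    // false: goes to the next window
    }
}
```

What happens to an element for which isLate returns true (drop it, re-fire the window, route it elsewhere) is a policy decision, which is where custom Triggers come in.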
IngestionTime
IngestionTime is the time at which data enters the Flink stream processing system. It depends on the system clock of the host node that runs the Source Operator, which assigns an Ingestion Time to each arriving record. Under the IngestionTime Notion, when there are multiple Source Operators, each one assigns Ingestion Time from its own local system clock. All subsequent time-based operations use the Ingestion Time stored in the record.
Compared with EventTime, IngestionTime cannot handle out-of-order or late events, but in return the program does not have to specify how WaterMarks are generated.
Using EventTime and WaterMark for statistical analysis of user actions on different types of content
Method 1: assign with assignTimestampsAndWatermarks()
Setup: the TimeWindow size is set to 1 minute (60000 ms) and the tolerated arrival delay to 50 seconds (50000 ms). To simulate elements whose event time is earlier than the processing system's clock, the source delay is set to 2 minutes (120000 ms).
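To see how these three settings interact, the arithmetic can be worked through in a short Java sketch. It rests on assumptions that should be stated plainly: every event time equals the wall clock minus the source delay, the watermark equals the newest event time minus the tolerated delay, a window [start, end) fires once the watermark reaches end - 1, and the periodic watermark emission interval is ignored. The class and method names are hypothetical.

```java
final class FiringTimeSketch {

    /**
     * Earliest wall-clock time (ms) at which the window ending at windowEndMillis can fire,
     * assuming event time = clock - sourceLateness and watermark = newest event time - maxLag.
     */
    static long earliestFiringClock(long windowEndMillis, long sourceLatenessMillis, long maxLagMillis) {
        long lastTimestampInWindow = windowEndMillis - 1;
        // watermark(clock) = clock - sourceLateness - maxLag >= lastTimestampInWindow
        return lastTimestampInWindow + sourceLatenessMillis + maxLagMillis;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L; // the window [0, 60000)
        long clock = earliestFiringClock(windowEnd, 120_000L, 50_000L);
        System.out.println(clock);             // 229999
        System.out.println(clock - windowEnd); // 169999
    }
}
```

Under these assumptions each one-minute result appears roughly 170 seconds of wall-clock time after the window's end: 120 seconds from the simulated source delay plus 50 seconds of watermark lag.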
First, simulate the behavior data, including delay, out-of-order arrival, and so on, by implementing a custom source:
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * @Author: dalan
 * @Date: 19-3-21 16:44
 * @Description:
 */
public class StringLineEventSource extends RichParallelSourceFunction<String> {
    /** logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(StringLineEventSource.class);

    public Long latenessMills;
    private volatile boolean running = true;

    // Default to no delay so that generateEvent() never unboxes a null value
    public StringLineEventSource() { super(); this.latenessMills = 0L; }
    public StringLineEventSource(Long latenessMills) { super(); this.latenessMills = latenessMills; }

    private List<String> channelSet = Arrays.asList("a", "b", "c", "d"); // content channels: simulated data
    private List<String> behaviorTypes = Arrays.asList("INSTALL", "OPEN",
            "BROWSE", "CLICK",
            "PURCHASE", "CLOSE", "UNINSTALL"); // behavior types
    private Random rand = new Random(9527); // random generator used to pick channel and behavior type

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long numElements = Long.MAX_VALUE;
        long count = 0L;
        while (running && count < numElements) {
            String channel = channelSet.get(rand.nextInt(channelSet.size()));
            List<String> event = generateEvent(); // generate a record: event time is derived from the current time
            LOGGER.info(event.toString());
            String ts = event.get(0);
            String id = event.get(1);
            String behaviorType = event.get(2);
            // Record layout: ts \t channel \t id \t behaviorType
            String result = StringUtils.join(Arrays.asList(ts, channel, id, behaviorType), "\t");
            ctx.collect(result);
            count += 1;
            TimeUnit.MILLISECONDS.sleep(5L);
        }
    }

    private List<String> generateEvent() {
        Long delayedTimestamp = Instant.ofEpochMilli(System.currentTimeMillis())
                .minusMillis(latenessMills)
                .toEpochMilli(); // current time shifted back by the configured delay
        // timestamp, id, behaviorType
        return Arrays.asList(delayedTimestamp.toString(),
                UUID.randomUUID().toString(),
                behaviorTypes.get(rand.nextInt(behaviorTypes.size()))); // <ts,uuid,type>
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}
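Each element in the stream is a string record line with the fields: event time, channel, user ID, and user behavior type. In the Flink program, timestamps are assigned and WaterMarks are generated by calling assignTimestampsAndWatermarks() on the stream (stream: DataStream[T]). A Tumbling Window (Windows do not overlap) is then created on top of a Keyed Window to process the records. Finally, the computed results are written to Kafka.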
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

/**
 * @Author: dalan
 * @Date: 19-3-21 16:43
 * @Description:
 */
public class UserDefineWaterMark {
    /** logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(UserDefineWaterMark.class);

    // main
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        Long sourceLatenessMillis = params.getLong("source-lateness-millis");
        Long maxLaggedTimeMillis = params.getLong("window-lagged-millis");
        Long windowSizeMillis = params.getLong("window-size-millis");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use event time
        DataStream<String> streams = env.addSource(new StringLineEventSource(sourceLatenessMillis));

        // Parse the input data
        DataStream<String> inputMap = ((DataStreamSource<String>) streams)
                .setParallelism(1)
                .assignTimestampsAndWatermarks( // assign timestamps and generate WaterMarks
                        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(maxLaggedTimeMillis)) {
                            @Override
                            public long extractTimestamp(String s) {
                                // The first tab-separated field is the event time in milliseconds
                                return NumberUtils.toLong(s.split("\t")[0]);
                            }
                        })
                .setParallelism(2)
                .map(new MapFunction<String, Tuple2<Tuple2<String, String>, Long>>() {
                    @Override
                    public Tuple2<Tuple2<String, String>, Long> map(String value) throws Exception {
                        String[] arr = value.split("\t");
                        String channel = arr[1];
                        // ((channel, behaviorType), 1)
                        return new Tuple2<Tuple2<String, String>, Long>(Tuple2.of(channel, arr[3]), 1L);
                    }
                })
                .setParallelism(2)
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis)))
                .process(new ProcessWindowFunction<Tuple2<Tuple2<String, String>, Long>, Object, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<Tuple2<String, String>, Long>> iterable, Collector<Object> collector) throws Exception {
                        // Count the elements that fell into this window for this key
                        long count = 0;
                        Tuple2<String, String> tuple2 = null;
                        for (Tuple2<Tuple2<String, String>, Long> in : iterable) {
                            tuple2 = in.f0;
                            count++;
                        }
                        LOGGER.info("window===" + tuple.toString());
                        collector.collect(new Tuple6<String, Long, Long, String, String, Long>(tuple2.getField(0).toString(), context.window().getStart(), context.window().getEnd(), tuple.getField(0).toString(), tuple.getField(1).toString(), count));
                    }
                })
                .setParallelism(4)
                .map(t -> {
                    Tuple6<String, Long, Long, String, String, Long> tt = (Tuple6<String, Long, Long, String, String, Long>) t;
                    Long windowStart = tt.f1;
                    Long windowEnd = tt.f2;
                    String channel = tt.f3;
                    String behaviorType = tt.f4;
                    Long count = tt.f5;
                    // Output layout: windowStart \t windowEnd \t channel \t behaviorType \t count
                    return StringUtils.join(Arrays.asList(windowStart, windowEnd, channel, behaviorType, count), "\t");
                })
                .setParallelism(3);

        inputMap.addSink((new FlinkKafkaProducer011<String>("localhost:9092,localhost:9092", "windowed-result-topic", new SimpleStringSchema())))
                .setParallelism(3);
        //inputMap.print();
        env.execute("EventTime and WaterMark Demo");
    }
}
Build the jar with mvn clean package -DskipTests, or run the main method directly. The following startup arguments need to be added:
--window-result-topic windowed-result-topic
--zookeeper.connect localhost:2181
--bootstrap.servers localhost:9092
--source-lateness-millis 120000 # source delay
--window-lagged-millis 50000 # tolerated arrival delay for the window
--window-size-millis 60000 # window size
The output looks like this:
1553169300000 1553169360000 b UNINSTALL 440
1553169300000 1553169360000 c CLOSE 389
1553169300000 1553169360000 b CLICK 445
1553169300000 1553169360000 b OPEN 423
1553169300000 1553169360000 a OPEN 363
1553169300000 1553169360000 b CLOSE 431
1553169300000 1553169360000 a PURCHASE 433
1553169300000 1553169360000 d PURCHASE 427
1553169300000 1553169360000 a INSTALL 388
1553169300000 1553169360000 c BROWSE 381
1553169300000 1553169360000 d CLOSE 446
1553169300000 1553169360000 c UNINSTALL 427
1553169300000 1553169360000 a CLICK 391
1553169300000 1553169360000 b BROWSE 429
1553169300000 1553169360000 a UNINSTALL 377
1553169300000 1553169360000 d INSTALL 397
1553169300000 1553169360000 a CLOSE 423
1553169300000 1553169360000 c PURCHASE 437
1553169300000 1553169360000 a BROWSE 388
1553169300000 1553169360000 d UNINSTALL 379
1553169300000 1553169360000 d BROWSE 431
1553169300000 1553169360000 b INSTALL 434
1553169300000 1553169360000 c CLICK 415
1553169300000 1553169360000 c INSTALL 455
1553169300000 1553169360000 d OPEN 426
1553169300000 1553169360000 c OPEN 399
1553169300000 1553169360000 d CLICK 402
1553169300000 1553169360000 b PURCHASE 442
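The rows above are easier to read once the window bounds are decoded. The following plain-Java sketch, with hypothetical class and method names, renders one row with UTC timestamps and checks that its window is exactly one minute long and starts on a whole-minute boundary. It assumes the columns are separated by whitespace (tabs in the job output).

```java
import java.time.Instant;

final class OutputRowSketch {

    /** Renders one whitespace-separated result row with readable UTC window bounds. */
    static String describe(String row) {
        String[] f = row.trim().split("\\s+");
        long windowStart = Long.parseLong(f[0]);
        long windowEnd = Long.parseLong(f[1]);
        return "[" + Instant.ofEpochMilli(windowStart) + ", " + Instant.ofEpochMilli(windowEnd) + ")"
                + " channel=" + f[2] + " type=" + f[3] + " count=" + f[4];
    }

    /** True if the row's window has the given size and starts on a multiple of it. */
    static boolean isAligned(String row, long windowSizeMillis) {
        String[] f = row.trim().split("\\s+");
        long start = Long.parseLong(f[0]);
        long end = Long.parseLong(f[1]);
        return end - start == windowSizeMillis && start % windowSizeMillis == 0;
    }

    public static void main(String[] args) {
        String row = "1553169300000 1553169360000 b UNINSTALL 440";
        // [2019-03-21T11:55:00Z, 2019-03-21T11:56:00Z) channel=b type=UNINSTALL count=440
        System.out.println(describe(row));
        System.out.println(isAligned(row, 60_000L)); // true
    }
}
```

All 28 rows share the same window, one row per (channel, behavior type) pair: 4 channels times 7 behavior types.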
The code above uses Flink's built-in BoundedOutOfOrdernessTimestampExtractor to assign timestamps and generate WaterMarks. We only implement the logic that extracts the timestamp from an event record; the actual WaterMark generation uses the default logic provided by BoundedOutOfOrdernessTimestampExtractor:
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                    "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; // initial value of the current maximum event timestamp
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness; // current maximum event timestamp minus the maximum tolerated arrival delay
        if (potentialWM >= lastEmittedWatermark) { // compare with the previously emitted WaterMark timestamp; update only if the candidate is not smaller
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) { // track the largest event time seen so far in currentMaxTimestamp
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}
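Across getCurrentWatermark() and extractTimestamp(), lastEmittedWatermark is the timestamp carried by the WaterMark. It is always derived from currentMaxTimestamp, the largest event time among the elements that have entered the Flink processing system so far, minus maxOutOfOrderness (the externally configured maximum tolerated arrival delay). In other words, the sequence of WaterMark timestamps produced here is non-strictly monotonically increasing: it can stay the same but never decreases.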
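The non-decreasing behavior can be checked with a small standalone simulation. The class below is a hypothetical plain-Java re-creation of the same bookkeeping (it does not depend on Flink and is not the library class itself), fed with a short out-of-order sequence and a 50000 ms bound.

```java
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Same initial value as in the listing above, so the first subtraction cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records the event time of a newly arrived element. */
    void onElement(long eventTime) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
    }

    /** Returns the watermark: largest event time seen minus the bound, never moving backwards. */
    long currentWatermark() {
        long potential = currentMaxTimestamp - maxOutOfOrderness;
        lastEmittedWatermark = Math.max(lastEmittedWatermark, potential);
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(50_000L);
        long[] eventTimes = {100_000L, 130_000L, 110_000L, 130_000L, 160_000L};
        for (long ts : eventTimes) {
            gen.onElement(ts);
            System.out.println(ts + " -> watermark " + gen.currentWatermark());
        }
        // Watermarks in order: 50000, 80000, 80000, 80000, 110000
    }
}
```

The out-of-order element at 110000 and the repeated 130000 leave the watermark at 80000; only a new maximum event time moves it forward.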