對于流式處理,如果我們需要求取總和,平均值,或者最大值,最小值等,是做不到的,因為數(shù)據(jù)一直在源源不斷的產(chǎn)生,即數(shù)據(jù)是沒有邊界的,所以沒法求最大值,最小值,平均值等,所以為了一些數(shù)值統(tǒng)計的功能,我們必須指定時間段,對某一段時間的數(shù)據(jù)求取一些數(shù)據(jù)值是可以做到的?;蛘邔δ骋恍?shù)據(jù)求取數(shù)據(jù)值也是可以做到的
所以,流上的聚合需要由 window 來劃定范圍,比如 “計算過去的5分鐘” ,或者 “最后100個元素的和” 。
window是一種可以把無限數(shù)據(jù)切割為有限數(shù)據(jù)塊的手段
窗口可以是 時間驅(qū)動的 【Time Window】(比如:每30秒)或者 數(shù)據(jù)驅(qū)動的【Count Window】 (比如:每100個元素)。
窗口類型匯總:
1、窗口的基本類型介紹
窗口通常被區(qū)分為不同的類型:
? tumbling windows:滾動窗口 【沒有重疊】
? sliding windows:滑動窗口 【有重疊】
? session windows:會話窗口 ,一般沒人用
tumbling windows類型:沒有重疊的窗口
sliding windows:滑動窗口 【有重疊】
2、Flink的窗口介紹
Time Window窗口的應(yīng)用
time window又分為滾動窗口和滑動窗口,這兩種窗口調(diào)用方法都是一樣的,都是調(diào)用timeWindow這個方法,如果傳入一個參數(shù)就是滾動窗口,如果傳入兩個參數(shù)就是滑動窗口
Count Windos窗口的應(yīng)用
與timeWindow類型,CountWinodw也可以分為滾動窗口和滑動窗口,這兩個窗口調(diào)用方法一樣,都是調(diào)用countWindow,如果傳入一個參數(shù)就是滾動窗口,如果傳入兩個參數(shù)就是滑動窗口
自定義window的應(yīng)用
如果time window和 countWindow還不夠用的話,我們還可以使用自定義window來實現(xiàn)數(shù)據(jù)的統(tǒng)計等功能。
3、window的數(shù)值聚合統(tǒng)計
對于某一個window內(nèi)的數(shù)值統(tǒng)計,我們可以增量的聚合統(tǒng)計或者全量的聚合統(tǒng)計
增量聚合統(tǒng)計:
窗口當(dāng)中每加入一條數(shù)據(jù),就進(jìn)行一次統(tǒng)計
? reduce(reduceFunction)
? aggregate(aggregateFunction)
? sum(),min(),max()
需求:通過接收socket當(dāng)中輸入的數(shù)據(jù),統(tǒng)計每5秒鐘數(shù)據(jù)的累計的值
代碼實現(xiàn):
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
object FlinkTimeCount {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val socketStream: DataStream[String] = environment.socketTextStream("node01",9000)
val print: DataStreamSink[(Int, Int)] = socketStream
.map(x => (1, x.toInt))
.keyBy(0)
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction[(Int, Int)] {
override def reduce(t: (Int, Int), t1: (Int, Int)): (Int, Int) = {
(t._1, t._2 + t1._2)
}
}).print()
environment.execute("startRunning")
}
}
全量聚合統(tǒng)計:
等到窗口截止,或者窗口內(nèi)的數(shù)據(jù)全部到齊,然后再進(jìn)行統(tǒng)計,可以用于求窗口內(nèi)的數(shù)據(jù)的最大值,或者最小值,平均值等
等屬于窗口的數(shù)據(jù)到齊,才開始進(jìn)行聚合計算【可以實現(xiàn)對窗口內(nèi)的數(shù)據(jù)進(jìn)行排序等需求】
apply(windowFunction)
process(processWindowFunction)
processWindowFunction比windowFunction提供了更多的上下文信息。
需求:通過全量聚合統(tǒng)計,求取每3條數(shù)據(jù)的平均值
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow}
import org.apache.flink.util.Collector
object FlinkCountWindowAvg {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val socketStream: DataStream[String] = environment.socketTextStream("node01",9000)
//統(tǒng)計一個窗口內(nèi)的數(shù)據(jù)的平均值
val socketDatas: DataStreamSink[Double] = socketStream.map(x => (1, x.toInt))
.keyBy(0)
//.timeWindow(Time.seconds(10))
.countWindow(3)
//通過process方法來統(tǒng)計窗口的平均值
.process(new MyProcessWindowFunctionclass).print()
//必須調(diào)用execute方法,否則程序不會執(zhí)行
environment.execute("count avg")
}
}
/**ProcessWindowFunction 需要跟四個參數(shù)
- 輸入?yún)?shù)類型,輸出參數(shù)類型,聚合的key的類型,window的下界
*/
class MyProcessWindowFunctionclass extends ProcessWindowFunction[(Int , Int) , Double , Tuple , GlobalWindow]{
override def process(key: Tuple, context: Context, elements: Iterable[(Int, Int)], out: Collector[Double]): Unit = {
var totalNum = 0;
var countNum = 0;
for(data <- elements){
totalNum +=1
countNum += data._2
}
out.collect(countNum/totalNum)
}
}
4、Flink的Time三兄弟
前面我們已經(jīng)介紹過我們可以通過window窗口來統(tǒng)計每一段時間或者每多少條數(shù)據(jù)的一些數(shù)值統(tǒng)計,但是也存在另外一個問題,就是如果數(shù)據(jù)有延遲該如何解決,例如一個窗口定義的是每隔五分鐘統(tǒng)計一次,我們應(yīng)該在上午九點至九點零五分這段時間統(tǒng)計一次數(shù)據(jù)的結(jié)果值,但是由于某一條數(shù)據(jù)由于網(wǎng)絡(luò)延遲,數(shù)據(jù)產(chǎn)生時間是在九點零三分,數(shù)據(jù)到達(dá)我們的flink框架已經(jīng)是在十點零三分了,這種問題怎么解決??
再例如:
原始日志如下:
日志自帶時間
2018-10-10 10:00:01,134 INFO executor.Executor: Finished task in state 0.0
數(shù)據(jù)進(jìn)入flink框架時間:
這條數(shù)據(jù)進(jìn)入Flink的時間是2018-10-10 20:00:00,102
數(shù)據(jù)被window窗口處理時間:
到達(dá)window處理的時間為2018-10-10 20:00:01,100
為了解決這個問題,flink在實時處理當(dāng)中,對數(shù)據(jù)當(dāng)中的時間規(guī)劃為以下三個類型
針對stream數(shù)據(jù)中的時間,可以分為以下三種
? Event Time:事件產(chǎn)生的時間,它通常由事件中的時間戳描述。
? Ingestion time:事件進(jìn)入Flink的時間
? Processing Time:事件被處理時當(dāng)前系統(tǒng)的時間
1、EventTime詳解
EventTime
1.事件生成時的時間,在進(jìn)入Flink之前就已經(jīng)存在,可以從event的字段中抽取。
2.必須指定watermarks(水位線)的生成方式。
3.優(yōu)勢:確定性,亂序、延時、或者數(shù)據(jù)重放等情況,都能給出正確的結(jié)果
4.弱點:處理無序事件時性能和延遲受到影響
2、IngestTime
1.事件進(jìn)入flink的時間,即在source里獲取的當(dāng)前系統(tǒng)的時間,后續(xù)操作統(tǒng)一使用該時間。
2.不需要指定watermarks的生成方式(自動生成)
3.弱點:不能處理無序事件和延遲數(shù)據(jù)
3、ProcessingTime
1.執(zhí)行操作的機(jī)器的當(dāng)前系統(tǒng)時間(每個算子都不一樣)
2.不需要流和機(jī)器之間的協(xié)調(diào)
3.優(yōu)勢:最佳的性能和最低的延遲
4.弱點:不確定性 ,容易受到各種因素影像(event產(chǎn)生的速度、到達(dá)flink的速度、在算子之間傳輸速度等),壓根就不管順序和延遲
4、三種時間的綜合比較
性能: ProcessingTime> IngestTime> EventTime
延遲: ProcessingTime< IngestTime< EventTime
確定性: EventTime> IngestTime> ProcessingTime
5、如何設(shè)置time類型
在我們創(chuàng)建StreamExecutionEnvironment的時候可以設(shè)置time類型,不設(shè)置time類型,默認(rèn)是processingTime,如果設(shè)置time類型為eventTime,那么必須要在我們的source之后明確指定Timestamp Assigner & Watermark Generator
// 設(shè)置時間特性
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 不設(shè)置Time 類型,默認(rèn)是processingTime。
// 如果使用EventTime則需要在source之后明確指定Timestamp Assigner & Watermark Generator
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)