1. Introduction to the Flink APIs
Flink offers different levels of abstraction for developing streaming or batch applications.

- Stateful Stream Processing: the lowest-level abstraction is the stateful streaming interface. It is integrated into the DataStream API through the ProcessFunction. It allows users to freely process events from one or more streams with consistent, fault-tolerant state. In addition, users can register event-time and processing-time callbacks to implement complex computations.
- DataStream/DataSet API: the core APIs Flink provides. The DataSet API handles bounded data sets, while the DataStream API handles bounded or unbounded data streams. Users transform and compute over the data with operations such as map / flatMap / window / keyBy / sum / max / min / avg / join.
- Table API: provides operations such as select, project, join, group-by, and aggregate, yet is more concise to use. Tables can be converted to and from DataStream/DataSet seamlessly, and a program may mix the Table API with the DataStream and DataSet APIs.
- SQL: the highest-level abstraction Flink provides, similar to the Table API in syntax and expressiveness. The SQL abstraction interacts closely with the Table API, and SQL queries can be executed directly on tables defined with the Table API.
2. Dataflows
In Flink's world view, everything is a data stream; batch processing is merely a special case of stream processing.
A Flink dataflow consists of three parts: Source, Transformation, and Sink.
- Source: reads from the data source
- Transformation: processes and transforms the data with various operators
- Sink: writes the final output to an external system (console, Kafka, Redis, a DB, ...)
When the source produces a large volume of data or the computation logic is complex, the parallelism must be raised and a parallel dataflow used.
This is done by setting the parallelism of individual operators (source parallelism 2, map parallelism 2, ...), so that multiple parallel threads process the data; see the sketch after the next paragraph.

At runtime, a Flink program is mapped to a "logical dataflow" that contains these three parts. Each dataflow starts with one or more sources and ends with one or more sinks, and resembles an arbitrary directed acyclic graph (DAG). In most cases the transformations in the program correspond one-to-one to the operators in the dataflow, but occasionally a single transformation may map to multiple operators.
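As a concrete sketch of such a dataflow, here is a word count whose source, transformations, and sink are explicit, with per-operator parallelism (host and port are placeholders; assumes import org.apache.flink.streaming.api.scala._):
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Source: read lines from a socket
val lines = env.socketTextStream("node01", 8888)
// Transformations: split into words, count per word
val counts = lines
  .flatMap(_.split(" ")).setParallelism(2)
  .map((_, 1)).setParallelism(2)
  .keyBy(_._1)
  .sum(1)
// Sink: print to the console
counts.print()
env.execute("dataflow-demo")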
3. Flink DataStream API
3.1 Environment
StreamExecutionEnvironment is the basis of every Flink program. You can obtain one with the following static methods:
# Creates an execution environment that represents the context of the current program.
# If the program is invoked standalone, this method returns a local execution environment;
# if the program is invoked from the command-line client to be submitted to a cluster, it returns that cluster's execution environment.
# In other words, getExecutionEnvironment decides which environment to return based on how the job is run; it is the most common way to create an execution environment.
getExecutionEnvironment()
# Returns a local execution environment
createLocalEnvironment()
# Returns a cluster execution environment and submits the Jar to a remote server.
# The JobManager's IP and port must be specified in the call, along with the Jar to run on the cluster.
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
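A minimal sketch of the three variants (the host, port, and jar path are placeholders):
import org.apache.flink.streaming.api.scala._

// most common: picks a local or cluster environment depending on how the job is launched
val env = StreamExecutionEnvironment.getExecutionEnvironment
// explicit local environment with parallelism 2
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(2)
// explicit remote environment; submits the given jar to the JobManager
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("node01", 8081, "/path/to/wordcount.jar")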
3.2 Data Sources
Flink has a rich set of built-in data sources, such as HDFS, sockets, Kafka, and collections. Flink also provides addSource for implementing custom data sources. This section covers Flink's built-in data sources as well as the principle and API of custom sources.
3.2.1 File-based:
Create a data source by reading local or HDFS files.
env.readTextFile(path)
env.readFile(fileInputFormat, path, watchType, interval, pathFilter)
readTextFile is implemented on top of readFile; readFile is the lower-level, and therefore more flexible, method.
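A sketch of readFile in continuous-watch mode (the path and scan interval are placeholders):
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val path = "hdfs://node01:9000/flink/data"
// rescan the path every 5 seconds and process new or changed files
val stream = env.readFile(new TextInputFormat(new Path(path)), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 5000L)
stream.print()
env.execute()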
3.2.2 Socket-based:
Receives data from a socket server.
env.socketTextStream("node09",8888)
3.2.3 Collection-based:
env.fromCollection(Seq)
env.fromCollection(Iterator)
env.fromElements(elements: _*)
env.fromParallelCollection(SplittableIterator)
env.generateSequence(from, to)
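A quick sketch of the collection-based sources, which are handy for local testing:
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// finite streams built from in-memory data
env.fromCollection(Seq(1, 2, 3)).print()
env.fromElements("a", "b", "c").print()
// generates the sequence 1..10 as a stream
env.generateSequence(1, 10).print()
env.execute()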
3.2.4 Kafka Source
Flink's Kafka consumer class is FlinkKafkaConsumer, the universal Kafka connector for Kafka 0.10 and later (alternatively, FlinkKafkaConsumer011 targets Kafka 0.11.x and FlinkKafkaConsumer010 targets Kafka 0.10.x).
Example of Kafka as a source:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
The example above uses SimpleStringSchema, a deserialization schema Flink already provides; the result, however, contains only the Kafka value and no other information.
To obtain the key, the value, and the metadata of a Kafka message, implement the deserialize method of the KafkaDeserializationSchema interface.
Flink's Kafka integration is important; the Kafka connector is studied in depth later.
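The consumer's start position can also be configured; a minimal sketch in Scala (reusing the topic and properties from the example above):
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val consumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)
// where to start reading when the job has no restored offsets (the default is the committed group offsets)
consumer.setStartFromEarliest()
val stream = env.addSource(consumer)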
3.2.5 Custom Source
We can implement Flink's SourceFunction to build a custom source (for a source with parallelism greater than 1, implement ParallelSourceFunction or extend RichParallelSourceFunction). It is invoked as follows:
val stream = env.addSource( new MySensorSource() )
We want to generate random sensor data; the implementation of MySensorSource is shown below (SensorReading is assumed to be a case class with id, timestamp, and temperature fields):
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

class MySensorSource extends SourceFunction[SensorReading] {
  // flag: whether the source is still running
  var running: Boolean = true

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // initialize a random number generator
    val rand = new Random()
    var curTemp = 1.to(10).map(
      i => ("sensor_" + i, 65 + rand.nextGaussian() * 20)
    )
    while (running) {
      // update the temperatures with a random walk
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )
      // current timestamp
      val curTime = System.currentTimeMillis()
      curTemp.foreach(
        t => ctx.collect(SensorReading(t._1, curTime, t._2))
      )
      Thread.sleep(100)
    }
  }
}
3.3 Transformations
Transformation operators turn one or more data streams into a new data stream; combining transformations lets you implement complex business logic. A combined sketch follows the three operators below.
3.3.1 Simple operations on DataStream
① Map
DataStream → DataStream
Visits every element in the stream and produces exactly one new element.
② FlatMap
DataStream → DataStream
Visits every element in the stream and produces N elements, N = 0, 1, 2, ...
③ Filter
DataStream → DataStream
Filter operator: evaluates a boolean for each element; true keeps the element, false drops it.
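A minimal sketch combining the three operators (assuming import org.apache.flink.streaming.api.scala._):
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines = env.fromElements("hello flink", "hello world")
lines
  .flatMap(_.split(" "))   // one line -> N words
  .map(w => (w, 1))        // one word -> one tuple
  .filter(_._1 != "hello") // keep only non-"hello" words
  .print()
env.execute()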
3.3.2 Operations on KeyedStream
① KeyBy
DataStream → KeyedStream
Partitions the stream by the specified field; records with the same key value are guaranteed to land in the same partition. Internally, hash partitioning is used.
There are three ways to specify the key field:
1. By index or field name (deprecated in recent versions)
2. With an anonymous function
3. By implementing the KeySelector interface
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 100)
stream
  .map(x => (x % 3, 1))
  // by index
  // .keyBy(0)
  // by anonymous function
  // .keyBy(x => x._1)
  // by implementing the KeySelector interface
  .keyBy(new KeySelector[(Long, Int), Long] {
    override def getKey(value: (Long, Int)): Long = value._1
  })
  .sum(1)
  .print()
env.execute()
② Reduce
KeyedStream → DataStream
Rolling aggregation by key.
Note: reduce aggregates over a keyed (partitioned) stream; a plain DataStream object cannot call reduce.
.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
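A complete sketch of reduce on a keyed stream (word-count style):
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(("a", 1), ("b", 1), ("a", 1))
  .keyBy(_._1)
  // rolling aggregation per key: v1 is the previous aggregate, v2 the new element
  .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
  .print()
env.execute()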
③ Fold
KeyedStream → DataStream
A rolling fold over a keyed data stream with an initial value: it combines the current element with the result of the previous fold and emits a new value. (fold has since been deprecated in favor of aggregate.)
Given the input sequence (1, 2, 3, 4, 5), the fold below emits the strings "start-1", "start-1-2", "start-1-2-3", ...
val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
④ Aggregations
KeyedStream → DataStream
Aggregations stands for a family of rolling aggregation operators (a note on min vs. minBy follows the list):
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
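Note that min only guarantees the minimum of the requested field (the other fields may come from a different record), while minBy returns the complete record that contains the minimum. A minimal sketch:
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(("a", 3), ("a", 1), ("a", 2))
  .keyBy(_._1)
  .minBy(1) // emits the whole record holding the rolling minimum of field 1
  .print()
env.execute()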
3.3.3 Combining streams
① Union
DataStream → DataStream
Merges two or more data streams into a new stream that contains all elements of the merged streams.
Note: all streams must have the same element type.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds1 = env.fromCollection(List(("a",1),("b",2),("c",3)))
val ds2 = env.fromCollection(List(("d",4),("e",5),("f",6)))
val ds3 = env.fromCollection(List(("g",7),("h",8)))
// val ds3 = env.fromCollection(List((1,1),(2,2)))
val unionStream = ds1.union(ds2,ds3)
unionStream.print()
env.execute()
② Connect
DataStream, DataStream → ConnectedStreams
Combines two data streams while keeping their respective element types; the two streams can share state.
val ds1 = env.socketTextStream("node01", 8888)
val ds2 = env.socketTextStream("node01", 9999)
val wcStream1 = ds1.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
val wcStream2 = ds2.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
val restStream: ConnectedStreams[(String, Int), (String, Int)] = wcStream2.connect(wcStream1)
A ConnectedStreams is consumed with the operators below, which ultimately produce a DataStream; see the ConnectedStreams operations in the next subsection.

Differences between Connect and Union:
- Union requires both streams to have the same type; Connect allows different types, which can be reconciled later in the coMap.
- Connect operates on exactly two streams; Union can merge more than two.
3.3.4 Operations on ConnectedStreams
① CoMap, CoFlatMap
ConnectedStreams → DataStream
CoMap and CoFlatMap are not concrete operator names but families of operations:
a map over a ConnectedStreams is called a CoMap;
a flatMap over a ConnectedStreams is called a CoFlatMap.
First CoMap variant, implementing CoMapFunction:
connectedStream.map(new CoMapFunction[(String, Int), (String, Int), (String, Int)] {
  // applied to elements of the first stream
  override def map1(value: (String, Int)): (String, Int) = {
    (value._1 + ":first", value._2 + 100)
  }
  // applied to elements of the second stream
  override def map2(value: (String, Int)): (String, Int) = {
    (value._1 + ":second", value._2 * 100)
  }
}).print()
Second CoMap variant, passing two functions:
connectedStream.map(
  // applied to the first stream
  x => { (x._1 + ":first", x._2 + 100) },
  // applied to the second stream
  y => { (y._1 + ":second", y._2 * 100) }
).print()
First CoFlatMap variant:
ds1.connect(ds2).flatMap(
  // applied to the first stream
  (x, c: Collector[String]) => {
    x.split(" ").foreach(w => c.collect(w))
  },
  // applied to the second stream
  (y, c: Collector[String]) => {
    y.split(" ").foreach(d => c.collect(d))
  }
).print()
Second CoFlatMap variant:
ds1.connect(ds2).flatMap(
  // applied to the first stream
  x => x.split(" "),
  // applied to the second stream
  y => y.split(" ")
).print()
Third CoFlatMap variant, implementing CoFlatMapFunction:
ds1.connect(ds2).flatMap(new CoFlatMapFunction[String, String, (String, Int)] {
  // applied to the first stream
  override def flatMap1(value: String, out: Collector[(String, Int)]): Unit = {
    value.split(" ").foreach(x => out.collect((x, 1)))
  }
  // applied to the second stream
  override def flatMap2(value: String, out: Collector[(String, Int)]): Unit = {
    value.split(" ").foreach(x => out.collect((x, 1)))
  }
}).print()
3.3.5 Splitting streams
① Split
DataStream → SplitStream
Splits one stream into two or more streams according to a condition.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 100)
val splitStream = stream.split(
  d => {
    d % 2 match {
      case 0 => List("even")
      case 1 => List("odd")
    }
  }
)
splitStream.select("even").print()
env.execute()
Note: split is @deprecated; please use side outputs instead (see ③ below).
② Select
SplitStream → DataStream
Selects one or more streams from a SplitStream.
splitStream.select("even").print()
③ Side output
During stream processing you sometimes need to split a stream by condition; doing this with several filters copies the stream once per filter, which is unnecessary. Side outputs avoid that:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01", 8888)
val gtTag = new OutputTag[String]("gt")
val processStream = stream.process(new ProcessFunction[String, String] {
  override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
    try {
      val longVar = value.toLong
      if (longVar > 100) {
        // values greater than 100 go to the main output
        out.collect(value)
      } else {
        // everything else goes to the side output
        ctx.output(gtTag, value)
      }
    } catch {
      case e: Exception =>
        // non-numeric input also goes to the side output
        ctx.output(gtTag, value)
    }
  }
})
val sideStream = processStream.getSideOutput(gtTag)
sideStream.print("sideStream")
processStream.print("mainStream")
env.execute()
3.3.6 Iterate
DataStream → IterativeStream → DataStream
The iterate operator supports iteration over a data stream, which is especially useful for algorithms that continuously refine a model.
An iteration consists of two parts: the iteration body and the termination condition.
Elements that do not meet the termination condition are fed back into the stream for the next iteration;
elements that meet it are forwarded downstream.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream = env.socketTextStream("node01", 8888)
val stream = initStream.map(_.toLong)
stream.iterate {
  iteration => {
    // the iteration body
    val iterationBody = iteration.map(x => {
      println(x)
      if (x > 0) x - 1
      else x
    })
    // values > 0 are fed back into the stream; values <= 0 are forwarded downstream
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}.print()
env.execute()
3.3.7 Physical partitioning strategies
① shuffle
Use case: increasing partitions / raising parallelism, mitigating data skew
DataStream → DataStream
Distributes elements uniformly at random to downstream partitions; the network overhead is relatively high.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(1)
println(stream.getParallelism)
stream.shuffle.print()
env.execute()
Console result: upstream data is distributed to the downstream partitions essentially at random.
② rebalance
Use case: increasing partitions / raising parallelism, mitigating data skew
DataStream → DataStream
Distributes elements round-robin so that each downstream partition receives an even share; very useful when data skew occurs. The network overhead is relatively high.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val stream = env.generateSequence(1,100)
val rebalanceStream = stream.rebalance
rebalanceStream.print()
env.execute()
Console result: upstream data is distributed evenly to the downstream partitions.
③ rescale
Use case: reducing partitions while avoiding heavy network traffic; no full repartitioning takes place
DataStream → DataStream
Round-robins elements from each upstream partition to a subset of downstream partitions rather than across all of them.
Note: rescale transfers data locally instead of over the network, depending on, e.g., the TaskManager's slots. Put simply, upstream data is sent only to downstream subtasks within the same TaskManager.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.rescale.writeAsText("./data/stream2").setParallelism(4)
env.execute()
④ broadcast
Use case: mapping tables that every parallel instance needs, especially when they change frequently
DataStream → DataStream
Broadcasts every upstream element to every downstream partition.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
env.execute()
⑤ global
Use case: effectively reducing the parallelism to 1
DataStream → DataStream
All upstream partitions send their data only to the first downstream partition.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.global.writeAsText("./data/stream2").setParallelism(4)
env.execute()
⑥ forward
Use case: one-to-one forwarding; map, flatMap, filter, etc. use this strategy
DataStream → DataStream
Each upstream partition sends its data to the corresponding downstream partition:
partition1 -> partition1
partition2 -> partition2
Note: the upstream and downstream partition counts (parallelism) must match, otherwise an exception like the following is thrown:
Forward partitioning does not allow change of parallelism
Upstream operation: Source: Sequence Source-1 parallelism: 2,
downstream operation: Sink: Unnamed-4 parallelism: 4
stream.forward.writeAsText("./data/stream2").setParallelism(4)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.forward.writeAsText("./data/stream2").setParallelism(2)
env.execute()
⑦ keyBy
Use case: whenever grouping by key matches the business logic
DataStream → DataStream
Computes the target downstream partition from the hash of each element's key modulo the number of downstream partitions:
MathUtils.murmurHash(keyHash) (hash of the element's key) % maxParallelism (number of downstream partitions)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.keyBy(0).writeAsText("./data/stream2").setParallelism(2)
env.execute()
Elements are distributed to downstream partitions according to the hash of their key.
⑧ PartitionCustom (custom partitioning)
DataStream → DataStream
Routes elements from upstream partitions to downstream partitions with a user-defined partitioner.
object ShuffleOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val stream = env.generateSequence(1, 10).map((_, 1))
    stream.writeAsText("./data/stream1")
    stream.partitionCustom(new CustomPartitioner(), 0)
      .writeAsText("./data/stream2").setParallelism(4)
    env.execute()
  }
  class CustomPartitioner extends Partitioner[Long] {
    override def partition(key: Long, numPartitions: Int): Int = {
      key.toInt % numPartitions
    }
  }
}
3.4 Sink
Flink ships with a large number of sinks that write processed data to HDFS, Kafka, Redis, Elasticsearch, MySQL, and more; anything beyond these requires a user-defined sink.
In real projects you typically consume data from Kafka and store the results in Redis, HBase, or MySQL.
3.4.1 Redis Sink
Data processed by Flink can be stored in Redis so it can be queried in real time.
Flink has a connector for Redis (from Apache Bahir); you only need to add the dependency:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
The WordCount result is written to Redis using the HSET data type.
The code:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01",8888)
val result = stream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
// if Redis is standalone
val config = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("node01").setPort(6379).build()
// if Redis is a cluster:
/*val addresses = new util.HashSet[InetSocketAddress]()
addresses.add(new InetSocketAddress("node01",6379))
addresses.add(new InetSocketAddress("node02",6379))
val clusterConfig = new FlinkJedisClusterConfig.Builder().setNodes(addresses).build()*/
result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
  // which Redis command to use, and the outer hash key
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "wc")
  }
  // hash field: the word
  override def getKeyFromData(t: (String, Int)) = {
    t._1
  }
  // hash value: the count, as a string
  override def getValueFromData(t: (String, Int)) = {
    t._2 + ""
  }
}))
env.execute()
3.4.2 Kafka Sink
Writing results to a Kafka topic is also supported out of the box. It requires the same connector dependency that is used for reading from Kafka, so if you added it earlier nothing more is needed:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink-version}</version>
</dependency>
import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

object KafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01", 8888)
    val result = stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    // no key/value serializer properties are needed; the serialization schema below produces the bytes
    /**
     * FlinkKafkaProducer(defaultTopic: String, serializationSchema: KafkaSerializationSchema[IN], producerConfig: Properties, semantic: FlinkKafkaProducer.Semantic)
     */
    result.addSink(new FlinkKafkaProducer[(String, Int)]("wc", new KafkaSerializationSchema[(String, Int)] {
      override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        new ProducerRecord("wc", element._1.getBytes(), (element._2 + "").getBytes())
      }
    }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
    env.execute()
  }
}
3.4.3 MySQL Sink (idempotent writes)
Writing Flink results to MySQL is not supported by Flink out of the box, so the MySQL driver dependency must be added:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
Since there is no built-in support, the sink is custom-built on top of RichSinkFunction.
The job consumes data from Kafka, counts the traffic per monitoring point, and stores the counts in MySQL.
Note: writes must be deduplicated, i.e. the MySQL operations must be idempotent.
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

object MySQLSink {
  case class CarInfo(monitorId: String, carId: String, eventTime: String, speed: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka connection properties
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    // first argument: the topic to consume
    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
      // termination condition; this stream never ends
      override def isEndOfStream(t: (String, String)): Boolean = false
      // deserialize the raw bytes of each record
      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val key = new String(consumerRecord.key(), "UTF-8")
        val value = new String(consumerRecord.value(), "UTF-8")
        (key, value)
      }
      // tell Flink the produced type, using Flink's type information
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, props))
    stream.map(data => {
      val value = data._2
      val splits = value.split("\t")
      val monitorId = splits(0)
      (monitorId, 1)
    }).keyBy(_._1)
      .reduce(new ReduceFunction[(String, Int)] {
        // t1: the previous aggregate, t2: the current element
        override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
          (t1._1, t1._2 + t2._2)
        }
      }).addSink(new MySQLCustomSink)
    env.execute()
  }

  // idempotent writes to the external MySQL database
  class MySQLCustomSink extends RichSinkFunction[(String, Int)] {
    var conn: Connection = _
    var insertPst: PreparedStatement = _
    var updatePst: PreparedStatement = _

    // called once per element: try an UPDATE first, INSERT only if no row was updated
    override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
      println(value)
      updatePst.setInt(1, value._2)
      updatePst.setString(2, value._1)
      updatePst.execute()
      println(updatePst.getUpdateCount)
      if (updatePst.getUpdateCount == 0) {
        println("insert")
        insertPst.setString(1, value._1)
        insertPst.setInt(2, value._2)
        insertPst.execute()
      }
    }

    // called once when the task thread is initialized
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123123")
      insertPst = conn.prepareStatement("INSERT INTO car_flow(monitorId,count) VALUES(?,?)")
      updatePst = conn.prepareStatement("UPDATE car_flow SET count = ? WHERE monitorId = ?")
    }

    // called once when the task thread shuts down
    override def close(): Unit = {
      insertPst.close()
      updatePst.close()
      conn.close()
    }
  }
}
3.4.4 Socket Sink
Flink results can be sent to a socket.
The sink is custom-built on RichSinkFunction:
import java.io.PrintStream
import java.net.{InetAddress, Socket}
import java.util.Properties
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

// sink to a socket
object SocketSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka connection properties
    val props = new Properties()
    // note: unlike Spark Streaming + Kafka (pre-0.10) receiver mode, no ZooKeeper URL (metadata) is needed
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    // first argument: the topic to consume
    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
      // termination condition; this stream never ends
      override def isEndOfStream(t: (String, String)): Boolean = false
      // deserialize the raw bytes of each record
      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val key = new String(consumerRecord.key(), "UTF-8")
        val value = new String(consumerRecord.value(), "UTF-8")
        (key, value)
      }
      // tell Flink the produced type, using Flink's type information
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, props))
    stream.map(data => {
      val value = data._2
      val splits = value.split("\t")
      val monitorId = splits(0)
      (monitorId, 1)
    }).keyBy(_._1)
      .reduce(new ReduceFunction[(String, Int)] {
        // t1: the previous aggregate, t2: the current element
        override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
          (t1._1, t1._2 + t2._2)
        }
      }).addSink(new SocketCustomSink("node01", 8888))
    env.execute()
  }

  class SocketCustomSink(host: String, port: Int) extends RichSinkFunction[(String, Int)] {
    var socket: Socket = _
    var writer: PrintStream = _

    override def open(parameters: Configuration): Unit = {
      socket = new Socket(InetAddress.getByName(host), port)
      writer = new PrintStream(socket.getOutputStream)
    }

    override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
      writer.println(value._1 + "\t" + value._2)
      writer.flush()
    }

    override def close(): Unit = {
      writer.close()
      socket.close()
    }
  }
}
3.4.5 HBase Sink
There are two ways to write the computed results to the sink:
- with a map operator: creates an HBase connection for every write, which is expensive
- with process: better suited to batching writes to HBase
Add the HBase dependencies:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
Read data from Kafka, count the traffic per monitoring point, and save it to HBase.
- Create the target table in HBase:
create 'car_flow',{NAME => 'count', VERSIONS => 1}
- The implementation:
import java.util.{Date, Properties}
import com.msb.stream.util.DateUtils
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer

object HBaseSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka connection properties
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props))
    stream.map(row => {
      val arr = row.split("\t")
      (arr(0), 1)
    }).keyBy(_._1)
      .reduce((v1: (String, Int), v2: (String, Int)) => {
        (v1._1, v1._2 + v2._2)
      }).process(new ProcessFunction[(String, Int), (String, Int)] {
        var htab: HTable = _

        override def open(parameters: Configuration): Unit = {
          val conf = HBaseConfiguration.create()
          conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
          val hbaseName = "car_flow"
          htab = new HTable(conf, hbaseName)
        }

        override def close(): Unit = {
          htab.close()
        }

        override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
          // rowkey: monitorId; column qualifier: timestamp (minute); value: traffic count
          val min = DateUtils.getMin(new Date())
          val put = new Put(Bytes.toBytes(value._1))
          put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2))
          htab.put(put)
        }
      })
    env.execute()
  }
}
3.4.6 Elasticsearch Sink
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink-version}</version>
</dependency>
In the main function:
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {
  override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    println("saving data: " + t)
    val json = new util.HashMap[String, String]()
    json.put("data", t.toString)
    val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(json)
    requestIndexer.add(indexRequest)
    println("saved successfully")
  }
})
dataStream.addSink(esSinkBuilder.build())
3.5 Supported Data Types
Flink streaming applications process streams of events represented as data objects, so Flink must be able to handle these objects internally: they need to be serialized and deserialized to be sent over the network, or to be read from state backends, checkpoints, and savepoints. To do this efficiently, Flink needs to know exactly which data types the application processes. Flink uses the notion of type information to describe data types and generates type-specific serializers, deserializers, and comparators for each of them.
Flink also has a type extraction system that analyzes the input and return types of functions to derive the type information, and with it the serializers and deserializers, automatically. In some cases, however, such as lambda functions or generic types, the type information must be provided explicitly for the application to work or to perform well.
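In the Scala API, explicit type information can be produced with the createTypeInformation macro; a minimal sketch:
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// explicit TypeInformation for a tuple type, usable where automatic extraction fails
val tupleInfo: TypeInformation[(String, Int)] = createTypeInformation[(String, Int)]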
Flink supports all common data types in Java and Scala. The most widely used are the following.
3.5.1 Basic data types
Flink supports all Java and Scala basic data types: Int, Double, Long, String, ...
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )
3.5.2 Java and Scala tuples (Tuples)
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18)
3.5.3 Scala case classes
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)
3.5.4 Java simple objects (POJOs)
public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));
3.5.5 Others (Arrays, Lists, Maps, Enums, etc.)
Flink also supports special-purpose types in Java and Scala, such as Java's ArrayList, HashMap, and Enum.
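A small sketch of such a type flowing through a stream (handled by Flink's generic-type/Kryo serialization):
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// a stream of Scala Maps
val maps: DataStream[Map[String, Int]] = env.fromElements(Map("a" -> 1), Map("b" -> 2))
maps.print()
env.execute()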
3.6 Implementing UDFs: finer-grained control over the stream
3.6.1 Function classes
Flink exposes an interface (implemented as an interface or abstract class) for every user-defined function, e.g. MapFunction, FilterFunction, ProcessFunction, and so on.
The following example implements the FilterFunction interface:
class FlinkFilter extends FilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains("flink")
  }
}
val flinkTweets = tweets.filter(new FlinkFilter)
Functions can also be implemented as anonymous classes:
val flinkTweets = tweets.filter(
  new RichFilterFunction[String] {
    override def filter(value: String): Boolean = {
      value.contains("flink")
    }
  }
)
The filter keyword "flink" can also be passed in as a parameter:
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))
class KeywordFilter(keyWord: String) extends FilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains(keyWord)
  }
}
3.6.2 Lambda functions
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(_.contains("flink"))
3.6.3 Rich Functions
"Rich functions" are a function-class interface provided by the DataStream API; every Flink function class has a Rich variant. Unlike a regular function, a rich function can access its runtime context and has lifecycle methods, which makes more complex functionality possible.
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- ...
A rich function has a lifecycle. The typical lifecycle methods are:
- open() is the initialization method; it is called before an operator such as map or filter processes its first element.
- close() is the last method called in the lifecycle; it performs cleanup work.
- getRuntimeContext() gives access to the function's RuntimeContext, e.g. the function's parallelism, the task name, and its state.
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
  var subTaskIndex = 0

  override def open(configuration: Configuration): Unit = {
    subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
    // do initialization work here, e.g. establish a connection to HDFS
  }

  override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
    if (in % 2 == subTaskIndex) {
      out.collect((subTaskIndex, in))
    }
  }

  override def close(): Unit = {
    // do cleanup work here, e.g. close the connection to HDFS
  }
}
References (Flink official documentation):
- Flink DataStream API
- Apache Kafka Connector 1.11
- Apache Kafka Connector 1.10
- Operators
- Connectors
