一個Flink程序,其實就是對DataStream的各種轉換。具體來說,代碼基本上由以下幾部分構成:
- 獲取執(zhí)行環(huán)境(Execution Environment);
- 讀取數(shù)據(jù)源(Source);
- 定義基于數(shù)據(jù)的轉換操作(Transformations);
- 定義計算結果的輸出位置(Sink);
-
觸發(fā)執(zhí)行程序;
5.1 執(zhí)行環(huán)境
5.1.1 創(chuàng)建執(zhí)行環(huán)境
創(chuàng)建執(zhí)行環(huán)境,通過調(diào)用StreamExecutionEnviroment類的的靜態(tài)方法。具體有三種:
-
StreamExecutionEnvironment.getExecutionEnvironment,它會根據(jù)當前運行的上下文
直接得到正確的結果;也就是說,這個方法會根據(jù)當前運行的方式,自行決定該返回什么樣的
運行環(huán)境; -
StreamExecutionEnvironment.createLocalEnvironment, 這個方法返回一個本地執(zhí)行環(huán)境; -
StreamExecutionEnvironment.createRemoteEnvironment, 這個方法返回集群執(zhí)行環(huán)境,調(diào)用時需要指定JobManager的主機號和端口號,并指定要運行的jar包;
5.1.2 執(zhí)行模式
- 流執(zhí)行模式(streaming);
- 批執(zhí)行模式(batch),有兩種方式進行配置:
- 命令行配置:
bin/flink run -Dexecution.runtime-mode=BATCH ...; - 代碼中進行配置:
env.setRuntimeMode(RuntimeExcutionMode.BATCH);
- 命令行配置:
- 自動模式(automatic),在這種模式下,將由程序根據(jù)輸入數(shù)據(jù)源是否有界,來自動選擇執(zhí)行模式。
5.2 數(shù)據(jù)源算子(SOURCE)
Flink可以從各種來源獲取數(shù)據(jù),然后構建DataStream進行轉換處理。一般將數(shù)據(jù)的輸入來源稱為數(shù)據(jù)源,而讀取數(shù)據(jù)的算子就是源算子(Source)。因此,Source就是整個處理程序的輸入端。
Flink有多種讀取源數(shù)據(jù)的方式:
// 定義一個模擬的用戶行為樣例類
case class Event(user:String, url:String, timestamp:Long)
// 創(chuàng)建執(zhí)行環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1、從集合讀取數(shù)據(jù)
val clicks = List(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))
val stream1 = env.fromColletctions(clicks)
// 也可以直接將元素列舉出來通過fromElements進行讀取數(shù)據(jù)
val stream1 = env.fromElements(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))
// 2、從文件讀取數(shù)據(jù):可以是目錄/文件,可以是hdfs文件,也可以是本地文件
val stream2 = env.readTextFile("clicks.csv")
// 3、從socket讀取數(shù)據(jù)
val stream3 = env.socketTextStream("localhost", 777)
// 4、從kafka讀取數(shù)據(jù)。需要添加依賴 連接工具 flink-connector-kafka
// 創(chuàng)建 FlinkKafkaConsumer 時需要傳入三個參數(shù):
// (1) topic,定義了從哪些主題中讀取數(shù)據(jù);
// (2) 第二個參數(shù)是一個 DeserializationSchema 或者 KeyedDeserializationSchema, 反序列化方式;
// (3) Properties 對象,設置了 Kafka 客戶端的一些屬性;
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
// 創(chuàng)建kafka相關配置
val properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
//創(chuàng)建一個 FlinkKafkaConsumer 對象,傳入必要參數(shù),從 Kafka 中讀取數(shù)據(jù)
val stream = env.addSource(new FlinkKafkaConsumer[String](
"clicks",
new SimpleStringSchema(),
properties
))
上面介紹的是直接通過API讀取數(shù)據(jù)源。另一種比較復雜的方式是自定義數(shù)據(jù)源,然后通過env.addSource進行讀取。
自定義數(shù)據(jù)源需要實現(xiàn)SourceFunction接口。主要需要重寫兩個關鍵方法:
-
run()方法,使用運行時上下文對象(SourceContext)向下游發(fā)送數(shù)據(jù); -
cancel()方法,通過標識位控制退出循環(huán),來達到中斷數(shù)據(jù)源的效果;
package com.whu.chapter05
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import java.util.Calendar
import scala.util.Random
// 調(diào)用
// val stream = env.addSource(new ClickSource)
case class Event(user: String, url: String, timestamp: Long)
// 實現(xiàn) SourceFunction 接口,接口中的泛型是自定義數(shù)據(jù)源中的類型
class ClickSource(sleepTime:Long=1000L) extends SourceFunction[Event] {
// 標志位,用來控制循環(huán)的退出
var running = true
// 重寫run方法,使用上下文對象sourceContext調(diào)用collect方法
override def run(ctx: SourceContext[Event]): Unit = {
// 實例化一個隨機數(shù)發(fā)生器
val random = new Random()
// 供隨機選擇的用戶名數(shù)組
val users = Array("Marry", "Bob", "Jack", "Cary")
// 供選擇的url數(shù)組
val urls = Array("./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2")
// 通過while循環(huán)發(fā)送數(shù)據(jù),running默認為true,所以會一直發(fā)送數(shù)據(jù)
while (running) {
// 調(diào)用collect方法向下游發(fā)送數(shù)據(jù)
ctx.collect(Event(
users(random.nextInt(users.length)),
urls(random.nextInt(urls.length)),
Calendar.getInstance.getTimeInMillis // 當前時間戳
))
// 每隔一秒生成一個點擊事件,方便觀測
Thread.sleep(sleepTime)
}
}
override def cancel(): Unit = {
// 通過將running設置為false來終止數(shù)據(jù)發(fā)送
running = false
}
}
5.3 轉換算子(Transformation)
數(shù)據(jù)源讀入數(shù)據(jù)之后,我們就可以使用各種轉換算子,講一個或多個DataStream轉換為新的DataStream。
5.3.1 基本轉換算子
-
map, 一個個進行數(shù)據(jù)轉換; -
filter, 對數(shù)據(jù)進行過濾; -
flatmap, 扁平映射,可以理解為先map然后進行flatten;
5.3.2 聚合算子(Aggregation)
-
keyBy, 按鍵分區(qū)。對于Flink來說,DataStream是沒有直接進行覺得API的。要做聚合需要先進行分區(qū),這個操作就是通過keyBy來完成的。keyBy()方法需要傳入一個參數(shù),這個參數(shù)指定了一個或一組 key。有很多不同的方法來指定 key:比如對于 Tuple 數(shù)據(jù)類型,可以指定字段的位置或者多個位置的組合。對于 POJO 類型或 Scala 的樣例類,可以指定字段的名稱(String);另外,還可以傳入 Lambda 表達式或者實現(xiàn)一個鍵選擇器(KeySelector),用于說明從數(shù)據(jù)中提取 key 的邏輯。 - 簡單聚合,sum、min、max、minBy、maxBy等。都是在指定字段上進行聚合操作。min()只計算指定字段的最小值,其他字段會保留最初第一個數(shù)據(jù)的值;而 minBy()則會返回包含字段最小值的整條數(shù)據(jù)。
指定字段的方式有兩種:指定位置,和指定名稱。元組通過位置,樣例類通過字段名稱。
keyBy得到的數(shù)據(jù)流一般稱為KeyedStream。而聚合操作則會將KeyedStream轉換為DataStream。
規(guī)約聚合(reduce)
與簡單聚合類似,reduce操作也會將KeyedStream轉換為DataStream。他不會改變流的元素數(shù)據(jù)類型,輸入輸出是一致的。
reduce方法來自ReduceFunction接口,該方法接收兩個輸入事件,經(jīng)過處理后輸出一個相同數(shù)據(jù)類型的事件。
一個簡單的栗子:
import org.apache.flink.streaming.api.scala._
object TransformationDemo {
def main(args:Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 添加自定義數(shù)據(jù)源
env.addSource(new ClickSource)
.map(r => (r.user, 1L))
// 按照用戶進行分組
.keyBy(_._1)
// 計算每個用戶的訪問頻次
.reduce((r1, r2) => (r1._1, r1._2+r2._2))
// 將所有數(shù)據(jù)分到同一個分區(qū)
.keyBy(_ => true)
// 通過reduce實現(xiàn)max功能,計算訪問頻次最高的用戶
.reduce((r1, r2)=> if(r1._2>r2._2) r1 else r2)
.print()
// 更簡單的方法是直接keyBy然后sum然后maxBy就行了,這里只是為了演示reduce用法
env.execute()
}
}
5.3.3 用戶自定義函數(shù)(UDF)
Flink的DataStream API編程風格其實是一致的:基本都是基于DataStream調(diào)用一個方法,表示要做一個轉換操作;方法需要傳入一個參數(shù),這個參數(shù)都是需要實現(xiàn)一個接口。
這個接口有一個共同特定:全部都以算子操作名稱 + Function命名,如數(shù)據(jù)源算子需要實現(xiàn)SourceFunction接口,map算子需要實現(xiàn)MapFunction接口。我們可以通過三種方式來實現(xiàn)接口。這就是所謂的用戶自定義函數(shù)(UDF)。
- 自定義函數(shù)類;
- 匿名類;
- lambda表達式;
接下來對這三種編程方式做一個梳理。
函數(shù)類(Function Classes)
package com.whu.chapter05
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
object TransformationUDFDemo {
def main(args:Array[String]): Unit = {
// 自定義filterFunction類, 并接受額外的參數(shù)
class MyFilter(key:String) extends FilterFunction[Event] {
override def filter(t: Event): Boolean = {
t.url.contains(key)
}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 通過自定義函數(shù)類
val stream1 = env.addSource(new ClickSource)
.filter(new MyFilter("home"))
// 通過匿名類
val stream2 = env.addSource(new ClickSource)
.filter(new FilterFunction[Event]{
override def filter(t: Event): Boolean = {
t.url.contains("home")
}
})
// 最簡單的lambda 表達式
val stream3 = env.addSource(new ClickSource)
.filter(_.url.contains("home"))
stream1.print("stream1")
stream2.print("stream2")
stream3.print("stream3")
env.execute()
}
}
富函數(shù)類(Rich Function Classes)
富函數(shù)類也是DataStream API提供的一個函數(shù)類的接口,所有的Flink函數(shù)類都有其Rich版本。富函數(shù)類一般是已抽象類的形式出現(xiàn)的。例如:RichMapFunction,RichFilterFunction,RichReduceFunction等。
與常規(guī)函數(shù)類的不同主要在于富函數(shù)類可以獲取運行環(huán)境的上下文,并擁有一些生命周期方法,所以可以實現(xiàn)更復雜的功能。
典型的生命周期方法有:
-
open方法,是RichFunction的初始化方法,會開啟一個算子的生命周期。當一個算子的實際工作方法如map、filter等方法被調(diào)用之前,open會首先被調(diào)用。所以像文件IO流、數(shù)據(jù)庫連接、配置文件讀取等等這樣一次性的工作,都適合在open方法中完成; -
close方法,是生命周期中最后一個調(diào)用的方法,類似于解構方法。一般用來做一些清理工作。
open、close等生命周期方法對于一個并行子任務來說只會調(diào)用一次;而對應的,實際工作方法,如map,對于每一條數(shù)據(jù)都會調(diào)用一次。
package com.whu.chapter05
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
object RichFunctionDemo {
def main(args:Array[String]) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
env.addSource(new ClickSource(10000))
.map(new RichMapFunction[Event, Long] {
// 在任務生命周期開始時會執(zhí)行open方法,在控制臺打印對應語句
override def open(parameters: Configuration): Unit = {
println(s"索引為 ${getRuntimeContext.getIndexOfThisSubtask} 的任務開始")
}
override def map(in: Event): Long = {
in.timeStamp
}
override def close(): Unit = {
println(s"索引為 ${getRuntimeContext.getIndexOfThisSubtask} 的任務結束")
}
}).print()
env.execute()
}
}
在上面的例子中可以看到,富函數(shù)類提供了getRuntimeContex方法,可以獲取運行時上下文信息,如程序執(zhí)行的并行度,任務名稱,任務狀態(tài)等。
5.3.4 物理分區(qū)(Physical Partitioning)
分區(qū)(partitioning)操作就是要將數(shù)據(jù)進行重新分布,傳遞到不同的流分區(qū)去進行下一步計算。keyBy是一種邏輯分區(qū)(logic partitioning)操作。
Flink 對于經(jīng)過轉換操作之后的 DataStream,提供了一系列的底層操作算子,能夠幫我們實現(xiàn)數(shù)據(jù)流的手動重分區(qū)。為了同 keyBy()相區(qū)別,我們把這些操作統(tǒng)稱為“物理分區(qū)”操作。
常見的物理分區(qū)策略有隨機分區(qū)、輪詢分區(qū)、重縮放和廣播,還有一種特殊的分區(qū)策略— —全局分區(qū),并且 Flink 還支持用戶自定義分區(qū)策略,下邊我們分別來做了解。
隨機分區(qū)(shuffle)
最簡單的重分區(qū)方式就是直接“洗牌”。通過調(diào)用 DataStream 的 shuffle()方法,將數(shù)據(jù)隨機地分配到下游算子的并行任務中去。
隨機分區(qū)服從均勻分布(uniform distribution),所以可以把流中的數(shù)據(jù)隨機打亂,均勻地傳遞到下游任務分區(qū)。
輪詢分區(qū)(Round-Robin)
輪詢也是一種常見的重分區(qū)方式。簡單來說就是“發(fā)牌”,按照先后順序將數(shù)據(jù)做依次分發(fā)。通過調(diào)用 DataStream的.rebalance()方法,就可以實現(xiàn)輪詢重分區(qū)。rebalance()使用的是 Round-Robin 負載均衡算法,可以將輸入流數(shù)據(jù)平均分配到下游的并行任務中去。

重縮放分區(qū)(rescale)
重縮放分區(qū)和輪詢分區(qū)非常相似。當調(diào)用 rescale()方法時,其實底層也是使用 Round-Robin算法進行輪詢,但是只會將數(shù)據(jù)輪詢發(fā)送到下游并行任務的一部分中,也就是說,“發(fā)牌人”如果有多個,那么 rebalance()的方式是每個發(fā)牌人都面向所有人發(fā)牌;而rescale()的做法是分成小團體,發(fā)牌人只給自己團體內(nèi)的所有人輪流發(fā)牌。

當下游任務(數(shù)據(jù)接收方)的數(shù)量是上游任務(數(shù)據(jù)發(fā)送方)數(shù)量的整數(shù)倍時,rescale()的效率明顯會更高。比如當上游任務數(shù)量是 2,下游任務數(shù)量是 6 時,上游任務其中一個分區(qū)的數(shù)據(jù)就將會平均分配到下游任務的 3 個分區(qū)中。
廣播(broadcast)
這種方式其實不應該叫作“重分區(qū)”,因為經(jīng)過廣播之后,數(shù)據(jù)會在不同的分區(qū)都保留一份,可能進行重復處理??梢酝ㄟ^調(diào)用 DataStream 的 broadcast()方法,將輸入數(shù)據(jù)復制并發(fā)送到下游算子的所有并行任務中去。
全局分區(qū)(global)
全局分區(qū)也是一種特殊的分區(qū)方式。這種做法非常極端,通過調(diào)用.global()方法,會將所有的輸入流數(shù)據(jù)都發(fā)送到下游算子的第一個并行子任務中去。這就相當于強行讓下游任務并行度變成了 1,所以使用這個操作需要非常謹慎,可能對程序造成很大的壓力。
自定義分區(qū)
當 Flink 提 供 的 所 有 分 區(qū) 策 略 都 不 能 滿 足 用 戶 的 需 求 時 , 我 們 可 以 通 過 使 用partitionCustom()方法來自定義分區(qū)策略。
在調(diào)用時,方法需要傳入兩個參數(shù),第一個是自定義分區(qū)器(Partitioner)對象,第二個是應用分區(qū)器的字段,它的指定方式與 keyBy 指定 key 基本一樣:可以通過字段名稱指定,也可以通過字段位置索引來指定,還可以實現(xiàn)一個 KeySelector 接口。
栗子:
package com.whu.chapter05
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._
object PartitioningDemo {
def main(args:Array[String]) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 讀取數(shù)據(jù)源
val stream = env.addSource(new ClickSource())
// 隨機分區(qū)(shuffle)
stream.shuffle.print("shuffle").setParallelism(4)
// 輪詢分區(qū)(rebalance, Round-Robin)
stream.rebalance.print("rebalance").setParallelism(4)
// 重縮放分區(qū)(rescale)
stream.rescale.print("rescale").setParallelism(4)
// 廣播 (broadcast)
stream.broadcast.print("broadcast").setParallelism(4)
// 全局分區(qū)(global)
stream.global.print("global").setParallelism(4)
// 自定義分區(qū)
stream.partitionCustom(new Partitioner[Event] {
// 根據(jù) key 的奇偶性計算出數(shù)據(jù)將被發(fā)送到哪個分區(qū)
override def partition(k: Event, i: Int): Int = {
k.timeStamp.toInt % 2
}
}, "user"
).print()
env.execute()
}
}
5.4 輸出算子(Sink)
5.4.1 連接到外部系統(tǒng)
Flink的DataStream API專門提供了向外部寫入數(shù)據(jù)的方法:addSink。與addSource類似,addSink方法對應著一個Sink算子,主要就是用來實現(xiàn)與外部系統(tǒng)鏈接、并將數(shù)據(jù)提交寫入的;Flink程序中所有對外的輸出操作,一般都是利用Sink算子完成的。
與addSource類似,addSink也支持自定義sink算子SinkFunction。在這個接口中只需要重寫一個方法invoke(),用來將指定的值寫入到外部系統(tǒng)中。這個方法在每條數(shù)據(jù)記錄到來時都會調(diào)用。Flink官方提供了諸多第三方系統(tǒng)連接器:

除 Flink 官方之外,Apache Bahir 作為給 Spark 和 Flink 提供擴展支持的項目,也實現(xiàn)了一
些其他第三方系統(tǒng)與 Flink 的連接器:

5.4.2 輸出到文件
Flink有一些非常簡單粗暴的輸出到文件的預實現(xiàn)方法,如writeAsCsv等,目前這些簡單的方法已經(jīng)要被棄用。
Flink專門提供了一個流式文件系統(tǒng)連接器:StreamingFileSink,它繼承自抽象類RichSinkFunction,而且繼承了Flink的檢查點機制,用來確保精確一次(exactly)的一致性語義。
StreamingFileSink支持行編碼(row-encoded)和批量編碼(bulk-encoded,比如parquet)格式。這兩種不同的方式都有各自的構建器(builder),調(diào)用方法如下:
- 行編碼:StreamingFileSink.forRowFormat (basePath, rowEncoder);
- 批量編碼:StreamingFileSink.forBulkFormat (basePath,bulkWriterFactory);
在創(chuàng)建行或批量Sink時,我們需要傳入兩個參數(shù),用來指定存儲桶的基本路徑和數(shù)據(jù)的編碼邏輯。
package com.whu.chapter05
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.streaming.api.scala._
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import java.util.concurrent.TimeUnit
object SinkToFileDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new ClickSource())
val fileSink = StreamingFileSink.forRowFormat(
new Path("./output"),
new SimpleStringEncoder[String]("UTF-8")
)
// 通過.withRollingPolicy()方法指定滾動邏輯
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(1024*1024*1024)
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.build()
).build()
stream.map(_.toString).addSink(fileSink)
}
}
上面創(chuàng)建了一個簡單的文件 Sink,通過 withRollingPolicy()方法指定了一個“滾動策略”。上面的代碼設置了在以下 3 種情況下,我們就會滾動分區(qū)文件:
- 至少包含 15 分鐘的數(shù)據(jù);
- 最近 5 分鐘沒有收到新的數(shù)據(jù);
- 文件大小已達到1GB;
輸出到其他系統(tǒng)
略。
參考:
FLink教程
