flink流處理和批處理都內(nèi)置了很多數(shù)據(jù)源,可以滿足我們大部分使用場景,當(dāng)然也可以通過實現(xiàn)flink提供的接口來實現(xiàn)其他數(shù)據(jù)源的接入。接下來我們就分別來了解下flink批處理和流處理的數(shù)據(jù)源吧。
1 批處理
批處理的數(shù)據(jù)源主要大致分為兩類:集合數(shù)據(jù)源、文件數(shù)據(jù)源。
sequence
集合數(shù)據(jù)源之迭代類。平時我們所使用到的scala中Seq(),Array(),List(),Map(),Set()等都是迭代類。
env.fromCollection(data: Iterable[T])
// 示例
env.fromCollection(Seq(1, 2, 3))
env.fromCollection(Array(1, 2, 3, 4, 5))
env.fromCollection(List(1, 2, 3, 4, 5))
env.fromCollection(Map("a" -> 1, "b" -> 2, "c" -> 3))
env.fromCollection(Set(3, 5, 6))
// 并行版本
env.fromParallelCollection(iterator: SplittableIterator[T])
// 示例
env.fromParallelCollection(new NumberSequenceIterator(3, 19))

集合類型源之元素數(shù)組。類似于java中的可變參數(shù)列表。
env.fromElements(data: T*)
// 示例
env.fromElements(1, 2, 3)
env.fromElements("a", "b", "c")
file
一般來說,我們的批處理程序的數(shù)據(jù)源主要都是文件數(shù)據(jù)源,包括本地文件系統(tǒng)中的和分布式文件系統(tǒng)中的文件。使用頻次比較高的是readTextFile()和readCsvFile(),當(dāng)然也可以通過重寫readFlie(new FileInputStream[T], filePath: String)中的FileInputFormat來讀取不同格式文件。
env.readFile(new FileInputStream[T], filePath: String)
env.readTextFile(filePath: String, charsetName: String = "UTF-8")
env.readTextFileWithValue(filePath: String, charsetName: String = "UTF-8")
env.readFileOfPrimitives(filePath: String, delimiter: String = "\n")
env.readCsvFile(
filePath: String, // csv文件地址
lineDelimiter: String = "\n", // 行劃分符號
fieldDelimiter: String = ",", // 字段劃分符號
quoteCharacter: Character = null,
ignoreFirstLine: Boolean = false, // 忽略首行
ignoreComments: String = null,
lenient: Boolean = false, // 是否對數(shù)據(jù)嚴(yán)格判斷。false表示嚴(yán)格判斷,缺失的數(shù)據(jù)則會被忽略
includedFields: Array[Int] = null,
pojoFields: Array[String] = null
)
readFile,自定義讀取文件內(nèi)容的邏輯
env.readFile(new MyFileInputFormat[String](), getPath("words.txt"))
// 重寫讀取文件的邏輯
class MyFileInputFormat extends FileInputFormat[String]() {
private var end = false
private var input: BufferedReader = _
// 只執(zhí)行一次,此處創(chuàng)建輸入流對象
override def open(fileSplit: FileInputSplit): Unit = {
val inputStream = new FileInputStream(this.getFilePath.getPath)
input = new BufferedReader(new InputStreamReader(inputStream))
}
// 判斷是否讀取到了文件尾部
override def reachedEnd(): Boolean = this.end
// 讀取下一行的操作邏輯
override def nextRecord(ot: String): String = {
val str: String = input.readLine()
if (str == null) {
this.end = true
""
} else str
}
}
2 流處理
flink流處理的數(shù)據(jù)源大致分為四種:集合數(shù)據(jù)源、文件數(shù)據(jù)源、套接字?jǐn)?shù)據(jù)源、自定義數(shù)據(jù)源。flink還原生給我們寫好了kafka數(shù)據(jù)源,這是實時流處理中使用的最頻繁沒有之一的數(shù)據(jù)源(下邊會單獨簡單展示一下)。
sequence
不用多說了吧,完全參考批處理中的sequence。
env.fromCollection(data: Iterable[T])
env.fromElements(data: T*)
env.fromParallelCollection(iterator: SplittableIterator[T])
file
一樣參考批處理中的flie。只是有一點兒不同的是流處理中readFile()還能監(jiān)視文件的變更狀況來進行更多的處理方式,如文件新增了記錄,可以重新處理文件或者直接退出。
env.readTextFile(filePath: String, charsetName: String)
env.readFile(
inputFormat: FileInputFormat[T],
filePath: String,
watchType: FileProcessingMode, // 監(jiān)視路徑并響應(yīng)新數(shù)據(jù),或處理一次并退出
interval: Long) // 間隔時間millis
socket
直接監(jiān)視指定機子socket端口的記錄。
env.socketTextStream(
hostname: String,
port: Int,
delimiter: Char = '\n',
maxRetry: Long = 0) // 如果端口監(jiān)聽中斷,最大重試間隔時間
自定義
當(dāng)上邊所有的數(shù)據(jù)源都滿足不了我們的場景需求時,我們可以通過繼承flink暴露的SourceFunction來實現(xiàn)自己的數(shù)據(jù)源(下文會展示自定義MySQL數(shù)據(jù)源)。
env.addSource(function: SourceContext[T] => Unit)
env.addSource(function: SourceFunction[T])
3 kafka connector
kafka connector是flink提供給我們的自定義連接器,可以直接實例化FlinkKafkaConsumer對象來消費kafka中記錄。
object FlinkStreamDataSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// kafka properties參數(shù)
val props = new Properties()
props.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
props.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
props.setProperty("group.id", "leslie")
props.setProperty("auto.offset.reset", "latest")
val original: DataStream[String] = env
.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), props))
original
.flatMap(_.split(","))
.map(_ + "_test")
.print()
env.execute("flink_streaming_data_source")
}
}

4 自定義DataSource
想要實現(xiàn)自定義的數(shù)據(jù)源十分簡單,只需繼承flink的SourceFunction接口并重寫其中的run(),cancel()方法。
object FlinkStreamCustomerDataSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 自定義dataSource
env.addSource(new MysqlDataSource).print()
env.execute("customer_date_source")
}
}
下文代碼MysqlDataSource類是繼承了RichSourceFunction類,RichSourceFunction實現(xiàn)SourceFunction接口并同時繼承AbstractRichFunction抽象類,AbstractRichFunction抽象類又實現(xiàn)RichFunction接口。為什么我們的自定義數(shù)據(jù)源要多繼承RichSourceFunction類呢?原因就在這個Rich上!我們將這樣帶有"Rich"前綴的函數(shù)類稱為富函數(shù),既然是富函數(shù)了那么一定是比普通的函數(shù)多給我們帶來一些功能。RichSourceFunction中的open(),close()就是“富”出來的方法,open()方法僅在函數(shù)類實例化的時候調(diào)用一次(通常用來建立連接),close()則是在實例對象銷毀前調(diào)用一次(通常用來關(guān)閉連接),可以避免重復(fù)進行創(chuàng)建連接銷毀連接操作。(當(dāng)然富函數(shù)不僅僅只“富”了這么一點點,還“富”出來運行時上下文,這可是個好東西。此處不擴展哦,以后用到再來討論)

我們需要進行獲取外部存儲組件數(shù)據(jù)的操作就在SourceFunction的run(),cancel()方法中實現(xiàn)。
class MysqlDataSource extends RichSourceFunction[String] {
private var pStmt: PreparedStatement = _
private var conn: Connection = _
// 開始方法,只執(zhí)行一次,建立和mysql的連接
override def open(parameters: Configuration): Unit = {
Class.forName("com.mysql.jdbc.Driver")
val url = "jdbc:mysql://localhost:3306/test_for_mysql?useSSL=false"
val username = "root"
val password = "123456"
conn = DriverManager.getConnection(url, username, password);
val sql =
"""
|select id, name, sex, age from user;
|""".stripMargin
pStmt = conn.prepareStatement(sql)
}
// 結(jié)束方法,只執(zhí)行一次,關(guān)閉連接
override def close(): Unit = {
// 關(guān)閉連接
if (pStmt != null) pStmt.close()
if (conn != null) conn.close()
}
// 主體執(zhí)行方法
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val rs: ResultSet = pStmt.executeQuery()
while (rs.next()) {
val id: Int = rs.getInt("id")
val name: String = rs.getString("name")
val sex: String = rs.getString("sex")
val age: Int = rs.getInt("age")
ctx.collect(s"id: $id, name: $name, sex: $sex, age:$age") // 收集記錄到上下文中
}
}
override def cancel(): Unit = {}
}
