Flink DataSouce

flink流處理和批處理都內(nèi)置了很多數(shù)據(jù)源,可以滿足我們大部分使用場景,當(dāng)然也可以通過實現(xiàn)flink提供的接口來實現(xiàn)其他數(shù)據(jù)源的接入。接下來我們就分別來了解下flink批處理和流處理的數(shù)據(jù)源吧。

1 批處理

批處理的數(shù)據(jù)源主要大致分為兩類:集合數(shù)據(jù)源、文件數(shù)據(jù)源。

sequence

集合數(shù)據(jù)源之迭代類。平時我們所使用到的scala中Seq(),Array(),List(),Map(),Set()等都是迭代類。

    env.fromCollection(data: Iterable[T])
    // 示例
    env.fromCollection(Seq(1, 2, 3))
    env.fromCollection(Array(1, 2, 3, 4, 5))
    env.fromCollection(List(1, 2, 3, 4, 5))
    env.fromCollection(Map("a" -> 1, "b" -> 2, "c" -> 3))
    env.fromCollection(Set(3, 5, 6))

    // 并行版本
    env.fromParallelCollection(iterator: SplittableIterator[T])
    // 示例
    env.fromParallelCollection(new NumberSequenceIterator(3, 19))
01_parallel_collection.png

集合類型源之元素數(shù)組。類似于java中的可變參數(shù)列表。

    env.fromElements(data: T*)
    // 示例
    env.fromElements(1, 2, 3)
    env.fromElements("a", "b", "c")

file

一般來說,我們的批處理程序的數(shù)據(jù)源主要都是文件數(shù)據(jù)源,包括本地文件系統(tǒng)中的和分布式文件系統(tǒng)中的文件。使用頻次比較高的是readTextFile()和readCsvFile(),當(dāng)然也可以通過重寫readFlie(new FileInputStream[T], filePath: String)中的FileInputFormat來讀取不同格式文件。

    env.readFile(new FileInputStream[T], filePath: String)
    env.readTextFile(filePath: String, charsetName: String = "UTF-8")
    env.readTextFileWithValue(filePath: String, charsetName: String = "UTF-8")
    env.readFileOfPrimitives(filePath: String, delimiter: String = "\n")
    env.readCsvFile(
      filePath: String,                 // csv文件地址
      lineDelimiter: String = "\n",     // 行劃分符號
      fieldDelimiter: String = ",",     // 字段劃分符號
      quoteCharacter: Character = null,
      ignoreFirstLine: Boolean = false, // 忽略首行
      ignoreComments: String = null,
      lenient: Boolean = false,         // 是否對數(shù)據(jù)嚴(yán)格判斷。false表示嚴(yán)格判斷,缺失的數(shù)據(jù)則會被忽略
      includedFields: Array[Int] = null,
      pojoFields: Array[String] = null
    )

readFile,自定義讀取文件內(nèi)容的邏輯

    env.readFile(new MyFileInputFormat[String](), getPath("words.txt"))

// 重寫讀取文件的邏輯
class MyFileInputFormat extends FileInputFormat[String]() {
  private var end = false
  private var input: BufferedReader = _

  // 只執(zhí)行一次,此處創(chuàng)建輸入流對象
  override def open(fileSplit: FileInputSplit): Unit = {
    val inputStream = new FileInputStream(this.getFilePath.getPath)
    input = new BufferedReader(new InputStreamReader(inputStream))
  }

  // 判斷是否讀取到了文件尾部
  override def reachedEnd(): Boolean = this.end

  // 讀取下一行的操作邏輯
  override def nextRecord(ot: String): String = {
    val str: String = input.readLine()
    if (str == null) {
      this.end = true
      ""
    } else str
  }
}

2 流處理

flink流處理的數(shù)據(jù)源大致分為四種:集合數(shù)據(jù)源、文件數(shù)據(jù)源、套接字?jǐn)?shù)據(jù)源、自定義數(shù)據(jù)源。flink還原生給我們寫好了kafka數(shù)據(jù)源,這是實時流處理中使用的最頻繁沒有之一的數(shù)據(jù)源(下邊會單獨簡單展示一下)。

sequence

不用多說了吧,完全參考批處理中的sequence。

    env.fromCollection(data: Iterable[T])
    env.fromElements(data: T*)
    env.fromParallelCollection(iterator: SplittableIterator[T])

file

一樣參考批處理中的flie。只是有一點兒不同的是流處理中readFile()還能監(jiān)視文件的變更狀況來進行更多的處理方式,如文件新增了記錄,可以重新處理文件或者直接退出。

    env.readTextFile(filePath: String, charsetName: String)
    env.readFile(
      inputFormat: FileInputFormat[T],
      filePath: String,
      watchType: FileProcessingMode,    // 監(jiān)視路徑并響應(yīng)新數(shù)據(jù),或處理一次并退出
      interval: Long)                   // 間隔時間millis

socket

直接監(jiān)視指定機子socket端口的記錄。

    env.socketTextStream(
      hostname: String, 
      port: Int, 
      delimiter: Char = '\n', 
      maxRetry: Long = 0)               // 如果端口監(jiān)聽中斷,最大重試間隔時間

自定義

當(dāng)上邊所有的數(shù)據(jù)源都滿足不了我們的場景需求時,我們可以通過繼承flink暴露的SourceFunction來實現(xiàn)自己的數(shù)據(jù)源(下文會展示自定義MySQL數(shù)據(jù)源)。

    env.addSource(function: SourceContext[T] => Unit)
    env.addSource(function: SourceFunction[T])

3 kafka connector

kafka connector是flink提供給我們的自定義連接器,可以直接實例化FlinkKafkaConsumer對象來消費kafka中記錄。

object FlinkStreamDataSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // kafka properties參數(shù)
    val props = new Properties()
    props.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
    props.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
    props.setProperty("group.id", "leslie")
    props.setProperty("auto.offset.reset", "latest")

    val original: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), props))

    original
      .flatMap(_.split(","))
      .map(_ + "_test")
      .print()

    env.execute("flink_streaming_data_source")
  }
}
01_datasource_kafka.png

4 自定義DataSource

想要實現(xiàn)自定義的數(shù)據(jù)源十分簡單,只需繼承flink的SourceFunction接口并重寫其中的run(),cancel()方法。

object FlinkStreamCustomerDataSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 自定義dataSource
    env.addSource(new MysqlDataSource).print()

    env.execute("customer_date_source")
  }
}

下文代碼MysqlDataSource類是繼承了RichSourceFunction類,RichSourceFunction實現(xiàn)SourceFunction接口并同時繼承AbstractRichFunction抽象類,AbstractRichFunction抽象類又實現(xiàn)RichFunction接口。為什么我們的自定義數(shù)據(jù)源要多繼承RichSourceFunction類呢?原因就在這個Rich上!我們將這樣帶有"Rich"前綴的函數(shù)類稱為富函數(shù),既然是富函數(shù)了那么一定是比普通的函數(shù)多給我們帶來一些功能。RichSourceFunction中的open(),close()就是“富”出來的方法,open()方法僅在函數(shù)類實例化的時候調(diào)用一次(通常用來建立連接),close()則是在實例對象銷毀前調(diào)用一次(通常用來關(guān)閉連接),可以避免重復(fù)進行創(chuàng)建連接銷毀連接操作。(當(dāng)然富函數(shù)不僅僅只“富”了這么一點點,還“富”出來運行時上下文,這可是個好東西。此處不擴展哦,以后用到再來討論)

02_rich_function.png

我們需要進行獲取外部存儲組件數(shù)據(jù)的操作就在SourceFunction的run(),cancel()方法中實現(xiàn)。

class MysqlDataSource extends RichSourceFunction[String] {
  private var pStmt: PreparedStatement = _
  private var conn: Connection = _

  // 開始方法,只執(zhí)行一次,建立和mysql的連接
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val url = "jdbc:mysql://localhost:3306/test_for_mysql?useSSL=false"
    val username = "root"
    val password = "123456"
    conn = DriverManager.getConnection(url, username, password);
    val sql =
      """
        |select id, name, sex, age from user;
        |""".stripMargin
    pStmt = conn.prepareStatement(sql)
  }

  // 結(jié)束方法,只執(zhí)行一次,關(guān)閉連接
  override def close(): Unit = {
    // 關(guān)閉連接
    if (pStmt != null) pStmt.close()
    if (conn != null) conn.close()
  }

  // 主體執(zhí)行方法
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val rs: ResultSet = pStmt.executeQuery()
    while (rs.next()) {
      val id: Int = rs.getInt("id")
      val name: String = rs.getString("name")
      val sex: String = rs.getString("sex")
      val age: Int = rs.getInt("age")

      ctx.collect(s"id: $id, name: $name, sex: $sex, age:$age") // 收集記錄到上下文中
    }
  }
  override def cancel(): Unit = {}
}
02_customer_mysql.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 不是悲傷,是惶惑 也有懷疑自己的信仰 當(dāng)被所有人都遺忘,即便你是神靈 也有死亡。 再看看這閃頻的光有多少是真正的明...
    芝麻麗娜閱讀 136評論 0 0
  • 好久沒有提筆了,有些生疏。 孩子們都生病了,我也有些焦慮,第一次和大寶分開這么久,身體特別疲憊,卻沒有睡意,有時候...
    夏如虹閱讀 110評論 0 1
  • 秘密 你確認(rèn)在你說出來之前你仍是它的主人? 香煙 你確認(rèn)在你拆開之前你是第一個染指的? 而立之年我想要的只不過是生...
    春雨欲來花滿樓閱讀 269評論 0 0
  • 【前言】 本文作者橘子精,號稱詩劍雙絕。 試過肉裝蘇烈,明顯不夠強勢。推人有項羽,回血有程咬金,所以推出一個特色蘇...
    貪玩橘子精閱讀 1,526評論 4 2
  • 今天老師教我們做手抄報,主題是安全出行。做手抄報共有九個要素,1、主角,2、配角,3、主標(biāo)題,4、小標(biāo)題,5...
    王劉一桐閱讀 1,223評論 1 5

友情鏈接更多精彩內(nèi)容