Flink的分布式緩存

Flink提供了一個(gè)分布式緩存,類似于Apache Hadoop,可以在本地訪問用戶函數(shù)的并行實(shí)例。此函數(shù)可用于共享包含靜態(tài)外部數(shù)據(jù)的文件,如字典或機(jī)器學(xué)習(xí)的回歸模型。
緩存的工作原理如下。程序在其作為緩存文件的特定名稱下注冊(cè)本地或遠(yuǎn)程文件系統(tǒng)(如HDFS或S3)的文件或目錄ExecutionEnvironment。執(zhí)行程序時(shí),F(xiàn)link會(huì)自動(dòng)將文件或目錄復(fù)制到所有工作程序的本地文件系統(tǒng)。用戶函數(shù)可以查找指定名稱下的文件或目錄,并從worker的本地文件系統(tǒng)訪問它。
其實(shí)分布式緩存就相當(dāng)于spark的廣播,把一個(gè)變量廣播到所有的executor上,也可以看做是Flink的廣播流,只不過這里廣播的是一個(gè)文件.
分布式緩存使用如下:
注冊(cè)中的文件或目錄ExecutionEnvironment。

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

訪問用戶函數(shù)中的緩存文件或目錄(此處為a MapFunction)。該函數(shù)必須擴(kuò)展RichFunction類,因?yàn)樗枰L問RuntimeContext。


// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // access cached file via RuntimeContext and DistributedCache
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // read the file (or navigate the directory)
    ...
  }

  override def map(value: String): Int = {
    // use content of cached file
    ...
  }
}

下面給出一個(gè)完整的demo,計(jì)算存在于緩存文件中的單詞出現(xiàn)的次數(shù),看下面的代碼


package flink.cache

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source
import org.apache.flink.api.scala._

object FlinkCacheDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)
    // 注冊(cè)緩存的文件,里面有數(shù)據(jù)hello jason
    env.registerCachedFile("D:/test.txt", "testfile")
    val stream = env.fromElements("hello", "jason", "hello", "jim")
    val result = stream
      .flatMap(_.split(","))
      .map(new RichMapFunction[String, String] {
        var list: List[(String)] = _
        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          // 獲取緩存的數(shù)據(jù)
          val file = getRuntimeContext.getDistributedCache.getFile("testfile")
          val lines = Source.fromFile(file.getAbsoluteFile).getLines()
          list = lines.toList
        }
        override def map(value: String): String = {
          var middle: String = ""
          if(list(0).contains(value)) {
            middle = value
          }
          middle
        }
      })
      .map((_,1L))
      .filter(_._1.nonEmpty)
      .groupBy(0)
      .sum(1)
      .print()
  }
}

運(yùn)行代碼輸出的結(jié)果是,因?yàn)閖im不在緩存的文件中,被過濾掉了
(hello,2)
(jason,1)

如果有寫的不對(duì)的地方,歡迎大家指正,如果有什么疑問,可以加QQ群:340297350,更多的Flink和spark的干貨可以加入下面的星球


image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容