Flink提供了一個(gè)分布式緩存,類似于Apache Hadoop,可以在本地訪問用戶函數(shù)的并行實(shí)例。此函數(shù)可用于共享包含靜態(tài)外部數(shù)據(jù)的文件,如字典或機(jī)器學(xué)習(xí)的回歸模型。
緩存的工作原理如下。程序在其作為緩存文件的特定名稱下注冊(cè)本地或遠(yuǎn)程文件系統(tǒng)(如HDFS或S3)的文件或目錄ExecutionEnvironment。執(zhí)行程序時(shí),F(xiàn)link會(huì)自動(dòng)將文件或目錄復(fù)制到所有工作程序的本地文件系統(tǒng)。用戶函數(shù)可以查找指定名稱下的文件或目錄,并從worker的本地文件系統(tǒng)訪問它。
其實(shí)分布式緩存就相當(dāng)于spark的廣播,把一個(gè)變量廣播到所有的executor上,也可以看做是Flink的廣播流,只不過這里廣播的是一個(gè)文件.
分布式緩存使用如下:
注冊(cè)中的文件或目錄ExecutionEnvironment。
val env = ExecutionEnvironment.getExecutionEnvironment
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()
訪問用戶函數(shù)中的緩存文件或目錄(此處為a MapFunction)。該函數(shù)必須擴(kuò)展RichFunction類,因?yàn)樗枰L問RuntimeContext。
// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {
override def open(config: Configuration): Unit = {
// access cached file via RuntimeContext and DistributedCache
val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
// read the file (or navigate the directory)
...
}
override def map(value: String): Int = {
// use content of cached file
...
}
}
下面給出一個(gè)完整的demo,計(jì)算存在于緩存文件中的單詞出現(xiàn)的次數(shù),看下面的代碼
package flink.cache
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source
import org.apache.flink.api.scala._
object FlinkCacheDemo {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
// 注冊(cè)緩存的文件,里面有數(shù)據(jù)hello jason
env.registerCachedFile("D:/test.txt", "testfile")
val stream = env.fromElements("hello", "jason", "hello", "jim")
val result = stream
.flatMap(_.split(","))
.map(new RichMapFunction[String, String] {
var list: List[(String)] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 獲取緩存的數(shù)據(jù)
val file = getRuntimeContext.getDistributedCache.getFile("testfile")
val lines = Source.fromFile(file.getAbsoluteFile).getLines()
list = lines.toList
}
override def map(value: String): String = {
var middle: String = ""
if(list(0).contains(value)) {
middle = value
}
middle
}
})
.map((_,1L))
.filter(_._1.nonEmpty)
.groupBy(0)
.sum(1)
.print()
}
}
運(yùn)行代碼輸出的結(jié)果是,因?yàn)閖im不在緩存的文件中,被過濾掉了
(hello,2)
(jason,1)
如果有寫的不對(duì)的地方,歡迎大家指正,如果有什么疑問,可以加QQ群:340297350,更多的Flink和spark的干貨可以加入下面的星球
