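Flink Examples: Batch Examples
Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/examples.html
The following example programs showcase different applications of Flink, from simple word counting to graph algorithms. The code samples illustrate the use of Flink's DataSet API.
Running a demo
The simplest demo is to run the bundled WordCount example directly. Start a local cluster first, then submit the job: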
./bin/start-cluster.sh
## Run with the built-in default input data
./bin/flink run ./examples/batch/WordCount.jar
## Optionally pass input and output paths as arguments
./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
WordCount
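import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Parse command-line flags such as --input and --output
    val params = ParameterTool.fromArgs(args)
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Make the parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    val text =
      if (params.has("input")) {
        env.readTextFile(params.get("input"))
      } else {
        println("Executing WordCount example with default input data set.")
        println("Use --input to specify file input.")
        // Fallback path on the author's machine; change it to a file that exists locally
        env.readTextFile("/Users/lorenyplv/software/flink-1.6.0/README.txt")
      }

    // Split lines into lowercase words, pair each word with 1, then sum per word
    val counts = text
      .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    if (params.has("output")) {
      // One record per line, with word and count separated by a space
      counts.writeAsCsv(params.get("output"), "\n", " ")
      // File sinks are lazy, so the job has to be started explicitly
      env.execute("Scala WordCount Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      // print() triggers execution itself, so env.execute() is not called here
      counts.print()
    }
  }
}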
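To see what the flatMap / map / groupBy / sum pipeline computes without starting a cluster, the same logic can be sketched on plain Scala collections. This is only an illustration and not part of the Flink example: the names WordCountSketch, tokenize and countWords are made up here, and an in-memory Seq stands in for the DataSet.

```scala
object WordCountSketch {
  // Lowercase the line, split on runs of non-word characters, and drop
  // empty tokens. This corresponds to the flatMap step of the Flink job.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  // Pair every word with 1, group the pairs by word, and sum the ones.
  // This corresponds to map { (_, 1) }.groupBy(0).sum(1) in the Flink job.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(tokenize)
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample input, used only for this sketch
    val lines = Seq("To be, or not to be", "that is the question")
    // Print "word count" pairs in alphabetical order
    countWords(lines).toSeq.sortBy(_._1).foreach {
      case (word, n) => println(s"$word $n")
    }
  }
}
```

The main difference from the Flink version is that these collection operations run eagerly in a single JVM, whereas the DataSet transformations only build a plan that runs when print() or env.execute() is called.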