https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html
Create a Project
Use one of the following commands to create a project:
1. Use Maven
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.3.2
2. Use the quickstart script
$ curl https://flink.apache.org/q/quickstart.sh | bash
Inspect the Project
There will be a new directory in your working directory. If you’ve used the curl approach, the directory is called quickstart. Otherwise, it has the name of your artifactId:
$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               ├── SocketTextStreamWordCount.java
        │               ├── StreamingJob.java
        │               └── WordCount.java
        └── resources
            └── log4j.properties
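The sample project is a Maven project containing four classes. StreamingJob and BatchJob are basic skeleton programs, SocketTextStreamWordCount is a working streaming example, and WordCount is a working batch example. You can run the Flink examples directly in a local environment.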
We recommend importing this project into your IDE to develop and test it. If you use Eclipse, the m2e plugin lets you import Maven projects. Some Eclipse bundles include that plugin by default; others require you to install it manually. The IntelliJ IDE supports Maven projects out of the box.
A note to Mac OS X users: The default JVM heap size for Java is too small for Flink. You have to increase it manually. In Eclipse, choose Run Configurations -> Arguments and write into the VM Arguments box: -Xmx800m.
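To confirm that a heap setting such as -Xmx800m actually reached the JVM, a small check like the one below may help. It is only a sketch: the class name HeapCheck is made up for illustration and is not part of the quickstart project, and the value reported can be slightly below the configured limit depending on the JVM.

```java
public class HeapCheck {

    // Returns the maximum heap the JVM will attempt to use, in mebibytes.
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        // Run this with the same VM arguments as your Flink job.
        System.out.println("Max heap: " + maxHeapMiB() + " MiB");
    }
}
```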
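Build the Project
Run the command mvn clean install -Pbuild-jar to build the project. This produces target/original-your-artifact-id-your-version.jar, a thin jar that contains no dependencies. If you need a fat jar, use target/your-artifact-id-your-version.jar instead. (A fat jar also bundles all of the dependencies.)
Next Steps
Write Your Application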
The quickstart project contains a WordCount implementation, the “Hello World” of Big Data processing systems. The goal of WordCount is to determine the frequencies of words in a text, e.g., how often do the terms “the” or “house” occur in all Wikipedia texts.
For example:
Sample Input:
big data is big
Sample Output:
big 2
data 1
is 1
The following code shows the WordCount implementation. It processes each line with two operations (a FlatMap and a Reduce operation that aggregates with sum) and then prints the resulting words and their counts.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new LineSplitter())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);

        // execute and print result
        counts.print();
    }
}
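To make the effect of groupBy(0).sum(1) concrete, here is a minimal plain-Java sketch that does not use Flink at all. It treats each word as a (word, 1) pair and keeps one running total per distinct word, applied to the sample input "big data is big". The class and method names are hypothetical and are not part of the quickstart project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupSumSketch {

    // Mimics groupBy(0).sum(1) on (word, 1) pairs:
    // one running total per distinct word.
    static Map<String, Integer> groupAndSum(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for stable output
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("big", "data", "is", "big");
        System.out.println(groupAndSum(words)); // prints {big=2, data=1, is=1}
    }
}
```

Unlike this sketch, Flink evaluates the grouping and the sum in parallel across the data set, so the order of the printed results is not guaranteed there.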
The operations are defined by specialized classes, here the LineSplitter class.
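// Imports needed at the top of the file:
// import org.apache.flink.api.common.functions.FlatMapFunction;
// import org.apache.flink.api.java.tuple.Tuple2;
// import org.apache.flink.util.Collector;

// Declared static, so this class must be nested inside another class such as WordCount.
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line on runs of non-word characters
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs, skipping empty tokens
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}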
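The length check in LineSplitter matters because String.split can return an empty first token when a line begins with punctuation. The sketch below reproduces the same normalization in plain Java, without Flink, so the behavior can be tried in isolation. The class name TokenizeSketch and the method tokenize are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizeSketch {

    // Same normalization as LineSplitter: lowercase the line, split on runs
    // of non-word characters, and drop empty tokens.
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        // The leading apostrophe produces an empty first element from split().
        System.out.println("'tis nobler".split("\\W+").length); // prints 3
        System.out.println(tokenize("'Tis nobler"));            // prints [tis, nobler]
    }
}
```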