Parallel Execution
https://ci.apache.org/projects/flink/flink-docs-master/dev/parallel.html
Configuring Parallelism
Operator Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
Execution Environment Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
Client Level
Set the parallelism when submitting the program:
./bin/flink run -p 10 ../examples/WordCount-java.jar
System Level
Set the parallelism.default property in ./conf/flink-conf.yaml.
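The four levels can be read as a fallback chain: the most specific setting that is present wins. The following is a minimal plain-Java sketch of that rule, assuming the usual precedence (operator, then execution environment, then client, then system default). ParallelismResolver and resolve are hypothetical names used only for illustration; they are not part of the Flink API.

```java
import java.util.OptionalInt;

/** Illustrative model only; this class is NOT part of the Flink API. */
final class ParallelismResolver {
    private ParallelismResolver() {}

    /**
     * Returns the first value that is set, checking from the most specific
     * level (operator) down to the least specific (system default).
     */
    static int resolve(OptionalInt operator,
                       OptionalInt environment,
                       OptionalInt client,
                       int systemDefault) {
        if (operator.isPresent()) {
            return operator.getAsInt();
        }
        if (environment.isPresent()) {
            return environment.getAsInt();
        }
        if (client.isPresent()) {
            return client.getAsInt();
        }
        return systemDefault;
    }

    public static void main(String[] args) {
        // Operator level set to 5, environment to 3, client (-p) to 10: prints 5
        System.out.println(resolve(OptionalInt.of(5), OptionalInt.of(3), OptionalInt.of(10), 1));
        // No operator-level value: the environment value applies, prints 3
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.of(3), OptionalInt.of(10), 1));
        // Only the client value (-p 10) is set: prints 10
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.empty(), OptionalInt.of(10), 1));
        // Nothing set: falls back to the system default, prints 1
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.empty(), OptionalInt.empty(), 1));
    }
}
```

The values 5, 3 and 10 mirror the examples above; the system default of 1 is an assumed value for parallelism.default.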
Pitfalls
Using the parallelism provided by the remote cluster (16). To use another parallelism, set it at the ./bin/flink client.
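This usually means that the parallelism the cluster can provide is lower than the parallelism the user configured.
The -yn 4 -ys 4 options passed on the command line (-yn: number of YARN containers, -ys: slots per container) determine the program's parallelism.
Maximum parallelism = number of containers * slots per container
The parallelism set in the program must not exceed this maximum parallelism.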
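The capacity rule can be checked with simple arithmetic before submitting a job. The following plain-Java sketch multiplies the container count by the slots per container and compares the result with the requested parallelism. SlotCapacity, maxParallelism and fits are hypothetical helper names for illustration; nothing here queries a real cluster.

```java
/** Illustrative helper only; it does not talk to Flink or YARN. */
final class SlotCapacity {
    private SlotCapacity() {}

    /** Maximum parallelism = number of containers * slots per container. */
    static int maxParallelism(int containers, int slotsPerContainer) {
        if (containers <= 0 || slotsPerContainer <= 0) {
            throw new IllegalArgumentException("containers and slots must be positive");
        }
        return Math.multiplyExact(containers, slotsPerContainer);
    }

    /** True when the requested parallelism does not exceed the available slots. */
    static boolean fits(int requested, int containers, int slotsPerContainer) {
        return requested > 0 && requested <= maxParallelism(containers, slotsPerContainer);
    }

    public static void main(String[] args) {
        // -yn 4 -ys 4 gives 4 * 4 slots: prints 16
        System.out.println(maxParallelism(4, 4));
        // A parallelism of 16 fits exactly: prints true
        System.out.println(fits(16, 4, 4));
        // A parallelism of 20 exceeds the 16 available slots: prints false
        System.out.println(fits(20, 4, 4));
    }
}
```

With -yn 4 -ys 4 the result is 16, which matches the value reported in the log message above.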