本文基于Spark 2.1.0版本
新手首先要明白幾個配置:
spark.default.parallelism:(默認的并發(fā)數(shù))
? ? 如果配置文件spark-default.conf中沒有顯示的配置,則按照如下規(guī)則取值:
? ? 本地模式(不會啟動executor,由SparkSubmit進程生成指定數(shù)量的線程數(shù)來并發(fā)):
? ? spark-shell? ? ? ? ? ? ? ? ? ? ? ? ? ? ? spark.default.parallelism = 1
? ? spark-shell --master local[N] spark.default.parallelism = N (使用N個核)
? ? spark-shell --master local? ? ? spark.default.parallelism = 1
? ? 偽集群模式(x為本機上啟動的executor數(shù),y為每個executor使用的core數(shù),
z為每個 executor使用的內(nèi)存)
? ? ?spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y
? ? ?mesos 細粒度模式
? ? ?Mesos fine grained mode? spark.default.parallelism = 8
? ? 其他模式(這里主要指yarn模式,當然standalone也是如此)
? ? Others: total number of cores on all executor nodes or 2, whichever is larger
? ? spark.default.parallelism =? max(所有executor使用的core總數(shù), 2)
經(jīng)過上面的規(guī)則,就能確定了spark.default.parallelism的默認值(前提是配置文件spark-default.conf中沒有顯示的配置,如果配置了,則spark.default.parallelism = 配置的值)
還有一個配置比較重要,spark.files.maxPartitionBytes = 128 M(默認)
The maximum number of bytes to pack into a single partition when reading files.
代表著rdd的一個分區(qū)能存放數(shù)據(jù)的最大字節(jié)數(shù),如果一個400m的文件,只分了兩個區(qū),則在action時會發(fā)生錯誤。
當一個spark應用程序執(zhí)行時,生成spark.context,同時會生成兩個參數(shù),由上面得到的spark.default.parallelism推導出這兩個參數(shù)的值
sc.defaultParallelism ? ? = spark.default.parallelism
sc.defaultMinPartitions = min(spark.default.parallelism,2)
當sc.defaultParallelism和sc.defaultMinPartitions最終確認后,就可以推算rdd的分區(qū)數(shù)了。
有兩種產(chǎn)生rdd的方式:
1,通過scala 集合方式parallelize生成rdd,
如, val rdd = sc.parallelize(1 to 10)
這種方式下,如果在parallelize操作時沒有指定分區(qū)數(shù),則
rdd的分區(qū)數(shù) = sc.defaultParallelism
2,通過textFile方式生成的rdd,
如, val rdd = sc.textFile(“path/file”)
有兩種情況:
a,從本地文件file:///生成的rdd,操作時如果沒有指定分區(qū)數(shù),則默認分區(qū)數(shù)規(guī)則為:
(按照官網(wǎng)的描述,本地file的分片規(guī)則,應該按照hdfs的block大小劃分,但實測的結果是固定按照32M來分片,可能是bug,不過不影響使用,因為spark能用所有hadoop接口支持的存儲系統(tǒng),所以spark textFile使用hadoop接口訪問本地文件時和訪問hdfs還是有區(qū)別的)
rdd的分區(qū)數(shù) = max(本地file的分片數(shù), sc.defaultMinPartitions)
b,從hdfs分布式文件系統(tǒng)hdfs://生成的rdd,操作時如果沒有指定分區(qū)數(shù),則默認分區(qū)數(shù)規(guī)則為:
rdd的分區(qū)數(shù) = max(hdfs文件的block數(shù)目, sc.defaultMinPartitions)
補充:
1,如果使用如下方式,從HBase的數(shù)據(jù)表轉(zhuǎn)換為RDD,則該RDD的分區(qū)數(shù)為該Table的region數(shù)。
String tableName ="pic_test2";
conf.set(TableInputFormat.INPUT_TABLE,tableName);
conf.set(TableInputFormat.SCAN,convertScanToString(scan));
JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf,
TableInputFormat.class,ImmutableBytesWritable.class,
Result.class);
Hbase Table:pic_test2的region為10,則hBaseRDD的分區(qū)數(shù)也為10。
2,如果使用如下方式,通過獲取json(或者parquet等等)文件轉(zhuǎn)換為DataFrame,則該DataFrame的分區(qū)數(shù)和該文件在文件系統(tǒng)中存放的Block數(shù)量對應。
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
people.json大小為300M,在HDFS中占用了2個blocks,則該DataFrame df分區(qū)數(shù)為2。
3,Spark Streaming獲取Kafka消息對應的分區(qū)數(shù),不在本文討論。
歡迎指正,轉(zhuǎn)載請標明作者和出處,謝謝。