Partitioning with Spark's built-in lower/upper bounds yields uneven partitions, and the transfer is only as fast as the largest one (the bucket effect: the slowest partition gates the whole job):
scala> a.split("\\n").map(x=>x.toInt)
res25: Array[Int] = Array(123447, 154643, 30561, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 216305, 114099, 254177, 5186719, 46387, 116380, 197942, 224119, 254281, 254261, 131158, 145298, 0, 174433, 187171, 58068, 77121, 45497, 144967)
scala> a.split("\\n").map(x=>x.toInt).sum
res26: Int = 8137034
scala> a.split("\\n").map(x=>x.toInt).max
res27: Int = 5186719
scala> 8137034/32
res58: Int = 254282
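The REPL output above can be condensed into a single skew ratio. This standalone snippet (repeating the per-partition row counts printed above) shows the largest partition holds roughly 20x the average:

```scala
// Per-partition row counts copied from the REPL output above.
val counts = Array(123447, 154643, 30561, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  216305, 114099, 254177, 5186719, 46387, 116380, 197942, 224119, 254281,
  254261, 131158, 145298, 0, 174433, 187171, 58068, 77121, 45497, 144967)
val avg = counts.sum / counts.length        // 8137034 / 32 = 254282
val skewRatio = counts.max.toDouble / avg   // largest partition vs. the average
println(f"avg=$avg skew=$skewRatio%.1f")    // skew is about 20.4: one straggler dominates
```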
Pairing the read with Oracle rownum pagination prevents skew in the data transfer:
def query(index: Int, interval: Int): String = {
  val basic = "( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b "
  val condition = " where b.rn between " + ((index - 1) * interval + 1) + " AND " + index * interval
  "( select * from " + basic + condition + " ) c"
}
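A quick standalone sanity check of the SQL this helper emits (the definition is repeated here so the snippet runs on its own):

```scala
// Same pagination-SQL builder as above, repeated so this snippet is self-contained.
def query(index: Int, interval: Int): String = {
  val basic = "( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b "
  val condition = " where b.rn between " + ((index - 1) * interval + 1) + " AND " + index * interval
  "( select * from " + basic + condition + " ) c"
}
// Slice 2 of interval 254282 covers rows 254283..508564:
println(query(2, 254282))
```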
import org.apache.spark.sql.DataFrame
def unionTableReducer: (DataFrame, DataFrame) => DataFrame = (x, y) => x.union(y)
The version below is essentially serial and does not exploit the cluster's parallelism, but combined with a JDBC connection pool and pagination it should still perform well:
val jdbcDF = (1 to 33).map { index =>
  val hehe = spark.read.format("jdbc").options(Map(
    "url" -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
    "dbtable" -> query(index, 254282),
    "driver" -> "oracle.jdbc.driver.OracleDriver")).load()
  hehe.write.parquet("C_CONS_hahaha/" + index)
}
So far the single map-then-reduce call below is the fastest: 32 partitions take only about 1.9 min for ~8M rows.
val jdbcDF = (1 to 33).map { index =>
  spark.read.format("jdbc").options(Map(
    "url" -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
    "dbtable" -> query(index, 254282),
    "driver" -> "oracle.jdbc.driver.OracleDriver")).load()
}.reduce(unionTableReducer).write.parquet("C_CONS_hahaha")
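An alternative worth noting (a sketch, not measured here): Spark's built-in `spark.read.jdbc(url, table, predicates, props)` overload takes one WHERE clause per partition and returns a single multi-partition DataFrame, with no manual union. The connection details are the same assumed ones as above; only the predicate construction below is runnable as-is:

```scala
// One pagination predicate per slice; pure string work, runnable anywhere.
val interval = 254282
val predicates = (1 to 33).map { index =>
  s"rn between ${(index - 1) * interval + 1} and ${index * interval}"
}.toArray

// With a live SparkSession `spark` (assumed), the predicates drive the
// built-in partitioned reader against the same rownum subquery:
//   import java.util.Properties
//   val props = new Properties()
//   props.setProperty("driver", "oracle.jdbc.driver.OracleDriver")
//   props.setProperty("fetchsize", "200")
//   val df = spark.read.jdbc(
//     "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
//     "( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b",
//     predicates, props)
```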
Basic spark-standalone configuration:
--num-executors 3 --executor-cores 4 --executor-memory 5G --driver-cores 3 --driver-memory 4G --conf spark.default.parallelism=32
Experimental results:
C_CONS  615 MB  ~1.9 min  32 slices  fetchsize unspecified
C_CONS  615 MB  ~1.1 min  16 slices  fetchsize: 100
C_CONS  615 MB  ~1.1 min  16 slices  fetchsize: 150
C_CONS  615 MB  ~1 min    16 slices  fetchsize: 200
C_CONS  615 MB  ~1.1 min  16 slices  fetchsize: 400
C_CONS  615 MB  ~53 s     8 slices   fetchsize: 400
C_CONS  615 MB  ~56 s     8 slices   fetchsize: 200
C_CONS  615 MB  ~48 s     9 slices   fetchsize: 600
C_CONS  615 MB  ~48 s     9 slices   fetchsize: 200
C_CONS  615 MB  ~1.3 min  3 slices   fetchsize: 400  (num_worker_machine 3)
C_CONS  615 MB  ~43 s     12 slices  fetchsize: 100  (num_total_cores 12 = 3*4)
C_CONS  615 MB  ~41 s     12 slices  fetchsize: 200  (num_total_cores 12 = 3*4)
C_CONS  615 MB  ~41 s     12 slices  fetchsize: 800  (num_total_cores 12 = 3*4)
C_CONS  615 MB  ~41 s     12 slices  fetchsize: 1600 (num_total_cores 12 = 3*4)
Preliminary conclusions:
For transferring a single table, keep the number of slices close to the number of cores.
That means 1 TB of data could be pulled out and transferred to HDFS within a day! (1000 * 1024 / (615/41) / 3600 ≈ 19 hours)
Add more machines (cores) and test again!
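The 19-hour estimate above, spelled out (assuming the measured 615 MB / 41 s rate is sustained):

```scala
// Measured: 615 MB transferred in 41 s with 12 slices.
val mbPerSec = 615.0 / 41.0                            // = 15 MB/s sustained
val hoursFor1TB = 1000.0 * 1024.0 / mbPerSec / 3600.0  // 1 TB at that rate
println(f"$hoursFor1TB%.1f hours")                     // roughly 19 hours
```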
Append Test_1:
After changing the configuration to:
--num-executors 6 --executor-cores 2 --executor-memory 3G --driver-cores 3 --driver-memory 4G
Run the following:
val jdbcDF = (1 to 13).map { index =>
  spark.read.format("jdbc").options(Map(
    "url" -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
    "dbtable" -> query(index, 678086),
    "driver" -> "oracle.jdbc.driver.OracleDriver")).load()
}.reduce(unionTableReducer).write.parquet("C_CONS_hahaha_13")
Result:
C_CONS  615 MB  ~48 s  9 slices  fetchsize: 200