spark讀取oracle數(shù)據(jù)調(diào)優(yōu)

使用spark自帶的上下界限來分區(qū)的不均勻性導(dǎo)致傳輸慢(木桶效應(yīng)):


scala> a.split("\\n").map(x=>x.toInt)

res25: Array[Int] = Array(123447, 154643, 30561, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 216305, 114099, 254177, 5186719, 46387, 116380, 197942, 224119, 254281, 254261, 131158, 145298, 0, 174433, 187171, 58068, 77121, 45497, 144967)

scala> a.split("\\n").map(x=>x.toInt).sum

res26: Int = 8137034

scala> a.split("\\n").map(x=>x.toInt).max

res27: Int = 5186719

scala> 8137034/32

res58: Int = 254282

oracle結(jié)合分頁查詢防數(shù)據(jù)傳輸傾斜:


def query(index:Int,interval:Int):String={val basic =  "( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b "; val condition =  " where  b.rn between " + ((index-1)*interval +1) + " AND " + (index)*interval;"( select * from " + basic+condition+" ) c"

}

import org.apache.spark.sql.DataFrame

def unionTableReducer:(DataFrame,DataFrame)=>DataFrame=(x:DataFrame,y:DataFrame)=>x.union(y)

下面這種寫法基本是串行的沒有充分利用集群的處理能力,但是感覺要是配上jdbc連接池以及分頁,威力應(yīng)該不錯(cuò):

val jdbcDF =(1 to 33).map(index => {val hehe = spark.read.format("jdbc").options( Map("url" -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps", "dbtable" -> query(index,254282) ,"driver" -> "oracle.jdbc.driver.OracleDriver")).load();hehe.write.parquet("C_CONS_hahaha/"+index)})

目前而言調(diào)用一個(gè)map-reduce是最快的 32個(gè)partition只要 1.9 min 800w數(shù)據(jù)

val jdbcDF =(1 to 33).map(index => {spark.read.format("jdbc").options( Map("url" -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps", "dbtable" -> query(index,254282) ,"driver" -> "oracle.jdbc.driver.OracleDriver")).load()}).reduce(unionTableReducer).write.parquet("C_CONS_hahaha")

spark-standalone基本配置:

--num-executors 3  --executor-cores 4  --executor-memory 5G --driver-cores 3  --driver-memory  4G  --conf spark.default.parallelism=32

實(shí)驗(yàn)結(jié)果:

C_CONS 615M數(shù)據(jù) 約1.9min 32片 不指定fetchsize

C_CONS 615M數(shù)據(jù) 約1.1min 16片 fetchsize:100

C_CONS 615M數(shù)據(jù) 約1.1min 16片 fetchsize:150

C_CONS 615M數(shù)據(jù) 約1min 16片 fetchsize:200

C_CONS 615M數(shù)據(jù) 約1.1min 16片 fetchsize:400

C_CONS 615M數(shù)據(jù) 約53s 8片 fetchsize:400

C_CONS 615M數(shù)據(jù) 約56s 8片 fetchsize:200

C_CONS 615M數(shù)據(jù) 約48s 9片 fetchsize:600

C_CONS 615M數(shù)據(jù) 約48s 9片 fetchsize:200

C_CONS 615M數(shù)據(jù) 約1.3min 3片 fetchsize:400 (num_worker_machine 3)

C_CONS 615M數(shù)據(jù) 約43s 12片 fetchsize:100 (num_total_cores 12=3*4)

C_CONS 615M數(shù)據(jù) 約41s 12片 fetchsize:200 (num_total_cores 12=3*4)

C_CONS 615M數(shù)據(jù) 約41s 12片 fetchsize:800 (num_total_cores 12=3*4)

C_CONS 615M數(shù)據(jù) 約41s 12片 fetchsize:1600 (num_total_cores 12=3*4)

基本推斷:

對(duì)一個(gè)表的傳輸分片數(shù)接近c(diǎn)ores個(gè)數(shù)為宜

也就是說1T的數(shù)據(jù)一天就可以拿出來傳輸?shù)絟dfs上!(1000*1025/(615/41)/3600約19個(gè)小時(shí))

加機(jī)器(cores)然后再試一試!

Append Test_1:

配置修改如下之后:

--num-executors 6  --executor-cores 2  --executor-memory 3G --driver-cores 3  --driver-memory  4G

做如下運(yùn)行:

val jdbcDF =(1 to 13).map(index => {spark.read.format("jdbc").options( Map("url" -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps", "dbtable" -> query(index,678086) ,"driver" -> "oracle.jdbc.driver.OracleDriver")).load()}).reduce(unionTableReducer).write.parquet("C_CONS_hahaha_13")

結(jié)果:

C_CONS 615M數(shù)據(jù) 約48s 9片 fetchsize:200
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容