很多人在spark中使用默認提供的jdbc方法時,在數(shù)據(jù)庫數(shù)據(jù)較大時經(jīng)常發(fā)現(xiàn)任務 hang 住,其實是單線程任務過重導致,這時候需要提高讀取的并發(fā)度。
下文以mysql為例進行說明。
在spark中使用jdbc
在 spark-env.sh 文件中加入:
export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.34.jar
任務提交時加入:
--jars /path/mysql-connector-java-5.1.34.jar
1. 單partition(無并發(fā))
調(diào)用函數(shù)
def jdbc(url: String, table: String, properties: Properties): DataFrame
使用:
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,prop)
// 一些操作
....
查看并發(fā)度
jdbcDF.rdd.partitions.size # 結果返回 1
該操作的并發(fā)度為1,你所有的數(shù)據(jù)都會在一個partition中進行操作,意味著無論你給的資源有多少,只有一個task會執(zhí)行任務,執(zhí)行效率可想而之,并且在稍微大點的表中進行操作分分鐘就會OOM。
更直觀的說法是,達到千萬級別的表就不要使用該操作,count操作就要等一萬年,no zuo no die ,don't to try !
WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 56, spark047219):
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)
2. 根據(jù)Long類型字段分區(qū)
調(diào)用函數(shù)
def jdbc(
url: String,
table: String,
columnName: String, # 根據(jù)該字段分區(qū),需要為整形,比如id等
lowerBound: Long, # 分區(qū)的下界
upperBound: Long, # 分區(qū)的上界
numPartitions: Int, # 分區(qū)的個數(shù)
connectionProperties: Properties): DataFrame
使用:
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
val columnName = "colName"
val lowerBound = 1,
val upperBound = 10000000,
val numPartitions = 10,
// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,columnName,lowerBound,upperBound,numPartitions,prop)
// 一些操作
....
查看并發(fā)度
jdbcDF.rdd.partitions.size # 結果返回 10
該操作將字段 colName 中1-10000000條數(shù)據(jù)分到10個partition中,使用很方便,缺點也很明顯,只能使用整形數(shù)據(jù)字段作為分區(qū)關鍵字。
3000w數(shù)據(jù)的表 count 跨集群操作只要2s。
3. 根據(jù)任意類型字段分區(qū)
調(diào)用函數(shù)
jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
下面以使用最多的時間字段分區(qū)為例:
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
/**
* 將9月16-12月15三個月的數(shù)據(jù)取出,按時間分為6個partition
* 為了減少事例代碼,這里的時間都是寫死的
* modified_time 為時間字段
*/
val predicates =
Array(
"2015-09-16" -> "2015-09-30",
"2015-10-01" -> "2015-10-15",
"2015-10-16" -> "2015-10-31",
"2015-11-01" -> "2015-11-14",
"2015-11-15" -> "2015-11-30",
"2015-12-01" -> "2015-12-15"
).map {
case (start, end) =>
s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
}
// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop)
// 一些操作
....
查看并發(fā)度
jdbcDF.rdd.partitions.size # 結果返回 6
該操作的每個分區(qū)數(shù)據(jù)都由該段時間的分區(qū)組成,這種方式適合各種場景,較為推薦。
結語
以
mysql3000W 數(shù)據(jù)量表為例,單分區(qū)count,僵死若干分鐘報OOM。
分成5-20個分區(qū)后,
count操作只需要2s
高并發(fā)度可以大幅度提高讀取以及處理數(shù)據(jù)的速度,但是如果設置過高(大量的partition同時讀取)也可能會將數(shù)據(jù)源數(shù)據(jù)庫弄掛。