spark jdbc(mysql) 讀取并發(fā)度優(yōu)化

很多人在spark中使用默認提供的jdbc方法時,在數(shù)據(jù)庫數(shù)據(jù)較大時經(jīng)常發(fā)現(xiàn)任務 hang 住,其實是單線程任務過重導致,這時候需要提高讀取的并發(fā)度。
下文以 mysql 為例進行說明。

在spark中使用jdbc

spark-env.sh 文件中加入:

export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.34.jar

任務提交時加入:

--jars /path/mysql-connector-java-5.1.34.jar

1. 單partition(無并發(fā))

調(diào)用函數(shù)

def jdbc(url: String, table: String, properties: Properties): DataFrame

使用:

val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"

// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")

// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,prop)

// 一些操作
....

查看并發(fā)度

jdbcDF.rdd.partitions.size # 結果返回 1

該操作的并發(fā)度為1,你所有的數(shù)據(jù)都會在一個partition中進行操作,意味著無論你給的資源有多少,只有一個task會執(zhí)行任務,執(zhí)行效率可想而之,并且在稍微大點的表中進行操作分分鐘就會OOM。

更直觀的說法是,達到千萬級別的表就不要使用該操作,count操作就要等一萬年,no zuo no die ,don't to try !

WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 56, spark047219):
 java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)

2. 根據(jù)Long類型字段分區(qū)

調(diào)用函數(shù)

  def jdbc(
  url: String,
  table: String,
  columnName: String,    # 根據(jù)該字段分區(qū),需要為整形,比如id等
  lowerBound: Long,      # 分區(qū)的下界
  upperBound: Long,      # 分區(qū)的上界
  numPartitions: Int,    # 分區(qū)的個數(shù)
  connectionProperties: Properties): DataFrame

使用:

val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"

val columnName = "colName"
val lowerBound = 1,
val upperBound = 10000000,
val numPartitions = 10,

// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")

// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,columnName,lowerBound,upperBound,numPartitions,prop)

// 一些操作
....

查看并發(fā)度

jdbcDF.rdd.partitions.size # 結果返回 10

該操作將字段 colName 中1-10000000條數(shù)據(jù)分到10個partition中,使用很方便,缺點也很明顯,只能使用整形數(shù)據(jù)字段作為分區(qū)關鍵字。

3000w數(shù)據(jù)的表 count 跨集群操作只要2s。

3. 根據(jù)任意類型字段分區(qū)

調(diào)用函數(shù)

jdbc(
  url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame

下面以使用最多的時間字段分區(qū)為例:

val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"

// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")

/**
* 將9月16-12月15三個月的數(shù)據(jù)取出,按時間分為6個partition
* 為了減少事例代碼,這里的時間都是寫死的
* modified_time 為時間字段
*/

   
val predicates =
    Array(
      "2015-09-16" -> "2015-09-30",
      "2015-10-01" -> "2015-10-15",
      "2015-10-16" -> "2015-10-31",
      "2015-11-01" -> "2015-11-14",
      "2015-11-15" -> "2015-11-30",
      "2015-12-01" -> "2015-12-15"
    ).map {
      case (start, end) =>
        s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
    }

// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop)

// 一些操作
....

查看并發(fā)度

jdbcDF.rdd.partitions.size # 結果返回 6

該操作的每個分區(qū)數(shù)據(jù)都由該段時間的分區(qū)組成,這種方式適合各種場景,較為推薦。

結語

mysql 3000W 數(shù)據(jù)量表為例,單分區(qū)count,僵死若干分鐘報OOM。

分成5-20個分區(qū)后,count 操作只需要 2s

高并發(fā)度可以大幅度提高讀取以及處理數(shù)據(jù)的速度,但是如果設置過高(大量的partition同時讀取)也可能會將數(shù)據(jù)源數(shù)據(jù)庫弄掛。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容