Structured Streaming自定義MySQLSink-威力增強版

之前使用過foreach單條處理的MySQLSink,可能導致連續開關連接,性能較差,故通過PreparedStatement的addBatch批量處理數據。

/**
 * A [[ForeachWriter]] that writes rows to MySQL through a batched JDBC `PreparedStatement`.
 *
 * One connection and one transaction are opened per `open` call. Rows passed to `process`
 * are queued with `addBatch`; the queue is sent to the server every `batchSize` rows and
 * once more in `close`. The transaction is committed in `close` only when no error was
 * reported; otherwise it is rolled back.
 *
 * NOTE(review): for `SqlType.Update` the generated statement has no `WHERE` clause, so each
 * processed row updates every row of the table. Confirm this is intended by the callers.
 *
 * @param urlName    key in `properties` whose value is the JDBC URL
 * @param properties JDBC connection properties; the `driver` key must hold the driver class name
 * @param tableName  target table (quoted with backticks in the generated SQL)
 * @param fieldNames target column names, in the same order as the columns of each processed row
 * @param sqlType    which statement to generate (REPLACE, UPDATE or INSERT ... ON DUPLICATE KEY UPDATE)
 * @param batchSize  number of queued rows after which the JDBC batch is sent to the server;
 *                   must be positive
 */
class MySQLBatchSink(urlName: String, properties: Properties,
                     tableName: String,
                     fieldNames: Array[String],
                     sqlType: SqlType.Value,
                     batchSize: Int = 1000) extends ForeachWriter[Row]() with Logging {
  require(batchSize > 0, s"batchSize must be positive, got $batchSize")

  // Fail fast where the writer is constructed if the driver class is missing.
  Class.forName(properties.getProperty("driver"))
  var conn: Connection = _
  var ps: PreparedStatement = _

  // Rows queued with addBatch() and not yet sent with executeBatch().
  private var pendingRows: Int = 0

  val values: String = fieldNames.map(_ => "?").mkString(",")
  val sqlStr: String = sqlType match {
    case SqlType.Replace => s"REPLACE INTO `$tableName` (`${fieldNames.mkString("`,`")}`) VALUES ($values)"
    case SqlType.Update => s"UPDATE `$tableName` SET `${fieldNames.mkString("`=?, `")}`=?"
    case SqlType.Upsert =>
      // The column list appears twice, so every row value is bound twice (see process).
      s"""
         |INSERT INTO `$tableName` (`${fieldNames.mkString("`,`")}`) VALUES ($values)
         |ON DUPLICATE KEY UPDATE `${fieldNames.mkString("`=?, `")}`=?
         |""".stripMargin
  }

  /**
   * Opens the connection, starts a transaction and prepares the statement.
   *
   * @return always `true` (every partition/epoch is processed)
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // The constructor body does not run again when this writer is deserialized on another
    // JVM, so load the driver here as well.
    Class.forName(properties.getProperty("driver"))
    conn = DriverManager.getConnection(properties.getProperty(urlName), properties)
    try {
      conn.setAutoCommit(false)
      ps = conn.prepareStatement(sqlStr)
      pendingRows = 0
      true
    } catch {
      case NonFatal(e) =>
        // Do not leak the connection when the statement cannot be prepared.
        closeResources()
        throw e
    }
  }

  /**
   * Binds one row to the prepared statement and queues it in the JDBC batch.
   *
   * @throws IllegalArgumentException if the row width differs from `fieldNames.length` or a
   *                                  column holds an unsupported type
   */
  override def process(value: Row): Unit = {
    val columnCount = value.size
    require(columnCount == fieldNames.length,
      s"Row has $columnCount columns but ${fieldNames.length} field names were configured")

    for (i <- 0 until columnCount) {
      val headIndex = i + 1
      bindParameter(value, i, headIndex)

      if (sqlType == SqlType.Upsert) {
        // Second copy of the value for the ON DUPLICATE KEY UPDATE part.
        bindParameter(value, i, headIndex + columnCount)
      }
    }
    ps.addBatch()
    pendingRows += 1
    // Bound the number of rows held in the JDBC batch; the transaction itself stays open.
    if (pendingRows >= batchSize) flushBatch()
  }

  /**
   * Finishes the transaction and releases the JDBC resources.
   *
   * When `errorOrNull` is `null` the remaining batch is executed and the transaction is
   * committed; a failure there is rolled back and rethrown so it is not silently lost.
   * When `errorOrNull` is non-null the transaction is rolled back.
   */
  override def close(errorOrNull: Throwable): Unit = {
    try {
      if (conn != null) {
        if (errorOrNull == null) {
          try {
            if (ps != null && pendingRows > 0) flushBatch()
            conn.commit()
          } catch {
            case NonFatal(e) =>
              logWarning("Exception committing transaction", e)
              rollbackQuietly()
              throw e
          }
        } else {
          rollbackQuietly()
        }
      }
    } finally {
      closeResources()
    }
  }

  /** Sends the queued rows to the server and resets the pending counter. */
  private def flushBatch(): Unit = {
    ps.executeBatch()
    pendingRows = 0
  }

  /** Rolls back the current transaction, logging (not propagating) any failure. */
  private def rollbackQuietly(): Unit = {
    try {
      if (conn != null) conn.rollback()
    } catch {
      case NonFatal(e) => logWarning("Exception rolling back transaction", e)
    }
  }

  /** Closes the statement and the connection, logging (not propagating) any failure. */
  private def closeResources(): Unit = {
    try {
      if (ps != null) ps.close()
    } catch {
      case e: Exception => logWarning("Exception closing prepareStatement", e)
    } finally {
      ps = null
    }

    try {
      if (conn != null) conn.close()
    } catch {
      case e: Exception => logWarning("Exception closing connection", e)
    } finally {
      conn = null
    }
    pendingRows = 0
  }

  /**
   * Binds column `i` of `value` to the 1-based statement parameter `index`, choosing the
   * JDBC setter from the runtime type of the value.
   */
  private def bindParameter(value: Row, i: Int, index: Int): Unit = {
    value.get(i) match {
      case v: Int => ps.setInt(index, v)
      case v: Long => ps.setLong(index, v)
      case v: String => ps.setString(index, v)
      case v: Timestamp => ps.setTimestamp(index, v)
      case v: java.sql.Date => ps.setDate(index, v)
      case v: Float => ps.setFloat(index, v)
      case v: Double => ps.setDouble(index, v)
      case v: java.math.BigDecimal => ps.setBigDecimal(index, v)
      case v: Boolean => ps.setBoolean(index, v)
      case v: Byte => ps.setByte(index, v)
      case v: Short => ps.setShort(index, v)
      case v: Array[Byte] => ps.setBytes(index, v)
      // The SQL type for NULL is derived from the row schema.
      case null => ps.setNull(index, SparkUtil.sparkTypeToSqlType(value.schema.fields(i).dataType))
      case _ => throw new IllegalArgumentException(s"No support for Spark SQL type ${value.schema.fields(i).dataType}")
    }
  }
}

這樣在process按條處理時只是addBatch,真正提交是在close時(即當前批次結束)才執行并關閉連接。

©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容