1.需求:
讀取本地student.txt,創(chuàng)建DataFrame,并將結(jié)果寫入MySQL數(shù)據(jù)庫中
2.數(shù)據(jù)源:
student.txt
1 tom 15
2 lucy 20
3 mike 18
3.寫代碼:
(1)添加依賴:
pom.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- 注意:通過JDBC寫入MySQL還需要MySQL驅(qū)動依賴,否則運(yùn)行時(shí)報(bào) No suitable driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
(2)Demo3.scala
package day1209
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
/**
* 將結(jié)果寫入 Mysql
*/
/**
 * Reads student.txt from the local filesystem, builds a DataFrame with an
 * explicit schema, sorts the rows by age (descending) via Spark SQL, and
 * appends the result into the MySQL table `student`.
 */
object Demo3 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession — the entry point for the DataFrame/SQL APIs.
    val spark = SparkSession.builder().master("local").appName("Save to Mysql").getOrCreate()

    // Read the source file and split each line on tabs.
    // NOTE(review): the sample data looks space-separated — confirm the
    // actual delimiter used in student.txt.
    val personRDD = spark.sparkContext.textFile("/users/macbook/TestInfo/student.txt").map(_.split("\t"))

    // Explicit schema matching the three columns of student.txt.
    val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("sname", StringType),
        StructField("age", IntegerType)))

    // Convert to RDD[Row]. Skip malformed/blank lines (fewer than 3 fields)
    // so one bad line cannot abort the whole job with an
    // ArrayIndexOutOfBoundsException, and trim fields before numeric parsing.
    val rowRDD = personRDD
      .filter(_.length >= 3)
      .map(p => Row(p(0).trim.toInt, p(1).trim, p(2).trim.toInt))

    // Pair the rows with the schema to build the DataFrame.
    val personDataFrame = spark.createDataFrame(rowRDD, schema)

    // Register a temporary view so the data can be queried with SQL.
    personDataFrame.createOrReplaceTempView("t_person")

    // Sort by age, descending.
    val result = spark.sql("select * from t_person order by age desc")

    // Uncomment to inspect the result locally:
    // result.show()

    // Append the result into MySQL. Requires the MySQL JDBC driver on the
    // classpath; credentials are hard-coded here for the tutorial only.
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "000000")
    result.write.mode("append").jdbc("jdbc:mysql://192.168.1.121:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)

    spark.stop()
  }
}
4.結(jié)果:

執(zhí)行前查詢student表

執(zhí)行后查詢student表