Spark SQL
Creating DataFrames and basic operations
A DataFrame can be thought of as a table in a relational database; the difference from an RDD is that a DataFrame carries schema information.
public class DataFrameCreate {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("DataFrame");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame dataFrame = sqlContext.read()
                .json(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath());
        // Print the DataFrame
        dataFrame.show();
        // Print the schema of the DataFrame
        dataFrame.printSchema();
        // Select a single column
        dataFrame.select(dataFrame.col("name")).show();
        // Select several columns and compute on one of them
        dataFrame.select(dataFrame.col("name"), dataFrame.col("age").plus(1)).show();
        // Filter rows by the value of a column
        dataFrame.filter(dataFrame.col("age").gt(20)).show();
        // Group by a column, then aggregate
        dataFrame.groupBy(dataFrame.col("name")).count().show();
    }
}
object DataFrameCreate extends App {
  val conf = new SparkConf().setMaster("local").setAppName("DataFrame")
  val sc = SparkContext.getOrCreate(conf)
  // Create the SQLContext
  val sqlContext = SQLContext.getOrCreate(sc)
  // Create the DataFrame
  val dataFrame = sqlContext.read.json(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
  dataFrame.show()
  dataFrame.printSchema()
  dataFrame.select(dataFrame.col("name")).show()
  dataFrame.select(dataFrame.col("name"), dataFrame.col("age").plus(1)).show()
  dataFrame.filter(dataFrame.col("age").gt(20)).show()
  dataFrame.groupBy(dataFrame.col("name")).count().show()
}
Converting an RDD to a DataFrame
Converting an RDD to a DataFrame using reflection
/**
 * Java implementation:
 * convert an RDD to a DataFrame using reflection
 */
public class RDD2DataFrameReflection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        // Read the file into an RDD, then map each line to a Person
        JavaRDD<Person> initRDD = sc.textFile(Thread.currentThread().getContextClassLoader().getResource("person.txt").getPath())
                .map(line -> line.split(" "))
                .map(array -> new Person(array[0], Integer.parseInt(array[1]), array[2]));
        // Create a DataFrame via reflection and register it as a temporary table (person)
        sqlContext.createDataFrame(initRDD, Person.class).registerTempTable("person");
        // Run a SQL query for the students older than 18
        DataFrame personDataFrame = sqlContext.sql("select name,age,sex from person where age > 18");
        // Print the result
        personDataFrame.show();
        // Convert the DataFrame back to an RDD; its element type is Row, so map each Row back to a Person and print the result.
        personDataFrame.javaRDD()
                .map(row -> new Person(row.getString(0), row.getInt(1), row.getString(2)))
                .collect()
                .forEach(System.out::println);
    }
    /**
     * When converting an RDD to a DataFrame via reflection, the JavaBean class must be public and implement Serializable
     */
    public static class Person implements Serializable {
        private String name;
        private Integer age;
        private String sex;

        public Person() {
        }

        public Person(String name, Integer age, String sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public String getSex() {
            return sex;
        }

        public void setSex(String sex) {
            this.sex = sex;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    ", sex='" + sex + '\'' +
                    '}';
        }
    }
}
// Scala implementation: converting between an RDD and a DataFrame using reflection
object RDD2DataFrameReflection {

  // Declare a case class to serve as the bean
  case class Person(name: String, age: Int, sex: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)
    // In Scala, converting an RDD to a DataFrame requires importing the implicit conversions
    import sqlContext.implicits._
    // Read the text file into an RDD, call toDF to turn it into a DataFrame, then register it as a temporary table
    sc.textFile(Thread.currentThread().getContextClassLoader.getResource("person.txt").getPath)
      .map(_.split(" "))
      .map(array => Person(array(0), array(1).trim.toInt, array(2)))
      .toDF()
      .registerTempTable("person")
    // Calling rdd on the DataFrame yields an RDD of Row; map each Row back to a Person and print the result
    sqlContext.sql("select name,age,sex from person where age > 18")
      .rdd
      .map(row => Person(row.getString(0), row.getInt(1), row.getString(2)))
      .collect
      .foreach(println)
  }
}
Programmatically specifying the schema
// Java implementation: converting between an RDD and a DataFrame with a programmatically specified schema
public class RDD2DataFrameProgrammatically {
    public static void main(String[] args) {
        // Create the SparkContext and SQLContext
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        // Read the file into an RDD and map each line to a Row
        JavaRDD<Row> rdd = sc.textFile(Thread.currentThread().getContextClassLoader().getResource("person.txt").getPath())
                .map(line -> line.split(" "))
                .map(array -> RowFactory.create(array[0], Integer.parseInt(array[1]), array[2]));
        // Build the schema (metadata)
        List<StructField> structFields = Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true),
                DataTypes.createStructField("sex", DataTypes.StringType, true)
        );
        StructType structType = DataTypes.createStructType(structFields);
        // Convert the RDD to a DataFrame and register it as a temporary table
        sqlContext.createDataFrame(rdd, structType).registerTempTable("person");
        sqlContext.sql("select name,age,sex from person where age > 18")
                .javaRDD()
                .map(row -> new RDD2DataFrameReflection.Person(row.getString(0), row.getInt(1), row.getString(2)))
                .collect()
                .forEach(System.out::println);
    }
}
// Scala implementation: converting between an RDD and a DataFrame with a programmatically specified schema
object RDD2DataFrameProgrammatically {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)
    // Read the text file into an RDD of Row
    val rdd = sc.textFile(Thread.currentThread().getContextClassLoader.getResource("person.txt").getPath)
      .map(_.split(" "))
      .map(array => Row(array(0), array(1).toInt, array(2)))
    // Build the schema (metadata)
    val structType = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("sex", StringType, true)
    ))
    // Convert the RDD to a DataFrame and register it as a temporary table
    sqlContext.createDataFrame(rdd, structType).registerTempTable("person")
    // Query the users older than 18, convert the result back to an RDD and print it
    // (Person is the case class defined in RDD2DataFrameReflection above)
    sqlContext.sql("select name,age,sex from person where age > 18")
      .rdd
      .map(row => RDD2DataFrameReflection.Person(row.getString(0), row.getInt(1), row.getString(2)))
      .collect()
      .foreach(println)
  }
}
The load and save methods
// Java example of the load and save methods
public static void loadAndSave() {
    SparkConf conf = new SparkConf().setAppName("LoadAndSave").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    // When no format is specified, load and save default to parquet
    // (the output path must not exist yet, so it cannot be looked up with getResource)
    sqlContext.read()
            .load(Thread.currentThread().getContextClassLoader().getResource("student.parquet").getPath())
            .write()
            .save("student_copy.parquet");
    // Explicitly specify the file format for load and save
    sqlContext.read()
            .format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .show();
    sqlContext.read()
            .format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .write()
            .format("json")
            .save("file:///Users/garen/Documents/student_copy");
    sqlContext.read()
            .format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .write()
            .format("json")
            .mode(SaveMode.ErrorIfExists)
            .save("file:///Users/garen/Documents/student_copy");
}
def loadAndSave(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("LoadAndSave")
  val sc = SparkContext.getOrCreate(conf)
  val sqlContext = SQLContext.getOrCreate(sc)
  sqlContext.read
    .format("json")
    .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
    .show()
  val dataFrame = sqlContext.read
    .format("json")
    .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
  dataFrame.filter(dataFrame.col("age").gt(18))
    .write
    .format("json")
    .mode(SaveMode.ErrorIfExists)
    .save("file:///D:/demo")
}
Working with the Parquet data source
Loading a Parquet data source programmatically
// Java: reading a parquet file
public static void loadParquet() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("parquet");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    sqlContext.read()
            .parquet(Thread.currentThread().getContextClassLoader().getResource("tarball/users.parquet").getPath())
            .registerTempTable("users");
    sqlContext.sql("select name from users")
            .javaRDD()
            .map(row -> row.getString(0))
            .collect()
            .forEach(System.out::println);
}
// Scala: reading a parquet file
def parquet(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("parquet")
  val sc = SparkContext.getOrCreate(conf)
  val sqlContext = SQLContext.getOrCreate(sc)
  sqlContext.read
    .parquet(Thread.currentThread.getContextClassLoader.getResource("tarball/users.parquet").getPath)
    .registerTempTable("users")
  sqlContext.sql("select * from users")
    .rdd
    .map(_.getString(0))
    .collect
    .foreach(println)
}
Automatic partition discovery
// Java example
public static void parquetPartition() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("ParquetPartition");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    // Printing the schema shows two extra partition columns ("gender" and "country").
    // Partition column types: numeric and string types are inferred automatically; this can be toggled with
    // "spark.sql.sources.partitionColumnTypeInference.enabled" (on by default). When disabled, all partition columns are typed as string.
    sqlContext.read()
            .parquet("hdfs://spark1:9000/spark-study/users/gender=male/country=US/users.parquet")
            .printSchema();
}
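The inference flag mentioned in the comment above can also be set per SQLContext. Below is a minimal Scala sketch (the object name is made up; the HDFS path is the same one assumed in the Java example) of turning the inference off: gender and country are still discovered as partition columns, but both then come back typed as string.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ParquetPartitionNoInference {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ParquetPartitionNoInference")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)
    // Turn off automatic type inference for partition columns (it is on by default)
    sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
    // The printed schema now lists gender and country as string partition columns
    sqlContext.read
      .parquet("hdfs://spark1:9000/spark-study/users/gender=male/country=US/users.parquet")
      .printSchema()
  }
}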
Automatic schema merging
def mergeSchema(): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MergeSchema")
  // Option 1: set spark.sql.parquet.mergeSchema to true on the SparkConf
  // conf.set("spark.sql.parquet.mergeSchema", "true")
  val sc: SparkContext = SparkContext.getOrCreate(conf)
  val sqlContext: SQLContext = SQLContext.getOrCreate(sc)
  val nameAndAgeArray: Array[(String, Int)] = Array(("Tom", 23), ("Garen", 21), ("Peter", 25))
  val nameAndScoreArray: Array[(String, Int)] = Array(("Tome", 100), ("Garen", 98), ("Peter", 95))
  import sqlContext.implicits._
  // Write two parquet datasets with different schemas into the same directory
  sc.parallelize(nameAndAgeArray)
    .toDF("name", "age")
    .write
    .format("parquet")
    .mode(SaveMode.Append)
    .save("data/persons")
  sc.parallelize(nameAndScoreArray)
    .toDF("name", "score")
    .write
    .format("parquet")
    .mode(SaveMode.Append)
    .save("data/persons")
  // Option 2: set the mergeSchema option to true when reading
  sqlContext.read.option("mergeSchema", "true")
    .parquet("data/persons")
    .printSchema()
  sqlContext.read.option("mergeSchema", "true")
    .parquet("data/persons")
    .show()
}
/**
* root
* |-- name: string (nullable = true)
* |-- score: integer (nullable = true)
* |-- age: integer (nullable = true)
*
* +-----+-----+----+
* | name|score| age|
* +-----+-----+----+
* | Tom| null| 23|
* |Garen| null| 21|
* |Peter| null| 25|
* | Tome| 100|null|
* |Garen| 98|null|
* |Peter| 95|null|
* +-----+-----+----+
*/
Working with the JSON data source
Requirement: query the information and scores of students whose score is greater than 80.
public static void jsonOption() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("JsonOption");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    // Option 1: create the student info DataFrame from a json file and register it as a temporary table
    sqlContext.read().format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .registerTempTable("student_info");
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
            "{\"id\":\"1001\",\"score\":100}",
            "{\"id\":\"1002\",\"score\":79}",
            "{\"id\":\"1003\",\"score\":98}"
    ));
    // Option 2: create the student score DataFrame from an RDD of json strings and register it as a temporary table
    sqlContext.read().json(rdd).registerTempTable("student_score");
    // Query the students whose score is greater than 80 and save the result as json
    sqlContext.sql("select a.id,a.name,a.age,b.score from student_info a left join student_score b on a.id = b.id where b.score > 80")
            .write()
            .format("json")
            .mode(SaveMode.Overwrite)
            .save("data/student_score");
}
def jsonOption(): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local").setAppName("JsonOption")
  val sc: SparkContext = SparkContext.getOrCreate(conf)
  val sqlContext = SQLContext.getOrCreate(sc)
  // Read json data from a file and register it as a temporary table
  sqlContext.read
    .format("json")
    .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
    .registerTempTable("student_info")
  // Read json data from an RDD and register it as a temporary table
  val jsonArray: Array[String] = Array(
    "{\"id\":\"1001\",\"score\":100}",
    "{\"id\":\"1002\",\"score\":90}",
    "{\"id\":\"1003\",\"score\":80}"
  )
  val rdd = sc.parallelize(jsonArray)
  sqlContext.read.json(rdd).registerTempTable("student_score")
  sqlContext.sql("select a.id,a.name,a.age,b.score from student_info a left join student_score b on a.id = b.id where b.score > 80")
    .write
    .format("json")
    .mode(SaveMode.Overwrite)
    .save("data/students_score")
}
Working with the Hive data source
Use the Hive data source to find the students whose score is greater than 80. Before working with Hive, copy hive-site.xml and the MySQL driver jar into the Spark installation directory.
public static void hiveOption() {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("HiveOption");
    JavaSparkContext sc = new JavaSparkContext(conf);
    HiveContext hiveContext = new HiveContext(sc);
    // Drop the student_info table if it already exists
    hiveContext.sql("DROP TABLE IF EXISTS student_info").show();
    // Create the student_info table
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student_info (name STRING,age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '").show();
    // Load data into the student_info table
    hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_info.txt' INTO TABLE student_info").show();
    // Drop the student_score table if it already exists
    hiveContext.sql("DROP TABLE IF EXISTS student_score").show();
    // Create the student_score table
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student_score (name STRING,score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '").show();
    // Load data into the student_score table
    hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_score.txt' INTO TABLE student_score").show();
    // Query the students whose score is greater than 80 and save the result as a new Hive table
    hiveContext.sql("select a.name,a.age,b.score from student_info a left join student_score b on a.name = b.name where b.score > 80")
            .saveAsTable("student_info_score");
    // Create a DataFrame directly from the Hive table and print it
    Row[] rows = hiveContext.table("student_info_score").collect();
    Arrays.asList(rows).forEach(System.out::println);
}
Working with the JDBC data source
public static void jdbc() {
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("JDBCOption");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = SQLContext.getOrCreate(sc.sc());
    // Read the student_info table and convert it to a pair RDD
    Map<String, String> options = new HashMap<>();
    options.put("url", "jdbc:mysql://hadoop-cdh:3306/testdb");
    options.put("user", "root");
    options.put("password", "Slb930802.");
    options.put("dbtable", "student_info");
    DataFrame student_info_df = sqlContext.read().format("jdbc").options(options).load();
    JavaPairRDD<String, Integer> student_info_rdd = student_info_df.javaRDD()
            .mapToPair(row -> new Tuple2<String, Integer>(row.getString(0), row.getInt(1)));
    // Read the student_score table and convert it to a pair RDD
    options.put("dbtable", "student_score");
    DataFrame student_score_df = sqlContext.read().format("jdbc").options(options).load();
    JavaPairRDD<String, Integer> student_score_rdd = student_score_df.javaRDD()
            .mapToPair(row -> new Tuple2<String, Integer>(row.getString(0), row.getInt(1)));
    // Join the two RDDs, then keep only the students whose score is greater than 80
    JavaRDD<Row> good_student_info_rdd = student_info_rdd.join(student_score_rdd)
            .map(t -> RowFactory.create(t._1(), t._2()._1(), t._2()._2()))
            .filter(row -> row.getInt(2) > 80);
    // Convert the RDD to a DataFrame with an explicit schema and print it
    List<StructField> structFields = Arrays.asList(
            DataTypes.createStructField("name", DataTypes.StringType, true),
            DataTypes.createStructField("age", DataTypes.IntegerType, true),
            DataTypes.createStructField("score", DataTypes.IntegerType, true)
    );
    StructType structType = DataTypes.createStructType(structFields);
    sqlContext.createDataFrame(good_student_info_rdd, structType).show();
    // Save the RDD back to MySQL (note: this opens one connection per record)
    good_student_info_rdd.foreach(row -> {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = DriverManager.getConnection("jdbc:mysql://hadoop-cdh:3306/testdb", "root", "Slb930802.");
            ps = conn.prepareStatement("INSERT INTO good_student_info VALUES(?,?,?)");
            ps.setString(1, row.getString(0));
            ps.setInt(2, row.getInt(1));
            ps.setInt(3, row.getInt(2));
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    });
}
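Opening a connection for every record, as in the foreach above, works for small results but is expensive. A minimal Scala sketch of an alternative, assuming the same MySQL instance, credentials and good_student_info table as the Java code: build the DataFrame with the schema shown above and let DataFrameWriter.jdbc perform the insert.
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: goodStudents is assumed to be the DataFrame built from
// good_student_info_rdd and structType above; the URL, user, password and
// target table simply mirror the assumptions of the Java example.
def saveGoodStudents(goodStudents: DataFrame): Unit = {
  val props = new Properties()
  props.setProperty("user", "root")
  props.setProperty("password", "Slb930802.")
  props.setProperty("driver", "com.mysql.jdbc.Driver")
  goodStudents.write
    .mode(SaveMode.Append) // append rows to the existing good_student_info table
    .jdbc("jdbc:mysql://hadoop-cdh:3306/testdb", "good_student_info", props)
}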
Spark SQL built-in functions
Counting UV (unique visitors)
def uv(): Unit = {
  val conf = new SparkConf().setMaster("local[4]").setAppName("uv")
  val sc = SparkContext.getOrCreate(conf)
  val sqlContext = SQLContext.getOrCreate(sc)
  // Build some log records and turn them into a DataFrame
  val logArray = Array(
    "20180115,fpanpxc2gvjd2py0jd2254ng",
    "20180115,fpanpxc2gvjd2py0jd2254ng",
    "20180115,4xlt0txdgy220f3owkhxv4y1",
    "20180116,kz5t0lvcjsbsqacjqwqyhfu4")
  val logRDD = sc.parallelize(logArray)
    .map(_.split(","))
    .map(array => Row(array(0), array(1)))
  val structType = StructType(Array(
    StructField("date", StringType, false),
    StructField("sessionId", StringType, false)
  ))
  val logDF = sqlContext.createDataFrame(logRDD, structType)
  // Aggregate the column with a Spark SQL built-in function
  // Using the built-in functions requires the following two imports
  import sqlContext.implicits._
  import org.apache.spark.sql.functions._
  logDF.groupBy("date").agg(countDistinct('sessionId).as("uv")).show()
}
Computing the total sales for each day
def totalSale(): Unit = {
  val conf = new SparkConf().setMaster("local[4]").setAppName("TotalSale")
  val sc = SparkContext.getOrCreate(conf)
  val sqlContext = SQLContext.getOrCreate(sc)
  // Build the DataFrame
  val logArray = Array(
    "20180115,34.3",
    "20180115,35.6",
    "20180116,99.9")
  val logRDD = sc.parallelize(logArray)
    .map(_.split(","))
    .map(array => Row(array(0), array(1).toDouble))
  val structType = StructType(Array(
    StructField("date", StringType, false),
    StructField("sale", DoubleType, false)
  ))
  // Group by day and sum the sales
  import sqlContext.implicits._
  import org.apache.spark.sql.functions._
  sqlContext.createDataFrame(logRDD, structType)
    .groupBy("date")
    .agg(sum('sale).as("total_sale"))
    .show()
}
Spark SQL window functions
The row_number() window function
select url,rate,row_number() over(partition by url order by rate desc) as r from window_test2;
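A minimal sketch of running this query from Spark; it assumes the window_test2 table (columns url and rate) already exists in Hive, and it uses a HiveContext because in Spark 1.x window functions are not supported by a plain SQLContext. Wrapping the query in a subquery with a predicate such as where r <= 3 turns it into a per-url top-N query.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object RowNumberWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RowNumberWindow")
    val sc = SparkContext.getOrCreate(conf)
    val hiveContext = new HiveContext(sc)
    // Number the rows within each url partition, highest rate first
    hiveContext.sql("select url,rate,row_number() over(partition by url order by rate desc) as r from window_test2")
      .show()
  }
}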