Creating a Spark SQL Program in IDEA
Packaging and running the program in IDEA works the same way as for Spark Core; you only need to add a new dependency to the Maven dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
package com.atguigu.sparksql

import org.apache.spark.sql.SparkSession

object HelloWorld {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession and set the application name
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.json("data/people.json")
    // Display the content of the DataFrame to stdout
    df.show()
    df.filter($"age" > 21).show()
    df.createOrReplaceTempView("persons")
    spark.sql("SELECT * FROM persons WHERE age > 21").show()
    spark.stop()
  }
}
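Packaging and submission then follow the same procedure as a Spark Core project. A minimal sketch of the submit command, assuming the project has been packaged into a jar named sparksql-1.0-SNAPSHOT.jar (a hypothetical name):
bin/spark-submit --class com.atguigu.sparksql.HelloWorld --master local[*] sparksql-1.0-SNAPSHOT.jar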
User-Defined Functions
User-Defined Functions (UDF)
First, load a table:
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Register a custom UDF that prepends "Name:" to the name field:
scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.createOrReplaceTempView("people")
scala> spark.sql("Select addName(name), age from people").show()
+-----------------+----+
|UDF:addName(name)| age|
+-----------------+----+
| Name:Michael|null|
| Name:Andy| 30|
| Name:Justin| 19|
+-----------------+----+
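The same UDF can also be applied through the DataFrame API instead of SQL. A minimal sketch, still using the df loaded above (the addNameCol value name is just illustrative):
import org.apache.spark.sql.functions.udf

// Wrap the same logic as a DataFrame-API UDF
val addNameCol = udf((x: String) => "Name:" + x)

// Apply it with select; the result is equivalent to the SQL query above
df.select(addNameCol($"name"), $"age").show()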
User-Defined Aggregate Functions
Both the strongly typed Dataset and the weakly typed DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max(), and min(). Beyond these, users can define their own aggregate functions.
Weakly typed user-defined aggregate functions: extend UserDefinedAggregateFunction to implement a custom aggregate function. The example below implements an aggregate function that computes the average salary.
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
  // Data type of the aggregate function's input arguments
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of the values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // Data type of the return value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output for the same input
  def deterministic: Boolean = true
  // Initialize the buffer
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    // Running total of the salaries
    buffer(0) = 0L
    // Number of salaries seen so far
    buffer(1) = 0L
  }
  // Update the buffer with a new input row (merging data within the same Executor)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merge two buffers (merging data across Executors)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Compute the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
Calling it in Spark:
// Register the function
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
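For the strongly typed Dataset mentioned above, a custom aggregate can instead extend Aggregator. A minimal sketch of the same average-salary logic (the Employee and Average case classes are illustrative, matching the employees.json schema shown above):
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyTypedAverage extends Aggregator[Employee, Average, Double] {
  // Initial value of the intermediate buffer
  def zero: Average = Average(0L, 0L)
  // Fold one input record into the buffer
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate buffers
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Compute the final result
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Encoders for the intermediate and output types
  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Usage sketch: convert the DataFrame to a typed Dataset and apply the aggregator as a column
// (requires import spark.implicits._ for the Employee encoder)
val ds = df.as[Employee]
ds.select(MyTypedAverage.toColumn.name("average_salary")).show()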
Load and Save Methods
JSON Files
Note that for Spark to load a JSON file, each line of the file must be a complete JSON object, unlike a typical JSON document where one object spans multiple lines. For example:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
When reading JSON, the implicit conversions need to be imported:
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
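Saving goes through the matching write interface. A minimal sketch, assuming an HDFS output path of your choosing (the people_out path below is hypothetical):
// Each output file will again contain one JSON object per line
peopleDF.write.json("hdfs://hadoop102:9000/people_out")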
Parquet Files
Parquet is a popular columnar storage format that can efficiently store records with nested fields. It is widely used in the Hadoop ecosystem and supports all of Spark SQL's data types. Spark SQL provides methods for reading and writing Parquet files directly.
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
peopleDF.write.parquet("hdfs://hadoop102:9000/people.parquet")
val parquetFileDF = spark.read.parquet("hdfs://hadoop102:9000/people.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
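Besides the format-specific methods, the read/write interface also has a generic format/load/save form with a configurable save mode. A minimal sketch (the people_copy.parquet output path is hypothetical):
import org.apache.spark.sql.SaveMode

// Specify the data source format explicitly and load the same Parquet file
val usersDF = spark.read.format("parquet").load("hdfs://hadoop102:9000/people.parquet")
// Overwrite the target directory if it already exists
usersDF.write.mode(SaveMode.Overwrite).format("parquet").save("hdfs://hadoop102:9000/people_copy.parquet")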
JDBC
Spark SQL can create a DataFrame by reading data from a relational database via JDBC; after performing a series of computations on the DataFrame, the data can also be written back to the relational database.
Note: the relevant database driver jar must be placed on Spark's classpath.
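For example, when working in the Spark shell, the MySQL driver jar can be put on the classpath with --jars (the exact jar name depends on the driver version you use):
$ bin/spark-shell --jars mysql-connector-java-5.1.27-bin.jar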
Loading data from a MySQL database, method 1:
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/rdd")
.option("dbtable", "rddtable")
.option("user", "root")
.option("password", "000000")
.load()
Loading data from a MySQL database, method 2:
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "000000")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://hadoop102:3306/rdd", "rddtable", connectionProperties)
Writing data to MySQL, method 1:
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/rdd")
.option("dbtable", "dftable")
.option("user", "root")
.option("password", "000000")
.save()
Writing data to MySQL, method 2:
jdbcDF2.write
.jdbc("jdbc:mysql://hadoop102:3306/rdd", "db", connectionProperties)
Spark and Hive
Apache Hive is the SQL engine on Hadoop, and Spark SQL can be built with or without Hive support. Spark SQL with Hive support can access Hive tables, UDFs (user-defined functions), and the Hive query language (HiveQL/HQL). One point to emphasize: including the Hive libraries in Spark SQL does not require Hive to be installed beforehand. In general, it is best to build Spark SQL with Hive support so that these features are available. If you downloaded a binary distribution of Spark, it should already have been built with Hive support.
To connect Spark SQL to an already deployed Hive, you must copy hive-site.xml into Spark's configuration directory ($SPARK_HOME/conf). Spark SQL can still run even if no Hive is deployed. Note that without a deployed Hive, Spark SQL creates its own Hive metastore in the current working directory, called metastore_db. Furthermore, if you create tables with the HiveQL CREATE TABLE statement (not CREATE EXTERNAL TABLE), they are placed in the /user/hive/warehouse directory of your default filesystem (HDFS if a configured hdfs-site.xml is on your classpath, otherwise the local filesystem).
Connecting to an External Hive
To connect to an external, already deployed Hive, the following steps are required:
- Copy or symlink hive-site.xml from Hive into the conf directory under the Spark installation directory.
- Start the Spark shell, making sure to include the JDBC driver for accessing the Hive metastore database:
$ bin/spark-shell --jars mysql-connector-java-5.1.27-bin.jar
The Spark SQL CLI makes it easy to run the Hive metastore service locally and execute queries from the command line. Run the following command from the Spark directory to start the Spark SQL CLI:
./bin/spark-sql
Using Hive in Code
Add the dependencies:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>
Hive support must be enabled when creating the SparkSession:
import java.io.File

val warehouseLocation: String = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()
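Once Hive support is enabled, Hive tables can be queried through spark.sql. A minimal sketch (emp is a hypothetical table name):
import spark.implicits._
import spark.sql

// List the tables visible through the Hive metastore
sql("show tables").show()
// Query a Hive table directly (emp is a hypothetical table name)
sql("SELECT * FROM emp").show()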