4.Spark基礎(chǔ)學(xué)習(xí)四(IDEA創(chuàng)建Spark_SQL)

IDEA創(chuàng)建SparkSQL程序

IDEA中程序的打包和運(yùn)行方式都和SparkCore類似,Maven依賴中需要添加新的依賴項(xiàng):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
package com.atguigu.sparksql

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HelloWorld {

  def main(args: Array[String]) {
    //創(chuàng)建SparkConf()并設(shè)置App名稱
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    //隱士轉(zhuǎn)換
    import spark.implicits._

    val df = spark.read.json("data/people.json")

    // Displays the content of the DataFrame to stdout
    df.show()

    df.filter($"age" > 21).show()

    df.createOrReplaceTempView("persons")

    spark.sql("SELECT * FROM persons where age > 21").show()

    spark.stop()
  }

}

用戶自定義函數(shù)

用戶自定義UDF函數(shù)

首先先加載一個(gè)表

scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

自定義添加UDF函數(shù),就是在名字前面加上Name

scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.createOrReplaceTempView("people")

scala> spark.sql("Select addName(name), age from people").show()
+-----------------+----+
|UDF:addName(name)| age|
+-----------------+----+
|     Name:Michael|null|
|        Name:Andy|  30|
|      Name:Justin|  19|
+-----------------+----+

用戶自定義聚合函數(shù)

? 強(qiáng)類型的Dataset和弱類型的DataFrame都提供了相關(guān)的聚合函數(shù), 如 count(),countDistinct(),avg(),max(),min()。除此之外,用戶可以設(shè)定自己的自定義聚合函數(shù)。

? 弱類型用戶自定義聚合函數(shù):通過(guò)繼承UserDefinedAggregateFunction來(lái)實(shí)現(xiàn)用戶自定義聚合函數(shù)。下面展示一個(gè)求平均工資的自定義聚合函數(shù)。

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
    // 聚合函數(shù)輸入?yún)?shù)的數(shù)據(jù)類型 
    def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
    // 聚合緩沖區(qū)中值得數(shù)據(jù)類型
    def bufferSchema: StructType = {
        StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
     }
    
    // 返回值的數(shù)據(jù)類型 
    def dataType: DataType = DoubleType
    // 對(duì)于相同的輸入是否一直返回相同的輸出。
    def deterministic: Boolean = true
    // 初始化
    def initialize(buffer: MutableAggregationBuffer): Unit = {
        // 存工資的總額
        buffer(0) = 0L
        // 存工資的個(gè)數(shù)
        buffer(1) = 0L
     }
    
// 相同Execute間的數(shù)據(jù)合并。 
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
        buffer(0) = buffer.getLong(0) + input.getLong(0)
        buffer(1) = buffer.getLong(1) + 1
   }
 }
// 不同Execute間的數(shù)據(jù)合并 
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
   }
    
// 計(jì)算最終結(jié)果
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

在spark中調(diào)用

// 注冊(cè)函數(shù)
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

?

加載保存的方法

JSON文件

如果要讓Spark加載Json文件,那么Json文件必須符合每一行都是一個(gè)json而不是像平常Json那樣多行為一個(gè)Json,這點(diǎn)必須要注意。例如

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

讀取Json時(shí),需要隱式導(dǎo)入

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|

Parquet文件

? Parquet是一種流行的列式存儲(chǔ)格式,可以高效地存儲(chǔ)具有嵌套字段的記錄。Parquet格式經(jīng)常在Hadoop生態(tài)圈中被使用,它也支持Spark SQL的全部數(shù)據(jù)類型。Spark SQL 提供了直接讀取和存儲(chǔ) Parquet 格式文件的方法。

importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

peopleDF.write.parquet("hdfs://hadoop102:9000/people.parquet")

val parquetFileDF = spark.read.parquet("hdfs:// hadoop102:9000/people.parquet")

parquetFileDF.createOrReplaceTempView("parquetFile")

val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

JDBC

? Spark SQL可以通過(guò)JDBC從關(guān)系型數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)的方式創(chuàng)建DataFrame,通過(guò)對(duì)DataFrame一系列的計(jì)算后,還可以將數(shù)據(jù)再寫(xiě)回關(guān)系型數(shù)據(jù)庫(kù)中。

注意:*需要將相關(guān)的數(shù)據(jù)庫(kù)驅(qū)動(dòng)放到spark的類路徑下*。

從Mysql數(shù)據(jù)庫(kù)加載數(shù)據(jù)方式一

val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/rdd")
.option("dbtable", "rddtable")
.option("user", "root")
.option("password", "000000")
.load()

jdbcDF2.write
.jdbc("jdbc:mysql://hadoop102:3306/rdd", "db", connectionProperties)

從Mysql數(shù)據(jù)庫(kù)加載數(shù)據(jù)方式二

val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "000000")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://hadoop102:3306/rdd", "rddtable", connectionProperties)

將數(shù)據(jù)寫(xiě)入Mysql方式一

jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/rdd")
.option("dbtable", "dftable")
.option("user", "root")
.option("password", "000000")
.save()

將數(shù)據(jù)寫(xiě)入Mysql方式二

jdbcDF2.write
.jdbc("jdbc:mysql://hadoop102:3306/rdd", "db", connectionProperties)

Spark 和 Hive

? Apache Hive是Hadoop上的SQL引擎,Spark SQL編譯時(shí)可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表訪問(wèn)、UDF(用戶自定義函數(shù))以及 Hive 查詢語(yǔ)言(HiveQL/HQL)等。需要強(qiáng)調(diào)的一點(diǎn)是,如果要在Spark SQL中包含Hive的庫(kù),并不需要事先安裝Hive。一般來(lái)說(shuō),最好還是在編譯Spark SQL時(shí)引入Hive支持,這樣就可以使用這些特性了。如果你下載的是二進(jìn)制版本的 Spark,它應(yīng)該已經(jīng)在編譯時(shí)添加了 Hive 支持。

? 若要把Spark SQL連接到一個(gè)部署好的Hive上,你必須把hive-site.xml復(fù)制到 Spark的配置文件目錄中($SPARK_HOME/conf)。即使沒(méi)有部署好Hive,Spark SQL也可以運(yùn)行。 需要注意的是,如果你沒(méi)有部署好Hive,Spark SQL會(huì)在當(dāng)前的工作目錄中創(chuàng)建出自己的Hive 元數(shù)據(jù)倉(cāng)庫(kù),叫作 metastore_db。此外,如果你嘗試使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)語(yǔ)句來(lái)創(chuàng)建表,這些表會(huì)被放在你默認(rèn)的文件系統(tǒng)中的 /user/hive/warehouse 目錄中(如果你的 classpath 中有配好的 hdfs-site.xml,默認(rèn)的文件系統(tǒng)就是 HDFS,否則就是本地文件系統(tǒng))。

Hive加載

想連接外部已經(jīng)部署好的Hive,需要通過(guò)以下幾個(gè)步驟。

  1. 將Hive中的hive-site.xml拷貝或者軟連接到Spark安裝目錄下的conf目錄下

  2. 打開(kāi)spark shell,注意帶上訪問(wèn)Hive元數(shù)據(jù)庫(kù)的JDBC客戶端

    $ bin/spark-shell  --jars mysql-connector-java-5.1.27-bin.jar
    
  3. Spark SQL CLI可以很方便的在本地運(yùn)行Hive元數(shù)據(jù)服務(wù)以及從命令行執(zhí)行查詢?nèi)蝿?wù)。在Spark目錄下執(zhí)行如下命令啟動(dòng)Spark SQL CLI

    ./bin/spark-sql
    

代碼中使用Hive

添加依賴:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>

創(chuàng)建SparkSession時(shí)需要添加hive支持

val warehouseLocation: String = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容