Spark Development: An Introduction to Spark SQL

This article builds on the Spark architecture.


Background

  • Many early big-data developers came from a Web background, and SQL is a must-have skill for Web developers

  • Spark SQL provides the DataFrame API to simplify development against RDDs

Definition

  • DataFrame = a distributed dataset built on top of RDDs

  • DataFrame = RDD + Schema
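The "RDD + Schema" idea can be illustrated in plain Scala, without Spark. The `Field`/`MiniDataFrame` types below are simplified stand-ins invented for this sketch, not Spark's actual `StructType` API: the rows play the role of the RDD, and the named, typed fields play the role of the schema, which is what lets columns be addressed by name in SQL.

```scala
// Sketch only: a DataFrame = rows of data (the RDD part)
// plus column names and types (the schema part).
case class Field(name: String, dataType: String)
case class MiniDataFrame(schema: Seq[Field], rows: Seq[Seq[Any]])

object DataFrameSketch {
  val df = MiniDataFrame(
    schema = Seq(Field("name", "string"), Field("age", "long")),
    rows   = Seq(Seq("Michael", 29L), Seq("Andy", 30L), Seq("Justin", 19L))
  )

  // With a schema attached, a column can be looked up by name,
  // which is the capability SQL queries rely on.
  def column(df: MiniDataFrame, name: String): Seq[Any] = {
    val idx = df.schema.indexWhere(_.name == name)
    df.rows.map(_(idx))
  }
}
```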


Features

Integrating RDDs and SQL

cat /opt/services/spark/examples/src/main/resources/people.txt
# Michael, 29
# Andy, 30
# Justin, 19

/opt/services/spark/bin/spark-shell
case class People(name: String, age: Long)

val rdd = sc.textFile("/opt/services/spark/examples/src/main/resources/people.txt")

val mapRDD = rdd.map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toInt))

val filterRDD = mapRDD.filter(_.age > 20)

filterRDD.foreach(p => println(s"${p.name} ${p.age}"))
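To see what each lambda in the pipeline above does, here is the same parse-map-filter sequence run on a plain Scala `List` instead of an RDD; the file contents are inlined rather than read from people.txt, so no Spark cluster is needed:

```scala
case class People(name: String, age: Long)

object RddPipelineSketch {
  // Inlined contents of people.txt
  val lines = List("Michael, 29", "Andy, 30", "Justin, 19")

  // split each line on "," -> build a People -> keep ages over 20
  val over20: List[People] =
    lines
      .map(_.split(","))
      .map(a => People(a(0), a(1).trim.toLong))
      .filter(_.age > 20)
}
```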

For more on Scala string interpolation, see the separate article on Scala string interpolation.
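As a quick reference, the three standard interpolators (the `s` interpolator is the one used in the `println` above):

```scala
object InterpolationDemo {
  val name = "Andy"
  val age  = 30L

  val s1 = s"$name $age"          // s: plain substitution
  val s2 = s"${name.toUpperCase}" // ${} allows arbitrary expressions
  val f1 = f"$age%04d"            // f: printf-style formatting
  val r1 = raw"a\nb"              // raw: backslashes kept literally
}
```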

case class People(name: String, age: Long)

val rdd = sc.textFile("/opt/services/spark/examples/src/main/resources/people.txt")

# import spark.implicits._
val df = rdd.map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toInt)).toDF()

df.createOrReplaceTempView("people")

spark.sql("SELECT * FROM people WHERE age > 20").show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
+-------+---+

Unified Data Access

cat /opt/services/spark/examples/src/main/resources/people.json
# {"name":"Michael"}
# {"name":"Andy", "age":30}
# {"name":"Justin", "age":19}

/opt/services/spark/bin/spark-shell
val df = spark.read.json("/opt/services/spark/examples/src/main/resources/people.json")

df.createOrReplaceTempView("people")

spark.sql("SELECT * FROM people WHERE age > 20").show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
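Note that Michael has no "age" field in the JSON file, so Spark's schema inference makes `age` a nullable column and the `WHERE age > 20` filter drops his row. Plain Scala models the same idea with `Option`; this is an illustrative stand-in for the inferred schema, not Spark's API:

```scala
object JsonSketch {
  // age is Option[Long]: a missing JSON field becomes None
  case class Person(name: String, age: Option[Long])

  // Hand-parsed stand-in for spark.read.json on people.json
  val people = List(
    Person("Michael", None),
    Person("Andy", Some(30L)),
    Person("Justin", Some(19L))
  )

  // Equivalent of: SELECT * FROM people WHERE age > 20
  // (rows with a missing age are filtered out, as in SQL)
  val over20 = people.filter(_.age.exists(_ > 20))
}
```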

Standard Data Connectivity

docker run --name spark-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17

docker exec -it spark-mysql /bin/bash

mysql -uroot -p123456
CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

USE db_spark;

CREATE TABLE users (
  id int(10) unsigned NOT NULL AUTO_INCREMENT,
  name varchar(20) DEFAULT NULL COMMENT 'user name',
  PRIMARY KEY (`id`)
);

INSERT INTO users VALUES (1, 'XiaoWang');

INSERT INTO users VALUES (2, 'XiaoMing');
# cd /opt/services
# wget https://mirror.tuna.tsinghua.edu.cn/mysql/downloads/Connector-J/mysql-connector-java-5.1.49.tar.gz
# tar xf mysql-connector-java-5.1.49.tar.gz
/opt/services/spark/bin/spark-shell --jars /opt/services/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar
# :paste
val df = spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/db_spark")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "root")
    .option("password", "123456")
    .option("dbtable", "users")
    .load()
# Ctrl + D

df.createOrReplaceTempView("users")

val sqlDF = spark.sql("SELECT * FROM users WHERE name = 'XiaoMing'")

sqlDF.show()
+---+--------+
| id|    name|
+---+--------+
|  2|XiaoMing|
+---+--------+
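The five `.option(...)` calls can also be gathered into a single map and passed via `DataFrameReader.options`. The sketch below only builds the map; actually calling `load()` still requires a running MySQL instance and the connector jar on the classpath:

```scala
object JdbcOptions {
  // Same connection settings as the spark-shell example above
  val opts: Map[String, String] = Map(
    "url"      -> "jdbc:mysql://localhost:3306/db_spark",
    "driver"   -> "com.mysql.jdbc.Driver",
    "user"     -> "root",
    "password" -> "123456",
    "dbtable"  -> "users"
  )
  // In spark-shell this would be used as:
  //   spark.read.format("jdbc").options(opts).load()
}
```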
