原文地址: https://spark.apache.org/docs/latest/sql-programming-guide.html
OverView
Spark SQL是用于處理結(jié)構(gòu)化數(shù)據(jù)的spark模塊。與基本的Spark RDD API不同,Spark SQL提供的接口為Spark提供了更多的數(shù)據(jù)結(jié)構(gòu)和計算信息。在內(nèi)部,Spark SQL使用這些額外的信息來執(zhí)行額外的優(yōu)化。有幾種方法可以與Spark SQL交互,包括SQL和DataSet API。當(dāng)計算結(jié)果時,會使用相同的執(zhí)行引擎,而與你用來實現(xiàn)計算的API/語言無關(guān)。這種統(tǒng)一意味著開發(fā)人員可以很容易地在不同的API之間來回切換,基于這些Spark提供了非常自然的轉(zhuǎn)換表達(dá)式。
本頁中的所有的示例代碼使用spark分布式中的示例數(shù)據(jù), 可以運行在spark-shell, pyspark-shell 或者sparkR shell中.
SQL
Spark SQL的一個用途是執(zhí)行SQL查詢。Spark SQL還可以用于從現(xiàn)有的Hive服務(wù)中讀取數(shù)據(jù)。有關(guān)如何配置此功能的詳細(xì)信息,請參閱Hive Tables 部分。從其他編程語言運行SQL時,結(jié)果將作為數(shù)據(jù)集/數(shù)據(jù)幀(Dataset/DataFrame)返回。你還可以使用命令行或通過jdbc/odbc與SQL接口交互。
Datasets and DataFrames
數(shù)據(jù)集是數(shù)據(jù)的分布式集合。DataSet是Spark 1.6中添加的一個新接口,它提供了RDD的優(yōu)點(強(qiáng)類型、使用強(qiáng)大lambda函數(shù)的能力)以及Spark SQL優(yōu)化的執(zhí)行引擎的優(yōu)化。數(shù)據(jù)集可以從jvm對象構(gòu)造,然后使用函數(shù)轉(zhuǎn)換(map,flatmap,filter,等等)進(jìn)行操作。數(shù)據(jù)集的API在Scala和Java中是可用的, 但是Python不支持?jǐn)?shù)據(jù)集API。但是,由于python的動態(tài)特性,數(shù)據(jù)集API的許多好處已經(jīng)可用(你可以自然地按名稱訪問行的字段'row.columnname`)。R的情況類似。
數(shù)據(jù)幀是一個由命名列組成的數(shù)據(jù)集。在概念上,它相當(dāng)于關(guān)系數(shù)據(jù)庫中的一個表或R/Python中的一個數(shù)據(jù)幀,但在底層做了更多優(yōu)化。數(shù)據(jù)幀可以從一系列源構(gòu)建,例如:結(jié)構(gòu)化數(shù)據(jù)文件、Hive中的表、外部數(shù)據(jù)庫或現(xiàn)有RDD。數(shù)據(jù)幀 API在Scala、Java、Python和R中都有效. 在Scala和Java中,數(shù)據(jù)幀可以表示為Row的數(shù)據(jù)集。在Scala API中,DataFrame表示為Dataset[Row], 而在Java API中,用 Dataset<Row>來表示DataFrame。
Get Start
Starting Point: SparkSession
Spark Sql的入口是SparkSession, 創(chuàng)建SparkSession只需要調(diào)用SparkSession.builder():
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Spark 2.0中的SparkSession為Hive提供了內(nèi)置支持,包括使用HiveQL編寫查詢、訪問 Hive UDF, 以及從Hive表讀取數(shù)據(jù)的功能。要使用這些功能,你不需要安裝Hive。
Creating DataFrames(創(chuàng)建數(shù)據(jù)幀)
應(yīng)用使用SparkSession, 可以從現(xiàn)有RDD,Hive表, Spark數(shù)據(jù)源中創(chuàng)建數(shù)據(jù)幀.
舉個例子, 下面的代碼從Json文件中創(chuàng)建數(shù)據(jù)幀:
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Untyped Dataset Operations (aka DataFrame Operations)
非類型化的數(shù)據(jù)集操作(又稱數(shù)據(jù)幀操作)
數(shù)據(jù)幀為Scala,Java,Python和R操作結(jié)構(gòu)化數(shù)據(jù)提供了特定領(lǐng)域語言.
就像上面提到的, 在Spark2.0中, Scala和Java API的數(shù)據(jù)幀只是Row類型的數(shù)據(jù)集. 這些操作也叫"非類型化轉(zhuǎn)換", 與強(qiáng)類型的Scala/Java數(shù)據(jù)集的"類型化轉(zhuǎn)換"相反.
下面是使用數(shù)據(jù)集處理結(jié)構(gòu)化數(shù)據(jù)的一些基本例子:
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
有關(guān)可以對數(shù)據(jù)集執(zhí)行的操作類型的完整列表,請參閱API文檔
除了簡單的列引用和表達(dá)式外,數(shù)據(jù)集還具有豐富的函數(shù)庫,包括字符串操作、日期處理、常見數(shù)學(xué)運算等。完整列表在DataFrame Function Reference
Running SQL Queries Programmatically
SparkSession上的sql函數(shù)允許應(yīng)用程序以編程方式運行SQL查詢,并將結(jié)果作為Dataset<Row>返回。
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Global Temporary View
Spark SQL中的臨時視圖是spark會話范圍的,并且如果創(chuàng)建它的會話終止,它將消失。如果你希望擁有一個在所有會話之間共享的臨時視圖,并在Spark應(yīng)用程序終止之前保持活動狀態(tài),那么你可以創(chuàng)建一個全局臨時視圖。全局臨時視圖綁定到系統(tǒng)保留的數(shù)據(jù)庫global_temp,我們必須使用限定名稱來引用它,例如select * from global_temp.view1
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Creating Datasets(創(chuàng)建數(shù)據(jù)集)
數(shù)據(jù)集與RDD相似,但是,它們不使用Java序列化或Kryo,而是使用專門的編碼器來序列化對象以在網(wǎng)絡(luò)上進(jìn)行處理或傳輸。雖然編碼器和標(biāo)準(zhǔn)序列化都負(fù)責(zé)將對象轉(zhuǎn)換為字節(jié),但編碼器是動態(tài)生成的代碼,并使用一種允許Spark執(zhí)行許多操作的格式,如篩選、排序和散列,而不將字節(jié)反序列化回對象。
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Interoperating with RDDs(與RDD的交互)
Spark SQL支持兩種方法將現(xiàn)有RDD轉(zhuǎn)換為數(shù)據(jù)集。第一種方法使用反射來推斷包含特定對象類型的RDD的模式。這種基于反射的方法可以得到更簡潔的代碼,并且在編寫Spark應(yīng)用程序時,當(dāng)你已經(jīng)知道模式時,它可以很好地工作。
創(chuàng)建數(shù)據(jù)集的第二種方法是通過編程接口,該接口允許你構(gòu)造一個模式,然后將其應(yīng)用到現(xiàn)有的RDD。此方法更詳細(xì),它允許你在運行時才知道列及其類型時再構(gòu)造數(shù)據(jù)集。
Inferring the Schema Using Reflection(使用反射推斷模式)
Spark SQL支持自動將JavaBeans的RDD轉(zhuǎn)換為數(shù)據(jù)幀。使用反射獲得的BeanInfo定義了表的模式。目前,Spark SQL不支持包含Map字段的JavaBeans。不過,支持嵌套的javaBeans和List或Array字段。您可以通過創(chuàng)建一個類來創(chuàng)建JavaBean,該類實現(xiàn)了Serializable接口并且所有字段都有g(shù)etter和setter。
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
Programmatically Specifying the Schema(編程指定模式)
When JavaBean classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a Dataset<Row> can be created programmatically with three steps.
如果不能提前定義JavaBean類(例如,記錄的結(jié)構(gòu)編碼在字符串中,或者需要解析文本數(shù)據(jù)集并且不同的用戶有不同的字段),則可以通過三個步驟以編程方式創(chuàng)建Dataset<Row>;
- 1.從原始RDD創(chuàng)建基于
Row的RDD - 2.使用
StructType創(chuàng)建匹配RDD中Row數(shù)據(jù)的模式 - 3.使用
createDataFrame方法將模式應(yīng)用到基于Row的RDD
例:
// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("examples/src/main/resources/people.txt", 1)
.toJavaRDD();
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
Aggregations(聚合)
內(nèi)置的數(shù)據(jù)幀函數(shù)提供常見的聚合,如count()、countDistinct()、avg()、max()、min()等。雖然這些函數(shù)是為數(shù)據(jù)幀設(shè)計的,但Spark SQL在Scala和Java中的某些函數(shù)還具有類型安全的版本,可用于強(qiáng)類型數(shù)據(jù)集。此外,用戶不僅限于預(yù)定義的聚合函數(shù),還可以創(chuàng)建自己的聚合函數(shù)。
Untyped User-Defined Aggregate Functions(非類型化的用戶自定義聚合函數(shù))
用戶必須擴(kuò)展UserDefinedAggregateFunction抽象類以實現(xiàn)自定義的非類型化聚合函數(shù)。例如,用戶定義的平均值可以如下所示:
public static class MyAverage extends UserDefinedAggregateFunction {
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
List<StructField> inputFields = new ArrayList<>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
// Data types of input arguments of this aggregate function
public StructType inputSchema() {
return inputSchema;
}
// Data types of values in the aggregation buffer
public StructType bufferSchema() {
return bufferSchema;
}
// The data type of the returned value
public DataType dataType() {
return DataTypes.DoubleType;
}
// Whether this function always returns the same output on the identical input
public boolean deterministic() {
return true;
}
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0L);
buffer.update(1, 0L);
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
}
// Calculates the final result
public Double evaluate(Row buffer) {
return ((double) buffer.getLong(0)) / buffer.getLong(1);
}
}
// Register the function to access it
spark.udf().register("myAverage", new MyAverage());
Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
Type-Safe User-Defined Aggregate Functions(類型化用戶自定義聚合函數(shù))
User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:
強(qiáng)類型數(shù)據(jù)集的用戶定義聚合圍繞聚合器抽象類Aggregator實現(xiàn)。例如,類型安全的用戶定義平均值可以如下所示:
public static class Employee implements Serializable {
private String name;
private long salary;
// Constructors, getters, setters...
}
public static class Average implements Serializable {
private long sum;
private long count;
// Constructors, getters, setters...
}
public static class MyAverage extends Aggregator<Employee, Average, Double> {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
public Average zero() {
return new Average(0L, 0L);
}
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
public Average reduce(Average buffer, Employee employee) {
long newSum = buffer.getSum() + employee.getSalary();
long newCount = buffer.getCount() + 1;
buffer.setSum(newSum);
buffer.setCount(newCount);
return buffer;
}
// Merge two intermediate values
public Average merge(Average b1, Average b2) {
long mergedSum = b1.getSum() + b2.getSum();
long mergedCount = b1.getCount() + b2.getCount();
b1.setSum(mergedSum);
b1.setCount(mergedCount);
return b1;
}
// Transform the output of the reduction
public Double finish(Average reduction) {
return ((double) reduction.getSum()) / reduction.getCount();
}
// Specifies the Encoder for the intermediate value type
public Encoder<Average> bufferEncoder() {
return Encoders.bean(Average.class);
}
// Specifies the Encoder for the final output value type
public Encoder<Double> outputEncoder() {
return Encoders.DOUBLE();
}
}
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+