Spark RDD方式求topN
詳見代碼:
測試數據:
aa 49
bb 11
cc 34
aa 22
bb 67
cc 29
aa 36
bb 33
cc 30
aa 11
bb 44
cc 49
Spark RDD 代碼
package cn.ted.secondarySort
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
 * Grouped top-N with Spark RDD operators.
 *
 * Task: group the "key value" records by key, sort each group's values in descending
 * numeric order, and keep the first N values of each group as key-value results.
 * Three variants are shown:
 *  1. groupByKey + sort of the whole group, collected to the driver (top 3);
 *  2. aggregateByKey with a per-key list bounded to N elements, converted with `toDF` (top 2);
 *  3. variant 2 flattened into one (key, value) Row per element (top 2).
 *
 * Test data (key and value separated by whitespace, one record per line):
 * {{{
 * aa 49
 * bb 11
 * cc 34
 * aa 22
 * bb 67
 * cc 29
 * aa 36
 * bb 33
 * cc 30
 * aa 11
 * bb 44
 * cc 49
 * }}}
 *
 * Author: LiYahui
 * Date: Created in 2019/3/1 10:57
 * Version: V1.0
 */
object GroupedTopN {

  /** Input path used when no path is passed as the first program argument. */
  private val DefaultInputPath = "F:\\LocalFileForTest\\topN"

  /**
   * Runs the three top-N variants and prints their results.
   *
   * @param args optional; `args(0)` overrides the input path
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.parquet.compression.codec", "gzip")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    // Blank or malformed lines are dropped instead of failing the whole job.
    val pairs: RDD[(String, String)] = sc.textFile(inputPath).flatMap(line => parseLine(line).toList)

    // ---------------------------------------------------------------------------
    // Variant 1: groupByKey, sort each whole group, return the result as an array.
    val resultArray: Array[(String, List[String])] = pairs
      .groupByKey()
      .mapValues(values => topValues(values, 3))
      .collect()
    resultArray.foreach(ele => println("結(jié)果數(shù)據(jù)中的元素為:" + ele))
    /*
     * Expected output for the test data:
     * (aa,List(49, 36, 22))
     * (bb,List(67, 44, 33))
     * (cc,List(49, 34, 30))
     */

    // ---------------------------------------------------------------------------
    // Variant 2: suitable for real workloads. aggregateByKey keeps at most N values per
    // key on both the map and the reduce side, so a whole group is never materialized.
    val resultRDD: RDD[(String, List[String])] = pairs
      .aggregateByKey(List.empty[String])(
        (acc, value) => topValues(value :: acc, 2),
        (left, right) => topValues(left ::: right, 2))
      .cache() // reused by toDF below and by the flatMap in variant 3

    import spark.implicits._
    val frame: DataFrame = resultRDD.toDF("key", "value")
    frame.show()
    /*
     * +---+--------+
     * |key|   value|
     * +---+--------+
     * | aa|[49, 36]|
     * | bb|[67, 44]|
     * | cc|[49, 34]|
     * +---+--------+
     */

    // ---------------------------------------------------------------------------
    println("===============華麗分割線=====================")
    // Variant 3: build the DataFrame through Spark Core, one Row per (key, value).
    val tempRow: RDD[Row] = resultRDD.flatMap { case (key, values) => flatMapTransformRow(key, values) }
    val schema = StructType(List(
      StructField("key", StringType, nullable = false),
      StructField("value", StringType, nullable = false)
    ))
    val tempDF: DataFrame = spark.createDataFrame(tempRow, schema)
    tempDF.show()
    /*
     * +---+-----+
     * |key|value|
     * +---+-----+
     * | aa|   49|
     * | aa|   36|
     * | bb|   67|
     * | bb|   44|
     * | cc|   49|
     * | cc|   34|
     * +---+-----+
     */

    resultRDD.unpersist()
    // SparkSession.stop() also stops the underlying SparkContext.
    spark.stop()
  }

  /**
   * Parses one "key value" line.
   *
   * @param line raw input line; fields are separated by whitespace, extra fields are ignored
   * @return the (key, value) pair, or None when the line has fewer than two fields or the
   *         value is not an integer (it could not be ranked numerically)
   */
  def parseLine(line: String): Option[(String, String)] =
    line.trim.split("\\s+") match {
      case Array(key, value, _*) if scala.util.Try(value.toInt).isSuccess => Some((key, value))
      case _ => None
    }

  /**
   * Returns the `n` largest values of one group, largest first.
   *
   * Values are compared as integers; comparing the raw strings would rank "9" above "49".
   *
   * @param values the group's values (each must parse as an Int)
   * @param n      how many values to keep
   * @return at most `n` values in descending numeric order
   */
  def topValues(values: Iterable[String], n: Int): List[String] =
    values.toList.sortBy(_.toInt)(Ordering[Int].reverse).take(n)

  /**
   * Turns one grouped record into rows (one Row(key, value) per element).
   *
   * @param key   the group key
   * @param value the group's values
   * @return one Row per element of `value`, in the same order
   */
  def flatMapTransformRow(key: String, value: List[String]): Seq[Row] =
    value.map(ele => Row(key, ele))
}
Spark SQL代碼
代碼如下:
- 給出的建議:代碼要會寫,SQL 風格的代碼更要會寫。面試時經常會問到,並讓你手寫,所以 SQL 的功力需要經常練習。
package cn.ted.secondarySort
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* Author: LiYahui
* Date: Created in 2019/3/1 10:57
* Description: 使用 Spark SQL 窗口函數(DSL 風格與 SQL 風格)求分組 topN,需要實現組內排序
* Version: V1.0
*/
object GroupedTopN {
def main(args: Array[String]): Unit = {
  val spark: SparkSession = SparkSession.builder()
    .appName(s"${this.getClass.getSimpleName}")
    .master("local[2]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.parquet.compression.codec", "gzip")
    .getOrCreate()
  // Required for rdd.toDF(...) and the $"col" syntax; without it this method does not compile.
  import spark.implicits._
  val sc: SparkContext = spark.sparkContext
  sc.setLogLevel("WARN")
  // args(0), when given, overrides the default local input path.
  val inputPath = args.headOption.getOrElse("F:\\LocalFileForTest\\topN")

  // Parse "key value" lines; blank or malformed lines are skipped instead of throwing.
  val tmpDF: DataFrame = sc.textFile(inputPath)
    .map(_.trim.split("\\s+"))
    .filter(_.length >= 2)
    .map(arr => (arr(0), arr(1)))
    .toDF("key", "value")

  // DSL style
  import org.apache.spark.sql.functions.row_number
  // "value" is a string column: cast it so the ranking is numeric ("9" < "49"),
  // not lexicographic. The $"..." column syntax comes from spark.implicits.
  val windowRule: WindowSpec = Window.partitionBy("key").orderBy($"value".cast("int").desc)
  val resultDS: Dataset[Row] = tmpDF.withColumn("rank", row_number().over(windowRule))
    .where("rank<3")
  resultDS.show()
  /*
   * +---+-----+----+
   * |key|value|rank|
   * +---+-----+----+
   * | cc|   49|   1|
   * | cc|   34|   2|
   * | bb|   67|   1|
   * | bb|   44|   2|
   * | aa|   49|   1|
   * | aa|   36|   2|
   * +---+-----+----+
   */

  tmpDF.createOrReplaceTempView("tmp")
  // SQL style. A window alias cannot be filtered in the same SELECT (HAVING without
  // GROUP BY means a global aggregate), so rank in a subquery and filter outside it.
  val ranksql =
    """select key, value, `rank`
      |from (
      |  select key, value,
      |         row_number() over (partition by key order by cast(value as int) desc) as `rank`
      |  from tmp
      |) ranked
      |where `rank` < 3""".stripMargin
  spark.sql(ranksql).show()
  /*
   * Same result as the DSL version above.
   */

  // SparkSession.stop() also stops the underlying SparkContext.
  spark.stop()
}