Spark: Grouped Top-N (RDD and SQL Approaches), a Must-Know Interview Question

Grouped top-N with Spark RDDs

See the code below.

Test data:

aa 49
bb 11
cc 34
aa 22
bb 67
cc 29
aa 36
bb 33
cc 30
aa 11
bb 44
cc 49

Spark RDD code

package cn.ted.secondarySort

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * Author:  LiYahui
  * Date:  Created in  2019/3/1 10:57
  * Description: grouped top-N with Spark operators; requires sorting within each group
  * Version: V1.0         
  */
object GroupedTopN {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
        .appName(s"${this.getClass.getSimpleName}")
        .master("local[2]")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.parquet.compression.codec", "gzip")
        .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val inputPath = "F:\\LocalFileForTest\\topN"
    //-------------------------------------------------------------------------------------------
    // Approach 1: return the result as an array
    val resultArray: Array[(String, List[String])] = sc.textFile(inputPath)
        .map(_.split(" "))
        .map(line => (line(0), line(1)))
        .groupByKey()
        .map(line => {
          (line._1, line._2.toList.sortWith(_.toInt > _.toInt).take(3)) // sort numerically in descending order, keep the top 3
        }).collect()
    // Print the result
    for (ele <- resultArray) {
      println("Result element: " + ele)
    }
    /**
      * Result element: (aa,List(49, 36, 22))
      * Result element: (bb,List(67, 44, 33))
      * Result element: (cc,List(49, 34, 30))
      */
    //--------------------------------------------------------------------------------------------------
    // Approach 2: the form that can be used in real development work (the result stays an RDD)
    val resultRDD: RDD[(String, List[String])] = sc.textFile(inputPath)
        .map(_.split(" "))
        .map(line => (line(0), line(1)))
        .groupByKey()
        .map(line => {
          (line._1, line._2.toList.sortWith(_.toInt > _.toInt).take(2)) // compare as Int (string comparison is lexicographic); descending, keep the top 2
        })
    // Convert directly to a DataFrame with toDF
    import spark.implicits._
    val frame: DataFrame = resultRDD.toDF("key", "value")

    frame.show()

    /**
      * +---+--------+
      * |key|   value|
      * +---+--------+
      * | aa|[49, 36]|
      * | bb|[67, 44]|
      * | cc|[49, 34]|
      * +---+--------+
      */
    //----------------------------------------------------------------------------------------------
    println("=============== separator =====================")
    // Build a DataFrame the Spark Core way (RDD[Row] plus an explicit schema)
    // Flatten the grouped top-N into one row per value so it is usable as an RDD or DataFrame
    val tempRow: RDD[Row] = resultRDD.flatMap(line => {
      val key: String = line._1.toString
      val value: List[String] = line._2
      flatMapTransformRow(key, value)
    })

    // Define the Spark schema
    val schema = StructType(List(
      StructField("key", StringType, false),
      StructField("value", StringType, false)
    ))

    val tempDF: DataFrame = spark.createDataFrame(tempRow, schema)
    tempDF.show()

    /**
      * +---+-----+
      * |key|value|
      * +---+-----+
      * | aa|   49|
      * | aa|   36|
      * | bb|   67|
      * | bb|   44|
      * | cc|   49|
      * | cc|   34|
      * +---+-----+
      */
    //-------------------------------------------------------------------------

    // SparkSession.stop() also stops the underlying SparkContext, so a separate sc.stop() is unnecessary
    spark.stop()
  }

  /**
    * Expands one (key, values) pair into one Row per value.
    *
    * @param key   the group key
    * @param value the top-N values of that group
    * @return one Row(key, value) per element of `value`
    */
  def flatMapTransformRow(key: String, value: List[String]): Seq[Row] = {
    // map builds the result directly, without a mutable var and repeated appends
    value.map(ele => Row(key, ele))
  }


  /**
    * Data source (same as the test data above):
    * aa 49
    * bb 11
    * cc 34
    * aa 22
    * bb 67
    * cc 29
    * aa 36
    * bb 33
    * cc 30
    * aa 11
    * bb 44
    * cc 49
    *
    * Requirements:
    * 1. Group the data above by key.
    *
    * 2. Sort the values within each group.
    *
    * 3. Take the top 3 values of each group and return the result as key-value pairs.
    */

}
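
The three requirements in the comment above (group by key, sort within each group, keep the top N) can be checked locally without a cluster. The following is a minimal plain-Scala sketch, not the Spark job itself: `TopNSketch`, `topNPerKey`, and the inline sample lines are hypothetical names introduced here for illustration. Values are parsed as integers before sorting so that the comparison is numeric rather than lexicographic.

```scala
object TopNSketch {
  // Hypothetical helper: keep the n largest values for each key.
  def topNPerKey(lines: Seq[String], n: Int): Map[String, List[Int]] =
    lines
      .map(_.trim.split("\\s+"))                    // "aa 49" -> Array("aa", "49")
      .collect { case Array(k, v) => (k, v.toInt) } // skip lines that do not have exactly two fields
      .groupBy(_._1)                                // requirement 1: group by key
      .map { case (k, pairs) =>
        // requirements 2 and 3: sort descending, then keep the first n values
        k -> pairs.map(_._2).sorted(Ordering[Int].reverse).take(n).toList
      }

  def main(args: Array[String]): Unit = {
    // Same records as the test data in this article
    val sample = Seq(
      "aa 49", "bb 11", "cc 34", "aa 22", "bb 67", "cc 29",
      "aa 36", "bb 33", "cc 30", "aa 11", "bb 44", "cc 49")
    // Sort by key only to make the printed order deterministic
    topNPerKey(sample, 3).toSeq.sortBy(_._1).foreach(println)
    // (aa,List(49, 36, 22))
    // (bb,List(67, 44, 33))
    // (cc,List(49, 34, 30))
  }
}
```

In Spark, when a single key can hold a very large number of values, `aggregateByKey` with a bounded per-key buffer is a common alternative to `groupByKey`, since it avoids collecting every value of a key in one place before sorting.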

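Spark SQL code

The code is as follows:

  • Advice: you need to be able to write the operator-based code, and the SQL-style version even more so. Interviewers often ask you to write it by hand, so keep your SQL skills sharp with regular practice.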
package cn.ted.secondarySort

import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  * Author:  LiYahui
  * Date:  Created in  2019/3/1 10:57
  * Description: grouped top-N with Spark SQL window functions; requires sorting within each group
  * Version: V1.0         
  */
object GroupedTopN {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
        .appName(s"${this.getClass.getSimpleName}")
        .master("local[2]")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.parquet.compression.codec", "gzip")
        .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val inputPath = "F:\\LocalFileForTest\\topN"
    // Required for toDF and the $"col" syntax
    import spark.implicits._
    val tmpDF: DataFrame = sc.textFile(inputPath).map(line => {
      val arr: Array[String] = line.split(" ")
      (arr(0), arr(1).toInt) // parse the value as Int so ordering is numeric, not lexicographic
    }).toDF("key", "value")
    // DSL style
    import org.apache.spark.sql.functions.row_number
    // Note: the sort column must be referenced with $
    val windowRule: WindowSpec = Window.partitionBy("key").orderBy($"value".desc)
    val resultDS: Dataset[Row] = tmpDF.withColumn("rank", row_number.over(windowRule))
        .where("rank<3")
    resultDS.show()

    /**
      * +---+-----+----+
      * |key|value|rank|
      * +---+-----+----+
      * | cc|   49|   1|
      * | cc|   34|   2|
      * | bb|   67|   1|
      * | bb|   44|   2|
      * | aa|   49|   1|
      * | aa|   36|   2|
      * +---+-----+----+
      */
    tmpDF.createOrReplaceTempView("tmp")
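    // SQL style
    // Filtering a window-function alias with HAVING (and no GROUP BY) is not portable across Spark versions,
    // so compute the rank in a subquery and filter in the outer query
    val ranksql = "select key, value, rank from (select key, value, row_number() over (partition by key order by value desc) as rank from tmp) t where rank < 3"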
    spark.sql(ranksql).show()

    /**
      * +---+-----+----+
      * |key|value|rank|
      * +---+-----+----+
      * | cc|   49|   1|
      * | cc|   34|   2|
      * | bb|   67|   1|
      * | bb|   44|   2|
      * | aa|   49|   1|
      * | aa|   36|   2|
      * +---+-----+----+
      */

    // SparkSession.stop() also stops the underlying SparkContext
    spark.stop()
  }
}
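
To make the window-function logic concrete, here is a small plain-Scala sketch of what `row_number() over (partition by key order by value desc)` computes, followed by the `rank < 3` filter. It is an illustration only, not how Spark implements windows; `RowNumberSketch`, `Ranked`, and `rowNumberTopN` are hypothetical names introduced for this example.

```scala
object RowNumberSketch {
  // Hypothetical row type mirroring the (key, value, rank) columns
  final case class Ranked(key: String, value: Int, rank: Int)

  // Emulates row_number() within each partition, then keeps rows with rank < limit
  def rowNumberTopN(rows: Seq[(String, Int)], limit: Int): Seq[Ranked] =
    rows
      .groupBy(_._1)                                    // partition by key
      .toSeq
      .sortBy(_._1)                                     // fixed partition order, only for stable output
      .flatMap { case (key, group) =>
        group
          .map(_._2)
          .sorted(Ordering[Int].reverse)                // order by value desc
          .zipWithIndex                                 // 0-based position within the partition
          .map { case (v, i) => Ranked(key, v, i + 1) } // row_number starts at 1
          .filter(_.rank < limit)                       // where rank < limit
      }

  def main(args: Array[String]): Unit = {
    // Same records as the test data in this article
    val rows = Seq(
      ("aa", 49), ("bb", 11), ("cc", 34), ("aa", 22), ("bb", 67), ("cc", 29),
      ("aa", 36), ("bb", 33), ("cc", 30), ("aa", 11), ("bb", 44), ("cc", 49))
    rowNumberTopN(rows, 3).foreach(println)
    // Ranked(aa,49,1)
    // Ranked(aa,36,2)
    // Ranked(bb,67,1)
    // Ranked(bb,44,2)
    // Ranked(cc,49,1)
    // Ranked(cc,34,2)
  }
}
```

Note that `row_number` gives tied values distinct consecutive numbers, so exactly two rows per key survive `rank < 3`. If tied values should share a position, `rank` or `dense_rank` are the window functions to consider instead.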

