Scala-Spark-notes&summary2

Basic Spark

Spark: "A fast and general engine for large-scale data processing"

The structure of Spark

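Resilient Distributed Datasets (RDD)
Resilient: the data is fault-tolerant. Spark remembers how each RDD was derived from its inputs (its lineage), so it can recompute partitions that are lost.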

Data brought into Spark from outside, for example from a hard drive, AWS, Hive, and so on, is turned into an RDD.
The input can be a text file, CSV, JSON, an object file, etc.

What operations can we perform on an RDD?

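1. Transformations
For example:
map: apply a function to every element of the RDD, producing exactly one output element per input element
flatMap: like map, but each input element can produce zero, one, or many output elements; use it when there is no one-to-one relation between input and output
filter: keep only the elements that satisfy a condition
distinct: remove duplicate elements
sample: take a random sample, good for testing on a smaller subset of the data
set operations: union, intersection, subtract, ...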

// map example
val rdd = sc.parallelize(List(1, 2, 3, 4))  // sc is the SparkContext
val squares = rdd.map(x => x * x)
// squares.collect() returns Array(1, 4, 9, 16)
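The transformations above can be tried without a cluster. This is a sketch under an assumption: an ordinary Scala List stands in for an RDD (the object name TransformationsSketch and the sample data are made up), since List's map, flatMap, filter and distinct treat each element the same way the RDD methods of the same name do. For the set operations it picks List operations that mimic RDD behavior: RDD.union keeps duplicates, RDD.intersection removes them, and RDD.subtract drops every element that also appears in the other dataset.

```scala
// Sketch: a plain Scala List stands in for an RDD (no Spark needed).
object TransformationsSketch {
  // Made-up input: two "lines" of text
  val lines: List[String] = List("to be or", "not to be")

  // map: exactly one output per input (one array of words per line)
  val wordArrays: List[Array[String]] = lines.map(line => line.split(" "))

  // flatMap: each input may produce several outputs, flattened into one list
  val words: List[String] = lines.flatMap(line => line.split(" "))

  // filter: keep only elements that satisfy the condition
  val longWords: List[String] = words.filter(w => w.length > 2)

  // distinct: drop duplicates, keeping first occurrences
  val uniqueWords: List[String] = words.distinct

  val a: List[Int] = List(1, 2, 3, 3)
  val b: List[Int] = List(3, 4)

  // union: like RDD.union, duplicates are kept
  val unioned: List[Int] = a ++ b
  // intersection: like RDD.intersection, the result has no duplicates
  val intersected: List[Int] = a.intersect(b).distinct
  // subtract: like RDD.subtract, drop every element that also appears in b
  val subtracted: List[Int] = a.filterNot(b.toSet)

  def main(args: Array[String]): Unit = {
    println(wordArrays.map(_.mkString(",")))
    println(words)
    println(longWords)
    println(uniqueWords)
    println((unioned, intersected, subtracted))
  }
}
```

The difference between map and flatMap is visible in the types: map gives one Array per line, while flatMap gives one flat list of words.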
2. RDD Actions: an action is what actually triggers Spark to do some work.
Nothing happens until you call an action; this is called "lazy evaluation". When an action is called, Spark builds a DAG (directed acyclic graph) of the required steps and works out an efficient way to execute it for you.
collect
count
countByValue
take
top
reduce ...
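A rough local analogue of the actions listed above, again on a plain Scala List rather than an RDD (the object name ActionsSketch and the data are made up). The last part uses a collection view to mimic lazy evaluation: like an RDD transformation, view.map only records the function, and nothing runs until the result is forced with toList, which plays the role of the action here. This is only an analogy; no Spark scheduler or DAG is involved.

```scala
// Sketch: local Scala analogues of common RDD actions (no Spark needed).
object ActionsSketch {
  val data: List[Int] = List(5, 3, 1, 3, 4)   // made-up values

  val counted: Long = data.size.toLong                               // ~ rdd.count()
  val firstTwo: List[Int] = data.take(2)                             // ~ rdd.take(2)
  val topTwo: List[Int] = data.sorted(Ordering[Int].reverse).take(2) // ~ rdd.top(2)
  val total: Int = data.reduce((x, y) => x + y)                      // ~ rdd.reduce(...)
  // ~ rdd.countByValue(): value -> number of occurrences
  val countsByValue: Map[Int, Long] =
    data.groupBy(identity).map { case (v, occ) => (v, occ.size.toLong) }

  // Lazy-evaluation analogy: count how often the mapping function runs.
  var calls: Int = 0
  // view.map only records the function, like a transformation
  val lazySquares = data.view.map { x => calls += 1; x * x }
  val callsBeforeForcing: Int = calls   // still 0: nothing computed yet
  // toList forces the computation, playing the role of an action
  val forced: List[Int] = lazySquares.toList
  val callsAfterForcing: Int = calls    // 5: one call per element

  def main(args: Array[String]): Unit = {
    println((counted, firstTwo, topTwo, total))
    println(countsByValue)
    println((callsBeforeForcing, forced, callsAfterForcing))
  }
}
```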

An example: counting how many of each star rating exist in the MovieLens 100K data set

val sc = new SparkContext("local[*]", "RatingsCounter")
SparkContext is the object we use to create RDDs.
"local" means run on the local machine, and [*] means use all the cores of the CPU.
"RatingsCounter" is the name of the application.

val results = ratings.countByValue()
Recall that countByValue is an action, so Spark actually runs the job and returns an ordinary Scala Map from each value to its count.

val lines = sc.textFile("../ml-100k/u.data")
Now the RDD lives in lines, with one element per line of the file.

val ratings = lines.map(x => x.toString().split("\t")(2))
This next line gets the rating value from each line; "\t" means the split is based on tabs.
Each line has 4 columns (userID, movieID, rating, timestamp), and at this moment we only care about the third column, the rating. So we use map() to apply split() to each line and keep element (2), since indices start at 0.
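To see what the split does, here is the same expression applied to a few made-up lines in the same tab-separated layout (userID, movieID, rating, timestamp). The sample values, the object name and the helper name ratingOf are hypothetical.

```scala
// Sketch: the same split expression as in the notes, on made-up lines.
object ParseRatingSketch {
  // Hypothetical helper: split on tabs and keep index 2, the third column (rating)
  def ratingOf(line: String): String = line.split("\t")(2)

  // Made-up lines in the layout userID, movieID, rating, timestamp
  val sampleLines: List[String] =
    List("1\t10\t4\t1000", "2\t20\t5\t1001", "3\t10\t4\t1002")

  val ratings: List[String] = sampleLines.map(ratingOf)

  def main(args: Array[String]): Unit = println(ratings)
}
```

Note that each rating is still a String; it would need .toInt before doing arithmetic with it.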

(Figure: countByValue)

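Sort the resulting Map of (rating, count) pairs. A Map has no order, so we first convert it to a sequence with toSeq and then sort by the first element of each tuple, the rating:
val sortedResults = results.toSeq.sortBy(_._1)
The result is sorted by rating, for example (rating, count):
1 2
2 1
3 2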
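A minimal local sketch of this step, assuming made-up ratings: countByValue() hands the driver a plain Scala Map, so the sorting is ordinary Scala, not Spark. The groupBy line only imitates what countByValue returns; the object name is hypothetical.

```scala
// Sketch: sorting the Map that countByValue() returns (plain Scala).
object SortCountsSketch {
  val ratings: List[String] = List("3", "1", "3", "2", "1")  // made-up ratings

  // Imitates countByValue(): a Map from each rating to its count
  val results: Map[String, Long] =
    ratings.groupBy(identity).map { case (r, occ) => (r, occ.size.toLong) }

  // A Map has no defined order: turn it into a Seq of (rating, count) and sort by rating
  val sortedResults: Seq[(String, Long)] = results.toSeq.sortBy(_._1)

  def main(args: Array[String]): Unit = sortedResults.foreach(println)
}
```

Because the ratings are Strings, sortBy(_._1) sorts them alphabetically; for single-digit ratings such as 1 to 5 that is the same as numeric order.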

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._

/** Count up how many of each star rating exists in the MovieLens 100K data set. */
object RatingsCounter {
 
  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
        
    // Create a SparkContext using every core of the local machine, named RatingsCounter
    val sc = new SparkContext("local[*]", "RatingsCounter")
   
    // Load up each line of the ratings data into an RDD
    val lines = sc.textFile("../ml-100k/u.data")
    
    // Convert each line to a string, split it out by tabs, and extract the third field.
    // (The file format is userID, movieID, rating, timestamp)
    val ratings = lines.map(x => x.toString().split("\t")(2))
    
    // Count up how many times each value (rating) occurs
    val results = ratings.countByValue()
    
    // Sort the resulting map of (rating, count) tuples
    val sortedResults = results.toSeq.sortBy(_._1)
    
    // Print each result on its own line.
    sortedResults.foreach(println)
  }
}

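What happens inside Spark when this runs?
Spark splits the job into stages wherever data has to be shuffled, i.e. redistributed between partitions. countByValue() needs a shuffle, because all occurrences of the same rating have to be brought together, so it cannot be completed by processing each partition independently; that is why it is considered an expensive operation.
Each stage is broken into tasks, which can be distributed across the nodes of a cluster.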


Key-Value RDD


In Scala, we create a key-value RDD by mapping each element to a tuple, e.g.:
val totalByAge = rdd.map(x => (x, 1))

What can we do with a key-value RDD?

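reduceByKey: combine the values that share the same key. In reduceByKey((x, y) => x + y), x and y are two values of the same key, and they are combined into x + y.
groupByKey: group all values with the same key into one collection (an Iterable).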
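A plain-Scala sketch of what reduceByKey and groupByKey compute, using made-up words mapped to (word, 1) pairs as in the map(x => (x, 1)) line above; the object name is hypothetical. It shows the results only, not how Spark distributes the work. In Spark, reduceByKey also combines values within each partition before the shuffle, which is why it is usually preferred over groupByKey followed by a reduce; also, the order of values inside a groupByKey group is not guaranteed, unlike in this local version.

```scala
// Sketch: what reduceByKey and groupByKey compute, on a local List (no Spark).
object PairOpsSketch {
  val words: List[String] = List("spark", "scala", "spark", "rdd", "spark")  // made-up

  // Build key-value pairs the same way as map(x => (x, 1))
  val pairs: List[(String, Int)] = words.map(x => (x, 1))

  // ~ reduceByKey((x, y) => x + y): x and y are two values of the same key
  val reduced: Map[String, Int] =
    pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce((x, y) => x + y)) }

  // ~ groupByKey(): all values with the same key collected together
  val grouped: Map[String, List[Int]] =
    pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    println(reduced)
    println(grouped)
  }
}
```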


More RDD operations


Another example: converting the data into key-value pairs and then computing an average

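The goal is to find the average number of friends for each age.
Each line of the input is comma-separated; the third field is the age and the fourth is the number of friends (see parseLine in the complete code).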

First part of the code: convert the raw data into (age, numFriends) key-value pairs to make the next step easier.

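parseLine from the complete code below is plain Scala, so it can be checked on its own. In this sketch the input lines are made up and the object name is hypothetical; the first two fields of each line are placeholders, since only the third (age) and fourth (numFriends) fields are used.

```scala
// Sketch: parseLine from the notes, run on made-up input lines.
object ParseLineSketch {
  // Same logic as parseLine: third field = age, fourth field = numFriends
  def parseLine(line: String): (Int, Int) = {
    val fields = line.split(",")
    val age = fields(2).toInt
    val numFriends = fields(3).toInt
    (age, numFriends)
  }

  // Made-up lines; the first two fields are placeholders that parseLine ignores
  val sample: List[String] = List("0,A,33,100", "1,B,26,2", "2,C,33,50")

  val parsed: List[(Int, Int)] = sample.map(parseLine)

  def main(args: Array[String]): Unit = parsed.foreach(println)
}
```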

Second part:
val totalsByAge =
  rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

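Part 1 of the long line, mapValues(x => (x, 1)), keeps the key (age) and turns each numFriends value into a pair (numFriends, 1).
Part 2, reduceByKey, adds up the values with the same key: it sums the friend counts and sums the 1s, giving (totalFriends, totalInstances) for each age.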
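The two parts of the long line, imitated on a local List of made-up (age, numFriends) pairs; the object name is hypothetical. The map with a pattern match plays the role of mapValues, and groupBy plus reduce stands in for reduceByKey; the reduce function is exactly the one from the notes.

```scala
// Sketch: the two parts of the long line, imitated on a local List (no Spark).
object TotalsByAgeSketch {
  // Made-up (age, numFriends) pairs, e.g. the output of parseLine
  val rdd: List[(Int, Int)] = List((33, 100), (26, 2), (33, 50))

  // Part 1, ~ mapValues(x => (x, 1)): keep the key, value becomes (numFriends, 1)
  val withCounts: List[(Int, (Int, Int))] =
    rdd.map { case (age, numFriends) => (age, (numFriends, 1)) }

  // Part 2, ~ reduceByKey: same combining function as in the notes
  val totalsByAge: Map[Int, (Int, Int)] =
    withCounts.groupBy(_._1).map { case (age, kvs) =>
      (age, kvs.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 + y._2)))
    }

  def main(args: Array[String]): Unit = println(totalsByAge)
}
```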

And last, compute the average:
val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)
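One detail of the averaging step: x._1 / x._2 divides two Ints, which is integer division, so each average is truncated to a whole number. The sketch below (made-up totals, a local Map instead of an RDD, hypothetical object name) compares that with converting to Double first. The Double version is a possible tweak, not what the original code does. It ends with the same tuple ordering that results.sorted gives after collect().

```scala
// Sketch: integer vs floating-point averages (local Map, made-up totals).
object AveragesSketch {
  // (age -> (totalFriends, totalInstances)), made-up values
  val totalsByAge: Map[Int, (Int, Int)] = Map((33, (150, 2)), (26, (5, 2)))

  // Same formula as the notes: Int / Int truncates, so 5 / 2 gives 2
  val intAverages: Map[Int, Int] =
    totalsByAge.map { case (age, (total, n)) => (age, total / n) }

  // Alternative (not in the original code): convert to Double to keep the fraction
  val doubleAverages: Map[Int, Double] =
    totalsByAge.map { case (age, (total, n)) => (age, total.toDouble / n) }

  // Like results.sorted after collect(): tuples are ordered by age first
  val sortedResults: List[(Int, Int)] = intAverages.toList.sorted

  def main(args: Array[String]): Unit = sortedResults.foreach(println)
}
```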

Finally, call an action so that Spark actually does the work:
val results = averagesByAge.collect()
results.sorted.foreach(println)

Complete code

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

/** Compute the average number of friends by age in a social network. */
object FriendsByAge {
  
  /** A function that splits a line of input into (age, numFriends) tuples. */
  def parseLine(line: String) = {
      // Split by commas
      val fields = line.split(",")
      // Extract the age and numFriends fields, and convert to integers
      val age = fields(2).toInt
      val numFriends = fields(3).toInt
      // Create a tuple that is our result.
      (age, numFriends)
  }
  
  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {
   
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)
        
    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "FriendsByAge")
  
    // Load each line of the source data into an RDD
    val lines = sc.textFile("../fakefriends.csv")
    
    // Use our parseLines function to convert to (age, numFriends) tuples
    val rdd = lines.map(parseLine)
    
    // Lots going on here...
    // We are starting with an RDD of form (age, numFriends) where age is the KEY and numFriends is the VALUE
    // We use mapValues to convert each numFriends value to a tuple of (numFriends, 1)
    // Then we use reduceByKey to sum up the total numFriends and total instances for each age, by
    // adding together all the numFriends values and 1's respectively.
    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))
    
    // So now we have tuples of (age, (totalFriends, totalInstances))
    // To compute the average we divide totalFriends / totalInstances for each age.
    val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)
    
    // Collect the results from the RDD (This kicks off computing the DAG and actually executes the job)
    val results = averagesByAge.collect()
    
    // Sort and print the final results.
    results.sorted.foreach(println)
  }
    
}