Basic Spark
Spark: "A fast and general engine for large-scale data processing"

Resilient Distributed Datasets (RDD)
resilient: the data is stored redundantly, so it is fault-tolerant.
When data is brought into Spark from an outside source, Spark turns it into an RDD,
for example from a hard drive, AWS, Hive, etc.
The input can be a text file, CSV, JSON, an object file, etc.
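A minimal sketch of two common ways to get data into an RDD, assuming Spark 2.x or 3.x (spark-core) is on the classpath. The object name CreateRdds and the temporary file are hypothetical stand-ins for a real source such as a file on disk; sc.parallelize and sc.textFile are the SparkContext methods doing the work.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import org.apache.spark.SparkContext

// Hypothetical demo object: builds one RDD from a Scala collection and one from a text file.
object CreateRdds {
  def run(): (Long, Long) = {
    val sc = new SparkContext("local[*]", "CreateRdds")
    try {
      // 1. From an in-memory Scala collection
      val fromList = sc.parallelize(List(1, 2, 3, 4))

      // 2. From a text file: each line of the file becomes one String element
      val tmp: Path = Files.createTempFile("spark-demo", ".txt")
      tmp.toFile.deleteOnExit()
      Files.write(tmp, "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8))
      val fromFile = sc.textFile(tmp.toString)

      // count() is an action, so this is where Spark actually reads the data
      (fromList.count(), fromFile.count())
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (listCount, fileCount) = run()
    println(s"list elements: $listCount, file lines: $fileCount")
  }
}
```

Because textFile yields one String per line, later steps have to split each line themselves.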
What operations can we perform on an RDD?
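1. Transformations
For example:
map: apply a function to every element of the RDD (one output element per input element)
flatMap: like map, but each input element can produce zero or more output elements; use it when there is no one-to-one relation
filter: keep only the elements that satisfy a predicate
distinct: remove duplicate elements
sample: take a random sample; good for testing on a small subset
set operations: union, intersection, subtract, ...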
// map example
val rdd = sc.parallelize(List(1, 2, 3, 4)) // sc is the SparkContext
val squares = rdd.map(x => x * x)
squares.collect() // returns Array(1, 4, 9, 16)
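The other transformations in the list can be tried the same way. This is a sketch assuming spark-core on the classpath; the data and the name TransformationsDemo are made up. Each collect() or count() is the action that makes Spark run the transformation, and set-operation results are sorted because RDDs do not promise an order.

```scala
import org.apache.spark.SparkContext

// Hypothetical demo object exercising flatMap, filter, distinct, sample and set operations.
object TransformationsDemo {
  case class Results(
      words: List[String],
      evens: List[Int],
      unique: List[Int],
      sampleSize: Long,
      union: List[Int],
      intersection: List[Int],
      subtract: List[Int])

  def run(): Results = {
    val sc = new SparkContext("local[*]", "TransformationsDemo")
    try {
      val lines = sc.parallelize(List("to be", "or not to be"))
      val nums = sc.parallelize(List(1, 2, 2, 3, 4, 4, 5))
      val other = sc.parallelize(List(4, 5, 6))

      // flatMap: each line produces zero or more words (no one-to-one relation)
      val words = lines.flatMap(line => line.split(" "))
      // filter: keep only elements that satisfy the predicate
      val evens = nums.filter(x => x % 2 == 0)
      // distinct: remove duplicates
      val unique = nums.distinct()
      // sample: roughly 50% of the elements, without replacement, fixed seed
      val sampled = nums.sample(withReplacement = false, fraction = 0.5, seed = 42L)

      Results(
        words = words.collect().toList,
        evens = evens.collect().toList,
        unique = unique.collect().toList.sorted,
        sampleSize = sampled.count(),
        union = nums.union(other).collect().toList.sorted,               // keeps duplicates
        intersection = nums.intersection(other).collect().toList.sorted, // duplicates removed
        subtract = nums.subtract(other).collect().toList.sorted)         // elements of nums not in other
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Note that union simply concatenates the two RDDs, while intersection also de-duplicates; the sample size varies with the seed and is only roughly half of the input.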
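2. Actions: an action is what actually triggers Spark to do the work.
Nothing happens until you call an action; this is called "lazy evaluation". When an action is called, Spark builds a DAG (directed acyclic graph) of the transformations and works out an efficient way to execute it for you.
collect: return all elements to the driver program
count: the number of elements
countByValue: how many times each distinct value occurs, returned as a Map
take(n): the first n elements
top(n): the n largest elements
reduce: combine all elements with a function
...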
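A sketch of the actions above, assuming Spark 2.x or 3.x (longAccumulator was added in 2.0). To make lazy evaluation visible, a hypothetical accumulator counts how often the map function has run: it stays at 0 until an action is called. Spark's documentation warns that accumulator updates inside transformations can be applied more than once if tasks are retried, so treat this as a local demonstration only; the name ActionsDemo and the data are made up.

```scala
import org.apache.spark.SparkContext

// Hypothetical demo object for collect, count, countByValue, take, top and reduce.
object ActionsDemo {
  case class Results(
      callsBeforeAction: Long,
      callsAfterAction: Long,
      collected: List[Int],
      count: Long,
      countByValue: Map[Int, Long],
      firstTwo: List[Int],
      topTwo: List[Int],
      sumOfSquares: Int)

  def run(): Results = {
    val sc = new SparkContext("local[*]", "ActionsDemo")
    try {
      val nums = sc.parallelize(List(1, 2, 2, 3))
      val calls = sc.longAccumulator("mapCalls")

      // Transformation only: nothing executes yet
      val squares = nums.map { x => calls.add(1L); x * x }
      val before = calls.sum

      // reduce is an action, so Spark now runs the map
      val total = squares.reduce(_ + _)
      val after = calls.sum

      Results(
        callsBeforeAction = before,
        callsAfterAction = after,
        collected = nums.collect().toList,         // all elements, back on the driver
        count = nums.count(),                      // number of elements
        countByValue = nums.countByValue().toMap,  // value -> number of occurrences
        firstTwo = nums.take(2).toList,            // first n elements
        topTwo = nums.top(2).toList,               // n largest, in descending order
        sumOfSquares = total)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```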
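An example: counting how many of each star rating exists in the MovieLens 100K data set (RatingsCounter)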
val sc = new SparkContext("local[*]", "RatingsCounter")
The SparkContext is what we use to create RDDs.
"local" means run on the local machine, and [*] means use all cores of the CPU;
"RatingsCounter" is the name of the application.
val lines = sc.textFile("../ml-100k/u.data")
Now the data lives in the new RDD called lines, one element per line of the file.
val ratings = lines.map(x => x.toString().split("\t")(2))
Each line has 4 tab-separated columns (userID, movieID, rating, timestamp), and at this moment we only care about the third one, the rating. So we use map() to apply split("\t") (split on Tab) to every line and keep field index 2. (x is already a String, so toString() is redundant but harmless.)
val results = ratings.countByValue()
Recall that countByValue is an action: Spark actually runs the job and gives back an ordinary Scala Map from each rating to its count.
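The split step can be checked in plain Scala without Spark, because map just applies the same function to every line. A minimal sketch; the sample line and the names ParseRatingDemo and extractRating are made up for illustration, and only the column order comes from the text above.

```scala
// Hypothetical helper: the function that map applies to each line of u.data.
object ParseRatingDemo {
  // Split on tabs and keep the third field (index 2), the rating
  def extractRating(line: String): String = line.split("\t")(2)

  def main(args: Array[String]): Unit = {
    val sampleLine = "10\t20\t4\t881250949" // made-up line: userID, movieID, rating, timestamp
    println(extractRating(sampleLine))      // prints 4
  }
}
```

The rating stays a String here, which is fine because it is only used as a key to count.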


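Sort the resulting Map of (rating, count) pairs. A Map has no order, so we first convert it to a sequence of (rating, count) tuples and then sort by the first element, the rating:
val sortedResults = results.toSeq.sortBy(_._1)
Printed with println, each tuple appears on its own line, sorted by rating, in this shape (the counts here are placeholders):
(1,2)
(2,1)
(3,2)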
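Since countByValue already returned a plain Scala Map, the sorting step is ordinary Scala and can be tried on its own. A sketch with hypothetical counts; the names SortCountsDemo and sortByRating are made up.

```scala
// Hypothetical helper: turn an unordered Map of counts into a Seq sorted by rating.
object SortCountsDemo {
  def sortByRating(counts: scala.collection.Map[String, Long]): Seq[(String, Long)] =
    counts.toSeq.sortBy(_._1) // sort by the first tuple element, the rating

  def main(args: Array[String]): Unit = {
    val counts = Map("3" -> 2L, "1" -> 2L, "2" -> 1L) // made-up counts
    sortByRating(counts).foreach(println)
    // (1,2)
    // (2,1)
    // (3,2)
  }
}
```

Because the ratings are Strings, this is a lexicographic sort, which matches numeric order only while every rating is a single digit. Sorting with sortBy(-_._2) instead would list the most frequent rating first.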
Complete code:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._

/** Count up how many of each star rating exists in the MovieLens 100K data set. */
object RatingsCounter {

  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine, named RatingsCounter
    val sc = new SparkContext("local[*]", "RatingsCounter")

    // Load up each line of the ratings data into an RDD
    val lines = sc.textFile("../ml-100k/u.data")

    // Convert each line to a string, split it out by tabs, and extract the third field.
    // (The file format is userID, movieID, rating, timestamp)
    val ratings = lines.map(x => x.toString().split("\t")(2))

    // Count up how many times each value (rating) occurs
    val results = ratings.countByValue()

    // Sort the resulting map of (rating, count) tuples
    val sortedResults = results.toSeq.sortBy(_._1)

    // Print each result on its own line.
    sortedResults.foreach(println)

    // Shut down the SparkContext
    sc.stop()
  }
}
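What happens inside Spark when this job runs?
Spark splits the job into stages at every point where the data has to be shuffled, i.e. redistributed across partitions. countByValue() needs a shuffle, because all occurrences of the same rating must be brought together to be counted, so it starts a new stage. Shuffles are expensive: data is moved between executors (over the network on a cluster), and the next stage cannot start until all tasks of the previous stage have finished.
Each stage is broken into tasks, one per partition, which can be distributed across the nodes of a cluster.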
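One way to see the stage boundary is RDD.toDebugString, which prints an RDD's lineage. countByValue returns a plain Map, so this sketch (assuming spark-core on the classpath) builds an equivalent key-value pipeline, map to (rating, 1L) then reduceByKey, and inspects that instead. The input lines and the name StagesDemo are made up; the ShuffledRDD entry in the printout marks where one stage ends and the next begins.

```scala
import org.apache.spark.SparkContext

// Hypothetical demo object: show the shuffle in the lineage of a rating count.
object StagesDemo {
  def run(): (String, Map[String, Long], Map[String, Long]) = {
    val sc = new SparkContext("local[*]", "StagesDemo")
    try {
      // Made-up lines in the u.data layout: userID, movieID, rating, timestamp
      val lines = sc.parallelize(Seq("1\t10\t3\t0", "2\t11\t4\t0", "3\t12\t3\t0"), numSlices = 2)
      val ratings = lines.map(_.split("\t")(2))

      // Same counts as countByValue, but kept as an RDD so its lineage can be printed
      val counted = ratings.map(r => (r, 1L)).reduceByKey(_ + _)
      val plan = counted.toDebugString

      (plan, counted.collect().toMap, ratings.countByValue().toMap)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (plan, counts, _) = run()
    println(plan) // the ShuffledRDD line is the stage boundary
    println(counts)
  }
}
```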

Key-Value RDD

In Scala, you create a key-value RDD by mapping each element to a (key, value) tuple, e.g.:
val totalByAge = rdd.map(x => (x, 1))

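reduceByKey: combine the values that share a key with a function; in reduceByKey((x, y) => x + y), x and y are two values for the same key and are combined into x + y.
groupByKey: group all values with the same key into one collection (an Iterable).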
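A small sketch contrasting the two, assuming spark-core on the classpath; the words and the name KeyValueDemo are made up.

```scala
import org.apache.spark.SparkContext

// Hypothetical demo object: reduceByKey vs. groupByKey on the same pairs.
object KeyValueDemo {
  def run(): (Map[String, Int], Map[String, List[Int]]) = {
    val sc = new SparkContext("local[*]", "KeyValueDemo")
    try {
      // Make a key-value RDD by mapping each element to a (key, value) tuple
      val words = sc.parallelize(List("a", "b", "a", "c", "a"))
      val pairs = words.map(w => (w, 1))

      // reduceByKey: x and y are two values for the same key, merged pairwise
      // until one value per key remains
      val sums = pairs.reduceByKey((x, y) => x + y)

      // groupByKey: all values for a key end up together in one Iterable
      val groups = pairs.groupByKey().mapValues(_.toList)

      (sums.collect().toMap, groups.collect().toMap)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

When the goal is an aggregate such as a sum, reduceByKey is usually the better choice: it combines values within each partition before the shuffle, whereas groupByKey ships every individual value across the network.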


Another example: turning the data into key-value pairs and then computing an average.
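The data is a CSV file (../fakefriends.csv) in which the third field is a person's age and the fourth is their number of friends.
The goal is to find the average number of friends for each age.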

First part of the code: convert the raw data into (age, numFriends) key-value pairs with parseLine, to prepare for the next step:
val rdd = lines.map(parseLine)

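Second part: map each value to (numFriends, 1), then use reduceByKey to add up the friend counts and the occurrence counts for each age:
val totalsByAge =
  rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))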
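To see what mapValues and reduceByKey produce, here is a sketch on a handful of made-up (age, numFriends) pairs, assuming spark-core on the classpath; the name TotalsByAgeDemo is hypothetical.

```scala
import org.apache.spark.SparkContext

// Hypothetical demo object: build (age, (totalFriends, totalInstances)) from sample pairs.
object TotalsByAgeDemo {
  def run(): Map[Int, (Int, Int)] = {
    val sc = new SparkContext("local[*]", "TotalsByAgeDemo")
    try {
      // Made-up (age, numFriends) pairs, in the shape produced by lines.map(parseLine)
      val rdd = sc.parallelize(List((33, 100), (33, 50), (55, 20), (40, 7)))

      val totalsByAge = rdd
        .mapValues(x => (x, 1))                             // (33, (100, 1)), (33, (50, 1)), ...
        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))  // add friends and occurrence counts

      totalsByAge.collect().toMap
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The two entries for age 33 collapse into (150, 2): 150 friends in total across 2 people.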


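And last, compute the average by dividing the total number of friends by the number of people for each age (both are Ints, so this is integer division and the result is truncated):
val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)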
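The truncation is easy to see on made-up totals. This sketch (spark-core assumed; AverageByAgeDemo is a hypothetical name) computes the average both with integer division, as the complete code does, and with a toDouble conversion that keeps the fractional part.

```scala
import org.apache.spark.SparkContext

// Hypothetical demo object: integer vs. floating-point averages per age.
object AverageByAgeDemo {
  def run(): (Map[Int, Int], Map[Int, Double]) = {
    val sc = new SparkContext("local[*]", "AverageByAgeDemo")
    try {
      // Made-up totals in the (age, (totalFriends, totalInstances)) shape
      val totalsByAge = sc.parallelize(List((33, (151, 2)), (40, (7, 1))))

      // Int / Int truncates: 151 / 2 == 75
      val intAverages = totalsByAge.mapValues(x => x._1 / x._2)
      // Converting one operand to Double keeps the fraction: 151 / 2.0 == 75.5
      val doubleAverages = totalsByAge.mapValues(x => x._1.toDouble / x._2)

      (intAverages.collect().toMap, doubleAverages.collect().toMap)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```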

Finally, call an action so that Spark actually does something:
val results = averagesByAge.collect()
results.sorted.foreach(println)

Complete code
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

/** Compute the average number of friends by age in a social network. */
object FriendsByAge {

  /** A function that splits a line of input into (age, numFriends) tuples. */
  def parseLine(line: String): (Int, Int) = {
    // Split by commas
    val fields = line.split(",")
    // Extract the age and numFriends fields, and convert to integers
    val age = fields(2).toInt
    val numFriends = fields(3).toInt
    // Create a tuple that is our result.
    (age, numFriends)
  }

  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "FriendsByAge")

    // Load each line of the source data into an RDD
    val lines = sc.textFile("../fakefriends.csv")

    // Use our parseLine function to convert to (age, numFriends) tuples
    val rdd = lines.map(parseLine)

    // Lots going on here...
    // We are starting with an RDD of form (age, numFriends) where age is the KEY and numFriends is the VALUE
    // We use mapValues to convert each numFriends value to a tuple of (numFriends, 1)
    // Then we use reduceByKey to sum up the total numFriends and total instances for each age, by
    // adding together all the numFriends values and 1's respectively.
    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

    // So now we have tuples of (age, (totalFriends, totalInstances))
    // To compute the average we divide totalFriends / totalInstances for each age
    // (both are Ints, so this is integer division and the average is truncated).
    val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)

    // Collect the results from the RDD (This kicks off computing the DAG and actually executes the job)
    val results = averagesByAge.collect()

    // Sort and print the final results.
    results.sorted.foreach(println)

    // Shut down the SparkContext
    sc.stop()
  }
}