Filtering RDDs
Basic idea: strip out data that is not needed at the current stage, which saves Spark resources in the later steps.
Example: finding the lowest temperatures in the data (min temperatures example).
The data has the form:
location (station ID),date,type (TMAX or TMIN),temperature
and we want to filter out every row whose type is not "TMIN":
val minTemps = parsedLines.filter(x => x._2 == "TMIN")
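To see what this filter keeps without starting Spark, here is a minimal sketch on a plain Scala List. Scala collections have a filter method with the same predicate semantics as RDD.filter(). The object name and the three tuples are hypothetical sample values, not rows from 1800.csv.

```scala
object FilterSketch {
  // Hypothetical (stationID, entryType, temperature) tuples, for illustration only
  val parsed: List[(String, String, Float)] = List(
    ("STATION_A", "TMAX", 50.0f),
    ("STATION_A", "TMIN", 20.0f),
    ("STATION_B", "TMIN", 15.5f)
  )

  // Keep only tuples whose second field (the entry type) is "TMIN",
  // mirroring parsedLines.filter(x => x._2 == "TMIN") on the RDD
  val minTemps: List[(String, String, Float)] = parsed.filter(x => x._2 == "TMIN")

  def main(args: Array[String]): Unit =
    minTemps.foreach(println)
}
```

The TMAX tuple is dropped, so everything downstream only has to process the two TMIN readings.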
import org.apache.spark._
import org.apache.log4j._
import scala.math.min
/** Find the minimum temperature by weather station */
object MinTemperatures {
def parseLine(line:String)= {
val fields = line.split(",")
val stationID = fields(0)
val entryType = fields(2)
// Convert the raw reading (tenths of a degree Celsius) to degrees Fahrenheit
val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
(stationID, entryType, temperature)
}
/** Our main function where the action happens */
def main(args: Array[String]): Unit = {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "MinTemperatures")
// Read each line of input data
val lines = sc.textFile("../1800.csv")
// Convert to (stationID, entryType, temperature) tuples
val parsedLines = lines.map(parseLine)
// Filter out all but TMIN entries
val minTemps = parsedLines.filter(x => x._2 == "TMIN")
// Convert from a tuple of three to a tuple of two, (stationID, temperature); stationTemps is now a key-value RDD
val stationTemps = minTemps.map(x => (x._1, x._3))
// Reduce by stationID retaining the minimum temperature found.
// This line aggregates the values by station ID (the key) and keeps the minimum
val minTempsByStation = stationTemps.reduceByKey( (x,y) => min(x,y))
// Collect, format, and print the results
// collect() is an action that returns the RDD's contents to the driver as a local Scala Array, which we can iterate over
val results = minTempsByStation.collect()
for (result <- results.sorted) {
val station = result._1
val temp = result._2
val formattedTemp = f"$temp%.2f F"
println(s"$station minimum temperature: $formattedTemp")
}
}
}
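The reduceByKey step combines all values that share a key, two at a time, with the function you pass in. As a sketch that runs without a Spark dependency, the same per-station minimum can be computed on a local Seq with groupMapReduce (assumes Scala 2.13 or later). The object name and sample pairs are hypothetical; toFahrenheit repeats the conversion from parseLine.

```scala
object MinByStationSketch {
  // Same arithmetic as parseLine: raw value * 0.1 (assumed tenths of a degree Celsius),
  // then Celsius to Fahrenheit
  def toFahrenheit(raw: String): Float = raw.toFloat * 0.1f * (9.0f / 5.0f) + 32.0f

  // Local stand-in for stationTemps.reduceByKey((x, y) => min(x, y)):
  // group the pairs by station ID, then reduce each group's temperatures with math.min
  def minByStation(stationTemps: Seq[(String, Float)]): Map[String, Float] =
    stationTemps.groupMapReduce(_._1)(_._2)((x, y) => math.min(x, y))

  def main(args: Array[String]): Unit = {
    // Hypothetical sample pairs, not taken from 1800.csv
    val sample = Seq(("STATION_A", 20.0f), ("STATION_A", 5.0f), ("STATION_B", 12.5f))
    for ((station, temp) <- minByStation(sample).toSeq.sortBy(_._1))
      println(f"$station minimum temperature: $temp%.2f F")
  }
}
```

Because min is associative and commutative, Spark can apply it within each partition before the shuffle and again across partitions, and the result is the same as reducing everything in one place.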
Map vs FlatMap
map() produces exactly one output element for each input element, while flatMap() can produce zero, one, or many output elements for each input element.
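Recall the map() operation: it returns exactly one result for each input element.

flatMap() returns a collection of results for each input element, and those collections are flattened into a single output RDD.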
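The difference can be sketched on a local List, whose map and flatMap behave like their RDD counterparts. The object name and input lines are hypothetical; the empty line shows how flatMap can emit nothing for an input.

```scala
object MapVsFlatMapSketch {
  // Hypothetical input lines standing in for an RDD of text lines
  val lines: List[String] = List("the quick fox", "", "jumps")

  // map: exactly one output per input line (here, one list of tokens per line)
  val mapped: List[List[String]] = lines.map(line => line.split(" ").toList)

  // flatMap: each line yields zero, one, or many words, and the results are flattened
  val flatMapped: List[String] = lines.flatMap(line => line.split(" ").toList.filter(_.nonEmpty))

  def main(args: Array[String]): Unit = {
    println(mapped)     // three elements, one per input line
    println(flatMapped) // four words; the empty line contributed nothing
  }
}
```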

Example: counting how many times each word appears in a book
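The naive version uses countByValue(), an action that returns all the counts to the driver as a local Map, so the result can no longer be processed (for example, sorted) as a distributed RDD.
The better version counts with reduceByKey(), which keeps the result as an RDD, and then sorts it with sortByKey().
It also splits each line with a regular expression and sorts the final result.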
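The regular expression "\\W+" splits on runs of non-word characters (anything other than letters, digits, and underscore). A small sketch of what that produces for a single line is below; the object and method names are hypothetical. One side effect worth knowing: a line that begins with a non-word character yields a leading empty string, which the original code would count as a "word". Dropping it, as nonEmptyWords does, is a suggested refinement, not part of the original code.

```scala
object RegexSplitSketch {
  // Split on runs of non-word characters, as in input.flatMap(x => x.split("\\W+")),
  // then lowercase each token as lowercaseWords does
  def words(line: String): List[String] =
    line.split("\\W+").toList.map(_.toLowerCase)

  // Hypothetical refinement: drop the empty string that split produces
  // when a line starts with a non-word character
  def nonEmptyWords(line: String): List[String] =
    words(line).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(words("Hello, world! It's 2 AM."))
    println(words("...Spark"))
  }
}
```

Note that the apostrophe counts as a non-word character, so "It's" becomes the two tokens "it" and "s".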
Code for counting the words:
val wordCounts = lowercaseWords.map(x => (x, 1)).reduceByKey((x, y) => x + y)
First, map each word to a tuple (word, 1), where the 1 records a single occurrence; a word that appears many times produces many identical tuples. Then reduceByKey() groups all tuples with the same word and adds their values together, giving the total count for each word.
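The (word, 1) plus sum pattern can be checked locally with a sketch like the one below, assuming Scala 2.13+ for groupMapReduce. The object name and sample words are hypothetical. On an RDD, reduceByKey additionally sums the 1s inside each partition before shuffling, so far fewer tuples cross the network than there are words.

```scala
object WordCountSketch {
  // Local stand-in for lowercaseWords.map(x => (x, 1)).reduceByKey((x, y) => x + y):
  // emit (word, 1) per occurrence, group the pairs by word, and sum the 1s
  def wordCounts(words: Seq[String]): Map[String, Int] =
    words.map(w => (w, 1)).groupMapReduce(_._1)(_._2)(_ + _)

  def main(args: Array[String]): Unit =
    println(wordCounts(Seq("to", "be", "or", "not", "to", "be")))
}
```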
val wordCountsSorted = wordCounts.map(x=>(x._2,x._1)).sortByKey()
1. Flip the (word, count) pairs to (count, word).
2. Call sortByKey() to sort by the count, which is now the key.
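The two steps can be sketched locally, with sortBy on the first tuple element standing in for sortByKey(). The object name and the counts are hypothetical.

```scala
object FlipAndSortSketch {
  // Flip (word, count) to (count, word), then sort ascending by count,
  // mirroring wordCounts.map(x => (x._2, x._1)).sortByKey()
  def flipAndSort(wordCounts: Seq[(String, Int)]): Seq[(Int, String)] =
    wordCounts.map(x => (x._2, x._1)).sortBy(_._1)

  def main(args: Array[String]): Unit =
    flipAndSort(Seq(("spark", 3), ("rdd", 1), ("scala", 2))).foreach {
      case (count, word) => println(s"$word: $count")
    }
}
```

On the RDD, sortByKey(ascending = false) lists the most frequent words first. RDD.sortBy(x => x._2) would also sort by count without flipping, but flipping keeps the pair in the key-value shape that sortByKey() expects.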
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
/** Count up how many of each word occurs in a book, using regular expressions and sorting the final results */
object WordCountBetterSorted {
/** Our main function where the action happens */
def main(args: Array[String]): Unit = {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using the local machine
val sc = new SparkContext("local", "WordCountBetterSorted")
// Load each line of my book into an RDD
val input = sc.textFile("../book.txt")
// Split using a regular expression that extracts words
val words = input.flatMap(x => x.split("\\W+"))
// Normalize everything to lowercase
val lowercaseWords = words.map(x => x.toLowerCase())
// Count the occurrences of each word
val wordCounts = lowercaseWords.map(x => (x, 1)).reduceByKey( (x,y) => x + y )
// Flip (word, count) tuples to (count, word) and then sort by key (the counts)
val wordCountsSorted = wordCounts.map( x => (x._2, x._1) ).sortByKey()
// Collect the sorted results to the driver so they print in order, flipping (count, word) to "word: count" as we go
for (result <- wordCountsSorted.collect()) {
val count = result._1
val word = result._2
println(s"$word: $count")
}
}
}
Extra exercise:
Use filter() to remove words that occur frequently but are not interesting, such as "the", "to", and "a".
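One possible approach, sketched on a local Seq: keep a word only if it is non-empty and not in a stop-word set. The set below contains just the three words named in the exercise and is a hypothetical starting point. On the RDD, the same predicate would go into lowercaseWords.filter(...) before the counting step; a small local Set like this is captured in the closure and shipped with each task.

```scala
object StopWordFilterSketch {
  // Hypothetical stop-word list taken from the exercise; extend it as needed
  val stopWords: Set[String] = Set("the", "to", "a")

  // Keep a word only if it is non-empty and not a stop word
  def interestingWords(words: Seq[String]): Seq[String] =
    words.filter(w => w.nonEmpty && !stopWords.contains(w))

  def main(args: Array[String]): Unit =
    println(interestingWords(Seq("the", "spark", "to", "a", "rdd", "")))
}
```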