Apache Spark with Scala - Hands On with Big Data
Scala is a functional language (it also supports object-oriented programming).
Preparation

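When importing the project into IntelliJ, the build failed with: scala compiler not enough space
Fix: add the environment variable _JAVA_OPTIONS with the value -Xmx512M (raises the JVM maximum heap to 512 MB).
The JVM then prints a "picked up ..." message, but the program still runs to completion.
Create a new Scala worksheet.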
Scala basic syntax
- values (val) are immutable: val hello: String = "Hello"
- variables (var) are mutable
- types: Int Boolean Char Double Float Long Byte
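println()
println(f"is $a%.3f")
The f prefix gives formatted printing, similar to printf: $ marks a variable, and the % part after it is the format spec.
%05d pads the number with leading zeros to a width of 5.
println(s"is $a")
The s prefix substitutes variables into the string.
${1+2} evaluates an arbitrary expression.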
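A quick way to see what each interpolator produces is to build the strings instead of printing them. A minimal sketch for a Scala worksheet; `piApprox` and `count` are illustrative names, not taken from the course code:

```scala
val piApprox: Double = 3.14159265
val count: Int = 42

// f-interpolator: printf-style; %.3f keeps three decimal places
// (the decimal separator follows the JVM's default locale)
val threeDecimals = f"Pi is about $piApprox%.3f"
// %05d pads the integer with leading zeros to a width of 5
val padded = f"Count: $count%05d"
// s-interpolator: plain substitution; ${...} evaluates any expression
val substituted = s"Count is $count, next is ${count + 1}"

println(threeDecimals)
println(padded)      // Count: 00042
println(substituted) // Count is 42, next is 43
```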
Regular expressions
val theUltimateAnswer: String = "To life, the universe, and everything is 42."
val pattern = """.* ([\d]+).*""".r
val pattern(answerString) = theUltimateAnswer
val answer = answerString.toInt
println(answer)
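`val pattern(answerString) = ...` throws a `scala.MatchError` when the string does not match. A sketch of a safer variant that does the match inside a `match` expression and returns an `Option` instead; the helper `extractAnswer` is hypothetical:

```scala
// Same pattern as above: anything, a space, one or more digits (captured), anything
val pattern = """.* ([\d]+).*""".r

// Some(number) if the whole string matches, None otherwise (no MatchError)
def extractAnswer(text: String): Option[Int] = text match {
  case pattern(digits) => Some(digits.toInt)
  case _               => None
}

println(extractAnswer("To life, the universe, and everything is 42.")) // Some(42)
println(extractAnswer("No number here"))                                // None
```

A regex used as an extractor must match the entire string, which is why the pattern begins and ends with `.*`.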
// VALUES are immutable constants.
val hello: String = "Hola!"
// VARIABLES are mutable
var helloThere: String = hello
helloThere = hello + " There!"
println(helloThere)
val immutableHelloThere = hello + " There"
println(immutableHelloThere)
// Data Types
val numberOne: Int = 1
val truth: Boolean = true
val letterA: Char = 'a'
val pi: Double = 3.14159265
val piSinglePrecision: Float = 3.14159265f
val bigNumber: Long = 123456789
val smallNumber: Byte = 127
println("Here is a mess: " + numberOne + truth + letterA + pi + bigNumber)
println(f"Pi is about $piSinglePrecision%.3f")
println(f"Zero padding on the left: $numberOne%05d")
println(s"I can use the s prefix to use variables like $numberOne $truth $letterA")
println(s"The s prefix isn't limited to variables; I can include any expression. Like ${1+2}")
// Booleans
val isGreater = 1 > 2
val isLesser = 1 < 2
val impossible = isGreater & isLesser
val anotherWay = isGreater || isLesser
val picard: String = "Picard"
val bestCaptain: String = "Picard"
val isBest: Boolean = picard == bestCaptain
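The snippet above uses `&` for AND. On Booleans, `&` and `|` always evaluate both operands, whereas `&&` and `||` short-circuit. A small sketch that counts how often the right-hand side runs; `rhs` and `evaluations` are illustrative names:

```scala
var evaluations = 0
// Right-hand operand that records each time it is evaluated
def rhs(): Boolean = { evaluations += 1; true }

val nonShortCircuit = false & rhs()  // & evaluates rhs() even though the left side is false
val afterAmp = evaluations

val shortCircuit = false && rhs()    // && skips rhs() because the result is already known
val afterAndAnd = evaluations

println(s"$nonShortCircuit $shortCircuit $afterAmp $afterAndAnd") // false false 1 1
```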
Control Flow
// Flow control
// If / else:
if (1 > 3) println("Impossible!") else println("The world makes sense.")
if (1 > 3) {
println("Impossible!")
println("Really?")
} else {
println("The world makes sense.")
println("still.")
}
// Matching
val number = 2
number match {
case 1 => println("One")
case 2 => println("Two")
case 3 => println("Three")
case _ => println("Something else")
}
for (x <- 1 to 4) {
val squared = x * x
println(squared)
}
var x = 10
while (x >= 0) {
println(x)
x -= 1
}
x = 0
do { println(x); x += 1 } while (x <= 10) // do/while is Scala 2 syntax; Scala 3 dropped it
// Expressions
{val x = 10; x + 20}
println({val x = 10; x + 20})
Expressions
A block such as {val x = 10; x + 20} evaluates to the value of its last expression, here 30.
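Because blocks, `if`/`else` and `match` are all expressions in Scala, their results can be stored in a `val` directly. A minimal sketch; the names are illustrative:

```scala
// A block evaluates to its last expression
val blockValue = { val x = 10; x + 20 }

// if/else yields a value, so no mutable variable is needed
val description = if (blockValue > 25) "big" else "small"

// match is also an expression; `_` is the catch-all case
def spell(n: Int): String = n match {
  case 1 => "One"
  case 2 => "Two"
  case _ => "Something else"
}

println(blockValue)  // 30
println(description) // big
println(spell(2))    // Two
```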
Functions
Don't forget the = sign before the function body.
No return is needed: the value of the last expression is returned automatically.
def squareIt(x: Int) : Int = {
x * x
}
A function can take another function as a parameter (here f: Int => Int):
def transformInt(x: Int, f: Int => Int): Int = {
f(x)
}
Pass a function by name as the argument f (cubeIt is defined in the full code below),
or pass an anonymous function:
transformInt(2, cubeIt)
transformInt(3, x => x * x * x)
Full code
// Functions
// format def <function name>(parameter name: type...) : return type = { }
def squareIt(x: Int) : Int = {
x * x
}
def cubeIt(x : Int) : Int = {x * x * x}
println(squareIt(2))
println(cubeIt(3))
def transformInt(x: Int, f: Int => Int): Int = {
f(x)
}
val result = transformInt(2, cubeIt)
println(result)
// Anonymous functions (lambdas)
transformInt(3, x => x * x * x)
transformInt(10, x => x / 2)
transformInt(2, x => {val y = x * 2; y * y}) // anonymous function with a multi-statement body
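Two further ways to pass functions to `transformInt`, as a sketch: the placeholder `_` shorthand for a one-parameter lambda, and a function value stored in a `val`. The names `doubleIt` and `doubleThenInc` are hypothetical:

```scala
def transformInt(x: Int, f: Int => Int): Int = f(x)

// `_ / 2` is shorthand for `x => x / 2`
val halved = transformInt(10, _ / 2)

// A function value stored in a val is passed just like a method name
val doubleIt: Int => Int = x => x * 2
val doubled = transformInt(21, doubleIt)

// Function values compose: apply doubleIt, then add 1
val doubleThenInc = doubleIt andThen (_ + 1)
val composed = transformInt(5, doubleThenInc)

println(s"$halved $doubled $composed") // 5 42 11
```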
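Data structures
Tuples
Elements can be of different types.
Fields are accessed with 1-based indexes (._1, ._2, ...).
// Tuples
// Immutable; elements may have different types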
val captainStuff = ("Picard", "Enterprise-D", "NCC-1701-D")
println(captainStuff)
// Refer to the individual fields with a ONE-BASED index
println(captainStuff._1)
println(captainStuff._2)
println(captainStuff._3)
val picardsShip = "Picard" -> "Enterprise-D"
println(picardsShip._2)
val aBunchOfStuff = ("Kirk", 1964, true)
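Besides `._1`, `._2`, a tuple can be unpacked into separate names in one step. A short sketch reusing the values above; `name`, `year`, `flag` and `samePair` are illustrative names:

```scala
val aBunchOfStuff = ("Kirk", 1964, true)

// Destructure the tuple into three vals at once
val (name, year, flag) = aBunchOfStuff

// `a -> b` builds a Tuple2, identical to (a, b)
val pair = "Picard" -> "Enterprise-D"
val samePair = ("Picard", "Enterprise-D")

println(s"$name $year $flag") // Kirk 1964 true
println(pair == samePair)     // true
```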
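Lists
All elements must have the same type.
Indexes are 0-based.
head: the first element
tail: all elements except the first
map: applies a function to every element of the list
reduce: the running result so far is passed as x and the next element as y
concatenate lists: ++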
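To see the x/y roles in reduce, the sketch below records every pair the combining function receives. It uses `reduceLeft`, which fixes the left-to-right order on a List; on a Spark RDD the combining order is not guaranteed, so the function passed to `reduce` should be associative and commutative. `steps` is an illustrative name:

```scala
import scala.collection.mutable.ListBuffer

val numbers = List(1, 2, 3, 4, 5)
val steps = ListBuffer.empty[(Int, Int)] // each (x, y) pair seen by the function

// x is the running result so far, y is the next element
val total = numbers.reduceLeft { (x, y) =>
  steps += ((x, y))
  x + y
}

println(steps.toList) // List((1,2), (3,3), (6,4), (10,5))
println(total)        // 15
```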
// Lists
// Like a tuple, but more functionality
// Must be of same type
val shipList = List("Enterprise", "Defiant", "Voyager", "Deep Space Nine")
println(shipList(1))
// zero-based
println(shipList.head)
println(shipList.tail)
for (ship <- shipList) {println(ship)}
val backwardShips = shipList.map( (ship: String) => {ship.reverse})
for (ship <- backwardShips) {println(ship)}
// reduce() to combine together all the items in a collection using some function
val numberList = List(1, 2, 3, 4, 5)
val sum = numberList.reduce( (x: Int, y: Int) => x + y)
println(sum)
// filter() removes stuff
val iHateFives = numberList.filter( (x: Int) => x != 5)
val iHateThrees = numberList.filter(_ != 3)
// Concatenate lists
val moreNumbers = List(6,7,8)
val lotsOfNumbers = numberList ++ moreNumbers
val reversed = numberList.reverse
val sorted = reversed.sorted
val lotsOfDuplicates = numberList ++ numberList
val distinctValues = lotsOfDuplicates.distinct
val maxValue = numberList.max
val total = numberList.sum
val hasThree = iHateThrees.contains(3)
Maps
Similar to a dictionary: each key maps to a value.
// MAPS
val shipMap = Map("Kirk" -> "Enterprise", "Picard" -> "Enterprise-D", "Sisko" -> "Deep Space Nine", "Janeway" -> "Voyager")
println(shipMap("Janeway"))
println(shipMap.contains("Archer"))
val archersShip = util.Try(shipMap("Archer")) getOrElse "Unknown"
println(archersShip)
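`shipMap("Archer")` throws a `NoSuchElementException` for a missing key, which is why the code above wraps it in `util.Try`. A sketch of two standard alternatives on `Map`: `get`, which returns an `Option`, and `getOrElse`, which takes a default:

```scala
val shipMap = Map("Kirk" -> "Enterprise", "Janeway" -> "Voyager")

// get returns Some(value) or None instead of throwing
val found: Option[String] = shipMap.get("Janeway")
val missing: Option[String] = shipMap.get("Archer")

// getOrElse supplies the default directly, no util.Try needed
val archersShip = shipMap.getOrElse("Archer", "Unknown")

println(found)       // Some(Voyager)
println(missing)     // None
println(archersShip) // Unknown
```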
RDD
RDD: Resilient Distributed Dataset
An RDD is a dataset of rows (elements) distributed across a cluster.

Transforming RDDs
- map
- flatMap: one input row can produce zero or more output rows
- filter
- distinct
- sample
- union, intersection, subtract, cartesian
RDD actions
- collect
- count
- countByValue
- take
- top
- reduce
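Many of these operations have direct counterparts on ordinary Scala collections, which is a convenient way to check the logic before running it on a cluster. The sketch below uses a plain `List`, not an RDD, so nothing is distributed or lazy; `groupMapReduce` (Scala 2.13+) stands in for `countByValue`, and sorting then `take` stands in for `top`. The data is made up:

```scala
// Plain Scala List standing in for an RDD (local data, no Spark involved)
val rows = List("a b", "c", "a b", "d e f")

// Transformations (on an RDD these would return new RDDs)
val mapped    = rows.map(_.length)           // map: one output per input row
val flattened = rows.flatMap(_.split(" "))   // flatMap: zero or more outputs per row
val filtered  = rows.filter(_.contains("a")) // filter: keep rows where the predicate holds
val unique    = rows.distinct                // distinct: drop duplicate rows
val unioned   = rows ++ List("c", "z")       // union: duplicates are kept

// Actions (on an RDD these return plain values to the driver)
val n        = rows.size                                     // count
val firstTwo = rows.take(2)                                  // take(2)
val top2     = mapped.sorted(Ordering[Int].reverse).take(2)  // top(2)
val byValue  = rows.groupMapReduce(identity)(_ => 1L)(_ + _) // countByValue
val total    = mapped.reduce(_ + _)                          // reduce

println(byValue)
```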
Key/Value RDD
val totalsByAge = rdd.map(x => (x, 1)) // pair each value with 1 to get (key, value) rows
- reduceByKey(): rdd.reduceByKey((x, y) => x + y) adds up all values that share a key; x can be thought of as the running total, y as the next value
- groupByKey()
- sortByKey()
- keys, values: create an RDD containing only the keys or only the values
- join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey
- mapValues: applies a function to the values only, leaving the keys unchanged
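The pair-RDD operations can be mimicked on a local list of `(key, value)` tuples. This is a sketch with plain Scala collections (Scala 2.13+ for `groupMap`/`groupMapReduce`), not Spark calls, and the data is made up; joins are left out:

```scala
val pairs = List(("b", 2), ("a", 1), ("b", 3), ("a", 4))

// reduceByKey analogue: combine all values that share a key
val reduced = pairs.groupMapReduce(_._1)(_._2)(_ + _)
// groupByKey analogue: collect all values per key
val grouped = pairs.groupMap(_._1)(_._2)
// sortByKey analogue (stable sort by key)
val sortedByKey = pairs.sortBy(_._1)
// keys / values analogues
val keys   = pairs.map(_._1)
val values = pairs.map(_._2)
// mapValues analogue: transform the value, keep the key
val scaled = pairs.map { case (k, v) => (k, v * 10) }

println(reduced) // a -> 5, b -> 5
println(grouped) // a -> List(1, 4), b -> List(2, 3)
```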
Code walkthrough
Each row of rdd is a tuple (age, numFriends).
- mapValues: the value changes from numFriends to (numFriends, 1)
- reduceByKey: x and y are both tuples; x is the partially accumulated result, y is the next tuple to fold in. The first elements of the tuples are summed, and so are the second elements.
- result: (age, (totalFriends, totalInstances))
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))
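The same age/friends computation can be checked locally on a List of `(age, numFriends)` tuples. In this sketch the data is made up, `groupMapReduce` (Scala 2.13+) stands in for `reduceByKey`, and the final averaging step is an illustrative addition rather than part of the notes above:

```scala
// Hypothetical sample rows: (age, numFriends)
val rows = List((33, 385), (33, 2), (55, 221), (40, 465), (55, 100))

// mapValues(x => (x, 1)): pair each friend count with an instance counter of 1
val withCounts = rows.map { case (age, friends) => (age, (friends, 1)) }

// reduceByKey: sum both tuple elements per age -> age -> (totalFriends, totalInstances)
val totalsByAge = withCounts.groupMapReduce(_._1)(_._2) {
  (x, y) => (x._1 + y._1, x._2 + y._2)
}

// Average friends per age (Int / Int, so integer division)
val averagesByAge = totalsByAge.map { case (age, (total, n)) => (age, total / n) }

averagesByAge.toSeq.sortBy(_._1).foreach(println)
```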

Filter
Pass a function that returns a Boolean; only the rows for which it returns true are kept.
val minTemps = parsedLines.filter(x => x._2 == "TMIN")
Map & FlatMap
map is a one-to-one transformation: one row in, one row out.
flatMap is a one-to-many transformation: one row in, zero or more rows out.
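The difference is easy to see on a local List of lines (a sketch with made-up data, plain Scala collections rather than an RDD): `map` keeps one element per line, while `flatMap` flattens the per-line results.

```scala
val lines = List("the quick fox", "jumps", "")

// map: exactly one output element per input line (here, an array of words)
val perLine: List[Array[String]] = lines.map(_.split(" "))

// flatMap: the arrays are flattened, so a line can yield several elements or none
// ("".split(" ") yields one empty string, which the filter removes)
val words: List[String] = lines.flatMap(_.split(" ").filter(_.nonEmpty))

println(perLine.size) // 3
println(words)        // List(the, quick, fox, jumps)
```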

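Regular expressions
"\\W+" in a Scala string literal is the regex \W+: one or more non-word characters (anything other than letters, digits and underscore). line.split("\\W+") therefore splits a line into words.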
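A local sketch of splitting on `\W+` and counting words; the text is made up, and `groupMapReduce` (Scala 2.13+) plays the role of `countByValue`. Lower-casing first makes "Hello" and "hello" count as the same word:

```scala
val text = "Hello, world! Hello again... world?"

// Split on runs of non-word characters; drop any empty strings
// (a leading punctuation character would otherwise produce one)
val words = text.toLowerCase.split("\\W+").filter(_.nonEmpty)

// Count occurrences of each word
val wordCounts = words.toList.groupMapReduce(identity)(_ => 1)(_ + _)

wordCounts.toSeq.sortBy(_._1).foreach(println)
```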
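Project summary
RatingsCounter
Data flow
- create a SparkContext (sc) and read the data
- lines is an RDD in which each row is a String
- map runs a function on each line and extracts the third field (the rating); the result is stored in ratings (an RDD)
- countByValue counts how many times each unique value occurs across all rows; results is a Map[String, Long]
- results.toSeq.sortBy(_._1): convert the Map to a Seq so it can be sorted by the first element
- print with foreach(println)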
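import org.apache.spark.SparkContext
val sc = new SparkContext("local[*]", "RatingsCounter") // local mode using all cores; app name
val lines = sc.textFile("data/ml-100k/u.data") // RDD[String], one element per line of the file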
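The RatingsCounter steps can be tried locally on a few lines in the u.data layout (tab-separated: user id, item id, rating, timestamp). In this sketch the lines are sample data held in a plain List, and `groupMapReduce` (Scala 2.13+) stands in for Spark's `countByValue`:

```scala
// Sample lines in the u.data layout: userID \t movieID \t rating \t timestamp
val lines = List(
  "196\t242\t3\t881250949",
  "186\t302\t3\t891717742",
  "22\t377\t1\t878887116",
  "244\t51\t2\t880606923"
)

// map: keep only the third field (index 2), the rating
val ratings = lines.map(_.split("\t")(2))

// countByValue analogue: number of occurrences of each distinct rating
val results: Map[String, Long] = ratings.groupMapReduce(identity)(_ => 1L)(_ + _)

// A Map has no order, so convert to a Seq before sorting by the rating
results.toSeq.sortBy(_._1).foreach(println)
```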
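MaxTemperatures
parseLine function: reads one line and returns a tuple
val fields = line.split(",") // line is the String parameter of parseLine
convert numeric fields with .toFloat / .toInt
rdd.filter(x => x._2 == "TMAX")
reduceByKey: combine the values that share a key (here, keep the larger temperature)
val results = rdd.collect(): only at this point is anything computed. Transformations are lazy; an action such as collect() triggers execution.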
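A local sketch of the MaxTemperatures flow (parse, filter TMAX, reduce per key). The CSV layout `stationID,date,entryType,temperature` and the sample lines are assumptions for illustration; raw temperatures are kept without unit conversion. Unlike an RDD, a List is evaluated eagerly, so nothing waits for a `collect()`:

```scala
// Hypothetical CSV lines, assumed layout: stationID,date,entryType,temperature
val lines = List(
  "STATION1,18000101,TMAX,-75",
  "STATION1,18000101,TMIN,-148",
  "STATION1,18000102,TMAX,-60",
  "STATION2,18000101,TMAX,-86"
)

// parseLine: one line in, one (stationID, entryType, temperature) tuple out
def parseLine(line: String): (String, String, Float) = {
  val fields = line.split(",")
  (fields(0), fields(2), fields(3).toFloat)
}

val parsed = lines.map(parseLine)
// filter: keep only the maximum-temperature entries
val maxEntries = parsed.filter(x => x._2 == "TMAX")
// (stationID, temperature) pairs, then reduceByKey analogue keeping the larger value
val maxByStation = maxEntries
  .map(x => (x._1, x._3))
  .groupMapReduce(_._1)(_._2)((x, y) => math.max(x, y))

maxByStation.toSeq.sortBy(_._1).foreach(println)
```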