2021-10-20 Scala Basics

Apache Spark with Scala-Hands on with big data

scala是funcional language

prep

image.png

intelliJ import項(xiàng)目的時(shí)候報(bào)錯(cuò):scala compiler not enough space
增加環(huán)境變量 _JAVA_OPTIONS: -Xmx512M
會(huì)報(bào)錯(cuò) picked up 。。。但可以正常跑完

新建 scala worksheet

scala 基本語(yǔ)法

  • values不可更改 val hello: String = "Hello"
  • varables 可更改
  • 類型:Int Boolean Char Double Float Long Byte

print

println()

println(f"is $a%.3f")
類似prinf,formating print;$表示一個(gè)變量;%部分為格式設(shè)定
%05d 補(bǔ)齊5位

println(s"is $a")
插入變量
${1+2} expression計(jì)算

正則表達(dá)式

   val theUltimateAnswer: String = "To life, the universe, and everything is 42."
   val pattern = """.* ([\d]+).*""".r
   val pattern(answerString) = theUltimateAnswer
   val answer = answerString.toInt
   println(answer)
   // VALUES are immutable constants.
   val hello: String = "Hola!"

   // VARIABLES are mutable
   var helloThere: String = hello
   helloThere = hello + " There!"
   println(helloThere)

   val immutableHelloThere = hello + " There"
   println(immutableHelloThere)

   // Data Types

   val numberOne: Int = 1
   val truth: Boolean = true
   val letterA: Char = 'a'
   val pi: Double = 3.14159265
   val piSinglePrecision: Float = 3.14159265f
   val bigNumber: Long = 123456789
   val smallNumber: Byte = 127

   println("Here is a mess: " + numberOne + truth + letterA + pi + bigNumber)

   println(f"Pi is about $piSinglePrecision%.3f")
   println(f"Zero padding on the left: $numberOne%05d")

   println(s"I can use the s prefix to use variables like $numberOne $truth $letterA")

   println(s"The s prefix isn't limited to variables; I can include any expression. Like ${1+2}")



   // Booleans
   val isGreater = 1 > 2
   val isLesser = 1 < 2
   val impossible = isGreater & isLesser
   val anotherWay = isGreater || isLesser

   val picard: String = "Picard"
   val bestCaptain: String = "Picard"
   val isBest: Boolean = picard == bestCaptain

Control Flow

// Flow control

// If / else:
if (1 > 3) println("Impossible!") else println("The world makes sense.")

if (1 > 3) {
  println("Impossible!")
  println("Really?")
} else {
  println("The world makes sense.")
  println("still.")
}

// Matching
val number = 2
number match {
  case 1 => println("One")
  case 2 => println("Two")
  case 3 => println("Three")
  case _ => println("Something else")
}

for (x <- 1 to 4) {
  val squared = x * x
  println(squared)
}

var x = 10
while (x >= 0) {
  println(x)
  x -= 1
}

x = 0
do { println(x); x+=1 } while (x <= 10)

// Expressions

{val x = 10; x + 20}

println({val x = 10; x + 20})

expression

{val x = 10; x + 20} 返回表達(dá)式最后的值

Functions

不要忘記等號(hào)
不需要return,最后一個(gè)表達(dá)式的值會(huì)被默認(rèn)返回

def squareIt(x: Int) : Int = {
  x * x
}

函數(shù)可以將函數(shù)作為參數(shù)

def transformInt(x: Int, f: Int => Int): Int = {
  f(x)
}

將函數(shù)名稱作為參數(shù)傳給y
或者放一個(gè)匿名函數(shù)

transformInt(2, cubeIt)

transformInt(3, x => x * x * x)

完整代碼

// Functions

// format def <function name>(parameter name: type...) : return type = { }

def squareIt(x: Int) : Int = {
  x * x
}

def cubeIt(x : Int) : Int = {x * x * x}

println(squareIt(2))

println(cubeIt(3))

def transformInt(x: Int, f: Int => Int): Int = {
  f(x)
}

val result = transformInt(2, cubeIt)
println(result)

// 匿名函數(shù)
transformInt(3, x => x * x * x)

transformInt(10, x => x / 2)

transformInt(2, x => {val y = x * 2; y * y}) //多行匿名函數(shù)

data structure

tuples

可以不同類型
1-based

// Tuples
// Immutable lists

val captainStuff = ("Picard", "Enterprise-D", "NCC-1701-D")
println(captainStuff)

// Refer to the individual fields with a ONE-BASED index
println(captainStuff._1)
println(captainStuff._2)
println(captainStuff._3)

val picardsShip = "Picard" -> "Enterprise-D"
println(picardsShip._2)

val aBunchOfStuff = ("Kirk", 1964, true)

lists

必須同一類型
0-based
head :第一個(gè)元素
tail:除去第一個(gè)的剩下的元素
map:將函數(shù)應(yīng)用于list所有元素
reduce:當(dāng)前輸出給x,新元素給y。
合并list: ++

// Lists
// Like a tuple, but more functionality
// Must be of same type

val shipList = List("Enterprise", "Defiant", "Voyager", "Deep Space Nine")

println(shipList(1))
// zero-based

println(shipList.head)
println(shipList.tail)

for (ship <- shipList) {println(ship)}

val backwardShips = shipList.map( (ship: String) => {ship.reverse})
for (ship <- backwardShips) {println(ship)}

// reduce() to combine together all the items in a collection using some function
val numberList = List(1, 2, 3, 4,5 )
val sum = numberList.reduce( (x: Int, y: Int) => x + y)
println(sum)

// filter() removes stuff
val iHateFives = numberList.filter( (x: Int) => x != 5)

val iHateThrees = numberList.filter(_ != 3)

// Concatenate lists
val moreNumbers = List(6,7,8)
val lotsOfNumbers = numberList ++ moreNumbers

val reversed = numberList.reverse
val sorted = reversed.sorted
val lotsOfDuplicates = numberList ++ numberList
val distinctValues = lotsOfDuplicates.distinct
val maxValue = numberList.max
val total = numberList.sum
val hasThree = iHateThrees.contains(3)

Maps

類似字典

// MAPS
val shipMap = Map("Kirk" -> "Enterprise", "Picard" -> "Enterprise-D", "Sisko" -> "Deep Space Nine", "Janeway" -> "Voyager")
println(shipMap("Janeway"))
println(shipMap.contains("Archer"))
val archersShip = util.Try(shipMap("Archer")) getOrElse "Unknown"
println(archersShip)

RDD

RDD: Resilient Distributed Dataset
rows


如何創(chuàng)建RDD

transforming RDDs

  • map
  • flatmap:one row of RDD -> multiple rows of RDDs
  • filter
  • distinct
  • sample
  • union, intersection, substract, cartesian
    RDD actions
  • collect
  • count
  • countByValue
  • take
  • top
  • reduce

Key/Value RDD

totalsByAge = rdd.map( x => (x,1))

  • reduceByKey() rdd.reduceByKey((x+y) => x+y) 將同一個(gè)key的所有values相加。x可認(rèn)為是當(dāng)前running total,y是新的一個(gè)value
  • groupByKey()
  • sortByKey
    -keys() values() :創(chuàng)建一個(gè)RDD,只有keys 或者values
  • join,rightOuterJoin, leftOuterJoin,cogroup,subtractByKey
  • mapValues :只針對(duì)value應(yīng)用函數(shù)

代碼解讀
rdd的每行是一個(gè)tuple (age,numFriends)

  • mapValues :values從numFriends,變?yōu)?(numFriends,1)
  • reduceByKey:x和y都是tuples,前者是部分計(jì)算的結(jié)果,后者是新的未計(jì)算的一個(gè)tuple。 tuple的第一個(gè)元素加和,tuple的第二個(gè)元素加和
  • 結(jié)果 (age, (totalFriends, totalInstances))
    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))
image.png

Filter

括號(hào)里寫一個(gè)返回布爾值的函數(shù)
val minTemps = parsedLines.filter(x=>x._2 == "TMIN")

Map & FlatMap

Map是一對(duì)一的轉(zhuǎn)換,row in row out
FlatMap是一對(duì)多的轉(zhuǎn)換


image.png

正則表達(dá)式
\\W+

項(xiàng)目總結(jié)

RatingsCounter

數(shù)據(jù)

  1. 創(chuàng)建一個(gè)sc對(duì)象,讀取數(shù)據(jù)
  2. lines為RDD,每行為一個(gè)String
  3. map 每行執(zhí)行行數(shù),提取第三列 。存入ratings(RDD)
  4. countByValue 對(duì)所有行統(tǒng)計(jì),每個(gè)unique值計(jì)數(shù)。results(Map[String,Long])
  5. results.toSeq.sortBy(_._1) Map可以排序,按照第一列
    6.打印 foreach(println)
val sc = new SparkContext("local[*]", "RatingsCounter")
val lines = sc.textFile("data/ml-100k/u.data")

MaxTemperatures

  1. parseLine 函數(shù) :讀入one line,返回tuple
    var fields = lines.split(",")
    val.toFloat val.toInt

  2. rdd.filter(x => x._2 == "TMAX")

  3. reduceByKey

  4. result = rdd.collect() 這時(shí)才會(huì)計(jì)算。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spark算子總結(jié) 算子分類 Transformation(轉(zhuǎn)換)轉(zhuǎn)換算子含義map(func)返回一個(gè)新的RDD...
    ronnie_yuan閱讀 563評(píng)論 0 0
  • 17.分區(qū)分桶的區(qū)別,為什么要分區(qū) 分區(qū)表:原來(lái)的一個(gè)大表存儲(chǔ)的時(shí)候分成不同的數(shù)據(jù)目錄進(jìn)行存儲(chǔ)。如果說(shuō)是單分區(qū)表,...
    qydong閱讀 766評(píng)論 0 0
  • http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIEx...
    scandly閱讀 491評(píng)論 0 1
  • 1、RDD RDD(Resilient Distributed Dataset彈性分布式數(shù)據(jù)集)是Spark中抽象...
    青禾ws閱讀 687評(píng)論 2 3
  • http://spark.apache.org/docs/latest/api/python/index.html...
    mpro閱讀 6,281評(píng)論 0 4

友情鏈接更多精彩內(nèi)容