把項(xiàng)目中的注釋和筆記整理了一下
(實(shí)際業(yè)務(wù)的變量名和列名肯定不是下面示例這樣的......)
1. Spark的壓縮
一般不用動(dòng)。默認(rèn)shuffle等階段都會(huì)用壓縮,看到有文章對(duì)比,Spark默認(rèn)的lz4算法,壓縮和解壓時(shí)間略好于snappy。
2. spark讀取多個(gè)嵌套文件夾
val myrdd = sc.textFile("data/*/part-*")
Spark天然就支持。RDD的binaryFiles和textfile、wholeTextFiles等方法都支持。
3. 分布式spark環(huán)境下讀local文件
- 在分布式環(huán)境下讀寫local文件,發(fā)現(xiàn)結(jié)果集數(shù)量變少,但沒(méi)有報(bào)錯(cuò)。但網(wǎng)上有文章說(shuō)應(yīng)該在讀文件時(shí)報(bào)錯(cuò),除非每個(gè)節(jié)點(diǎn)都存在數(shù)據(jù)集。
不報(bào)錯(cuò)的原因可能是設(shè)置了:
val spark = SparkSession.builder.appName("myApp")
.config("spark.files.ignoreCorruptFiles", "true")
.config("spark.sql.files.ignoreCorruptFiles", "true")
.getOrCreate
但理論上這只是過(guò)濾錯(cuò)誤文件,也有點(diǎn)說(shuō)不通,詳細(xì)原因沒(méi)有深究。
4. 利用SparkSession方法讀CSV
val df = spark.read
.option("header", "true")//含表頭
.option("delimiter", ";")//分隔符
.option("encoding", "GBK")//編碼方式
.option("inferSchema","true") //一般建議關(guān)閉,性能有影響
.option("mode","DROPMALFORMED")//從數(shù)據(jù)集中刪除損壞的記錄
.csv("實(shí)際路徑")
- 此時(shí)最好設(shè)置spark.sql.files.ignoreCorruptFiles。如果利用RDD方法讀取csv(其實(shí)就是讀文本,自己再切分)則不必設(shè)置這一行(只設(shè)置spark.files.ignoreCorruptFiles)。
- SparkSession沒(méi)有binaryFiles的類似方法。
5. RDD轉(zhuǎn)Dataframe
RDD轉(zhuǎn)成Dataframe之后,代碼好寫很多,還可以用SQL,對(duì)比測(cè)試發(fā)現(xiàn)一般情況下效率和直接用RDD差不多。我用的方法是將數(shù)據(jù)(指定的列)轉(zhuǎn)成list映射到case class,然后toDF
case class myschema(
col1: String,
col2: String,
col3: Double)
然后RDD端:
val mydf = myrdd
.map(x => x.split(","))
.map(x => List(x(0), x(5),x(7))
.map(x => myschema(x(0), x(1), x(5).toDouble))
.toDF()
默認(rèn)情況下會(huì)報(bào)錯(cuò):
Lvalue toDF is not a member of org.apache.spark.myrdd,
解決方法是:
- import spark.sqlContext.implicits._
(注意還有個(gè) spark.implicits._ 和 spark.sqlContext.implicits._不可同時(shí)存在) - case class 需要放在toDF 方法的作用域之外。
6. RDD和Dataframe的緩存
- 首先StorageLevel需要引用:
import org.apache.spark.storage.StorageLevel
//之后可以使用
myrdd.persist(StorageLevel.MEMORY_ONLY)
//或者
myrdd.cache()
- cache方法是一個(gè)Tranformation,不是action。此外persist是lazy級(jí)別的,unpersist時(shí)eager級(jí)別的。
- 對(duì)于RDD來(lái)說(shuō),cache方法相當(dāng)于MEMORY_ONLY
- dataframe的cache方法相當(dāng)于MEMORY_AND_DISK。MEMORY_AND_DISK也是dataframe的默認(rèn)存儲(chǔ)級(jí)別。
- 其他的內(nèi)存級(jí)別:
- MEMORY_ONLY_2:保存兩份
- MEMORY_ONLY_SER:數(shù)據(jù)序列化,降低內(nèi)存溢出風(fēng)險(xiǎn),但序列化有開(kāi)銷
- OFF_HEAP :堆外內(nèi)存,號(hào)稱能減少內(nèi)存消耗,減少GC,早期配合tachyon使用(參考:http://www.itdecent.cn/p/c6f6d4071560)
7. spark的重分區(qū):repartition和coalesce
- 可以理解為:
- repartition()多用于增加分區(qū)(寬依賴,有shuflle)
- coalesce()多用于減少分區(qū)(窄依賴,沒(méi)有shuffle)
- repartition方法是coalesce方法shuffle為true的情況
- 在使用filter之后進(jìn)行coalesce效果比較明顯。
8. 使用dataframe進(jìn)行聚合統(tǒng)計(jì)
應(yīng)該比較好看懂:
val mydf2 = mydf
.groupBy("col1", "col2")
.agg( "col3" -> "first","col4" -> "mean", "col5" -> "count")
//聚合之后的列名為: first(col3 ) avg(col4) count(col5)
.select(
bround($"avg(col4)", scale = 3).as("avgcol4"),
($"col2"),
($"first(col3)").as("fcol3"),
($"col1").as("col1"),
($"count(col5)").as("ccol5"))
最后發(fā)現(xiàn)還是用sql比較方便,因?yàn)楦ㄓ谩⒏煜?。比如上面描述的聚合之后的列名,用?”符號(hào)表示當(dāng)前表等語(yǔ)法,之前沒(méi)怎么用過(guò),查了資料才寫對(duì)。
mydf.createTempView("mytable")
val df2 = spark.sql("""
select col1,
first(col3) as col3,
count(col4) as col4,
cast(avg(col5) - 140 as decimal(10,3)) as col5
from mytable
group by col1""")
這樣比較方便,效率差不多,但貌似spark-sql要進(jìn)行優(yōu)化的話,方式語(yǔ)法和直接用dataframe有一些差別。
- 注意:
- 代碼中的spark是SparkSession
- 代碼是示隨便寫的,上下沒(méi)嚴(yán)格對(duì)應(yīng)關(guān)系
- 注意建表語(yǔ)句(createTempView)和sql語(yǔ)句的表名一致
- 輸出的df2是個(gè)dataframe
- 用三引號(hào)是為了字符串換行
- 中文字段名要加單引號(hào): first('字段1') as col1,
9. 在sql或dataframe中使用自定義函數(shù)
兩種情況下自定義函數(shù)的寫法是不同的。
spark-sql使用,需要將這個(gè)函數(shù)進(jìn)行注冊(cè)(register):
import org.apache.spark.sql.functions.udf
spark.udf.register( "funname", ((x: Double, y: Double) => { x + y }))
val df2= spark.sql(" select funname(col1,col2) from table1")
dafaframe中使用:
val funname = udf((x: Double, y: Double) => { x + y })
val df2 = mydf.select(funname($"col1",$"col2"))
注意實(shí)際使用時(shí)要根據(jù)實(shí)際情況轉(zhuǎn)換字段類型。
10. RDD去掉前n行數(shù)據(jù)
myrdd.zipWithIndex().filter(_._2>=n).keys
- zipWithIndex函數(shù)將RDD中的元素和元素的索引組成鍵值對(duì)。
(對(duì)比:zipWithUniqueId將RDD中元素和一個(gè)唯一ID組成鍵值對(duì)。) - 另一個(gè)方式:
val arr =df1.take(2)
val df2 =df1.filter(!arr.contains(_))
- 并沒(méi)有進(jìn)行認(rèn)真的測(cè)試,最后業(yè)務(wù)中回避了這個(gè)邏輯??偸侵饔^感覺(jué)在處理海量小文件的時(shí)候可能有性能問(wèn)題。