Spark RDD與本地容器交互

今天要從之前的數(shù)據(jù)中找出每個地點的一段時間內(nèi)的產(chǎn)品發(fā)生率,即某地點有產(chǎn)量的天數(shù)/總天數(shù)。

在此之前,為了輸出的可讀性,我們需要用地點別名代替地點串號。

導入地點串號文件到hdfs中,串號文件每行格式如下:

1701170830479148,010102000000000000000013,1401010000000005,22,十區(qū)

其中第一列是串號,最后一列是地點別名

首先得到一個Map

val rfidCardFile = spark.sparkContext

? .textFile("hdfs://Y40/medical_waste/card.csv")

? .filter(line=>{

??? line.contains(","+ORG_ID+",")//過濾只有本機構內(nèi)的記錄

? }).cache()

val rfidCardMap = rfidCardFile.map(line=>{

? val a = line.split(",")

? (a(0),a(4))

}).cache()

val localMap = rfidCardMap.collect().toMap

然后我們要構建一個機構內(nèi)地點名稱的數(shù)組

val teamWeightFile = spark.sparkContext.textFile("hdfs://Y40/medical_waste/team_mw.csv")

.cache()

val teamNameArray = teamWeightFile.map(line => {

? val la = line.split(",")

? localMap.get(la(0)).toString

}).distinct().collect()

然后我們需要構建一個地點的產(chǎn)出與否的數(shù)據(jù)源,這里可以簡化成構建一個(地點,日期)數(shù)組,因為數(shù)據(jù)文件中記錄的是產(chǎn)出日期和產(chǎn)出重量,沒有產(chǎn)出就沒有記錄,重復的(地點,日期)只需要記錄一次。

val teamDateArray = teamWeightFile.map(line => {

? val la = line.split(",")

? (localMap.get(la(0)).toString,la(1).substring(1,11).replaceAll("-","").toInt)

}).distinct().collect()

這里我們的日期使用整型記錄,類似于20170227這樣。

接著我們需要知道兩個日期之間的天數(shù),分為兩個函數(shù),首先把上述整型表述的時間轉為LocalDateTime,然后再使用Duration.between計算。

def intToLocalDate(day1:Int):LocalDateTime = {

? val day1D = day1 %100

? val day1M = ((day1 - day1D)/100) %100

? val day1Y = (day1 - day1D - (day1M*100))/10000

?? LocalDateTime.of(day1Y,day1M,day1D,0,0,0)

}

def countDay(day1:Int,day2:Int):Long = {

? val day1Date =intToLocalDate(day1)

? val day2Date =intToLocalDate(day2)

? Duration.between(day2Date,day1Date).abs().toDays

}

最后遍歷teamNameArray

teamNameArray.map(teamName=>{

? val happenCount = teamDateArray.filter(item=>{

? ? item._1==teamName && item._2>=BEGIN_DATE && item._2<END_DATE

? }).length

? val result = teamName+"\t"+happenCount+"\t"+dayLength+"\t"+(happenCount.toDouble/dayLength.toDouble)

? println(result)

})

這里BEGIN_DATE和END_DATE可以自己定義,只要是整型表述的日期即可。

為什么是本地的map和array呢?如果使用rdd的話,性能是否會更好?答案很遺憾,如果使用rdd來做rfidCardMap和teamNameArray,編譯雖然會通過,運行卻會報錯,大意是rdd并不支持嵌套操作,也就是說不能在一個rdd的操作因子中使用到另外一個rdd。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容