今天要從之前的數(shù)據(jù)中找出每個地點的一段時間內(nèi)的產(chǎn)品發(fā)生率,即某地點有產(chǎn)量的天數(shù)/總天數(shù)。
在此之前,為了輸出的可讀性,我們需要用地點別名代替地點串號。
導入地點串號文件到hdfs中,串號文件每行格式如下:
1701170830479148,010102000000000000000013,1401010000000005,22,十區(qū)
其中第一列是串號,最后一列是地點別名
首先得到一個Map
val rfidCardFile = spark.sparkContext
? .textFile("hdfs://Y40/medical_waste/card.csv")
? .filter(line=>{
??? line.contains(","+ORG_ID+",")//過濾只有本機構內(nèi)的記錄
? }).cache()
val rfidCardMap = rfidCardFile.map(line=>{
? val a = line.split(",")
? (a(0),a(4))
}).cache()
val localMap = rfidCardMap.collect().toMap
然后我們要構建一個機構內(nèi)地點名稱的數(shù)組
val teamWeightFile = spark.sparkContext.textFile("hdfs://Y40/medical_waste/team_mw.csv")
.cache()
val teamNameArray = teamWeightFile.map(line => {
? val la = line.split(",")
? localMap.get(la(0)).toString
}).distinct().collect()
然后我們需要構建一個地點的產(chǎn)出與否的數(shù)據(jù)源,這里可以簡化成構建一個(地點,日期)數(shù)組,因為數(shù)據(jù)文件中記錄的是產(chǎn)出日期和產(chǎn)出重量,沒有產(chǎn)出就沒有記錄,重復的(地點,日期)只需要記錄一次。
val teamDateArray = teamWeightFile.map(line => {
? val la = line.split(",")
? (localMap.get(la(0)).toString,la(1).substring(1,11).replaceAll("-","").toInt)
}).distinct().collect()
這里我們的日期使用整型記錄,類似于20170227這樣。
接著我們需要知道兩個日期之間的天數(shù),分為兩個函數(shù),首先把上述整型表述的時間轉為LocalDateTime,然后再使用Duration.between計算。
def intToLocalDate(day1:Int):LocalDateTime = {
? val day1D = day1 %100
? val day1M = ((day1 - day1D)/100) %100
? val day1Y = (day1 - day1D - (day1M*100))/10000
?? LocalDateTime.of(day1Y,day1M,day1D,0,0,0)
}
def countDay(day1:Int,day2:Int):Long = {
? val day1Date =intToLocalDate(day1)
? val day2Date =intToLocalDate(day2)
? Duration.between(day2Date,day1Date).abs().toDays
}
最后遍歷teamNameArray
teamNameArray.map(teamName=>{
? val happenCount = teamDateArray.filter(item=>{
? ? item._1==teamName && item._2>=BEGIN_DATE && item._2<END_DATE
? }).length
? val result = teamName+"\t"+happenCount+"\t"+dayLength+"\t"+(happenCount.toDouble/dayLength.toDouble)
? println(result)
})
這里BEGIN_DATE和END_DATE可以自己定義,只要是整型表述的日期即可。
為什么是本地的map和array呢?如果使用rdd的話,性能是否會更好?答案很遺憾,如果使用rdd來做rfidCardMap和teamNameArray,編譯雖然會通過,運行卻會報錯,大意是rdd并不支持嵌套操作,也就是說不能在一個rdd的操作因子中使用到另外一個rdd。