為了學(xué)習(xí)spark,在實(shí)驗(yàn)樓上找到的一個(gè)spark入門課程,在此記錄一下學(xué)習(xí)過程。
我使用的Spark版本為Spark 2.2.0, 實(shí)驗(yàn)樓教程使用的是Spark 1.6.1
流程和算法介紹
這個(gè)簡單的電影推薦系統(tǒng)是根據(jù)已有用戶對電影的評價(jià)系統(tǒng),針對特定用戶輸出其可能會(huì)感興趣的電影,構(gòu)成一個(gè)簡單的電影推薦系統(tǒng)。
主要步驟
- 加載數(shù)據(jù)集,解析成特定格式
- 劃分?jǐn)?shù)據(jù)集,分為訓(xùn)練集和測試集
- 利用交替最小二乘法(ALS)算法,訓(xùn)練用戶與電影之間的矩陣模型
- 基于訓(xùn)練集進(jìn)行預(yù)測,利用測試集來驗(yàn)證預(yù)測結(jié)果是否有效。
實(shí)際上,上述步驟的第三四步是使用了協(xié)同過濾算法來推薦電影。
引用知乎上的回答解釋協(xié)同過濾
舉個(gè)簡單的小例子
我們已知道用戶u1喜歡的電影是A,B,C
用戶u2喜歡的電影是A, C, E, F
用戶u3喜歡的電影是B,D
我們需要解決的問題是:決定對u1是不是應(yīng)該推薦F這部電影。
基于內(nèi)容的做法:要分析F的特征和u1所喜歡的A、B、C的特征,需要知道的信息是A(戰(zhàn)爭片),B(戰(zhàn)爭片),C(劇情片),如果F(戰(zhàn)爭片),那么F很大程度上可以推薦給u1,這是基于內(nèi)容的做法,你需要對item進(jìn)行特征建立和建模。協(xié)同過濾的辦法:那么你完全可以忽略item的建模,因?yàn)檫@種辦法的決策是依賴user和item之間的關(guān)系,也就是這里的用戶和電影之間的關(guān)系。我們不再需要知道ABCF哪些是戰(zhàn)爭片,哪些是劇情片,我們只需要知道用戶u1和u2按照item向量表示,他們的相似度比較高,那么我們可以把u2所喜歡的F這部影片推薦給u1。
在Spark MLlib中,協(xié)同過濾算法是通過交替最小二乘法(ALS)實(shí)現(xiàn)的,具體算法實(shí)現(xiàn)在此并不關(guān)注。
數(shù)據(jù)集
數(shù)據(jù)集來自GroupLens,是一個(gè)名為MovieLens的數(shù)據(jù)集的數(shù)據(jù),在此處選擇數(shù)據(jù)量為一百萬條的數(shù)據(jù)集,下載地址
具體代碼和分析
1.導(dǎo)入包
我們需要導(dǎo)入以下包
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
mllib包是Spark中的機(jī)器學(xué)習(xí)包,我們這次導(dǎo)入的有ALS,MatrixFactorizationModel,Rating。ALS即為上文提到的交替最小二乘算法,在Spark中ALS算法的返回結(jié)果為MatrixFactorizationModel類,最后的Rating是Spark定義的評價(jià)Model,對應(yīng)于我們數(shù)據(jù)中的Rating.dat中的內(nèi)容,不用用戶再自行定義
然后,我們還需要導(dǎo)入implicits包,這個(gè)是Spark中的隱式轉(zhuǎn)換包,可以自動(dòng)地對一些數(shù)據(jù)類型進(jìn)行轉(zhuǎn)換,但是這個(gè)包需要在代碼中動(dòng)態(tài)導(dǎo)入
val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()
import spark.implicits._
其中spark為SparkSession類,在Spark 2.2.0中用來代替SparkContext,作為整個(gè)程序的入口點(diǎn)
2.數(shù)據(jù)處理
- 定義電影、用戶數(shù)據(jù)實(shí)體類,用來映射對應(yīng)的數(shù)據(jù)
case class Movie(movieId: Int, title: String)
case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)
- 定義解析函數(shù),將數(shù)據(jù)從文件中解析出來
def parseMovieData(data: String): Movie = {
val dataField = data.split("::")
assert(dataField.size == 3)
Movie(dataField(0).toInt, dataField(1))
}
def parseUserData(data: String): User = {
val dataField = data.split("::")
assert(dataField.size == 5)
User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)
}
def parseRatingData(data: String): Rating = {
val dataField = data.split("::")
Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)
}
- 導(dǎo)入數(shù)據(jù)
var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData).cache()
var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData).cache()
var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData).cache()
3. 訓(xùn)練模型
// convert to DataFrame
val moviesDF = moviesData.toDF()
val usersDF = usersData.toDF()
val ratingsDF = ratingsData.toDF()
// split to data set and test set
val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)
val trainingSetOfRatingsData = tempPartitions(0).cache().rdd
val testSetOfRatingData = tempPartitions(1).cache().rdd
// training model
val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)
按7:3的比例將數(shù)據(jù)集分為訓(xùn)練集和驗(yàn)證集,由于劃分出來的數(shù)據(jù)集為DataSet類型,而ALS算法的run函數(shù)接收的參數(shù)為RDD類型,所以需要將DataSet轉(zhuǎn)換為RDD,方法很簡單,就加上”.rdd"就可以了,如果不轉(zhuǎn)換會(huì)報(bào)錯(cuò)
訓(xùn)練完之后可以調(diào)用模型進(jìn)行推薦,比如要給用戶ID為1000的用戶推薦適合TA看的10部電影,就可以執(zhí)行
val recomResult = recomModel.recommendProducts(1000, 10)
結(jié)果如下

返回的結(jié)果包括用戶ID,電影ID,和對應(yīng)的相關(guān)性
如果我們要顯示電影名,可以執(zhí)行以下代碼
val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()
val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))
println(recommendMoviesWithTitle.mkString("\n"))
在Spark老版本中,可以直接使用
val movieTitles = moviesDF.map(array => (array(0), array(1))).collectAsMap()
將moviesDF轉(zhuǎn)換為key為電影ID,value為電影名的map,但是在2.2.0中,如果這樣寫會(huì)提示DataSet沒有collectAsMap()方法,錯(cuò)誤截圖如下
經(jīng)過一番搜索后,在StackOverflow上有人提到RDD有collectAsMap()方法,于是就要將moviesDF轉(zhuǎn)換為RDD類型,即上文用到的方法
打印出來的結(jié)果如圖

4.驗(yàn)證模型
如何知道模型是否正確呢?可以用之前從數(shù)據(jù)集里面劃分出來的驗(yàn)證集,通過調(diào)用模型得出預(yù)測結(jié)果,與驗(yàn)證集中的原數(shù)據(jù)進(jìn)行對比,可以判斷模型的效果如何
val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{
case Rating(user, product, rating) => (user, product)
})
val formatResultOfTestSet = testSetOfRatingData.map{
case Rating(user, product, rating) => ((user, product), rating)
}
val formatResultOfPredictionResult = predictResultOfTestSet.map {
case Rating(user, product, rating) => ((user, product), rating)
}
val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)
val MAE = finalResultForComparison.map {
case ((user, product), (ratingOfTest, ratingOfPrediction)) =>
val error = (ratingOfTest - ratingOfPrediction)
Math.abs(error)
}.mean()
在得到測試集的預(yù)測評分結(jié)果之后,我們用 map 操作和 join 操作將它與測試集的原始數(shù)據(jù)組合成為 ((用戶ID, 電影ID), (測試集原有評分, 預(yù)測評分))的格式。這個(gè)格式是 Key-Value 形式的,Key 為 (user, product)。我們是要把這里的測試集原有評分與預(yù)測時(shí)得到的評分相比較,二者的聯(lián)系就是 user 和 product 相同。
上述代碼中首先調(diào)用模型進(jìn)行預(yù)測,然后將在測試集上的預(yù)測結(jié)果和測試集本身的數(shù)據(jù)都轉(zhuǎn)換為 ((user,product), rating) 的格式,之后將兩個(gè)數(shù)據(jù)組合在一起,計(jì)算兩者之間的評價(jià)的差值的絕對值,然后求平均值,這種方法叫做計(jì)算平均絕對誤差
平均絕對誤差( Mean Absolute Error )是所有單個(gè)觀測值與算術(shù)平均值偏差的絕對值的平均。
與平均誤差相比,平均絕對誤差由于離差被絕對值化,不會(huì)出現(xiàn)正負(fù)相抵消的情況,所以平均絕對誤差能更好地反映預(yù)測值誤差的實(shí)際情況。
最終算出的結(jié)果為

效果還算可以,如果想繼續(xù)優(yōu)化可以通過增加ALS的迭代次數(shù)和特征矩陣的秩來提高準(zhǔn)確率
完整代碼
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
object PredictMovie {
case class Movie(movieId: Int, title: String)
case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)
def parseMovieData(data: String): Movie = {
val dataField = data.split("::")
assert(dataField.size == 3)
Movie(dataField(0).toInt, dataField(1))
}
def parseUserData(data: String): User = {
val dataField = data.split("::")
assert(dataField.size == 5)
User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)
}
def parseRatingData(data: String): Rating = {
val dataField = data.split("::")
Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)
}
def main(args: Array[String]){
val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()
import spark.implicits._
var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData _).cache()
var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData _).cache()
var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData _).cache()
// convert to DataFrame
val moviesDF = moviesData.toDF()
val usersDF = usersData.toDF()
val ratingsDF = ratingsData.toDF()
// split to data set and test set
val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)
val trainingSetOfRatingsData = tempPartitions(0).cache().rdd
val testSetOfRatingData = tempPartitions(1).cache().rdd
// training model
val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)
val recomResult = recomModel.recommendProducts(1000, 10)
println(s"Recommend Movie to User ID 1000")
println(recomResult.mkString("\n"))
val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()
val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))
println(recommendMoviesWithTitle.mkString("\n"))
val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{
case Rating(user, product, rating) => (user, product)
})
val formatResultOfTestSet = testSetOfRatingData.map{
case Rating(user, product, rating) => ((user, product), rating)
}
val formatResultOfPredictionResult = predictResultOfTestSet.map {
case Rating(user, product, rating) => ((user, product), rating)
}
val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)
val MAE = finalResultForComparison.map {
case ((user, product), (ratingOfTest, ratingOfPrediction)) =>
val error = (ratingOfTest - ratingOfPrediction)
Math.abs(error)
}.mean()
println(s"mean error: $MAE")
spark.stop()
}
}