【記錄|Spark】簡單的電影推薦系統(tǒng)

為了學(xué)習(xí)spark,在實(shí)驗(yàn)樓上找到的一個(gè)spark入門課程,在此記錄一下學(xué)習(xí)過程。

我使用的Spark版本為Spark 2.2.0, 實(shí)驗(yàn)樓教程使用的是Spark 1.6.1

流程和算法介紹

這個(gè)簡單的電影推薦系統(tǒng)是根據(jù)已有用戶對電影的評價(jià)系統(tǒng),針對特定用戶輸出其可能會(huì)感興趣的電影,構(gòu)成一個(gè)簡單的電影推薦系統(tǒng)。

主要步驟

  • 加載數(shù)據(jù)集,解析成特定格式
  • 劃分?jǐn)?shù)據(jù)集,分為訓(xùn)練集和測試集
  • 利用交替最小二乘法(ALS)算法,訓(xùn)練用戶與電影之間的矩陣模型
  • 基于訓(xùn)練集進(jìn)行預(yù)測,利用測試集來驗(yàn)證預(yù)測結(jié)果是否有效。

實(shí)際上,上述步驟的第三四步是使用了協(xié)同過濾算法來推薦電影。
引用知乎上的回答解釋協(xié)同過濾

舉個(gè)簡單的小例子
我們已知道用戶u1喜歡的電影是A,B,C
用戶u2喜歡的電影是A, C, E, F
用戶u3喜歡的電影是B,D
我們需要解決的問題是:決定對u1是不是應(yīng)該推薦F這部電影。
基于內(nèi)容的做法:要分析F的特征和u1所喜歡的A、B、C的特征,需要知道的信息是A(戰(zhàn)爭片),B(戰(zhàn)爭片),C(劇情片),如果F(戰(zhàn)爭片),那么F很大程度上可以推薦給u1,這是基于內(nèi)容的做法,你需要對item進(jìn)行特征建立和建模。

協(xié)同過濾的辦法:那么你完全可以忽略item的建模,因?yàn)檫@種辦法的決策是依賴user和item之間的關(guān)系,也就是這里的用戶和電影之間的關(guān)系。我們不再需要知道ABCF哪些是戰(zhàn)爭片,哪些是劇情片,我們只需要知道用戶u1和u2按照item向量表示,他們的相似度比較高,那么我們可以把u2所喜歡的F這部影片推薦給u1。

在Spark MLlib中,協(xié)同過濾算法是通過交替最小二乘法(ALS)實(shí)現(xiàn)的,具體算法實(shí)現(xiàn)在此并不關(guān)注。

數(shù)據(jù)集

數(shù)據(jù)集來自GroupLens,是一個(gè)名為MovieLens的數(shù)據(jù)集的數(shù)據(jù),在此處選擇數(shù)據(jù)量為一百萬條的數(shù)據(jù)集,下載地址

具體代碼和分析

1.導(dǎo)入包

我們需要導(dǎo)入以下包

import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

mllib包是Spark中的機(jī)器學(xué)習(xí)包,我們這次導(dǎo)入的有ALS,MatrixFactorizationModel,Rating。ALS即為上文提到的交替最小二乘算法,在Spark中ALS算法的返回結(jié)果為MatrixFactorizationModel類,最后的Rating是Spark定義的評價(jià)Model,對應(yīng)于我們數(shù)據(jù)中的Rating.dat中的內(nèi)容,不用用戶再自行定義

然后,我們還需要導(dǎo)入implicits包,這個(gè)是Spark中的隱式轉(zhuǎn)換包,可以自動(dòng)地對一些數(shù)據(jù)類型進(jìn)行轉(zhuǎn)換,但是這個(gè)包需要在代碼中動(dòng)態(tài)導(dǎo)入

 val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()
import spark.implicits._

其中spark為SparkSession類,在Spark 2.2.0中用來代替SparkContext,作為整個(gè)程序的入口點(diǎn)

2.數(shù)據(jù)處理

  • 定義電影、用戶數(shù)據(jù)實(shí)體類,用來映射對應(yīng)的數(shù)據(jù)
case class Movie(movieId: Int, title: String)
case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)
  • 定義解析函數(shù),將數(shù)據(jù)從文件中解析出來
def parseMovieData(data: String): Movie = {
        val dataField = data.split("::")
        assert(dataField.size == 3)
        Movie(dataField(0).toInt, dataField(1))
    }

def parseUserData(data: String): User = {
        val dataField = data.split("::")
        assert(dataField.size == 5)
        User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)
    }

def parseRatingData(data: String): Rating = {
        val dataField = data.split("::")
        Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)
    }

  • 導(dǎo)入數(shù)據(jù)
var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData).cache()
        
var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData).cache()

var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData).cache()

3. 訓(xùn)練模型

// convert to DataFrame
val moviesDF = moviesData.toDF()
val usersDF = usersData.toDF()
val ratingsDF = ratingsData.toDF()

// split to data set and test set
val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)
val trainingSetOfRatingsData = tempPartitions(0).cache().rdd
val testSetOfRatingData = tempPartitions(1).cache().rdd

// training model
val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)

按7:3的比例將數(shù)據(jù)集分為訓(xùn)練集和驗(yàn)證集,由于劃分出來的數(shù)據(jù)集為DataSet類型,而ALS算法的run函數(shù)接收的參數(shù)為RDD類型,所以需要將DataSet轉(zhuǎn)換為RDD,方法很簡單,就加上”.rdd"就可以了,如果不轉(zhuǎn)換會(huì)報(bào)錯(cuò)


spark_error_5.PNG

訓(xùn)練完之后可以調(diào)用模型進(jìn)行推薦,比如要給用戶ID為1000的用戶推薦適合TA看的10部電影,就可以執(zhí)行

val recomResult = recomModel.recommendProducts(1000, 10)

結(jié)果如下

運(yùn)行結(jié)果

返回的結(jié)果包括用戶ID,電影ID,和對應(yīng)的相關(guān)性
如果我們要顯示電影名,可以執(zhí)行以下代碼

val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()
val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))
println(recommendMoviesWithTitle.mkString("\n"))

在Spark老版本中,可以直接使用

val movieTitles = moviesDF.map(array => (array(0), array(1))).collectAsMap()

將moviesDF轉(zhuǎn)換為key為電影ID,value為電影名的map,但是在2.2.0中,如果這樣寫會(huì)提示DataSet沒有collectAsMap()方法,錯(cuò)誤截圖如下

錯(cuò)誤截圖

經(jīng)過一番搜索后,在StackOverflow上有人提到RDD有collectAsMap()方法,于是就要將moviesDF轉(zhuǎn)換為RDD類型,即上文用到的方法

打印出來的結(jié)果如圖

轉(zhuǎn)換結(jié)果

4.驗(yàn)證模型

如何知道模型是否正確呢?可以用之前從數(shù)據(jù)集里面劃分出來的驗(yàn)證集,通過調(diào)用模型得出預(yù)測結(jié)果,與驗(yàn)證集中的原數(shù)據(jù)進(jìn)行對比,可以判斷模型的效果如何

val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{
    case Rating(user, product, rating) => (user, product)
})
val formatResultOfTestSet = testSetOfRatingData.map{
    case Rating(user, product, rating) => ((user, product), rating)
}
val formatResultOfPredictionResult = predictResultOfTestSet.map {
    case Rating(user, product, rating) => ((user, product), rating)
}
val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)
val MAE = finalResultForComparison.map {
    case ((user, product), (ratingOfTest, ratingOfPrediction)) => 
    val error = (ratingOfTest - ratingOfPrediction)
    Math.abs(error)
}.mean()

在得到測試集的預(yù)測評分結(jié)果之后,我們用 map 操作和 join 操作將它與測試集的原始數(shù)據(jù)組合成為 ((用戶ID, 電影ID), (測試集原有評分, 預(yù)測評分))的格式。這個(gè)格式是 Key-Value 形式的,Key 為 (user, product)。我們是要把這里的測試集原有評分與預(yù)測時(shí)得到的評分相比較,二者的聯(lián)系就是 user 和 product 相同。

上述代碼中首先調(diào)用模型進(jìn)行預(yù)測,然后將在測試集上的預(yù)測結(jié)果和測試集本身的數(shù)據(jù)都轉(zhuǎn)換為 ((user,product), rating) 的格式,之后將兩個(gè)數(shù)據(jù)組合在一起,計(jì)算兩者之間的評價(jià)的差值的絕對值,然后求平均值,這種方法叫做計(jì)算平均絕對誤差

平均絕對誤差( Mean Absolute Error )是所有單個(gè)觀測值與算術(shù)平均值偏差的絕對值的平均。
與平均誤差相比,平均絕對誤差由于離差被絕對值化,不會(huì)出現(xiàn)正負(fù)相抵消的情況,所以平均絕對誤差能更好地反映預(yù)測值誤差的實(shí)際情況。

最終算出的結(jié)果為


image.png

效果還算可以,如果想繼續(xù)優(yōu)化可以通過增加ALS的迭代次數(shù)和特征矩陣的秩來提高準(zhǔn)確率

完整代碼

import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel


object PredictMovie {

    case class Movie(movieId: Int, title: String)
    case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)


    def parseMovieData(data: String): Movie = {
        val dataField = data.split("::")
        assert(dataField.size == 3)
        Movie(dataField(0).toInt, dataField(1))
    }

    def parseUserData(data: String): User = {
        val dataField = data.split("::")
        assert(dataField.size == 5)
        User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)
    }

    def parseRatingData(data: String): Rating = {
        val dataField = data.split("::")
        Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)
    }

    def main(args: Array[String]){
        
        val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()
        import spark.implicits._

        var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData _).cache()
        
        var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData _).cache()
        var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData _).cache()
        
        // convert to DataFrame
        val moviesDF = moviesData.toDF()
        val usersDF = usersData.toDF()
        val ratingsDF = ratingsData.toDF()

        // split to data set and test set
        val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)
        val trainingSetOfRatingsData = tempPartitions(0).cache().rdd
        val testSetOfRatingData = tempPartitions(1).cache().rdd

        // training model
        val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)

        val recomResult = recomModel.recommendProducts(1000, 10)
        println(s"Recommend Movie to User ID 1000")
        println(recomResult.mkString("\n"))

        val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()

        val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))
        println(recommendMoviesWithTitle.mkString("\n"))

        val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{
            case Rating(user, product, rating) => (user, product)
        })

        val formatResultOfTestSet = testSetOfRatingData.map{
            case Rating(user, product, rating) => ((user, product), rating)
        }

        val formatResultOfPredictionResult = predictResultOfTestSet.map {
            case Rating(user, product, rating) => ((user, product), rating)
        }

        val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)

        val MAE = finalResultForComparison.map {
            case ((user, product), (ratingOfTest, ratingOfPrediction)) => 
            val error = (ratingOfTest - ratingOfPrediction)
            Math.abs(error)
        }.mean()

        println(s"mean error: $MAE")
        spark.stop()
    }


}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容