內容摘入自<<Python大數(shù)據(jù)分析從入門到精通>>
推薦系統(tǒng)
自動推薦內容或產品以個性化的方式向適當?shù)挠脩籼峁?以增強整體體驗。推薦系統(tǒng)在術語上非常強大使用海量的數(shù)據(jù),學會理解偏好。
對于PySpark中的“推薦系統(tǒng)”模塊 pyspark.ml.recommendation module
官方文檔鏈接:api/python/pyspark.ml.html#module-pyspark.ml.recommendation
spark 推薦系統(tǒng)的ALS 算法
- 交替最小平方 (ALS) 矩陣分解:
ALS 嘗試將評級矩陣 R 估計為兩個較低級別矩陣(X 和 Y,即 X = Yt = R)的乘積。一般方法是迭代。在每次迭代期間,一個因子矩陣保持不變,而另一個因子矩陣使用最小二乘求解。然后,在求解另一個因子矩陣時,新求解的因子矩陣保持不變。
-
現(xiàn)實場景到推薦系統(tǒng)模型的轉換 ; 構建模型 中的轉換關系
- 劃分出推薦系統(tǒng)中參與的對象,如在線購物網(wǎng)站中,參與的主要對象是購物者和相關的商品。
- 將這些對象的按維度屬性進行劃分,通過這些維度屬性可以有效的表示這個對象。如購物者可以通過 年齡,性別,所住城市等這些屬性進行表示。
- 將對象通過對應的屬性維度構建好后。假設每個對象可以通過一個函數(shù)表示,如: ax1 + bx2 + cx3 + dx4 = y 。為每個對象,依據(jù)分配的屬性維度個數(shù),構建有相同元的函數(shù)。
- 通過不同對象個體的屬性維度值構建矩陣,構建好不同對象的矩陣后,進行矩陣相乘。得到的新矩陣可以認為是每個購物者對相關物品的關聯(lián)程度。
- 由于對象的函數(shù)表示,是通過假設的,所以需要獲取最優(yōu)函數(shù)的辦法,即通過使用最小二乘法來獲取最佳函數(shù)。
-
關于最小二乘法:
它通過最小化誤差的平方和尋找數(shù)據(jù)的最佳函數(shù)匹配。利用最小二乘法可以簡便地求得未知的數(shù)據(jù),并使得這些求得的數(shù)據(jù)與實際數(shù)據(jù)之間誤差的平方和為最小。
最小二乘法還可用于曲線擬合。其他一些優(yōu)化問題也可通過最小化能量或最大化熵用最小二乘法來表達。通俗的說:在平面(也可再高維度空間種)上有若干點,需要使用一個函數(shù)來表示這些點;如何確定這個函數(shù)是最優(yōu)的;通過在坐標系上,每個點到這個函數(shù)對應的圖形的距離的和最小,
由于擬合函數(shù)可以有很多種,但是求兩點的具體方法:坐標值差的平方的和,后再開方。 把這些值都加起來后求最小情況,就是最小二乘法。
使用最小二乘法的,不同擬合曲線:

推薦系統(tǒng)的分類
基于內容推薦
基于內容的推薦(Content-based Recommendation)是信息過濾技術的延續(xù)與發(fā)展,它是建立在項目的內容信息上作出推薦的,而不需要依據(jù)用戶對項目的評價意見,更多地需要用機 器學習的方法從關于內容的特征描述的事例中得到用戶的興趣資料
協(xié)同過濾推薦
協(xié)同過濾推薦(Collaborative Filtering Recommendation)技術是推薦系統(tǒng)中應用最早和最為成功的技術之一。它一般采用最近鄰技術,利用用戶的歷史喜好信息計算用戶之間的距離,然后 利用目標用戶的最近鄰居用戶對商品評價的加權評價值來預測目標用戶對特定商品的喜好程度,系統(tǒng)從而根據(jù)這一喜好程度來對目標用戶進行推薦
基于關聯(lián)規(guī)則推薦
基于關聯(lián)規(guī)則的推薦(Association Rule-based Recommendation)是以關聯(lián)規(guī)則為基礎,把已購商品作為規(guī)則頭,規(guī)則體為推薦對象。關聯(lián)規(guī)則挖掘可以發(fā)現(xiàn)不同商品在銷售過程中的相關性,在零 售業(yè)中已經得到了成功的應用
基于知識推薦
基于知識的推薦(Knowledge-based Recommendation)在某種程度是可以看成是一種推理(Inference)技術,它不是建立在用戶需要和偏好基礎上推薦的?;谥R的方法因 它們所用的功能知識不同而有明顯區(qū)別
組合推薦
由于各種推薦方法都有優(yōu)缺點,所以在實際中,組合推薦(Hybrid Recommendation)經常被采用。研究和應用最多的是內容推薦和協(xié)同過濾推薦的組合。最簡單的做法就是分別用基于內容的方法和協(xié)同過濾推薦方法 去產生一個推薦預測結果,然后用某方法組合其結果
基于效用推薦
基于效用的推薦(Utility-based Recommendation)是建立在對用戶使用項目的效用情況上計算的,其核心問題是怎么樣為每一個用戶去創(chuàng)建一個效用函數(shù),因此,用戶資料模型很大 程度上是由系統(tǒng)所采用的效用函數(shù)決定的。基于效用推薦的好處是它能把非產品的屬性,如提供商的可靠性(Vendor Reliability)和產品的可得性(Product Availability)等考慮到效用計算中

示例代碼
- 測試數(shù)據(jù)-用戶電影評分MovieLens, MovieLens 是歷史最悠久的推薦系統(tǒng)。它由美國 Minnesota 大學計算機科學與工程學院的 GroupLens 項目組創(chuàng)辦,是一個非商業(yè)性質的、以研究為目的的實驗性站點。MovieLens 主要使用 Collaborative Filtering 和 Association Rules 相結合的技術,向用戶推薦他們感興趣的電影
MovieLens https://grouplens.org/datasets/movielens/
完整數(shù)據(jù)下載
數(shù)據(jù)樣例下載
在成功獲取數(shù)據(jù),對文件內容進行說明介紹:
ratings.csv - 電影評分數(shù)據(jù)集
userId,movieId,rating,timestamp 為其數(shù)據(jù)列:表示每個用戶對每部電影在什么時候的評分。movies.csv - 對電影的分類數(shù)據(jù)集
movieId,title,genres 為其數(shù)據(jù)列:表示了每部電影的名字和分類tags.csv - 標簽文件
userId,movieId,tag,timestamp 為其數(shù)據(jù)列:表示每個用戶對電影的分類links.csv -
movieId,imdbId,tmdbId 為其數(shù)據(jù)列: 每個電影的 imdb(網(wǎng)路電影資料庫),tmdb(電影數(shù)據(jù)庫)的關聯(lián)編號
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('rs').getOrCreate()
from pyspark.sql.functions import *
print('-------------查看數(shù)據(jù)情況,檢測數(shù)據(jù)質量和相關的特征。即相對數(shù)據(jù)有一定的認識,對后續(xù)進行訓練做準備--------------------')
df_ratings=spark.read.csv('ml-latest-small/ratings.csv',inferSchema=True,header=True) # 讀取電影評分數(shù)據(jù)
df_ratings.createOrReplaceTempView("ratings") # 構建臨時表評分表
df_movie=spark.read.csv('ml-latest-small/movies.csv',inferSchema=True,header=True) # 讀取電影數(shù)據(jù)
df_movie.createOrReplaceTempView("movies") # 構建臨時電影表,這兩張表通過sql關聯(lián),得到具體電影的評分信息
df_details = spark.sql("SELECT ratings.userId , ratings.movieId , movies.title , movies.genres , ratings.rating FROM ratings \
LEFT JOIN movies ON ratings.movieId = movies.movieId ") # 兩表關聯(lián),獲取具體的信息
df_details.select('userId','title','rating').where('rating=4').show(10)
print((df_details.count(),len(df_details.columns))) # 查看數(shù)據(jù)規(guī)模
df_details.printSchema() # 數(shù)據(jù)列信息
df_details.orderBy(rand()).show(10,False) # 查看數(shù)據(jù)
print('-------------- 進行數(shù)據(jù)轉換,主要將類別數(shù)據(jù),轉換為可通過數(shù)值來度量------------------')
from pyspark.ml.feature import StringIndexer,IndexToString # StringIndexer可以把字符串的列按照出現(xiàn)頻率進行排序,將字符串轉化為可度量的
stringIndexer = StringIndexer(inputCol="title", outputCol="title_new") # 構建StringIndexer對象,設定輸入列和輸出列
model = stringIndexer.fit(df_details) # 構建model模型
indexed = model.transform(df_details) # 使用模型轉換數(shù)據(jù),講電影名轉換為數(shù)值,可以進行度量
indexed.show(10)
indexed.groupBy('title_new').count().orderBy('count',ascending=False).show(10,False) # 查看分類的數(shù)據(jù)樣式
train,test=indexed.randomSplit([0.75,0.25]) # 劃分訓練數(shù)據(jù)和測試數(shù)據(jù)
print('--------------- 使用推薦模型ALS算計 ------------------------')
from pyspark.ml.recommendation import ALS
'''
關于 ALS 的參數(shù):maxIter 最大迭代次數(shù) ; regParam 表示最小二乘法中l(wèi)ambda值的大小 ; userCol ,itemCol 用于表征對象的標識,通過指出這兩列后可以,通過它們構建起關系,通過ratingCol表示它們間的關系。構建成評分矩陣
再本例子中:useCol 是 用戶ID ,itemCol 是電影名 ,ratingCol 是用戶對電影的評分。
'''
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")
rec_model=rec.fit(train) # 使用模型訓練數(shù)據(jù)
predicted_ratings=rec_model.transform(test) # 應用于測試數(shù)據(jù)
predicted_ratings.printSchema()
predicted_ratings.orderBy(rand()).show(10) # 參看應用模型預測的數(shù)據(jù)
print('------------- 引入回歸評估器來度量 推薦系統(tǒng) --------------')
from pyspark.ml.evaluation import RegressionEvaluator # RegressionEvaluator 回歸評估器,它期望兩個輸入列:預測和標簽。
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating') # 構建回歸評估器,評估準確性
rmse=evaluator.evaluate(predicted_ratings)
print('{}{}'.format("標準誤差:",rmse)) # 查看使用推薦系統(tǒng)后的預測的標準誤差,若標準誤差不是很大的話,可以進行下一步操作。
unique_movies=indexed.select('title_new').distinct() # 篩選出所有電影,使用distinct
unique_movies.count()
all = unique_movies.alias('all') # 所有電影df,重命名為 all
watched_movies=indexed.filter(indexed['userId'] == 46).select('title_new').distinct() # 查看85號用戶,看過的所有電影
watched_movies.count()
no_46=watched_movies.alias('no_46') # 46號用戶看過的電影df,重命名為no_46
total_movies = all.join(no_46, all.title_new == no_46.title_new,how='left') # 關聯(lián)得出用戶46沒有觀看評分的電影。
total_movies.show(10,False)
remaining_movies=total_movies.where(col("no_46.title_new").isNull()).select(all.title_new).distinct() # 46號用戶,沒看過電影的df
remaining_movies=remaining_movies.withColumn("userId",lit(46)) # 添加一列
recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)
recommendations.show(5,False)
movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
final_recommendations=movie_title.transform(recommendations)
final_recommendations.show(10,False) # 但是最后推薦給用戶的預估評分都超過了5分,這是個問題


