在傳統(tǒng)的推薦模型中,簡單理解推薦算法召回部分的核心原理無非就是將特征以不同形式進行組織,并按照距離求解算法計算用戶或物品間的相似度,那么在開始之前我們需要先了解一些常用的距離求解算法,這些算法在接下來的源碼實現(xiàn)部分會用到
| 場景 | 算法 | 優(yōu)勢 |
|---|---|---|
| 基于客戶需求的推薦 | 分類模型(邏輯回歸、神經(jīng)網(wǎng)絡(luò)) | 對當下用戶行為進行意圖識別 |
| 基于購物籃的推薦 | 關(guān)聯(lián)規(guī)則 | 提升商品活力,挖掘用戶潛在購買力 |
| 基于物品相似性的推薦 | 基于Item的協(xié)同過濾 | 分析物品潛在相似性,幫組用戶快速找到想要的商品 |
| 基于用戶相似性的推薦 | 基于User的協(xié)同過濾、KNN | 形成圈子文化,發(fā)現(xiàn)用戶潛在興趣 |
| 基于內(nèi)容相似性的推薦 | TFIDF、SVD | 算法可解釋性強,為用戶提供更多相似商品的選擇 |
| 市場細分 | K-means | 物以類聚、人以群分 |
常用推薦算法有以上幾種,本節(jié)我們以最經(jīng)典的協(xié)調(diào)過濾進行講解:
1、距離算法
同現(xiàn)矩陣
- 計算公式

- 通過案例說明
N(i)與N(j)分別表示喜歡 i 物品的人數(shù)與喜歡 j 物品的人數(shù),上述公式大致意思是求解喜歡物品 i 的人中又同時喜歡物品 j 的占比是多少,比值越大越能說明兩個物品的關(guān)聯(lián)度高,那么當其它用戶去購買物品 i 時將在很大程度上喜歡物品 j ;不過需要注意的時如果物品 j 是一個熱門物品,那么很多人都會喜歡物品 j,極端情況下所有喜歡物品 i 的用戶都喜歡物品 j , 那么計算出的物品 i 與物品 j 就是高度相似的,為了避免熱門物品的影響,在分母上對熱門物品進行了懲罰,當N(j)很大時,相識度就會很低。
另外該方式還有另一個優(yōu)勢,那就是在計算相似度時不需要額外收集評分數(shù)據(jù)
歐幾里得距離
- 計算公式


- 通過案例說明
該方法用來計算N維空間內(nèi)兩點之間的距離,當N為2時歐幾里得距離即平面內(nèi)兩點之間的距離,假設(shè)用戶1與用戶2分別對物品1、物品2、物品3與物品4進行的評價:

那么根據(jù)歐幾里得公式將得到兩個用戶的相似度為:

- 通過sql實現(xiàn)歐氏距離
with tb1 as (
SELECT u_i as user_id, pid id, score as rating from tab
) ,
tb2 as
(SELECT u_i as user_id, pid id, score as rating from tab )
select title,recommend,sim from (
select sim_tab.*, c.title as recommend from (
sELECT item_i, item_j, 1/(1+sqrt(sum(item_ij))) sim from (
sELECT tb1.id as item_i, tb2.id as item_j, pow(tb1.rating-tb2.rating, 2)as item_ij from tb1 LEFT join tb2 on tb1.user_id=tb2.user_id where tb1.id <> tb2.id
) as tab group by item_i, item_j
) as sim_tab left join app.eqs_merchandise_model as c on sim_tab.item_j=c.id
) as sim_tab2 left join app.eqs_merchandise_model as c on sim_tab2.item_i=c.id order by item_i,sim desc
- 通過spark實現(xiàn)
def euclidean2(v1: Vector, v2: Vector): Double = {
require(v1.size == v2.size, s"SimilarityAlgorithms:Vector dimensions do not match: Dim(v1)=${v1.size} and Dim(v2)" +
s"=${v2.size}.")
val x = v1.toArray
val y = v2.toArray
euclidean(x, y)
}
def euclidean(x: Array[Double], y: Array[Double]): Double = {
require(x.length == y.length, s"SimilarityAlgorithms:Array length do not match: Len(x)=${x.length} and Len(y)" +
s"=${y.length}.")
math.sqrt(x.zip(y).map(p => p._1 - p._2).map(d => d * d).sum)
}
def euclidean(v1: Vector, v2: Vector): Double = {
val sqdist = Vectors.sqdist(v1, v2)
math.sqrt(sqdist)
}
余弦距離
- 計算公式

- 通過案例說明
與歐幾里得距離類似,也是用來求解N維空間內(nèi)兩個點的相似程度,不同的是余弦相似度計算的是A、B兩個點與原點構(gòu)成的夾角,夾角越小相似度越大;同樣以上面的例子進行說明計算用戶1與用戶2的相似度:

- 通過sql實現(xiàn)余弦相似度計算
with tb1 as (
SELECT u_i as user_id, pid id, score as rating from tab
) ,
tb2 as
(SELECT u_i as user_id, pid id, score as rating from tab )
select title,recommend,sim from (
select sim_tab.*, c.title as recommend from (
sELECT item_i, item_j, sum(item_ij)/(sqrt(sum(item_i_pow)) * sqrt(sum(item_j_pow))) sim from (
sELECT tb1.id as item_i, tb2.id as item_j, pow(tb1.rating, 2) as item_i_pow, pow(tb2.rating, 2) as item_j_pow, tb1.rating * tb2.rating as item_ij from tb1 LEFT join tb2 on tb1.user_id=tb2.user_id where tb1.id <> tb2.id
) as tab group by item_i, item_j
) as sim_tab left join app.eqs_merchandise_model as c on sim_tab.item_j=c.id
) as sim_tab2 left join app.eqs_merchandise_model as c on sim_tab2.item_i=c.id order by item_i,sim desc
皮爾遜系數(shù)
- 計算公式

- 通過案例說明
皮爾遜相關(guān)系數(shù)是一個介于-1和1之間的數(shù),它度量 兩個一一對應(yīng)數(shù)列之間的線性相關(guān)程度。也就是說,它表示兩個數(shù)列中對應(yīng)數(shù)字一起增大或者一起減小的可能性。它度量數(shù)字一起按比例改變的傾向性,也就是說兩個數(shù)列中的數(shù)字存在一個大致的線性關(guān)系。當該傾向性強時,相關(guān)值趨于1。當相關(guān)性很弱時,相關(guān)值趨于0。在負相關(guān)的情況下一個序列的值很高而另一個序列的值低,相關(guān)性就低。假設(shè)用戶3喜歡 物品1 和 物品4,評分分別為1、3,那么用戶1與用戶3的皮爾遜系數(shù)為:

盡量皮爾遜系數(shù)在很多推薦算法中進行了應(yīng)用,但是它也有一些明顯的不足之處,例如兩個看過200部相同的電影的用戶,即便他們給出的評分偶爾不一致,但可能要比兩個看過相同兩部且評分一致的用戶更相似。
- 通過spark實現(xiàn)
def pearsonCorrelationSimilarity(arr1: Array[Double], arr2: Array[Double]): Double = {
require(arr1.length == arr2.length, s"SimilarityAlgorithms:Array length do not match: Len(x)=${arr1.length} and Len(y)" +
s"=${arr2.length}.")
val sum_vec1 = arr1.sum
val sum_vec2 = arr2.sum
val square_sum_vec1 = arr1.map(x => x * x).sum
val square_sum_vec2 = arr2.map(x => x * x).sum
val zipVec = arr1.zip(arr2)
val product = zipVec.map(x => x._1 * x._2).sum
val numerator = product - (sum_vec1 * sum_vec2 / arr1.length)
val dominator = math.pow((square_sum_vec1 - math.pow(sum_vec1, 2) / arr1.length) * (square_sum_vec2 - math.pow(sum_vec2, 2) / arr2.length), 0.5)
if (dominator == 0) Double.NaN else numerator / (dominator * 1.0)
}
2、召回算法
協(xié)同過濾
基于用戶的協(xié)同過濾
- 找到和當前用戶相近的一批用戶
- 這批用戶看過,但當前用戶沒有看過的商品評分乘以這個用戶與當前用戶的相似度分值,得到當前用戶對新商品的預(yù)測分
- 將相同新商品預(yù)測分進行累加
- 新商品列表按照預(yù)測分倒序排列,取Top推薦給當前用戶
通過案例進行說明:

User 1 看過 Item 1 和 Item 2,而 User 3 和 User 4 也看過 Item 1 和 Item 2,那么 User 1 和 User 3、User 4 就是相似用戶。這樣一來,如果 User 3 和 User 4 還分別看過 Item 3 和 Item 4,我們就可以將 Item 3 和 Item 4 都推薦給 User 1 了。但是這里有個問題,我們并沒有預(yù)測User1對 Item 3 和 Item 4的偏好程度,所以也就不清楚User1更喜歡哪個。
基于上面的表格,我們可以把Item1到Item4看做是一個4維的空間向量,那么,每個用戶可以認為是這4維空間中的一個向量,每一維的向量值就是當前用戶對該物品的評分。n維空間求解向量相似度就可以用到我們上面說的余弦距離公式了。

分別計算與User1相似度:

預(yù)測User1對新商品Item3、Item4的偏好程度:
Item 3 的推薦打分是:1 * 0.73=0.73(User3對Item3 的喜好度 * User3 和 User1 的相似度)
Item 4 的推薦打分是:2 * 0.54 = 1.08(User4對Item4 的喜好度 * User4 和 User1 的相似度)
基于物品的協(xié)同過濾
- 基于用戶物品評價矩陣,計算物品相似度矩陣
- 將物品相似度矩陣與當前用戶的物品評分矩陣相乘
- 將新的矩陣按照預(yù)測評分進行倒序排列,取Top推薦給當前用戶
通過案例進行說明:

為了便于大家理解,這里將上面的表格進行了行列對換,這里我們假設(shè)以用戶作為維度,物品作為向量,求解兩個物品在n維空間中的相似程度。
計算方式和上面計算相似用戶的步驟是一致的,從上面表格我們大致可以看出Item3與Item1、Item2都相似,且User1曾經(jīng)對Item1、Item2進行了評分,那么User1對Item3的偏好程度為:
Item3的推薦打分 = Item3與Item1的相似度 * User1對Item1的評分 + Item3與Item2的相似度 * User1對Item2的評分
這里我們假設(shè)已完成物品相似矩陣的計算,那么結(jié)合當前用戶的評分列表就可以求出當前用戶對推薦物品的偏好程度:

協(xié)同算法思考
- 用戶行為發(fā)生時間距離當前時間越近,越能反應(yīng)用戶的興趣
- 相近兩個行為更能反應(yīng)出元素之間的相似性
基于以上假設(shè),如何對協(xié)同過濾公式進行優(yōu)化,增加如下時間衰減因子
1/(1 + α*(data[u][i][time] - data[v][i][time]))
1/(1 + α*(data[u][i][time] - data[u][j][time]))
3、 基于Spark的實現(xiàn)
下面通過余弦距離計算物品間相似度,關(guān)于歐式距離、同現(xiàn)矩陣等方式大家可以嘗試修改步驟3來實現(xiàn)
/**
* 余弦相似度矩陣計算.
* T(x,y) = ∑x(i)y(i) / sqrt(∑(x(i)*x(i))) * sqrt(∑(y(i)*y(i)))
*
* MovieLens 【數(shù)據(jù)地址:https://grouplens.org/datasets/movielens/】(1M、10M、20M 共三個數(shù)據(jù)集)
*/
// 1 數(shù)據(jù)準備
val user_item_df = spark.read.options(Map(("delimiter",","),("header","true"))).csv("/tmp/ml-latest-small/ratings.csv")
val user_ds1 = user_item_df.groupBy("userId").agg(collect_set(concat_ws(":","movieId","rating")).as("item_set"))
// 2 物品:物品,上三角數(shù)據(jù)
val user_ds2 = user_ds1.flatMap { row =>
val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]](1).toArray.sorted
val result = new ArrayBuffer[(String, String, Double, Double)]()
for (i <- 0 to itemlist.length - 2) {
for (j <- i + 1 to itemlist.length - 1) {
result += ((itemlist(i).split(":")(0), itemlist(j).split(":")(0), itemlist(i).split(":")(1).toDouble, itemlist(j).split(":")(1).toDouble))
}
}
result
}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "scoreI").withColumnRenamed("_4", "scoreJ")
// 3 按照距離公式求解相似度
// x*y = ∑x(i)y(i)
// |x|^2 = ∑(x(i)*x(i))
// |y|^2 = ∑(y(i)*y(i))
// result = x*y / sqrt(|x|^2) * sqrt(|y|^2)
val user_ds3 = user_ds2.
withColumn("cnt", lit(1)).
groupBy("itemidI", "itemidJ").
agg(sum(($"scoreI" * $"scoreJ")).as("sum_xy"),
sum(($"scoreI" * $"scoreI")).as("sum_x"),
sum(($"scoreJ" * $"scoreJ")).as("sum_y")).
withColumn("result", $"sum_xy" / (sqrt($"sum_x") * sqrt($"sum_y")))
// 4 上、下三角合并
val user_ds8 = user_ds3.select("itemidI", "itemidJ", "result").
union(user_ds3.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))
val user_prefer_ds2=user_ds8.join(user_item_df, $"itemidI"===$"movieId", "inner")
// 計算召回的用戶物品得分
val user_prefer_ds3 = user_prefer_ds2.withColumn("score", col("pref") * col("similar")).select("userid", "itemidJ", "score")
// user_prefer_ds3.show()
// 得分匯總
val user_prefer_ds4 = user_prefer_ds3.groupBy("userid", "itemidJ").agg(sum("score").as("score")).withColumnRenamed("itemidJ", "itemid")
// user_prefer_ds4.show()
// 用戶得分排序結(jié)果,去除用戶已評分物品
val user_prefer_ds5 = user_prefer_ds4.join(user_prefer_ds1, Seq("userid", "itemid"), "left").where("pref is null")
// user_prefer_ds5.show()
4、 基于python的實現(xiàn)
import math
user_items = {}
threshold = 5
def process_data(df):
for v in df.itertuples():
user_items.setdefault(v[1], {})
user_items[v[1]].setdefault(v[2], v[-1])
def user_similarity_cos():
W = {}
print(len(user_items.keys()))
c = 0
for user1 in user_items.keys():
#W.setdefault(user1, {})
c += 1
if(c % 2000 == 0):
print(c)
for user2 in user_items.keys():
if user1 == user2:
continue
#W[user1].setdefault(user2, 0)
if user2 in W.keys():
W[user1][user2] = W[user2][user1]
continue
cross_items = user_items[user1].keys() & user_items[user2].keys()
if len(cross_items) < threshold:
continue
#余弦距離
W.setdefault(user1, {})
#W[user1].setdefault(user2, 0)
sum_xy = sum([user_items[user1][v] * user_items[user2][v] for v in cross_items])
W[user1][user2] = sum_xy/(math.sqrt(sum([user_items[user1][v] * user_items[user1][v] for v in cross_items]))*math.sqrt(sum([user_items[user2][v] * user_items[user2][v] for v in cross_items])))
#歐氏距離
#dis = sum([pow(user_items[user1][v] - user_items[user2][v],2) for v in cross_items])
#W[user1][user2] = 1/(1+math.sqrt(dis))
return W
def recommend_cos(user, topN):
res = {}
items = user_items[user].keys()
for u, items_score in user_items.items():
if u == user or u not in user_simi[user].keys():
continue
for i in items_score.keys():
if i not in items:
res.setdefault(i, 0)
res[i] += user_simi[user][u] * items_score[i]
sort_val = dict(sorted(res.items(), key=lambda e: -e[1]))
data = {}
cn = 0
for i, s in sort_val.items():
cn += 1
if cn > topN:
break
data[i] = s
return data
user_simi = user_similarity_cos()
recommend('a38a23d5774a4e57bc8174928bac17d9', 5)
基于spark sql實現(xiàn)
- 構(gòu)建物品相似度
sql("with tb1 as (SELECT u_i as user_id, pid id, score as rating from tab ) , tb2 as (SELECT u_i as user_id, pid id, score as rating from tab )sELECT item_i, item_j, sum(item_ij)/(sqrt(sum(item_i_pow)) * sqrt(sum(item_j_pow))) sim from (sELECT tb1.id as item_i, tb2.id as item_j, pow(tb1.rating, 2) as item_i_pow, pow(tb2.rating, 2) as item_j_pow, tb1.rating * tb2.rating as item_ij from tb1 LEFT join tb2 on tb1.user_id=tb2.user_id where tb1.id <> tb2.id ) as tab group by item_i, item_j ").createOrReplaceTempView("sim")

- 計算推薦列表
sql(" select user, item, preference/icn preference,tab.score from(select user, item, sum(preference) preference from( select u_i as user, item_j as item, sum(sim*score) as preference from sim as a left join tab as b on a.item_i == b.pid group by user,item) cc left join tab dd on cc.item ==dd.pid group by user, item ) as c left join tab on tab.u_i==c.user and tab.pid == c.item ").where("score is not null").registerTempTable("rec_tab")