SVD Code Practice

This article covers implementing SVD with tensorflow and pyspark. For the underlying theory, see the previous post, "SVD在協(xié)同過(guò)濾推薦系統(tǒng)中的應(yīng)用" (SVD in Collaborative Filtering Recommender Systems).

Outline

  1. SVD code practice with tensorflow
  2. SVD code practice with pyspark

This article does not go over the theory, but as a reminder, the objective function is:
C = \sum_{(u,i) \in R} (r_{ui} - p_u^T q_i)^2 + \lambda (\|p_u\|^2 + \|q_i\|^2)
The dataset used here is the classic MovieLens movie-rating dataset (its u.data ratings file is read below).

1. SVD Code Practice with tensorflow

First, simplify the objective function by dropping the regularization term:
C = \sum_{(u,i) \in R} (r_{ui} - p_u^T q_i)^2
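For intuition about what the gradient-descent optimizer below is doing, the gradient of this simplified objective with respect to a single user factor p_u (and symmetrically for q_i) is:
\frac{\partial C}{\partial p_u} = -2 \sum_{i:(u,i) \in R} (r_{ui} - p_u^T q_i)\, q_i
tf.train.GradientDescentOptimizer derives these gradients automatically and updates U and P in the opposite direction.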

1.1 Reading the Dataset

The data is tab-separated ("\t") with four columns ('user', 'item', 'rate', 'time'); only the first three are actually needed.
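For reference, a record in the MovieLens 100k u.data file looks like this (tab-separated):

196	242	3	881250949

i.e. user 196 gave item 242 a rating of 3 at Unix timestamp 881250949.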

import numpy
import pandas as pd
import tensorflow as tf

# read data: tab-separated user/item/rate/time columns
df = pd.read_csv('u.data', sep='\t', names=['user', 'item', 'rate', 'time'])
df["rate"] = df["rate"].astype("float")
print(df.dtypes)

# random 70/30 train/test split: a boolean mask like [True, False, False, True, ...]
msk = numpy.random.rand(len(df)) < 0.7
df_train = df[msk]

# ids are 1-based in the file, so shift to 0-based indices
user_indecies = [x-1 for x in df_train.user.values]
item_indecies = [x-1 for x in df_train.item.values]
# take the max over the FULL dataframe, not just df_train, so that
# test-set ids never index past the end of the factor matrices
user_num = max(df.user.values)
item_num = max(df.item.values)
rates = df_train.rate.values
print("user_num: %d, item_num: %d" % (user_num, item_num))

Note: the rate field in the original dataset is an int; experiments show that converting it to float also works, except that something odd then shows up at prediction time, see 1.3 Training and Testing.

1.2 Building the Model and Cost Function

# variables: user and item factor matrices, so that result[u, i] ≈ r_ui
feature_len = 10
U = tf.Variable(initial_value=tf.truncated_normal([user_num, feature_len]), name='users')
P = tf.Variable(initial_value=tf.truncated_normal([feature_len, item_num]), name='items')
result = tf.matmul(U, P)
result_flatten = tf.reshape(result, [-1])

# rating: pick out the predicted ratings for the observed (u, i) pairs
R = tf.gather(result_flatten, user_indecies * tf.shape(result)[1] + item_indecies, name='extracting_user_rate')

# cost function: sum of squared errors over the training ratings
diff_op = tf.subtract(R, rates, name='training_diff')
diff_op_squared = tf.square(diff_op, name="squared_difference")
base_cost = tf.reduce_sum(diff_op_squared, name="sum_squared_error")

# regularization: the lambda * (||p_u||^2 + ||q_i||^2) term of the full
# objective; not added to the minimized cost here, since we train on the
# simplified objective above
lda = tf.constant(.001, name='lambda')
norm_sums = tf.add(tf.reduce_sum(tf.square(U, name='user_square'), name='user_norm'),
                   tf.reduce_sum(tf.square(P, name='item_square'), name='item_norm'))
regularizer = tf.multiply(norm_sums, lda, 'regularizer')

# optimizer: gradient descent with an exponentially decaying learning rate
lr = tf.constant(.001, name='learning_rate')
global_step = tf.Variable(0, trainable=False)
learning_rate = tf.train.exponential_decay(lr, global_step, 10000, 0.96, staircase=True)
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
training_step = optimizer.minimize(base_cost, global_step=global_step)

tf.gather collects slices of its input into a tensor whose shape is given by the index argument; by flattening result to 1-D and indexing it at user_index * item_num + item_index, we pick out exactly the entries for the observed pairs, which handles the (u, i) \in R constraint.
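A minimal numpy sketch of this flattened-index trick (the variable names here are illustrative, not from the code above):

import numpy

result = numpy.arange(12).reshape(3, 4)   # 3 users x 4 items
item_num = result.shape[1]
u, i = 2, 1                               # 0-based user/item indices
flat = result.reshape(-1)
# indexing the flattened matrix at u * item_num + i recovers result[u, i]
assert flat[u * item_num + i] == result[u, i]
print(flat[u * item_num + i])             # 9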

1.3 Training and Testing

sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)

print("training...")
print(sess.run(R))  # predicted ratings before training
for i in range(2):  # just a couple of steps for demonstration; increase for real training
    sess.run(training_step)

print(df.dtypes)
u, p, r = df[['user', 'item', 'rate']].values[0]
# u and p come back as floats here; see the note after this section
u, p = int(u), int(p)
print(u, p, r)
rhat = tf.gather(tf.gather(result, u-1), p-1)
print("rating for user " + str(u) + " for item " + str(p) + " is " + str(r) + " and our prediction is: " + str(sess.run(rhat)))

# evaluate on the held-out 30%
df_test = df[~msk]  # test set
user_indecies_test = [x-1 for x in df_test.user.values]
item_indecies_test = [x-1 for x in df_test.item.values]
rates_test = df_test.rate.values

# accuracy: mean squared error over the test set
R_test = tf.gather(result_flatten, user_indecies_test * tf.shape(result)[1] + item_indecies_test, name='extracting_user_rate_test')
diff_op_test = tf.subtract(R_test, rates_test, name='test_diff')
diff_op_squared_test = tf.square(diff_op_test, name="squared_difference_test")
cost_test = tf.div(tf.reduce_sum(diff_op_squared_test, name="sum_squared_error_test"), df_test.shape[0], name="average_error")
print(sess.run(cost_test))

About the oddity mentioned in 1.1: once the rate field has been converted to float, the line 'u, p, r = df[['user', 'item', 'rate']].values[0]' makes u and p come out as floats. This is standard numpy/pandas behavior rather than a bug: .values on a multi-column selection returns a single numpy array, which must have one common dtype, so the int user and item columns get upcast to float64 alongside rate.
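A tiny sketch of this upcasting (with made-up values):

import pandas as pd

df_demo = pd.DataFrame({"user": [196], "item": [242], "rate": [3.0]})
print(df_demo.dtypes)                         # user/item are int64, rate is float64
values = df_demo[["user", "item", "rate"]].values
print(values.dtype)                           # float64: the ints are upcast to the common dtype
print(values[0])                              # [196. 242.   3.]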

2. SVD Code Practice with pyspark

spark因?yàn)橹苯佑衜llib庫(kù),所以直接調(diào)用的API,spark的求解方式是ALS。

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

sc = SparkContext(appName="svd-als")
data_path = "u.data"  # the same MovieLens ratings file as above

# parse each tab-separated line into a Rating(user, product, rating)
rdd = sc.textFile(data_path).map(lambda x: x.strip().split("\t"))
ratings = rdd.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))

rank = 10             # number of latent factors
num_iterations = 10   # ALS iterations
model = ALS.train(ratings, rank, num_iterations)

Spark's ALS model offers several prediction methods:

  • predict
  • predictAll
  • recommendUsers
  • recommendProducts
  • recommendProductsForUsers
  • recommendUsersForProducts

For more detail, see the blog post Pyspark ALS and Recommendation Outputs. Because this is all just API calls, it is simpler to work with than the tensorflow version; a short sketch of the first few methods follows.
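A minimal sketch of these calls, assuming the model, ratings, and sc from the training snippet above (user 196 and item 242 are ids that appear in u.data):

# predict a single (user, product) rating
print(model.predict(196, 242))

# batch-predict every (user, product) pair we have ratings for
user_products = ratings.map(lambda r: (r.user, r.product))
predictions = model.predictAll(user_products).map(lambda r: ((r.user, r.product), r.rating))
print(predictions.take(3))

# top-5 product recommendations for user 196, and top-5 users for item 242
print(model.recommendProducts(196, 5))
print(model.recommendUsers(242, 5))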
