背景
在推薦領域CTR(click-through rate)預估任務中,最常用到的基礎模型是LR(Logistic Regression)模型。對數(shù)據(jù)進行特征工程,構(gòu)造出大量單特征,編碼后送入模型。LR模型的優(yōu)勢在于:運算速度快;可解釋性強;在特征挖掘完備且訓練數(shù)據(jù)充分的前提下能夠達到一定精度。但這種模型的缺點也較為明顯:
-
模型并未考慮到特征之間的關系
在現(xiàn)實情境下,特征之間無法滿足線性回歸中的嚴苛假設,即特征之間是存在相互關系的(如:“化妝品”類商品與“女”性,“球類運動配件”類商品與“男”性均存在相互關系)。顯然,對特征進行交叉組合往往能夠更好地提升模型效果。 -
高維災難問題
高維的稀疏矩陣是實際工程中常見的問題,并直接會導致計算量過大,特征權(quán)值更新緩慢。如果能夠解決高維稀疏矩陣所帶來的低效率,耗性能問題,就能避免LR模型的高維災難問題。
FM(Factorization Machine)模型就是針對在特征組合過程中遇到的上述問題而提出的一種高效的解決方案。
FM算法解析
1.FM定義
FM模型以特征組合為切入點,在公式定義中引入特征交叉項,公式如下圖( FM模型可擴展到高階,但為簡化分析,這里只討論二階交叉)。

2.公式改寫
在實際工程中,數(shù)據(jù)非常稀疏的情況下很難滿足,
都不為0,這樣會導致
不能通過訓練的到,因此無法進行相應的參數(shù)估計。我們可以發(fā)現(xiàn)參數(shù)矩陣
是一個實對稱矩陣,
可以使用矩陣分解的方法分解,通過引入輔助向量(又稱為隱向量)
來表示【圖1、圖2】




重心轉(zhuǎn)移到如何求解公式(2)后面的二階項,得到公式(3)



引入隱向量的好處是:
- 特征交叉項的參數(shù)數(shù)量由原來的
降為(公式2)的
再降為(公式3)的
,即通過引入隱向量計算
的復雜度為
,改寫后的計算復雜度為
,提高模型推斷速度。
- 原先參數(shù)之間并無關聯(lián)關系,但是現(xiàn)在通過隱向量可以建立關系。如,之前
與
無關,但是現(xiàn)在
=
,
=
兩者有共同的
,也就是說,所有包含
,
的非零組合特征(存在某個
,使得
)的樣本都可以用來學習隱向量
,這很大程度上避免了數(shù)據(jù)稀疏性造成的影響。
3. FM求解
到目前為止已經(jīng)得到了FM的模型表示【圖5】,如何對模型參數(shù)求解呢?可以使用常見的梯度下降法對參數(shù)進行求解,為了對參數(shù)進行梯度下降更新,需要計算模型各參數(shù)的梯度表達式:
當參數(shù)為時,
=1
當參數(shù)為時,
=
當參數(shù)為時,只需要關注模型高階項,當計算參數(shù)
的梯度時,其余無關參數(shù)可看做常數(shù)。

其中:

令,則:

結(jié)合圖6~8得:

綜上,最終模型各參數(shù)的梯度表達式如下:

4. FM算法的Python實現(xiàn)
FM算法的Python實現(xiàn)流程圖如下:

案例演示:用python實現(xiàn)FM算法,數(shù)據(jù)場景為二分類問題

損失函數(shù)我們使用函數(shù)




完整代碼
'''
FM(因子分解機)模型算法:稀疏數(shù)據(jù)下的特征二階組合問題(個性化特征)
1、應用矩陣分解思想,引入隱向量構(gòu)造FM模型方程
2、目標函數(shù)(損失函數(shù)復合FM模型方程)的最優(yōu)問題:鏈式求偏導
3、SGD優(yōu)化目標函數(shù)
'''
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
'''二分類輸出非線性映射'''
def sigmoid(x):
return 1 / (1 + np.exp(-x))
'''計算logit損失函數(shù):L = log(1 + e**(y_hat * y))'''
def logit(y, y_hat):
return np.log(1 + np.exp(-y * y_hat))
'''計算logit損失函數(shù)的外層偏導(不含y_hat的一階偏導)'''
def df_logit(y, y_hat):
return sigmoid(-y * y_hat) * (-y)
'''FM的模型方程:LR線性組合 + 交叉項組合 = 1階特征組合 + 2階特征組合'''
def FM(Xi, w0, W, V):
# 樣本Xi的特征分量xi和xj的2階交叉項組合系數(shù)wij = xi和xj對應的隱向量Vi和Vj的內(nèi)積
# 向量形式:Wij:= <Vi, Vj> * Xi * Xj
interaction = np.sum((Xi.dot(V)) ** 2 - (Xi ** 2).dot(V ** 2)) # 二值硬核匹配->向量軟匹配
y_hat = w0 + Xi.dot(W) + interaction / 2 # FM預測函數(shù)
return y_hat[0]
'''SGD更新FM模型的參數(shù)列表:[w0, W, V]'''
def FM_SGD(X, y, k=2, alpha=0.01, iter=50):
m, n = np.shape(X)
w0, W = 0, np.zeros((n, 1)) # 初始化wo=R、W=(n, 1)
V = np.random.normal(loc=0, scale=1, size=(n, k)) # 初始化隱向量矩陣V=(n, k)~N(0, 1),其中Vj是第j維特征的隱向量
all_FM_params = [] # FM模型的參數(shù)列表:[w0, W, V]
for it in range(iter):
total_loss = 0 # 當前迭代模型的損失值
for i in range(m): # 遍歷訓練集
y_hat = FM(Xi=X[i], w0=w0, W=W, V=V) # FM的模型方程
total_loss += logit(y=y[i], y_hat=y_hat) # 計算logit損失函數(shù)值
dloss = df_logit(y=y[i], y_hat=y_hat) # 計算logit損失函數(shù)的外層偏導
dloss_w0 = dloss * 1 # l(y, y_hat)中y_hat展開w0,求關于w0的內(nèi)層偏導
w0 = w0 - alpha * dloss_w0 # 梯度下降更新w0
for j in range(n): # 遍歷n維向量X[i]
if X[i, j] != 0:
dloss_Wj = dloss * X[i, j] # l(y, y_hat)中y_hat展開y_hat,求關于W[j]的內(nèi)層偏導
W[j] = W[j] - alpha * dloss_Wj # 梯度下降更新W[j]
for f in range(k): # 遍歷k維隱向量Vj
# l(y, y_hat)中y_hat展開V[j, f],求關于V[j, f]的內(nèi)層偏導
dloss_Vjf = dloss * (X[i, j] * (X[i].dot(V[:, f])) - V[j, f] * X[i, j] ** 2)
V[j, f] = V[j, f] - alpha * dloss_Vjf # 梯度下降更新V[j, f]
print('FM第{}次迭代,當前損失值為:{:.4f}'.format(it + 1, total_loss / m))
all_FM_params.append([w0, W, V]) # 保存當前迭代下FM的參數(shù)列表:[w0, W, V]
return all_FM_params
'''FM模型預測測試集分類結(jié)果'''
def FM_predict(X, w0, W, V):
predicts, threshold = [], 0.5 # sigmoid閾值設置
for i in range(X.shape[0]): # 遍歷測試集
y_hat = FM(Xi=X[i], w0=w0, W=W, V=V) # FM的模型方程
predicts.append(-1 if sigmoid(y_hat) < threshold else 1) # 分類結(jié)果非線性映射
return np.array(predicts)
'''FM在不同迭代次數(shù)下的參數(shù)列表中,訓練集的損失值和測試集的準確率變化'''
def draw_research(all_FM_params, X_train, y_train, X_test, y_test):
all_total_loss, all_total_accuracy = [], []
for w0, W, V in all_FM_params:
total_loss = 0
for i in range(X_train.shape[0]):
total_loss += logit(y=y_train[i], y_hat=FM(Xi=X_train[i], w0=w0, W=W, V=V))
all_total_loss.append(total_loss / X_train.shape[0])
all_total_accuracy.append(accuracy_score(y_test, FM_predict(X=X_test, w0=w0, W=W, V=V)))
plt.plot(np.arange(len(all_FM_params)), all_total_loss, color='#FF4040', label='訓練集的損失值')
plt.plot(np.arange(len(all_FM_params)), all_total_accuracy, color='#4876FF', label='測試集的準確率')
plt.xlabel('SGD迭代次數(shù)')
plt.title('FM模型:二階互異特征組合')
plt.legend()
plt.show()
if __name__ == '__main__':
np.random.seed(123)
df = pd.read_csv(r'D:\\FM-master\\data\\xg.csv')
df['Class'] = df['Class'].map({0: -1, 1: 1}) # 標簽列從[0, 1]離散到[-1, 1]
X_train, X_test, y_train, y_test = train_test_split(df.iloc[:, :-1].values, df.iloc[:, -1].values, test_size=0.3, random_state=123)
X_train = MinMaxScaler().fit_transform(X_train) # 歸一化訓練集,返回[0, 1]區(qū)間
X_test = MinMaxScaler().fit_transform(X_test) # 歸一化測試集,返回[0, 1]區(qū)間
'''*****************FM預測模型*****************'''
all_FM_params = FM_SGD(X=X_train, y=y_train, k=2, alpha=0.01, iter=45) # SGD更新FM模型的參數(shù)列表:[w0, W, V]
w0, W, V = all_FM_params[-1] # FM模型的參數(shù)列表
predicts = FM_predict(X=X_test, w0=w0, W=W, V=V) # FM模型預測測試集分類結(jié)果 80.52% 80.09%
print('FM在測試集的分類準確率為: {:.2%}'.format(accuracy_score(y_test, predicts)))
# draw_research(all_FM_params=all_FM_params, X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test)
FM算法的評價指標
import pandas as pd
import numpy as np
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
rnames = ['user_id', 'movie_id', 'rating', 'timestamp']
df = pd.read_csv('D:\\u.data', sep='\t', header=None, names=rnames, engine='python')
#構(gòu)造2分類數(shù)據(jù)集
df['rating']=df['rating'].map(lambda x: -1 if x>=3 else 1) #1,2是label=1 3,4,5是label=0
#one-hot encoder
from sklearn.preprocessing import OneHotEncoder
columns=['user_id', 'movie_id']
for i in columns:
get_dummy_feature=pd.get_dummies(df[i])
df=pd.concat([df, get_dummy_feature],axis=1)
df=df.drop(i, axis=1)
df=df.drop(['timestamp'], axis=1)
#這些特征可以進一步挖掘。這里都不要了,只保留one-hot特征
from sklearn.model_selection import train_test_split
X=df.drop('rating', axis=1)
Y=df['rating']
X_train,X_val,Y_train,Y_val=train_test_split(X, Y, test_size=0.3, random_state=123)
def sigmoid(x):
return 1 / (1 + np.exp(-x))
def logit(y, y_hat): #對每一個樣本計算損失
if y_hat == 'nan':
return 0
else:
return np.log(1 + np.exp(-y * y_hat))
def df_logit(y, y_hat):
return sigmoid(-y * y_hat) * (-y)
from sklearn.base import BaseEstimator, ClassifierMixin
from collections import Counter
class FactorizationMachine(BaseEstimator):
def __init__(self, k=5, learning_rate=0.01, iternum=2):
self.w0 = None
self.W = None
self.V = None
self.k = k
self.alpha = learning_rate
self.iternum = iternum
def _FM(self, Xi):
interaction = np.sum((Xi.dot(self.V)) ** 2 - (Xi ** 2).dot(self.V ** 2))
y_hat = self.w0 + Xi.dot(self.W) + interaction / 2
return y_hat[0]
def _FM_SGD(self, X, y):
m, n = np.shape(X)
# 初始化參數(shù)
self.w0 = 0
self.W = np.random.uniform(size=(n, 1))
self.V = np.random.uniform(size=(n, self.k)) # Vj是第j個特征的隱向量 Vjf是第j個特征的隱向量表示中的第f維
for it in range(self.iternum):
total_loss = 0
for i in range(m): # 遍歷訓練集
y_hat = self._FM(Xi=X[i]) # X[i]是第i個樣本 X[i,j]是第i個樣本的第j個特征
total_loss += logit(y=y[i], y_hat=y_hat) # 計算logit損失函數(shù)值
dloss = df_logit(y=y[i], y_hat=y_hat) # 計算logit損失函數(shù)的外層偏導
dloss_w0 = dloss * 1 # 公式中的w0求導,計算復雜度O(1)
self.w0 = self.w0 - self.alpha * dloss_w0
for j in range(n):
if X[i, j] != 0:
dloss_Wj = dloss * X[i, j] # 公式中的wi求導,計算復雜度O(n)
self.W[j] = self.W[j] - self.alpha * dloss_Wj
for f in range(self.k): # 公式中的vif求導,計算復雜度O(kn)
dloss_Vjf = dloss * (X[i, j] * (X[i].dot(self.V[:, f])) - self.V[j, f] * X[i, j] ** 2)
self.V[j, f] = self.V[j, f] - self.alpha * dloss_Vjf
print('iter={}, loss={:.4f}'.format(it+1, total_loss / m))
return self
def _FM_predict(self, X):
predicts, threshold = [], 0.5 # sigmoid閾值設置
for i in range(X.shape[0]): # 遍歷測試集
y_hat = self._FM(Xi=X[i]) # FM的模型方程
predicts.append(-1 if sigmoid(y_hat) < threshold else 1)
return np.array(predicts)
def fit(self, X, y):
if isinstance(X, pd.DataFrame):
X = np.array(X)
y = np.array(y)
return self._FM_SGD(X, y)
def predict(self, X):
if isinstance(X, pd.DataFrame):
X = np.array(X)
return self._FM_predict(X)
def predict_proba(self, X):
pass
from sklearn.metrics import roc_auc_score, confusion_matrix
model=FactorizationMachine(k=10, learning_rate=0.001, iternum=2)
model.fit(X_train, Y_train)
y_pred=model.predict(X_train)
print('訓練集roc: {:.2%}'.format(roc_auc_score(Y_train.values, y_pred)))
print('混淆矩陣: \n',confusion_matrix(Y_train.values, y_pred))
y_true=model.predict(X_val)
print('驗證集roc: {:.2%}'.format(roc_auc_score(Y_val.values, y_true)))
print('混淆矩陣: \n',confusion_matrix(Y_val.values, y_true))
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import recall_score
from sklearn.metrics import precision_score
from sklearn.metrics import accuracy_score
X_val = MinMaxScaler().fit_transform(X_val)#歸一化測試集,返回[0,1]區(qū)間
val_predicts = model._FM_predict(X_val)
print('FM測試集的分類準確率為: {:.2%}'.format(accuracy_score(Y_val,val_predicts)))
print("FM測試集均方誤差mse:{:.2%}".format(mean_squared_error(Y_val,val_predicts)))
print("FM測試集召回率recall:{:.2%}".format(recall_score(Y_val,val_predicts)))
print("FM測試集的精度precision:{:.2%}".format(precision_score(Y_val,val_predicts)))
5. FM算法小結(jié)
1.FM算法降低了因數(shù)據(jù)稀疏性而導致的特征交叉項參數(shù)學習不充分的影響。
2.FM算法提升了參數(shù)學習效率和模型預估的能力。
參考資料:http://www.itdecent.cn/p/8d792422e582
https://www.cnblogs.com/yinzm/p/11619829.html
https://gith<wbr>ub.com/moren<wbr>jiujiu/FM/bl<wbr>ob/master/FM<wbr>_Demo.ipynb