FM算法解析及Python代碼實現(xiàn)

背景

在推薦領域CTR(click-through rate)預估任務中,最常用到的基礎模型是LR(Logistic Regression)模型。對數(shù)據(jù)進行特征工程,構(gòu)造出大量單特征,編碼后送入模型。LR模型的優(yōu)勢在于:運算速度快;可解釋性強;在特征挖掘完備且訓練數(shù)據(jù)充分的前提下能夠達到一定精度。但這種模型的缺點也較為明顯:

  1. 模型并未考慮到特征之間的關系
    在現(xiàn)實情境下,特征之間無法滿足線性回歸中的嚴苛假設,即特征之間是存在相互關系的(如:“化妝品”類商品與“女”性,“球類運動配件”類商品與“男”性均存在相互關系)。顯然,對特征進行交叉組合往往能夠更好地提升模型效果。
  2. 高維災難問題
    高維的稀疏矩陣是實際工程中常見的問題,并直接會導致計算量過大,特征權(quán)值更新緩慢。如果能夠解決高維稀疏矩陣所帶來的低效率,耗性能問題,就能避免LR模型的高維災難問題。

FM(Factorization Machine)模型就是針對在特征組合過程中遇到的上述問題而提出的一種高效的解決方案。

FM算法解析

1.FM定義

FM模型以特征組合為切入點,在公式定義中引入特征交叉項,公式如下圖( FM模型可擴展到高階,但為簡化分析,這里只討論二階交叉)。


公式(1)

2.公式改寫

在實際工程中,數(shù)據(jù)非常稀疏的情況下很難滿足x_i,x_j都不為0,這樣會導致w_{ij}不能通過訓練的到,因此無法進行相應的參數(shù)估計。我們可以發(fā)現(xiàn)參數(shù)矩陣w是一個實對稱矩陣,w_{ij}可以使用矩陣分解的方法分解,通過引入輔助向量(又稱為隱向量)V來表示【圖1、圖2】

圖1

圖2

現(xiàn)在可以將公式(1)進行改寫得到公式(2):
公式(2)

重心轉(zhuǎn)移到如何求解公式(2)后面的二階項,得到公式(3)


公式(3)特征交叉項推導

圖4
最終模型表達式為:
圖5

引入隱向量的好處是:

  • 特征交叉項的參數(shù)數(shù)量由原來的\frac{n(n-1)}{2}降為(公式2)的kn^2再降為(公式3)的kn,即通過引入隱向量計算y的復雜度為O(kn^2) ,改寫后的計算復雜度為O(kn),提高模型推斷速度。
  • 原先參數(shù)之間并無關聯(lián)關系,但是現(xiàn)在通過隱向量可以建立關系。如,之前w_{ij}w_{ik}無關,但是現(xiàn)在w_{ij}=<V_i,V_j>w_{ik}=<V_i,V_k>兩者有共同的V_i,也就是說,所有包含x_i,x_j的非零組合特征(存在某個j≠i,使得x_ix_j≠0)的樣本都可以用來學習隱向量V_i,這很大程度上避免了數(shù)據(jù)稀疏性造成的影響。

3. FM求解

到目前為止已經(jīng)得到了FM的模型表示【圖5】,如何對模型參數(shù)求解呢?可以使用常見的梯度下降法對參數(shù)進行求解,為了對參數(shù)進行梯度下降更新,需要計算模型各參數(shù)的梯度表達式:
當參數(shù)為w0時,\frac{?y}{?w0}=1
當參數(shù)為wi時,\frac{?y}{?wi}=xi
當參數(shù)為v_{if}時,只需要關注模型高階項,當計算參數(shù)v_{if}的梯度時,其余無關參數(shù)可看做常數(shù)。

圖6

其中:
圖7

令,則:
圖8

結(jié)合圖6~8得:
圖9

綜上,最終模型各參數(shù)的梯度表達式如下:
圖10

4. FM算法的Python實現(xiàn)

FM算法的Python實現(xiàn)流程圖如下:

圖11. FM算法的Python實現(xiàn)

案例演示:用python實現(xiàn)FM算法,數(shù)據(jù)場景為二分類問題
圖12.數(shù)據(jù)場景

損失函數(shù)我們使用函數(shù)
圖13. 損失函數(shù)

圖14. FM模型方程

圖15. SGD更新FM模型的參數(shù)列表

圖16. 模型分類結(jié)果

完整代碼

'''
FM(因子分解機)模型算法:稀疏數(shù)據(jù)下的特征二階組合問題(個性化特征)
1、應用矩陣分解思想,引入隱向量構(gòu)造FM模型方程
2、目標函數(shù)(損失函數(shù)復合FM模型方程)的最優(yōu)問題:鏈式求偏導
3、SGD優(yōu)化目標函數(shù)
'''
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
'''二分類輸出非線性映射'''
def sigmoid(x):
    return 1 / (1 + np.exp(-x))
'''計算logit損失函數(shù):L = log(1 + e**(y_hat * y))'''
def logit(y, y_hat):
    return np.log(1 + np.exp(-y * y_hat))
'''計算logit損失函數(shù)的外層偏導(不含y_hat的一階偏導)'''
def df_logit(y, y_hat):
    return sigmoid(-y * y_hat) * (-y)
'''FM的模型方程:LR線性組合 + 交叉項組合 = 1階特征組合 + 2階特征組合'''
def FM(Xi, w0, W, V):
    # 樣本Xi的特征分量xi和xj的2階交叉項組合系數(shù)wij = xi和xj對應的隱向量Vi和Vj的內(nèi)積
    # 向量形式:Wij:= <Vi, Vj> * Xi * Xj
    interaction = np.sum((Xi.dot(V)) ** 2 - (Xi ** 2).dot(V ** 2))  # 二值硬核匹配->向量軟匹配
    y_hat = w0 + Xi.dot(W) + interaction / 2  # FM預測函數(shù)
    return y_hat[0]
'''SGD更新FM模型的參數(shù)列表:[w0, W, V]'''
def FM_SGD(X, y, k=2, alpha=0.01, iter=50):
    m, n = np.shape(X)
    w0, W = 0, np.zeros((n, 1))  # 初始化wo=R、W=(n, 1)
    V = np.random.normal(loc=0, scale=1, size=(n, k))  # 初始化隱向量矩陣V=(n, k)~N(0, 1),其中Vj是第j維特征的隱向量
    all_FM_params = []  # FM模型的參數(shù)列表:[w0, W, V]
    for it in range(iter):
        total_loss = 0  # 當前迭代模型的損失值
        for i in range(m):  # 遍歷訓練集
            y_hat = FM(Xi=X[i], w0=w0, W=W, V=V)  # FM的模型方程
            total_loss += logit(y=y[i], y_hat=y_hat)  # 計算logit損失函數(shù)值
            dloss = df_logit(y=y[i], y_hat=y_hat)  # 計算logit損失函數(shù)的外層偏導
            dloss_w0 = dloss * 1  # l(y, y_hat)中y_hat展開w0,求關于w0的內(nèi)層偏導
            w0 = w0 - alpha * dloss_w0  # 梯度下降更新w0
            for j in range(n):  # 遍歷n維向量X[i]
                if X[i, j] != 0:
                    dloss_Wj = dloss * X[i, j]  # l(y, y_hat)中y_hat展開y_hat,求關于W[j]的內(nèi)層偏導
                    W[j] = W[j] - alpha * dloss_Wj  # 梯度下降更新W[j]
                    for f in range(k):  # 遍歷k維隱向量Vj
                        # l(y, y_hat)中y_hat展開V[j, f],求關于V[j, f]的內(nèi)層偏導
                        dloss_Vjf = dloss * (X[i, j] * (X[i].dot(V[:, f])) - V[j, f] * X[i, j] ** 2)
                        V[j, f] = V[j, f] - alpha * dloss_Vjf  # 梯度下降更新V[j, f]
        print('FM第{}次迭代,當前損失值為:{:.4f}'.format(it + 1, total_loss / m))
        all_FM_params.append([w0, W, V])  # 保存當前迭代下FM的參數(shù)列表:[w0, W, V]
    return all_FM_params
'''FM模型預測測試集分類結(jié)果'''
def FM_predict(X, w0, W, V):
    predicts, threshold = [], 0.5  # sigmoid閾值設置
    for i in range(X.shape[0]):  # 遍歷測試集
        y_hat = FM(Xi=X[i], w0=w0, W=W, V=V)  # FM的模型方程
        predicts.append(-1 if sigmoid(y_hat) < threshold else 1)  # 分類結(jié)果非線性映射
    return np.array(predicts)
'''FM在不同迭代次數(shù)下的參數(shù)列表中,訓練集的損失值和測試集的準確率變化'''
def draw_research(all_FM_params, X_train, y_train, X_test, y_test):
    all_total_loss, all_total_accuracy = [], []
    for w0, W, V in all_FM_params:
        total_loss = 0
        for i in range(X_train.shape[0]):
            total_loss += logit(y=y_train[i], y_hat=FM(Xi=X_train[i], w0=w0, W=W, V=V))
        all_total_loss.append(total_loss / X_train.shape[0])
        all_total_accuracy.append(accuracy_score(y_test, FM_predict(X=X_test, w0=w0, W=W, V=V)))
    plt.plot(np.arange(len(all_FM_params)), all_total_loss, color='#FF4040', label='訓練集的損失值')
    plt.plot(np.arange(len(all_FM_params)), all_total_accuracy, color='#4876FF', label='測試集的準確率')
    plt.xlabel('SGD迭代次數(shù)')
    plt.title('FM模型:二階互異特征組合')
    plt.legend()
    plt.show()
if __name__ == '__main__':
    np.random.seed(123)
    df = pd.read_csv(r'D:\\FM-master\\data\\xg.csv')
    df['Class'] = df['Class'].map({0: -1, 1: 1})  # 標簽列從[0, 1]離散到[-1, 1]
    X_train, X_test, y_train, y_test = train_test_split(df.iloc[:, :-1].values, df.iloc[:, -1].values, test_size=0.3, random_state=123)
    X_train = MinMaxScaler().fit_transform(X_train)  # 歸一化訓練集,返回[0, 1]區(qū)間
    X_test = MinMaxScaler().fit_transform(X_test)  # 歸一化測試集,返回[0, 1]區(qū)間
    '''*****************FM預測模型*****************'''
    all_FM_params = FM_SGD(X=X_train, y=y_train, k=2, alpha=0.01, iter=45)  # SGD更新FM模型的參數(shù)列表:[w0, W, V]
    w0, W, V = all_FM_params[-1]  # FM模型的參數(shù)列表
    predicts = FM_predict(X=X_test, w0=w0, W=W, V=V)  # FM模型預測測試集分類結(jié)果 80.52%  80.09%
    print('FM在測試集的分類準確率為: {:.2%}'.format(accuracy_score(y_test, predicts)))
    # draw_research(all_FM_params=all_FM_params, X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test)

FM算法的評價指標
圖17
import pandas as pd
import numpy as np

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

rnames = ['user_id', 'movie_id', 'rating', 'timestamp']
df = pd.read_csv('D:\\u.data', sep='\t', header=None, names=rnames, engine='python')
#構(gòu)造2分類數(shù)據(jù)集

df['rating']=df['rating'].map(lambda x: -1 if x>=3 else 1) #1,2是label=1  3,4,5是label=0

#one-hot encoder
from sklearn.preprocessing import OneHotEncoder
columns=['user_id', 'movie_id']

for i in columns:
    get_dummy_feature=pd.get_dummies(df[i])
    df=pd.concat([df, get_dummy_feature],axis=1)
    df=df.drop(i, axis=1)

df=df.drop(['timestamp'], axis=1)
#這些特征可以進一步挖掘。這里都不要了,只保留one-hot特征

from sklearn.model_selection import train_test_split

X=df.drop('rating', axis=1)
Y=df['rating']

X_train,X_val,Y_train,Y_val=train_test_split(X, Y, test_size=0.3, random_state=123)

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def logit(y, y_hat): #對每一個樣本計算損失
    if y_hat == 'nan':
        return 0
    else:
        return np.log(1 + np.exp(-y * y_hat))

def df_logit(y, y_hat):
    return sigmoid(-y * y_hat) * (-y)

from sklearn.base import BaseEstimator, ClassifierMixin
from collections import Counter


class FactorizationMachine(BaseEstimator):
    def __init__(self, k=5, learning_rate=0.01, iternum=2):
        self.w0 = None
        self.W = None
        self.V = None
        self.k = k
        self.alpha = learning_rate
        self.iternum = iternum

    def _FM(self, Xi):
        interaction = np.sum((Xi.dot(self.V)) ** 2 - (Xi ** 2).dot(self.V ** 2))
        y_hat = self.w0 + Xi.dot(self.W) + interaction / 2
        return y_hat[0]

    def _FM_SGD(self, X, y):
        m, n = np.shape(X)
        # 初始化參數(shù)
        self.w0 = 0
        self.W = np.random.uniform(size=(n, 1))
        self.V = np.random.uniform(size=(n, self.k))  # Vj是第j個特征的隱向量  Vjf是第j個特征的隱向量表示中的第f維

        for it in range(self.iternum):
            total_loss = 0
            for i in range(m):  # 遍歷訓練集
                y_hat = self._FM(Xi=X[i])  # X[i]是第i個樣本  X[i,j]是第i個樣本的第j個特征

                total_loss += logit(y=y[i], y_hat=y_hat)  # 計算logit損失函數(shù)值
                dloss = df_logit(y=y[i], y_hat=y_hat)  # 計算logit損失函數(shù)的外層偏導

                dloss_w0 = dloss * 1  # 公式中的w0求導,計算復雜度O(1)
                self.w0 = self.w0 - self.alpha * dloss_w0

                for j in range(n):
                    if X[i, j] != 0:
                        dloss_Wj = dloss * X[i, j]  # 公式中的wi求導,計算復雜度O(n)
                        self.W[j] = self.W[j] - self.alpha * dloss_Wj
                        for f in range(self.k):  # 公式中的vif求導,計算復雜度O(kn)
                            dloss_Vjf = dloss * (X[i, j] * (X[i].dot(self.V[:, f])) - self.V[j, f] * X[i, j] ** 2)
                            self.V[j, f] = self.V[j, f] - self.alpha * dloss_Vjf

            print('iter={}, loss={:.4f}'.format(it+1, total_loss / m))

        return self

    def _FM_predict(self, X):
        predicts, threshold = [], 0.5  # sigmoid閾值設置
        for i in range(X.shape[0]):  # 遍歷測試集
            y_hat = self._FM(Xi=X[i])  # FM的模型方程
            predicts.append(-1 if sigmoid(y_hat) < threshold else 1)
        return np.array(predicts)

    def fit(self, X, y):
        if isinstance(X, pd.DataFrame):
            X = np.array(X)
            y = np.array(y)

        return self._FM_SGD(X, y)

    def predict(self, X):
        if isinstance(X, pd.DataFrame):
            X = np.array(X)

        return self._FM_predict(X)

    def predict_proba(self, X):
        pass

from sklearn.metrics import roc_auc_score, confusion_matrix

model=FactorizationMachine(k=10, learning_rate=0.001, iternum=2)
model.fit(X_train, Y_train)

y_pred=model.predict(X_train)

print('訓練集roc: {:.2%}'.format(roc_auc_score(Y_train.values, y_pred)))
print('混淆矩陣: \n',confusion_matrix(Y_train.values, y_pred))

y_true=model.predict(X_val)

print('驗證集roc: {:.2%}'.format(roc_auc_score(Y_val.values, y_true)))
print('混淆矩陣: \n',confusion_matrix(Y_val.values, y_true))

from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import recall_score
from sklearn.metrics import precision_score
from sklearn.metrics import accuracy_score

X_val = MinMaxScaler().fit_transform(X_val)#歸一化測試集,返回[0,1]區(qū)間

val_predicts = model._FM_predict(X_val)
print('FM測試集的分類準確率為: {:.2%}'.format(accuracy_score(Y_val,val_predicts)))
print("FM測試集均方誤差mse:{:.2%}".format(mean_squared_error(Y_val,val_predicts)))
print("FM測試集召回率recall:{:.2%}".format(recall_score(Y_val,val_predicts)))
print("FM測試集的精度precision:{:.2%}".format(precision_score(Y_val,val_predicts)))

5. FM算法小結(jié)

1.FM算法降低了因數(shù)據(jù)稀疏性而導致的特征交叉項參數(shù)學習不充分的影響。
2.FM算法提升了參數(shù)學習效率和模型預估的能力。

參考資料:http://www.itdecent.cn/p/8d792422e582
https://www.cnblogs.com/yinzm/p/11619829.html
https://gith<wbr>ub.com/moren<wbr>jiujiu/FM/bl<wbr>ob/master/FM<wbr>_Demo.ipynb

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容