This section covers four topics:
1. Linear regression
2. The multicollinearity problem
3. Ridge regression
4. Locally weighted linear regression
Linear Regression is a fundamental algorithm for regression problems in machine learning, often used as the baseline model for regression tasks. It is called "linear" because the highest power of the unknown parameters is one.

1. Define the dataset and feature space
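As a standard setup (notation assumed): a training set of $N$ samples with $d$ features each,

$$D = \{(x_i, y_i)\}_{i=1}^{N}, \qquad x_i \in \mathbb{R}^{d},\; y_i \in \mathbb{R},$$

so the feature space is $\mathbb{R}^{d}$ and the target $y_i$ is continuous.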

2. Define the hypothesis space
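The usual linear hypothesis, absorbing the intercept into $w$ by prepending a constant 1 to each $x_i$ (the same trick the code below uses):

$$h_w(x) = w^{T}x = w_0 + w_1 x_1 + \cdots + w_d x_d$$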

3. Define the objective function (squared error)
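The standard squared-error objective (the factor $\tfrac{1}{2}$ is a convention that cancels when differentiating):

$$J(w) = \frac{1}{2}\sum_{i=1}^{N}\left(y_i - w^{T}x_i\right)^{2} = \frac{1}{2}\,\lVert y - Xw\rVert_2^{2},$$

where $X \in \mathbb{R}^{N\times(d+1)}$ stacks the samples row-wise.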

4. Optimization algorithm
Since the objective function is convex, setting its first derivative to zero yields a closed-form (analytical) solution for w; this requires X^T X to be invertible, i.e. its determinant must be nonzero:
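$$\nabla_w J = X^{T}(Xw - y) = 0 \;\Longrightarrow\; \hat{w} = (X^{T}X)^{-1}X^{T}y$$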

5. The multicollinearity problem
Concept: multicollinearity means the independent variables of a linear regression model are highly correlated with one another, which distorts the model estimates or makes them hard to estimate accurately. Concretely, multicollinearity inflates the variance of the parameter estimates and makes predictions unstable: a tiny change in the independent variables can produce a huge swing in the predicted values.
Detection: use the condition number of the matrix, as in the sketch below.
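A minimal NumPy sketch (the data here is made up purely for illustration; np.linalg.cond returns the ratio of the largest to the smallest singular value, and a common rule of thumb treats condition numbers above roughly 30 for the design matrix, equivalently ~1000 for X^T X, as a warning sign):

import numpy as np

rng = np.random.default_rng(0)
x1 = rng.normal(size=100)
x2 = x1 + rng.normal(scale=1e-3, size=100)   # nearly a copy of x1
X = np.column_stack([x1, x2])

# cond(X^T X) = cond(X)**2 in the 2-norm; huge values signal near-singularity
print(np.linalg.cond(X))        # design matrix
print(np.linalg.cond(X.T @ X))  # the matrix inverted in the normal equation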

Proof: why multicollinearity inflates the variance of the parameter estimates
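A sketch of the standard argument: assume $y = Xw + \epsilon$ with $\mathbb{E}[\epsilon] = 0$ and $\mathrm{Var}(\epsilon) = \sigma^2 I$. The least-squares estimator $\hat{w} = (X^{T}X)^{-1}X^{T}y$ then has covariance

$$\mathrm{Var}(\hat{w}) = \sigma^2 (X^{T}X)^{-1} = \sigma^2 \sum_{j} \frac{1}{\lambda_j}\, v_j v_j^{T},$$

using the eigendecomposition $X^{T}X = \sum_j \lambda_j v_j v_j^{T}$. Highly correlated columns make $X^{T}X$ nearly singular, so some eigenvalue $\lambda_j \to 0$ and the corresponding variance component $\sigma^2/\lambda_j$ blows up.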

6. Ridge regression (a regression model with an L2-norm penalty)
The L0 norm of a vector is the number of its nonzero elements.
The L1 norm is the sum of the absolute values of its elements, also known as the "sparsity operator" (the penalty used in Lasso regularization); it is the tightest convex relaxation of the L0 norm, but it is not differentiable everywhere.
The L2 norm is the square root of the sum of the squared elements, and it is differentiable. In symbols:
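$$\lVert w\rVert_0 = \#\{j : w_j \neq 0\}, \qquad \lVert w\rVert_1 = \sum_j \lvert w_j\rvert, \qquad \lVert w\rVert_2 = \sqrt{\textstyle\sum_j w_j^{2}}$$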
Ridge regression adds an L2 regularization term that penalizes the parameters w, mitigating the risk of overfitting and of multicollinearity. The standard objective and its closed-form solution (this is what the fit method below implements; λ = 0 recovers ordinary least squares):
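$$J(w) = \frac{1}{2}\lVert y - Xw\rVert_2^{2} + \frac{\lambda}{2}\lVert w\rVert_2^{2}, \qquad \hat{w} = (X^{T}X + \lambda I)^{-1}X^{T}y$$

Adding $\lambda I$ shifts every eigenvalue of $X^{T}X$ up by $\lambda$, so the inverse always exists and the variance components above become $\sigma^2\lambda_j/(\lambda_j+\lambda)^{2}$, which stay bounded; this is precisely how ridge counteracts multicollinearity.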

7. Locally weighted regression
Local weighting is a non-parametric idea. The linear regression above is a parametric method: its parameters are fixed once learned, and after saving them prediction no longer needs the training set. Locally weighted linear regression, by contrast, must keep the entire training set: at prediction time it computes the similarity between each query point and every training sample and assigns weights accordingly. The closer a training point lies to the query point, the larger its weight, and vice versa, so each query point ends up with its own parameter vector.
Its objective function therefore carries a per-sample weight. The weighting borrows the shape of the normal density: samples close to the query point get weights near 1, and distant ones get weights near 0 (whether other similarity measures could be designed here is left to explore); see the formulation below:
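For a query point $x$ with bandwidth $k$ (this matches the Theta computation in the code below), each training sample $x_i$ gets weight

$$\theta_i = \exp\!\left(-\frac{\lVert x_i - x\rVert^{2}}{2k^{2}}\right),$$

and the weighted least-squares solution for that query point is

$$\hat{w}(x) = (X^{T}\Theta X)^{-1}X^{T}\Theta\, y, \qquad \Theta = \mathrm{diag}(\theta_1,\dots,\theta_N).$$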

Because a similarity must be computed between every query point and every training sample, predicting N points takes O(N^2) time.
Hand-written implementation, compared against sklearn:
# -*- coding: utf-8 -*-
import os
import numpy as np
import pandas as pd
from scipy.spatial import distance as dist
from sklearn.model_selection import train_test_split
from sklearn.linear_model import (LinearRegression, Ridge)
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt


class LinearReg(object):
    def __init__(self, lamb):
        self.lamb = lamb

    def z_scale(self, x_train):
        '''z-score standardization; any algorithm that relies on distance
        measures must standardize first to remove the effect of feature scales'''
        mu = np.mean(x_train, axis=0)
        std = np.std(x_train, axis=0)
        return mu, std

    def data_transform(self, mu, std, x_train, x_test):
        '''
        Data transformation:
        1. apply standardization (using the training-set statistics)
        2. insert the intercept column
        '''
        x_train_scale = (x_train - mu) / std
        x_test_scale = (x_test - mu) / std
        intercept_train = np.ones(x_train_scale.shape[0]).reshape(-1, 1)
        intercept_test = np.ones(x_test_scale.shape[0]).reshape(-1, 1)
        x_train_scale = np.concatenate([intercept_train, x_train_scale], axis=1)
        x_test_scale = np.concatenate([intercept_test, x_test_scale], axis=1)
        return x_train_scale, x_test_scale

    def get_loss(self, x_train_scale, y_train, w):
        '''Loss value: 0.5*||y - Xw||^2 + 0.5*lamb*||w||^2, using sums so it
        matches the closed-form solution in fit. Note the intercept is
        penalized here too, unlike sklearn's Ridge, so results differ slightly.'''
        y_hat = x_train_scale.dot(w)
        loss = 0.5 * np.sum((y_train - y_hat)**2) + 0.5 * self.lamb * np.sum(w**2)
        return loss

    def fit(self, x_train_scale, y_train):
        '''Training: w = (X^T X + lamb*I)^{-1} X^T y; lamb = 0 gives plain OLS'''
        A = x_train_scale.T.dot(x_train_scale)
        M = np.eye(A.shape[0]) * self.lamb
        w = np.linalg.inv(A + M).dot(x_train_scale.T).dot(y_train)
        return w

    def predict(self, x_test_scale, w):
        '''Prediction'''
        y_pred = x_test_scale.dot(w)
        return y_pred


class LocallyWeightedReg(LinearReg):
    def __init__(self, k, lamb):
        self.k = k
        super(LocallyWeightedReg, self).__init__(lamb)

    def predict(self, x_train_scale, x_test_scale, y_train):
        '''Prediction: solve one weighted least-squares problem per query point
        (lamb is inherited but unused here; each local fit is unregularized)'''
        # Theta matrix: Gaussian kernel weights between every query point and
        # every training sample (the intercept column is constant, so it
        # contributes nothing to the distances)
        Theta = np.exp(-0.5 * dist.cdist(x_test_scale, x_train_scale)**2 / self.k**2)
        # container for the predictions
        y_pred = np.array([])
        # loop over the query points
        for i in range(len(Theta)):
            theta_sample = np.diag(Theta[i, :])  # diagonal weight matrix for query i
            A = x_train_scale.T.dot(theta_sample).dot(x_train_scale)
            w_sample = np.linalg.inv(A).dot(x_train_scale.T).dot(theta_sample).dot(y_train)
            print(f"sample: {i}; w: {w_sample}")
            y_pred_sample = x_test_scale[i, :].dot(w_sample)
            y_pred = np.append(y_pred, y_pred_sample)
        return y_pred


if __name__ == "__main__":
    file_path = os.getcwd()
    dataSet = pd.read_csv(file_path + "/swiss_linear.csv")
    df = dataSet[["Fertility", "Agriculture", "Catholic", "InfantMortality"]]
    x = df.iloc[:, 1:]
    y = df.iloc[:, 0]
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
    # hand-written model
    model = LinearReg(lamb=0.5)
    mu, std = model.z_scale(x_train)
    x_train_scale, x_test_scale = model.data_transform(mu, std, x_train, x_test)
    w = model.fit(x_train_scale, y_train)
    loss_train = model.get_loss(x_train_scale, y_train, w)
    y_pred = model.predict(x_test_scale, w)
    mse = np.mean((y_test - y_pred)**2)
    print(f"LinearReg MSE: {mse}")
    # sklearn
    scale = StandardScaler(with_mean=True, with_std=True)
    scale.fit(x_train)
    x_train_scale = scale.transform(x_train)
    x_test_scale = scale.transform(x_test)
    # reg = LinearRegression(fit_intercept=True)
    reg = Ridge(alpha=0.5, fit_intercept=True)
    reg.fit(x_train_scale, y_train)
    w = reg.coef_
    b = reg.intercept_
    y_pred = reg.predict(x_test_scale)
    mse = np.mean((y_test - y_pred)**2)
    print(f"sklearn MSE: {mse}")
    # plotting a ridge coefficient path (left over from another dataset; X is undefined here)
    # plt.figure(figsize=(8, 6))
    # n_alphas = 20
    # alphas = np.logspace(-1, 4, num=n_alphas)
    # coefs = []
    # for a in alphas:
    #     ridge = Ridge(alpha=a, fit_intercept=False)
    #     ridge.fit(X, y)
    #     coefs.append(ridge.coef_[0])
    # ax = plt.gca()
    # ax.plot(alphas, coefs)
    # ax.set_xscale('log')
    # handles, labels = ax.get_legend_handles_labels()
    # plt.legend(labels=df.columns[1:-1])
    # plt.xlabel('alpha')
    # plt.ylabel('weights')
    # plt.axis('tight')
    # plt.show()
    # statsmodels
    # import statsmodels.api as sm                                 # method 1
    # import statsmodels.formula.api as smf                        # method 2
    # est = sm.OLS(y, sm.add_constant(X)).fit()                    # method 1
    # est = smf.ols(formula='sales ~ TV + radio', data=df).fit()   # method 2
    # y_pred = est.predict(X)
    # print(est.summary())   # regression results
    # print(est.params)      # coefficients
    model = LocallyWeightedReg(k=50, lamb=0.0)  # large bandwidth k: weights nearly uniform, close to OLS
    mu, std = model.z_scale(x_train)
    x_train_scale, x_test_scale = model.data_transform(mu, std, x_train, x_test)
    y_pred = model.predict(x_train_scale, x_test_scale, y_train)
    mse = np.mean((y_test - y_pred)**2)
    print(f"LocallyWeightedReg MSE: {mse}")
LinearReg MSE: 77.4638537052191
sklearn MSE: 75.43038786488579
LocallyWeightedReg MSE: 76.90550316746727
sample: 0; w: [70.44607336 3.92891665 4.27045562 4.66474541]
sample: 1; w: [70.44607321 3.92897979 4.27435879 4.66244306]
sample: 2; w: [70.44607258 3.93025749 4.27448647 4.66327313]
sample: 3; w: [70.4460735 3.92809796 4.27127147 4.66404582]
sample: 4; w: [70.44607662 3.93275617 4.26876574 4.6682018 ]
sample: 5; w: [70.44607404 3.9289215 4.2720904 4.6642848 ]
sample: 6; w: [70.44607343 3.92854954 4.27086921 4.66450444]
sample: 7; w: [70.44607319 3.92857968 4.27479848 4.66200071]
sample: 8; w: [70.44607402 3.92942195 4.27152206 4.66475568]
sample: 9; w: [70.44607426 3.92765904 4.27394164 4.66167854]