目標:用RNN實現(xiàn)連續(xù)數(shù)據(jù)的預(yù)測(以股票預(yù)測為例)
有些數(shù)據(jù)是跟時間相關(guān)的,是可以根據(jù)上文預(yù)測出下文的。
1、循環(huán)核:參數(shù)時間共享,循環(huán)層提取時間信息。
循環(huán)核具有記憶力,通過不同時刻的參數(shù)共享,實現(xiàn)了對時間序列的信息提取

循環(huán)核2.png
可以通過設(shè)定記憶體的個數(shù),改變記憶容量。當記憶體個數(shù)被指定,輸入xt、輸出yt維度被指定,周圍這些待訓(xùn)練參數(shù)的維度也就被限定了。
- ht:記憶體內(nèi)當前時刻存儲的狀態(tài)信息
- xt:當前時刻輸入特征
- ht-1:記憶體上一時刻存儲的狀態(tài)信息
- yt:當前時刻循環(huán)核的輸出特征
1.1、循環(huán)核按時間步展開

循環(huán)核time.png
按時間步展開,就是把循環(huán)核按照時間軸方向展開。每個時刻記憶體狀態(tài)信息ht被刷新,記憶體周圍的參數(shù)矩陣wxh、whh和why是固定不變的。要訓(xùn)練優(yōu)化的就是這些參數(shù)矩陣。訓(xùn)練完成后,使用效果最好的參數(shù)矩陣,執(zhí)行前向傳播,輸出預(yù)測結(jié)果。循環(huán)神經(jīng)網(wǎng)絡(luò),就是借助循環(huán)核提取時間特征后,送入全連接網(wǎng)絡(luò),實現(xiàn)連續(xù)數(shù)據(jù)的預(yù)測。
1.2、循環(huán)計算層:向輸出方向生長
每個循環(huán)核構(gòu)成一層循環(huán)計算層。循環(huán)計算層的層數(shù)是向輸出方向增長的。
1.3、TF描述循環(huán)計算層
tf.keras.layers.SimpleRNN(
'記憶體個數(shù)',
activation='tanh', # 激活函數(shù)
return_sequences=False # 默認False,是否每個時刻輸出ht到下一層
)

return_seq_false.png

return_seq_true.png
送入RNN時,x_train維度:
[送入樣本數(shù),循環(huán)核時間展開步數(shù),每個時間步輸入特征個數(shù)]
1.4、字母(One hot編碼)預(yù)測
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Dense, SimpleRNN
import matplotlib.pyplot as plt
import os

# Predict the next letter of "abcde" (cyclically, e -> a) from a single
# input letter, using one-hot encoding and a SimpleRNN.
input_words = "abcde"
w_to_d = {"a": 0, "b": 1, "c": 2, "d": 3, "e": 4}  # letter -> id
id_to_onehot = {0: [1., 0., 0., 0., 0.], 1: [0., 1., 0., 0., 0.], 2: [0., 0., 1., 0., 0.],
                3: [0., 0., 0., 1., 0.], 4: [0., 0., 0., 0., 1.]}  # id -> one-hot vector

# Training pairs: each letter's one-hot vector predicts the next letter's id.
x_train = [id_to_onehot[w_to_d["a"]], id_to_onehot[w_to_d["b"]], id_to_onehot[w_to_d["c"]],
           id_to_onehot[w_to_d["d"]], id_to_onehot[w_to_d["e"]]]
y_train = [w_to_d["b"], w_to_d["c"], w_to_d["d"], w_to_d["e"], w_to_d["a"]]

# Shuffle features and labels with the same seed so pairs stay aligned.
np.random.seed(8)
np.random.shuffle(x_train)
np.random.seed(8)
np.random.shuffle(y_train)
tf.random.set_seed(8)

# Reshape x_train to the SimpleRNN input layout:
# [num samples, time steps, features per step] = (5, 1, 5)
x_train = np.reshape(x_train, (len(x_train), 1, 5))
y_train = np.array(y_train)

model = tf.keras.Sequential([
    SimpleRNN(3),  # 3 memory units (more units = more capacity but more resources)
    Dense(5, activation="softmax")
])
model.compile(optimizer=tf.keras.optimizers.Adam(0.01),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=["sparse_categorical_accuracy"])

check_point_save_path = "./checkpoint_rnn/rnn_onehot_1pre1.ckpt"
if os.path.exists(check_point_save_path + ".index"):
    print("******load model******")
    # Fix: the method is load_weights (was misspelled "load_wights",
    # which would raise AttributeError).
    model.load_weights(check_point_save_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=check_point_save_path,
    save_weights_only=True,
    save_best_only=True,
    monitor="loss"  # no validation data, so track the training loss
)
history = model.fit(x_train, y_train, batch_size=32, epochs=100, callbacks=[cp_callback])
model.summary()

# Dump the trained variables (wxh, whh, why and biases) for inspection.
with open("./rnn_weight.txt", "w") as f:
    for v in model.trainable_variables:
        f.write(str(v.name) + "\n")
        f.write(str(v.shape) + "\n")
        f.write(str(v.numpy()) + "\n")

acc = history.history["sparse_categorical_accuracy"]
loss = history.history["loss"]
plt.subplot(1, 2, 1)
plt.plot(acc, label="Training acc")
plt.title("Training acc")  # fix: this subplot shows accuracy, not loss
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(loss, label="training loss")
plt.title("training loss")
plt.legend()
plt.show()

# Interactive prediction loop.
preNum = int(input("input the number of test alphabet: "))
for i in range(preNum):
    alpha = input("input test alphabet")
    alphabet = [id_to_onehot[w_to_d[alpha]]]
    alphabet = np.reshape(alphabet, (1, 1, 5))
    result = model.predict([alphabet])
    pred = tf.argmax(result, axis=1)
    pred = int(pred)
    # Fix: print the input letter (alpha); the original concatenated the
    # reshaped ndarray with a str, which raises a TypeError.
    tf.print(alpha + "->" + input_words[pred])
1.5、 Embedding--一種編碼的方法
由于獨熱碼(one-hot)的位寬要與詞匯量一致,如果詞匯量增大時,非常浪費資源。因此,自然語言處理中,有專門一個方向在研究單詞的編碼。Embedding是一種單詞編碼的方法,用低維向量實現(xiàn)了編碼。這種編碼可以通過神經(jīng)網(wǎng)絡(luò)訓(xùn)練優(yōu)化,能表達出單詞間的相關(guān)性。
tf.keras.layers.Embedding(
詞匯量大小, # 編碼一共要表示多少個單詞
編碼維度 # 用幾個數(shù)字表達一個單詞
)
送入Embedding時,x_train維度:
[送入樣本數(shù),循環(huán)核時間展開步數(shù)]
1.6 、字母(Embedding編碼)預(yù)測
# Same task as above, but encoding letters with a trainable Embedding
# layer instead of one-hot vectors.
from tensorflow.keras.layers import Embedding  # fix: Embedding was never imported above

model2 = tf.keras.Sequential([
    Embedding(5, 2),  # vocabulary size 5, embedding dimension 2
    SimpleRNN(3),
    Dense(5, activation="softmax")
])
# The rest of the code is the same as the one-hot version above.
1.7、 用RNN實現(xiàn)股票預(yù)測
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
from tensorflow.keras.layers import Dense, Dropout, SimpleRNN
import os
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import math

# Predict Moutai's daily opening price from the previous 60 days'
# opening prices with a stacked SimpleRNN.
maotai = pd.read_csv("./SH600519.csv")  # load the stock data
training_set = maotai.iloc[0:2126, 2:3].values  # first 2126 days' opening price (column C) as training set
test_set = maotai.iloc[2126:, 2:3]  # remaining ~300 days as test set

# Normalize to (0, 1). The scaler is fit on the training set only and
# reused on the test set so both share the same scale.
sc = MinMaxScaler(feature_range=(0, 1))
training_set_scaled = sc.fit_transform(training_set)
test_set = sc.transform(test_set)

x_train = []
y_train = []
x_test = []
y_test = []

# Sliding window: 60 consecutive opening prices as features,
# the 61st day's opening price as the label.
for i in range(60, len(training_set_scaled)):
    x_train.append(training_set_scaled[i - 60: i, 0])
    y_train.append(training_set_scaled[i, 0])

# Shuffle features and labels with the same seed so pairs stay aligned.
np.random.seed(8)
np.random.shuffle(x_train)
np.random.seed(8)
np.random.shuffle(y_train)
tf.random.set_seed(8)

x_train, y_train = np.array(x_train), np.array(y_train)
# RNN input layout: [num samples, time steps, features per step].
# 60 time steps; each step carries one feature (that day's opening price).
x_train = np.reshape(x_train, (x_train.shape[0], 60, 1))

# Build the test set the same way (no shuffling needed).
for i in range(60, len(test_set)):
    x_test.append(test_set[i - 60:i, 0])
    y_test.append(test_set[i, 0])
x_test, y_test = np.array(x_test), np.array(y_test)
x_test = np.reshape(x_test, (x_test.shape[0], 60, 1))

model = tf.keras.Sequential([
    SimpleRNN(80, return_sequences=True),  # emit the full sequence for the next RNN layer
    Dropout(0.2),
    SimpleRNN(100),
    Dropout(0.2),
    Dense(1)  # regression output: the predicted (scaled) opening price
])
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
              loss="mean_squared_error")

check_point_save_path = "./checkpoint_stock/rnn_stock.ckpt"
if os.path.exists(check_point_save_path + ".index"):
    print("******load model******")
    model.load_weights(check_point_save_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=check_point_save_path,
    save_weights_only=True,
    save_best_only=True,
    monitor="val_loss"  # keep the checkpoint with the best validation loss
)
history = model.fit(x_train, y_train, batch_size=64, epochs=50,
                    validation_data=(x_test, y_test), validation_freq=1, callbacks=[cp_callback])
model.summary()

# Dump the trained variables for inspection.
with open("./rnn__stock_weight.txt", "w") as f:
    for v in model.trainable_variables:
        f.write(str(v.name) + "\n")
        f.write(str(v.shape) + "\n")
        f.write(str(v.numpy()) + "\n")

loss = history.history["loss"]
val_loss = history.history["val_loss"]
plt.plot(loss, label="Training Loss")
plt.plot(val_loss, label="Validation Loss")
plt.title("Training and Validation Loss")
plt.legend()
plt.show()

# Predict on the test set, then invert the normalization so predictions
# and ground truth are both in actual price units.
predicted_stock_price = model.predict(x_test)  # fix: variable was misspelled "predictd"
predicted_stock_price = sc.inverse_transform(predicted_stock_price)
real_stock_price = sc.inverse_transform(test_set[60:])

# Plot real vs. predicted prices.
plt.plot(real_stock_price, color="red", label="MaoTai Stock Price")
plt.plot(predicted_stock_price, color="blue", label="Predicted MaoTai Stock Price")
plt.title("MaoTai Stock Price Prediction")  # fix: title typo "Rrediction"
plt.xlabel("Time")
plt.ylabel("MaoTai Stock Price")
plt.legend()
plt.show()

# Evaluate with MSE, RMSE and MAE. sklearn's convention is
# (y_true, y_pred); both metrics are symmetric, so values are unchanged.
mse = mean_squared_error(real_stock_price, predicted_stock_price)
rmse = math.sqrt(mse)
mae = mean_absolute_error(real_stock_price, predicted_stock_price)
print("均方誤差: %.6f" % mse)
print("均方根誤差: %.6f" % rmse)
print("平均絕對誤差: %.6f" % mae)
部分結(jié)果如下:
均方誤差: 2500.951841
均方根誤差: 50.009518
平均絕對誤差: 45.223081