sin函數(shù)
# -*coding: utf-8 -*-
import numpy as np
import tensorflow as tf
# import matplotlib as mpl
from matplotlib import pyplot as plt
HIDDEN_SIZE = 30 # LSTM中隱藏節(jié)點(diǎn)的個(gè)數(shù)。
NUM_LAYERS = 2 # LSTM的層數(shù)。
TIMESTEPS = 10 # 循環(huán)神經(jīng)網(wǎng)絡(luò)的訓(xùn)練序列長(zhǎng)度。
TRAINING_STEPS = 10000 # 訓(xùn)練輪數(shù)。
BATCH_SIZE = 32 # batch大小。
TRAINING_EXAMPLES = 10000 # 訓(xùn)練數(shù)據(jù)個(gè)數(shù)。
TESTING_EXAMPLES = 1000 # 測(cè)試數(shù)據(jù)個(gè)數(shù)。
SAMPLE_GAP = 0.01 # 采樣間隔。
def generate_data(seq):
X = []
y = []
# 序列的第i項(xiàng)和后面的TIMESTEPS-1項(xiàng)合在一起作為輸入;第i + TIMESTEPS項(xiàng)作為輸
# 出。即用sin函數(shù)前面的TIMESTEPS個(gè)點(diǎn)的信息,預(yù)測(cè)第i + TIMESTEPS個(gè)點(diǎn)的函數(shù)值。
for i in range(len(seq) - TIMESTEPS):
X.append([seq[i: i + TIMESTEPS]])
y.append([seq[i + TIMESTEPS]])
return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)
def lstm_model(X, y, is_training):
# 使用多層的LSTM結(jié)構(gòu)。
cell = tf.nn.rnn_cell.MultiRNNCell([
tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE) for _ in range(NUM_LAYERS)])
# 使用TensorFlow接口將多層的LSTM結(jié)構(gòu)連接成RNN網(wǎng)絡(luò)并計(jì)算其前向傳播結(jié)果。
outputs, _ = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)
#outputs是頂層LSTM在每一步的輸出結(jié)果,它的維度是[batch_size, time, HIDDEN_SIZE]。
# 在本問題中只關(guān)注最后一個(gè)時(shí)刻的輸出結(jié)果
output = outputs[:, -1, :]
# 對(duì)LSTM網(wǎng)絡(luò)的輸出再做加一層全鏈接層并計(jì)算損失。注意這里默認(rèn)的損失為平均
# 平方差損失函數(shù)。
predictions = tf.contrib.layers.fully_connected(output, 1, activation_fn=None)
# 只在訓(xùn)練時(shí)計(jì)算損失函數(shù)和優(yōu)化步驟。測(cè)試時(shí)直接返回預(yù)測(cè)結(jié)果。
if not is_training:
return predictions, None, None
# 計(jì)算損失函數(shù)。
loss = tf.losses.mean_squared_error(labels=y, predictions=predictions)
# 創(chuàng)建模型優(yōu)化器并得到優(yōu)化步驟。
train_op = tf.contrib.layers.optimize_loss(loss, tf.train.get_global_step(), optimizer='Adagrad', learning_rate=0.1)
return predictions, loss, train_op
def train(sess, train_X, train_y):
# 將訓(xùn)練數(shù)據(jù)以數(shù)據(jù)集的方式提供給計(jì)算圖。
ds = tf.data.Dataset.from_tensor_slices((train_X, train_y))
ds = ds.repeat().shuffle(1000).batch(BATCH_SIZE)
X, y = ds.make_one_shot_iterator().get_next()
# 調(diào)用模型,得到預(yù)測(cè)結(jié)果、損失函數(shù),和訓(xùn)練操作。
with tf.variable_scope("model"):
predictions, loss, train_op = lstm_model(X, y, True)
# 初始化變量
sess.run(tf.global_variables_initializer())
for i in range(TRAINING_STEPS):
_, l = sess.run([train_op, loss])
if i % 100 == 0:
print("train step:" + str(i) + ", loss:" + str(l))
def run_eval(sess, test_X, test_y):
# 將測(cè)試數(shù)據(jù)以數(shù)據(jù)集的方式提供給計(jì)算圖
ds = tf.data.Dataset.from_tensor_slices((test_X, test_y))
ds = ds.batch(1)
X, y = ds.make_one_shot_iterator().get_next()
# 調(diào)用模型得到計(jì)算結(jié)果。這里不需要輸入真實(shí)的y值。
with tf.variable_scope("model", reuse=True):
prediction, _, _ = lstm_model(X, [0.0], False)
# 將預(yù)測(cè)結(jié)果存入一個(gè)數(shù)組。
predictions = []
labels = []
for i in range(TESTING_EXAMPLES):
p, l = sess.run([prediction, y])
predictions.append(p)
labels.append(l)
# 計(jì)算rmse作為評(píng)價(jià)指標(biāo)。
predictions = np.array(predictions).squeeze()
labels = np.array(labels).squeeze()
rmse = np.sqrt(((predictions -labels) ** 2).mean(axis=0))
print("Mean Square Error is : %f" % rmse)
# 對(duì)預(yù)測(cè)的sin函數(shù)曲線進(jìn)行繪圖。
plt.figure()
plt.plot(predictions, label='predictions')
plt.plot(labels, label='real_sin')
plt.legend()
plt.show()
if __name__ == '__main__':
# 用正弦函數(shù)生成訓(xùn)練和測(cè)試數(shù)據(jù)集合。
# numpy.linspace函數(shù)可以創(chuàng)建一個(gè)等差序列的數(shù)組,常用的參數(shù)有三個(gè),第一個(gè)表示起始值,第二個(gè)表示終止值
# 第三個(gè)表示序列的長(zhǎng)度。例如 linespace(1, 10, 10)
# 產(chǎn)生的數(shù)組 array([1,2,3,4,5,6,7,8,9,10])
test_start = (TRAINING_EXAMPLES + TIMESTEPS) * SAMPLE_GAP
test_end = test_start + (TESTING_EXAMPLES + TIMESTEPS) * SAMPLE_GAP
train_X, train_y = generate_data(np.sin(np.linspace(
0, test_start,TRAINING_EXAMPLES + TIMESTEPS, dtype=np.float32)))
test_X, test_y = generate_data(np.sin(np.linspace(
test_start, test_end, TESTING_EXAMPLES + TIMESTEPS, dtype=np.float32)))
with tf.Session() as sess:
# 訓(xùn)練模型。
train(sess, train_X, train_y)
# 使用訓(xùn)練好的模型對(duì)測(cè)試數(shù)據(jù)進(jìn)行預(yù)測(cè)。
run_eval(sess, test_X, test_y)