前言
最近剛剛?cè)腴T深度學(xué)習(xí),在B站看的吳恩達(dá)老師的Deep Learning教程,五門課看完以后真的手癢癢,于是決定做一個對話機(jī)器人練練手,網(wǎng)上相關(guān)教程很多,但是能完成一整個工程,又比較容易入門的文章其實不多,之前查了很多篇博客,不想讓后人再重復(fù)走我的彎路,所以寫了這篇文章,希望能幫到各位和我一樣剛?cè)腴T的萌新
參考的文章:
- keras教程:手把手教你做聊天機(jī)器人(上)
- keras教程:手把手教你做聊天機(jī)器人(下)
- 中文NLP筆記:13 用 Keras 實現(xiàn)一個簡易聊天機(jī)器人
- 吳恩達(dá)深度學(xué)習(xí)第五課序列模型的編程練習(xí)
數(shù)據(jù)集下載
萌新不知道干啥用的可以參考第一篇文章keras教程:手把手教你做聊天機(jī)器人(上)
1.原始數(shù)據(jù)集 密碼: mqu9

data是訓(xùn)練語料,只有100條,數(shù)量很少,做出來也就圖一樂,真要做成能用的機(jī)器人還得幾十萬條數(shù)據(jù)
word_vector是訓(xùn)練好的詞向量,大家也可以換成自己的
2.我處理過的,已經(jīng)做好的訓(xùn)練集,可以直接跳過數(shù)據(jù)預(yù)處理階段直接開始訓(xùn)練
處理好的數(shù)據(jù)集 分享碼:9lyq
開始干活
1.先安裝相應(yīng)拓展
我用的是python 3.6.5+tensorflow1.9.0+keras
- jieba
- gensim
- numpy
- pickle
pickle是自帶的還是要下的?我有點忘了
2.對原始文本進(jìn)行數(shù)據(jù)處理
新建一個python文件utils.py,該文件專門用于數(shù)據(jù)處理
import jieba
import re
from gensim.models import word2vec
import os
import numpy as np
定義一個函數(shù)generate_segments,該函數(shù)使用jieba分詞將原始文本進(jìn)行分詞
def generate_segments(input_file,output_file):
data = open(input_file,'rb')
output = open(output_file,'a+',encoding="utf-8")
lines = data.readlines()
for line in lines:
line = line.strip()
seglist = jieba.cut(line)
segments = ''
for word in seglist:
segments = segments + ' ' + word
segments = segments + '\n'
segments = segments.lstrip()
output.write(segments)
data.close()
output.close()
函數(shù)寫好以后,運行一次,生成分好詞的文本文件


分好詞以后,我們需要將原數(shù)據(jù)里的問句和答句分開,生成輸入X和標(biāo)簽Y
def generate_XY(segments_file):
f = open(segments_file,'r',encoding='utf-8')
data = f.read()
X = []
Y = []
conversations = data.split('E')
for q_a in conversations:
if re.findall('.*M.*M.*',q_a,flags=re.DOTALL):
q_a = q_a.strip()
q_a_pair = q_a.split('M')
X.append(q_a_pair[1].strip())
Y.append(q_a_pair[2].strip())
f.close()
return X,Y
獲取到X和Y后,我們需要將它們向量化,這就需要之前下載的詞向量模型了,轉(zhuǎn)為詞向量后,我們還需要將樣本的長度進(jìn)行統(tǒng)一,統(tǒng)一長度為15,在最后插入終止符全1向量,短的句子需要用全1向量補(bǔ)齊,長的句子需要截斷
def XY_vector(X,Y):
print("加載詞向量模型")
model = word2vec.Word2Vec.load("word_vector/Word60.model")
X_vector = []
Y_vector = []
for sentence in X:
x = sentence.split(" ")
x_vec = [model[w] for w in x if w in model.wv.vocab]
X_vector.append(x_vec)
for sentence in Y:
y = sentence.split(" ")
y_vec = [model[w] for w in y if w in model.wv.vocab]
Y_vector.append(y_vec)
word_dim = len(X_vector[0][0])
end_word = np.ones(shape=(word_dim,))
#將長度統(tǒng)一
for vector in X_vector:
if len(vector) > 14:#大于14的情況
vector[14:] = []
vector.append(end_word)
else:
for i in range(15 - len(vector)):
vector.append(end_word)
for vector in Y_vector:
if len(vector) > 14:#大于14的情況
vector[14:] = []
vector.append(end_word)
else:
for i in range(15 - len(vector)):
vector.append(end_word)
return X_vector,Y_vector
最后還有一步是比較關(guān)鍵的,我們這次使用的是seq2seq結(jié)構(gòu),在訓(xùn)練decoder時,我們本需要將前一個單元的輸出作為后一個單元的輸入,但是這么做在訓(xùn)練時容易引起梯度爆炸,所以這次我們直接將答句作為decoder的輸入,并且開頭輸入全零,其他詞語延后一個時間步長,理解不了的可以看我的靈魂畫圖

這么做其實也不是最優(yōu)解,更好的方法是一開始先直接輸入答句,到后來再逐漸將輸入換為前一個RNN單元的輸出,這種方法叫作scheduled sampling,不過由于我還不太熟悉keras,這次就先算了
定義一個函數(shù),根據(jù)標(biāo)簽Y,生成用于訓(xùn)練時的decoder_input
def generate_decoder_input(decoder_output):#生成解碼器的輸入序列,輸出序列的第一個詞是輸入序列的第二個詞,輸入序列的第一個單詞為全零向量
word_dim = len(decoder_output[0][0])
word_start = np.zeros(shape=(word_dim,))#全0
decoder_input = []
if not(decoder_input is decoder_output):
for example in decoder_output:
input_example = example[:14]
input_example.insert(0,word_start)
decoder_input.append(input_example)
return decoder_input
函數(shù)全部定義好以后,先生成分詞好的文本文件,再生成訓(xùn)練集,保存到data.npz文件中
if __name__ == "__main__":
output_file = "data/chatterbot_segments.txt"
X,Y = generate_XY(output_file)
X_vector,Y_vector = XY_vector(X,Y)
Y_input = generate_decoder_input(Y_vector)
print(Y_vector[0][:2])#測試用
print(Y_input[0][:2])#測試用
np.savez("data",data_X=X_vector,data_input_Y=Y_input,data_output_Y=Y_vector)
3.編寫訓(xùn)練模型
新建一個python文件,叫做train.py,先引入要用的模塊
import warnings
warnings.filterwarnings('ignore')#用于消除警告的,可以不加
from keras.models import Model
from keras.layers import Input, LSTM, Dense,TimeDistributed
from keras.optimizers import Adam
import numpy as np
import pickle
import utils
然后將處理好的訓(xùn)練集讀入,這里你可以用我提供的,也可以自己做
data = np.load("data.npz",allow_pickle=True)
data_X = data["data_X"]
data_input_Y = data["data_input_Y"]
data_output_Y = data["data_output_Y"]#準(zhǔn)備數(shù)據(jù)
m_X = len(data_X)
m_Y = len(data_input_Y)
word_dim = len(data_X[0][0])
word_Tx = len(data_X[0])
print("X的樣本數(shù)",m_X)
print("X每句話長度",word_Tx)
print("X維數(shù)",data_X.shape)
print("Y的樣本數(shù)",m_Y)
print("Y維數(shù)",data_input_Y.shape)
print("Y輸出維數(shù)",data_output_Y.shape)
print("詞向量維數(shù)",word_dim)
print("Y輸入第一個詞",data_input_Y[0][0])
print("Y輸出第一個詞",data_output_Y[0][0])
模型使用LSTM,先定義編碼器(encoder),將問句作為輸入,保存編碼器的最終狀態(tài),再定義解碼器(decoder),用編碼器的最終狀態(tài)作為解碼器的起始狀態(tài),在解碼器的輸出上再套一層全連接層,將輸出映射為詞向量長度,注意這里的全連接層激活函數(shù)為"linear",不能用tanh,relu,softmax等其他激活函數(shù)
hidden_dim = 256#LSTM隱藏層維數(shù)
encoder_input = Input(shape=(None,word_dim),name='encoder_input')#輸入是維度為行,樣本長度為列
encoder = LSTM(hidden_dim,return_state=True,name='encoder')
encoder_output,state_h,state_c = encoder(encoder_input)
encoder_state = [state_h,state_c]
decoder_input = Input(shape=(None,word_dim),name='decoder_input')
decoder = LSTM(hidden_dim,return_sequences=True,return_state=True,name='decoder')
decoder_output,_,_ = decoder(decoder_input,initial_state=encoder_state)
decoder_dense = TimeDistributed(Dense(output_dim=word_dim,activation='linear'),name='densor')
outputs = decoder_dense(decoder_output)
train_model = Model(inputs=[encoder_input,decoder_input],outputs=outputs)
設(shè)置一些訓(xùn)練的超參數(shù),注意損失函數(shù)用的是mse,不是categorical_crossentropy,因為這不是一個分類問題,訓(xùn)練3000輪,這個值可能有點大,會引起過擬合,建議多試試其他數(shù),訓(xùn)練好的模型保存到硬盤里
opt = Adam(lr=0.005,decay=0.01)
train_model.compile(optimizer=opt, loss='mse', metrics=['accuracy'])
train_model.fit([data_X,data_input_Y],data_output_Y,epochs=3000,batch_size=32)
train_model.save('train_model.h5')
train.py 完整代碼
import warnings
warnings.filterwarnings('ignore')#用于消除警告的,可以不加
from keras.models import Model
from keras.layers import Input, LSTM, Dense,TimeDistributed
from keras.optimizers import Adam
import numpy as np
import pickle
import utils
data = np.load("data.npz",allow_pickle=True)
data_X = data["data_X"]
data_input_Y = data["data_input_Y"]
data_output_Y = data["data_output_Y"]#準(zhǔn)備數(shù)據(jù)
m_X = len(data_X)
m_Y = len(data_input_Y)
word_dim = len(data_X[0][0])
word_Tx = len(data_X[0])
print("X的樣本數(shù)",m_X)
print("X每句話長度",word_Tx)
print("X維數(shù)",data_X.shape)
print("Y的樣本數(shù)",m_Y)
print("Y維數(shù)",data_input_Y.shape)
print("Y輸出維數(shù)",data_output_Y.shape)
print("詞向量維數(shù)",word_dim)
print("Y輸入第一個詞",data_input_Y[0][0])
print("Y輸出第一個詞",data_output_Y[0][0])
hidden_dim = 256#LSTM隱藏層維數(shù)
encoder_input = Input(shape=(None,word_dim),name='encoder_input')#輸入是維度為行,樣本為列
encoder = LSTM(hidden_dim,return_state=True,name='encoder')
encoder_output,state_h,state_c = encoder(encoder_input)
encoder_state = [state_h,state_c]
decoder_input = Input(shape=(None,word_dim),name='decoder_input')
decoder = LSTM(hidden_dim,return_sequences=True,return_state=True,name='decoder')
decoder_output,_,_ = decoder(decoder_input,initial_state=encoder_state)
decoder_dense = TimeDistributed(Dense(output_dim=word_dim,activation='linear'),name='densor')
outputs = decoder_dense(decoder_output)
train_model = Model(inputs=[encoder_input,decoder_input],outputs=outputs)
opt = Adam(lr=0.005,decay=0.01)
train_model.compile(optimizer=opt, loss='mse', metrics=['accuracy'])
train_model.fit([data_X,data_input_Y],data_output_Y,epochs=3000,batch_size=32)
train_model.save('train_model.h5')

最后loss大概0.01,acc大概0.41
4.編寫預(yù)測模型,并進(jìn)行預(yù)測
新建一個predict.py文件,首先引入模塊,并設(shè)置好一些參數(shù),載入訓(xùn)練好的模型
import warnings
warnings.filterwarnings('ignore')
from keras.models import Model,load_model
from keras.layers import Input, LSTM, Dense,TimeDistributed
from keras.optimizers import Adam
from gensim.models import word2vec
import numpy as np
import jieba
import pickle
print("加載模型中")
model = load_model("train_model.h5")
word2vec_model = word2vec.Word2Vec.load("word_vector/Word60.model")
Ty = 30#回答的最長長度
hidden_dim = 256
word_dim = 60#超參數(shù)設(shè)置
雖然預(yù)測模型使用的還是之前訓(xùn)練模型里的結(jié)構(gòu),但是在預(yù)測時,我們會將decoder前一個單元的輸出作為下一個decoder單元的輸入,所以還需要重新定義一下網(wǎng)絡(luò)。在這個模型里,我們使用for循環(huán)將前一單元的輸出作為下一個單元的輸入,并把每個單元的輸出放進(jìn)一個list里
def predict_model(encoder_layer,decoder_layer,time_densor,word_dim,Ty):
X0 = Input(shape=(None,word_dim),name="sentence_input")
_,state_h,state_c = encoder_layer(X0)#獲得編碼器最終狀態(tài)
decoder_states_inputs = [state_h, state_c]
decoder_input = Input(shape=(1,word_dim),name='decoder_initial_input')
X = decoder_input
outputs = []
for i in range(Ty):
decoder_output,h,c = decoder_layer(X,initial_state=decoder_states_inputs)
output = time_densor(decoder_output)
decoder_states_inputs = [h,c]
X = output
outputs.append(output)
model = Model(input=[X0,decoder_input],outputs=outputs)
return model
這個函數(shù)需要接收模型的encoder_layer,decoder_layer,time_densor后面會說到怎么獲取這幾個實例
定義兩個函數(shù),一個用于將人輸入的問句向量化,另一個將機(jī)器輸出的序列轉(zhuǎn)換為文字
1.輸入向量化
def input_sentence_vector(sentence,word2vec_model):#將加載好的模型當(dāng)做參數(shù)傳入,加快運行速度
sentence = sentence.strip()
word_list = jieba.cut(sentence)
word_vector = [word2vec_model[w] for w in word_list if w in word2vec_model.wv.vocab]#轉(zhuǎn)為詞向量
word_dim = len(word_vector[0])
word_end = np.ones(shape=(word_dim,))#設(shè)置停止詞
if len(word_vector) > 14:#裁剪句子
word_vector[14:] = []
word_vector.append(word_end)
else:
for i in range(15 - len(word_vector)):
word_vector.append(word_end)
return np.array([word_vector])
該函數(shù)將人的輸入問句轉(zhuǎn)為詞向量序列,將加載好的詞向量模型作為參數(shù)接收,而不在函數(shù)里加載模型,避免每次調(diào)用函數(shù)時都要加載一次模型
2.輸出序列轉(zhuǎn)文字
def vec2Sentence(answer_sequence,word2vec_model):
answer_list = [word2vec_model.most_similar([answer_sequence[i][0][0]])[0] for i in range(Ty)]
answer = ''
for index,word_tuple in enumerate(answer_list):
if word_tuple[1]>0.75:#當(dāng)置信度小于75%時,就不把這個詞加入回答中
answer += str(word_tuple[0])
return answer
這里有個需要注意的地方,由于全1向量不一定是詞向量模型中的停止詞,而且網(wǎng)絡(luò)也不會輸出純粹的全1向量,為了避免輸出大量的無意義結(jié)果,我做了一個處理,只將置信度大于75%的單詞加入到回答中
主函數(shù)部分,先獲取predict_model需要的三個參數(shù),并初始化模型。get_layer()函數(shù)里的字符串是網(wǎng)絡(luò)層的名字,名字是在原來train.py創(chuàng)建網(wǎng)絡(luò)層時用參數(shù)name指定的,如果你起的名字和我不一樣,或者你沒寫name的話應(yīng)該會出問題
#sentence = np.zeros(shape=(1,15,60),dtype=float)#測試用的
print("加載模型層")
encoder = model.get_layer('encoder')#編碼器生成編碼狀態(tài)
decoder = model.get_layer('decoder')
densor = model.get_layer('densor')
decoder_model = predict_model(encoder,decoder,densor,word_dim,Ty)
最后就是問答的代碼了,predict_model里還是需要一個decoder_input的輸入,我試過不用,但是會莫名其妙報錯,最后只能手動指定一個全零的默認(rèn)值傳進(jìn)去了
while(True):
sentence = input("問:")
sentence_vec = input_sentence_vector(sentence,word2vec_model)
X = np.zeros(shape=[1,1,word_dim])#decoder默認(rèn)輸入,為零就好
answer_sequence = decoder_model.predict([sentence_vec,X])#輸出回答
print("答:",vec2Sentence(answer_sequence,word2vec_model))
predict.py完整代碼
import warnings
warnings.filterwarnings('ignore')
from keras.models import Model,load_model
from keras.layers import Input, LSTM, Dense,TimeDistributed
from keras.optimizers import Adam
from gensim.models import word2vec
import numpy as np
import jieba
import pickle
print("加載模型中")
model = load_model("train_model.h5")
word2vec_model = word2vec.Word2Vec.load("word_vector/Word60.model")
Ty = 30#回答的最長長度
hidden_dim = 256
word_dim = 60#超參數(shù)設(shè)置
def predict_model(encoder_layer,decoder_layer,time_densor,word_dim,Ty):
X0 = Input(shape=(None,word_dim),name="sentence_input")
_,state_h,state_c = encoder_layer(X0)#獲得編碼序列
decoder_states_inputs = [state_h, state_c]
decoder_input = Input(shape=(1,word_dim),name='decoder_initial_input')
X = decoder_input
outputs = []
for i in range(Ty):
decoder_output,h,c = decoder_layer(X,initial_state=decoder_states_inputs)
output = time_densor(decoder_output)
decoder_states_inputs = [h,c]
X = output
outputs.append(output)
model = Model(input=[X0,decoder_input],outputs=outputs)
return model
def input_sentence_vector(sentence,word2vec_model):#將加載好的模型當(dāng)做參數(shù)傳入,加快運行速度
sentence = sentence.strip()
word_list = jieba.cut(sentence)
word_vector = [word2vec_model[w] for w in word_list if w in word2vec_model.wv.vocab]#轉(zhuǎn)為詞向量
word_dim = len(word_vector[0])
word_end = np.ones(shape=(word_dim,))#設(shè)置停止詞
if len(word_vector) > 14:#裁剪句子
word_vector[14:] = []
word_vector.append(word_end)
else:
for i in range(15 - len(word_vector)):
word_vector.append(word_end)
return np.array([word_vector])
def vec2Sentence(answer_sequence,word2vec_model):
answer_list = [word2vec_model.most_similar([answer_sequence[i][0][0]])[0] for i in range(Ty)]
answer = ''
for index,word_tuple in enumerate(answer_list):
if word_tuple[1]>0.75:#當(dāng)置信概率小于75%時,就不把這個詞加入回答中
answer += str(word_tuple[0])
return answer
#sentence = np.zeros(shape=(1,15,60),dtype=float)#測試用的
print("加載模型層")
encoder = model.get_layer('encoder')#編碼器生成編碼狀態(tài)
decoder = model.get_layer('decoder')
densor = model.get_layer('densor')
decoder_model = predict_model(encoder,decoder,densor,word_dim,Ty)
while(True):
sentence = input("問:")
sentence_vec = input_sentence_vector(sentence,word2vec_model)
X = np.zeros(shape=[1,1,word_dim])#decoder默認(rèn)輸入,為零就好
answer_sequence = decoder_model.predict([sentence_vec,X])#輸出回答
print("答:",vec2Sentence(answer_sequence,word2vec_model))
運行測試一下

完全按訓(xùn)練集問大部分能答上來,但是語序稍微不一樣,或者問別的問題就拉胯了,這也沒有辦法,畢竟網(wǎng)絡(luò)比較簡單,訓(xùn)練集里數(shù)據(jù)也不夠,如果能用幾十萬條數(shù)據(jù)訓(xùn)練應(yīng)該會好很多
總結(jié)
由于網(wǎng)絡(luò)比較簡單,訓(xùn)練集數(shù)據(jù)量也很少,作為入門實踐還是相當(dāng)不錯的,以后想嘗試更高級點的東西,把a(bǔ)ttention機(jī)制、beam search、scheduled sampling啥的都給安排上,再換個百度貼吧對話數(shù)據(jù)集來訓(xùn)練,不過不知啥時才能做成了,咕咕咕