要查看圖文并茂的教程,請(qǐng)移步: http://studyai.com/pytorch-1.4/intermediate/reinforcement_q_learning.html
本教程演示如何使用PyTorch在 OpenAI Gym 的手推車連桿(CartPole-v0)任務(wù) 上訓(xùn)練深度Q-學(xué)習(xí)的智能體(Deep Q Learning(DQN)agent)。 任務(wù)(Task)
智能體(agent)必須在兩個(gè)動(dòng)作(action)之間做出決定——向左或向右移動(dòng)手推車(cart)——這樣連在手推車上的桿子(pole)就可以保持直立。 你可以在 Gym 網(wǎng)站 上找到一個(gè)包含各種算法和可視化的官方排行榜。
cartpole
上圖顯示了手推車連桿的運(yùn)行畫面(cartpole)
當(dāng)智能體觀察環(huán)境的當(dāng)前狀態(tài)(state)并選擇一個(gè)動(dòng)作時(shí),環(huán)境將遷移(transitions)到一個(gè)新狀態(tài), 并返回一個(gè)表明該動(dòng)作所造成的結(jié)果的獎(jiǎng)勵(lì)(reward)。在這項(xiàng)任務(wù)中,每增加一個(gè)時(shí)間步,獎(jiǎng)勵(lì)為+1, 如果桿子掉得太遠(yuǎn)或推車偏離中心超過2.4個(gè)單位距離,則環(huán)境終止。 這意味著表現(xiàn)更好的情景將持續(xù)更長(zhǎng)的時(shí)間,積累更大的回報(bào)。
手推車連桿(CartPole)任務(wù)的設(shè)計(jì)使得對(duì)智能體的輸入是4個(gè)表示環(huán)境狀態(tài)(位置、速度等)的實(shí)數(shù)值(real values)。 然而,神經(jīng)網(wǎng)絡(luò)完全可以通過觀察場(chǎng)景(looking at the scene)來解決任務(wù),因此我們將使用從屏幕上扣下來的 以購(gòu)物車為中心的圖像塊(image patch)作為輸入。正因?yàn)槿绱?,我們的結(jié)果無法直接與官方排行榜的結(jié)果相比 ——我們的任務(wù)要困難得多。不幸的是,這會(huì)減慢訓(xùn)練速度,因?yàn)槲覀儽仨氫秩?render)所有幀。
嚴(yán)格地說,我們將把狀態(tài)(state)表示為當(dāng)前屏幕上扣取的圖像塊和上一個(gè)屏幕上扣取的圖像塊之間的差分(difference)。 這將允許智能體從一個(gè)圖像中把連桿的速度也考慮進(jìn)去。
依賴包(Packages)
首先,我們導(dǎo)入依賴包. 第一個(gè)依賴包是 gym ,用于產(chǎn)生手推車連桿環(huán)境(environment), 安裝方式為(pip install gym)。其他的依賴包來自于PyTorch:
神經(jīng)網(wǎng)絡(luò) (torch.nn)
優(yōu)化 (torch.optim)
自動(dòng)微分 (torch.autograd)
視覺任務(wù)工具集 (torchvision - a separate package).
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
env = gym.make('CartPole-v0').unwrapped
# 設(shè)置 matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
from IPython import display
plt.ion()
# 查看 GPU 是否可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
回放記憶/內(nèi)存(Replay Memory)
我們將使用經(jīng)驗(yàn)回放記憶(experience replay memory)來訓(xùn)練我們的DQN。 它存儲(chǔ)智能體觀察到的狀態(tài)遷移(transitions),允許我們稍后重用這些數(shù)據(jù)。 通過對(duì)過往經(jīng)驗(yàn)隨機(jī)抽樣,可以構(gòu)造一個(gè)不相關(guān)的訓(xùn)練批次。結(jié)果表明,該方法大大穩(wěn)定和改進(jìn)了DQN訓(xùn)練過程。
為了實(shí)現(xiàn)上述功能, 我們需要定義兩個(gè)類:
Transition - 一個(gè)命名元組(named tuple)用于表示環(huán)境中的單次狀態(tài)遷移(single transition)。
該類的作用本質(zhì)上是將狀態(tài)-動(dòng)作對(duì)[(state, action) pairs]映射到他們的下一個(gè)結(jié)果,即[(next_state, action) pairs], 其中的 狀態(tài)(state)是指從屏幕上獲得的差分圖像塊(screen diifference image)。
ReplayMemory - 一個(gè)大小有限的循環(huán)緩沖區(qū),用于保存最近觀察到的遷移(transition)。 該類還實(shí)現(xiàn)了一個(gè)采樣方法 .sample() 用來在訓(xùn)練過程中隨機(jī)的選擇一個(gè)遷移批次(batch of transitions)。
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
class ReplayMemory(object):
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.position = 0
def push(self, *args):
"""Saves a transition."""
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.position] = Transition(*args)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
現(xiàn)在,讓我們定義我們的模型。但首先,讓我們快速回顧一下 DQN 是什么。
DQN 算法
我們的環(huán)境是確定性的,所以為了簡(jiǎn)單起見,這里給出的所有方程也都是確定性的。 在強(qiáng)化學(xué)習(xí)文獻(xiàn)中,它們還包含對(duì)環(huán)境中隨機(jī)遷移(stochastic transitions)的期望。
我們的目標(biāo)是訓(xùn)練一個(gè)策略,該策略能使打折后的累計(jì)獎(jiǎng)勵(lì)最大化。 Rt0=∑∞t=t0γt?t0rt
, 其中 Rt0 也被稱之為 回報(bào)(return). 折扣因子, γ, 應(yīng)該是一個(gè) 0 到 1
之間的常量,以保證累計(jì)求和是可收斂的。 對(duì)我們的智能體來說,折扣因子使得來自遙遠(yuǎn)未來的獎(jiǎng)勵(lì)(far future rewards)不如即將到來的獎(jiǎng)勵(lì)(near future rewards)重要。 因?yàn)檫b遠(yuǎn)未來的獎(jiǎng)勵(lì)的不確定性要大于即將到來的獎(jiǎng)勵(lì)的不確定性。
Q-learning 背后的思想是: 如果我們有一個(gè)函數(shù) Q?:State×Action→R
能夠 告訴我們可以獲得的回報(bào)是多少, 那么如果要在某個(gè)給定的狀態(tài)上采取一個(gè)最優(yōu)動(dòng)作,只需要簡(jiǎn)單的構(gòu)建一個(gè)能夠使可獲得的回報(bào)最大化的策略即可:
π?(s)=argmaxa Q?(s,a)
然而, 我們并不知道外部世界環(huán)境的所有完整信息,所以我們沒有機(jī)會(huì)得到 Q?
。 但是,由于神經(jīng)網(wǎng)絡(luò)是通用函數(shù)逼近器,我們可以簡(jiǎn)單地創(chuàng)建一個(gè)神經(jīng)網(wǎng)絡(luò)并訓(xùn)練它,使它與 Q?
趨同。
對(duì)于我們的訓(xùn)練更新規(guī)則,我們將使用一個(gè)事實(shí),即某個(gè)策略的每一個(gè) Q
函數(shù)都遵循 貝爾曼方程:
Qπ(s,a)=r+γQπ(s′,π(s′))
等式兩邊的差稱為時(shí)間差誤差(temporal difference error),δ
:
δ=Q(s,a)?(r+γmaxaQ(s′,a))
為了最小化這個(gè)誤差, 我們將使用的損失函數(shù)為: Huber loss. 當(dāng)誤差很小時(shí),Huber損失的作用類似于均方誤差;但當(dāng)誤差較大時(shí),它的作用類似于平均絕對(duì)誤差—— 這使得當(dāng) Q
的估計(jì)值帶有非常大的噪聲時(shí),損失對(duì)異常值更加穩(wěn)健魯棒。 我們通過從回放記憶/緩存(replay memory)中采樣的一個(gè)批次的遷移樣本(transition samples) B
, 來計(jì)算Huber損失:
L=1|B|∑(s,a,s′,r) ∈ BL(δ)
whereL(δ)={12δ2|δ|?12for |δ|≤1,otherwise.
Q-網(wǎng)絡(luò)(Q-network)
我們的模型將是一個(gè)卷積神經(jīng)網(wǎng)絡(luò),它以當(dāng)前屏幕圖像塊和以前屏幕圖像塊之間的差分作為輸入。 它有兩個(gè)輸出,表示 Q(s,left)
和 Q(s,right) (其中 s
是網(wǎng)絡(luò)的輸入)。 實(shí)際上,該網(wǎng)絡(luò)正試圖預(yù)測(cè)在給定當(dāng)前輸入的情況下,采取每項(xiàng)行動(dòng)的預(yù)期回報(bào)(expected return)。
class DQN(nn.Module):
def __init__(self, h, w, outputs):
super(DQN, self).__init__()
self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
self.bn1 = nn.BatchNorm2d(16)
self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
self.bn2 = nn.BatchNorm2d(32)
self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
self.bn3 = nn.BatchNorm2d(32)
# 線性層的輸入連接數(shù)取決于conv2d層的輸出以及輸入圖像的尺寸,
# 因此需要計(jì)算出來:linear_input_size
def conv2d_size_out(size, kernel_size = 5, stride = 2):
return (size - (kernel_size - 1) - 1) // stride + 1
convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
linear_input_size = convw * convh * 32
self.head = nn.Linear(linear_input_size, outputs)
# Called with either one element to determine next action, or a batch
# during optimization. Returns tensor([[left0exp,right0exp]...]).
def forward(self, x):
x = F.relu(self.bn1(self.conv1(x)))
x = F.relu(self.bn2(self.conv2(x)))
x = F.relu(self.bn3(self.conv3(x)))
return self.head(x.view(x.size(0), -1))
輸入抽取(Input extraction)
下面的代碼是從環(huán)境中提取和處理渲染圖像的. 它使用了 torchvision 包, 該包的使用使得 組合不同的圖像變換變得很容易。 運(yùn)行該cell后,它將顯示提取的圖像塊(patch)。
resize = T.Compose([T.ToPILImage(),
T.Resize(40, interpolation=Image.CUBIC),
T.ToTensor()])
def get_cart_location(screen_width):
world_width = env.x_threshold * 2
scale = screen_width / world_width
return int(env.state[0] * scale + screen_width / 2.0) # MIDDLE OF CART
def get_screen():
# Returned screen requested by gym is 400x600x3, but is sometimes larger
# such as 800x1200x3. Transpose it into torch order (CHW).
screen = env.render(mode='rgb_array').transpose((2, 0, 1))
# Cart is in the lower half, so strip off the top and bottom of the screen
_, screen_height, screen_width = screen.shape
screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)]
view_width = int(screen_width * 0.6)
cart_location = get_cart_location(screen_width)
if cart_location < view_width // 2:
slice_range = slice(view_width)
elif cart_location > (screen_width - view_width // 2):
slice_range = slice(-view_width, None)
else:
slice_range = slice(cart_location - view_width // 2,
cart_location + view_width // 2)
# Strip off the edges, so that we have a square image centered on a cart
screen = screen[:, :, slice_range]
# Convert to float, rescale, convert to torch tensor
# (this doesn't require a copy)
screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
screen = torch.from_numpy(screen)
# Resize, and add a batch dimension (BCHW)
return resize(screen).unsqueeze(0).to(device)
env.reset()
plt.figure()
plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
interpolation='none')
plt.title('Example extracted screen')
plt.show()
訓(xùn)練(Training)
超參數(shù)與輔助函數(shù)
下面的代碼實(shí)現(xiàn)了我們的模型及其優(yōu)化器,并且定義了一些實(shí)用工具函數(shù):
select_action - 將根據(jù)epsilon貪婪策略選擇動(dòng)作。簡(jiǎn)單地說,我們有時(shí)會(huì)使用我們的模型來選擇動(dòng)作, 有時(shí)我們只是在所有可能的動(dòng)作集合中均勻采樣一個(gè)。 通過均勻采樣隨機(jī)選擇一個(gè)動(dòng)作的概率將從 EPS_START 開始,并呈指數(shù)衰減,朝 EPS_END 結(jié)束。 EPS_DECAY 控制著衰減速率。
plot_durations - 該輔助函數(shù)用來繪制每集劇情的持續(xù)時(shí)間,以及過去的最近100集(last 100 episodes)的平均持續(xù)時(shí)間(官方評(píng)估中使用的度量標(biāo)準(zhǔn))。 繪圖將在包含主訓(xùn)練循環(huán)的單元下面,并且將在每一集之后更新(update after every episode)。
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10
# 獲取屏幕大小,以便我們可以根據(jù)Gym返回的形狀(shape)正確初始化網(wǎng)絡(luò)層。
# 此時(shí)的典型尺寸接近 3x40x90,這是 get_screen() 中壓縮和縮小渲染緩沖區(qū)的結(jié)果
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape
# 從 Gym 的動(dòng)作空間獲得動(dòng)作的數(shù)量
n_actions = env.action_space.n
policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)
steps_done = 0
def select_action(state):
global steps_done
sample = random.random()
eps_threshold = EPS_END + (EPS_START - EPS_END) * \
math.exp(-1. * steps_done / EPS_DECAY)
steps_done += 1
if sample > eps_threshold:
with torch.no_grad():
# t.max(1) will return largest column value of each row.
# second column on max result is index of where max element was
# found, so we pick action with the larger expected reward.
return policy_net(state).max(1)[1].view(1, 1)
else:
return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)
episode_durations = []
def plot_durations():
plt.figure(2)
plt.clf()
durations_t = torch.tensor(episode_durations, dtype=torch.float)
plt.title('Training...')
plt.xlabel('Episode')
plt.ylabel('Duration')
plt.plot(durations_t.numpy())
# Take 100 episode averages and plot them too
if len(durations_t) >= 100:
means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
means = torch.cat((torch.zeros(99), means))
plt.plot(means.numpy())
plt.pause(0.001) # pause a bit so that plots are updated
if is_ipython:
display.clear_output(wait=True)
display.display(plt.gcf())
訓(xùn)練循環(huán)(Training loop)
最后,給出訓(xùn)練模型的代碼.
下面, 函數(shù) optimize_model 將執(zhí)行一個(gè)單步優(yōu)化。 它首先采樣一個(gè)批次的樣本(batch), 將所有張量連接成一個(gè)張量 并計(jì)算 Q(st,at)
和 V(st+1)=maxaQ(st+1,a), 然后將它們組合進(jìn)我們的損失(loss)中. 根據(jù)定義,如果 s 是一個(gè)終止?fàn)顟B(tài),則設(shè)定 V(s)=0 。 為了給算法增加穩(wěn)定性,我們還使用一個(gè)目標(biāo)網(wǎng)絡(luò)(target network)來計(jì)算 V(st+1)
。 目標(biāo)網(wǎng)絡(luò)的權(quán)重大部分時(shí)間保持凍結(jié)狀態(tài),但每隔一段時(shí)間就會(huì)用策略網(wǎng)絡(luò)(policy network)的權(quán)重更新一次。 更新間隔通常是若干優(yōu)化步(optimizition steps),但為了簡(jiǎn)單起見,我們將使用劇集(episodes)為更新間隔單位。
def optimize_model():
if len(memory) < BATCH_SIZE:
return
transitions = memory.sample(BATCH_SIZE)
# Transpose the batch (看 https://stackoverflow.com/a/19343/3343043 詳細(xì)解釋
# ). 把 Transitions 的 batch-array 轉(zhuǎn)換為 batch-arrays 的 Transition 。
batch = Transition(*zip(*transitions))
# 計(jì)算非最終狀態(tài)的mask 并把批次樣本串接(concantecate)起來
# (一個(gè)最終狀態(tài)(final state)是指在該狀態(tài)上(仿真)游戲就結(jié)束了)
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
batch.next_state)), device=device, dtype=torch.bool)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
# 計(jì)算 Q(s_t, a) - 模型計(jì)算 Q(s_t), 然后我們?cè)趧?dòng)作列中選擇動(dòng)作
# 這些是根據(jù)策略網(wǎng)絡(luò)(policy_net)對(duì)batch中每個(gè)狀態(tài)所采取的操作
state_action_values = policy_net(state_batch).gather(1, action_batch)
# 計(jì)算所有下一個(gè)狀態(tài)的 V(s_{t+1})
# 對(duì)非最終下一個(gè)狀態(tài)的動(dòng)作的期望值是基于“舊的”target_net進(jìn)行計(jì)算的;
# selecting their best reward with max(1)[0].
# This is merged based on the mask, such that we'll have either the expected
# state value or 0 in case the state was final.
next_state_values = torch.zeros(BATCH_SIZE, device=device)
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
# 計(jì)算期望 Q 值
expected_state_action_values = (next_state_values * GAMMA) + reward_batch
# 計(jì)算 Huber loss
loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
# 優(yōu)化模型
optimizer.zero_grad()
loss.backward()
for param in policy_net.parameters():
param.grad.data.clamp_(-1, 1)
optimizer.step()
下面,你可以找到主要的訓(xùn)練循環(huán)。在開始時(shí),我們重置環(huán)境并初始化狀態(tài)張量。 然后,我們采樣一個(gè)動(dòng)作,執(zhí)行它,觀察下一個(gè)屏幕和獎(jiǎng)勵(lì)(總是1),并優(yōu)化我們的模型一次。 當(dāng)一次episode結(jié)束時(shí)(我們的模型失敗,game over),我們重新啟動(dòng)循環(huán)。
下面的代碼中, num_episodes 設(shè)置的較小. 你應(yīng)該下載notebook并運(yùn)行更多的epsiodes, 比如300+以獲得有意義的持續(xù)時(shí)間改進(jìn)。
num_episodes = 50
for i_episode in range(num_episodes):
# 初始化環(huán)境與狀態(tài)
env.reset()
last_screen = get_screen()
current_screen = get_screen()
state = current_screen - last_screen
for t in count():
# 選擇并執(zhí)行一個(gè)動(dòng)作
action = select_action(state)
_, reward, done, _ = env.step(action.item())
reward = torch.tensor([reward], device=device)
# 觀察一個(gè)新的狀態(tài)
last_screen = current_screen
current_screen = get_screen()
if not done:
next_state = current_screen - last_screen
else:
next_state = None
# 將狀態(tài)轉(zhuǎn)移保存到記憶內(nèi)存(memory)中
memory.push(state, action, next_state, reward)
# 移動(dòng)到下一個(gè)狀態(tài)
state = next_state
# 執(zhí)行一步優(yōu)化過程(on the target network)
optimize_model()
if done:
episode_durations.append(t + 1)
plot_durations()
break
# 更新目標(biāo)網(wǎng)絡(luò), copying all weights and biases in DQN
if i_episode % TARGET_UPDATE == 0:
target_net.load_state_dict(policy_net.state_dict())
print('Complete')
env.render()
env.close()
plt.ioff()
plt.show()
這是一個(gè)圖表,說明了整個(gè)數(shù)據(jù)流是如何產(chǎn)生的。
../_images/reinforcement_learning_diagram.jpg
動(dòng)作(Actions)可以隨機(jī)選擇,也可以基于策略選擇, u接著從gym環(huán)境中獲得下一步到達(dá)的狀態(tài). 我們將結(jié)果記錄在回放記憶/內(nèi)存(replay memory)中,并在每次迭代中運(yùn)行優(yōu)化步驟。 優(yōu)化器從replay memory中隨機(jī)選取一個(gè)批次的樣本來進(jìn)行新策略的訓(xùn)練。 “舊的” target_net 也會(huì)被用于優(yōu)化中以計(jì)算期望的Q值;它被偶爾更新以保持其最新。