1、策略梯度介绍
相比与DQN,策略梯度方法的区别主要在于,我们对于在某个状态下所采取的动作,并不由一个神经网络来决定,而是由一个策略函数来给出,而这个策略函数的目的,就是使得最终的奖励的累加和最大,这也是训练目标,所以训练会围绕策略函数的梯度来进行。
2、策略函数
以Reinforce算法为例,
假设我们的目标是最大化累积奖励的期望,即最大化以下形式的目标函数J(θ):
J(θ) = E[∑[t=0 to T] (R_t)]
其中,E表示对所有可能的轨迹(trajectories)进行期望,R_t是在时间步t获得的即时奖励。我们的策略函数可以表示为π(a|s;θ),其中θ表示策略函数的参数。我们希望通过调整θ来最大化J(θ),因此我们需要计算目标函数J(θ)关于参数θ的梯度。
具体地,我们将目标函数J(θ)的梯度表示为:
∇θ J(θ) = E[∑[t=0 to T] (∇θ log π(a_t|s_t;θ) * R_t)]
在这里,∇θ表示关于参数θ的梯度,log π(a_t|s_t;θ)表示策略函数在状态s_t下选择动作a_t的对数概率,R_t表示在时间步t获得的即时奖励。
接下来的关键是将整个目标函数的期望转换为对每个轨迹的期望,并使用蒙特卡洛采样来估计这个期望。我们通过采样多个轨迹,计算每个轨迹的梯度,然后取所有轨迹的梯度的平均值作为对目标函数梯度的估计。
为了做到这一点,我们引入累积回报G_t,表示从时间步t开始的累积奖励。G_t的定义如下:
G_t = R_t + γ * R_{t+1} + γ^2 * R_{t+2} + ... + γ^(T-t) * R_T
其中,γ是折扣因子,用于调整未来奖励的重要性。
补充:蒙特卡洛采样法,当我们要计算这个目标期望时是非常困难的,此时我们通过大量采样的方法估算出期望值,这就是蒙特卡洛采样法。其次,这里的回报G不仅仅与当前得到的reward有关,也和只会可以得到的reward有关。
现在,我们可以将目标函数的梯度重写为:
∇θ J(θ) = E[∑[t=0 to T] (∇θ log π(a_t|s_t;θ) * G_t)]
这样,我们就将目标函数的期望转换为了对每个轨迹的期望,然后通过蒙特卡洛采样来估计这个期望。在实际应用中,我们会使用多个样本轨迹来计算梯度的样本均值,并使用梯度上升法来更新策略函数的参数θ,以优化目标函数J(θ)。
以下是实现该数学公式的代码块:
#每往前一步,都衰减0.02,如何加上当前步的反馈
reward_sum *= 0.98
reward_sum += rewards[i]
#重新计算动作概率
state = torch .FloatTensor(states[i]).reshape(1,4)
prob = model(state)
prob = prob[0,actions[i]]
loss = -prob.log()*reward_sum
loss.backward(retain_graph=True)
注意这里的神经网络返回的是动作的概率分布
3、仍以平衡车为例具体实现代码
import gym
from matplotlib import pyplot as plt
import torch
import random
from IPython import display
import numpy as np
#创建环境
env = gym.make('CartPole-v1')
env.reset()
#打印游戏
def show():
plt.imshow(env.render(mode='rgb_array'))
plt.axis('off')
plt.show()
#show()
#计算动作模型,也就是真正需要使用的模型
model = torch.nn.Sequential(
torch.nn.Linear(4,128),
torch.nn.ReLU(),
torch.nn.Linear(128,2),
torch.nn.Softmax(dim=1),
)
def get_action(state):
state = torch.FloatTensor(state).reshape(1, 4)
prob = model(state)
prob_normalized = prob[0].tolist()
prob_sum = sum(prob_normalized)
prob_normalized = [p / prob_sum for p in prob_normalized]
action = np.random.choice(range(2), p=prob_normalized, size=1)[0]
return action
def get_Date():
states = []
rewards = []
actions = []
state = env.reset()
over = False
while not over:
action = get_action(state)
next_state,reward,over,_ = env.step(action)
states.append(state)
rewards.append(reward)
actions.append(action)
state = next_state
return states,rewards,actions
def test(play):
state = env.reset()
reward_sum = 0
over = False
while not over:
action = get_action(state)
state,reward,over,_ = env.step((action))
reward_sum +=reward
if play and random.random()<0.2:
display.clear_output(wait=True)
show()
plt.close()
return reward_sum
def train():
optimizer = torch.optim.Adam(model.parameters(),lr=1e-3)
#玩N局每局游戏训练一次
for epoch in range(1000):
states,rewards,actions = get_Date()
optimizer.zero_grad()
#反馈和
reward_sum = 0
#从最后一步算起
for i in reversed(range(len(states))):
#每往前一步,都衰减0.02,如何加上当前步的反馈
reward_sum *= 0.98
reward_sum += rewards[i]
#重新计算动作概率
state = torch .FloatTensor(states[i]).reshape(1,4)
prob = model(state)
prob = prob[0,actions[i]]
loss = -prob.log()*reward_sum
loss.backward(retain_graph=True)
optimizer.step()
if epoch%100==0:
test_result = sum([test(play=False) for _ in range(10)])/10
print(epoch,test_result)
train()
test(play=True)