对于之前提到的DQN模型, 损失函数使用的
Q(state) = reward + Q(nextState)max
Q(state)由训练网络生成, Q(nextState)max由目标网络生成
这种损失函数会存在问题,即当Q(nextState)max总是大于0时,那么Q(state)总是在不停的增大,同时Q(nextState)max也在不断的增大, 即Q(state)存在被高估的情况
Double DQN的解决方法是,由Q(nextstate)训练网络来决定方向,由Q(nextState)目标网络来决定Qvalue的数值, 方向来获取目标函数的数值
max_action = self.q_net(next_states).max(1)[1].view(-1, 1) # 训练函数 max_next_q_values = self.target_q_net(next_states).gather(1, max_action) # 目标函数
场景: 倒立摆的环境, 一共11个动作,动作[0,1,2,3,...9,10], 需要转换为力矩[-2, -1.6, -1.2, ....1.2, 1.6, 2] 转化成离散的, 因为DQN只能处理离散的数据
train.py
import random import gym import numpy as np import collections import torch import torch.nn.functional as F import matplotlib.pyplot as plt from tqdm import tqdm from model import DQN import rl_utils from rl_utils import ReplayBuffer lr = 1e-2 # 学习率 num_episodes = 200 #迭代次数 hidden_dim = 128 #隐藏层 gamma = 0.98 epsilon = 0.01 target_update = 50 batch_size = 64 device = torch.device("cuda") if torch.cuda.is_available() else torch.device('cpu') buffer_size = 5000 # 观察数据的数量 minimal_size = 1000 replay_buffer = ReplayBuffer(buffer_size) env_name = "Pendulum-v0" env = gym.make(env_name) state_dim = env.observation_space.shape[0] # 这里的输入是4个变量 速度, 位置, 尖端速度, 杆的角度 action_dim = 11 def dis_to_con(disrete_action, env, action_dim): action_lowbound = env.action_space.low[0] action_upbound = env.action_space.high[0] return action_lowbound + (disrete_action / (action_dim - 1)) * (action_upbound - action_lowbound) def train_DQN(agent, env, num_episodes, replay_buffer, minimal_size, batch_size): return_list = [] max_q_value_list = [] max_q_value = 0 for i in range(10): with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar: for i_episode in range(int(num_episodes/10)): episode_return = 0 state = env.reset() done = False while not done: action = agent.take_action(state) # 使用累计来表示max_q_value max_q_value = agent.max_q_value( state) * 0.005 + max_q_value * 0.995 max_q_value_list.append(max_q_value) action_continuous = dis_to_con(action, env, agent.action_dim) # 将角度转换为离散数据 next_state, reward, done, _ = env.step([action_continuous]) # 将角度输入到环境中获得下一个状态, 奖励, 是否停止 replay_buffer.add(state, action, reward, next_state, done) # 加入到缓冲区 state = next_state episode_return += reward # 当buffer的数量超过一定数量的时候, 进行Q网络训练 if replay_buffer.size() > minimal_size: b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size=batch_size) # 从缓冲区取数据 transition_dict = { 'states':b_s, 'actions':b_a, 'next_states':b_ns, 'rewards': b_r, 'dones': b_d, } agent.update(transition_dict) # 更新网络 return_list.append(episode_return) if (i_episode + 1) % 10 == 0: pbar.set_postfix({ 'episode': '%d' % (num_episodes / 10 * i + i_episode), 'return': '%.3f' % np.mean(return_list[-10:]) }) pbar.update(1) return return_list, max_q_value_list agent = DQN(state_dim, hidden_dim, action_dim, lr, gamma, epsilon, target_update, device) return_list, max_q_value_list = train_DQN(agent, env, num_episodes, replay_buffer, minimal_size, batch_size) episodes_list = list(range(len(return_list))) plt.plot(episodes_list, return_list) plt.xlabel('Episodes') plt.ylabel('Returns') plt.title('DQN on {}'.format(env_name)) plt.show() frames_list = list(range(len(max_q_value_list))) plt.plot(frames_list, max_q_value_list) plt.axhline(0, c='orange', ls='--') plt.axhline(10, c='red', ls='--') plt.xlabel('Frames') plt.ylabel('Q value') plt.title('DQN on {}'.format(env_name)) plt.show()
model.py
import numpy as np import torch.nn import torch.nn.functional as F class Qnet(torch.nn.Module): '''只有一层的隐藏层的Q网络''' def __init__(self, state_dim, hidden_dim, action_dim): super(Qnet, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, action_dim) def forward(self, x): x = F.relu(self.fc1(x)) return self.fc2(x) class DQN: """DQN算法""" def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, epsilon, target_update, device, dqn_type="DoubleDQN_dim"): self.action_dim = action_dim self.q_net = Qnet(state_dim, hidden_dim, self.action_dim).to(device) # 目标网络 self.target_q_net = Qnet(state_dim, hidden_dim, self.action_dim).to(device) #使用Adam优化器 self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate) self.gamma = gamma # 折扣因子 self.epsilon = epsilon # epsilon-贪婪策略 self.target_update = target_update # 目标网络更新频率 self.count = 0 #计数器,记录更新次数 self.device = device self.dqn_type = dqn_type '''输入到模型获得动作, 使用的epsilon''' def take_action(self, state): if np.random.random() < self.epsilon: action = np.random.randint(self.action_dim) else: state = torch.tensor([state], dtype=torch.float).to(self.device) action = self.q_net(state).argmax().item() # item表示实际的数据 return action '''用来更新网络参数''' def update(self, transition_dict): states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device) actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device) rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).to(self.device) next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device) dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device) q_values = self.q_net(states).gather(1, actions) # 获得对应动作的Q值 # 下一个状态的最大Q值 if self.dqn_type == "DoubleDQN": max_action = self.q_net(next_states).max(1)[1].view(-1, 1) max_next_q_values = self.target_q_net(next_states).gather(1, max_action) else: max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1) q_target = rewards + self.gamma * max_next_q_values dqn_loss = torch.mean(F.mse_loss(q_values, q_target)) self.optimizer.zero_grad() dqn_loss.backward() self.optimizer.step() if self.count % self.target_update == 0: self.target_q_net.load_state_dict( self.q_net.state_dict()) # 更新目标网络 self.count += 1 def max_q_value(self, state): state = torch.tensor([state], dtype=torch.float).to(self.device) return self.q_net(state).max().item()
rl_utils.py
from tqdm import tqdm import numpy as np import torch import collections import random class ReplayBuffer: def __init__(self, capacity): self.buffer = collections.deque(maxlen=capacity) def add(self, state, action, reward, next_state, done): self.buffer.append((state, action, reward, next_state, done)) def sample(self, batch_size): transitions = random.sample(self.buffer, batch_size) state, action, reward, next_state, done = zip(*transitions) return np.array(state), action, reward, np.array(next_state), done def size(self): return len(self.buffer) def moving_average(a, window_size): cumulative_sum = np.cumsum(np.insert(a, 0, 0)) middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size r = np.arange(1, window_size - 1, 2) begin = np.cumsum(a[:window_size - 1])[::2] / r end = (np.cumsum(a[:-window_size:-1])[::2] / r)[::-1] return np.concatenate((begin, middle, end)) def train_on_policy_agent(env, agent, num_episodes): return_list = [] for i in range(10): with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar: for i_episode in range(int(num_episodes / 10)): episode_return = 0 transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []} state = env.reset() done = False while not done: action = agent.take_action(state) next_state, reward, done, _ = env.step(action) transition_dict['states'].append(state) transition_dict['actions'].append(action) transition_dict['next_states'].append(next_state) transition_dict['rewards'].append(reward) transition_dict['dones'].append(done) state = next_state episode_return += reward return_list.append(episode_return) agent.update(transition_dict) if (i_episode + 1) % 10 == 0: pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1), 'return': '%.3f' % np.mean(return_list[-10:])}) pbar.update(1) return return_list def train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size): return_list = [] for i in range(10): with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar: for i_episode in range(int(num_episodes / 10)): episode_return = 0 state = env.reset() done = False while not done: action = agent.take_action(state) next_state, reward, done, _ = env.step(action) replay_buffer.add(state, action, reward, next_state, done) state = next_state episode_return += reward if replay_buffer.size() > minimal_size: b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size) transition_dict = {'states': b_s, 'actions': b_a, 'next_states': b_ns, 'rewards': b_r, 'dones': b_d} agent.update(transition_dict) return_list.append(episode_return) if (i_episode + 1) % 10 == 0: pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1), 'return': '%.3f' % np.mean(return_list[-10:])}) pbar.update(1) return return_list def compute_advantage(gamma, lmbda, td_delta): td_delta = td_delta.detach().numpy() advantage_list = [] advantage = 0.0 for delta in td_delta[::-1]: advantage = gamma * lmbda * advantage + delta advantage_list.append(advantage) advantage_list.reverse() return torch.tensor(advantage_list, dtype=torch.float)
标签:dim,return,Double,模型,list,state,action,DQN,self From: https://www.cnblogs.com/my-love-is-python/p/16656223.html