首页 > 其他分享 >强化学习-Double DQN(两个DQN模型)

强化学习-Double DQN(两个DQN模型)

时间:2022-09-04 21:57:54浏览次数:46  
标签:dim return Double 模型 list state action DQN self

对于之前提到的DQN模型, 损失函数使用的

Q(state) = reward + Q(nextState)max

Q(state)由训练网络生成, Q(nextState)max由目标网络生成

这种损失函数会存在问题,即当Q(nextState)max总是大于0时,那么Q(state)总是在不停的增大,同时Q(nextState)max也在不断的增大, 即Q(state)存在被高估的情况

Double DQN的解决方法是,由Q(nextstate)训练网络来决定方向,由Q(nextState)目标网络来决定Qvalue的数值, 方向来获取目标函数的数值

max_action = self.q_net(next_states).max(1)[1].view(-1, 1) # 训练函数 
max_next_q_values = self.target_q_net(next_states).gather(1, max_action) # 目标函数

场景: 倒立摆的环境, 一共11个动作,动作[0,1,2,3,...9,10], 需要转换为力矩[-2, -1.6, -1.2, ....1.2, 1.6, 2] 转化成离散的, 因为DQN只能处理离散的数据
train.py 

import random
import gym
import numpy as np
import collections
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
from tqdm import tqdm
from model import DQN
import rl_utils
from rl_utils import ReplayBuffer


lr = 1e-2 # 学习率
num_episodes = 200 #迭代次数
hidden_dim = 128 #隐藏层
gamma = 0.98
epsilon = 0.01
target_update = 50
batch_size = 64

device = torch.device("cuda") if torch.cuda.is_available() else torch.device('cpu')

buffer_size = 5000 # 观察数据的数量
minimal_size = 1000
replay_buffer = ReplayBuffer(buffer_size)
env_name = "Pendulum-v0"
env = gym.make(env_name)
state_dim = env.observation_space.shape[0] # 这里的输入是4个变量 速度, 位置, 尖端速度, 杆的角度
action_dim = 11

def dis_to_con(disrete_action, env, action_dim):
    action_lowbound = env.action_space.low[0]
    action_upbound = env.action_space.high[0]
    return action_lowbound + (disrete_action / (action_dim - 1)) * (action_upbound - action_lowbound)

def train_DQN(agent, env, num_episodes, replay_buffer, minimal_size,
          batch_size):
    return_list = []
    max_q_value_list = []
    max_q_value = 0

    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes/10)):
                episode_return = 0
                state = env.reset()
                done = False
                while not done:
                    action = agent.take_action(state)
                    # 使用累计来表示max_q_value

                    max_q_value = agent.max_q_value(
                        state) * 0.005 + max_q_value * 0.995

                    max_q_value_list.append(max_q_value)
                    action_continuous = dis_to_con(action, env, agent.action_dim)  # 将角度转换为离散数据
                    next_state, reward, done, _ = env.step([action_continuous]) # 将角度输入到环境中获得下一个状态, 奖励, 是否停止
                    replay_buffer.add(state, action, reward, next_state, done) # 加入到缓冲区
                    state = next_state
                    episode_return += reward 
                    # 当buffer的数量超过一定数量的时候, 进行Q网络训练
                    if replay_buffer.size() > minimal_size:
                        b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size=batch_size) # 从缓冲区取数据
                        transition_dict = {
                            'states':b_s,
                            'actions':b_a,
                            'next_states':b_ns,
                            'rewards': b_r,
                            'dones': b_d,
                        }
                        agent.update(transition_dict) # 更新网络

                return_list.append(episode_return)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({
                        'episode':
                        '%d' % (num_episodes / 10 * i + i_episode),
                        'return':
                        '%.3f' % np.mean(return_list[-10:])
                    })
                pbar.update(1)
    return return_list, max_q_value_list



agent = DQN(state_dim, hidden_dim, action_dim, lr,
            gamma, epsilon, target_update, device)

return_list, max_q_value_list = train_DQN(agent, env, num_episodes, replay_buffer, minimal_size,
                                      batch_size)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('DQN on {}'.format(env_name))
plt.show()

frames_list = list(range(len(max_q_value_list)))
plt.plot(frames_list, max_q_value_list)
plt.axhline(0, c='orange', ls='--')
plt.axhline(10, c='red', ls='--')
plt.xlabel('Frames')
plt.ylabel('Q value')
plt.title('DQN on {}'.format(env_name))
plt.show()

 

 model.py 

import numpy as np
import torch.nn
import torch.nn.functional as F



class Qnet(torch.nn.Module):
    '''只有一层的隐藏层的Q网络'''
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(Qnet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)

class DQN:
    """DQN算法"""
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate,
                 gamma, epsilon, target_update, device, dqn_type="DoubleDQN_dim"):
        self.action_dim = action_dim
        self.q_net = Qnet(state_dim, hidden_dim,
                                 self.action_dim).to(device)
        # 目标网络
        self.target_q_net = Qnet(state_dim, hidden_dim,
                                 self.action_dim).to(device)
        #使用Adam优化器
        self.optimizer = torch.optim.Adam(self.q_net.parameters(),
                                          lr=learning_rate)
        self.gamma = gamma # 折扣因子
        self.epsilon = epsilon  # epsilon-贪婪策略
        self.target_update = target_update # 目标网络更新频率
        self.count = 0 #计数器,记录更新次数
        self.device = device
        self.dqn_type = dqn_type


    '''输入到模型获得动作, 使用的epsilon'''
    def take_action(self, state):
        if np.random.random() < self.epsilon:
            action = np.random.randint(self.action_dim)
        else:
            state = torch.tensor([state], dtype=torch.float).to(self.device)
            action = self.q_net(state).argmax().item() # item表示实际的数据

        return action

    '''用来更新网络参数'''
    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)

        q_values = self.q_net(states).gather(1, actions) # 获得对应动作的Q值
        # 下一个状态的最大Q值
        if self.dqn_type == "DoubleDQN":
            max_action = self.q_net(next_states).max(1)[1].view(-1, 1)
            max_next_q_values = self.target_q_net(next_states).gather(1, max_action)
        else:
            max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1)


        q_target = rewards + self.gamma * max_next_q_values
        dqn_loss = torch.mean(F.mse_loss(q_values, q_target))
        self.optimizer.zero_grad()
        dqn_loss.backward()
        self.optimizer.step()

        if self.count % self.target_update == 0:
            self.target_q_net.load_state_dict(
                self.q_net.state_dict())  # 更新目标网络

        self.count += 1

    def max_q_value(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        return self.q_net(state).max().item()

rl_utils.py 

from tqdm import tqdm
import numpy as np
import torch
import collections
import random


class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    def size(self):
        return len(self.buffer)


def moving_average(a, window_size):
    cumulative_sum = np.cumsum(np.insert(a, 0, 0))
    middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size
    r = np.arange(1, window_size - 1, 2)
    begin = np.cumsum(a[:window_size - 1])[::2] / r
    end = (np.cumsum(a[:-window_size:-1])[::2] / r)[::-1]
    return np.concatenate((begin, middle, end))


def train_on_policy_agent(env, agent, num_episodes):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                episode_return = 0
                transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}
                state = env.reset()
                done = False
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, done, _ = env.step(action)
                    transition_dict['states'].append(state)
                    transition_dict['actions'].append(action)
                    transition_dict['next_states'].append(next_state)
                    transition_dict['rewards'].append(reward)
                    transition_dict['dones'].append(done)
                    state = next_state
                    episode_return += reward
                return_list.append(episode_return)
                agent.update(transition_dict)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                                      'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list


def train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                episode_return = 0
                state = env.reset()
                done = False
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, done, _ = env.step(action)
                    replay_buffer.add(state, action, reward, next_state, done)
                    state = next_state
                    episode_return += reward
                    if replay_buffer.size() > minimal_size:
                        b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
                        transition_dict = {'states': b_s, 'actions': b_a, 'next_states': b_ns, 'rewards': b_r,
                                           'dones': b_d}
                        agent.update(transition_dict)
                return_list.append(episode_return)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                                      'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list


def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)

 

标签:dim,return,Double,模型,list,state,action,DQN,self
From: https://www.cnblogs.com/my-love-is-python/p/16656223.html

相关文章

  • 五层模型
    00前言我们每天使用互联网,你是否想过,它是如何实现的?全世界几十亿台电脑,连接在一起,两两通信。上海的某一块网卡送出信号,洛杉矶的另一块网卡居然就收到了,两者实际上根本不......
  • 线程模型
    一、线程模型种类1.传统阻塞I/O服务模型 2.Reactor模式二、传统阻塞I/O服务模型三、Reactor模式 1.根据Reactor的数量和处理资源池线程的数量......
  • LIME模型---"Why Should I Trust You?": Explaining the Predictions of Any Classifi
    文章核心思想速读:提出了一个LIME解释技术能够对任何分类器做出的预测进行解释。L指LOCAL,意思是模型是针对局部某个样本的解释,而不是所有样本I指:INTERPRETABLE,可解释性,能......
  • 并发学习记录10:共享模型之无锁
    一个小例子引入importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.atomic.AtomicInteger;interfaceAccount{IntegergetBalance......
  • 使用扩散模型从文本生成图像
    1代的DALLE使用VQ-VAE的改进版,2代的DALLE2通过使用扩散模型将图片的生成提升到了一个新的高度,但是由于其计算量很大而且没有开源,我们普通用户并没有办法使用,但是StableD......
  • 第2周 CSS3基础语法与盒模型
    第一节CSS3基础入门1、CSS3简介 2、CSS3书写的位置 3、CSS3书写的语法 第二节CSS3选择器  第三节文本与字体属性 第四节盒模型......
  • Python机器学习-多元分类的5种模型
    Python机器学习-多元分类的5种模型最近上了些机器学习的课程,于是想透过Kaggle资料集来练习整个资料科学专案的流程,在模型训练阶段,虽然听过许多分类模型,但不是很了解其各别......
  • 并发学习记录09:共享模型之内存
    Java内存模型JMM指的是Javamemorymodel,它定义了主存,工作内存等抽象概念,相当于做一个隔离层,将底层CPU寄存器,缓存,硬件内存,CPU指令优化提供的功能通过一个简单接口给使用......
  • 双向长短期记忆模型如何工作(深度学习)
    双向长短期记忆模型如何工作(深度学习)Photoby弗雷迪·雅各布on不飞溅使用改进的双向长短期记忆神经网络(arXiv)检测恶意请求作者:WenhaoLi,BinchengZhan......
  • 在 ML 模型上进行协作变得容易
    在ML模型上进行协作变得容易所以当我在我的组织中部署我的第一个ML模型时,这发生在我身上,利益相关者对尝试它非常好奇。由于他们来自非技术背景,所以我认为分享我的jup......