首页 > 编程语言 >强化学习从基础到进阶–案例与实践[7.1]:深度确定性策略梯度DDPG算法、双延迟深度确定性策略梯度TD3算法详解项目实战

强化学习从基础到进阶–案例与实践[7.1]:深度确定性策略梯度DDPG算法、双延迟深度确定性策略梯度TD3算法详解项目实战

时间:2023-06-27 23:33:14浏览次数:50  
标签:state 梯度 self batch 回合 算法 确定性 cfg action

强化学习从基础到进阶--案例与实践[7.1]:深度确定性策略梯度DDPG算法、双延迟深度确定性策略梯度TD3算法详解项目实战

1、定义算法

1.1 定义模型

!pip uninstall -y parl
!pip install parl
import parl
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
class Actor(parl.Model):
    def __init__(self, n_states, n_actions):
        super(Actor, self).__init__()

        self.l1 = nn.Linear(n_states, 400)
        self.l2 = nn.Linear(400, 300)
        self.l3 = nn.Linear(300, n_actions)

    def forward(self, state):
        x = F.relu(self.l1(state))
        x = F.relu(self.l2(x))
        return paddle.tanh(self.l3(x))

class Critic(parl.Model):
    def __init__(self, n_states, n_actions):
        super(Critic, self).__init__()

        self.l1 = nn.Linear(n_states, 400)
        self.l2 = nn.Linear(400 + n_actions, 300)
        self.l3 = nn.Linear(300, 1)

    def forward(self, state, action):
        x = F.relu(self.l1(state))
        x = F.relu(self.l2(paddle.concat([x, action], 1)))
        return self.l3(x)
class ActorCritic(parl.Model):
    def __init__(self, n_states, n_actions):
        super(ActorCritic, self).__init__()
        self.actor_model = Actor(n_states, n_actions)
        self.critic_model = Critic(n_states, n_actions)

    def policy(self, state):
        return self.actor_model(state)

    def value(self, state, action):
        return self.critic_model(state, action)

    def get_actor_params(self):
        return self.actor_model.parameters()

    def get_critic_params(self):
        return self.critic_model.parameters()

1.2 定义经验回放

from collections import deque
import random
class ReplayBuffer:
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.buffer = deque(maxlen=self.capacity)
    def push(self,transitions):
        '''_summary_
        Args:
            trainsitions (tuple): _description_
        '''
        self.buffer.append(transitions)
    def sample(self, batch_size: int, sequential: bool = False):
        if batch_size > len(self.buffer):
            batch_size = len(self.buffer)
        if sequential: # sequential sampling
            rand = random.randint(0, len(self.buffer) - batch_size)
            batch = [self.buffer[i] for i in range(rand, rand + batch_size)]
            return zip(*batch)
        else:
            batch = random.sample(self.buffer, batch_size)
            return zip(*batch)
    def clear(self):
        self.buffer.clear()
    def __len__(self):
        return len(self.buffer)

1.3 定义智能体

import parl
import paddle
import numpy as np


class DDPGAgent(parl.Agent):
    def __init__(self, algorithm,memory,cfg):
        super(DDPGAgent, self).__init__(algorithm)
        self.n_actions = cfg['n_actions']
        self.expl_noise = cfg['expl_noise']
        self.batch_size = cfg['batch_size'] 
        self.memory = memory
        self.alg.sync_target(decay=0)

    def sample_action(self, state):
        action_numpy = self.predict_action(state)
        action_noise = np.random.normal(0, self.expl_noise, size=self.n_actions)
        action = (action_numpy + action_noise).clip(-1, 1)
        return action

    def predict_action(self, state):
        state = paddle.to_tensor(state.reshape(1, -1), dtype='float32')
        action = self.alg.predict(state)
        action_numpy = action.cpu().numpy()[0]
        return action_numpy

    def update(self):
        if len(self.memory) < self.batch_size: 
            return
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(
            self.batch_size)
        done_batch = np.expand_dims(done_batch , -1)
        reward_batch = np.expand_dims(reward_batch, -1)
        state_batch = paddle.to_tensor(state_batch, dtype='float32')
        action_batch = paddle.to_tensor(action_batch, dtype='float32')
        reward_batch = paddle.to_tensor(reward_batch, dtype='float32')
        next_state_batch = paddle.to_tensor(next_state_batch, dtype='float32')
        done_batch = paddle.to_tensor(done_batch, dtype='float32')
        critic_loss, actor_loss = self.alg.learn(state_batch, action_batch, reward_batch, next_state_batch,
                                                 done_batch)

2. 定义训练

def train(cfg, env, agent):
    ''' 训练
    '''
    print(f"开始训练!")
    rewards = []  # 记录所有回合的奖励
    for i_ep in range(cfg["train_eps"]):
        ep_reward = 0  
        state = env.reset()  
        for i_step in range(cfg['max_steps']):
            action = agent.sample_action(state) # 采样动作
            next_state, reward, done, _ = env.step(action)  
            agent.memory.push((state, action, reward,next_state, done)) 
            state = next_state  
            agent.update()  
            ep_reward += reward  
            if done:
                break
        rewards.append(ep_reward)
        if (i_ep + 1) % 10 == 0:
            print(f"回合:{i_ep+1}/{cfg['train_eps']},奖励:{ep_reward:.2f}")
    print("完成训练!")
    env.close()
    res_dic = {'episodes':range(len(rewards)),'rewards':rewards}
    return res_dic

def test(cfg, env, agent):
    print("开始测试!")
    rewards = []  # 记录所有回合的奖励
    for i_ep in range(cfg['test_eps']):
        ep_reward = 0  
        state = env.reset()  
        for i_step in range(cfg['max_steps']):
            action = agent.predict_action(state) 
            next_state, reward, done, _ = env.step(action)  
            state = next_state  
            ep_reward += reward 
            if done:
                break
        rewards.append(ep_reward)
        print(f"回合:{i_ep+1}/{cfg['test_eps']},奖励:{ep_reward:.2f}")
    print("完成测试!")
    env.close()
    return {'episodes':range(len(rewards)),'rewards':rewards}

3、定义环境

OpenAI Gym中其实集成了很多强化学习环境,足够大家学习了,但是在做强化学习的应用中免不了要自己创建环境,比如在本项目中其实不太好找到Qlearning能学出来的环境,Qlearning实在是太弱了,需要足够简单的环境才行,因此本项目写了一个环境,大家感兴趣的话可以看一下,一般环境接口最关键的部分即使reset和step。

import gym
import os
import paddle
import numpy as np
import random
from parl.algorithms import DDPG
class NormalizedActions(gym.ActionWrapper):
    ''' 将action范围重定在[0.1]之间
    '''
    def action(self, action):
        low_bound   = self.action_space.low
        upper_bound = self.action_space.high
        action = low_bound + (action + 1.0) * 0.5 * (upper_bound - low_bound)
        action = np.clip(action, low_bound, upper_bound)
        return action

    def reverse_action(self, action):
        low_bound   = self.action_space.low
        upper_bound = self.action_space.high
        action = 2 * (action - low_bound) / (upper_bound - low_bound) - 1
        action = np.clip(action, low_bound, upper_bound)
        return action
def all_seed(env,seed = 1):
    ''' 万能的seed函数
    '''
    env.seed(seed) # env config
    np.random.seed(seed)
    random.seed(seed)
    paddle.seed(seed)
def env_agent_config(cfg):
    env = NormalizedActions(gym.make(cfg['env_name'])) # 装饰action噪声
    if cfg['seed'] !=0:
        all_seed(env,seed=cfg['seed'])
    n_states = env.observation_space.shape[0]
    n_actions = env.action_space.shape[0]
    print(f"状态维度:{n_states},动作维度:{n_actions}")
    cfg.update({"n_states":n_states,"n_actions":n_actions}) # 更新n_states和n_actions到cfg参数中
    memory = ReplayBuffer(cfg['memory_capacity'])
    model = ActorCritic(n_states, n_actions)
    algorithm = DDPG(model, gamma=cfg['gamma'], tau=cfg['tau'], actor_lr=cfg['actor_lr'], critic_lr=cfg['critic_lr'])
    agent = DDPGAgent(algorithm,memory,cfg)
    return env,agent

4、设置参数

到这里所有qlearning模块就算完成了,下面需要设置一些参数,方便大家“炼丹”,其中默认的是笔者已经调好的~。另外为了定义了一个画图函数,用来描述奖励的变化。

import argparse
import matplotlib.pyplot as plt
import seaborn as sns
def get_args():
    """ 超参数
    """
    parser = argparse.ArgumentParser(description="hyperparameters")      
    parser.add_argument('--algo_name',default='DDPG',type=str,help="name of algorithm")
    parser.add_argument('--env_name',default='Pendulum-v0',type=str,help="name of environment")
    parser.add_argument('--train_eps',default=200,type=int,help="episodes of training")
    parser.add_argument('--test_eps',default=20,type=int,help="episodes of testing")
    parser.add_argument('--max_steps',default=100000,type=int,help="steps per episode, much larger value can simulate infinite steps")
    parser.add_argument('--gamma',default=0.99,type=float,help="discounted factor")
    parser.add_argument('--critic_lr',default=1e-3,type=float,help="learning rate of critic")
    parser.add_argument('--actor_lr',default=1e-4,type=float,help="learning rate of actor")
    parser.add_argument('--memory_capacity',default=80000,type=int,help="memory capacity")
    parser.add_argument('--expl_noise',default=0.1,type=float)
    parser.add_argument('--batch_size',default=128,type=int)
    parser.add_argument('--target_update',default=2,type=int)
    parser.add_argument('--tau',default=1e-2,type=float)
    parser.add_argument('--critic_hidden_dim',default=256,type=int)
    parser.add_argument('--actor_hidden_dim',default=256,type=int)
    parser.add_argument('--device',default='cpu',type=str,help="cpu or cuda")  
    parser.add_argument('--seed',default=1,type=int,help="random seed")
    args = parser.parse_args([])    
    args = {**vars(args)} # 将args转换为字典  
    # 打印参数
    print("训练参数如下:")
    print(''.join(['=']*80))
    tplt = "{:^20}\t{:^20}\t{:^20}"
    print(tplt.format("参数名","参数值","参数类型"))
    for k,v in args.items():
        print(tplt.format(k,v,str(type(v))))   
    print(''.join(['=']*80))                  
    return args
def smooth(data, weight=0.9):  
    '''用于平滑曲线,类似于Tensorboard中的smooth

    Args:
        data (List):输入数据
        weight (Float): 平滑权重,处于0-1之间,数值越高说明越平滑,一般取0.9

    Returns:
        smoothed (List): 平滑后的数据
    '''
    last = data[0]  # First value in the plot (first timestep)
    smoothed = list()
    for point in data:
        smoothed_val = last * weight + (1 - weight) * point  # 计算平滑值
        smoothed.append(smoothed_val)                    
        last = smoothed_val                                
    return smoothed

def plot_rewards(rewards,cfg,path=None,tag='train'):
    sns.set()
    plt.figure()  # 创建一个图形实例,方便同时多画几个图
    plt.title(f"{tag}ing curve on {cfg['device']} of {cfg['algo_name']} for {cfg['env_name']}")
    plt.xlabel('epsiodes')
    plt.plot(rewards, label='rewards')
    plt.plot(smooth(rewards), label='smoothed')
    plt.legend()

5、训练

# 获取参数
cfg = get_args() 
# 训练
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)
 
plot_rewards(res_dic['rewards'], cfg, tag="train")  
# 测试
res_dic = test(cfg, env, agent)
plot_rewards(res_dic['rewards'], cfg, tag="test")  # 画出结果
训练参数如下:
================================================================================
        参数名         	        参数值         	        参数类型        
     algo_name      	        DDPG        	   <class 'str'>    
      env_name      	    Pendulum-v0     	   <class 'str'>    
     train_eps      	        200         	   <class 'int'>    
      test_eps      	         20         	   <class 'int'>    
     max_steps      	       100000       	   <class 'int'>    
       gamma        	        0.99        	  <class 'float'>   
     critic_lr      	       0.001        	  <class 'float'>   
      actor_lr      	       0.0001       	  <class 'float'>   
  memory_capacity   	       80000        	   <class 'int'>    
     expl_noise     	        0.1         	  <class 'float'>   
     batch_size     	        128         	   <class 'int'>    
   target_update    	         2          	   <class 'int'>    
        tau         	        0.01        	  <class 'float'>   
 critic_hidden_dim  	        256         	   <class 'int'>    
  actor_hidden_dim  	        256         	   <class 'int'>    
       device       	        cpu         	   <class 'str'>    
        seed        	         1          	   <class 'int'>    
================================================================================
状态维度:3,动作维度:1
开始训练!
回合:10/200,奖励:-922.80
回合:20/200,奖励:-390.80
回合:30/200,奖励:-125.50
回合:40/200,奖励:-822.66
回合:50/200,奖励:-384.92
回合:60/200,奖励:-132.26
回合:70/200,奖励:-240.20
回合:80/200,奖励:-242.37
回合:90/200,奖励:-127.13
回合:100/200,奖励:-365.29
回合:110/200,奖励:-126.27
回合:120/200,奖励:-231.47
回合:130/200,奖励:-1.98
回合:140/200,奖励:-223.84
回合:150/200,奖励:-123.29
回合:160/200,奖励:-362.06
回合:170/200,奖励:-126.93
回合:180/200,奖励:-119.77
回合:190/200,奖励:-114.72
回合:200/200,奖励:-116.01
完成训练!
开始测试!
回合:1/20,奖励:-125.61
回合:2/20,奖励:-0.97
回合:3/20,奖励:-130.02
回合:4/20,奖励:-117.46
回合:5/20,奖励:-128.45
回合:6/20,奖励:-124.48
回合:7/20,奖励:-118.31
回合:8/20,奖励:-127.18
回合:9/20,奖励:-118.09
回合:10/20,奖励:-0.55
回合:11/20,奖励:-117.72
回合:12/20,奖励:-1.08
回合:13/20,奖励:-124.74
回合:14/20,奖励:-133.55
回合:15/20,奖励:-234.81
回合:16/20,奖励:-126.93
回合:17/20,奖励:-128.20
回合:18/20,奖励:-124.76
回合:19/20,奖励:-119.91
回合:20/20,奖励:-287.89
完成测试!

更多优质内容请关注公号:汀丶人工智能

标签:state,梯度,self,batch,回合,算法,确定性,cfg,action
From: https://blog.51cto.com/u_15485092/6567756

相关文章

  • 算法:给定 n 个不同元素的数组,设计算法等概率取 m 个不同的元素
      有几种算法可以实现从n个不同元素的数组中等概率地取出m个不同元素,其中一种是Knuth-DurstenfeldShuffle算法,它的思想是:将1到n的数字存到数组中从数组中取一个1到剩下数字个数的随机数k从低位开始,将数组第k个数字取出,并保存到结果数组末尾重复第2步,直到取出m个数字......
  • 算法
    枚举 前缀和,差分前缀和:sum[i]=a[i]+sum[i-1] 前i个数的求和。差分:delta[i]=a[i]-a[i-1]第i个数-第i-1个数。例题:https://ac.nowcoder.com/acm/problem/166491#include<bits/stdc++.h>2usingnamespacestd;3/**4对于......
  • 垃圾回收与算法
    如何确定垃圾1、引用计数法:在Java中,引用和对象是有关联的,如果要操作对象则必须用引用进行。因此,很显然一个简单的办法是通过引用计数来判断一个对象是否可以回收。简单说,即一个对象如果没有任何与之关联的引用,即他们的引用计数都不为0,则说明对象不太可能再被用到,那么这个对象就是可......
  • 自动驾驶横纵向耦合控制-复现Apollo横纵向控制 基于动力学误差模型,使用mpc算法,一个控
    自动驾驶横纵向耦合控制-复现Apollo横纵向控制基于动力学误差模型,使用mpc算法,一个控制器同时控制横向和纵向,实现横纵向耦合控制matlab与simulink联合仿真,纵向控制已经做好油门刹车标定表,跟踪五次多项式换道轨迹,效果完美。内含三套代码,两套采用面向对象编程-一套只对控制量添加约......
  • re | 逆向算法笔记
    凯撒算法加密for(i=0;i<strlen(passwd);i++){if(passwd[i]>='A'&&passwd[i]<='Z'){passwd[i]=((passwd[i]-'A')+move)%26+'A';}elseif(passwd[i]>='a'&&......
  • 实践讲解强化学习之梯度策略、添加基线、优势函数、动作分配合适的分数
    摘要:本文将从实践案例角度为大家解读强化学习中的梯度策略、添加基线(baseline)、优势函数、动作分配合适的分数(credit)。本文分享自华为云社区《强化学习从基础到进阶-案例与实践[5]:梯度策略、添加基线(baseline)、优势函数、动作分配合适的分数(credit)》,作者:汀丶。1策略梯度算法如图......
  • Prim算法 最小值生成树
    前言:给定一个无向图,如果它的某个子图中任意两个顶点都互相连通并且是一棵树,那么这棵树就叫做生成树(SpanningTree)。如果边上有权值,那么使得边权和最小的生成树叫做最小生成树(MST,MinimumSpanningTree)。例如我们假设有这样一个图:把顶点看作村庄,边看作计划要修建的道路。......
  • 【算法】根据整数数组,生成正的素因子二位数组,并排序
    给定一个正整数或负整数的数组,I=[i1,..,in] 生成一个形式为的排序数组P [[p,I数组的所有ij的和,其中p是ij的素因子(p为正)]…]P将按素数的递增顺序进行排序。 示例:I={12,15};//结果=“(212)(327)(515)”[2,3,5]是I的元素的所有素因子的列表,因此是结果。 注意事项: 如果某些数字为......
  • 基于扩展卡尔曼滤波EKF的语音信号基音估计算法matlab仿真
    1.算法仿真效果matlab2022a仿真结果如下:   2.算法涉及理论知识概要      基音是语音信号的基本频率成分,它决定了语音的音调和声音的音高。在语音信号处理中,基音估计是一个重要的任务,它可以用于语音合成、语音识别、语音增强等应用。扩展卡尔曼滤波(ExtendedKalma......
  • 强化学习从基础到进阶-常见问题和面试必知必答[6]:演员-评论员算法(advantage actor-cri
    强化学习从基础到进阶-常见问题和面试必知必答[6]:演员-评论员算法(advantageactor-critic,A2C),异步A2C、与生成对抗网络的联系等详解1.核心词汇优势演员-评论员(advantageactor-critic,A2C)算法:一种改进的演员-评论员(actor-critic)算法。异步优势演员-评论员(asynchronousadvanta......