首页 > 编程语言 >25/1/7 算法笔记<强化学习> sac_learn代码拆解

25/1/7 算法笔记<强化学习> sac_learn代码拆解

时间:2025-01-08 12:33:41浏览次数:8  
标签:25 log dim sac self state learn action soft

昨天我们看了V-REP中一个github项目的环境代码,今天我们来分析下他的强化学习代码。

git链接:

https://github.com/deep-reinforcement-learning-book/Chapter16-Robot-Learning-in-Simulation.

首先导入了库

import math
import random

import gym
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal   #正态分布概率函数

设置多进程启动方式

torch.multiprocessing.set_start_method('forkserver', force=True)

在使用 torTorch.multiprocessing 模块时,启动子进程的方式非常重要,尤其是在使用 CUDA 时。默认的启动方式('fork')可能会导致问题,因为 CUDA 对进程分叉(fork)的处理方式不太友好。

from IPython.display import clear_output  #Ipython显示功能
import matplotlib.pyplot as plt       
from matplotlib import animation
from IPython.display import display   

from sawyer_grasp_env_boundingbox import GraspEnv  #导入昨天的自定义环境
import argparse            #用于解析命令行
import time                #用于时间相关操作
import pickle              #用于序列化和反序列化

import torch.multiprocessing as mp    #多进程处理
from torch.multiprocessing import Process#多进程工具

from multiprocessing import Process, Manager#多进程共享工具
from multiprocessing.managers import BaseManager#创建自定义的共享对象管理器

根据条件选择GPU还是CPU,并打印出当前设备

GPU = True
device_idx = 0
if GPU:
    device = torch.device("cuda:" + str(device_idx) if torch.cuda.is_available() else "cpu")
else:
    device = torch.device("cpu")
print(device)

使用python的argparse模块解析命令行,让用户可以通过命令行指定是进行训练(--train)还是测试(--test)。

parser = argparse.ArgumentParser(description='Train or test neural net motor controller.')
parser.add_argument('--train', dest='train', action='store_true', default=False)
parser.add_argument('--test', dest='test', action='store_true', default=False)  #添加命令行参数

args = parser.parse_args() #解析命令行参数

设置经验回放缓冲区

class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity  #设置缓冲区最大存储量
        self.buffer = []    #开辟空间存储
        self.position = 0
    
    def push(self, state, action, reward, next_state, done): #存放动作
        if len(self.buffer) < self.capacity:    
            self.buffer.append(None)  #这种做法通常用于初始化缓冲区,或者在某些特殊情况下确保缓冲区的长度与容量一致。
        self.buffer[self.position] = (state, action, reward, next_state, done)  #存放状态,动作,奖励,下一个状态,动作有没有结束
        self.position = int((self.position + 1) % self.capacity)   #更新当前位置写入的索引,并确保索引在缓冲区容量范围内循环

sample函数,用于从缓冲区随机抽取一批经验数据,供训练模型使用

def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)   #batch是一个列表,表示一条经验数据
        state, action, reward, next_state, done = map(np.stack, zip(*batch)) # stack for each element
        ''' 
        the * serves as unpack: sum(a,b) <=> batch=(a,b), sum(*batch) ;
        zip: a=[1,2], b=[2,3], zip(a,b) => [(1, 2), (2, 3)] ;
        the map serves as mapping the function on each list element: map(square, [2,3]) => [4,9] ;
        np.stack((1,2)) => array([1, 2])
        '''
        return state, action, reward, next_state, done

设置len和get_length函数

def __len__(self):  # cannot work in multiprocessing case, len(replay_buffer) is not available in proxy of manager!
        return len(self.buffer)

    def get_length(self):
        return len(self.buffer)

标准化函数

class NormalizedActions(gym.ActionWrapper):
    def _action(self, action):
        low  = self.action_space.low
        high = self.action_space.high
        
        action = low + (action + 1.0) * 0.5 * (high - low)
        action = np.clip(action, low, high)
        
        return action

    def _reverse_action(self, action):
        low  = self.action_space.low
        high = self.action_space.high
        
        action = 2 * (action - low) / (high - low) - 1
        action = np.clip(action, low, high)
        
        return action

_action函数将标准化动作映射回原始动作空间

_reverse_action将原始动作映射到标准化动作空间

 为什么这样设计?

  1. 线性映射

    • 线性映射保持了动作的相对比例,不会改变动作的分布特性。

    • 这对于强化学习算法的训练非常重要,因为非线性映射可能会引入额外的复杂性。

  2. 边界对齐

    • 确保原始动作的最小值和最大值分别映射到标准化动作的 -1 和 1

    • 这样可以充分利用标准化动作空间的范围。

  3. 可逆性

    • 标准化和反标准化公式是互逆的,可以方便地在两个空间之间转换。

    • 这对于算法的实现和调试非常有用。

  4. 数值稳定性

    • 标准化后的动作范围是 [-1, 1],避免了过大或过小的数值,提高了数值稳定性。

价值网络设置

表示在状态s下智能体未来可能获得的累计回报的期望值

class ValueNetwork(nn.Module):
    def __init__(self, state_dim, hidden_dim, init_w=3e-3):
        super(ValueNetwork, self).__init__()
        
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, hidden_dim)
        self.linear4 = nn.Linear(hidden_dim, 1)
        # 对权重和偏置初始化,用均匀分布将权重和偏置初始化为范围内的随机值
        self.linear4.weight.data.uniform_(-init_w, init_w)
        self.linear4.bias.data.uniform_(-init_w, init_w)
        
    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = self.linear4(x)
        return x

为什么需要权重初始化?

权重初始化对神经网络的训练非常重要,原因如下:

  1. 避免梯度消失或爆炸

    • 如果权重初始化过大或过小,可能导致梯度在反向传播时消失或爆炸。

    • 合适的初始化可以缓解这一问题。

  2. 加速收敛

    • 良好的初始化可以使网络更快地收敛到最优解。

  3. 打破对称性

    • 如果所有权重初始化为相同的值,网络中的神经元会学习相同的特征,导致性能下降。

    • 随机初始化可以打破对称性,使每个神经元学习不同的特征。

softQ网络

它的核心思想是在传统的Q-learning思想上引入熵正则化,从而鼓励策略的探索性。

class SoftQNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size, init_w=3e-3):
        super(SoftQNetwork, self).__init__()
        
        self.linear1 = nn.Linear(num_inputs + num_actions, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size)
        self.linear4 = nn.Linear(hidden_size, 1)
        
        self.linear4.weight.data.uniform_(-init_w, init_w)
        self.linear4.bias.data.uniform_(-init_w, init_w)
        
    def forward(self, state, action):
        x = torch.cat([state, action], 1) # the dim 0 is number of samples
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = self.linear4(x)
        return x
        

输入的是状态s和动作a的拼接。

策略网络

策略网络的目标是直接学习策略,即在给定状态s下,智能体应该采取的动作a,策略可以是确定性的或随机性的

class PolicyNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size, action_range=1., init_w=3e-3, log_std_min=-20, log_std_max=2):
        super(PolicyNetwork, self).__init__()
        
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max
        
        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size)
        self.linear4 = nn.Linear(hidden_size, hidden_size)

        self.mean_linear = nn.Linear(hidden_size, num_actions)
        self.mean_linear.weight.data.uniform_(-init_w, init_w)
        self.mean_linear.bias.data.uniform_(-init_w, init_w)
        
        self.log_std_linear = nn.Linear(hidden_size, num_actions)
        self.log_std_linear.weight.data.uniform_(-init_w, init_w)
        self.log_std_linear.bias.data.uniform_(-init_w, init_w)

        self.action_range = action_range
        self.num_actions = num_actions

        
    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = F.relu(self.linear4(x))

        mean    = (self.mean_linear(x))
        log_std = self.log_std_linear(x)
        log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)
        
        return mean, log_std
    

evaluate函数

用来评估策略网络在给定状态下的动作和对数概率。

def evaluate(self, state, epsilon=1e-6):
        '''
        generate sampled action with state as input wrt the policy network;
        '''
        mean, log_std = self.forward(state)
        std = log_std.exp() # no clip in evaluation, clip affects gradients flow
        
        normal = Normal(0, 1)
        z      = normal.sample() 
        action_0 = torch.tanh(mean + std*z.to(device)) # TanhNormal distribution as actions; reparameterization trick
        action = self.action_range*action_0
        log_prob = Normal(mean, std).log_prob(mean+ std*z.to(device)) - torch.log(1. - action_0.pow(2) + epsilon) -  np.log(self.action_range)
        # both dims of normal.log_prob and -log(1-a**2) are (N,dim_of_action); 
        # the Normal.log_prob outputs the same dim of input features instead of 1 dim probability, 
        # needs sum up across the features dim to get 1 dim prob; or else use Multivariate Normal.
        log_prob = log_prob.sum(dim=1, keepdim=True)
        return action, log_prob, z, mean, log_std

选择动作函数

def get_action(self, state, deterministic):
    #将输入的状态s转为pytorch张量
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
    #调用策略网络的前向传播方法,输出动作均值和对数标准差
        mean, log_std = self.forward(state)
        std = log_std.exp() #将对数标准差转化为标准差
         
        normal = Normal(0, 1)  #创建一个标准正态分布
        z      = normal.sample().to(device)#从正态分布中采样一个随即值
        action = self.action_range* torch.tanh(mean + std*z) #计算采样动作,缩放动作
        
        action = self.action_range* torch.tanh(mean).detach().cpu().numpy()[0] if deterministic else action.detach().cpu().numpy()[0]
        return action

随机采取动作函数

从均匀分布中随机采样一个动作

def sample_action(self,):
        a=torch.FloatTensor(self.num_actions).uniform_(-1, 1)
        return self.action_range*a.numpy()

SAC训练函数

核心函数,用SAC方法来训练智能体

def __init__(self, replay_buffer, hidden_dim, action_range):
        self.replay_buffer = replay_buffer #经验回放缓冲区
        #softq网络
        self.soft_q_net1 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        self.soft_q_net2 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        #目标Q网络
        self.target_soft_q_net1 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        self.target_soft_q_net2 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        #策略网络
        self.policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim, action_range).to(device)
        #熵系数,用于自动调整熵正则化的强度
        self.log_alpha = torch.zeros(1, dtype=torch.float32, requires_grad=True, device=device)
        print('Soft Q Network (1,2): ', self.soft_q_net1)
        print('Policy Network: ', self.policy_net)
        
        #初始化目标网络的参数
        for target_param, param in zip(self.target_soft_q_net1.parameters(), self.soft_q_net1.parameters()):
            target_param.data.copy_(param.data)
        for target_param, param in zip(self.target_soft_q_net2.parameters(), self.soft_q_net2.parameters()):
            target_param.data.copy_(param.data)
        
        #损失函数
        self.soft_q_criterion1 = nn.MSELoss()
        self.soft_q_criterion2 = nn.MSELoss()

        #学习率
        soft_q_lr = 3e-4
        policy_lr = 3e-4
        alpha_lr  = 3e-4
        #优化器
        self.soft_q_optimizer1 = optim.Adam(self.soft_q_net1.parameters(), lr=soft_q_lr)
        self.soft_q_optimizer2 = optim.Adam(self.soft_q_net2.parameters(), lr=soft_q_lr)
        self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=policy_lr)
        self.alpha_optimizer = optim.Adam([self.log_alpha], lr=alpha_lr)

更新函数

def update(self, batch_size, reward_scale=10., auto_entropy=True, use_demons=False, target_entropy=-2, gamma=0.99,soft_tau=1e-2):
        #获取状态动作奖励,下一个动作,是否完成
        state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
        
        if use_demons==True:   #从文件中加载预先收集的演示数据。,并于当前数据合并
            data_file=open('./demons_data/demon_data.pickle', "rb")
            demons_data = pickle.load(data_file)
            state_, action_, reward_, next_state_, done_=map(np.stack, zip(*demons_data))
            state = np.concatenate((state, state_), axis=0)
            action = np.concatenate((action, action_), axis=0)
            reward = np.concatenate((reward, reward_), axis=0)
            next_state = np.concatenate((next_state, next_state_), axis=0)
            done = np.concatenate((done, done_), axis=0)

        #将数据转换为PyTorch张量并移动到设备
        state      = torch.FloatTensor(state).to(device)
        next_state = torch.FloatTensor(next_state).to(device)
        action     = torch.FloatTensor(action).to(device)
        reward     = torch.FloatTensor(reward).unsqueeze(1).to(device)  # reward is single value, unsqueeze() to add one dim to be [reward] at the sample dim;
        done       = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(device)

        #计算Q网络的预测值
        predicted_q_value1 = self.soft_q_net1(state, action)
        predicted_q_value2 = self.soft_q_net2(state, action)
        
        #评估策略网络
        new_action, log_prob, z, mean, log_std = self.policy_net.evaluate(state)
        new_next_action, next_log_prob, _, _, _ = self.policy_net.evaluate(next_state)
        #奖励归一化
        reward = reward_scale * (reward - reward.mean(dim=0)) / (reward.std(dim=0) + 1e-6) # normalize with batch mean and std; plus a small number to prevent numerical problem
    # 更新熵系数
        # alpha = 0.0  # trade-off between exploration (max entropy) and exploitation (max Q) 
        if auto_entropy is True:
            alpha_loss = -(self.log_alpha * (log_prob + target_entropy).detach()).mean()
            self.alpha_optimizer.zero_grad()
            alpha_loss.backward()
            self.alpha_optimizer.step()
            self.alpha = self.log_alpha.exp()
        else:
            self.alpha = 1.
            alpha_loss = 0
  • 演示数据通常由专家策略生成,用于引导智能体的学习。

  • 合并数据

    • 将演示数据与当前的经验数据合并,形成更大的数据集。

    • 合并后的数据可以用于训练智能体,从而提高学习效率。

熵系数alpha的作用:

alpha用于控制探索和利用之间的权衡。具体来说alpha时熵正则化项的系数,它决定了策略的随机性在优化目标中的重要性。

SAC的目标函数中引入了熵正则化项:

H(π(⋅∣st​)) 是策略 π 在状态 st​ 下的熵。

alpha是熵正则化系数。

  • 当 α较大时,算法更倾向于探索(高熵策略)。

  • 当 α较小时,算法更倾向于利用(低熵策略)。

Q函数的训练

通过训练Q函数,智能体可以学习到哪些动作在特定状态下更有价值。

# Training Q Function
        #计算目标q值
        #选择两个目标Q值中较小值,以减少过估计问题
        target_q_min = torch.min(self.target_soft_q_net1(next_state, new_next_action),self.target_soft_q_net2(next_state, new_next_action)) - self.alpha * next_log_prob
        target_q_value = reward + (1 - done) * gamma * target_q_min # if done==1, only reward
        #计算Q网络损失,目标 Q 值,使用 .detach() 断开计算图,避免梯度传播到目标网络。
        q_value_loss1 = self.soft_q_criterion1(predicted_q_value1, target_q_value.detach())  # detach: no gradients for the variable
        q_value_loss2 = self.soft_q_criterion2(predicted_q_value2, target_q_value.detach())


        #参数更新,通过反向传播算法,计算损失函数对模型参数的梯度。
        self.soft_q_optimizer1.zero_grad()#清空优化器的梯度缓存
        q_value_loss1.backward()      #计算梯度
        self.soft_q_optimizer1.step() #更新参数
        self.soft_q_optimizer2.zero_grad()
        q_value_loss2.backward()
        self.soft_q_optimizer2.step() 

目标q值:它表示在当前状态 ss 下采取动作 aa 后,智能体未来可能获得的累积回报的期望值。

训练策略网络

 # Training Policy Function
        #选择Q值中小的那个,以减少过估计
        predicted_new_q_value = torch.min(self.soft_q_net1(state, new_action),self.soft_q_net2(state, new_action))
        policy_loss = (self.alpha * log_prob - predicted_new_q_value).mean()

        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()

软更新价值网络

# Soft update the target value net
        for target_param, param in zip(self.target_soft_q_net1.parameters(), self.soft_q_net1.parameters()):
            target_param.data.copy_(  # copy data value into target parameters
                target_param.data * (1.0 - soft_tau) + param.data * soft_tau
            )
        for target_param, param in zip(self.target_soft_q_net2.parameters(), self.soft_q_net2.parameters()):
            target_param.data.copy_(  # copy data value into target parameters
                #核心公式
                target_param.data * (1.0 - soft_tau) + param.data * soft_tau
            )
        return predicted_new_q_value.mean()

SAC中使用两个独立的Q网络来减少过估计问题,并为每个Q网络维护一个对应的目标网

  • 两个 Q 网络可以互相监督,避免单个 Q 网络的偏差影响整体训练。

保存,加载模型函数

def save_model(self, path):
        torch.save(self.soft_q_net1.state_dict(), path+'_q1')  # have to specify different path name here!
        torch.save(self.soft_q_net2.state_dict(), path+'_q2')
        torch.save(self.policy_net.state_dict(), path+'_policy')

    def load_model(self, path):
        #将加载的参数加载到模型中
        self.soft_q_net1.load_state_dict(torch.load(path+'_q1'))
        self.soft_q_net2.load_state_dict(torch.load(path+'_q2'))
        self.policy_net.load_state_dict(torch.load(path+'_policy'))

        self.soft_q_net1.eval()
        self.soft_q_net2.eval()
        self.policy_net.eval()#将模型设置为评估模式,用于推理或测试

用于采样数据和训练模型的worker函数

通过多进程与其他worker并行执行

def worker(id, sac_trainer, rewards_queue, replay_buffer, max_episodes, max_steps, batch_size, explore_steps, \
            update_itr, AUTO_ENTROPY, DETERMINISTIC, USE_DEMONS, hidden_dim, model_path, headless):

    print(sac_trainer, replay_buffer)  # sac_tainer are not the same, but all networks and optimizers in it are the same; replay  buffer is the same one.
    env = GraspEnv(headless=headless)  #自定义的环境类,用于模拟抓取任务

    action_dim = env.action_space.shape[0]
    state_dim  = env.observation_space.shape[0]

    frame_idx=0

    for eps in range(max_episodes):
        
        episode_reward = 0
        state =  env.reset()
    #每间隔20个episode重新初始化环境,避免环境中的问题
        if eps%20==0 and eps>0:
            env.reinit()   
        
        for step in range(max_steps):
            #如果当前部署超过探索步数,则使用策略网络生成动作
            if frame_idx > explore_steps:
                action = sac_trainer.policy_net.get_action(state, deterministic = DETERMINISTIC)
            else:
                action = sac_trainer.policy_net.sample_action()
    
            try:
                next_state, reward, done, _ = env.step(action) 
            except KeyboardInterrupt:
                print('Finished')
                sac_trainer.save_model(model_path)
            #将经验数据存入回放缓冲区
            replay_buffer.push(state, action, reward, next_state, done)
            
            #更新当前状态
            state = next_state
            episode_reward += reward
            frame_idx += 1 #更新总步数
              
            # 如果缓冲区的数据量大小大于批量大小,则开始更新模型
            if replay_buffer.get_length() > batch_size:
                for i in range(update_itr):
                    #更新SAC算法的参数
                    _=sac_trainer.update(batch_size, reward_scale=10., auto_entropy=AUTO_ENTROPY, use_demons=USE_DEMONS, target_entropy=-1.*action_dim)
            #每间隔10个episode保存一次模型
            if eps % 10 == 0 and eps>0:
                sac_trainer.save_model(model_path)
            
            if done:
                break
        print('Episode: ', eps, '| Episode Reward: ', episode_reward)
        rewards_queue.put(episode_reward)

    sac_trainer.save_model(model_path)
    env.shutdown()
            

多进程环境中共享 Adam 优化器的参数函数

在多进程训练中,多个进程可能同时访问和更新优化器的状态(如动量项和平方梯度项)。为了确保这些状态在进程之间保持一致,需要将它们共享到内存中

def ShareParameters(adamoptim):
    ''' share parameters of Adamoptimizers for multiprocessing '''
    for group in adamoptim.param_groups:
        for p in group['params']:
            state = adamoptim.state[p] #获取优化器的状态
            # 优化器的步数
            state['step'] = 0
            state['exp_avg'] = torch.zeros_like(p.data) #动量项
            state['exp_avg_sq'] = torch.zeros_like(p.data)#票房梯度项

            # 将张量共享到内存中,以便多进程可以访问和修改同一份数据。
            state['exp_avg'].share_memory_()
            state['exp_avg_sq'].share_memory_()

绘制曲线并保存为图像的函数

def plot(rewards):
    clear_output(True)
    # plt.figure(figsize=(20,5))
    plt.plot(rewards)
    plt.savefig('sac_multi.png')
    # plt.show()
    plt.clf()

主函数

if __name__ == '__main__':

    replay_buffer_size = 1e6  #设置回访缓冲区大小
    BaseManager.register('ReplayBuffer', ReplayBuffer)#注册回放缓冲区类,使其可以通过管理器创建共享对象。
    manager = BaseManager()   #创建管理器对象,用于管理共享对象
    manager.start()           #启动管理器,使其可以创建和共享对象
    replay_buffer = manager.ReplayBuffer(replay_buffer_size)  #创建共享的回放缓冲区

定义超参数

 # hyper-parameters for RL training, no need for sharing across processes
    max_episodes  = 500000
    max_steps   = 30 
    explore_steps = 0  
    batch_size=128
    update_itr = 1    #每次采样后更新模型的迭代次数
    AUTO_ENTROPY=True #是否自动调整熵系数
    DETERMINISTIC=False #是否使用确定性策略
    USE_DEMONS = False  #是否使用演示数据
    hidden_dim = 512   #神经网络的隐藏层
    model_path = './model/sac_multi'
    num_workers=6  
    headless = True    #是否以无头模式运行环境

创建实例用于训练SAC算法

    sac_trainer=SAC_Trainer(replay_buffer, hidden_dim=hidden_dim, action_range=action_range )
if args.train:
        #sac_trainer.load_model(model_path)  # 选择加载预训练模型
            
        #共享全局参数
        sac_trainer.soft_q_net1.share_memory()
        sac_trainer.soft_q_net2.share_memory()
        sac_trainer.target_soft_q_net1.share_memory()
        sac_trainer.target_soft_q_net2.share_memory()
        sac_trainer.policy_net.share_memory()
        sac_trainer.log_alpha.share_memory_()
        #共享优化器状态
        ShareParameters(sac_trainer.soft_q_optimizer1)
        ShareParameters(sac_trainer.soft_q_optimizer2)
        ShareParameters(sac_trainer.policy_optimizer)
        ShareParameters(sac_trainer.alpha_optimizer)
        
        rewards_queue=mp.Queue()  # 多进程队列,用于存储每个进程的奖励值,主进程可以从队列中获取奖励值并绘制训练曲线
        processes=[]
        rewards=[]

        for i in range(num_workers):
            process = Process(target=worker, args=(i, sac_trainer, rewards_queue, replay_buffer, max_episodes, max_steps, \
            batch_size, explore_steps, update_itr, AUTO_ENTROPY, DETERMINISTIC, USE_DEMONS, hidden_dim, model_path, headless))  
            process.daemon=True   #将进程设置为守护进程,当主进程结束时,所有子进程也会自动结束
            processes.append(process) #将进程对象添加到列表中,方便后续管理

        [p.start() for p in processes #启动所有工作进程
        while True:    #收集奖励
            r = rewards_queue.get()
            if r is not None:
                rewards.append(r)
            else:
                break

            if len(rewards)%20==0 and len(rewards)>0:
                # plot(rewards)
                np.save('reward_log', rewards)


        [p.join() for p in processes]  #等待所有进程结束
        sac_trainer.save_model(model_path)

测试模式

if args.test:
        env = GraspEnv(headless=False, control_mode='joint_velocity') # for visualizing in test
        #加载预训练模型
        trained_model_path1 = './model/trained_model/augmented_dense_reward/sac_multi'  # pre-trained model with augmented dense reward
        trained_model_path2 = './model/trained_model/dense_reward/sac_multi'  # pre-trained model with dense reward
        sac_trainer.load_model(model_path)  # new model after training
        
        for eps in range(30):
            state =  env.reset()
            episode_reward = 0

            for step in range(max_steps):
            #使用策略网络生成动作
                action = sac_trainer.policy_net.get_action(state, deterministic = DETERMINISTIC)
                next_state, reward, done, _ = env.step(action)  
                episode_reward += reward
                state=next_state

            print('Episode: ', eps, '| Episode Reward: ', episode_reward)

        env.shutdown()

以上就是今天的SAC代码分析

SAC的核心关键我认为是

1.引入了熵奖励

2.使用两种独立的神经网络来分别表示A和C,并且这个C(价值网络)由两个Q函数组成,减少了估计偏差

3.有离线策略,可以从经验回放池中采样训练数据,使得学习更稳定。

4.熵系数能够动态调整。

标签:25,log,dim,sac,self,state,learn,action,soft
From: https://blog.csdn.net/yyyy2711/article/details/144981357

相关文章

  • 笔记 2025.1.6:计数问题选讲-徐哲晨
    目录P4463[集训队互测2012]calc(拉插优化dp)P4484[BJWC2018]最长上升子序列(状压dp)ARC138E-DecreasingSubsequence(构造双射)P5400[CTS2019]随机立方体(二项式反演)AGC064D-RedandBlueChips(构造充要条件)CF1942G.BessieandCards(反射容斥)CF1874F.JellyfishandOEIS(容......
  • 2025年毕设ssm图书管理系统论文+源码
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容选题背景随着信息技术的迅猛发展,图书馆作为知识传播与存储的重要场所,其管理方式正逐步向自动化、智能化转型。关于图书管理系统的研究,现有研究主要以大型图书......
  • 2025年毕设ssm图书管理系统论文+源码
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容选题背景关于图书管理系统的研究,现有研究主要以传统图书馆的管理模式和大型图书馆的数字化建设为主,专门针对中小规模图书馆(如学校图书馆、社区图书馆)的图书管......
  • 2025毕设python租车换电管理系统的设计与实现程序+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于租车换电管理系统的研究,现有研究主要集中在租车或新能源汽车换电的单独领域,专门针对两者结合的租车换电管理系统的研究较少。在国......
  • 最近在LLM领域大放异彩的强化学习,给医学图像处理带来的启发|个人观点·25-01-08
    小罗碎碎念应用于医学图像的模型,往往会落后于纯计算机视觉领域的模型,但是现在这个差距正在急剧缩小。昨晚睡觉前刷到了这么一篇推送,介绍了目前最新的一个国产AI开源项目——用更少的钱和资源办成了更大的事,大致看了一下,这个模型使用的方法是强化学习,而不是传统的知识蒸......
  • 国自然青年基金|联合病理图像和转录组数据预测结肠癌预后新方法研究|基金申请·25-01-05
    小罗碎碎念今天和大家分享的这个项目属于国自然青年基金,直接费用24.5万,与22年12月结题。本项目旨在通过联合病理图像和转录组数据来预测结肠癌的预后,以提高预测精度。病理检查是肿瘤诊断的金标准,但其蕴含的与癌症预后相关的丰富信息尚未被充分挖掘。同时,随着测序技术的......
  • SQL Server数据库备份、差异备份、日志备份脚本.250108
    1,sp脚本USE[master]GO/******Object:StoredProcedure[dbo].[sp_BackupDatabase]ScriptDate:2025/1/810:43:05******/SETANSI_NULLSONGOSETQUOTED_IDENTIFIERONGO--Author:Amadeus--Createdate:2021-10-20execsp_BackupDatabaseL--Des......
  • 2025毕设springboot Script全栈视频播放系统论文+源码
    系统程序文件列表开题报告内容研究背景随着互联网的飞速发展,在线视频已经成为人们日常生活中不可或缺的一部分。从教育学习到娱乐休闲,视频内容以其直观、生动、易于理解的特点,吸引了大量用户。然而,当前市场上的视频播放系统虽然众多,但在用户体验、内容管理以及系统架构的灵......
  • 2025毕设springboot ON-FITy论文+源码
    系统程序文件列表开题报告内容研究背景在当今社会,随着生活节奏的加快和工作压力的增大,越来越多的人开始关注自身健康问题,尤其是健身已成为许多人日常生活的重要组成部分。然而,传统的健身方式往往缺乏个性化和互动性,难以满足广大健身爱好者的多元化需求。ON-FITy项目正是在......
  • 2025毕设springboot MBCTS溯源系统论文+源码
    系统程序文件列表开题报告内容研究背景在当今快速发展的食品与农产品行业中,确保产品的安全与品质已成为消费者最为关心的问题之一。随着供应链的不断延长和复杂化,从原材料种植到最终消费品送达消费者手中的每一个环节都可能成为食品安全风险的潜在来源。为了有效应对这一问......