这个难度有些大。PPO 里有两个 policy:一个负责更新策略,另一个负责提供数据;实际上这两个 policy 是同一个网络在不同时刻的参数。用 policy1 跑出一组数据给新的 policy2 训练,然后 policy2 跑数据给新的 policy3 训练……直到 policy(N-1) 跑数据给新的 policyN 训练。过程感觉和 DQN 比较像,但是模型是 actor-critic 架构。PPO 借助重要性采样(新旧策略的概率比)让同一批数据可以重复用于多轮更新,相当于把 on-policy 转换成近似 off-policy 的做法,并使用剪切(clip)策略来限制策略的更新幅度。off-policy 的好处是数据可以复用、策略更新快。PPO 的优化目标是最大化策略的期望回报,同时避免策略更新过大。
import sys
from collections import deque  # NOTE: unused in the code shown here; kept as in the original

import gym
import numpy as np
import pygame
import torch
import torch.nn as nn
import torch.optim as optim


class PolicyNetwork(nn.Module):
    """Actor network: maps an observation to a probability for each action.

    Args:
        obs_dim: Size of the observation vector (4 for CartPole).
        hidden_dim: Width of the single hidden layer.
        n_actions: Number of discrete actions (2 for CartPole).
    """

    def __init__(self, obs_dim=4, hidden_dim=2, n_actions=2):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, n_actions),
            nn.Softmax(dim=-1),
        )

    def forward(self, x):
        """Return action probabilities; the last dimension sums to 1."""
        return self.fc(x)


class ValueNetwork(nn.Module):
    """Critic network: maps an observation to a scalar state-value estimate.

    Args:
        obs_dim: Size of the observation vector (4 for CartPole).
        hidden_dim: Width of the single hidden layer.
    """

    def __init__(self, obs_dim=4, hidden_dim=2):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, x):
        """Return value estimates with a trailing dimension of size 1."""
        return self.fc(x)


class RolloutBuffer:
    """Stores the transitions collected by the current policy between updates."""

    def __init__(self):
        self.clear()

    def store(self, state, action, reward, done, log_prob):
        """Append one transition.

        ``log_prob`` may be a Python float or a one-element tensor; it is
        stored as a plain float so the buffer never keeps an autograd graph
        alive (the old log-probabilities are constants during the update).
        """
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.dones.append(done)
        self.log_probs.append(float(log_prob))

    def clear(self):
        """Drop all stored transitions."""
        self.states = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.log_probs = []

    def get_batch(self):
        """Return ``(states, actions, rewards, dones, log_probs)`` as tensors."""
        # Stack the observations in NumPy first: building a tensor directly
        # from a list of ndarrays is slow and triggers a PyTorch warning.
        states = np.asarray(self.states, dtype=np.float32)
        return (
            torch.as_tensor(states, dtype=torch.float),
            torch.as_tensor(self.actions, dtype=torch.long),
            torch.as_tensor(self.rewards, dtype=torch.float),
            torch.as_tensor(self.dones, dtype=torch.bool),
            torch.as_tensor(self.log_probs, dtype=torch.float),
        )


def ppo_update(policy_net, value_net, optimizer_policy, optimizer_value, buffer,
               epochs=10, gamma=0.99, clip_param=0.2, gae_lambda=0.95):
    """Run several epochs of clipped-surrogate PPO on the buffered rollout.

    Args:
        policy_net: Actor returning action probabilities for a batch of states.
        value_net: Critic returning one value per state (trailing dim of 1).
        optimizer_policy: Optimizer over ``policy_net`` parameters.
        optimizer_value: Optimizer over ``value_net`` parameters.
        buffer: Object whose ``get_batch()`` returns
            ``(states, actions, rewards, dones, old_log_probs)`` tensors.
        epochs: Number of gradient steps taken on the same batch.
        gamma: Discount factor.
        clip_param: PPO clipping range for the probability ratio.
        gae_lambda: Lambda of generalized advantage estimation.

    Returns:
        None. The two networks are updated in place.
    """
    states, actions, rewards, dones, old_log_probs = buffer.get_batch()
    n_steps = states.shape[0]
    if n_steps == 0:
        return  # nothing collected, nothing to learn from

    # Value estimates are only targets/baselines here, so no graph is needed.
    with torch.no_grad():
        value_list = value_net(states).squeeze(-1).tolist()
    reward_list = rewards.tolist()
    done_list = dones.tolist()

    returns = [0.0] * n_steps
    advantages = [0.0] * n_steps
    running_return = 0.0
    running_adv = 0.0
    # Value of the state *after* step t. The rollout end is treated as
    # terminal (bootstrap value 0), matching the Monte-Carlo return below.
    next_value = 0.0
    for t in reversed(range(n_steps)):
        if done_list[t]:
            # Episode boundary: nothing from later steps may leak backwards.
            running_return = 0.0
            running_adv = 0.0
            next_value = 0.0
        # Discounted Monte-Carlo return, used as the critic's target.
        running_return = reward_list[t] + gamma * running_return
        # One-step TD error bootstrapped from the NEXT state's value.
        delta = reward_list[t] + gamma * next_value - value_list[t]
        # Generalized advantage estimate (exponentially weighted TD errors).
        running_adv = delta + gamma * gae_lambda * running_adv
        returns[t] = running_return
        advantages[t] = running_adv
        next_value = value_list[t]

    returns = torch.tensor(returns, dtype=torch.float)
    advantages = torch.tensor(advantages, dtype=torch.float)
    if n_steps > 1:
        # Normalizing stabilizes the step size; skipped for a single sample
        # because the sample standard deviation is undefined (NaN) there.
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

    for _ in range(epochs):
        dist = torch.distributions.Categorical(policy_net(states))
        new_log_probs = dist.log_prob(actions)
        # Importance ratio pi_new(a|s) / pi_old(a|s).
        ratio = (new_log_probs - old_log_probs).exp()
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1.0 - clip_param, 1.0 + clip_param) * advantages
        actor_loss = -torch.min(surr1, surr2).mean()

        optimizer_policy.zero_grad()
        actor_loss.backward()
        optimizer_policy.step()

        # squeeze(-1) keeps both operands at shape (N,); without it the
        # (N,) - (N, 1) subtraction broadcasts to an (N, N) matrix.
        value_loss = (returns - value_net(states).squeeze(-1)).pow(2).mean()

        optimizer_value.zero_grad()
        value_loss.backward()
        optimizer_value.step()


def _draw_cartpole(screen, state):
    """Render the cart and the pole for one observation on the pygame surface."""
    screen.fill((0, 0, 0))
    cart_x = int(state[0] * 100 + 300)  # cart position -> screen coordinate
    pygame.draw.rect(screen, (0, 128, 255), (cart_x, 300, 50, 30))
    pole_tip = (
        cart_x + 25 - int(50 * np.sin(state[2])),
        300 - int(50 * np.cos(state[2])),
    )
    pygame.draw.line(screen, (255, 0, 0), (cart_x + 25, 300), pole_tip, 5)
    pygame.display.flip()


def main():
    """Train PPO on CartPole-v1, one policy update per episode."""
    env = gym.make('CartPole-v1')
    policy_net = PolicyNetwork()
    value_net = ValueNetwork()
    optimizer_policy = optim.Adam(policy_net.parameters(), lr=3e-4)
    optimizer_value = optim.Adam(value_net.parameters(), lr=1e-3)
    buffer = RolloutBuffer()

    pygame.init()
    screen = pygame.display.set_mode((600, 400))
    clock = pygame.time.Clock()
    draw_on = False

    for episode in range(10000):
        state, _ = env.reset()
        done = False
        step = 0
        while not done:
            step += 1
            state_tensor = torch.as_tensor(state, dtype=torch.float).unsqueeze(0)
            # Acting needs no gradients; the update recomputes log-probs.
            with torch.no_grad():
                dist = torch.distributions.Categorical(policy_net(state_tensor))
                action = dist.sample()
                log_prob = dist.log_prob(action)

            # NOTE(review): only the "terminated" flag ends an episode; the
            # "truncated" flag is ignored, as in the original script, so an
            # episode can run past the environment's time limit. Confirm
            # this is intended.
            next_state, reward, done, _, _ = env.step(action.item())
            buffer.store(state, action.item(), reward, done, log_prob.item())
            state = next_state

            # Keep the window responsive and allow a clean exit.
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    env.close()
                    pygame.quit()
                    sys.exit()

            if draw_on:
                _draw_cartpole(screen, state)
                clock.tick(600)

        # Start rendering once the agent has survived a very long episode.
        if step > 10000:
            draw_on = True

        ppo_update(policy_net, value_net, optimizer_policy, optimizer_value, buffer)
        buffer.clear()
        print(f'Episode {episode} completed {step}.')

    env.close()
    pygame.quit()


if __name__ == "__main__":
    main()
运行效果
标签:cartpole,log,self,torch,PPO,value,state,policy,近端 From: https://www.cnblogs.com/LiuXinyu12378/p/18192344