首页 > 编程问答 >使用 DQN 实现 pong,使用 python 中的特征向量而不是像素。我的 DQNA 实现代码正确吗,因为我的模型训练得不好

使用 DQN 实现 pong,使用 python 中的特征向量而不是像素。我的 DQNA 实现代码正确吗,因为我的模型训练得不好

时间:2024-07-30 06:17:34浏览次数:14  
标签:python tensorflow reinforcement-learning dqn

我正在致力于使用 OpenAI 的 Gym 为 Pong 游戏实现强化学习 (RL) 环境。目标是训练人工智能代理通过控制球拍来打乒乓球。

代理收到太多负面奖励,即使它看起来移动正确。具体来说,奖励函数会惩罚远离球的智能体,但这种情况发生得太频繁,即使球朝球拍移动时似乎也会发生。

观察空间:观察空间包括物体的位置和速度。球和球拍、球和球拍之间的距离以及球拍速度。

动作空间:动作空间由三个离散动作组成:向上移动、向下移动和保持静止。

奖励功能:奖励函数奖励智能体击球,并惩罚它错过球或远离球。

如何调整奖励函数,以仅在球向球移动时远离球时惩罚智能体桨?或者我的游戏逻辑或 DQNA 实施还有其他问题吗?任何有关提高训练性能的建议将不胜感激。

以下是我的 DQNA 代码:

import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.optimizers import Adam
import numpy as np
import random
from collections import deque

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=10000)  # Increased replay memory size
        self.gamma = 0.99    # Increased discount rate
        self.epsilon = 1.0  # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995  # Slower epsilon decay
        self.learning_rate = 0.001  # Slightly increased learning rate
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

    def _build_model(self):
        model = Sequential()
        model.add(Input(shape=(self.state_size,)))  # Added Input layer to define the input shape
        model.add(Dense(48, activation='relu'))  # Increased neurons to 48
        model.add(Dropout(0.2))  # Dropout layer for regularization
        model.add(Dense(48, activation='relu'))  # Increased neurons to 48
        model.add(Dropout(0.2))  # Dropout layer for regularization
        model.add(Dense(48, activation='relu'))  # Increased neurons to 48
        model.add(Dropout(0.2))  # Dropout layer for regularization
        model.add(Dense(48, activation='relu'))  # Increased neurons to 48
        model.add(Dropout(0.2))  # Dropout layer for regularization
        model.add(Dense(48, activation='relu'))  # Increased neurons to 48
        model.add(Dropout(0.2))  # Dropout layer for regularization
        model.add(Dense(48, activation='relu'))  # Increased neurons to 48
        model.add(Dropout(0.2))  # Dropout layer for regularization
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.randint(self.action_size)
        else:
            act_values = self.model.predict(state)
            return np.argmax(act_values[0])

    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return 0  # Not enough memory to sample
        minibatch = random.sample(self.memory, batch_size)
        total_loss = 0
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma *
                          np.amax(self.target_model.predict(next_state)[0]))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            history = self.model.fit(state, target_f, epochs=1, verbose=0)
            total_loss += history.history['loss'][0]
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        return total_loss / batch_size  # Return average loss

    def update_target_model(self):
        # Copy weights from model to target_model
        self.target_model.set_weights(self.model.get_weights())

    def save(self, filename):
        self.model.save(f"{filename}.keras")

    def load(self, filename):
        self.model = load_model(f"{filename}")
        self.target_model = self._build_model()
        self.update_target_model()

以下是我的 Pong 逻辑代码:

import gym
from gym import spaces
import numpy as np
import pygame
from pygame.locals import K_w, K_s

class PongEnv(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(PongEnv, self).__init__()

        self.width = 640
        self.height = 480
        self.ball_speed = 7
        self.paddle_speed = 12
        self.ball = pygame.Rect(self.width // 2 - 15, self.height // 2 - 15, 30, 30)
        self.player_paddle = pygame.Rect(self.width - 20, self.height // 2 - 70, 10, 140)
        self.ai_paddle = pygame.Rect(10, self.height // 2 - 70, 10, 140)
        self.ball_dx, self.ball_dy = self.ball_speed, self.ball_speed
        self.player_paddle_speed = 0  # Initialize player paddle speed

        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(low=0, high=255, shape=(13,), dtype=np.float32)  # Updated shape

    def reset(self):
        self.ball.center = (self.width // 2, self.height // 2)
        self.player_paddle.centery = self.height // 2
        self.ai_paddle.centery = self.height // 2
        self.ball_dx, self.ball_dy = self.ball_speed, self.ball_speed
        self.player_paddle_speed = 0  # Reset player paddle speed
        return self.get_state()

    def get_state(self):
        distance_ball_player = self.ball.centery - self.player_paddle.centery
        distance_ball_ai = self.ball.centery - self.ai_paddle.centery
        state = [
            self.player_paddle.centery,
            self.ai_paddle.centery,
            self.ball.centerx,
            self.ball.centery,
            self.ball_dx,
            self.ball_dy,  # Adding ball's vertical speed
            distance_ball_player,  # Adding distance between ball and player paddle
            distance_ball_ai,  # Adding distance between ball and AI paddle
            self.player_paddle.top,
            self.player_paddle.bottom,
            self.ai_paddle.top,
            self.ai_paddle.bottom,
            self.player_paddle_speed,  # Adding player paddle speed
        ]
        return np.array(state, dtype=np.float32)

    def step(self, action):
        reward = 0
        done = False

        previous_player_paddle_position = self.player_paddle.centery  # Track previous position

        # Move player paddle
        if action == 0:
            self.player_paddle.centery -= self.paddle_speed
        elif action == 1:
            self.player_paddle.centery += self.paddle_speed

        # Ensure paddle stays within the screen
        if self.player_paddle.top < 0:
            self.player_paddle.top = 0
        if self.player_paddle.bottom > self.height:
            self.player_paddle.bottom = self.height

        # Calculate player paddle speed
        self.player_paddle_speed = self.player_paddle.centery - previous_player_paddle_position

        # Move the ball
        self.ball.x += self.ball_dx
        self.ball.y += self.ball_dy

        # Ball collision with top or bottom
        if self.ball.top <= 0 or self.ball.bottom >= self.height:
            self.ball_dy *= -1

        # Ball collision with paddles
        if self.ball.colliderect(self.player_paddle):
            self.ball_dx *= -1
            reward += 5  # Reward for hitting the ball with the paddle
            print(f"Ball hit by player paddle. Reward: {reward}")
        elif self.ball.colliderect(self.ai_paddle):
            self.ball_dx *= -1

        # Check for out of bounds
        if self.ball.left <= 0:
            done = True
            reward += 10  # Reward for scoring
            print(f"Ball out of left bounds. Reward: {reward}")
        elif self.ball.right >= self.width:
            done = True
            reward -= 10  # Penalty for opponent scoring
            print(f"Ball out of right bounds. Penalty: {reward}")

        # AI paddle movement
        if self.ai_paddle.centery < self.ball.centery:
            self.ai_paddle.centery += self.paddle_speed
        if self.ai_paddle.centery > self.ball.centery:
            self.ai_paddle.centery -= self.paddle_speed

        # Ensure AI paddle stays within the screen
        if self.ai_paddle.top < 0:
            self.ai_paddle.top = 0
        if self.ai_paddle.bottom > self.height:
            self.ai_paddle.bottom = self.height

        # Reward for moving towards the ball and penalty for moving away
        if self.ball_dx > 0:  # Ball moving towards the player paddle
            if (action == 0 and self.player_paddle.centery > self.ball.centery) or (action == 1 and self.player_paddle.centery < self.ball.centery):
                reward += 0.5
                print(f"Player paddle moving towards ball. Reward: {reward}")
            else:
                reward -= 0.5  # Penalty for moving away from the ball
                print(f"Player paddle moving away from ball. Penalty: {reward}")

        next_state = self.get_state()
        return next_state, reward, done, {}

    def render(self, mode='human'):
        if mode == 'human':
            pygame.init()
            screen = pygame.display.set_mode((self.width, self.height))
            screen.fill((0, 0, 0))
            pygame.draw.rect(screen, (255, 255, 255), self.player_paddle)
            pygame.draw.rect(screen, (255, 255, 255), self.ai_paddle)
            pygame.draw.ellipse(screen, (255, 255, 255), self.ball)
            pygame.draw.aaline(screen, (255, 255, 255), (self.width // 2, 0), (self.width // 2, self.height))
            pygame.display.flip() 

你的代码有一些可以改进的地方,可以提高训练性能。以下是对代码和可能调整的详细分析:

1. 奖励函数

你关于奖励函数惩罚代理远离球的观察是正确的,即使它朝着球拍移动。这可能是由于奖励函数的设计造成的。目前,当球朝着玩家球拍移动时,它会奖励朝着球移动,并惩罚远离球的移动。但是,这种奖励策略可能并不总是有效,因为它没有考虑到球拍和球之间的距离。

让我们修改奖励函数,以更好地反映代理的行为:

# ... 在 PongEnv 类中的 step 函数中 ...

# 奖励移动到球附近并惩罚远离球
if self.ball_dx > 0:  # 球朝玩家球拍移动
    distance_before = abs(self.ball.centery - self.player_paddle.centery)
    distance_after = abs(self.ball.centery - (self.player_paddle.centery + self.player_paddle_speed))
    if distance_after < distance_before:
        reward += 0.5  # 奖励靠近球
        print(f"玩家球拍靠近球。奖励:{reward}")
    else:
        reward -= 0.5  # 惩罚远离球
        print(f"玩家球拍远离球。惩罚:{reward}")

在这个修改后的奖励函数中,我们只在球拍实际靠近球时才给予奖励,而在球拍远离球时给予惩罚,而不管球拍的移动方向如何。这有助于代理学习更好地定位自身以击球。

2. 状态表示

你的状态表示非常全面,包括球的位置和速度、球拍的位置和球拍的速度。但是,你还可以通过添加派生特征(例如球和球拍之间的相对距离)来尝试进一步增强它。这可以帮助代理更容易地理解游戏状态。

3. DQNAgent 超参数

  • 增加回放内存大小: 你的回放内存大小为 10000。你可以尝试将其增加到更大的值,例如 100000 或更多。更大的回放内存可以存储更多经验,并有助于打破经验之间的相关性,从而提高训练稳定性。

  • epsilon 衰减率: 你的 epsilon 衰减率为 0.995。这可能是一个相当快的衰减率。你可以尝试使用更慢的衰减率,例如 0.999 或 0.9995。这将使代理在更长的时间内探索更多,这可能有助于找到更好的策略。

  • 学习率: 你的学习率为 0.001。虽然这个学习率是一个合理的起点,但你可以尝试一些不同的值,例如 0.0001 或 0.00005。较低的学习率可能有助于提高训练稳定性,但可能需要更多的时间来收敛。

  • 目标网络更新频率: 你没有在代码中包含更新目标网络的频率。建议定期更新目标网络,例如每 1000 或 5000 步更新一次。这有助于稳定训练过程。

  • 批量大小: 你可以尝试不同的批量大小,例如 32 或 64。

4. 网络架构

你的网络架构合理。但是,你可以尝试一些不同的激活函数,例如 LeakyReLU 或 ELU,而不是 ReLU。此外,你可以尝试添加更多层或更改每层的单元数来查看是否有帮助。

def _build_model(self):
    model = Sequential()
    model.add(Input(shape=(self.state_size,)))
    model.add(Dense(64, activation='relu'))  # 调整单元数
    model.add(Dense(64, activation='relu'))
    model.add(Dense(self.action_size, activation='linear'))
    model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
    return model

5. 训练时间

训练 DQN 代理可能需要很长时间才能收敛。确保你已经训练了足够长的时间。

调试技巧

  • 打印奖励和 epsilon 值: 在训练期间,打印奖励和 epsilon 值,以监控代理的进度。这可以帮助你识别任何潜在的问题,例如奖励函数没有按预期工作。

  • 可视化代理: 定期渲染游戏以可视化代理的性能。这可以帮助你识别代理所犯的任何明显错误。

请记住,强化学习可能很棘手,通常需要大量的试验和错误才能获得良好的结果。尝试不同的超参数、奖励函数和网络架构,以找到最适合你的问题的组合。

标签:python,tensorflow,reinforcement-learning,dqn
From: 78809166

相关文章

  • Python CDLL 无法加载两次
    我正在尝试用python创建一个密码管理器,但遇到了一个问题,一旦加载了一种类型的dll,我就无法加载不同的dll,在这个示例中,我加载了一个dll,并尝试解密加密的密码数据,它工作正常,直到我加载另一个不同的nss3.dll文件,此时它给我一个错误:“过程入口点HeapAlloc无法位于动态链......
  • 你能将 HTTPS 功能添加到 python Flask Web 服务器吗?
    我正在尝试构建一个Web界面来模拟网络设备上的静态接口,该网络设备使用摘要式身份验证和HTTPS。我想出了如何将摘要式身份验证集成到Web服务器中,但我似乎无法找到如何使用FLASK获取https,如果您可以向我展示如何实现,请评论我需要使用下面的代码做什么来实现这一点。from......
  • Python:比较 csv 文件并打印相似之处
    我需要比较两个csv文件并打印出它们的相似之处。第一个文件有名称和浓度,第二个文件就像只有名称的“最佳”列表,我需要绘制相似性图表。例如,这就是我的列表的样子:file1-old_file.csvname_id,conc_test1,conc_test2name1,####,####name2,###......
  • Python 类交叉引用
    我用Python创建了一个数独游戏。我有一个:单元格类-“保存”数字可能性单元格组-保存单元格类实例我使用这些组在数独中运行行、列和正方形功能。每个单元格包含所有组,他属于classCell:def__init__(groups):self.groups=groupscla......
  • Tensorflow 除法计算结果为无穷大
    我是Tensorflow的新手,对于明显的错误深表歉意。我尝试进行一个评估为inf的除法。下面的行产生了问题:style_loss1_1=tf.div(tf.reduce_sum(tf.pow(tensor_gen_gram1_1-tensor_style_gram1_1,2)),tf.mul(tf.mul(4.0,2517630976.0),9.0))......
  • 如何修复我的 Python Azure Function DevOps Pipeline 上的“找到 1 个函数(自定义)加载
    我正在尝试使用AzureDevOps构建管道将PythonAzureFunction部署到Azure门户。由于某种原因,代码被部署到服务器,但我在尝试访问端点时收到404错误。我收到一个错误,显示1functionsfound(Custom)0functionsloaded,以及在服务器上显示ModuleNotFound......
  • 使用 kivy 从 python 脚本的 buildozer 构建 android apk 时出错
    我想从使用kivy包构建的Python脚本构建apk为此,我使用googlecollab.这里是main.py脚本:importyoutube_dlfromkivy.appimportAppfromkivy.uix.boxlayoutimportBoxLayoutfromkivy.uix.buttonimportButtonfromkivy.uix.tex......
  • 自动解码并检索 S/MIME 加密电子邮件的正文 (python)
    我如何:用python代码连接我的邮件收件箱以自动获取未读电子邮件的加密内容;解码S/MIME加密电子邮件(我有密钥);检索电子邮件正文纯文本;检查正文(或主题)是否与某个关键字(现在为“test”)匹配,并在匹配时打印一些内容;在树莓派上使用此代码,无需手动......
  • Python 3 写入 DBF(带有 Memo 的 dBase IV)
    我需要在Python3中写入带有备注字段的dBaseIVdbf文件,但找不到合适的模块来执行此操作。我尝试过的包:Simpledbf-只读dbf-不支持dBaseIVdbfpy-不支持Python3dbfpy3-不支持dBaseIVYDbf-不支持备注字段pyshp-无法仅使用dbf文件......
  • Robin-Stocks Python 中的 order_buy_fractional_by_price 问题
    我在Robin-StocksPython包中的order_buy_fractional_by_price函数中遇到问题。在正常市场交易时间内下达以美元为基础的买入订单时,该订单被错误地设置为限价订单。对我来说看起来有问题的代码似乎是导致此问题的原因。我尝试在包管理器中本地修改或删除有问题的代码,但遇......