首页 > 编程语言 >【MADRL】多智能体近端策略优化(MAPPO)算法

【MADRL】多智能体近端策略优化(MAPPO)算法

时间:2024-09-11 10:21:48浏览次数:10  
标签:MAPPO evaluate 体近 args add MADRL reward self

        本篇文章是博主强化学习RL领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在强化学习专栏:

       强化学习(8)---《【MADRL】多智能体近端策略优化(MAPPO)算法》

【MADRL】多智能体近端策略优化(MAPPO)算法

目录

0.前言

1.背景与动机

2.算法结构

3.具体公式

4.算法流程

5.公式总结

6.优势与应用场景

7.结论

 [Python] MAPPO实现(可移植)


0.前言

       多智能体近端策略优化算法 MAPPO(Multi-Agent Proximal Policy Optimization)是PPO(Proximal Policy Optimization)在多智能体环境中的一种扩展,它通过在多智能体系统中引入PPO的策略优化机制,实现了在协作和竞争环境中更加高效的策略学习。MAPPO是一种基于策略梯度的多智能体强化学习算法,特别适用于混合协作和竞争的多智能体场景。

论文:The Surprising Effectiveness of PPO in Cooperative, Multi-Agent Games

代码:MADRL多智能体近端策略优化(MAPPO)算法
 


1.背景与动机

        PPO 是近年来最流行的强化学习算法之一,它通过引入裁剪的策略更新,解决了传统策略梯度方法(如TRPO)中策略更新步长过大导致训练不稳定的问题。在多智能体环境中,多个智能体同时学习策略,每个智能体的行为会影响其他智能体的决策,因此需要一个鲁棒且稳定的策略优化方法。MAPPO通过中心化的Critic和去中心化的Actor来实现多智能体的协同训练,并采用PPO的优势来提高多智能体环境下的学习效率和稳定性。


2.算法结构

        MAPPO继承了PPO的核心思想,并结合了多智能体系统的特点,采用了中心化训练,去中心化执行的架构:

  • 中心化训练:在训练阶段,所有智能体的Critic网络都能够访问全局的状态和其他智能体的动作信息,以便于学习到更准确的价值函数。
  • 去中心化执行:在执行阶段,每个智能体只使用自己观测到的局部状态和策略进行动作选择,保证了系统的分布式控制。


3.具体公式

        MAPPO算法主要分为两部分:策略更新价值函数估计。我们将分别介绍这两部分的公式。

  1. 策略更新

            在PPO中,策略更新的目标是通过最大化策略的期望回报 ( \mathbb{E}_{\pi} [R] ) 来更新策略。然而,直接使用策略梯度更新容易导致大的策略变化。因此,PPO引入了一个裁剪目标函数来限制每次更新的策略变化幅度。MAPPO也遵循相同的原则,但应用在每个智能体 ( i ) 的策略上。

    PPO的目标函数为:
    [ L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min \left( r_t(\theta) \hat{A}_t, \text{clip}(r_t(\theta), 1 - \epsilon, 1 + \epsilon) \hat{A}_t \right) \right] ]
    其中:

    • ( r_t(\theta) = \frac{\pi_{\theta}(a_t | s_t)}{\pi_{\theta_{\text{old}}}(a_t | s_t)} )是当前策略与旧策略的比率;
    • ( \hat{A}_t )是优势函数的估计值,用于衡量动作 ( a_t )在状态 ( s_t ) 下的优势;
    • ( \epsilon )是裁剪的阈值,用于控制策略更新的幅度。

            MAPPO中的每个智能体( i ) 采用类似的目标函数进行策略更新,但每个智能体的策略仅依赖于自己的观测( o_i ),即:
    [ L^{CLIP}_i(\theta_i) = \mathbb{E}_t \left[ \min \left( r_t(\theta_i) \hat{A}_t^i, \text{clip}(r_t(\theta_i), 1 - \epsilon, 1 + \epsilon) \hat{A}_t^i \right) \right] ]
    其中 ( r_t(\theta_i) ) 是智能体( i )的策略更新比率,( \hat{A}_t^i )是智能体( i ) 的优势估计值。

  2. 价值函数估计

    每个智能体( i ) 的价值函数( V_i(s) )由一个中心化的Critic网络估计。Critic网络使用全局状态( s )和所有智能体的动作 ( a_1, a_2, ..., a_N )来估计全局的价值函数。MAPPO通过使用中心化Critic保证每个智能体在训练过程中可以考虑到其他智能体的策略,从而学到更有效的策略。

    Critic的目标是最小化均方误差(MSE)损失函数:[ L(\phi_i) = \mathbb{E}{s_t, r_t, s{t+1}} \left[ \left( V_i(s_t; \phi_i) - R_t \right)^2 \right] ]
    其中,( R_t )是从当前时刻 ( t )到未来的累计回报,通常通过时间差分法(TD目标)进行估计:[ R_t = r_t + \gamma V_i(s_{t+1}; \phi'_i) ]
    其中,( \gamma )是折扣因子,( \phi'_i )是目标网络的参数。

  3. 优势函数的计算

    优势函数( \hat{A}t^i ) 用于衡量某个动作( a_t ) 相对于当前策略下平均动作的优劣程度。优势函数可以通过以下公式估计:[ \hat{A}t^i = \delta_t + (\gamma \lambda) \delta{t+1} + ... + (\gamma \lambda)^{T-t+1} \delta{T-1} ]
    其中( \delta_t ) 是时间差分误差,定义为: [ \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t) ]
    这里( \lambda )是GAE(Generalized Advantage Estimation)中的权重参数,用于平衡偏差和方差。


4.算法流程

  1. 初始化:为每个智能体初始化策略网络( \pi_i ) 和Critic网络 ( V_i ),并初始化对应的目标网络。

  2. 交互与经验收集:每个智能体根据当前策略与环境交互,并存储每个时间步的状态、动作、奖励、下一状态、价值等信息。

  3. 计算回报和优势:通过时间差分法计算每个智能体的回报 ( R_t )和优势函数( \hat{A}_t^i )

  4. 更新Critic网络:根据Critic损失函数更新每个智能体的Critic网络参数,最小化均方误差。

  5. 更新Actor网络:根据裁剪的PPO目标函数,使用策略梯度法更新每个智能体的策略网络参数。

  6. 软更新目标网络:使用软更新机制逐步更新目标网络的参数。

  7. 重复:循环进行以上步骤,直到智能体策略达到收敛。


5.公式总结

  • 策略更新目标[ L^{CLIP}_i(\theta_i) = \mathbb{E}_t \left[ \min \left( r_t(\theta_i) \hat{A}_t^i, \text{clip}(r_t(\theta_i), 1 - \epsilon, 1 + \epsilon) \hat{A}_t^i \right) \right] ]
  • Critic网络损失函数[ L(\phi_i) = \mathbb{E}{s_t, r_t, s{t+1}} \left[ \left( V_i(s_t; \phi_i) - R_t \right)^2 \right] ]
  • 优势函数估计[ \hat{A}t^i = \sum{l=0}^{T-t} (\gamma \lambda)^l \delta_{t+l} ]

6.优势与应用场景

  • 鲁棒性与稳定性:PPO算法引入的裁剪更新机制使策略梯度更新更加稳定,避免了更新幅度过大的问题。MAPPO继承了这一优势,能够在多智能体环境中提供更稳定的策略更新。

  • 中心化训练与去中心化执行:通过中心化的Critic结构,智能体可以利用全局信息进行策略训练,而去中心化的Actor使得智能体在执行过程中只依赖局部观测,提高了算法的灵活性和扩展性。

  • 适应复杂的多智能体环境:MAPPO可以处理多智能体环境中的协作、竞争或混合型任务,非常适用于复杂的多智能体系统,如机器人集群、自动驾驶车队、多人游戏等。


7.结论

        MAPPO是对PPO算法的多智能体扩展,采用了中心化的Critic和去中心化的Actor结构,能够在多智能体环境中提供稳定、高效的策略优化。通过PPO的裁剪更新机制,MAPPO在策略更新过程中保持了良好的收敛性和鲁棒性,是当前研究和应用中广泛使用的算法之一。


 [Python] MAPPO实现(可移植)

        若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱,以便于及时分享给您(私信难以及时回复)。

主文件:MAPPO_MPE_main

import torch
import numpy as np
from torch.utils.tensorboard import SummaryWriter
import argparse
from normalization import Normalization, RewardScaling
from replay_buffer import ReplayBuffer
from mappo_mpe import MAPPO_MPE
from environment import Env


class Runner_MAPPO_MPE:
    def __init__(self, args, env_name, number, seed):
        self.args = args
        self.env_name = env_name
        self.number = number
        self.seed = seed
        # Set random seed
        np.random.seed(self.seed)
        torch.manual_seed(self.seed)
        # Create env
        self.env = Env(env_name, discrete=True) # Discrete action space
        self.args.N = self.env.n  # The number of agents
        self.args.obs_dim_n = [self.env.observation_space[i].shape[0] for i in range(self.args.N)]  # obs dimensions of N agents
        self.args.action_dim_n = [self.env.action_space[i].n for i in range(self.args.N)]  # actions dimensions of N agents
        # Only for homogenous agents environments like Spread in MPE,all agents have the same dimension of observation space and action space
        self.args.obs_dim = self.args.obs_dim_n[0]  # The dimensions of an agent's observation space
        self.args.action_dim = self.args.action_dim_n[0]  # The dimensions of an agent's action space
        self.args.state_dim = np.sum(self.args.obs_dim_n)  # The dimensions of global state space(Sum of the dimensions of the local observation space of all agents)
        print("observation_space=", self.env.observation_space)
        print("obs_dim_n={}".format(self.args.obs_dim_n))
        print("action_space=", self.env.action_space)
        print("action_dim_n={}".format(self.args.action_dim_n))

        # Create N agents
        self.agent_n = MAPPO_MPE(self.args)
        self.replay_buffer = ReplayBuffer(self.args)

        # Create a tensorboard
        self.writer = SummaryWriter(log_dir='runs/MAPPO/MAPPO_env_{}_number_{}_seed_{}'.format(self.env_name, self.number, self.seed))

        self.evaluate_rewards = []  # Record the rewards during the evaluating
        self.total_steps = 0
        if self.args.use_reward_norm:
            print("------use reward norm------")
            self.reward_norm = Normalization(shape=self.args.N)
        elif self.args.use_reward_scaling:
            print("------use reward scaling------")
            self.reward_scaling = RewardScaling(shape=self.args.N, gamma=self.args.gamma)

    def run(self, ):
        evaluate_num = -1  # Record the number of evaluations
        while self.total_steps < self.args.max_train_steps:
            if self.total_steps // self.args.evaluate_freq > evaluate_num:
                self.evaluate_policy()  # Evaluate the policy every 'evaluate_freq' steps
                evaluate_num += 1

            _, episode_steps = self.run_episode_mpe(evaluate=False)  # Run an episode
            self.total_steps += episode_steps

            if self.replay_buffer.episode_num == self.args.batch_size:
                self.agent_n.train(self.replay_buffer, self.total_steps)  # Training
                self.replay_buffer.reset_buffer()

        self.evaluate_policy()
        self.env.close()

    def evaluate_policy(self, ):
        evaluate_reward = 0
        for _ in range(self.args.evaluate_times):
            episode_reward, _ = self.run_episode_mpe(evaluate=True)
            evaluate_reward += episode_reward

        evaluate_reward = evaluate_reward / self.args.evaluate_times
        self.evaluate_rewards.append(evaluate_reward)
        print("total_steps:{} \t evaluate_reward:{}".format(self.total_steps, evaluate_reward))
        self.writer.add_scalar('evaluate_step_rewards_{}'.format(self.env_name), evaluate_reward, global_step=self.total_steps)
        # Save the rewards and models
        np.save('./data_train/MAPPO_env_{}_number_{}_seed_{}.npy'.format(self.env_name, self.number, self.seed), np.array(self.evaluate_rewards))
        self.agent_n.save_model(self.env_name, self.number, self.seed, self.total_steps)

    def run_episode_mpe(self, evaluate=False):
        episode_reward = 0
        obs_n = self.env.reset()
        if self.args.use_reward_scaling:
            self.reward_scaling.reset()
        if self.args.use_rnn:  # If use RNN, before the beginning of each episode,reset the rnn_hidden of the Q network.
            self.agent_n.actor.rnn_hidden = None
            self.agent_n.critic.rnn_hidden = None
        for episode_step in range(self.args.episode_limit):
            a_n, a_logprob_n = self.agent_n.choose_action(obs_n, evaluate=evaluate)  # Get actions and the corresponding log probabilities of N agents
            s = np.array(obs_n).flatten()  # In MPE, global state is the concatenation of all agents' local obs.
            v_n = self.agent_n.get_value(s)  # Get the state values (V(s)) of N agents
            obs_next_n, r_n, done_n, _ = self.env.step(a_n)
            episode_reward += r_n[0]

            if not evaluate:
                if self.args.use_reward_norm:
                    r_n = self.reward_norm(r_n)
                elif args.use_reward_scaling:
                    r_n = self.reward_scaling(r_n)

                # Store the transition
                self.replay_buffer.store_transition(episode_step, obs_n, s, v_n, a_n, a_logprob_n, r_n, done_n)

            obs_n = obs_next_n
            if all(done_n):
                break

        if not evaluate:
            # An episode is over, store v_n in the last step
            s = np.array(obs_n).flatten()
            v_n = self.agent_n.get_value(s)
            self.replay_buffer.store_last_value(episode_step + 1, v_n)

        return episode_reward, episode_step + 1


if __name__ == '__main__':
    parser = argparse.ArgumentParser("Hyperparameters Setting for MAPPO in MPE environment")
    parser.add_argument("--max_train_steps", type=int, default=int(3e6), help=" Maximum number of training steps")
    parser.add_argument("--episode_limit", type=int, default=25, help="Maximum number of steps per episode")
    parser.add_argument("--evaluate_freq", type=float, default=5000, help="Evaluate the policy every 'evaluate_freq' steps")
    parser.add_argument("--evaluate_times", type=float, default=3, help="Evaluate times")

    parser.add_argument("--batch_size", type=int, default=32, help="Batch size (the number of episodes)")
    parser.add_argument("--mini_batch_size", type=int, default=8, help="Minibatch size (the number of episodes)")
    parser.add_argument("--rnn_hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the rnn")
    parser.add_argument("--mlp_hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the mlp")
    parser.add_argument("--lr", type=float, default=5e-4, help="Learning rate")
    parser.add_argument("--gamma", type=float, default=0.99, help="Discount factor")
    parser.add_argument("--lamda", type=float, default=0.95, help="GAE parameter")
    parser.add_argument("--epsilon", type=float, default=0.2, help="GAE parameter")
    parser.add_argument("--K_epochs", type=int, default=15, help="GAE parameter")
    parser.add_argument("--use_adv_norm", type=bool, default=True, help="Trick 1:advantage normalization")
    parser.add_argument("--use_reward_norm", type=bool, default=True, help="Trick 3:reward normalization")
    parser.add_argument("--use_reward_scaling", type=bool, default=False, help="Trick 4:reward scaling. Here, we do not use it.")
    parser.add_argument("--entropy_coef", type=float, default=0.01, help="Trick 5: policy entropy")
    parser.add_argument("--use_lr_decay", type=bool, default=True, help="Trick 6:learning rate Decay")
    parser.add_argument("--use_grad_clip", type=bool, default=True, help="Trick 7: Gradient clip")
    parser.add_argument("--use_orthogonal_init", type=bool, default=True, help="Trick 8: orthogonal initialization")
    parser.add_argument("--set_adam_eps", type=float, default=True, help="Trick 9: set Adam epsilon=1e-5")
    parser.add_argument("--use_relu", type=float, default=False, help="Whether to use relu, if False, we will use tanh")
    parser.add_argument("--use_rnn", type=bool, default=False, help="Whether to use RNN")
    parser.add_argument("--add_agent_id", type=float, default=False, help="Whether to add agent_id. Here, we do not use it.")
    parser.add_argument("--use_value_clip", type=float, default=False, help="Whether to use value clip.")

    args = parser.parse_args()
    runner = Runner_MAPPO_MPE(args, env_name="simple_spread", number=1, seed=0)
    runner.run()

移植事项:

1.注意环境参数的设置格式

2.注意环境的返回值利用

3.注意主运行流程的runner.run()的相关设置,等

可借鉴:【MADRL】基于MADRL的单调价值函数分解(QMIX)算法​​​​​​ 中关于 QMIX算法移植的注意事项和代码注释。


     文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者,或者关注VX公众号:Rain21321,联系作者。

标签:MAPPO,evaluate,体近,args,add,MADRL,reward,self
From: https://blog.csdn.net/qq_51399582/article/details/142068153

相关文章

  • MATLAB代码:p2p 微电网 MADRL 多智体强化学习 关键词:p2p 微电网 MA
    MATLAB代码:p2p微电网MADRL多智体强化学习关键词:p2p微电网MADRL多智体强化学习IEEETSG参考文档:《Peer-to-PeerEnergyTradingandEnergyConversioninInterconnectedMulti-EnergyMicrogridsUsingMulti-AgentDeepReinforcementLearning》2021SCI一区IEEETrans......
  • MAPPO学习笔记(2) —— 从MAPPO论文入手
    在有了上一节一些有关PPO算法的概念作为基础后,我们就可以正式开始对于MAPPO这一算法的学习。那么,既然要学习一个算法,就不得不去阅读提出这一算法的论文。那么本篇博客将从......