This article is a set of study excerpts and notes recorded while the author was studying reinforcement learning (RL), intended for personal study, research, and reference, and based on the author's own understanding of the field. If anything is inappropriate or infringing, it will be corrected immediately once pointed out. The article belongs to the Reinforcement Learning column: Reinforcement Learning (8) --- "[MADRL] Multi-Agent Proximal Policy Optimization (MAPPO)".
MAPPO (Multi-Agent Proximal Policy Optimization) is an extension of PPO (Proximal Policy Optimization) to multi-agent environments. By bringing PPO's policy-optimization mechanism into a multi-agent system, it enables more efficient policy learning in both cooperative and competitive settings. MAPPO is a policy-gradient-based multi-agent reinforcement learning algorithm and is particularly well suited to mixed cooperative-competitive multi-agent scenarios.
Paper: The Surprising Effectiveness of PPO in Cooperative, Multi-Agent Games
PPO is one of the most popular reinforcement learning algorithms of recent years. By introducing a clipped policy update, it controls the instability caused by overly large policy-gradient update steps, while remaining much simpler than trust-region methods such as TRPO. In a multi-agent environment, several agents learn their policies simultaneously and each agent's behavior affects the others' decisions, so a robust and stable policy-optimization method is needed. MAPPO trains the agents jointly with a centralized Critic and decentralized Actors, and leverages PPO's strengths to improve learning efficiency and stability in multi-agent settings.
MAPPO inherits the core ideas of PPO and combines them with the characteristics of multi-agent systems, adopting a centralized-training, decentralized-execution (CTDE) architecture: during training, the Critic has access to global information, while at execution time each agent's Actor acts only on its own local observation.
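To make the centralized-Critic/decentralized-Actor split concrete, the two network roles can be sketched as follows (a minimal, illustrative sketch with hypothetical class names; in the reference code below, the actual networks are built inside the MAPPO_MPE class, which is not shown here):

import torch
import torch.nn as nn

class DecentralizedActor(nn.Module):
    """Each agent's policy sees only its own local observation (used at execution time)."""
    def __init__(self, obs_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(obs_dim, hidden_dim), nn.Tanh(),
                                 nn.Linear(hidden_dim, action_dim))

    def forward(self, obs):  # obs: (batch, obs_dim)
        return torch.distributions.Categorical(logits=self.net(obs))

class CentralizedCritic(nn.Module):
    """The value function sees the global state (used only during training)."""
    def __init__(self, state_dim, hidden_dim=64):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(state_dim, hidden_dim), nn.Tanh(),
                                 nn.Linear(hidden_dim, 1))

    def forward(self, state):  # state: (batch, state_dim), e.g. concatenated local observations
        return self.net(state).squeeze(-1)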

The MAPPO algorithm consists of two main parts: the policy update and the value-function estimation. The formulas for both parts are introduced below.
Policy update. As in PPO, each policy is updated to maximize the expected return $\mathbb{E}_{\pi}[R]$. However, directly applying policy-gradient updates can easily cause large policy changes, so PPO introduces a clipped objective that limits how much the policy can change in a single update. MAPPO follows the same principle, but applies it to the policy of each agent $i$. The PPO objective is:

$$ L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min \left( r_t(\theta) \hat{A}_t,\ \text{clip}\left(r_t(\theta), 1 - \epsilon, 1 + \epsilon\right) \hat{A}_t \right) \right] $$
where:

- $r_t(\theta) = \dfrac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\text{old}}}(a_t \mid s_t)}$ is the ratio between the current policy and the old policy;
- $\hat{A}_t$ is the estimated advantage, which measures how good action $a_t$ is in state $s_t$;
- $\epsilon$ is the clipping threshold that controls the size of the policy update.

Each agent $i$ in MAPPO uses a similar objective for its policy update, but each agent's policy depends only on its own observation $o_t^i$, i.e.:
$$ L^{CLIP}_i(\theta_i) = \mathbb{E}_t \left[ \min \left( r_t(\theta_i) \hat{A}_t^i,\ \text{clip}\left(r_t(\theta_i), 1 - \epsilon, 1 + \epsilon\right) \hat{A}_t^i \right) \right] $$
where $r_t(\theta_i)$ is agent $i$'s policy-update ratio and $\hat{A}_t^i$ is agent $i$'s advantage estimate.

Value-function estimation. Each agent $i$'s value function $V_i(s_t)$ is estimated by a centralized Critic network. The Critic network uses the global state $s_t$, which aggregates information about all agents, to estimate the value function. By using a centralized Critic, MAPPO ensures that each agent can take the other agents' policies into account during training, and thus learn more effective policies. The Critic is trained to minimize a mean-squared-error (MSE) loss:
$$ L(\phi_i) = \mathbb{E}_{s_t, r_t, s_{t+1}} \left[ \left( V_i(s_t; \phi_i) - R_t \right)^2 \right] $$
where $R_t$ is the cumulative return from the current time step $t$ onward, usually estimated with a temporal-difference (TD) target:
$$ R_t = r_t + \gamma V_i(s_{t+1}; \phi'_i) $$
where $\gamma$ is the discount factor and $\phi'_i$ are the parameters of the target network.
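A minimal PyTorch sketch of this Critic update (the names critic and target_critic are illustrative placeholders for the centralized value network and its target network):

import torch
import torch.nn.functional as F

def critic_loss(critic, target_critic, s, r, s_next, gamma=0.99):
    """MSE loss between V_i(s_t) and the TD target r_t + gamma * V_i(s_{t+1}; phi'_i)."""
    with torch.no_grad():
        td_target = r + gamma * target_critic(s_next)   # R_t = r_t + gamma * V_i(s_{t+1}; phi'_i)
    return F.mse_loss(critic(s), td_target)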

Advantage estimation. The advantage function $\hat{A}_t^i$ measures how much better an action $a_t$ is than the average action under the current policy. It can be estimated by the following formula:
$$ \hat{A}_t^i = \delta_t + (\gamma \lambda)\, \delta_{t+1} + \cdots + (\gamma \lambda)^{T-t-1}\, \delta_{T-1} $$
where $\delta_t$ is the temporal-difference (TD) error, defined as:
$$ \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t) $$
Here $\lambda$ is the weighting parameter of GAE (Generalized Advantage Estimation), used to trade off bias against variance.

The MAPPO training procedure can be summarized as follows:

1. Initialize each agent's Actor network $\pi_{\theta_i}$ and Critic network $V_{\phi_i}$, and initialize the corresponding target networks.
2. Interact with the environment to collect trajectories, then compute the returns $R_t$ and the advantage estimates $\hat{A}_t^i$.
3. Update each agent's policy by maximizing the clipped objective:

$$ L^{CLIP}_i(\theta_i) = \mathbb{E}_t \left[ \min \left( r_t(\theta_i) \hat{A}_t^i,\ \text{clip}\left(r_t(\theta_i), 1 - \epsilon, 1 + \epsilon\right) \hat{A}_t^i \right) \right] $$

4. Update the centralized Critic by minimizing the MSE loss:

$$ L(\phi_i) = \mathbb{E}_{s_t, r_t, s_{t+1}} \left[ \left( V_i(s_t; \phi_i) - R_t \right)^2 \right] $$

where the advantages used in step 3 are computed with GAE:

$$ \hat{A}_t^i = \sum_{l=0}^{T-t-1} (\gamma \lambda)^l \delta_{t+l} $$

5. Repeat steps 2-4 until training converges.
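Putting these pieces together, a single agent's update step could be sketched roughly as follows (a minimal illustration only; optimizer setup, minibatching, and the multiple K_epochs passes used in the full code below are omitted, and all names here are illustrative rather than taken from the reference implementation):

import torch
import torch.nn.functional as F

def mappo_update_step(actor, critic, optimizer, batch, epsilon=0.2, entropy_coef=0.01):
    """One gradient step combining the clipped policy loss, the Critic MSE loss, and an entropy bonus.

    batch is assumed to contain tensors: obs (local observations), state (global state),
    actions, old_log_probs, advantages, and returns (targets for the Critic).
    """
    dist = actor(batch["obs"])                                    # action distribution pi_theta_i(. | o_t^i)
    log_probs = dist.log_prob(batch["actions"])
    ratio = torch.exp(log_probs - batch["old_log_probs"])         # r_t(theta_i)

    surr1 = ratio * batch["advantages"]
    surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * batch["advantages"]
    actor_loss = -torch.min(surr1, surr2).mean() - entropy_coef * dist.entropy().mean()

    critic_loss = F.mse_loss(critic(batch["state"]), batch["returns"])  # centralized Critic on the global state

    optimizer.zero_grad()
    (actor_loss + critic_loss).backward()
    optimizer.step()
    return actor_loss.item(), critic_loss.item()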
MAPPO is the multi-agent extension of PPO. It adopts a centralized-Critic, decentralized-Actor structure and provides stable, efficient policy optimization in multi-agent environments. Thanks to PPO's clipped update mechanism, MAPPO maintains good convergence and robustness during policy updates, and it is one of the most widely used algorithms in current research and applications.
If you have trouble reproducing the code below or run into problems, feel free to leave a comment. If you need the code as a complete project, please leave your email address in the comments so it can be shared with you promptly (private messages are hard to reply to in time).
Main file: MAPPO_MPE_main
import torch
import numpy as np
from torch.utils.tensorboard import SummaryWriter
import argparse
from normalization import Normalization, RewardScaling
from replay_buffer import ReplayBuffer
from mappo_mpe import MAPPO_MPE
from environment import Env
class Runner_MAPPO_MPE:
    def __init__(self, args, env_name, number, seed):
        self.args = args
        self.env_name = env_name
        self.number = number
        self.seed = seed
        # Set random seed
        np.random.seed(self.seed)
        torch.manual_seed(self.seed)
        # Create env
        self.env = Env(env_name, discrete=True)  # Discrete action space
        self.args.N = self.env.n  # The number of agents
        self.args.obs_dim_n = [self.env.observation_space[i].shape[0] for i in range(self.args.N)]  # obs dimensions of N agents
        self.args.action_dim_n = [self.env.action_space[i].n for i in range(self.args.N)]  # action dimensions of N agents
        # Only for homogeneous-agent environments like Spread in MPE, where all agents have the same observation and action space dimensions
        self.args.obs_dim = self.args.obs_dim_n[0]  # The dimension of an agent's observation space
        self.args.action_dim = self.args.action_dim_n[0]  # The dimension of an agent's action space
        self.args.state_dim = np.sum(self.args.obs_dim_n)  # The dimension of the global state space (sum of the local observation dimensions of all agents)
        print("observation_space=", self.env.observation_space)
        print("obs_dim_n={}".format(self.args.obs_dim_n))
        print("action_space=", self.env.action_space)
        print("action_dim_n={}".format(self.args.action_dim_n))
        # Create N agents
        self.agent_n = MAPPO_MPE(self.args)
        self.replay_buffer = ReplayBuffer(self.args)
        # Create a tensorboard writer
        self.writer = SummaryWriter(log_dir='runs/MAPPO/MAPPO_env_{}_number_{}_seed_{}'.format(self.env_name, self.number, self.seed))
        self.evaluate_rewards = []  # Record the rewards during evaluation
        self.total_steps = 0
        if self.args.use_reward_norm:
            print("------use reward norm------")
            self.reward_norm = Normalization(shape=self.args.N)
        elif self.args.use_reward_scaling:
            print("------use reward scaling------")
            self.reward_scaling = RewardScaling(shape=self.args.N, gamma=self.args.gamma)
    def run(self, ):
        evaluate_num = -1  # Record the number of evaluations
        while self.total_steps < self.args.max_train_steps:
            if self.total_steps // self.args.evaluate_freq > evaluate_num:
                self.evaluate_policy()  # Evaluate the policy every 'evaluate_freq' steps
                evaluate_num += 1
            _, episode_steps = self.run_episode_mpe(evaluate=False)  # Run an episode
            self.total_steps += episode_steps
            if self.replay_buffer.episode_num == self.args.batch_size:
                self.agent_n.train(self.replay_buffer, self.total_steps)  # Training
                self.replay_buffer.reset_buffer()
        self.evaluate_policy()
        self.env.close()
    def evaluate_policy(self, ):
        evaluate_reward = 0
        for _ in range(self.args.evaluate_times):
            episode_reward, _ = self.run_episode_mpe(evaluate=True)
            evaluate_reward += episode_reward
        evaluate_reward = evaluate_reward / self.args.evaluate_times
        self.evaluate_rewards.append(evaluate_reward)
        print("total_steps:{} \t evaluate_reward:{}".format(self.total_steps, evaluate_reward))
        self.writer.add_scalar('evaluate_step_rewards_{}'.format(self.env_name), evaluate_reward, global_step=self.total_steps)
        # Save the rewards and models
        np.save('./data_train/MAPPO_env_{}_number_{}_seed_{}.npy'.format(self.env_name, self.number, self.seed), np.array(self.evaluate_rewards))
        self.agent_n.save_model(self.env_name, self.number, self.seed, self.total_steps)
    def run_episode_mpe(self, evaluate=False):
        episode_reward = 0
        obs_n = self.env.reset()
        if self.args.use_reward_scaling:
            self.reward_scaling.reset()
        if self.args.use_rnn:  # If using an RNN, reset the rnn hidden states of the actor and critic at the beginning of each episode
            self.agent_n.actor.rnn_hidden = None
            self.agent_n.critic.rnn_hidden = None
        for episode_step in range(self.args.episode_limit):
            a_n, a_logprob_n = self.agent_n.choose_action(obs_n, evaluate=evaluate)  # Get the actions and corresponding log probabilities of the N agents
            s = np.array(obs_n).flatten()  # In MPE, the global state is the concatenation of all agents' local observations
            v_n = self.agent_n.get_value(s)  # Get the state values V(s) of the N agents
            obs_next_n, r_n, done_n, _ = self.env.step(a_n)
            episode_reward += r_n[0]
            if not evaluate:
                if self.args.use_reward_norm:
                    r_n = self.reward_norm(r_n)
                elif self.args.use_reward_scaling:
                    r_n = self.reward_scaling(r_n)
                # Store the transition
                self.replay_buffer.store_transition(episode_step, obs_n, s, v_n, a_n, a_logprob_n, r_n, done_n)
            obs_n = obs_next_n
            if all(done_n):
                break
        if not evaluate:
            # The episode is over; store v_n for the last step
            s = np.array(obs_n).flatten()
            v_n = self.agent_n.get_value(s)
            self.replay_buffer.store_last_value(episode_step + 1, v_n)
        return episode_reward, episode_step + 1
if __name__ == '__main__':
    parser = argparse.ArgumentParser("Hyperparameters Setting for MAPPO in MPE environment")
    parser.add_argument("--max_train_steps", type=int, default=int(3e6), help="Maximum number of training steps")
    parser.add_argument("--episode_limit", type=int, default=25, help="Maximum number of steps per episode")
    parser.add_argument("--evaluate_freq", type=float, default=5000, help="Evaluate the policy every 'evaluate_freq' steps")
    parser.add_argument("--evaluate_times", type=int, default=3, help="Number of evaluation episodes")
    parser.add_argument("--batch_size", type=int, default=32, help="Batch size (the number of episodes)")
    parser.add_argument("--mini_batch_size", type=int, default=8, help="Minibatch size (the number of episodes)")
    parser.add_argument("--rnn_hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the rnn")
    parser.add_argument("--mlp_hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the mlp")
    parser.add_argument("--lr", type=float, default=5e-4, help="Learning rate")
    parser.add_argument("--gamma", type=float, default=0.99, help="Discount factor")
    parser.add_argument("--lamda", type=float, default=0.95, help="GAE parameter lambda")
    parser.add_argument("--epsilon", type=float, default=0.2, help="PPO clip parameter")
    parser.add_argument("--K_epochs", type=int, default=15, help="Number of epochs per PPO update")
    parser.add_argument("--use_adv_norm", type=bool, default=True, help="Trick 1: advantage normalization")
    parser.add_argument("--use_reward_norm", type=bool, default=True, help="Trick 3: reward normalization")
    parser.add_argument("--use_reward_scaling", type=bool, default=False, help="Trick 4: reward scaling. Here, we do not use it.")
    parser.add_argument("--entropy_coef", type=float, default=0.01, help="Trick 5: policy entropy")
    parser.add_argument("--use_lr_decay", type=bool, default=True, help="Trick 6: learning rate decay")
    parser.add_argument("--use_grad_clip", type=bool, default=True, help="Trick 7: gradient clip")
    parser.add_argument("--use_orthogonal_init", type=bool, default=True, help="Trick 8: orthogonal initialization")
    parser.add_argument("--set_adam_eps", type=bool, default=True, help="Trick 9: set Adam epsilon=1e-5")
    parser.add_argument("--use_relu", type=bool, default=False, help="Whether to use relu; if False, we use tanh")
    parser.add_argument("--use_rnn", type=bool, default=False, help="Whether to use RNN")
    parser.add_argument("--add_agent_id", type=bool, default=False, help="Whether to add agent_id. Here, we do not use it.")
    parser.add_argument("--use_value_clip", type=bool, default=False, help="Whether to use value clip.")
    args = parser.parse_args()
    runner = Runner_MAPPO_MPE(args, env_name="simple_spread", number=1, seed=0)
    runner.run()

Porting notes:
1. Pay attention to the format in which the environment parameters are set.
2. Pay attention to how the environment's return values are used.
3. Pay attention to the settings around the main run flow in runner.run(), etc.
You can also refer to the porting notes and code comments for the QMIX algorithm in "[MADRL] Monotonic Value Function Factorization (QMIX)".
If there is anything improper or incorrect in this article, your understanding and corrections are appreciated. Since some of the text and images come from the Internet and their original sources cannot be verified, please contact the author for removal if any dispute arises. If there are errors, questions, or infringement, feel free to leave a comment to contact the author.