Reference: 白话PPO训练 (plain-language PPO training)
Screenshot of a successful training run
Algorithm components
The four main parts
Compared with A2C, the PPO algorithm introduces an additional old_actor_model.
During PPO training, the old_actor_model first interacts with the environment to collect experience; a batch of that experience is then used to optimize the actor_model; finally, the actor_model's parameters are copied back into the old_actor_model.
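A rough sketch of this relationship, using the names from the complete code below (PolicyNetwork is the small policy network defined there):

import copy

actor_model = PolicyNetwork(16, 4)            # trainable "new" policy
old_actor_model = copy.deepcopy(actor_model)  # frozen copy that interacts with the environment

# 1. sample actions from old_actor_model to collect a batch of transitions
# 2. run several gradient updates on actor_model with that batch
# 3. synchronize the frozen copy with the freshly updated weights
old_actor_model.load_state_dict(actor_model.state_dict())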
Hyperparameters
Compared with A2C, PPO_clip adds two hyperparameters: the number of updates per batch of data and the clipping threshold.
- times_per_update: how many gradient updates to run on each collected batch of data.
- clip_param (ε): the clipping threshold in the PPO clipped objective, usually 0.1 or 0.2 (see the objective written out below).
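For reference, clip_param is the ε of the standard PPO clipped surrogate objective, written here in the usual notation with \hat{A}_t the advantage estimate:

L^{CLIP}(\theta) = \mathbb{E}_t\left[ \min\left( r_t(\theta)\,\hat{A}_t,\; \mathrm{clip}\big(r_t(\theta),\, 1-\epsilon,\, 1+\epsilon\big)\,\hat{A}_t \right) \right],
\qquad r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\mathrm{old}}}(a_t \mid s_t)}

Clipping the ratio r_t(θ) to [1-ε, 1+ε] keeps each update close to the policy that collected the data, which is what makes several updates on the same batch safe.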
Training procedure
The overall training framework is the same as in A2C; the difference is that experience is collected with the old_policy, the new_policy is then optimized, and finally the updated parameters are copied back into the old_policy.
To use the experience data efficiently, PPO performs multiple updates on each batch of experience (a minimal sketch of this inner loop follows).
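A self-contained sketch of that inner loop; the dummy tensors below are hypothetical stand-ins for the batch statistics that the full code computes from collected experience and the critic:

import torch

# stand-in batch data (for illustration only)
actions      = torch.randint(0, 4, (64,))
logits       = torch.randn(64, 4, requires_grad=True)    # would come from actor_model(state)
advantages   = torch.randn(64)                           # would come from GAE
old_log_prob = torch.distributions.Categorical(logits=logits).log_prob(actions).detach()
clip_param, times_per_update = 0.2, 5
optimizer = torch.optim.Adam([logits], lr=0.01)

for _ in range(times_per_update):                        # several updates on the same batch
    new_log_prob = torch.distributions.Categorical(logits=logits).log_prob(actions)
    ratio = torch.exp(new_log_prob - old_log_prob)       # pi_new / pi_old
    surrogate = torch.min(ratio * advantages,
                          torch.clamp(ratio, 1 - clip_param, 1 + clip_param) * advantages)
    loss = -surrogate.mean()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()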
Objective functions
1. The critic's objective is the same as in A2C.
2. The actor's objective is the PPO-clip clipped surrogate; both are written out below.
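Spelled out to match the code below, with c_ent = entropy_coeff, c_v = value_loss_coeff, \hat{A}_t the GAE advantage, and \hat{R}_t = \hat{A}_t + V(s_t) the return-to-go target:

L_{\mathrm{actor}}(\theta) = -\,\mathbb{E}_t\left[ \min\left( r_t(\theta)\hat{A}_t,\; \mathrm{clip}\big(r_t(\theta),\,1-\epsilon,\,1+\epsilon\big)\hat{A}_t \right) + c_{\mathrm{ent}}\, H\big(\pi_\theta(\cdot \mid s_t)\big) \right]

L_{\mathrm{critic}}(\phi) = c_v\, \mathbb{E}_t\left[ \big( V_\phi(s_t) - \hat{R}_t \big)^2 \right]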
Complete code
import torch
import torch.nn as nn
from torch.nn import functional as F
import gymnasium as gym
import tqdm
from torch.distributions import Categorical
from typing import Tuple
import copy
class PolicyNetwork(nn.Module):
    def __init__(self, n_observations: int, n_actions: int):
        super(PolicyNetwork, self).__init__()
        self.layer1 = nn.Linear(n_observations, 32)
        self.layer2 = nn.Linear(32, 16)
        self.layer3 = nn.Linear(16, n_actions)

    def forward(self, x: torch.Tensor) -> Categorical:
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        action_logits = self.layer3(x)
        return Categorical(logits=action_logits)
class PPO_clip:
    def __init__(self, env, total_episodes):
        ############# Hyperparameters #############
        self.actor_lr = 0.01
        self.critic_lr = 0.01
        self.batch_size = 64
        self.times_per_update = 5   # number of gradient updates per collected batch
        self.clip_param = 0.2       # ratio clipping threshold, usually 0.2 or 0.1
        self.entropy_coeff = 0.01
        self.value_loss_coeff = 0.5
        self.gae_lambda = 0.95
        self.discount_rate = 0.9
        self.total_episodes = total_episodes
        ############# Core components of PPO_clip #############
        self.replay_buffer = []
        self.actor_model = PolicyNetwork(16, 4)
        self.old_actor_model = copy.deepcopy(self.actor_model)
        self.critic_model = nn.Sequential(  # does not need to be as complex as the actor model
            nn.Linear(16, 16), nn.ReLU(),
            nn.Linear(16, 1)
        )
        ############ Optimization components #############
        self.actor_optimizer = torch.optim.Adam(self.actor_model.parameters(), lr=self.actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic_model.parameters(), lr=self.critic_lr)
        self.env = env
        self.count = 0
        self.success = 0
    def train(self):
        bar = tqdm.tqdm(range(self.total_episodes), desc=f"episode {0} {self.success / (self.count+1e-8)}")
        for i in bar:
            state, info = self.env.reset()
            done = False
            truncated = False
            # collect experience with old_policy (kept fixed)
            while not (done or truncated):
                action = self.choose_action(state)
                new_state, r, done, truncated, info = self.env.step(action)
                self.append_data(state, action, r, new_state, done)
                state = new_state
                if done or truncated:
                    self.count += 1
                    if new_state == 15:  # state 15 is the goal of the 4x4 FrozenLake map
                        self.success += 1
                # optimize the model: new_policy (updated)
                if len(self.replay_buffer) == self.batch_size:
                    self.optimize_model()
                    self.replay_buffer.clear()
                    # copy new_policy back into old_policy
                    self.old_actor_model.load_state_dict(self.actor_model.state_dict())
            if i % 100 == 0:
                self.count = 0
                self.success = 0
            bar.set_description(f"episode {i} {self.success / (self.count+1e-8)}")

    def choose_action(self, state):
        with torch.no_grad():
            policy_dist = self.old_actor_model(self.state_to_input(state))
            action_tensor = policy_dist.sample()
            action = action_tensor.item()
        return action
    def optimize_model(self):
        state = torch.stack([self.state_to_input(tup[0]) for tup in self.replay_buffer[-self.batch_size:]])
        action = torch.IntTensor([tup[1] for tup in self.replay_buffer[-self.batch_size:]])
        reward = torch.FloatTensor([tup[2] for tup in self.replay_buffer[-self.batch_size:]])
        new_state = torch.stack([self.state_to_input(tup[3]) for tup in self.replay_buffer[-self.batch_size:]])
        done = torch.FloatTensor([tup[4] for tup in self.replay_buffer[-self.batch_size:]])
        # state and new_state are 2-D (batch, features); the rest are 1-D (batch,)
        with torch.no_grad():
            value = self.critic_model(state).squeeze()
            # bootstrap value for the final transition's next state
            last_value = self.critic_model(new_state[-1:]).squeeze(-1)
            next_value = torch.cat((value[1:], last_value))
        # compared with a single-step TD error, GAE works noticeably better
        advantages, returns_to_go = self.compute_gae_and_returns(
            reward, value, next_value, done,
            self.discount_rate, self.gae_lambda
        )
        # several updates on the same batch of data
        for _ in range(self.times_per_update):
            # update the actor
            policy_dist = self.actor_model(state)
            old_policy_dist = self.old_actor_model(state)
            new_log_prob = policy_dist.log_prob(action)
            old_log_prob = old_policy_dist.log_prob(action).detach()  # no gradient through the old policy
            r = torch.exp(new_log_prob - old_log_prob)  # exp(ln(a) - ln(b)) = a / b, i.e. the probability ratio
            new_div_old_rate = r
            actor_fn = -(torch.min(new_div_old_rate * advantages,
                                   torch.clamp(new_div_old_rate, 1 - self.clip_param, 1 + self.clip_param) * advantages)
                         + self.entropy_coeff * policy_dist.entropy())
            self.actor_optimizer.zero_grad()
            actor_fn.mean().backward(retain_graph=True)  # .mean(): backward() needs a scalar
            self.actor_optimizer.step()
            # update the critic
            v = self.critic_model(state).squeeze()
            critic_fn = F.mse_loss(v, returns_to_go)
            self.critic_optimizer.zero_grad()
            (self.value_loss_coeff * critic_fn).backward()
            self.critic_optimizer.step()
    def compute_gae_and_returns(self,
                                rewards: torch.Tensor,
                                values: torch.Tensor,
                                next_values: torch.Tensor,
                                dones: torch.Tensor,
                                discount_rate: float,
                                lambda_gae: float,
                                ) -> Tuple[torch.Tensor, torch.Tensor]:
        advantages = torch.zeros_like(rewards)
        last_advantage = 0.0
        n_steps = len(rewards)
        # compute GAE backwards in time:
        # delta_t = r_t + gamma * V(s_{t+1}) - V(s_t),  A_t = delta_t + gamma * lambda * A_{t+1}
        for t in reversed(range(n_steps)):
            mask = 1.0 - dones[t]
            delta = rewards[t] + discount_rate * next_values[t] * mask - values[t]
            advantages[t] = delta + discount_rate * lambda_gae * last_advantage * mask
            last_advantage = advantages[t]
        # handed to the critic as its TD target
        returns_to_go = advantages + values
        return advantages, returns_to_go

    def append_data(self, state, action, r, new_state, done):
        self.replay_buffer.append((state, action, r, new_state, done))

    def state_to_input(self, state):
        # one-hot encode the discrete FrozenLake state (16 possible states)
        input_dim = 16
        input = torch.zeros(input_dim, dtype=torch.float)
        input[int(state)] = 1
        return input
env = gym.make("FrozenLake-v1", is_slippery=False)
policy = PPO_clip(env, 2000)
policy.train()

env = gym.make("FrozenLake-v1", is_slippery=False, render_mode="human")
state, info = env.reset()
done = False
truncated = False
while True:
    with torch.no_grad():
        action = policy.choose_action(state)
    new_state, reward, done, truncated, info = env.step(action)
    state = new_state
    if done or truncated:
        state, info = env.reset()