Hand-Written PPO_clip (FrozenLake Environment)

Published: 2025-07-27

Reference: 白话PPO训练

Training success screenshot

Algorithm Components

Four Main Parts

The four core components (see the __init__ of PPO_clip in the full code below) are the replay_buffer, the actor_model, the old_actor_model, and the critic_model. Compared with A2C, the extra component that PPO introduces is the old_actor_model.

During PPO training, the old_actor_model first interacts with the environment to collect experience; a batch of that experience is then used to optimize the actor_model; finally, the actor_model's parameters are copied back into the old_actor_model.
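A minimal sketch of how these components are wired together, mirroring PPO_clip.__init__ in the full code below (nn.Linear stands in for the small PolicyNetwork MLP defined there):

import copy
import torch.nn as nn

# the four core components, mirroring PPO_clip.__init__ further down
replay_buffer = []                             # stores (state, action, reward, next_state, done) tuples
actor_model = nn.Linear(16, 4)                 # stand-in for PolicyNetwork(16, 4)
old_actor_model = copy.deepcopy(actor_model)   # frozen behavior policy used to collect experience
critic_model = nn.Sequential(nn.Linear(16, 16), nn.ReLU(), nn.Linear(16, 1))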

Hyperparameters

Compared with A2C, PPO_clip adds two hyperparameters: the number of update passes per batch and the clipping threshold (see the sketch after this list).

  • times_per_update: how many gradient-update passes to run on each collected batch of data.
  • clip_param (ε): the threshold in the PPO clipped objective, typically 0.1 or 0.2.
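A minimal runnable sketch of where these two hyperparameters enter the update; ratio and advantages are dummy tensors here, whereas in the full code they come from the policy networks and GAE:

import torch

times_per_update = 5   # gradient-update passes per batch
clip_param = 0.2       # clipping threshold epsilon

# dummy stand-ins for illustration only
ratio = torch.tensor([0.8, 1.0, 1.3])        # pi_new(a|s) / pi_old(a|s)
advantages = torch.tensor([1.0, -0.5, 2.0])  # GAE advantages

for _ in range(times_per_update):
    clipped_ratio = torch.clamp(ratio, 1 - clip_param, 1 + clip_param)
    actor_loss = -torch.min(ratio * advantages, clipped_ratio * advantages).mean()
    # ... zero_grad(), actor_loss.backward(), and the optimizer step happen here in the real code ...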

Training Procedure

The overall training framework is the same as in A2C; the difference is that experience is collected with old_policy, new_policy is then optimized on it, and finally new_policy is copied back into old_policy.

To use experience data efficiently, PPO runs several update passes on each collected batch, as sketched below.
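Schematically, one training round looks like the following; collect_batch and ppo_update are hypothetical stubs standing in for the experience-collection and optimization code shown in full further down:

import copy
import torch
import torch.nn as nn

# toy stand-ins so the skeleton runs on its own
actor_model = nn.Linear(16, 4)
old_actor_model = copy.deepcopy(actor_model)
critic_model = nn.Linear(16, 1)
times_per_update = 5

def collect_batch(policy, batch_size=64):
    # hypothetical placeholder: in the real code, old_actor_model acts in FrozenLake
    return [torch.randn(16) for _ in range(batch_size)]

def ppo_update(actor, critic, batch):
    # hypothetical placeholder for the clipped-surrogate and critic updates shown below
    pass

batch = collect_batch(old_actor_model)                      # 1. collect with the fixed old policy
for _ in range(times_per_update):                           # 2. several passes over the same batch
    ppo_update(actor_model, critic_model, batch)
old_actor_model.load_state_dict(actor_model.state_dict())   # 3. copy the new policy back into the old one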

Objective Functions

 1. The critic's objective is the same as in A2C.

 2. The actor's objective is the PPO-clip surrogate (shown below).
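For reference, the standard clipped surrogate that the actor maximizes, with r_t(θ) the new-to-old probability ratio and Â_t the GAE advantage:

$$
L^{\text{CLIP}}(\theta) = \mathbb{E}_t\!\left[\min\!\Big(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}\big(r_t(\theta),\,1-\epsilon,\,1+\epsilon\big)\,\hat{A}_t\Big)\right],
\qquad
r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\text{old}}}(a_t \mid s_t)}
$$

In the code below, actor_fn is the negative of this quantity minus an entropy bonus weighted by entropy_coeff, and the critic loss is the MSE between critic_model(state) and the GAE-based returns_to_go.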

     

Full Code

import torch
import torch.nn as nn
from torch.nn import functional as F
import gymnasium as gym
import tqdm
from torch.distributions import Categorical
from typing import  Tuple
import copy

class PolicyNetwork(nn.Module):
    def __init__(self, n_observations: int, n_actions: int):
        super(PolicyNetwork, self).__init__()
        self.layer1 = nn.Linear(n_observations, 32)   
        self.layer2 = nn.Linear(32, 16)               
        self.layer3 = nn.Linear(16, n_actions)        
                                                      

    def forward(self, x: torch.Tensor) -> Categorical: 
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        action_logits = self.layer3(x)
        return Categorical(logits=action_logits)
    

class PPO_clip:
    def __init__(self, env, total_episodes):
        ############# hyperparameters #############
        self.actor_lr = 0.01
        self.critic_lr = 0.01
        self.batch_size = 64

        self.times_per_update = 5 # number of gradient-update passes per batch
        self.clip_param = 0.2     # ratio clipping threshold, usually 0.2 or 0.1
        self.entropy_coeff = 0.01
        self.value_loss_coeff = 0.5
        
        
        self.gae_lambda = 0.95 
        self.discount_rate = 0.9 

        self.total_episodes = total_episodes
        
        ############# core components of PPO_clip #############
        self.replay_buffer = []
        self.actor_model = PolicyNetwork(16, 4)
        self.old_actor_model = copy.deepcopy(self.actor_model)
        self.critic_model = nn.Sequential( # does not need to be as complex as the actor model
            nn.Linear(16, 16), nn.ReLU(),
            nn.Linear(16, 1)
        )

        ############# optimizers #############
        self.actor_optimizer = torch.optim.Adam(self.actor_model.parameters(), lr=self.actor_lr) 
        self.critic_optimizer = torch.optim.Adam(self.critic_model.parameters(), lr=self.critic_lr)

        self.env = env

        self.count = 0
        self.success = 0

    def train(self):
        bar = tqdm.tqdm(range(self.total_episodes), desc=f"episode {0} {self.success / (self.count+1e-8)}") 
        for i in bar:
            state, info = self.env.reset()
            done = False
            truncated = False
            # collect experience with old_policy (fixed)
            while not (done or truncated):
                action = self.choose_action(state)
                new_state, r, done, truncated, info = self.env.step(action) 
                
                self.append_data(state, action, r, new_state, done)
                state = new_state
                
                if done or truncated:
                    self.count+=1
                    if new_state == 15: 
                        self.success+=1
                
                # optimize the model: new_policy (updated)
                if len(self.replay_buffer) == self.batch_size:
                    self.optimize_model()
                    self.replay_buffer.clear()
                    # copy new_policy back into old_policy
                    self.old_actor_model.load_state_dict(self.actor_model.state_dict()) 
                    
            
            
            if i % 100 == 0:
                self.count = 0
                self.success = 0
            bar.set_description(f"episode {i} {self.success / (self.count+1e-8)}")

    def choose_action(self, state):
        with torch.no_grad():
            policy_dist = self.old_actor_model(self.state_to_input(state))
            action_tensor = policy_dist.sample()
            action = action_tensor.item()
        return action
    

    def optimize_model(self):
        state = torch.stack([self.state_to_input(tup[0]) for tup in self.replay_buffer[-self.batch_size:]])
        action = torch.IntTensor([tup[1] for tup in self.replay_buffer[-self.batch_size:]])
        reward = torch.FloatTensor([tup[2] for tup in self.replay_buffer[-self.batch_size:]])
        new_state = torch.stack([self.state_to_input(tup[3]) for tup in self.replay_buffer[-self.batch_size:]])
        done = torch.FloatTensor([tup[4] for tup in self.replay_buffer[-self.batch_size:]])
        # state and new_state are 2-D (batch, one-hot); the others are 1-D over the batch dimension

        with torch.no_grad():
            value = self.critic_model(state).squeeze(-1)
            # bootstrap with the value of the final next_state; for earlier steps the next state is state[t+1]
            last_value = self.critic_model(new_state[-1:]).squeeze(-1)
            next_value = torch.cat((value[1:], last_value))
            
            # GAE works noticeably better here than a single-step TD error
            advantages, returns_to_go = self.compute_gae_and_returns(
                reward, value, next_value, done, 
                self.discount_rate, self.gae_lambda
            )

        # several update passes on the same batch of data
        for _ in range(self.times_per_update):
            # update the actor
            policy_dist = self.actor_model(state)
            old_policy_dist = self.old_actor_model(state) 
              
            new_log_prob = policy_dist.log_prob(action)
            old_log_prob = old_policy_dist.log_prob(action).detach() # no gradient through the old policy
            r = torch.exp(new_log_prob - old_log_prob) # ratio via exp(log a - log b) = a / b
           
            new_div_old_rate = r
            clipped_rate = torch.clamp(new_div_old_rate, 1 - self.clip_param, 1 + self.clip_param)
            surrogate = torch.min(new_div_old_rate * advantages, clipped_rate * advantages)
            actor_fn = -(surrogate + self.entropy_coeff * policy_dist.entropy())
            self.actor_optimizer.zero_grad()
            actor_fn.mean().backward(retain_graph=True) # .mean(): backward() needs a scalar; retain_graph is not strictly required since the graph is rebuilt each pass
            self.actor_optimizer.step()

            # update the critic
            v = self.critic_model(state).squeeze()
            critic_fn = F.mse_loss(v, returns_to_go)
            self.critic_optimizer.zero_grad()
            (self.value_loss_coeff * critic_fn).backward()
            self.critic_optimizer.step()

    def compute_gae_and_returns(self,
                            rewards: torch.Tensor, 
                            values: torch.Tensor, 
                            next_values: torch.Tensor, 
                            dones: torch.Tensor, 
                            discount_rate: float, 
                            lambda_gae: float, 
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        advantages = torch.zeros_like(rewards)
        last_advantage = 0.0
        n_steps = len(rewards)

        # compute GAE backwards over the batch
        for t in reversed(range(n_steps)):
            mask = 1.0 - dones[t]
            delta = rewards[t] + discount_rate * next_values[t] * mask - values[t] 
            advantages[t] = delta + discount_rate * lambda_gae * last_advantage * mask
            last_advantage = advantages[t]

        # returned to the critic as its TD target
        returns_to_go = advantages + values 
        return advantages, returns_to_go



    def append_data(self, state, action, r, new_state, done):
        self.replay_buffer.append((state, action, r, new_state, done))

    def state_to_input(self, state):
        input_dim = 16
        input = torch.zeros(input_dim, dtype=torch.float)
        input[int(state)] = 1
        return input
    


env = gym.make("FrozenLake-v1", is_slippery=False)
policy = PPO_clip(env, 2000)
policy.train()

env = gym.make("FrozenLake-v1", is_slippery=False, render_mode="human")
state, info = env.reset()
done = False
truncated = False
while True:
    with torch.no_grad():
        action=policy.choose_action(state) 
    new_state, reward, done, truncated, info = env.step(action)
    state=new_state
    if done or truncated:
        state, info = env.reset()
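For a quantitative check, a short evaluation loop like the one below estimates the trained policy's success rate; run it right after policy.train(), since the rendering demo above never exits on its own (the 100-episode budget is arbitrary):

# rough success-rate estimate for the trained policy (place before the rendering demo)
eval_env = gym.make("FrozenLake-v1", is_slippery=False)
successes = 0
episodes = 100  # arbitrary evaluation budget
for _ in range(episodes):
    state, info = eval_env.reset()
    done = truncated = False
    while not (done or truncated):
        state, reward, done, truncated, info = eval_env.step(policy.choose_action(state))
    successes += int(state == 15)  # state 15 is the goal cell on the 4x4 map
print(f"success rate: {successes / episodes:.2f}")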

