多无人机--强化学习

发布于:2025-02-10 ⋅ 阅读:(34) ⋅ 点赞:(0)

这个是我对于我的大创项目的构思,随着时间逐渐更新

项目概要

我们的项目平台来自挑战杯揭绑挂帅的无人机对抗项目,但是在由于时间原因,并未考虑强化学习,所以现在通过大创项目来弥补遗憾

我们项目分为三部分,分为虚拟机,态势系统,和运行程序端(使用主机)

虚拟机内包含各种无人机信息,并封装为接口供windows端控制

态势系统主要是用来显示战场的情况,使得态势可视化

运行程序端编写程序进行无人机控制

启动顺序为

虚拟机-》态势系统-》运行程序端

项目学习基础

强化学习:

       学习马尔可夫决策决策过程(MDP)

       学习强化学习主要算法:

              值迭代法,策略梯度法 重点学习PPO和DDPG

如果对于强化学习公式的了解较少的可以观看b站上的课程

【强化学习的数学原理】课程:从零开始到透彻理解(完结)_哔哩哔哩_bilibili

由于这里我们目前所使用的公式原因,先学习

了解仿真平台

       对于各种API的研究(前期工作)

        理解无人机的各种参数

对于linux系统的了解(前期工作)

        学习一些基础操作,并对于其提供的虚拟机实现了解

对于强化学习接口搭建(完成Gym接口)封装Linux接口作为训练环境

首先利用PPO/DDPG训练单无人机基础移动(边界避障,上下限制)

进行侦察训练,导弹躲避训练

然后再加入对抗系统,使得无人机与敌机进行交互

首先是蓝方设计固定策略进行训练

然后红蓝方都进行强化学习训练

目前较为适合的最终算法(改进的MADDPG)

基础知识

Linux

一些基础linux命令总结为linux命令

如下

linux命令-CSDN博客

然后需要查看shell脚本

这里推荐黑马程序员的课程

02.shell入门(1)_哔哩哔哩_bilibili

强化学习

然后是强化学习的基础知识

马尔可夫决策

基本元素

  1. 状态集(State Space)
    记为 S,表示系统可能处于的所有状态的集合。例如,在一个迷宫环境中,每个格子可以看作一个状态;在资源分配问题中,状态可以是当前资源的使用量、剩余量等的组合。

  2. 动作集(Action Space)
    记为 A,表示在每个状态下可执行的所有动作。例如,在迷宫中可向上、下、左、右移动;在资源分配问题中可以为“给某个任务分配多少资源”等不同策略选项。

  3. 状态转移概率(Transition Probability)
    记为 P(s′∣s,a),表示当前处于状态 s,执行动作 a 之后,转移到下一状态 s′ 的概率。这也是“马尔可夫”性质的来源:转移只与当前状态和当前动作相关,而与之前的历史状态无关。

  4. 奖励函数(Reward Function)
    记为 R(s,a)或 R(s,a,s′),表示在状态 s 执行动作 a并转移到状态 s′时得到的即时回报。这个回报值可能是正的(奖励)或负的(惩罚)。

  5. 折扣因子(Discount Factor)
    记为 γ,取值范围通常在 [0,1] 之间。它用于平衡短期和长期收益的重要性:当 γ越接近 1 时,更注重长期回报;当 γ越小,越关注即时回报。

决策过程

  • 观察状态
    系统(或智能体)观察当前状态 s。

  • 选择动作
    根据一定的策略(policy)π\piπ,在状态 sss 下选择一个动作 aaa。策略 π\piπ 可以理解为一个函数或规则,用于指定在不同状态下执行哪一个动作。

  • 环境反馈

    • 状态转移:在环境中执行动作 aaa 后,系统会随机地转移到下一个状态 s′s's′(由转移概率 P(s′∣s,a)P(s' \mid s,a)P(s′∣s,a) 决定)。
    • 得到奖励:与此同时,系统给予执行该动作的即时回报 R(s,a)R(s,a)R(s,a) 或 R(s,a,s′)R(s, a, s')R(s,a,s′)。
  • 更新决策
    基于新的状态 s′s's′ 和获得的奖励,智能体可以对其策略 π\piπ 进行更新或继续保持不变,具体取决于使用的算法(例如价值迭代、策略迭代、Q 学习、深度强化学习等)。

  • 进入下一轮决策
    新的状态 s′s's′ 成为当前状态,系统重复上述过程,直到达到终止条件(如达到目标状态、达到最大交互步数、收敛到稳定策略等)

PPO

DDPG 

note:无人机飞行是连续的动作,使用 DDPG

聚焦连续动作空间,使用确定性策略和 Critic-Q 网络来估计动作价值,具备较高的数据利用效率,但也对训练稳定性和超参数选择有更高要求。

MADDPG

多无人机对战是多智能体和DDPG的结合

  • 集中式 Critic:在训练过程中,每个智能体的 Critic 都可以访问 全局信息,包括所有智能体的状态和动作。这使得 Critic 在更新时对环境动态和其他智能体决策有更全面的认识,缓解了环境非平稳问题。
  • 分散式 Actor:在执行阶段,每个智能体只基于自身的局部观测来进行决策,保持灵活性与可扩展性。

初步研究

动作设置

我们的无人机控制包含如下参数:

下面是一个示例:

        我们使用机动号操作无人机进行对战,一共执行五个状态,平飞,俯冲,平飞加减速,爬升,转弯

奖励函数设置

初步设计为分为多个阶段,进行分开训练,分为巡航,进攻,躲避,撤退四个策略,通过条件进行状态转移

开始设计初步的奖励和惩罚函数

巡航:

        奖励项:侦察到敌方无人机,侦察到敌方无人机时的高度差

        惩罚项:碰撞到边界

进攻:

        奖励项:导弹命中敌方无人机

        惩罚项:敌方无人机脱离我方锁定

躲避:

        奖励:躲避敌方导弹

撤退:

        奖励:??

        惩罚:被敌方无人机侦测

当前代码

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import gym
from gym import spaces
import time
from CommunicationTool import *


# 大概分为三个步骤:读取无人机接口,搭建无人机环境,进行强化学习训练

# 下面定义无人机接口函数
def send_control_plane():
    print("执行无人机动作")


def read_state(env):
    print("读取无人机状态")


class Manyagent_plane_Env(gym.Env):
    def __init__(self, num_agents, state_dim, action_dim):  # 智能体数量,状态参数,动作维度(目前不确定,稍后需要修改)
        super(Manyagent_plane_Env, self).__init__()
        self.num_agents = num_agents
        self.state_dim = state_dim
        self.action_dim = action_dim
        # 设定禁飞区条件:高度低于 2000 米时视为禁飞区
        self.altitude_not_high = 2000.0
        self.altitude_not_low = 14500.0
        # 设置四个边的限制
        self.dong = 0
        self.xi = 0
        self.nan = 0
        self.bei = 0

        # 为每个无人机定义观测和动作空间(这里仅用于接口说明,实际训练时用 MADDPG 管理)
        self.observation_space = spaces.Tuple(
            [spaces.Box(low=-np.inf, high=np.inf, shape=(state_dim,), dtype=np.float32) for _ in range(num_agents)])
        self.action_space = spaces.Tuple(
            [spaces.Box(low=-1, high=1, shape=(action_dim,), dtype=np.float32) for _ in range(num_agents)])

        # 初始化每个无人机状态:状态为 [x, y, z, vx, vy, vz]
        # x, y 随机在 [0,10] 内;z(高度)随机在 [1900, 2100] 内;初始速度均设为0
        self.drone_states = []
        '''这里需要写每个无人机的初始化条件'''

        '''---------------------------------------------------'''

    def step(self):
        rewards = []
        next_states = []
        dones = []
        # 执行无人机动作(此处需要选择动作)
        send_control_plane()

        # 读入执行动作后状态
        state_i = read_state(self)
        next_states.append(state_i)

        '''设计奖励函数yu惩罚函数'''

        """--------------------"""
        """结束条件"""
        """上面写结束条件 1.超过最大步数,2.飞机均被摧毁"""
        dones.append()  # 判断本回合是否结束
        info = {}
        return next_states, rewards, dones, info

    def reset(self):
        print("状态重置")


"""下面开始定义MADDPG的组件"""

# 3.1 Actor 网络:将单个无人机状态映射到连续动作
"""下面的隐藏层可以修改"""
"""这里需要对forward进行修改,需要修改取值范围,这里为示例代码"""


class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, action_bounds, hidden_dim=64):
        super(Actor, self).__init__()
        self.action_bounds = torch.tensor(action_bounds, dtype=torch.float32)
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        raw_output = torch.tanh(self.fc3(x))
        lower_bound = self.action_bounds[:, 0]  # shape: [action_dim]
        upper_bound = self.action_bounds[:, 1]  # shape: [action_dim]
        # 利用广播机制进行缩放
        action = lower_bound + (upper_bound - lower_bound) * ((raw_output + 1) / 2)
        return action


# 3.2 Critic 网络:输入所有无人机的状态和动作拼接后的向量,输出 Q 值
class Critic(nn.Module):
    def __init__(self, total_state_dim, total_action_dim, hidden_dim=64):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(total_state_dim + total_action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, states, actions):
        x = torch.cat([states, actions], dim=1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        Q = self.fc3(x)
        return Q


# 3.3 经验回放缓冲区
class ReplayBuffer:
    def __init__(self, capacity):
        # 这里定义缓冲区大小,为双端队列,当数据大于缓冲区大小的时候,去掉最开始的元素
        self.buffer = deque(maxlen=capacity)

    # 向缓冲区添加新的经验样本。
    def push(self, *transition):
        self.buffer.append(transition)

    # 从缓冲区中随机抽取一批经验数据,通常用于训练时构造 mini-batch。
    '''batch_size为抽取样本数'''

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        return map(np.array, zip(*batch))

    def __len__(self):
        return len(self.buffer)


# 单个无人机(智能体)的 MADDPG 结构
class MADDPGAgent:
    def __init__(self, state_dim, action_dim, total_state_dim, total_action_dim, action_bounds, lr_actor=1e-3,
                 lr_critic=1e-3, gamma=0.95, tau=0.01):
        self.gamma = gamma  # 折扣因子
        self.tau = tau  # 软更新系数

        # Actor 网络接收智能体的状态(维度为 state_dim)作为输入,并输出一个动作向量(维度为 action_dim)。
        self.actor = Actor(state_dim, action_dim, action_bounds)
        self.actor_target = Actor(state_dim, action_dim, action_bounds)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor)

        # 实例化 Critic 网络,该网络输入所有智能体拼接后的状态和动作(总维度分别为 total_state_dim 和 total_action_dim),输出对应的 Q 值。
        self.critic = Critic(total_state_dim, total_action_dim)
        self.critic_target = Critic(total_state_dim, total_action_dim)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr_critic)

    # 更新单个智能体的 Actor 和 Critic 网络参数
    def update(self, samples, agent_index, num_agents):
        # samples: (states, actions, rewards, next_states, dones)
        states, actions, rewards, next_states, dones = samples
        '''
        samples为经验回访池中采样的一组数据
        states:当前时刻所有智能体的状态拼接成一个大数组;
        actions:所有智能体在当前时刻的动作拼接成一个大数组;
        rewards:每个智能体的奖励(通常是一个二维数组,每一列对应一个智能体);
        next_states:所有智能体在下一时刻的状态;
        dones:每个智能体是否结束(done标志)。
        '''

        # 将数据转换为 PyTorch 张量,并提取当前智能体相关数据
        states = torch.FloatTensor(states)
        actions = torch.FloatTensor(actions)
        rewards = torch.FloatTensor(rewards[:, agent_index]).unsqueeze(1)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones[:, agent_index]).unsqueeze(1)

        # 假设每个智能体状态维度为 state_dim
        # 所有智能体的状态拼接在一起,所以总体状态维度除以智能体数就是单个智能体的状态维度。同理,动作维度也是这样计算。
        state_dim = states.shape[1] // num_agents
        action_dim = actions.shape[1] // num_agents

        # 提取当前智能体的 next_state
        agent_next_state = next_states[:, agent_index * state_dim:(agent_index + 1) * state_dim]
        # 根据目标状态计算下一步动作
        next_action = self.actor_target(agent_next_state)

        '''这里简化:将其他智能体的动作保持不变,仅更新当前智能体的部分'''
        # 复制当前批次中所有智能体的动作数据,并将其转换为 NumPy 数组。
        actions_next = actions.clone().detach().numpy()
        # 将当前智能体(索引为 agent_index)的动作部分替换为目标 Actor 网络计算出的下一动作 next_action。
        actions_next[:, agent_index * action_dim:(agent_index + 1) * action_dim] = next_action.detach().numpy()
        # 将替换后的 NumPy 数组 actions_next 再转换回 PyTorch 的 FloatTensor
        actions_next = torch.FloatTensor(actions_next)

        '''下面为流程,不用管'''
        target_Q = self.critic_target(next_states, actions_next)
        target_Q = rewards + self.gamma * target_Q * (1 - dones)
        current_Q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_Q, target_Q.detach())

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor 更新:最大化当前 Q 值
        agent_state = states[:, agent_index * state_dim:(agent_index + 1) * state_dim]
        current_action = self.actor(agent_state)
        actions_pred = actions.clone()
        actions_pred[:, agent_index * action_dim:(agent_index + 1) * action_dim] = current_action
        actor_loss = -self.critic(states, actions_pred).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # 目标网络软更新
        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)


# 3.5 多智能体 MADDPG 管理器
class MADDPG:
    def __init__(self, num_agents, state_dim, action_dim, action_bounds, buffer_capacity=100000, batch_size=1024):
        self.num_agents = num_agents
        self.batch_size = batch_size
        self.agents = []

        total_state_dim = state_dim * num_agents
        total_action_dim = action_dim * num_agents

        # 初始化智能体和经验回放缓冲区
        for _ in range(num_agents):
            agent = MADDPGAgent(state_dim, action_dim, total_state_dim, total_action_dim, action_bounds)
            self.agents.append(agent)

        self.replay_buffer = ReplayBuffer(buffer_capacity)

    # 智能体根据状态选择动作
    def act(self, states):
        # states: 列表,每个元素为一个智能体的状态(numpy 数组)
        actions = []
        for i, agent in enumerate(self.agents):
            state_tensor = torch.FloatTensor(states[i]).unsqueeze(0)
            action = agent.actor(state_tensor).detach().numpy()[0]
            actions.append(action)
        return actions

    def push(self, states, actions, rewards, next_states, dones):
        # 将所有智能体的数据打包存入经验回放缓冲区
        self.replay_buffer.push(states, actions, rewards, next_states, dones)

    def update(self):
        if len(self.replay_buffer) < self.batch_size:
            return
        samples = self.replay_buffer.sample(self.batch_size)
        for agent_index, agent in enumerate(self.agents):
            agent.update(samples, agent_index, self.num_agents)


# ------------------------------
# 4. 主训练循环
# ------------------------------
if __name__ == '__main__':
    # 设置参数
    num_agents = 4
    state_dim = 6  # [x, y, z, vx, vy, vz]
    action_dim = 0  # 控制动作的分量数量
    action_bounds = [[], [], []]  # 定义动作参数的范围
    env = Manyagent_plane_Env(num_agents, state_dim, action_dim)  # 环境生成
    maddpg = MADDPG(num_agents, state_dim, action_dim, action_bounds)  # 定义MADDPG类
    # 定义循环数量
    num_episodes = 1000

    for ep in range(num_episodes):
        states = env.reset()  # 返回 list,每个无人机状态
        # 初始化奖励为0
        ep_rewards = np.zeros(num_agents)
        done = [False] * num_agents
        # all(done) 检查所有智能体是否都已经结束回合。
        while not all(done):
            actions = maddpg.act(states)
            next_states, rewards, done, _ = env.step(actions)
            maddpg.push(states, actions, rewards, next_states, done)
            states = next_states
            ep_rewards += np.array(rewards)
            maddpg.update()
            time.sleep(0.05)


网站公告

今日签到

点亮在社区的每一天
去签到