【群体智能优化算法系列 】一 粒子群算法 (Particle Swarm Optimization, PSO)

发布于:2025-06-24 ⋅ 阅读:(16) ⋅ 点赞:(0)

一:前言

粒子群算法是由Kennedy和Eberhart在1995年提出的一种基于群体智能的优化算法。该算法模拟鸟群觅食行为,通过粒子间的信息共享和协作来寻找最优解。

二:算法原理

2.1 核心思想

粒子群算法的核心思想可以通过鸟群找食物的故事​理解,

假设一群小鸟在森林里找食物(最优解):
​(1)每只鸟不知道食物在哪,但能感知自己当前位置离食物有多远(通过目标函数计算适应度)。

(2)​小鸟们会交流​:每只鸟记住自己飞过的最好位置(个体经验),同时整个鸟群知道所有鸟中最好的位置(群体经验)。

​(3)飞行策略​:每只鸟决定下一步飞哪里时,会综合三个方向:

  • 惯性方向​:保持原来的飞行方向(不想急转弯)。
  • 个体经验方向​:飞向自己曾经找到过食物的地方。
  • 群体经验方向​:飞向鸟群中其他鸟找到食物最多的地方。

​(4)动态调整​:小鸟们一边飞一边更新信息,最终整个鸟群会逐渐聚集到食物最丰富的位置

2.2 PSO核心公式​

PSO核心公式
在这里插入图片描述在这里插入图片描述

2.3 PSO算法流程图

在这里插入图片描述

三:python实现 二维Rastrigin函数 最低点检索例子

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from mpl_toolkits.mplot3d import Axes3D
import time

# 设置matplotlib支持中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

def fitness_function(position):
    """目标函数:二维Rastrigin函数"""
    x, y = position
    return 20 + (x**2 - 10 * np.cos(2 * np.pi * x)) + (y**2 - 10 * np.cos(2 * np.pi * y))

def pso_optimization(func, bounds, n_particles=30, max_iter=50, w=0.7, c1=1.5, c2=1.5):
    """粒子群优化算法实现"""
    dim = len(bounds)
    # 初始化粒子群
    particles = {
        'position': np.array([np.random.uniform(low, high, n_particles) for (low, high) in bounds]).T,
        'velocity': np.zeros((n_particles, dim)),
        'best_position': np.zeros((n_particles, dim)),
        'best_fitness': np.full(n_particles, np.inf),
        'history': []  # 存储迭代历史
    }
    
    # 初始全局最优
    global_best_pos = np.zeros(dim)
    global_best_fit = np.inf
    
    # 计算初始适应度
    for i in range(n_particles):
        fitness = func(particles['position'][i])
        particles['best_position'][i] = particles['position'][i].copy()
        particles['best_fitness'][i] = fitness
        
        if fitness < global_best_fit:
            global_best_fit = fitness
            global_best_pos = particles['position'][i].copy()
    
    particles['history'].append({
        'positions': particles['position'].copy(),
        'global_best': global_best_pos.copy()
    })
    
    # 主循环
    for t in range(max_iter):
        for i in range(n_particles):
            # 更新速度和位置
            r1, r2 = np.random.rand(), np.random.rand()
            particles['velocity'][i] = (w * particles['velocity'][i] +
                                       c1 * r1 * (particles['best_position'][i] - particles['position'][i]) +
                                       c2 * r2 * (global_best_pos - particles['position'][i]))
            
            # 更新位置
            particles['position'][i] += particles['velocity'][i]
            
            # 边界处理
            for d in range(dim):
                low, high = bounds[d]
                particles['position'][i][d] = np.clip(particles['position'][i][d], low, high)
            
            # 计算新适应度
            new_fitness = func(particles['position'][i])
            
            # 更新个体最优
            if new_fitness < particles['best_fitness'][i]:
                particles['best_position'][i] = particles['position'][i].copy()
                particles['best_fitness'][i] = new_fitness
                
                # 更新全局最优
                if new_fitness < global_best_fit:
                    global_best_fit = new_fitness
                    global_best_pos = particles['position'][i].copy()
        
        # 记录本次迭代状态
        particles['history'].append({
            'positions': particles['position'].copy(),
            'global_best': global_best_pos.copy()
        })
    
    return global_best_pos, global_best_fit, particles['history']

def visualize_optimization(history, bounds, func):
    """可视化优化过程"""
    fig = plt.figure()  # 增加图形宽度以容纳colorbar
    
    # 控制变量
    animation_state = {
        'paused': False,
        'stopped': False,
        'current_frame': 0
    }
    
    # 创建函数网格
    x = np.linspace(bounds[0][0], bounds[0][1], 100)
    y = np.linspace(bounds[1][0], bounds[1][1], 100)
    X, Y = np.meshgrid(x, y)
    Z = np.zeros_like(X)
    for i in range(X.shape[0]):
        for j in range(X.shape[1]):
            Z[i, j] = func([X[i, j], Y[i, j]])
    
    # 创建子图
    ax1 = fig.add_subplot(121, projection='3d')
    ax2 = fig.add_subplot(122)
    
    # 初始化绘图元素
    surf = ax1.plot_surface(X, Y, Z, cmap='viridis', alpha=0.3)
    particles_3d = ax1.scatter([], [], [], c='red', s=30)
    best_3d = ax1.scatter([], [], [], c='gold', s=100, marker='*')
    
    contour = ax2.contourf(X, Y, Z, 20, cmap='viridis', alpha=0.6)
    # 添加等高线标签
    contour_lines = ax2.contour(X, Y, Z, 10, colors='black', alpha=0.4, linewidths=0.5)
    ax2.clabel(contour_lines, inline=True, fontsize=8, fmt='%.1f')
    
    # 添加颜色条
    cbar = plt.colorbar(contour, ax=ax2, shrink=0.8)
    cbar.set_label('适应度值', rotation=270, labelpad=15)
    
    particles_2d = ax2.scatter([], [], c='red', s=30, alpha=0.7)
    best_2d = ax2.scatter([], [], c='gold', s=100, marker='*')
    
    # 设置标签和标题
    ax1.set_xlabel('X')
    ax1.set_ylabel('Y')
    ax1.set_zlabel('f(X,Y)')
    ax1.view_init(elev=30, azim=45)
    
    ax2.set_xlabel('X')
    ax2.set_ylabel('Y')
    ax2.set_xlim(bounds[0])
    ax2.set_ylim(bounds[1])
    
    # 创建动画函数
    def on_key_press(event):
        """键盘事件处理"""
        if event.key == ' ':  # 空格键暂停/继续
            animation_state['paused'] = not animation_state['paused']
            status = "暂停" if animation_state['paused'] else "继续"
            print(f"动画{status}")
    
    def show_final_results():
        """显示最终结果并暂停"""
        final_frame = len(history) - 1
        final_positions = history[final_frame]['positions']
        final_best = history[final_frame]['global_best']
        
        # 更新到最终状态
        particles_3d._offsets3d = (final_positions[:, 0], final_positions[:, 1], 
                                  [func(p) for p in final_positions])
        best_3d._offsets3d = ([final_best[0]], [final_best[1]], 
                             [func(final_best)])
        particles_2d.set_offsets(final_positions)
        best_2d.set_offsets([final_best])
        
        # 更新标题显示最终结果
        final_fitness = func(final_best)
        ax1.set_title(f'优化完成! 总迭代: {final_frame}\n最优值: {final_fitness:.6f}\n最优位置: [{final_best[0]:.3f}, {final_best[1]:.3f}]')
        ax2.set_title(f'最终结果 - 误差: {abs(final_fitness):.6f}')
        
        plt.draw()
        print(f"\n=== 优化结果 ===")
        print(f"最优位置: [{final_best[0]:.6f}, {final_best[1]:.6f}]")
        print(f"最优适应度值: {final_fitness:.6f}")
        print(f"理论最优值: 0.0000 (在位置 [0, 0])")
        print(f"误差: {abs(final_fitness):.6f}")
        print("\n按任意键关闭窗口...")
        
        # 等待用户输入
        input()
    
    def update(frame):
        # 检查是否被停止
        if animation_state['stopped']:
            return particles_3d, best_3d, particles_2d, best_2d
            
        # 检查是否暂停
        if animation_state['paused']:
            return particles_3d, best_3d, particles_2d, best_2d
        
        animation_state['current_frame'] = frame
        
        # 当前迭代数据
        positions = history[frame]['positions']
        global_best = history[frame]['global_best']
        
        # 更新3D散点图
        particles_3d._offsets3d = (positions[:, 0], positions[:, 1], 
                                  [func(p) for p in positions])
        best_3d._offsets3d = ([global_best[0]], [global_best[1]], 
                             [func(global_best)])
        
        # 更新2D散点图
        particles_2d.set_offsets(positions)
        best_2d.set_offsets([global_best])
        
        # 更新标题
        current_fitness = func(global_best)
        ax1.set_title(f'迭代 {frame}/{len(history)-1}\n当前最优值: {current_fitness:.4f}\n[空格]暂停')
        ax2.set_title(f'等高线图 - 迭代 {frame}')
        
        # 如果是最后一帧,自动显示最终结果
        if frame == len(history) - 1:
            ani.event_source.stop()
            # 延迟显示最终结果
            plt.pause(0.5)  # 短暂暂停
            show_final_results()
        
        return particles_3d, best_3d, particles_2d, best_2d
    
    # 连接键盘事件
    fig.canvas.mpl_connect('key_press_event', on_key_press)
    
    # 创建动画
    ani = FuncAnimation(fig, update, frames=len(history), interval=200, 
                       blit=False, repeat=False)  # 改为不重复
    
    plt.tight_layout(pad=2.0)  # 增加边距以确保文字完整显示
    
    # 显示操作提示
    print("\n=== 操作说明 ===")
    print("[空格键] - 暂停/继续动画")
    print("[关闭窗口] - 退出程序")
    print("==================\n")
    
    return ani, fig

def main():
    # 设置函数搜索边界
    bounds = [(-5.12, 5.12), (-5.12, 5.12)]
    
    print("正在运行粒子群优化算法...")
    
    # 运行PSO优化
    best_pos, best_fit, history = pso_optimization(fitness_function, bounds)
    
    print(f"\n算法执行完成!")
    print(f"最优解位置: [{best_pos[0]:.6f}, {best_pos[1]:.6f}]")
    print(f"最优值: {best_fit:.6f}")
    
    # 可视化优化过程
    ani, fig = visualize_optimization(history, bounds, fitness_function)
    
    # 显示实时动画
    print("正在启动粒子群算法可视化...")
    
    try:
        plt.show()
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    
    print("程序结束")

if __name__ == "__main__":
    main()

参考

【1】 粒子群优化算法(Particle Swarm Optimization, PSO)的详细解读
【2】一篇文章搞懂PSO(粒子群算法)理论讲解+Python代码示例讲解


网站公告

今日签到

点亮在社区的每一天
去签到