模型汇总-数学建模

发布于:2025-08-29 ⋅ 阅读:(22) ⋅ 点赞:(0)

一、优化模型

1.线性规划

线性规划(Linear Programming, LP)是一种数学优化方法,用于在给定的线性约束条件下,找到线性目标函数的最大值或最小值。它是运筹学中最常用的方法之一。

线性规划的标准形式

最大化问题标准形式:

maximize: cᵀx
subject to: Ax ≤ b
            x ≥ 0

最小化问题标准形式:

minimize: cᵀx
subject to: Ax ≥ b
            x ≥ 0

其中:

  • x 是决策变量向量

  • c 是目标函数系数向量

  • A 是约束条件系数矩阵

  • b 是约束条件右侧常数向量

Python实现线性规划

我们将使用SciPy库中的linprog函数来解决线性规划问题。

安装必要的库
pip install scipy numpy matplotlib

简单的最小化问题示例:

import numpy as np
from scipy.optimize import linprog
import matplotlib.pyplot as plt

# 定义目标函数系数(注意:linprog默认求最小值,所以对于最大化问题需要取负)
c = [-1, 4]  # 原目标函数为 -x1 + 4x2,但linprog求最小值,所以这里系数不变

# 定义不等式约束(左侧系数矩阵)
A = [
    [1, -1],   # x1 - x2 ≤ 4
    [2, 1],    # 2x1 + x2 ≤ 14
    [1, 3]     # x1 + 3x2 ≤ 18
]

# 定义不等式约束(右侧常数向量)
b = [4, 14, 18]

# 定义变量边界
x_bounds = (0, None)  # x1 ≥ 0
y_bounds = (0, None)  # x2 ≥ 0

# 求解线性规划问题
result = linprog(c, A_ub=A, b_ub=b, bounds=[x_bounds, y_bounds], method='highs')

# 输出结果
print("优化结果:")
print(f"状态: {result.message}")
print(f"最优解: x1 = {result.x[0]:.2f}, x2 = {result.x[1]:.2f}")
print(f"最优目标函数值: {result.fun:.2f}")
print(f"迭代次数: {result.nit}")

# 可视化
def plot_feasible_region():
    # 创建网格
    x = np.linspace(0, 10, 400)
    y = np.linspace(0, 10, 400)
    X, Y = np.meshgrid(x, y)
    
    # 约束条件
    constraint1 = X - Y <= 4    # x1 - x2 ≤ 4
    constraint2 = 2*X + Y <= 14 # 2x1 + x2 ≤ 14
    constraint3 = X + 3*Y <= 18 # x1 + 3x2 ≤ 18
    non_negative = (X >= 0) & (Y >= 0)
    
    # 可行域
    feasible = constraint1 & constraint2 & constraint3 & non_negative
    
    plt.figure(figsize=(10, 8))
    
    # 绘制可行域
    plt.imshow(feasible.astype(int), 
               extent=(x.min(), x.max(), y.min(), y.max()), 
               origin="lower", cmap="Greys", alpha=0.3)
    
    # 绘制约束线
    plt.plot(x, x - 4, label=r'$x_1 - x_2 = 4$')
    plt.plot(x, 14 - 2*x, label=r'$2x_1 + x_2 = 14$')
    plt.plot(x, (18 - x)/3, label=r'$x_1 + 3x_2 = 18$')
    
    # 绘制最优解点
    plt.plot(result.x[0], result.x[1], 'ro', markersize=10, label='最优解')
    
    # 绘制目标函数等值线
    Z = -X + 4*Y  # 目标函数值
    contours = plt.contour(X, Y, Z, levels=10, colors='gray', alpha=0.5)
    plt.clabel(contours, inline=True, fontsize=8)
    
    plt.xlim(0, 10)
    plt.ylim(0, 10)
    plt.xlabel(r'$x_1$')
    plt.ylabel(r'$x_2$')
    plt.title('线性规划问题可视化')
    plt.legend()
    plt.grid(True)
    plt.show()

plot_feasible_region()

示例2:生产计划问题(最大化问题)

# 最大化问题:利润 = 3A + 5B
# 由于linprog默认求最小值,我们需要对目标函数系数取负

# 目标函数系数(取负是因为要求最大值)
c = [-3, -5]  # 最大化 3A + 5B 等价于最小化 -3A -5B

# 约束条件系数矩阵
A = [
    [2, 1],   # 2A + B ≤ 10
    [1, 2]    # A + 2B ≤ 8
]

# 约束条件右侧常数
b = [10, 8]

# 变量边界
bounds = [(0, None), (0, None)]

# 求解
result = linprog(c, A_ub=A, b_ub=b, bounds=bounds, method='highs')

print("\n生产计划问题结果:")
print(f"状态: {result.message}")
print(f"最优生产计划: 产品A = {result.x[0]:.2f}, 产品B = {result.x[1]:.2f}")
print(f"最大利润: {-result.fun:.2f}元")  # 取负得到原始目标函数值

示例3:更复杂的线性规划问题

# 最小化:-2x1 + x2 - x3
# 约束:
# x1 + x2 + x3 ≤ 6
# -x1 + 2x2 ≤ 4
# x1, x2, x3 ≥ 0

c = [-2, 1, -1]  # 目标函数系数

A = [
    [1, 1, 1],   # x1 + x2 + x3 ≤ 6
    [-1, 2, 0]   # -x1 + 2x2 ≤ 4
]

b = [6, 4]

bounds = [(0, None), (0, None), (0, None)]

result = linprog(c, A_ub=A, b_ub=b, bounds=bounds, method='highs')

print("\n复杂线性规划问题结果:")
print(f"状态: {result.message}")
print(f"最优解: x1 = {result.x[0]:.2f}, x2 = {result.x[1]:.2f}, x3 = {result.x[2]:.2f}")
print(f"最优目标函数值: {result.fun:.2f}")

处理等式约束

# 最小化:2x1 + 3x2
# 约束:
# x1 + x2 = 10
# 2x1 + x2 ≥ 15
# x1 ≥ 0, x2 ≥ 0

c = [2, 3]  # 目标函数系数

# 等式约束
A_eq = [[1, 1]]  # x1 + x2 = 10
b_eq = [10]

# 不等式约束
A_ub = [[-2, -1]]  # 2x1 + x2 ≥ 15 等价于 -2x1 - x2 ≤ -15
b_ub = [-15]

bounds = [(0, None), (0, None)]

result = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs')

print("\n含等式约束的线性规划问题结果:")
print(f"状态: {result.message}")
print(f"最优解: x1 = {result.x[0]:.2f}, x2 = {result.x[1]:.2f}")
print(f"最优目标函数值: {result.fun:.2f}")

线性规划的应用领域

  1. 生产计划:优化资源分配和生产调度

  2. 运输问题:最小化运输成本

  3. 投资组合优化:在风险约束下最大化收益

  4. 人力资源规划:优化人员分配

  5. 网络流问题:最大化网络流量


2.整数规划

整数规划(Integer Programming, IP)是数学规划的一个分支,要求部分或全部决策变量取整数值。它是线性规划的扩展,但增加了变量必须为整数的约束。

整数规划的类型

  1. 纯整数规划:所有决策变量都必须取整数值

  2. 混合整数规划:部分决策变量取整数值,其余可以是实数

  3. 0-1整数规划:决策变量只能取0或1

最大化或最小化目标函数:

1. 分支定界法

最常用的整数规划求解方法,通过系统地枚举可行解的子集来寻找最优解。

2. 割平面法

通过添加额外的约束(割平面)来缩小可行域,逐步逼近整数解。

3. 启发式算法

如遗传算法、模拟退火等,用于求解大规模整数规划问题。

以下是使用Python的PuLP库求解整数规划的完整示例:

import pulp
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon

def solve_integer_programming():
    """
    求解整数规划问题的完整示例
    """
    # 创建问题实例
    prob = pulp.LpProblem("Integer_Programming_Example", pulp.LpMaximize)
    
    # 定义决策变量 (必须为整数)
    x1 = pulp.LpVariable('x1', lowBound=0, cat='Integer')
    x2 = pulp.LpVariable('x2', lowBound=0, cat='Integer')
    
    # 定义目标函数
    prob += 3*x1 + 2*x2, "Objective Function"
    
    # 添加约束条件
    prob += 2*x1 + x2 <= 6, "Constraint 1"
    prob += x1 + 2*x2 <= 8, "Constraint 2"
    prob += x1 <= 2, "Constraint 3"
    prob += x2 <= 3, "Constraint 4"
    
    # 求解问题
    prob.solve()
    
    # 输出结果
    print("Status:", pulp.LpStatus[prob.status])
    print("Optimal Solution:")
    for v in prob.variables():
        print(f"{v.name} = {v.varValue}")
    print(f"Optimal Value: {pulp.value(prob.objective)}")
    
    return prob

def visualize_solution():
    """
    可视化整数规划问题的解
    """
    # 定义约束条件
    x = np.linspace(0, 4, 100)
    
    # 约束1: 2x1 + x2 <= 6 → x2 <= 6 - 2x1
    y1 = 6 - 2*x
    
    # 约束2: x1 + 2x2 <= 8 → x2 <= (8 - x1)/2
    y2 = (8 - x)/2
    
    # 约束3: x1 <= 2
    # 约束4: x2 <= 3
    
    # 绘制约束条件
    plt.figure(figsize=(10, 8))
    plt.plot(x, y1, label=r'$2x_1 + x_2 \leq 6$')
    plt.plot(x, y2, label=r'$x_1 + 2x_2 \leq 8$')
    plt.axvline(x=2, color='green', linestyle='--', label=r'$x_1 \leq 2$')
    plt.axhline(y=3, color='purple', linestyle='--', label=r'$x_2 \leq 3$')
    
    # 绘制可行域
    vertices = np.array([[0, 0], [2, 0], [2, 2], [1, 3], [0, 3]])
    feasible_region = Polygon(vertices, alpha=0.3, color='gray')
    plt.gca().add_patch(feasible_region)
    
    # 标记整数点
    for i in range(0, 4):
        for j in range(0, 4):
            if (2*i + j <= 6) and (i + 2*j <= 8) and (i <= 2) and (j <= 3):
                plt.plot(i, j, 'ro', markersize=8)
                plt.text(i+0.05, j+0.05, f'({i},{j})', fontsize=10)
    
    # 标记最优解
    plt.plot(2, 2, 'go', markersize=10, label='Optimal Solution (2,2)')
    
    # 设置图形属性
    plt.xlim(0, 3.5)
    plt.ylim(0, 4)
    plt.xlabel(r'$x_1$')
    plt.ylabel(r'$x_2$')
    plt.title('Integer Programming Visualization')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

def mixed_integer_example():
    """
    混合整数规划示例
    """
    # 创建问题实例
    prob = pulp.LpProblem("Mixed_Integer_Example", pulp.LpMaximize)
    
    # 定义变量 (x1为整数,x2为连续变量)
    x1 = pulp.LpVariable('x1', lowBound=0, cat='Integer')
    x2 = pulp.LpVariable('x2', lowBound=0, cat='Continuous')
    
    # 目标函数
    prob += 4*x1 + 3*x2, "Objective"
    
    # 约束条件
    prob += 2*x1 + x2 <= 8, "Constraint1"
    prob += x1 + 2*x2 <= 7, "Constraint2"
    prob += x1 <= 3, "Constraint3"
    
    # 求解
    prob.solve()
    
    # 输出结果
    print("\nMixed Integer Programming Results:")
    print("Status:", pulp.LpStatus[prob.status])
    for v in prob.variables():
        print(f"{v.name} = {v.varValue}")
    print(f"Optimal Value: {pulp.value(prob.objective)}")

def binary_integer_example():
    """
    0-1整数规划示例 (背包问题)
    """
    # 创建问题实例
    prob = pulp.LpProblem("Knapsack_Problem", pulp.LpMaximize)
    
    # 物品价值
    values = [10, 15, 40, 60, 20]
    # 物品重量
    weights = [1, 2, 4, 6, 3]
    # 背包容量
    capacity = 10
    
    # 创建0-1变量 (是否选择物品)
    items = range(len(values))
    x = [pulp.LpVariable(f'x{i}', cat='Binary') for i in items]
    
    # 目标函数:最大化总价值
    prob += pulp.lpSum(values[i] * x[i] for i in items)
    
    # 约束条件:总重量不超过容量
    prob += pulp.lpSum(weights[i] * x[i] for i in items) <= capacity
    
    # 求解
    prob.solve()
    
    # 输出结果
    print("\n0-1 Integer Programming (Knapsack Problem) Results:")
    print("Status:", pulp.LpStatus[prob.status])
    print("Selected items:")
    for i in items:
        if x[i].varValue == 1:
            print(f"Item {i+1} (Value: {values[i]}, Weight: {weights[i]})")
    print(f"Total Value: {pulp.value(prob.objective)}")
    print(f"Total Weight: {sum(weights[i] * x[i].varValue for i in items)}")

if __name__ == "__main__":
    # 求解基本整数规划问题
    problem = solve_integer_programming()
    
    # 可视化解
    visualize_solution()
    
    # 混合整数规划示例
    mixed_integer_example()
    
    # 0-1整数规划示例
    binary_integer_example()

1. 基本整数规划求解

  • 使用PuLP库定义和求解整数规划问题

  • 包含目标函数和约束条件的定义

  • 输出最优解和目标函数值

2. 可视化

  • 使用matplotlib绘制可行域和整数点

  • 标记最优解位置

  • 显示所有约束条件

3. 混合整数规划

  • 演示同时包含整数和连续变量的情况

  • 展示不同类型变量的定义方法

4. 0-1整数规划

  • 以背包问题为例展示0-1整数规划

  • 演示二进制变量的使用

安装依赖

运行代码前需要安装以下Python库:

pip install pulp numpy matplotlib

应用领域

整数规划在现实世界中有广泛应用:

  1. 物流与运输:车辆路径规划、设施选址

  2. 生产计划:排产优化、资源分配

  3. 金融:投资组合优化、风险管理

  4. 电信:网络设计、频率分配

  5. 能源:发电调度、电网优化

注意哦:

  1. 整数规划通常是NP难问题,大规模问题可能需要专门的求解器;

  2. 对于复杂问题,可能需要使用商业求解器如Gurobi、CPLEX等;

  3. 启发式算法可以用于求解大规模问题,但不能保证找到最优解。


3.动态规划

动态规划(Dynamic Programming,简称DP)是一种解决复杂问题的方法,它将问题分解为更小的子问题,并存储子问题的解以避免重复计算。

动态规划的核心思想

  1. 最优子结构:一个问题的最优解包含其子问题的最优解

  2. 重叠子问题:在递归求解过程中,相同的子问题会被多次计算

  3. 记忆化存储:存储已解决的子问题答案,避免重复计算

动态规划的两种实现方式

1. 自顶向下(记忆化搜索)

  • 从原问题开始,递归地解决子问题

  • 使用备忘录存储已计算的子问题结果

2. 自底向上(迭代法)

  • 从最小的子问题开始,逐步构建到原问题的解

  • 通常使用数组(DP表)来存储子问题的解

经典问题:斐波那契数列

1. 递归解法(低效)

def fib_recursive(n):
    if n <= 1:
        return n
    return fib_recursive(n-1) + fib_recursive(n-2)

2. 记忆化搜索(自顶向下)

def fib_memo(n, memo=None):
    if memo is None:
        memo = {}
    
    if n in memo:
        return memo[n]
    
    if n <= 1:
        return n
    
    memo[n] = fib_memo(n-1, memo) + fib_memo(n-2, memo)
    return memo[n]

3. 动态规划(自底向上)

def fib_dp(n):
    if n <= 1:
        return n
    
    # 创建DP表
    dp = [0] * (n + 1)
    dp[0], dp[1] = 0, 1
    
    # 填充DP表
    for i in range(2, n + 1):
        dp[i] = dp[i-1] + dp[i-2]
    
    return dp[n]

4. 空间优化版本

def fib_optimized(n):
    if n <= 1:
        return n
    
    prev, curr = 0, 1
    for _ in range(2, n + 1):
        prev, curr = curr, prev + curr
    
    return curr

经典问题:0-1背包问题

给定一组物品,每个物品有重量和价值,在不超过背包容量的情况下,选择物品使得总价值最大。

def knapsack(weights, values, capacity):
    n = len(weights)
    # 创建DP表:dp[i][w] 表示前i个物品在容量为w时的最大价值
    dp = [[0] * (capacity + 1) for _ in range(n + 1)]
    
    # 填充DP表
    for i in range(1, n + 1):
        for w in range(capacity + 1):
            # 当前物品重量大于当前容量,不能选择
            if weights[i-1] > w:
                dp[i][w] = dp[i-1][w]
            else:
                # 选择当前物品或不选择当前物品的最大值
                dp[i][w] = max(dp[i-1][w], 
                              dp[i-1][w - weights[i-1]] + values[i-1])
    
    return dp[n][capacity]

# 示例
weights = [2, 3, 4, 5]
values = [3, 4, 5, 6]
capacity = 5
print(knapsack(weights, values, capacity))  # 输出:7

动态规划解题步骤

  1. 定义状态:明确dp数组的含义

  2. 确定状态转移方程:找出如何从已知状态推导出未知状态

  3. 初始化:确定基础情况的值

  4. 确定遍历顺序:确保在计算当前状态时,所需的前置状态已经计算完成

  5. 返回结果:返回最终需要的状态值

完整示例:硬币找零问题

给定不同面额的硬币和一个总金额,计算凑成总金额所需的最少硬币数。

def coin_change(coins, amount):
    # dp[i] 表示凑成金额i所需的最少硬币数
    dp = [float('inf')] * (amount + 1)
    dp[0] = 0  # 金额为0时不需要任何硬币
    
    for coin in coins:
        for i in range(coin, amount + 1):
            dp[i] = min(dp[i], dp[i - coin] + 1)
    
    return dp[amount] if dp[amount] != float('inf') else -1

# 示例
coins = [1, 2, 5]
amount = 11
print(coin_change(coins, amount))  # 输出:3 (5+5+1)

动态规划的应用场景

  1. 最优化问题:求最大值、最小值

  2. 计数问题:求方案总数

  3. 存在性问题:判断是否存在某种方案

  4. 序列问题:字符串、数组相关的子序列、子数组问题

动态规划是一种强大的算法设计技术,通过将复杂问题分解为简单的子问题,并避免重复计算来提高效率。掌握动态规划需要:

  1. 理解最优子结构和重叠子问题的概念

  2. 熟练使用记忆化搜索和DP表两种实现方式

  3. 通过大量练习来培养识别DP问题和设计状态转移方程的能力


4.图论算法(Dijkstra、Floyd)

Dijkstra 算法(单源最短路径)

Dijkstra 算法用于计算图中单个源点到其他所有节点的最短路径。它采用贪心策略,每次选择当前已知最短路径的节点进行扩展。

算法步骤

  1. 初始化:设置距离数组 dist[],源点距离为 0,其他为无穷大

  2. 创建未访问节点集合

  3. 当未访问集合非空时:

    • 选择当前距离最小的未访问节点 u

    • 标记 u 为已访问

    • 更新 u 的所有未访问邻居的距离:如果通过 u 到达邻居的路径更短,则更新

时间复杂度

  • 使用优先队列:O((V+E)logV)

  • 使用数组:O(V²)

代码实现

import heapq
import sys

def dijkstra(graph, start):
    """
    Dijkstra 算法实现
    
    参数:
    graph: 邻接表表示的图,graph[i] = [(j, weight), ...]
    start: 起始节点
    
    返回:
    dist: 从起始点到各点的最短距离
    prev: 前驱节点数组,用于重建路径
    """
    n = len(graph)
    dist = [sys.maxsize] * n  # 初始化距离为无穷大
    dist[start] = 0
    prev = [-1] * n  # 记录前驱节点
    visited = [False] * n
    
    # 使用优先队列 (距离, 节点)
    heap = [(0, start)]
    
    while heap:
        current_dist, u = heapq.heappop(heap)
        
        if visited[u]:
            continue
            
        visited[u] = True
        
        for v, weight in graph[u]:
            if not visited[v]:
                new_dist = current_dist + weight
                if new_dist < dist[v]:
                    dist[v] = new_dist
                    prev[v] = u
                    heapq.heappush(heap, (new_dist, v))
    
    return dist, prev

def reconstruct_path(prev, start, end):
    """根据前驱数组重建路径"""
    path = []
    current = end
    while current != -1:
        path.append(current)
        current = prev[current]
    path.reverse()
    return path if path[0] == start else []

# 示例使用
if __name__ == "__main__":
    # 创建示例图 (有向图)
    # 节点0到节点1的权重为4,节点0到节点2的权重为2,等等
    graph = [
        [(1, 4), (2, 2)],      # 节点0
        [(2, 5), (3, 10)],     # 节点1
        [(1, 1), (3, 8)],      # 节点2
        [(4, 2)],              # 节点3
        [(3, 5)]               # 节点4
    ]
    
    start_node = 0
    dist, prev = dijkstra(graph, start_node)
    
    print("从节点", start_node, "出发的最短距离:")
    for i in range(len(dist)):
        path = reconstruct_path(prev, start_node, i)
        print(f"到节点{i}: 距离={dist[i]}, 路径={path}")

Floyd-Warshall 算法(多源最短路径)

Floyd 算法用于计算图中所有节点对之间的最短路径。它基于动态规划,通过中间节点来逐步优化路径。

定义 dp[k][i][j] = 从 i 到 j 且只经过节点 0..k 的最短路径长度
状态转移方程:dp[k][i][j] = min(dp[k-1][i][j], dp[k-1][i][k] + dp[k-1][k][j])

  1. 初始化距离矩阵:对角线为0,有边则为权重,无边则为无穷大

  2. 对于每个中间节点 k:

    • 对于每对节点 (i, j):

      • 如果通过 k 的路径更短,则更新距离

时间复杂度:O(V³)

def floyd_warshall(graph):
    """
    Floyd-Warshall 算法实现
    
    参数:
    graph: 邻接矩阵表示的图,graph[i][j] = 权重,无边时为无穷大
    
    返回:
    dist: 所有节点对之间的最短距离矩阵
    next_node: 用于重建路径的下一节点矩阵
    """
    n = len(graph)
    
    # 初始化距离矩阵和下一节点矩阵
    dist = [[0] * n for _ in range(n)]
    next_node = [[-1] * n for _ in range(n)]
    
    # 复制图到距离矩阵,并初始化下一节点
    for i in range(n):
        for j in range(n):
            dist[i][j] = graph[i][j]
            if graph[i][j] != float('inf') and i != j:
                next_node[i][j] = j
    
    # 动态规划过程
    for k in range(n):
        for i in range(n):
            if dist[i][k] == float('inf'):
                continue
            for j in range(n):
                # 如果通过k的路径更短,则更新
                if dist[i][k] + dist[k][j] < dist[i][j]:
                    dist[i][j] = dist[i][k] + dist[k][j]
                    next_node[i][j] = next_node[i][k]
    
    return dist, next_node

def reconstruct_floyd_path(next_node, start, end):
    """根据next_node矩阵重建路径"""
    if next_node[start][end] == -1:
        return []
    
    path = [start]
    while start != end:
        start = next_node[start][end]
        path.append(start)
    
    return path

# 示例使用
if __name__ == "__main__":
    # 创建示例图的邻接矩阵
    # 使用 float('inf') 表示无穷大
    n = 5  # 节点数量
    graph = [[float('inf')] * n for _ in range(n)]
    
    # 初始化对角线为0
    for i in range(n):
        graph[i][i] = 0
    
    # 添加边
    graph[0][1] = 4
    graph[0][2] = 2
    graph[1][2] = 5
    graph[1][3] = 10
    graph[2][1] = 1
    graph[2][3] = 8
    graph[3][4] = 2
    graph[4][3] = 5
    
    dist, next_node = floyd_warshall(graph)
    
    print("所有节点对之间的最短距离:")
    for i in range(n):
        for j in range(n):
            if i != j:
                path = reconstruct_floyd_path(next_node, i, j)
                print(f"从{i}到{j}: 距离={dist[i][j]}, 路径={path}")

算法比较

特性 Dijkstra Floyd-Warshall
问题类型 单源最短路径 多源最短路径
图类型 有向/无向,非负权重 有向/无向,可处理负权重(但无负环)
图类型 有向/无向,非负权重 有向/无向,可处理负权重(但无负环)
空间复杂度 O(V+E) O(V²)
适用场景 单源问题,稀疏图 多源问题,稠密图
  1. Dijkstra 不能处理负权重边,因为贪心策略会失效

  2. Floyd 可以检测负环:如果发现某个 dist[i][i] < 0,说明存在负环

  3. 对于稀疏图,多次运行 Dijkstra 可能比 Floyd 更高效

  4. 实际应用中可根据具体需求选择合适的算法


5.遗传算法

遗传算法(Genetic Algorithm, GA)是一种模拟自然选择和遗传学机制的优化算法。它通过模拟生物进化过程中的"适者生存"原则来搜索最优解。

遗传算法的基本流程

  1. 初始化种群:随机生成一组初始解(个体)

  2. 评估适应度:计算每个个体的适应度值

  3. 选择:根据适应度选择优秀的个体

  4. 交叉:将选中的个体进行交叉操作,产生新个体

  5. 变异:对新个体进行变异操作

  6. 重复:重复步骤2-5直到满足终止条件

Python完整实现

下面是一个解决函数优化问题的遗传算法完整实现:

import numpy as np
import matplotlib.pyplot as plt
from typing import List, Callable, Tuple

class GeneticAlgorithm:
    def __init__(self, 
                 objective_func: Callable, 
                 population_size: int = 100,
                 chromosome_length: int = 10,
                 crossover_rate: float = 0.8,
                 mutation_rate: float = 0.01,
                 elitism_count: int = 2,
                 bounds: Tuple[float, float] = (-10, 10)):
        """
        初始化遗传算法
        
        参数:
        objective_func: 目标函数(适应度函数)
        population_size: 种群大小
        chromosome_length: 染色体长度(基因数量)
        crossover_rate: 交叉概率
        mutation_rate: 变异概率
        elitism_count: 精英保留数量
        bounds: 变量的取值范围
        """
        self.objective_func = objective_func
        self.population_size = population_size
        self.chromosome_length = chromosome_length
        self.crossover_rate = crossover_rate
        self.mutation_rate = mutation_rate
        self.elitism_count = elitism_count
        self.bounds = bounds
        
        # 初始化种群
        self.population = self.initialize_population()
        self.best_individual = None
        self.best_fitness = float('-inf')
        self.fitness_history = []
        
    def initialize_population(self) -> np.ndarray:
        """初始化种群 - 随机生成二进制编码的染色体"""
        return np.random.randint(0, 2, 
                                size=(self.population_size, self.chromosome_length))
    
    def decode_chromosome(self, chromosome: np.ndarray) -> float:
        """将二进制染色体解码为实际数值"""
        # 将二进制转换为十进制
        decimal_value = int(''.join(map(str, chromosome)), 2)
        
        # 映射到指定范围
        min_bound, max_bound = self.bounds
        value = min_bound + decimal_value * (max_bound - min_bound) / (2**self.chromosome_length - 1)
        
        return value
    
    def calculate_fitness(self, chromosome: np.ndarray) -> float:
        """计算适应度值"""
        x = self.decode_chromosome(chromosome)
        return self.objective_func(x)
    
    def select_parents(self, fitness_values: np.ndarray) -> List[np.ndarray]:
        """轮盘赌选择父代"""
        # 确保适应度值为正
        min_fitness = np.min(fitness_values)
        if min_fitness < 0:
            fitness_values = fitness_values - min_fitness + 1e-6
        
        # 计算选择概率
        total_fitness = np.sum(fitness_values)
        probabilities = fitness_values / total_fitness
        
        # 选择父代
        selected_indices = np.random.choice(
            len(self.population), 
            size=self.population_size - self.elitism_count, 
            p=probabilities,
            replace=True
        )
        
        return [self.population[i] for i in selected_indices]
    
    def crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """单点交叉"""
        if np.random.rand() < self.crossover_rate:
            # 随机选择交叉点
            crossover_point = np.random.randint(1, self.chromosome_length - 1)
            
            # 执行交叉
            child1 = np.concatenate([parent1[:crossover_point], parent2[crossover_point:]])
            child2 = np.concatenate([parent2[:crossover_point], parent1[crossover_point:]])
            
            return child1, child2
        else:
            return parent1.copy(), parent2.copy()
    
    def mutate(self, chromosome: np.ndarray) -> np.ndarray:
        """位翻转变异"""
        mutated_chromosome = chromosome.copy()
        
        for i in range(len(mutated_chromosome)):
            if np.random.rand() < self.mutation_rate:
                mutated_chromosome[i] = 1 - mutated_chromosome[i]  # 翻转位
        
        return mutated_chromosome
    
    def evolve(self, generations: int = 100):
        """执行进化过程"""
        for generation in range(generations):
            # 计算适应度
            fitness_values = np.array([self.calculate_fitness(ind) for ind in self.population])
            
            # 记录最佳个体
            best_idx = np.argmax(fitness_values)
            current_best_fitness = fitness_values[best_idx]
            
            if current_best_fitness > self.best_fitness:
                self.best_fitness = current_best_fitness
                self.best_individual = self.population[best_idx].copy()
            
            self.fitness_history.append(self.best_fitness)
            
            # 选择精英
            elite_indices = np.argsort(fitness_values)[-self.elitism_count:]
            elites = [self.population[i] for i in elite_indices]
            
            # 选择父代
            parents = self.select_parents(fitness_values)
            
            # 交叉和变异产生新种群
            new_population = []
            
            # 添加精英
            new_population.extend(elites)
            
            # 生成后代
            for i in range(0, len(parents), 2):
                if i + 1 < len(parents):
                    parent1, parent2 = parents[i], parents[i+1]
                    child1, child2 = self.crossover(parent1, parent2)
                    new_population.append(self.mutate(child1))
                    new_population.append(self.mutate(child2))
            
            # 确保种群大小不变
            self.population = np.array(new_population[:self.population_size])
            
            # 打印进度
            if generation % 10 == 0:
                best_x = self.decode_chromosome(self.best_individual)
                print(f"Generation {generation}: Best Fitness = {self.best_fitness:.6f}, Best x = {best_x:.6f}")
    
    def get_best_solution(self) -> Tuple[float, float]:
        """获取最佳解"""
        best_x = self.decode_chromosome(self.best_individual)
        return best_x, self.best_fitness
    
    def plot_progress(self):
        """绘制进化过程"""
        plt.figure(figsize=(10, 6))
        plt.plot(self.fitness_history)
        plt.title('Evolution of Best Fitness')
        plt.xlabel('Generation')
        plt.ylabel('Fitness')
        plt.grid(True)
        plt.show()

# 示例:寻找函数 f(x) = -x^2 + 10 的最大值
def objective_function(x):
    return -x**2 + 10

# 运行遗传算法
if __name__ == "__main__":
    # 初始化遗传算法
    ga = GeneticAlgorithm(
        objective_func=objective_function,
        population_size=50,
        chromosome_length=20,
        crossover_rate=0.9,
        mutation_rate=0.01,
        elitism_count=2,
        bounds=(-10, 10)
    )
    
    # 运行进化
    ga.evolve(generations=100)
    
    # 获取结果
    best_x, best_fitness = ga.get_best_solution()
    print(f"\n最佳解: x = {best_x:.6f}, f(x) = {best_fitness:.6f}")
    
    # 绘制进化过程
    ga.plot_progress()
    
    # 验证结果
    print(f"理论最大值在 x=0 处,f(0)=10")
    print(f"实际找到的最大值在 x={best_x:.6f} 处,f(x)={best_fitness:.6f}")

更复杂的示例:多变量优化

# 多变量优化示例:寻找函数 f(x, y) = -x^2 - y^2 + 10 的最大值

class MultiVariableGA:
    def __init__(self, 
                 objective_func: Callable,
                 num_variables: int = 2,
                 population_size: int = 100,
                 chromosome_length_per_var: int = 10,
                 crossover_rate: float = 0.8,
                 mutation_rate: float = 0.01,
                 elitism_count: int = 2,
                 bounds: List[Tuple[float, float]] = [(-10, 10), (-10, 10)]):
        
        self.objective_func = objective_func
        self.num_variables = num_variables
        self.chromosome_length_per_var = chromosome_length_per_var
        self.total_chromosome_length = num_variables * chromosome_length_per_var
        self.bounds = bounds
        
        self.ga = GeneticAlgorithm(
            objective_func=self._evaluate,
            population_size=population_size,
            chromosome_length=self.total_chromosome_length,
            crossover_rate=crossover_rate,
            mutation_rate=mutation_rate,
            elitism_count=elitism_count,
            bounds=(0, 1)  # 占位值,实际解码在_evaluate中处理
        )
    
    def _evaluate(self, chromosome: np.ndarray) -> float:
        """解码染色体并评估适应度"""
        variables = []
        for i in range(self.num_variables):
            start_idx = i * self.chromosome_length_per_var
            end_idx = start_idx + self.chromosome_length_per_var
            var_chromosome = chromosome[start_idx:end_idx]
            
            # 解码单个变量
            decimal_value = int(''.join(map(str, var_chromosome)), 2)
            min_bound, max_bound = self.bounds[i]
            value = min_bound + decimal_value * (max_bound - min_bound) / (2**self.chromosome_length_per_var - 1)
            variables.append(value)
        
        return self.objective_func(*variables)
    
    def evolve(self, generations: int = 100):
        """执行进化"""
        self.ga.evolve(generations)
    
    def get_best_solution(self):
        """获取最佳解"""
        best_chromosome = self.ga.best_individual
        variables = []
        for i in range(self.num_variables):
            start_idx = i * self.chromosome_length_per_var
            end_idx = start_idx + self.chromosome_length_per_var
            var_chromosome = best_chromosome[start_idx:end_idx]
            
            decimal_value = int(''.join(map(str, var_chromosome)), 2)
            min_bound, max_bound = self.bounds[i]
            value = min_bound + decimal_value * (max_bound - min_bound) / (2**self.chromosome_length_per_var - 1)
            variables.append(value)
        
        best_fitness = self.ga.best_fitness
        return variables, best_fitness

# 定义多变量目标函数
def multi_var_objective(x, y):
    return -x**2 - y**2 + 10

# 运行多变量遗传算法
if __name__ == "__main__":
    multi_ga = MultiVariableGA(
        objective_func=multi_var_objective,
        num_variables=2,
        population_size=100,
        chromosome_length_per_var=15,
        crossover_rate=0.85,
        mutation_rate=0.02,
        bounds=[(-5, 5), (-5, 5)]
    )
    
    multi_ga.evolve(generations=100)
    
    best_vars, best_fitness = multi_ga.get_best_solution()
    print(f"\n最佳解: x = {best_vars[0]:.6f}, y = {best_vars[1]:.6f}, f(x,y) = {best_fitness:.6f}")
    print(f"理论最大值在 (0,0) 处,f(0,0)=10")

遗传算法的关键参数调优

  1. 种群大小:太小的种群可能无法充分探索搜索空间,太大的种群会增加计算成本

  2. 交叉率:控制交叉操作的概率,通常设置在0.6-0.9之间

  3. 变异率:通常设置较低的值(0.001-0.1),以避免破坏好的解

  4. 选择策略:除了轮盘赌选择,还可以使用锦标赛选择、排名选择等

  5. 编码方式:除了二进制编码,还可以使用实数编码、排列编码等

应用领域

遗传算法广泛应用于:

  • 函数优化

  • 机器学习参数调优

  • 调度问题

  • 路径规划

  • 神经网络设计

  • 图像处理


6.蚁群算法

蚁群算法(Ant Colony Optimization, ACO)是一种模拟蚂蚁觅食行为的群体智能优化算法。蚂蚁在寻找食物源时会释放信息素,其他蚂蚁能够感知这种信息素并倾向于跟随信息素浓度较高的路径,从而形成一种正反馈机制。

  1. 信息素:蚂蚁在路径上释放的化学物质,浓度越高表示路径越优

  2. 启发式信息:反映路径的直观吸引力(如距离的倒数)

  3. 状态转移规则:蚂蚁选择下一个节点的概率公式

  4. 信息素更新:包括挥发和增强两个过程

算法步骤

  1. 初始化参数和信息素矩阵

  2. 将蚂蚁放置在起始点

  3. 每只蚂蚁根据状态转移规则构建完整路径

  4. 计算每条路径的成本(如总距离)

  5. 更新信息素矩阵

  6. 重复步骤2-5直到满足终止条件

Python完整实现

下面以解决旅行商问题(TSP)为例实现蚁群算法:

import numpy as np
import matplotlib.pyplot as plt
import random

class AntColonyOptimization:
    def __init__(self, distances, n_ants, n_best, n_iterations, decay, alpha=1, beta=1):
        """
        初始化蚁群算法参数
        
        参数:
        distances: 城市间距离矩阵
        n_ants: 蚂蚁数量
        n_best: 每轮保留的最佳路径数量
        n_iterations: 迭代次数
        decay: 信息素挥发率
        alpha: 信息素重要程度参数
        beta: 启发式信息重要程度参数
        """
        self.distances = distances
        self.pheromone = np.ones(self.distances.shape) / len(distances)
        self.all_inds = range(len(distances))
        self.n_ants = n_ants
        self.n_best = n_best
        self.n_iterations = n_iterations
        self.decay = decay
        self.alpha = alpha
        self.beta = beta
        
    def run(self):
        """
        运行蚁群算法
        """
        best_path = None
        best_distance = float('inf')
        best_paths = []
        best_distances = []
        
        for iteration in range(self.n_iterations):
            all_paths = self.gen_all_paths()
            self.spread_pheromone(all_paths, self.n_best)
            
            # 更新最佳路径
            shortest_path = min(all_paths, key=lambda x: x[1])
            if shortest_path[1] < best_distance:
                best_path = shortest_path[0]
                best_distance = shortest_path[1]
            
            best_paths.append(best_path)
            best_distances.append(best_distance)
            
            # 信息素挥发
            self.pheromone *= (1 - self.decay)
            
            if iteration % 10 == 0:
                print(f"迭代次数 {iteration}: 最佳距离 = {best_distance}")
        
        return best_path, best_distance, best_paths, best_distances
    
    def gen_path_dist(self, path):
        """
        计算路径总距离
        """
        total_dist = 0
        for i in range(len(path)):
            total_dist += self.distances[path[i-1]][path[i]]
        return total_dist
    
    def gen_all_paths(self):
        """
        所有蚂蚁生成路径
        """
        all_paths = []
        for _ in range(self.n_ants):
            path = self.gen_path(0)  # 从城市0开始
            all_paths.append((path, self.gen_path_dist(path)))
        return all_paths
    
    def gen_path(self, start):
        """
        单只蚂蚁生成路径
        """
        path = [start]
        visited = set([start])
        
        while len(visited) < len(self.distances):
            probs = self.gen_move_probs(path[-1], visited)
            next_city = self.choose_next_city(probs)
            path.append(next_city)
            visited.add(next_city)
        
        return path
    
    def gen_move_probs(self, current, visited):
        """
        生成移动到下一个城市的概率
        """
        pheromone = np.copy(self.pheromone[current])
        pheromone[list(visited)] = 0  # 已访问城市概率设为0
        
        # 计算启发式信息(距离的倒数)
        heuristic = np.zeros(len(self.distances))
        for i in range(len(heuristic)):
            if i not in visited and self.distances[current][i] != 0:
                heuristic[i] = 1 / self.distances[current][i]
        
        # 计算概率分子
        numerator = (pheromone ** self.alpha) * (heuristic ** self.beta)
        
        # 归一化概率
        if np.sum(numerator) == 0:
            # 如果所有概率都为0,随机选择未访问城市
            unvisited = [i for i in self.all_inds if i not in visited]
            prob = np.zeros(len(self.distances))
            for city in unvisited:
                prob[city] = 1
            return prob / np.sum(prob)
        
        return numerator / np.sum(numerator)
    
    def choose_next_city(self, probs):
        """
        根据概率选择下一个城市
        """
        return np.random.choice(self.all_inds, 1, p=probs)[0]
    
    def spread_pheromone(self, all_paths, n_best):
        """
        更新信息素
        """
        # 按路径长度排序
        sorted_paths = sorted(all_paths, key=lambda x: x[1])
        
        # 只对最佳路径释放信息素
        for path, dist in sorted_paths[:n_best]:
            for i in range(len(path)):
                current_city = path[i]
                next_city = path[(i + 1) % len(path)]
                # 信息素增量与路径长度成反比
                self.pheromone[current_city][next_city] += 1 / dist
                self.pheromone[next_city][current_city] += 1 / dist

# 生成示例数据:随机城市坐标
def generate_cities(n_cities):
    np.random.seed(42)
    cities = np.random.rand(n_cities, 2) * 100
    return cities

# 计算城市间距离矩阵
def calculate_distances(cities):
    n = len(cities)
    distances = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            if i != j:
                distances[i][j] = np.sqrt((cities[i][0] - cities[j][0])**2 + 
                                         (cities[i][1] - cities[j][1])**2)
    return distances

# 可视化结果
def plot_results(cities, best_path, best_distances):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # 绘制最佳路径
    ax1.plot(cities[best_path, 0], cities[best_path, 1], 'o-')
    ax1.set_xlabel('X坐标')
    ax1.set_ylabel('Y坐标')
    ax1.set_title(f'最佳路径 (总距离: {best_distances[-1]:.2f})')
    
    # 绘制收敛曲线
    ax2.plot(best_distances)
    ax2.set_xlabel('迭代次数')
    ax2.set_ylabel('最短距离')
    ax2.set_title('收敛曲线')
    ax2.grid(True)
    
    plt.tight_layout()
    plt.show()

# 主函数
def main():
    # 参数设置
    n_cities = 20
    n_ants = 30
    n_best = 5
    n_iterations = 100
    decay = 0.1
    alpha = 1
    beta = 2
    
    # 生成数据
    cities = generate_cities(n_cities)
    distances = calculate_distances(cities)
    
    # 运行蚁群算法
    aco = AntColonyOptimization(distances, n_ants, n_best, n_iterations, decay, alpha, beta)
    best_path, best_distance, best_paths, best_distances = aco.run()
    
    print(f"\n最终最佳路径: {best_path}")
    print(f"最终最短距离: {best_distance}")
    
    # 可视化结果
    plot_results(cities, best_path, best_distances)

if __name__ == "__main__":
    main()
  1. 蚂蚁数量:一般为城市数量的1-2倍

  2. 信息素挥发率(decay):0.1-0.5之间,避免过早收敛

  3. α和β参数

    • α控制信息素影响(通常1-2)

    • β控制启发式信息影响(通常2-5)

  4. 迭代次数:根据问题复杂度调整,通常100-1000次

算法优缺点

优点:

  • 强大的全局搜索能力

  • 正反馈机制促进优良解的发现

  • 易于并行化处理

  • 适用于离散优化问题

缺点:

  • 收敛速度较慢

  • 参数调节复杂

  • 容易陷入局部最优(需配合局部搜索)

应用领域

  1. 旅行商问题(TSP)

  2. 车辆路径问题(VRP)

  3. 作业车间调度

  4. 网络路由优化

  5. 数据挖掘中的聚类分析


二、预测模型

1.ARIMA模型

ARIMA(Autoregressive Integrated Moving Average),全称为自回归积分滑动平均模型。它是时间序列预测中最经典、最常用的方法之一,尤其适用于处理非平稳序列。

1. 核心思想

ARIMA模型的核心思想是:将非平稳时间序列转化为平稳时间序列,然后将因变量仅对其滞后值以及随机误差项的现值和滞后值进行回归所建立的模型。

2. 模型结构:ARIMA(p, d, q)

模型由三个部分组成,对应三个参数 (p, d, q)

  • AR(p) - 自回归部分 (Autoregressive)

    • 含义:描述当前值与历史值之间的关系。用变量自身的历史时间数据来预测自己。

    • 模型:$X_t = c + \sum_{i=1}^{p} \phi_i X_{t-i} + \epsilon_t$

    • 参数 p:自回归项的阶数,表示用过去多少期的数据来预测当前值。可以通过偏自相关函数(PACF) 图来确定。

  • I(d) - 差分部分 (Integrated)

    • 含义:将非平稳时间序列转换为平稳序列的过程。差分可以消除数据中的趋势和季节性。

    • 操作:d阶差分就是计算d次相邻观测值之间的差值。

      • 1阶差分:$X’t = X_t - X{t-1}$

      • 2阶差分:$X’’t = X’t - X’{t-1} = (X_t - X{t-1}) - (X_{t-1} - X_{t-2})$

    • 参数 d:使序列变为平稳所需的最小差分阶数。通常通过ADF检验(单位根检验) 来确定。

  • MA(q) - 移动平均部分 (Moving Average)

    • 含义:描述当前值与历史预测误差之间的关系。它捕捉的是时间序列中随机冲击的效应。

    • 模型:$X_t = \mu + \epsilon_t + \sum_{i=1}^{q} \theta_i \epsilon_{t-i}$

    • 参数 q:移动平均项的阶数,表示模型使用过去多少个预测误差。可以通过自相关函数(ACF) 图来确定。

综合模型ARIMA(p, d, q)
对原始序列进行d阶差分后,得到的新序列遵循ARMA(p, q)模型。
$\Delta^d X_t = c + \sum_{i=1}^{p} \phi_i \Delta^d X_{t-i} + \epsilon_t + \sum_{i=1}^{q} \theta_i \epsilon_{t-i}$
其中,$\Delta^d$ 是d阶差分算子。

3. 建模步骤
  1. 序列平稳化 (确定d)

    • 绘制原始序列图,观察是否有明显趋势或季节性。

    • 进行ADF单位根检验。如果p-value > 0.05,则认为序列不平稳,需要进行差分。

    • 重复差分和ADF检验,直到序列平稳,记录差分次数 d

  2. 确定p和q

    • 对平稳化后的序列绘制ACF(自相关图)PACF(偏自相关图)

    • ACF图:拖尾(逐渐衰减到0)还是截尾(快速降为0)。

    • PACF图:拖尾还是截尾。

    • 粗略判断准则

      • p的确定:看PACF图。如果PACF在滞后p阶后截尾(突然趋于0),则p可以取该值。

      • q的确定:看ACF图。如果ACF在滞后q阶后截尾,则q可以取该值。

    • 更精确的方法是使用网格搜索(Grid Search),根据AIC(Akaike Information Criterion)BIC(Bayesian Information Criterion) 准则来选择模型,AIC/BIC越小越好。

  3. 参数估计

    • 使用最大似然估计(MLE)等方法来确定AR和MA项的系数 ($\phi_i$, $\theta_i$)。

  4. 模型诊断

    • 残差分析:一个好的ARIMA模型的残差应该是一个白噪声(均值为0,方差恒定,无自相关)。

      • 绘制残差图,观察是否在0附近随机波动。

      • 对残差进行Ljung-Box检验,如果p-value很大(>0.05),则不能拒绝残差是白噪声的原假设,说明模型拟合良好。

  5. 预测

    • 使用拟合好的模型进行未来值的预测。

4.Python

我们将使用经典的AirPassengers(航空公司乘客)数据集来演示。

1. 导入必要库
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

# 统计模型
from statsmodels.tsa.stattools import adfuller # ADF检验
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf # ACF/PACF图
from statsmodels.tsa.arima.model import ARIMA # ARIMA模型

# 评估指标
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore') # 忽略警告信息
2. 加载数据并探索
# 加载数据
url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/airline-passengers.csv"
data = pd.read_csv(url, parse_dates=['Month'], index_col='Month')
data.index.freq = 'MS' # 设置频率为“月起始”(Month Start)
series = data['Passengers']

# 查看数据
print(series.head())
plt.figure(figsize=(12, 5))
plt.plot(series)
plt.title('AirPassengers Dataset')
plt.xlabel('Year')
plt.ylabel('Passengers')
plt.grid(True)
plt.show()
3. 序列平稳化 - 确定参数d
# 定义ADF检验函数
def adf_test(series):
    result = adfuller(series)
    print('ADF Statistic: %f' % result[0])
    print('p-value: %f' % result[1])
    print('Critical Values:')
    for key, value in result[4].items():
        print('\t%s: %.3f' % (key, value))
    if result[1] <= 0.05:
        print("-> p-value <= 0.05. Reject the null hypothesis. Data is stationary.")
    else:
        print("-> p-value > 0.05. Fail to reject the null hypothesis. Data is non-stationary.")

# 对原始序列进行ADF检验
print("ADF Test for Original Series:")
adf_test(series)

# 进行1阶差分并再次检验
series_diff_1 = series.diff().dropna()
print("\nADF Test after 1st Order Differencing:")
adf_test(series_diff_1)

# 绘制差分后的序列
plt.figure(figsize=(12, 5))
plt.plot(series_diff_1)
plt.title('AirPassengers After 1st Order Differencing')
plt.grid(True)
plt.show()

输出:原始序列p值很大,非平稳。1阶差分后p值可能仍然>0.05(因为还有季节性),但对于演示,我们通常取d=1。对于有强季节性的数据,需要使用SARIMA(季节性ARIMA),但这里我们先按简单ARIMA处理,取d=1。

4. 确定参数p和q
# 对平稳化后的序列(差分后)绘制ACF和PACF图
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

# ACF图
plot_acf(series_diff_1, lags=40, ax=ax1)
ax1.set_title('Autocorrelation Function (ACF)')

# PACF图
plot_pacf(series_diff_1, lags=40, ax=ax2, method='ywm') # 使用 ywm 方法避免警告
ax2.set_title('Partial Autocorrelation Function (PACF)')

plt.tight_layout()
plt.show()

更严谨的方法是使用网格搜索确定p和q:

# 网格搜索寻找最佳p, q组合 (d已固定为1)
# 定义p, d, q参数的取值范围
p_range = range(0, 4)  # 0 to 3
q_range = range(0, 4)  # 0 to 3
d = 1

best_aic = np.inf
best_order = None

for p in p_range:
    for q in q_range:
        try:
            model = ARIMA(series, order=(p, d, q))
            results = model.fit()
            aic = results.aic
            if aic < best_aic:
                best_aic = aic
                best_order = (p, d, q)
            print(f'ARIMA({p},{d},{q}) - AIC: {aic:.2f}')
        except Exception as e:
            print(f'Error fitting ARIMA({p},{d},{q}): {e}')
            continue

print(f'\nBest Model Order: ARIMA{best_order} with AIC: {best_aic:.2f}')
5. 拟合ARIMA模型
# 使用确定的参数 (p,d,q) 拟合模型
# 这里我们使用 (2, 1, 1),请根据你的网格搜索结果修改
p, d, q = 2, 1, 1

model = ARIMA(series, order=(p, d, q))
model_fit = model.fit()

# 输出模型摘要
print(model_fit.summary())
6. 模型诊断 - 残差分析
# 绘制残差图
residuals = model_fit.resid
plt.figure(figsize=(12, 5))
plt.plot(residuals)
plt.title('Residuals from ARIMA Model')
plt.grid(True)
plt.show()

# 残差的ACF图
plot_acf(residuals, lags=40)
plt.title('ACF of Residuals')
plt.show()

# Ljung-Box检验 (查看残差是否为白噪声)
from statsmodels.stats.diagnostic import acorr_ljungbox
lb_test = acorr_ljungbox(residuals, lags=10, return_df=True)
print("Ljung-Box test results for residuals:")
print(lb_test)

# 如果所有p-value都很大(>0.05),说明残差是白噪声,模型OK
7. 预测
# 划分训练集和测试集 (最后24个月作为测试集)
split_point = len(series) - 24
train, test = series.iloc[:split_point], series.iloc[split_point:]

# 在训练集上重新拟合模型
model_train = ARIMA(train, order=(p, d, q))
model_train_fit = model_train.fit()

# 进行预测
# start: 预测开始的索引
# end: 预测结束的索引
# dynamic: False 表示使用一步预测(更准确但可能滞后)
forecast = model_train_fit.get_prediction(start=len(train), end=len(series)-1, dynamic=False)
forecast_values = forecast.predicted_mean
forecast_ci = forecast.conf_int() # 置信区间

# 计算评估指标
mse = mean_squared_error(test, forecast_values)
rmse = np.sqrt(mse)
mae = mean_absolute_error(test, forecast_values)
print(f'\nTest MSE: {mse:.2f}')
print(f'Test RMSE: {rmse:.2f}')
print(f'Test MAE: {mae:.2f}')

# 绘制结果
plt.figure(figsize=(14, 7))
plt.plot(train, label='Training Data')
plt.plot(test, label='Actual Test Data', color='blue')
plt.plot(forecast_values, color='red', label='Forecast')
# 绘制置信区间
plt.fill_between(forecast_ci.index,
                 forecast_ci.iloc[:, 0],
                 forecast_ci.iloc[:, 1], color='pink', alpha=0.3, label='95% Confidence Interval')
plt.title('ARIMA Model Forecast vs Actuals')
plt.legend()
plt.grid(True)
plt.show()
8. 预测未来(未知)数据
# 使用全部数据重新拟合最终模型
final_model = ARIMA(series, order=(p, d, q))
final_model_fit = final_model.fit()

# 预测未来12个月
forecast_steps = 12
forecast_future = final_model_fit.get_forecast(steps=forecast_steps)
forecast_future_values = forecast_future.predicted_mean
forecast_future_ci = forecast_future.conf_int()

# 创建未来时间的索引
future_index = pd.date_range(series.index[-1] + pd.DateOffset(months=1), periods=forecast_steps, freq='MS')

# 绘制历史数据和未来预测
plt.figure(figsize=(14, 7))
plt.plot(series, label='Historical Data')
plt.plot(future_index, forecast_future_values, color='green', label='Future Forecast')
plt.fill_between(future_index,
                 forecast_future_ci.iloc[:, 0],
                 forecast_future_ci.iloc[:, 1], color='lightgreen', alpha=0.3, label='95% Confidence Interval')
plt.title(f'ARIMA{best_order} Forecast for Next {forecast_steps} Months')
plt.legend()
plt.grid(True)
plt.show()

# 打印预测值
print("\nFuture Forecast Values:")
print(pd.DataFrame({'Date': future_index, 'Forecasted_Passengers': forecast_future_values.values}).round(1))

2.SARIMA模型

SARIMA(Seasonal AutoRegressive Integrated Moving Average)模型是ARIMA模型的扩展,专门用于处理带有季节性(Seasonality)成分的时间序列数据。它结合了非季节性的ARIMA模型和季节性成分的模型。

一个完整的SARIMA模型通常表示为: SARIMA(p, d, q)×(P, D, Q, s)

让我们来分解这个复杂的符号:

1. 非季节性部分 (p, d, q)

这与标准ARIMA模型完全相同:

  • p (自回归项 - AR): 表示模型中使用的自身滞后观测值的数量。它捕捉当前值与过去值之间的关系(例如:昨天的销售额可能影响今天的销售额)。

  • d (差分次数 - I): 为了使时间序列平稳化(即均值、方差不随时间变化)所需的差分次数。一阶差分即用当前值减去前一个值 (Y_t - Y_{t-1})。

  • q (移动平均项 - MA): 表示模型中使用的过去滞后预测误差的数量。它捕捉历史预测误差对当前值的影响。

2. 季节性部分 (P, D, Q, s)

这是SARIMA独有的部分,用于建模季节性模式:

  • P (季节性自回归项): 类似于p,但它使用的是季节性周期滞后的观测值。例如,对于月度数据,P=1可能会使用去年同月的值来预测今年这个月的值。

  • D (季节性差分次数): 为了使序列的季节性成分平稳化所需的季节性差分次数。季节性差分即用当前值减去上一个周期的对应值。例如,月度数据的季节性一阶差分为 Y_t - Y_{t-12}

  • Q (季节性移动平均项): 类似于q,但它使用的是季节性周期滞后的预测误差。

  • s (季节周期长度): 这是最关键的参数,定义了季节循环的长度。

    • 月度数据: s = 12

    • 季度数据: s = 4

    • 日度数据(周周期): s = 7

    • 小时数据(天周期): s = 24

模型的思想

SARIMA模型的核心思想是:一个时间序列可以分解为趋势性(由p,d,q部分建模)和季节性(由P,D,Q,s部分建模)两部分。通过差分(dD)消除不平稳的趋势和季节因素后,再用自回归(pP)和移动平均(qQ)模型来捕捉剩下的序列相关性。

Python完整代码实现

我们将使用经典的航空乘客数据集(Air Passengers)来演示,这是一个具有明显趋势和12个月季节性的月度数据。

步骤 1:导入必要的库

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore') # 忽略警告信息,使输出更清晰

# 设置绘图风格
plt.style.use('ggplot')

步骤 2:加载和探索数据

# 加载数据
url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/airline-passengers.csv"
data = pd.read_csv(url, parse_dates=['Month'], index_col='Month')
data.index.freq = 'MS' # 非常重要!设置索引频率为“月初”(Month Start)

# 重命名列
data.columns = ['Passengers']

# 查看数据
print(data.head())
print("\n数据信息:")
print(data.info())

# 绘制时间序列图
plt.figure(figsize=(12, 5))
plt.plot(data)
plt.title('Monthly Airline Passengers (1949-1960)')
plt.xlabel('Date')
plt.ylabel('Number of Passengers')
plt.show()

步骤 3:时间序列分解

# 进行加法分解 (如果季节波动幅度不随时间变化,也可以用乘法‘multiplicative’)
result = seasonal_decompose(data['Passengers'], model='multiplicative') # 此数据更适合乘法模型

# 绘制分解结果
fig = result.plot()
fig.set_size_inches(12, 8)
plt.show()

步骤 4:平稳性检验 - ADF test

# 定义ADF检验函数
def adf_test(series):
    result = adfuller(series)
    print('ADF Statistic: %f' % result[0])
    print('p-value: %f' % result[1])
    print('Critical Values:')
    for key, value in result[4].items():
        print('\t%s: %.3f' % (key, value))
    if result[1] <= 0.05:
        print("-> 拒绝原假设,序列是平稳的。")
    else:
        print("-> 无法拒绝原假设,序列是非平稳的。")

print("原始序列的ADF检验:")
adf_test(data['Passengers'])

步骤 5:差分处理

为了使序列平稳,我们需要进行差分。通常先进行一阶常规差分消除趋势,再进行一阶季节性差分(周期s=12)消除季节性。

# 一阶常规差分
data['1st_Diff'] = data['Passengers'].diff(1)
# 季节性差分 (周期为12)
data['Seasonal_Diff'] = data['Passengers'].diff(12)

# 绘制差分后的序列
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
axes[0].plot(data['1st_Diff'])
axes[0].set_title('First Order Differencing')
axes[1].plot(data['Seasonal_Diff'])
axes[1].set_title('Seasonal Differencing (s=12)')
plt.tight_layout()
plt.show()

# 对“一阶常规差分+一阶季节性差分”后的数据进行ADF检验
# 先drop掉NaN值
combined_diff = data['Passengers'].diff(1).diff(12).dropna()
print("\n‘一阶常规+一阶季节性’差分后的ADF检验:")
adf_test(combined_diff)
步骤 6:确定模型参数 (p, q, P, Q)

通过观察自相关图(ACF)偏自相关图(PACF)来确定 p, q, P, Q 的大致范围。

# 对平稳化后的序列(即双重差分后的数据)绘制ACF和PACF图
fig, axes = plt.subplots(2, 1, figsize=(12, 8))

# ACF图
plot_acf(combined_diff, lags=40, ax=axes[0])
axes[0].set_title('ACF for Stationary Series')

# PACF图
plot_pacf(combined_diff, lags=40, ax=axes[1], method='ywm') # ‘ywm’是Method的推荐值
axes[1].set_title('PACF for Stationary Series')

plt.tight_layout()
plt.show()
  • 非季节性参数 (p, q):

    • 最初几个滞后点(lag 1, 2, 3...)

    • PACF在lag 1和2后截尾(显著超出蓝色区域),提示 p 可能为 1 或 2。

    • ACF在lag 1后拖尾(逐渐衰减),提示 q 可能为 0 或 1。

  • 季节性参数 (P, Q):

    • 季节性滞后点(lag 12, 24, 36...)

    • PACF在lag 12处有一个显著的尖峰,提示 P 可能为 1。

    • ACF在lag 12处有一个显著的尖峰,提示 Q 可能为 1。

步骤 7:拟合SARIMA模型

# 定义模型参数
p, d, q = 1, 1, 1
P, D, Q, s = 1, 1, 1, 12

# 划分训练集和测试集 (例如,最后24个月作为测试集)
train_size = int(len(data) * 0.8)
train, test = data['Passengers'][:train_size], data['Passengers'][train_size:]
print(f"训练集大小: {len(train)}")
print(f"测试集大小: {len(test)}")

# 创建并拟合SARIMAX模型 (Statsmodels中使用SARIMAX)
model = SARIMAX(train,
                order=(p, d, q),         # 非季节性部分 (p, d, q)
                seasonal_order=(P, D, Q, s), # 季节性部分 (P, D, Q, s)
                enforce_stationarity=False,
                enforce_invertibility=False)

# 拟合模型
model_fit = model.fit(disp=False) # disp=False减少输出

# 输出模型总结
print(model_fit.summary())

步骤 8:模型诊断

# 绘制模型诊断图
model_fit.plot_diagnostics(figsize=(12, 8))
plt.show()

解读诊断图:

  1. 标准化残差图:应该没有明显的趋势或季节性。

  2. 残差直方图:应该近似服从正态分布(钟形曲线)。

  3. *正态Q-Q图:点应大致分布在45度线上。*

  4. 残差ACF图:没有任何自相关(所有滞后点都在置信区间内)。如果ACF图良好,说明模型已经充分捕捉了数据中的信息。

步骤 9:进行预测并评估

# 进行样本内动态预测和样本外预测
# ‘dynamic=False’意味着样本内预测使用全部历史信息
pred_dynamic = model_fit.get_prediction(start=pd.to_datetime('1958-01-01'), dynamic=False, full_results=True)
pred_dynamic_ci = pred_dynamic.conf_int() # 获取预测的置信区间

# 获取样本外预测(对未来时间的预测),这里预测测试集之后12个月
forecast_steps = len(test)
forecast_obj = model_fit.get_forecast(steps=forecast_steps)
forecast = forecast_obj.predicted_mean
forecast_ci = forecast_obj.conf_int()

# 将预测结果转换为Series,方便绘图
forecast_series = pd.Series(forecast, index=test.index)
lower_series = pd.Series(forecast_ci.iloc[:, 0], index=test.index)
upper_series = pd.Series(forecast_ci.iloc[:, 1], index=test.index)

# 绘制所有数据、预测值和置信区间
plt.figure(figsize=(12, 6))

# 绘制历史数据
plt.plot(data['Passengers'], label='Observed')

# 绘制样本外预测值
plt.plot(forecast_series, color='r', label='Forecast')

# 绘制置信区间
plt.fill_between(forecast_series.index, lower_series, upper_series, color='pink', alpha=0.3)

plt.title('SARIMA Forecast: Air Passengers')
plt.xlabel('Date')
plt.ylabel('Number of Passengers')
plt.legend(loc='upper left')
plt.show()

# 计算评估指标 (RMSE and MAE)
rmse = np.sqrt(mean_squared_error(test, forecast))
mae = mean_absolute_error(test, forecast)
print(f"测试集 RMSE: {rmse:.2f}")
print(f"测试集 MAE: {mae:.2f}")

步骤 10(可选):自动化寻找最佳参数

手动解读ACF/PACF很繁琐。我们可以使用pmdarima库的auto_arima函数自动搜索最佳参数。

# 首先需要安装 pmdarima
# pip install pmdarima
from pmdarima import auto_arima

# 使用auto_arima自动寻找最佳参数 (这会运行一段时间)
stepwise_model = auto_arima(data['Passengers'],
                            start_p=0, start_q=0,
                            max_p=3, max_q=3,
                            start_P=0, start_Q=0,
                            max_P=3, max_Q=3,
                            m=12, # 季节周期s
                            seasonal=True,
                            d=None, D=None, # 让模型自动判断d, D
                            trace=True, # 打印搜索过程
                            error_action='ignore',
                            suppress_warnings=True,
                            stepwise=True) # 使用逐步搜索算法,更快

print(stepwise_model.summary())
# 最佳模型参数会打印出来,然后你可以用这个参数重新拟合模型

总结

  1. 理解数据:可视化并检查趋势和季节性。

  2. 平稳化:通过差分(常规和季节性)使序列平稳,确定 dD

  3. 确定参数:通过ACF/PACF图初步判断 p, q, P, Q,或使用auto_arima自动搜索。

  4. 拟合模型:使用SARIMAX函数拟合模型。

  5. 模型诊断:检查残差是否为随机白噪声。

  6. 预测评估:进行预测并用量化指标(如RMSE)评估模型性能。


3.线性回归模型

线性回归是一种通过拟合自变量(X)和因变量(y)之间的线性关系来进行预测的监督学习算法。它的目标是找到一条直线(或一个超平面),使得所有数据点到这条直线的距离的平方和最小

1. 模型表示

对于最简单的一元线性回归(一个特征),模型方程表示为:

其中:

  • $y$ 是预测值(目标变量)。

  • $x$ 是特征(自变量)。

  • $w_1$ 是权重(Weight),也就是斜率。

  • $b$ 是偏置(Bias),也就是截距。

对于更一般的多元线性回归(多个特征),模型方程表示为:

我们可以用向量形式更简洁地表示。令 $\mathbf{w} = [w_1, w_2, ..., w_n]^T$, $\mathbf{x} = [x_1, x_2, ..., x_n]^T$,则:

为了简化计算,通常我们会将 $b$ 也并入权重向量 $\mathbf{w}$ 中。我们添加一个恒为 1 的特征 $x_0$,令 $w_0 = b$,那么公式就变成了:

其中 $\mathbf{w} = [w_0, w_1, w_2, ..., w_n]^T$, $\mathbf{x} = [1, x_1, x_2, ..., x_n]^T$。

3. 损失函数 (Loss Function)

如何找到最优的 $\mathbf{w}$?我们需要一个标准来衡量模型预测的好坏,这个标准就是损失函数

线性回归最常用的损失函数是均方误差 (MSE)。它是所有样本的预测值 $\hat{y}^{(i)}$ 与真实值 $y^{(i)}$ 之差的平方和的平均值。

其中 $m$ 是样本数量。

我们的目标就是找到一组参数 $\mathbf{w^*}$,使得损失函数 $J(\mathbf{w})$ 最小化

4. 求解方法:最小二乘法 (Ordinary Least Squares)

最小化 MSE 损失函数有两种主要方法:

a) 正规方程 (Normal Equation) - 闭式解
通过直接对损失函数求导并令导数为零,可以推导出一个解析解(直接计算公式):

其中:

  • $\mathbf{X}$ 是一个 $m \times (n+1)$ 的矩阵(包含了 $x_0=1$ 这一列)。

  • $\mathbf{y}$ 是一个 $m \times 1$ 的向量,包含所有真实标签。

优点:不需要选择学习率,一次计算即可得到最优解。
缺点:当特征数量 $n$ 非常大时(例如 >10000),计算矩阵的逆 $(X^TX)^{-1}$ 会非常慢,甚至不可行。

b) 梯度下降 (Gradient Descent) - 迭代解

这是一种迭代优化算法,适用于特征数量很大的情况。

核心思想:从一组随机的 $\mathbf{w}$ 开始,反复迭代,每次沿着损失函数下降最快的方向(即梯度负方向)更新 $\mathbf{w}$,逐步逼近最小值。

权重更新公式

其中 $\alpha$ 是学习率,控制每一步的步长。

对 MSE 损失函数求偏导,得到具体的更新公式:

所以对于每个 $w_j$:

优点:在特征数量很大时也能很好地工作。
缺点:需要选择学习率 $\alpha$,可能需要多次迭代才能收敛。

Python 完整实现

我们将实现两种方法:1. 正规方程2. 梯度下降

1. 导入必要的库

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_regression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler

2. 生成模拟数据

# 生成线性回归数据集
np.random.seed(42) # 确保结果可重现
X, y = make_regression(n_samples=100, n_features=1, noise=20, random_state=42)

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 可视化数据
plt.figure(figsize=(8, 6))
plt.scatter(X, y, alpha=0.7, c='blue', edgecolors='k', label='Data Points')
plt.title('Generated Regression Data')
plt.xlabel('Feature (X)')
plt.ylabel('Target (y)')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.7)
plt.show()

3. 方法一:使用正规方程实现

class LinearRegressionNormalEquation:
    def __init__(self):
        self.w = None # 权重参数
        self.b = None # 偏置参数(为了清晰,单独表示)

    def fit(self, X, y):
        # 为X添加一列1,用于计算偏置项b (即w0)
        X_b = np.c_[np.ones((X.shape[0], 1)), X] # [1, X]

        # 计算正规方程的解: w = (X^T * X)^(-1) * X^T * y
        # 使用np.linalg.pinv求伪逆,比inv更稳定,尤其在X^T.X接近奇异矩阵时
        self.w = np.linalg.pinv(X_b.T.dot(X_b)).dot(X_b.T).dot(y)
        
        # 将权重向量分解为偏置b和权重w
        self.b = self.w[0]
        self.w = self.w[1:]

    def predict(self, X):
        # 模型预测: y_pred = X * w + b
        return X.dot(self.w) + self.b

# 使用模型
model_ne = LinearRegressionNormalEquation()
model_ne.fit(X_train, y_train)
y_pred_ne = model_ne.predict(X_test)

# 评估模型
mse_ne = mean_squared_error(y_test, y_pred_ne)
r2_ne = r2_score(y_test, y_pred_ne)
print(f"正规方程模型 - MSE: {mse_ne:.2f}, R2: {r2_ne:.4f}")

# 可视化拟合结果
plt.figure(figsize=(8, 6))
plt.scatter(X, y, alpha=0.7, c='blue', edgecolors='k', label='Data Points')
x_line = np.linspace(X.min(), X.max(), 100).reshape(-1, 1)
y_line = model_ne.predict(x_line)
plt.plot(x_line, y_line, color='red', linewidth=3, label='Fitted Line (Normal Eq.)')
plt.title('Linear Regression using Normal Equation')
plt.xlabel('Feature (X)')
plt.ylabel('Target (y)')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.7)
plt.show()

4. 方法二:使用梯度下降实现

class LinearRegressionGradientDescent:
    def __init__(self, learning_rate=0.01, n_iters=1000):
        self.learning_rate = learning_rate
        self.n_iters = n_iters
        self.w = None
        self.b = 0
        self.loss_history = [] # 记录每次迭代的损失值

    def fit(self, X, y):
        m, n = X.shape
        # 初始化参数
        self.w = np.zeros(n)
        self.b = 0

        # 梯度下降迭代
        for i in range(self.n_iters):
            # 计算当前预测值
            y_pred = np.dot(X, self.w) + self.b
            
            # 计算误差 (y_pred - y)
            error = y_pred - y
            
            # 计算梯度
            dw = (2/m) * np.dot(X.T, error)
            db = (2/m) * np.sum(error)
            
            # 更新参数
            self.w -= self.learning_rate * dw
            self.b -= self.learning_rate * db
            
            # 记录当前迭代的损失 (MSE)
            loss = np.mean(error ** 2)
            self.loss_history.append(loss)
            
            # 可选:每100次迭代打印一次损失
            # if i % 100 == 0:
            #     print(f"Iteration {i}, Loss: {loss:.4f}")

    def predict(self, X):
        return np.dot(X, self.w) + self.b

# 注意:梯度下降对特征缩放敏感,最好先进行标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 使用模型
model_gd = LinearRegressionGradientDescent(learning_rate=0.1, n_iters=1000)
model_gd.fit(X_train_scaled, y_train)
y_pred_gd = model_gd.predict(X_test_scaled)

# 评估模型
mse_gd = mean_squared_error(y_test, y_pred_gd)
r2_gd = r2_score(y_test, y_pred_gd)
print(f"梯度下降模型 - MSE: {mse_gd:.2f}, R2: {r2_gd:.4f}")

# 可视化损失下降曲线
plt.figure(figsize=(8, 5))
plt.plot(range(model_gd.n_iters), model_gd.loss_history)
plt.title('Gradient Descent - Loss over Iterations')
plt.xlabel('Iterations')
plt.ylabel('Mean Squared Error Loss')
plt.grid(True, linestyle='--', alpha=0.7)
plt.show()

# 可视化拟合结果 (需要将标准化的X转换回原始尺度进行绘图)
plt.figure(figsize=(8, 6))
plt.scatter(X, y, alpha=0.7, c='blue', edgecolors='k', label='Data Points')
x_line = np.linspace(X.min(), X.max(), 100).reshape(-1, 1)
x_line_scaled = scaler.transform(x_line) # 将绘图用的X也标准化
y_line = model_gd.predict(x_line_scaled)
plt.plot(x_line, y_line, color='green', linewidth=3, label='Fitted Line (Gradient Descent)')
plt.title('Linear Regression using Gradient Descent')
plt.xlabel('Feature (X)')
plt.ylabel('Target (y)')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.7)
plt.show()

5. 与 Scikit-Learn 的实现进行对比

from sklearn.linear_model import LinearRegression

# 使用Scikit-Learn的线性回归(默认使用正规方程)
model_sklearn = LinearRegression()
model_sklearn.fit(X_train, y_train)
y_pred_sk = model_sklearn.predict(X_test)

# 评估模型
mse_sk = mean_squared_error(y_test, y_pred_sk)
r2_sk = r2_score(y_test, y_pred_sk)
print(f"Scikit-Learn模型 - MSE: {mse_sk:.2f}, R2: {r2_sk:.4f}")

# 比较我们实现的模型和Scikit-Learn的系数
print("\n系数比较:")
print(f"正规方程实现: w = {model_ne.w[0]:.4f}, b = {model_ne.b:.4f}")
print(f"Scikit-Learn实现: w = {model_sklearn.coef_[0]:.4f}, b = {model_sklearn.intercept_:.4f}")

总结与关键点

  1. 核心概念:线性回归通过最小化预测值与真实值之间的均方误差(MSE) 来找到最佳拟合直线。

  2. 两种解法

    • 正规方程:直接、精确,适合特征数较少(<10000)的数据集。

    • 梯度下降:迭代、通用,适合特征数非常多或数据集巨大的情况,但需要调整学习率迭代次数

  3. 特征缩放:在使用梯度下降法时,对特征进行标准化(Standardization)或归一化(Normalization)至关重要,这可以大大加快收敛速度。正规方程则不需要。

  4. 评估指标:常用 MSE(均方误差)R² Score(决定系数) 来评估模型性能。R² 越接近 1,说明模型拟合得越好。

  5. 我们的实现 vs Scikit-Learn:你应该会看到我们自己实现的模型与 Scikit-Learn 的结果几乎完全相同,这验证了我们代码的正确性。



网站公告

今日签到

点亮在社区的每一天
去签到