Day58
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore')
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei'] # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False # 解决保存图像时负号'-'显示为方块的问题
# 1. 加载太阳黑子数据集
def load_sunspots_data():
"""加载太阳黑子数据集并返回时间序列"""
# 使用statsmodels内置的太阳黑子数据集
data = sm.datasets.sunspots.load_pandas()
df = data.data
# 设置年份为索引
df['YEAR'] = pd.to_datetime(df['YEAR'], format='%Y')
df.set_index('YEAR', inplace=True)
return df
# 2. 数据可视化
def visualize_data(series):
"""可视化原始时间序列、分解图、ACF和PACF"""
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 原始时间序列
axes[0, 0].plot(series)
axes[0, 0].set_title('原始太阳黑子序列')
# 分解图
decomposition = seasonal_decompose(series, model='additive', period=11) # 太阳黑子周期约为11年
decomposition.plot()
plt.tight_layout()
# ACF图
plot_acf(series, lags=40, ax=axes[0, 1])
axes[0, 1].set_title('原始序列的ACF')
# PACF图
plot_pacf(series, lags=40, ax=axes[1, 0])
axes[1, 0].set_title('原始序列的PACF')
plt.tight_layout()
plt.show()
# 打印基本统计信息
print(f"数据基本统计信息:")
print(series.describe())
# 3. 平稳性检验
def test_stationarity(series, title='原始序列'):
"""进行ADF检验并输出结果"""
print(f"\n{title}的ADF平稳性检验:")
result = adfuller(series)
print(f'ADF统计量: {result[0]:.4f}')
print(f'p值: {result[1]:.4f}')
print('临界值:')
for key, value in result[4].items():
print(f' {key}: {value:.4f}')
if result[1] <= 0.05:
print("序列平稳(拒绝原假设,p值 <= 0.05)")
return True
else:
print("序列非平稳(不能拒绝原假设,p值 > 0.05)")
return False
# 4. 确定差分阶数d
def determine_differencing(series):
"""确定差分阶数d"""
# 检验原始序列
is_stationary = test_stationarity(series, '原始序列')
d = 0
if not is_stationary:
# 一阶差分
diff1 = series.diff().dropna()
is_stationary = test_stationarity(diff1, '一阶差分序列')
d = 1
if not is_stationary:
# 二阶差分
diff2 = diff1.diff().dropna()
is_stationary = test_stationarity(diff2, '二阶差分序列')
d = 2
print(f"\n确定差分阶数 d = {d}")
# 可视化差分后的序列及其ACF、PACF
if d > 0:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
if d == 1:
axes[0].plot(diff1)
axes[0].set_title('一阶差分序列')
plot_acf(diff1, lags=40, ax=axes[1])
axes[1].set_title('一阶差分序列的ACF')
else:
axes[0].plot(diff2)
axes[0].set_title('二阶差分序列')
plot_acf(diff2, lags=40, ax=axes[1])
axes[1].set_title('二阶差分序列的ACF')
plt.tight_layout()
plt.show()
return d
# 5. 确定ARIMA的p和q参数
def determine_p_q(series, d):
"""基于差分后的序列确定p和q参数"""
# 对序列进行差分
if d == 0:
diff_series = series
elif d == 1:
diff_series = series.diff().dropna()
else:
diff_series = series.diff().diff().dropna()
# 绘制ACF和PACF图
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
plot_acf(diff_series, lags=40, ax=axes[0])
axes[0].set_title('差分序列的ACF')
plot_pacf(diff_series, lags=40, ax=axes[1])
axes[1].set_title('差分序列的PACF')
plt.tight_layout()
plt.show()
# 根据ACF和PACF图手动确定p和q
print("\n请根据ACF和PACF图确定p和q的值:")
print(" - PACF图中显著截断的滞后阶数为p")
print(" - ACF图中显著截断的滞后阶数为q")
# 这里使用自动化方法(AIC最小化)作为备选
best_aic = float('inf')
best_p = 0
best_q = 0
# 设置p和q的最大搜索范围
max_p = 5
max_q = 5
print("\n正在搜索最优(p,q)参数组合...")
for p in range(0, max_p + 1):
for q in range(0, max_q + 1):
try:
model = ARIMA(series, order=(p, d, q))
results = model.fit()
if results.aic < best_aic:
best_aic = results.aic
best_p = p
best_q = q
print(f'ARIMA({p},{d},{q}) - AIC: {results.aic:.2f}')
except:
continue
print(f"\nAIC最优参数: p = {best_p}, q = {best_q} (AIC = {best_aic:.2f})")
# 允许用户手动选择或使用AIC推荐值
use_auto = input("\n是否使用AIC推荐的参数?(y/n): ").lower().strip()
if use_auto == 'y':
p, q = best_p, best_q
else:
p = int(input("请输入p值: "))
q = int(input("请输入q值: "))
print(f"\n确定最终参数: p = {p}, q = {q}")
return p, q
# 6. 构建并训练ARIMA模型
def build_arima_model(series, p, d, q):
"""构建并训练ARIMA模型"""
print(f"\n构建ARIMA模型: ARIMA({p}, {d}, {q})")
model = ARIMA(series, order=(p, d, q))
model_fit = model.fit()
print("\n模型摘要:")
print(model_fit.summary())
# 绘制残差图
residuals = pd.DataFrame(model_fit.resid)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 残差时序图
axes[0].plot(residuals)
axes[0].set_title('模型残差')
# 残差直方图
residuals.hist(ax=axes[1])
axes[1].set_title('残差直方图')
plt.tight_layout()
plt.show()
# 残差的ACF和PACF图
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
plot_acf(residuals, lags=40, ax=axes[0])
axes[0].set_title('残差的ACF')
plot_pacf(residuals, lags=40, ax=axes[1])
axes[1].set_title('残差的PACF')
plt.tight_layout()
plt.show()
# Ljung-Box检验(残差白噪声检验)
lb_test = sm.stats.acorr_ljungbox(residuals, lags=[10], return_df=True)
print("\nLjung-Box检验(残差白噪声检验):")
print(lb_test)
# 正态性检验
print("\n残差正态性检验:")
print(f'偏度: {residuals.skew()[0]:.4f}')
print(f'峰度: {residuals.kurt()[0]:.4f}')
return model_fit
# 7. 模型预测
def make_predictions(model_fit, series, steps=30):
"""进行模型预测并可视化结果"""
# 获取预测结果
forecast = model_fit.get_forecast(steps=steps)
forecast_mean = forecast.predicted_mean
forecast_ci = forecast.conf_int(alpha=0.05) # 95%置信区间
# 生成预测日期
last_date = series.index[-1]
forecast_dates = pd.date_range(start=last_date + pd.DateOffset(years=1), periods=steps, freq='AS')
# 可视化预测结果
plt.figure(figsize=(14, 7))
# 绘制历史数据
plt.plot(series.index, series, label='历史数据')
# 绘制预测数据
plt.plot(forecast_dates, forecast_mean, 'r--', label='预测值')
plt.fill_between(forecast_dates, forecast_ci.iloc[:, 0], forecast_ci.iloc[:, 1], color='r', alpha=0.1, label='95%置信区间')
plt.title('太阳黑子数量预测')
plt.xlabel('年份')
plt.ylabel('太阳黑子数量')
plt.legend()
plt.grid(True)
plt.show()
# 打印预测结果
print("\n未来30年太阳黑子数量预测:")
forecast_df = pd.DataFrame({
'年份': forecast_dates.year,
'预测值': forecast_mean.values,
'下限': forecast_ci.iloc[:, 0].values,
'上限': forecast_ci.iloc[:, 1].values
})
print(forecast_df.round(2))
return forecast_df
# 主函数
def main():
print("===== 太阳黑子数据集ARIMA分析 =====")
# 1. 加载数据
df = load_sunspots_data()
series = df['SUNACTIVITY']
# 2. 数据可视化
visualize_data(series)
# 3. 确定差分阶数d
d = determine_differencing(series)
# 4. 确定p和q参数
p, q = determine_p_q(series, d)
# 5. 构建并训练ARIMA模型
model_fit = build_arima_model(series, p, d, q)
# 6. 模型预测
forecast = make_predictions(model_fit, series, steps=30)
print("\n===== ARIMA分析完成 =====")
if __name__ == "__main__":
main()