基于Transformer的多资产收益预测模型实战(附PyTorch实现与避坑指南)

发布于:2025-05-09 ⋅ 阅读:(21) ⋅ 点赞:(0)

基于Transformer的多资产收益预测模型实战(附PyTorch模型训练及可视化完整代码)


一、项目背景与目标

在量化投资领域,利用时间序列数据预测资产收益是核心任务之一。传统方法如LSTM难以捕捉资产间的复杂依赖关系,而Transformer架构通过自注意力机制能有效建模多资产间的联动效应。
本文将从零开始构建一个基于PyTorch的多资产收益预测模型,涵盖数据生成、特征工程、模型设计、训练及可视化全流程,适合深度学习与量化投资的初学者入门。

二、核心技术栈

  • 数据处理:Pandas/Numpy(数据生成与预处理)
  • 深度学习框架:PyTorch(模型构建与训练)
  • 可视化:Matplotlib(结果分析)
  • 核心算法:Transformer(自注意力机制)

三、数据生成与预处理

1. 模拟金融数据生成

我们通过以下步骤生成包含5只资产的时间序列数据:

  • 市场基准因子:模拟市场整体趋势(几何布朗运动)
  • 行业因子:引入周期性波动区分不同行业(如科技、消费、能源)
  • 特质因子:每只资产的独立噪声
def generate_market_data(days=2000, n_assets=5):  
    np.random.seed(42)  
    market = np.cumprod(1 + np.random.normal(0.0003, 0.015, days))  # 市场基准  
    assets = []  
    sector_map = {0: "Tech", 1: "Tech", 2: "Consume", 3: "Consume", 4: "Energy"}  
    for i in range(n_assets):  
        sector_factor = 0.3 * np.sin(i * 0.8 + np.linspace(0, 10 * np.pi, days))  # 行业周期因子  
        idiosyncratic = np.cumprod(1 + np.random.normal(0.0002, 0.02, days))  # 特质因子  
        price = market * (1 + sector_factor) * idiosyncratic  # 价格合成  
        assets.append(price)  
    dates = pd.date_range("2015-01-01", periods=days)  
    return pd.DataFrame(np.array(assets).T, index=dates, columns=[f"Asset_{i}" for i in range(n_assets)])  

2. 数据形状说明

生成的DataFrame形状为[2000天, 5资产],索引为时间戳,列名为Asset_0到Asset_4。

四、特征工程:从价格到可训练数据

1. 基础时间序列特征

为每只资产计算以下特征:

  • 收益率(Return):相邻日价格变化率
  • 波动率(Volatility):20日滚动标准差年化
  • 移动平均(MA10):10日价格移动平均
  • 行业相对强弱(Sector_RS):资产价格与所属行业平均价格的比值
def create_features(data, lookback=60):  
    n_assets = data.shape[1]  
    sector_map = {0: "Tech", 1: "Tech", 2: "Consume", 3: "Consume", 4: "Energy"}  
    features = []  
    for i, asset in enumerate(data.columns):  
        df = pd.DataFrame()  
        df["Return"] = data[asset].pct_change()  
        df["Volatility"] = df["Return"].rolling(20).std() * np.sqrt(252)  # 年化波动率  
        df["MA10"] = data[asset].rolling(10).mean()  
        # 计算行业相对强弱  
        sector = sector_map[i]  
        sector_cols = [col for col in data.columns if sector_map[int(col.split("_")[1])] == sector]  
        df["Sector_RS"] = data[asset] / data[sector_cols].mean(axis=1)  
        features.append(df.dropna())  # 去除NaN  
    # 对齐时间索引  
    common_idx = features[0].index  
    for df in features[1:]:  
        common_idx = common_idx.intersection(df.index)  
    features = [df.loc[common_idx] for df in features]  
    # 构建3D特征张量 [样本数, 时间步, 资产数, 特征数]  
    X = np.stack([np.stack([feat.iloc[i-lookback:i] for i in range(lookback, len(feat))], axis=0) for feat in features], axis=2)  
    # 标签:未来5日平均收益率  
    y = np.array([data.loc[common_idx].iloc[i:i+5].pct_change().mean().values for i in range(lookback, len(common_idx))])  
    return X, y  

2. 输入输出形状

  • 特征张量X形状:[样本数, 时间步(60), 资产数(5), 特征数(4)]
  • 标签y形状:[样本数, 资产数(5)](每个样本对应5只资产的未来5日平均收益率)

五、Transformer模型构建:核心架构解析

1. 模型设计目标

  • 处理多资产时间序列:同时输入5只资产的历史数据
  • 捕捉时间依赖资产间依赖:通过位置编码和自注意力机制
  • 输出多资产收益预测:回归问题,使用MSE损失

2. 关键组件解析

(1)资产嵌入层(Asset Embedding)

将每个资产的4维特征映射到64维隐空间:

self.asset_embed = nn.Linear(n_features=4, d_model=64)  

输入形状:(batch, seq_len, assets, features) → 输出:(batch, seq_len, assets, d_model)

(2)位置编码(Positional Embedding)

由于Transformer无内置时序信息,需手动添加位置编码:

self.time_pos = nn.Parameter(torch.randn(1, lookback=60, 1, d_model=64))  # 时间位置编码  
self.asset_pos = nn.Parameter(torch.randn(1, 1, n_assets=5, d_model=64))  # 资产位置编码  
  • 通过广播机制与资产嵌入相加,分别捕获时间和资产维度的位置信息。
(3)自定义Transformer编码器层(Custom Transformer Encoder Layer)

继承PyTorch原生层,返回注意力权重以可视化:

class CustomTransformerEncoderLayer(nn.TransformerEncoderLayer):  
    def __init__(self, d_model, nhead, dim_feedforward=256, dropout=0.1):  
        super().__init__(d_model, nhead, dim_feedforward, dropout, batch_first=True)  # 显式启用batch_first  
    def forward(self, src, src_mask=None, src_key_padding_mask=None):  
        src2, attn_weights = self.self_attn(src, src, src, need_weights=True)  # 获取注意力权重  
        src = src + self.dropout1(src2)  
        src = self.norm1(src)  
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))  
        src = src + self.dropout2(src2)  
        src = self.norm2(src)  
        return src, attn_weights  
(4)维度调整核心逻辑

在进入Encoder前,将张量形状从(batch, seq, assets, d_model)调整为(batch, seq*assets, d_model),以便Encoder处理:

x = x + self.time_pos + self.asset_pos  # 叠加位置编码  
x = x.permute(0, 1, 3, 2)  # 调整维度为 [batch, seq, d_model, assets]  
x = x.reshape(batch_size, seq_len * n_assets, d_model)  # 合并资产与序列维度  
x, attn_weights = self.encoder(x)  # Encoder输出形状:(batch, seq*assets, d_model)  
x = x.reshape(batch_size, seq_len, n_assets, d_model)  # 恢复维度  
(5)解码器(Decoder)

提取最后时间步特征,拼接后映射到5维收益空间:

self.decoder = nn.Linear(d_model * n_assets, n_assets)  # 输入320维,输出5维  

六、模型训练:从数据加载到优化

1. 数据加载器

使用PyTorch的DataLoader处理批量数据:

train_dataset = TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train))  
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)  

2. 训练配置

  • 损失函数:均方误差(MSE)
  • 优化器:Adam(学习率1e-4
  • 训练循环:50个epoch,记录训练/验证损失
model = AssetTransformer(n_features=4, n_assets=5, lookback=60)  
criterion = nn.MSELoss()  
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)  

七、可视化与结果分析

1. 自注意力矩阵热力图(Cross-Asset Attention Matrix)

提取第一层编码器的注意力权重,展示资产间的依赖关系(以最后时间步为例):

def plot_attention(attention_weights, asset_names):  
    num_assets = len(asset_names)  
    # 取第一个样本、第一个注意力头的权重,形状为 [batch, heads, query, key]  
    attn_matrix = attention_weights[0][0].detach().numpy()  # 假设batch=1  
    attn_matrix = attn_matrix.reshape(num_assets, num_assets)  # 恢复资产间注意力矩阵  
    plt.imshow(attn_matrix, cmap="viridis", aspect="auto")  
    plt.colorbar()  
    plt.title("Cross-Asset Attention Matrix (First Layer)")  
    plt.xlabel("Key Assets")  
    plt.ylabel("Query Assets")  
    plt.xticks(range(num_assets), asset_names)  
    plt.yticks(range(num_assets), asset_names)  
    plt.tight_layout()  
    plt.show()  

在这里插入图片描述

核心作用:可视化Transformer编码器中资产间的依赖关系,揭示模型如何通过自注意力机制捕捉不同资产的联动效应。

(1)数据提取逻辑
  • 注意力权重形状:通过自定义编码器层获取的注意力权重形状为 [batch_size, n_heads, query_length, key_length],其中 query_lengthkey_length 均为 时间步×资产数(本例中为 60×5=300)。
  • 关键筛选:聚焦最后一个时间步(当前预测时刻)的资产间注意力,提取最后 n_assets 个查询和键的权重,重塑为 [资产数, 资产数] 矩阵(本例中为 5×5),消除时间维度的干扰。
(2)图形元素解析
  • 横轴/纵轴:均为资产名称(Asset_0 到 Asset_4),横轴表示“键资产”(Key),纵轴表示“查询资产”(Query)。例如,纵轴Asset_0对应横轴Asset_1的单元格值,表示“资产0在计算时对资产1的关注度”。
  • 颜色映射:使用viridis色卡,深色(如紫色/蓝色)代表高注意力权重(接近1),浅色(如黄色/白色)代表低权重(接近0)。
  • 行业分组验证
    • 同一行业资产(如Tech行业的Asset_0和Asset_1)的交叉区域颜色应更深,表明模型关注行业内协同效应。
    • 跨行业资产(如Tech与Energy)的权重可能较低,颜色较浅,符合数据生成时的行业因子设计(行业间波动相关性较低)。
(3)金融意义
  • 资产联动捕捉:高权重表示模型认为这两项资产的历史特征对预测当前收益更重要,可能用于构建资产组合时的相关性分析。
  • 注意力异常排查:若某资产对自身的注意力权重显著高于其他资产(对角线元素突出),可能意味着模型过度依赖单一资产,存在过拟合风险。

2. 预测结果散点图(Predicted vs Actual Returns)

散点图对比实际收益与预测收益,理想情况下点分布在y=x直线附近:

plt.figure(figsize=(12, 6))  
for i in range(5):  
    plt.scatter(y_test[:, i], pred_test[:, i], alpha=0.5, label=f"Asset_{i}")  
plt.plot([-0.1, 0.1], [-0.1, 0.1], "k--", label="Perfect Prediction")  
plt.title("Predicted vs Actual 5-Day Returns")  
plt.xlabel("Actual Returns")  
plt.ylabel("Predicted Returns")  
plt.legend()  
plt.grid(alpha=0.3)  
plt.show()  

在这里插入图片描述

核心作用:评估模型对每只资产未来5日平均收益率的预测精度,直观展示预测值与实际值的偏离程度。

(1)图形结构
  • 横轴(X轴):实际收益率(Actual 5-Day Returns),范围约 [-0.1, 0.1](覆盖多数金融资产的短期波动区间)。
  • 纵轴(Y轴):预测收益率(Predicted Returns),与横轴范围一致,便于对比。
  • 散点分布
    • 每个资产(5只)用不同颜色区分(如Asset_0为蓝色,Asset_1为橙色),标签清晰标注。
    • 理想情况下,散点应紧密分布在黑色虚线 y=x 附近,表明预测值与实际值接近;散点越偏离虚线,预测误差越大。
(2)量化指标辅助解读
  • MSE损失:训练日志中显示的Test Loss(如0.0006)对应散点的整体离散程度,值越小,散点越集中。
  • 资产差异:若某资产(如Asset_4)的散点明显偏离对角线,可能是该资产的特质因子噪声较大,或模型对其行业特征的捕捉不足。
(3)实战意义
  • 预测可靠性判断:若多数散点位于对角线附近,且各资产分布均匀,说明模型泛化能力较强,可用于实际收益预测;反之,需优化特征工程或调整模型结构。

3. 策略回测累计收益曲线(Long-Short Portfolio Performance)

通过预测结果构建多空策略(买入前20%预测收益资产,卖空后20%),对比策略与市场收益:

def backtest_strategy(pred_returns, data, lookback=60):  
    long_thresh = np.quantile(pred_returns, 0.8, axis=1)[:, None]  
    short_thresh = np.quantile(pred_returns, 0.2, axis=1)[:, None]  
    long_mask = pred_returns >= long_thresh  
    short_mask = pred_returns <= short_thresh  
    # 标准化仓位(多头/空头资产等权分配)  
    position = (long_mask / long_mask.sum(axis=1)[:, None]) - (short_mask / short_mask.sum(axis=1)[:, None])  
    # 计算收益  
    returns = data.pct_change().iloc[lookback+1:lookback+1+len(position)]  
    strategy_ret = (position[:-1] * returns.iloc[1:]).sum(axis=1)  
    market_ret = returns.mean(axis=1)  
    return strategy_ret.cumsum(), market_ret.cumsum()  

在这里插入图片描述
核心作用:验证基于预测结果的多空策略能否获取超额收益,对比策略表现与市场基准。

(1)策略构建逻辑
  • 多空筛选
    • 多头:预测收益前20%的资产(高于80%分位数),等权买入。
    • 空头:预测收益后20%的资产(低于20%分位数),等权卖空。
  • 仓位管理:多头和空头仓位分别标准化(权重和为1),确保风险中性。
(2)曲线元素解析
  • 蓝色曲线(Transformer Strategy)
    • 若曲线向上倾斜且斜率大于市场曲线,表明策略有效,能通过多空操作获取超额收益。
    • 若曲线波动较大,可能受限于模拟数据的高噪声,或需增加风险控制(如止损)。
  • 黄色曲线(Market Index)
    • 市场平均收益(所有资产等权平均),作为基准线。若策略曲线长期位于其上方,说明模型具备实际应用价值。
(3)金融指标延伸
  • 夏普比率:可进一步计算策略的风险调整后收益(假设无风险利率为0),公式为 (策略年化收益) / (策略收益标准差),值越高表明风险收益比越好。
  • 最大回撤:曲线中的回调幅度,反映策略的抗风险能力,与累计收益结合评估策略稳定性。

4. 可视化总结表

图形类型 核心价值 理想结果特征 常见异常信号
注意力热力图 资产间依赖关系建模验证 同行业资产权重高,跨行业权重低 对角线权重异常高(过拟合)
预测散点图 模型预测精度评估 散点紧密分布于y=x附近,MSE低 某资产散点显著偏离(特征不足)
回测收益曲线 策略有效性验证 策略曲线持续跑赢市场,夏普比率>1 曲线长期低于市场(模型失效)

通过这三类图形,可从模型机理(注意力)、预测能力(散点)、实战价值(回测)三个维度全面评估Transformer在多资产收益预测中的表现,为后续优化提供明确方向。

八、常见错误与解决方案

1. Transformer输入维度不匹配

错误信息TypeError: CustomTransformerEncoderLayer.forward() got an unexpected keyword argument 'is_causal'
原因:未正确处理PyTorch Transformer的隐含参数。
解决:在自定义编码器层的forward方法中添加is_causal=False默认参数,兼容框架逻辑。

2. 注意力权重形状错误

错误信息ValueError: cannot reshape array of size 300 into shape (5,5)
原因:未正确提取最后时间步的注意力权重,误将全序列权重直接重塑。
解决:先筛选最后时间步的查询/键资产权重,再重塑为[资产数, 资产数]矩阵:

# 正确提取最后时间步(假设序列长度60,资产数5)  
last_step_attn = attn_matrix[-5:, -5:]  # 取最后5个查询(资产)对最后5个键(资产)的权重  

3. 位置编码广播失败

错误信息RuntimeError: The size of tensor a (60) must match the size of tensor b (1) at non-singleton dimension 1
解决:确保位置编码维度包含资产轴(如[1, lookback, 1, d_model]),通过广播自动适配资产数。

九、项目总结与扩展方向

1. 项目亮点

  • 多维度建模:同时捕捉时间序列依赖(位置编码)和资产间依赖(自注意力)。
  • 完整量化流程:从数据生成到策略回测,复现真实量化研究闭环。
  • 维度调整详解:通过view()/reshape()处理复杂张量变换,解决PyTorch常见形状问题。

2. 改进方向

  • 数据增强:添加滑动窗口、噪声注入等技术提升模型鲁棒性。
  • 混合架构:结合CNN提取局部特征,或LSTM捕捉长期趋势。
  • 真实场景适配:接入股票/期货高频数据,优化数据预处理流程(如复权处理)。

十、给初学者的建议

  1. 分步调试:先用小数据集(如days=100, n_assets=2)验证代码逻辑,再扩展规模。
  2. 维度打印:在模型各关键节点添加print(x.shape),确保输入输出形状符合预期。
  3. 官方文档优先:PyTorch Transformer文档是理解注意力机制的最佳材料,重点关注batch_first参数和输入维度要求。

通过本项目,读者可掌握多变量时间序列建模的核心思路,理解Transformer在序列建模中的优势,为后续量化模型开发打下坚实基础。

完整代码

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import os

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


# ==================== 数据生成与预处理 ====================
def generate_market_data(days=2000, n_assets=5):
    np.random.seed(42)
    market = np.cumprod(1 + np.random.normal(0.0003, 0.015, days))

    assets = []
    for i in range(n_assets):
        sector_factor = 0.3 * np.sin(i * 0.8 + np.linspace(0, 10 * np.pi, days))
        idiosyncratic = np.cumprod(1 + np.random.normal(0.0002, 0.02, days))
        price = market * (1 + sector_factor) * idiosyncratic
        assets.append(price)

    dates = pd.date_range("2015-01-01", periods=days)
    return pd.DataFrame(
        np.array(assets).T, index=dates, columns=[f"Asset_{i}" for i in range(n_assets)]
    )


data = generate_market_data()


# ==================== 特征工程 ====================
def create_features(data, lookback=60):
    n_assets = data.shape[1]
    features = []
    sector_map = {0: "Tech", 1: "Tech", 2: "Consume", 3: "Consume", 4: "Energy"}

    for i, asset in enumerate(data.columns):
        df = pd.DataFrame()
        df["Return"] = data[asset].pct_change()
        df["Volatility"] = df["Return"].rolling(20).std() * np.sqrt(252)
        df["MA10"] = data[asset].rolling(10).mean()

        sector = sector_map[i]
        sector_cols = [
            col for col in data.columns if sector_map[int(col.split("_")[1])] == sector
        ]
        df["Sector_RS"] = data[asset] / data[sector_cols].mean(axis=1)
        features.append(df.dropna())

    common_idx = features[0].index
    for df in features[1:]:
        common_idx = common_idx.intersection(df.index)
    features = [df.loc[common_idx] for df in features]

    X = np.stack(
        [
            np.stack(
                [feat.iloc[i - lookback : i] for i in range(lookback, len(feat))],
                axis=0,
            )
            for feat in features
        ],
        axis=2,
    )

    y = np.array(
        [
            data.loc[common_idx].iloc[i : i + 5].pct_change().mean().values
            for i in range(lookback, len(common_idx))
        ]
    )

    valid_indices = ~np.isnan(y).any(axis=1) & ~np.isinf(y).any(axis=1)
    return X[valid_indices], y[valid_indices]


X, y = create_features(data)
print(f"数据形状: X={X.shape}, y={y.shape}")


# ==================== 模型定义 ====================
class CustomTransformerEncoderLayer(nn.TransformerEncoderLayer):
    """自定义编码器层以返回注意力权重"""

    def __init__(
        self, d_model, nhead, dim_feedforward=256, dropout=0.1, activation="relu"
    ):
        super().__init__(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            activation=activation,
            batch_first=True,
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        src2, attn_weights = self.self_attn(
            src,
            src,
            src,
            attn_mask=src_mask,
            key_padding_mask=src_key_padding_mask,
            need_weights=True,
        )
        src = src + self.dropout1(src2)
        src = self.norm1(src)

        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        src = self.norm2(src)
        return src, attn_weights


class AssetTransformer(nn.Module):
    def __init__(self, n_features, n_assets, lookback, d_model=64, nhead=4):
        super().__init__()
        self.asset_embed = nn.Linear(n_features, d_model)
        self.time_pos = nn.Parameter(torch.randn(1, lookback, 1, d_model))
        self.asset_pos = nn.Parameter(torch.randn(1, 1, n_assets, d_model))
        self.final_norm = nn.LayerNorm(d_model)

        encoder_layer = CustomTransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=256
        )
        self.encoder = nn.TransformerEncoder(
            encoder_layer, num_layers=3, norm=nn.LayerNorm(d_model)
        )
        self.decoder = nn.Linear(d_model * n_assets, n_assets)
        self.attention_weights = None

    def forward(self, x):
        batch_size = x.size(0)
        seq_len = x.size(1)
        n_assets = x.size(2)

        x = self.asset_embed(x)
        x = x + self.time_pos + self.asset_pos

        x = x.view(batch_size, seq_len * n_assets, -1)

        attn_weights_list = []
        for layer in self.encoder.layers:
            x, attn_weights = layer(x)
            attn_weights_list.append(attn_weights)
        self.attention_weights = attn_weights_list

        x = x.view(batch_size, n_assets, seq_len, -1)
        x = x[:, :, -1, :]
        x = x.reshape(batch_size, -1)
        return self.decoder(x)


# ==================== 模型训练 ====================
def train_model(X, y, lookback=60, n_epochs=50):
    split = int(0.8 * len(X))
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]

    train_dataset = TensorDataset(
        torch.FloatTensor(X_train), torch.FloatTensor(y_train)
    )
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

    model = AssetTransformer(n_features=4, n_assets=5, lookback=lookback)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

    for epoch in range(n_epochs):
        model.train()
        total_loss = 0
        for batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        model.eval()
        with torch.no_grad():
            test_pred = model(torch.FloatTensor(X_test))
            test_loss = criterion(test_pred, torch.FloatTensor(y_test))
            print(
                f"Epoch {epoch + 1}/{n_epochs} | "
                f"Train Loss: {total_loss / len(train_loader):.4f} | "
                f"Test Loss: {test_loss:.4f}"
            )

    return model, y_test, test_pred.detach().numpy()


model, y_test, pred_test = train_model(X, y)


# ==================== 可视化部分 ====================
def plot_attention(attention_weights, asset_names):
    plt.figure(figsize=(12, 8))

    # 取第一个样本、第一个注意力头的注意力权重
    num_assets = len(asset_names)
    num_seq = 60  # 假设序列长度为 60
    layer_weights = attention_weights[0][0, :, : num_assets * num_seq].detach().numpy()
    layer_weights = layer_weights.reshape(num_assets * num_seq, num_assets * num_seq)

    # 取每个资产最后一个时间步的注意力权重
    last_step_weights = layer_weights[-num_assets:, -num_assets:]

    plt.imshow(last_step_weights, cmap="viridis", aspect="auto")
    plt.colorbar()
    plt.title("Cross-Asset Attention Matrix (First Layer)")
    plt.xlabel("Key Assets")
    plt.ylabel("Query Assets")
    plt.xticks(range(num_assets), asset_names)
    plt.yticks(range(num_assets), asset_names)
    plt.tight_layout()
    plt.show()


if model.attention_weights:
    asset_names = data.columns.tolist()
    plot_attention(model.attention_weights, asset_names)
else:
    print("未能捕获注意力权重")

# 预测结果散点图
plt.figure(figsize=(12, 6))
for i in range(5):
    plt.scatter(y_test[:, i], pred_test[:, i], alpha=0.5, label=f"Asset_{i}")
plt.plot([-0.1, 0.1], [-0.1, 0.1], "k--")
plt.title("Predicted vs Actual Returns")
plt.xlabel("Actual 5-Day Returns")
plt.ylabel("Predicted Returns")
plt.legend()
plt.grid(alpha=0.3)
plt.show()


# ==================== 策略回测 ====================
def backtest_strategy(pred_returns, data, lookback=60):
    long_thresh = np.quantile(pred_returns, 0.8, axis=1)
    short_thresh = np.quantile(pred_returns, 0.2, axis=1)

    long_mask = pred_returns >= long_thresh[:, None]
    short_mask = pred_returns <= short_thresh[:, None]

    position = long_mask.astype(float) / long_mask.sum(axis=1)[:, None]
    position -= short_mask.astype(float) / short_mask.sum(axis=1)[:, None]

    returns = data.pct_change().iloc[lookback + 1 : lookback + 1 + len(position)]
    strategy_ret = (position[:-1] * returns.iloc[1:]).sum(axis=1)
    market_ret = returns.mean(axis=1)

    return strategy_ret.cumsum(), market_ret.cumsum()


strategy_cum, market_cum = backtest_strategy(pred_test, data)

plt.figure(figsize=(12, 6))
plt.plot(strategy_cum, label="Transformer Strategy")
plt.plot(market_cum, label="Market Index", alpha=0.7)
plt.title("Long-Short Portfolio Performance")
plt.xlabel("Trading Days")
plt.ylabel("Cumulative Returns")
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()


网站公告

今日签到

点亮在社区的每一天
去签到