[Experiment 15] Testing the Memory Capacity of an LSTM


Contents

1 Model Construction

1.1 The LSTM Layer

1.1.1 A custom LSTM operator

1.1.2 nn.LSTM

1.1.3 Comparing the custom LSTM with PyTorch's built-in LSTM

1.2 Model Assembly

2 Model Training

2.1 Training a digit-prediction model for a given sequence length

2.2 Training over multiple lengths

2.3 Plotting the training losses

3 Model Evaluation

4 Complete Code

5 Changes in the LSTM's Gate and Cell States

5.1 Updating the LSTM module

5.2 Loading the model

5.3 Complete code

References



1 Model Construction

        We reuse the Model_RNN4SeqClass model defined in Experiment 14 and build an LSTM operator. Instantiating the LSTM and passing it into Model_RNN4SeqClass is all that is needed to run the digit-sum experiment with an LSTM. In the digit-sum task the model reads a sequence of digits and must predict the sum of the first two digits, which forces it to remember information from the very beginning of the sequence.
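For concreteness, here is what one length-10 training example looks like, following the Experiment 14 data format (the name `example` is just for illustration; the sequence is the same one used for the gate visualization in Section 5, where the model predicts 13):

# One training example: (digit sequence, label = sum of the first two digits)
example = ([6, 7, 0, 0, 1, 0, 0, 0, 0, 0], 13)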

1.1 The LSTM Layer

1.1.1 A custom LSTM operator
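An LSTM cell maintains a cell state c_t alongside the hidden state h_t and updates both through three gates. The standard update equations, written in the row-vector convention used by the code below (x_t W rather than W x; the notation matches the parameter names W_*, U_*, b_*):

$$
\begin{aligned}
\mathbf{i}_t &= \sigma(\mathbf{x}_t \mathbf{W}_i + \mathbf{h}_{t-1} \mathbf{U}_i + \mathbf{b}_i) \\
\mathbf{f}_t &= \sigma(\mathbf{x}_t \mathbf{W}_f + \mathbf{h}_{t-1} \mathbf{U}_f + \mathbf{b}_f) \\
\mathbf{o}_t &= \sigma(\mathbf{x}_t \mathbf{W}_o + \mathbf{h}_{t-1} \mathbf{U}_o + \mathbf{b}_o) \\
\tilde{\mathbf{c}}_t &= \tanh(\mathbf{x}_t \mathbf{W}_c + \mathbf{h}_{t-1} \mathbf{U}_c + \mathbf{b}_c) \\
\mathbf{c}_t &= \mathbf{f}_t \odot \mathbf{c}_{t-1} + \mathbf{i}_t \odot \tilde{\mathbf{c}}_t \\
\mathbf{h}_t &= \mathbf{o}_t \odot \tanh(\mathbf{c}_t)
\end{aligned}
$$

where σ is the logistic sigmoid and ⊙ is element-wise multiplication.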

Following these update equations, the custom LSTM operator is implemented as follows:

import torch
import torch.nn as nn


class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, Wi_attr=None, Wf_attr=None, Wo_attr=None, Wc_attr=None,
                 Ui_attr=None, Uf_attr=None, Uo_attr=None, Uc_attr=None, bi_attr=None, bf_attr=None,
                 bo_attr=None, bc_attr=None):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Initialize the model parameters; each *_attr argument may supply initial
        # values, otherwise the parameter is initialized with zeros
        if Wi_attr is None:
            Wi = torch.zeros(size=[input_size, hidden_size], dtype=torch.float32)
        else:
            Wi = torch.tensor(Wi_attr, dtype=torch.float32)
        self.W_i = torch.nn.Parameter(Wi)

        if Wf_attr is None:
            Wf = torch.zeros(size=[input_size, hidden_size], dtype=torch.float32)
        else:
            Wf = torch.tensor(Wf_attr, dtype=torch.float32)
        self.W_f = torch.nn.Parameter(Wf)

        if Wo_attr is None:
            Wo = torch.zeros(size=[input_size, hidden_size], dtype=torch.float32)
        else:
            Wo = torch.tensor(Wo_attr, dtype=torch.float32)
        self.W_o = torch.nn.Parameter(Wo)

        if Wc_attr is None:
            Wc = torch.zeros(size=[input_size, hidden_size], dtype=torch.float32)
        else:
            Wc = torch.tensor(Wc_attr, dtype=torch.float32)
        self.W_c = torch.nn.Parameter(Wc)

        if Ui_attr is None:
            Ui = torch.zeros(size=[hidden_size, hidden_size], dtype=torch.float32)
        else:
            Ui = torch.tensor(Ui_attr, dtype=torch.float32)
        self.U_i = torch.nn.Parameter(Ui)

        if Uf_attr is None:
            Uf = torch.zeros(size=[hidden_size, hidden_size], dtype=torch.float32)
        else:
            Uf = torch.tensor(Uf_attr, dtype=torch.float32)
        self.U_f = torch.nn.Parameter(Uf)

        if Uo_attr is None:
            Uo = torch.zeros(size=[hidden_size, hidden_size], dtype=torch.float32)
        else:
            Uo = torch.tensor(Uo_attr, dtype=torch.float32)
        self.U_o = torch.nn.Parameter(Uo)

        if Uc_attr is None:
            Uc = torch.zeros(size=[hidden_size, hidden_size], dtype=torch.float32)
        else:
            Uc = torch.tensor(Uc_attr, dtype=torch.float32)
        self.U_c = torch.nn.Parameter(Uc)

        if bi_attr is None:
            bi = torch.zeros(size=[1, hidden_size], dtype=torch.float32)
        else:
            bi = torch.tensor(bi_attr, dtype=torch.float32)
        self.b_i = torch.nn.Parameter(bi)

        if bf_attr is None:
            bf = torch.zeros(size=[1, hidden_size], dtype=torch.float32)
        else:
            bf = torch.tensor(bf_attr, dtype=torch.float32)
        self.b_f = torch.nn.Parameter(bf)

        if bo_attr is None:
            bo = torch.zeros(size=[1, hidden_size], dtype=torch.float32)
        else:
            bo = torch.tensor(bo_attr, dtype=torch.float32)
        self.b_o = torch.nn.Parameter(bo)

        if bc_attr is None:
            bc = torch.zeros(size=[1, hidden_size], dtype=torch.float32)
        else:
            bc = torch.tensor(bc_attr, dtype=torch.float32)
        self.b_c = torch.nn.Parameter(bc)

    # Initialize the cell state and the hidden state
    def init_state(self, batch_size):
        hidden_state = torch.zeros(size=[batch_size, self.hidden_size], dtype=torch.float32)
        cell_state = torch.zeros(size=[batch_size, self.hidden_size], dtype=torch.float32)
        return hidden_state, cell_state

    # Forward computation
    def forward(self, inputs, states=None):
        # inputs: input data of shape batch_size x seq_len x input_size
        batch_size, seq_len, input_size = inputs.shape

        # Initialize the starting cell state and hidden state, each of shape batch_size x hidden_size
        if states is None:
            states = self.init_state(batch_size)
        hidden_state, cell_state = states

        # Run the LSTM computation: input, forget and output gates, candidate state,
        # cell state and hidden state
        for step in range(seq_len):
            # Input at the current step, shape batch_size x input_size
            step_input = inputs[:, step, :]
            # Input, forget and output gates, each of shape batch_size x hidden_size
            I_gate = torch.sigmoid(torch.matmul(step_input, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
            F_gate = torch.sigmoid(torch.matmul(step_input, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
            O_gate = torch.sigmoid(torch.matmul(step_input, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
            # Candidate cell state, shape batch_size x hidden_size
            C_tilde = torch.tanh(torch.matmul(step_input, self.W_c) + torch.matmul(hidden_state, self.U_c) + self.b_c)
            # Cell state, shape batch_size x hidden_size
            cell_state = F_gate * cell_state + I_gate * C_tilde
            # Hidden state, shape batch_size x hidden_size
            hidden_state = O_gate * torch.tanh(cell_state)

        return hidden_state
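Note that this forward pass returns only the final hidden state (shape batch_size x hidden_size) rather than the outputs of every step; that is all Model_RNN4SeqClass needs, since it classifies the final state.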

Let's test it:

if __name__ == "__main__":
    Wi_attr = [[0.1, 0.2], [0.1, 0.2]]
    Wf_attr = [[0.1, 0.2], [0.1, 0.2]]
    Wo_attr = [[0.1, 0.2], [0.1, 0.2]]
    Wc_attr = [[0.1, 0.2], [0.1, 0.2]]
    Ui_attr = [[0.0, 0.1], [0.1, 0.0]]
    Uf_attr = [[0.0, 0.1], [0.1, 0.0]]
    Uo_attr = [[0.0, 0.1], [0.1, 0.0]]
    Uc_attr = [[0.0, 0.1], [0.1, 0.0]]
    bi_attr = [[0.1, 0.1]]
    bf_attr = [[0.1, 0.1]]
    bo_attr = [[0.1, 0.1]]
    bc_attr = [[0.1, 0.1]]

    lstm = LSTM(2, 2, Wi_attr=Wi_attr, Wf_attr=Wf_attr, Wo_attr=Wo_attr, Wc_attr=Wc_attr,
                Ui_attr=Ui_attr, Uf_attr=Uf_attr, Uo_attr=Uo_attr, Uc_attr=Uc_attr,
                bi_attr=bi_attr, bf_attr=bf_attr, bo_attr=bo_attr, bc_attr=bc_attr)

    inputs = torch.as_tensor([[[1, 0]]], dtype=torch.float32)
    hidden_state = lstm(inputs)
    print(hidden_state)

Output:

tensor([[0.0594, 0.0952]], grad_fn=<MulBackward0>)
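This single-step example is small enough to verify by hand. A minimal recomputation with the same numbers, using plain tensor ops and independent of the class above:

import torch

x = torch.tensor([1., 0.])
W = torch.tensor([[0.1, 0.2], [0.1, 0.2]])
b = torch.tensor([0.1, 0.1])
# With h_0 = c_0 = 0 the U terms vanish, and since all four weight matrices are
# equal here, all four pre-activations are x @ W + b = [0.2, 0.3]
gate = torch.sigmoid(x @ W + b)   # i = f = o ≈ [0.5498, 0.5744]
c_tilde = torch.tanh(x @ W + b)   # ≈ [0.1974, 0.2913]
c = gate * c_tilde                # f * c_0 contributes nothing
h = gate * torch.tanh(c)
print(h)                          # tensor([0.0594, 0.0952]) -- matches the result above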

1.1.2 nn.LSTM

        PyTorch already provides a built-in LSTM API, torch.nn.LSTM. It differs from our implementation in two ways: it uses two separate bias vectors (bias_ih and bias_hh, whose sum plays the role of our single bias), and in its matrix multiplications the weight matrix multiplies the input from the left (W x rather than x W, so its weight matrices are the transpose of ours). It also defaults to batch_first=False, i.e. it expects input of shape seq_len x batch_size x input_size.

# Create random test data of shape batch_size x seq_len x input_size
batch_size, seq_len, input_size = 8, 20, 32
inputs = torch.randn(size=[batch_size, seq_len, input_size])

# Set the model's hidden_size
hidden_size = 32
# NOTE: with the default batch_first=False, nn.LSTM will interpret the first
# dimension (8) as seq_len and the second (20) as batch_size
torch_lstm = nn.LSTM(input_size, hidden_size)
self_lstm = LSTM(input_size, hidden_size)

self_hidden_state = self_lstm(inputs)
torch_outputs, (torch_hidden_state, torch_cell_state) = torch_lstm(inputs)

print("self_lstm hidden_state: ", self_hidden_state.shape)
print("torch_lstm outputs:", torch_outputs.shape)
print("torch_lstm hidden_state:", torch_hidden_state.shape)
print("torch_lstm cell_state:", torch_cell_state.shape)

Output:

self_lstm hidden_state:  torch.Size([8, 32])
torch_lstm outputs: torch.Size([8, 20, 32])
torch_lstm hidden_state: torch.Size([1, 20, 32])
torch_lstm cell_state: torch.Size([1, 20, 32])

Because nn.LSTM interpreted the first dimension as seq_len here, the final hidden and cell states have shape [1, 20, 32] (num_layers x "batch" x hidden_size) instead of [1, 8, 32]. The comparison in the next subsection passes batch_first=True to avoid this pitfall.
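That comparison also needs nn.LSTM's parameter layout: the four gates' parameters are packed along dim 0, in the order input, forget, cell candidate (g), output, and each layer has two bias vectors. A quick inspection (shapes only):

lstm = nn.LSTM(input_size=10, hidden_size=10)
# weight_ih_l0 stacks W_i, W_f, W_g, W_o along dim 0 -> shape [4*hidden_size, input_size]
print(lstm.weight_ih_l0.shape)                       # torch.Size([40, 10])
print(lstm.weight_hh_l0.shape)                       # torch.Size([40, 10])
print(lstm.bias_ih_l0.shape, lstm.bias_hh_l0.shape)  # torch.Size([40]) torch.Size([40])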

1.1.3 Comparing the custom LSTM with PyTorch's built-in LSTM

if __name__ == "__main__":
    # Fix the random seed for reproducibility
    torch.manual_seed(0)
    # Create random test data of shape batch_size x seq_len x input_size
    batch_size, seq_len, input_size, hidden_size = 2, 5, 10, 10
    inputs = torch.randn([batch_size, seq_len, input_size])
    # batch_first=True makes nn.LSTM use the same batch_size x seq_len x input_size
    # layout as the custom LSTM
    torch_lstm = nn.LSTM(input_size, hidden_size, bias=True, batch_first=True)
    # Extract torch_lstm's parameters and use them to initialize the custom LSTM:
    # transpose the packed weights (our LSTM computes x @ W), split them into the
    # four per-gate blocks, and sum the two bias vectors into our single bias
    print(torch_lstm.weight_ih_l0.T.shape)
    chunked_W = torch.split(torch_lstm.weight_ih_l0.T, split_size_or_sections=10, dim=-1)
    chunked_U = torch.split(torch_lstm.weight_hh_l0.T, split_size_or_sections=10, dim=-1)
    chunked_b = torch.split(torch_lstm.bias_ih_l0 + torch_lstm.bias_hh_l0, split_size_or_sections=10, dim=-1)

    # Gate order in the packed parameters: input, forget, cell candidate, output
    Wi_attr = chunked_W[0]
    Wf_attr = chunked_W[1]
    Wc_attr = chunked_W[2]
    Wo_attr = chunked_W[3]
    Ui_attr = chunked_U[0]
    Uf_attr = chunked_U[1]
    Uc_attr = chunked_U[2]
    Uo_attr = chunked_U[3]
    bi_attr = chunked_b[0]
    bf_attr = chunked_b[1]
    bc_attr = chunked_b[2]
    bo_attr = chunked_b[3]
    self_lstm = LSTM(input_size, hidden_size, Wi_attr=Wi_attr, Wf_attr=Wf_attr, Wo_attr=Wo_attr, Wc_attr=Wc_attr,
                     Ui_attr=Ui_attr, Uf_attr=Uf_attr, Uo_attr=Uo_attr, Uc_attr=Uc_attr,
                     bi_attr=bi_attr, bf_attr=bf_attr, bo_attr=bo_attr, bc_attr=bc_attr)

    # Run both forward passes and print the resulting hidden states
    self_hidden_state = self_lstm(inputs)
    torch_outputs, (torch_hidden_state, _) = torch_lstm(inputs)
    print("torch LSTM:\n", torch_hidden_state.detach().numpy().squeeze(0))
    print("self LSTM:\n", self_hidden_state.detach().numpy())

Output:

torch.Size([10, 40])

With the transposition, gate ordering and bias summation above, the two print statements produce the same 2 x 10 hidden-state matrix (up to float32 rounding), confirming that the custom LSTM computes exactly the same function as nn.LSTM.

1.2 Model Assembly

        We use Model_RNN4SeqClass from Experiment 14 as the prediction model; the only difference is that an instantiated LSTM layer is passed in at construction time.

Import the required modules:

from model1 import load_data,DataLoader,Model_RNN4SeqClass,DigitSumDataset
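For reference, here is a minimal sketch of what Model_RNN4SeqClass looks like, assuming the embedding → recurrent layer → linear-head structure from Experiment 14 (the authoritative definition lives in model1.py; only the `rnn_model` attribute name is relied on later, in Section 5):

class Model_RNN4SeqClass(nn.Module):
    def __init__(self, model, num_digits, input_size, hidden_size, num_classes):
        super(Model_RNN4SeqClass, self).__init__()
        self.rnn_model = model                                  # any recurrent layer (SRN, LSTM, ...)
        self.embedding = nn.Embedding(num_digits, input_size)   # map digit ids to vectors
        self.linear = nn.Linear(hidden_size, num_classes)       # classify the final hidden state

    def forward(self, inputs):
        inputs_emb = self.embedding(inputs)         # batch_size x seq_len x input_size
        hidden_state = self.rnn_model(inputs_emb)   # batch_size x hidden_size
        logits = self.linear(hidden_state)          # batch_size x num_classes
        return logits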

2 Model Training

2.1 Training a digit-prediction model for a given sequence length

# Number of training epochs
num_epochs = 500
# Learning rate
lr = 0.001
# Number of digit classes in the input
num_digits = 10
# Dimension of the digit embedding vectors
input_size = 32
# Dimension of the hidden state
hidden_size = 32
# Number of prediction classes (possible sums 0-18, hence 19)
num_classes = 19
# Batch size
batch_size = 8
# Directory for saving model checkpoints
save_dir = "./checkpoints"
# Make sure the checkpoint directory exists
os.makedirs(save_dir, exist_ok=True)

# length can be varied to run the prediction experiment on sequences of different lengths
def train(length):
    print(f"\n====> Training LSTM with data of length {length}.")
    # Fix the random seeds
    np.random.seed(0)
    random.seed(0)
    torch.manual_seed(0)

    # Load the data of the given length
    data_path = f"../实验14-循环神经网络(1)SRN记忆能力-梯度爆炸/SRN记忆能力/datasets/{length}"
    train_examples, dev_examples, test_examples = load_data(data_path)
    train_set, dev_set, test_set = DigitSumDataset(train_examples), DigitSumDataset(dev_examples), DigitSumDataset(
        test_examples)
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size)
    dev_loader = torch.utils.data.DataLoader(dev_set, batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size)
    # Instantiate the model
    base_model = LSTM(input_size, hidden_size)
    model = Model_RNN4SeqClass(base_model, num_digits, input_size, hidden_size, num_classes)
    # Optimizer
    optimizer = torch.optim.Adam(lr=lr, params=model.parameters())
    # Evaluation metric
    metric = Accuracy()
    # Loss function
    loss_fn = nn.CrossEntropyLoss()

    # Instantiate the Runner from the components above
    runner = RunnerV3(model, optimizer, loss_fn, metric)

    # Train the model
    model_save_path = os.path.join(save_dir, f"best_lstm_model_{length}.pdparams")
    runner.train(train_loader, dev_loader, num_epochs=num_epochs, eval_steps=100, log_steps=100,
                 save_path=model_save_path)

    return runner

2.2 Training over multiple lengths

lstm_runners = {}
lengths = [10, 15, 20, 25, 30, 35]
for length in lengths:
    runner = train(length)
    lstm_runners[length] = runner

Output:

====> Training LSTM with data of length 10.
[Train] epoch: 0/500, step: 0/19000, loss: 2.83505
[Train] epoch: 2/500, step: 100/19000, loss: 2.77561
[Evaluate]  dev score: 0.09000, dev loss: 2.86460
[Evaluate] best accuracy performence has been updated: 0.00000 --> 0.09000
[Train] epoch: 5/500, step: 200/19000, loss: 2.48144
.....
====> Training LSTM with data of length 15.
[Train] epoch: 0/500, step: 0/19000, loss: 2.83505
[Train] epoch: 2/500, step: 100/19000, loss: 2.78581
[Evaluate]  dev score: 0.07000, dev loss: 2.86551
.....

====> Training LSTM with data of length 20.
[Train] epoch: 0/500, step: 0/19000, loss: 2.83505
[Train] epoch: 2/500, step: 100/19000, loss: 2.76947
[Evaluate]  dev score: 0.10000, dev loss: 2.85964
[Evaluate] best accuracy performence has been updated: 0.00000 --> 0.10000
......

====> Training LSTM with data of length 30.
[Train] epoch: 0/500, step: 0/19000, loss: 2.83505
[Train] epoch: 2/500, step: 100/19000, loss: 2.78386
[Evaluate]  dev score: 0.12000, dev loss: 2.86110
......
[Train] epoch: 492/500, step: 18700/19000, loss: 0.05709
[Evaluate]  dev score: 0.87000, dev loss: 0.55586
[Train] epoch: 494/500, step: 18800/19000, loss: 0.05657
[Evaluate]  dev score: 0.89000, dev loss: 0.54544
[Evaluate] best accuracy performence has been updated: 0.88000 --> 0.89000
[Train] epoch: 497/500, step: 18900/19000, loss: 0.04575
[Evaluate]  dev score: 0.87000, dev loss: 0.54134
[Evaluate]  dev score: 0.83000, dev loss: 0.61010
[Train] Training done!

2.3 Plotting the training losses

# Plot the training-loss curves
for length in lengths:
    runner = lstm_runners[length]
    save_path = f'./images/6.6_{length}.pdf'
    # Create the image directory if it does not exist
    img_dir = os.path.dirname(save_path)
    if not os.path.exists(img_dir):
        os.makedirs(img_dir)
    plot_training_loss(runner, save_path, sample_step=100)

        [Analysis] Comparing with the visualization of the simple recurrent network (SRN) in Experiment 14, the LSTM converges noticeably better as the sequence length grows. However, as the length keeps increasing, the training loss gradually becomes unstable and the validation loss tends to grow overall, which shows that the LSTM's ability to maintain long-range dependencies also weakens gradually on longer sequences.

3 Model Evaluation

# ================= Model testing =================
lstm_dev_scores = []
lstm_test_scores = []
for length in lengths:
    print(f"Evaluate LSTM with data length {length}.")
    runner = lstm_runners[length]
    # Load the best model from training
    model_path = os.path.join(save_dir, f"best_lstm_model_{length}.pdparams")
    runner.load_model(model_path)

    # Load the data of the given length
    data_path = f"../实验14-循环神经网络(1)SRN记忆能力-梯度爆炸/SRN记忆能力/datasets/{length}"
    train_examples, dev_examples, test_examples = load_data(data_path)
    test_set = DigitSumDataset(test_examples)
    test_loader = DataLoader(test_set, batch_size=batch_size)

    # Evaluate on the test set to obtain the prediction accuracy
    score, _ = runner.evaluate(test_loader)
    lstm_test_scores.append(score)
    lstm_dev_scores.append(max(runner.dev_scores))

for length, dev_score, test_score in zip(lengths, lstm_dev_scores, lstm_test_scores):
    print(f"[LSTM] length:{length}, dev_score: {dev_score}, test_score: {test_score: .5f}")

Output:

Evaluate LSTM with data length 10.
Evaluate LSTM with data length 15.
Evaluate LSTM with data length 20.
Evaluate LSTM with data length 25.
Evaluate LSTM with data length 30.
Evaluate LSTM with data length 35.
[LSTM] length:10, dev_score: 0.73, test_score:  0.69000
[LSTM] length:15, dev_score: 0.8, test_score:  0.84000
[LSTM] length:20, dev_score: 0.74, test_score:  0.72000
[LSTM] length:25, dev_score: 0.41, test_score:  0.44000
[LSTM] length:30, dev_score: 0.89, test_score:  0.85000
[LSTM] length:35, dev_score: 0.89, test_score:  0.80000

Visualizing the results:

# Plot dev/test accuracy across the different sequence lengths
plt.plot(lengths, lstm_dev_scores, '-o', color='#e8609b', label="LSTM Dev Accuracy")
plt.plot(lengths, lstm_test_scores, '-o', color='#000000', label="LSTM Test Accuracy")

# Axis labels and legend
plt.ylabel("accuracy", fontsize='large')
plt.xlabel("sequence length", fontsize='large')
plt.legend(loc='lower left', fontsize='x-large')

fig_name = "./images/6.12.pdf"
plt.savefig(fig_name)
plt.show()

Output: (accuracy vs. sequence length plot with dev and test curves, saved to ./images/6.12.pdf)

Comparing with the corresponding test results of the SRN model in Experiment 14, the LSTM's accuracy is markedly higher, indicating that the LSTM maintains long-range dependencies better than the SRN, thanks to its core gating mechanism.

4 Complete Code

'''
@Function: Use the digit-sum task to test the memory capacity of an LSTM network and verify
           that the LSTM can effectively alleviate the long-range dependency problem during training
@Author: lxy
@Date: 2024/12/7
'''
import os
import random
import torch
import torch.nn as nn
import numpy as np
from Runner import Accuracy,RunnerV3,plot_training_loss
from LSTMselfdefine import LSTM
import sys
from model1 import load_data,DataLoader,Model_RNN4SeqClass,DigitSumDataset
import matplotlib.pyplot as plt

# =============== Model training ===================
# Number of training epochs
num_epochs = 500
# Learning rate
lr = 0.001
# Number of digit classes in the input
num_digits = 10
# Dimension of the digit embedding vectors
input_size = 32
# Dimension of the hidden state
hidden_size = 32
# Number of prediction classes (possible sums 0-18, hence 19)
num_classes = 19
# Batch size
batch_size = 8
# Directory for saving model checkpoints
save_dir = "./checkpoints"
# Make sure the checkpoint directory exists
os.makedirs(save_dir, exist_ok=True)

# length can be varied to run the prediction experiment on sequences of different lengths
def train(length):
    print(f"\n====> Training LSTM with data of length {length}.")
    # Fix the random seeds
    np.random.seed(0)
    random.seed(0)
    torch.manual_seed(0)

    # Load the data of the given length
    data_path = f"../实验14-循环神经网络(1)SRN记忆能力-梯度爆炸/SRN记忆能力/datasets/{length}"
    train_examples, dev_examples, test_examples = load_data(data_path)
    train_set, dev_set, test_set = DigitSumDataset(train_examples), DigitSumDataset(dev_examples), DigitSumDataset(
        test_examples)
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size)
    dev_loader = torch.utils.data.DataLoader(dev_set, batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size)
    # Instantiate the model
    base_model = LSTM(input_size, hidden_size)
    model = Model_RNN4SeqClass(base_model, num_digits, input_size, hidden_size, num_classes)
    # Optimizer
    optimizer = torch.optim.Adam(lr=lr, params=model.parameters())
    # Evaluation metric
    metric = Accuracy()
    # Loss function
    loss_fn = nn.CrossEntropyLoss()

    # Instantiate the Runner from the components above
    runner = RunnerV3(model, optimizer, loss_fn, metric)

    # Train the model
    model_save_path = os.path.join(save_dir, f"best_lstm_model_{length}.pdparams")
    runner.train(train_loader, dev_loader, num_epochs=num_epochs, eval_steps=100, log_steps=100,
                 save_path=model_save_path)

    return runner

# ==================== Multi-length training ====================
lstm_runners = {}
lengths = [10, 15, 20, 25, 30, 35]
for length in lengths:
    runner = train(length)
    lstm_runners[length] = runner

# Plot the training-loss curves
for length in lengths:
    runner = lstm_runners[length]
    save_path = f'./images/6.6_{length}.pdf'
    # Create the image directory if it does not exist
    img_dir = os.path.dirname(save_path)
    if not os.path.exists(img_dir):
        os.makedirs(img_dir)
    plot_training_loss(runner, save_path, sample_step=100)
# ================= Model testing =================
lstm_dev_scores = []
lstm_test_scores = []
for length in lengths:
    print(f"Evaluate LSTM with data length {length}.")
    runner = lstm_runners[length]
    # Load the best model from training
    model_path = os.path.join(save_dir, f"best_lstm_model_{length}.pdparams")
    runner.load_model(model_path)

    # Load the data of the given length
    data_path = f"../实验14-循环神经网络(1)SRN记忆能力-梯度爆炸/SRN记忆能力/datasets/{length}"
    train_examples, dev_examples, test_examples = load_data(data_path)
    test_set = DigitSumDataset(test_examples)
    test_loader = DataLoader(test_set, batch_size=batch_size)

    # Evaluate on the test set to obtain the prediction accuracy
    score, _ = runner.evaluate(test_loader)
    lstm_test_scores.append(score)
    lstm_dev_scores.append(max(runner.dev_scores))

for length, dev_score, test_score in zip(lengths, lstm_dev_scores, lstm_test_scores):
    print(f"[LSTM] length:{length}, dev_score: {dev_score}, test_score: {test_score: .5f}")

# Plot dev/test accuracy across the different sequence lengths
plt.plot(lengths, lstm_dev_scores, '-o', color='#e8609b', label="LSTM Dev Accuracy")
plt.plot(lengths, lstm_test_scores, '-o', color='#000000', label="LSTM Test Accuracy")

# Axis labels and legend
plt.ylabel("accuracy", fontsize='large')
plt.xlabel("sequence length", fontsize='large')
plt.legend(loc='lower left', fontsize='x-large')

fig_name = "./images/6.12.pdf"
plt.savefig(fig_name)
plt.show()

5 Changes in the LSTM's Gate and Cell States

5.1 Updating the LSTM module

To observe how the gates behave over time, the LSTM module is updated to record the gate and cell-state vectors at every time step (in the lists Is, Fs, Os and Cs):

import torch
import torch.nn as nn
from torch.nn.init import xavier_uniform_


class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, para_attr=xavier_uniform_):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        # Initialize the model parameters (Xavier-uniform by default)
        self.W_i = torch.nn.Parameter(para_attr(torch.empty(size=[input_size, hidden_size], dtype=torch.float32)))
        self.W_f = torch.nn.Parameter(para_attr(torch.empty(size=[input_size, hidden_size], dtype=torch.float32)))
        self.W_o = torch.nn.Parameter(para_attr(torch.empty(size=[input_size, hidden_size], dtype=torch.float32)))
        self.W_c = torch.nn.Parameter(para_attr(torch.empty(size=[input_size, hidden_size], dtype=torch.float32)))
        self.U_i = torch.nn.Parameter(para_attr(torch.empty(size=[hidden_size, hidden_size], dtype=torch.float32)))
        self.U_f = torch.nn.Parameter(para_attr(torch.empty(size=[hidden_size, hidden_size], dtype=torch.float32)))
        self.U_o = torch.nn.Parameter(para_attr(torch.empty(size=[hidden_size, hidden_size], dtype=torch.float32)))
        self.U_c = torch.nn.Parameter(para_attr(torch.empty(size=[hidden_size, hidden_size], dtype=torch.float32)))
        self.b_i = torch.nn.Parameter(para_attr(torch.empty(size=[1, hidden_size], dtype=torch.float32)))
        self.b_f = torch.nn.Parameter(para_attr(torch.empty(size=[1, hidden_size], dtype=torch.float32)))
        self.b_o = torch.nn.Parameter(para_attr(torch.empty(size=[1, hidden_size], dtype=torch.float32)))
        self.b_c = torch.nn.Parameter(para_attr(torch.empty(size=[1, hidden_size], dtype=torch.float32)))

    # Initialize the cell state and the hidden state
    def init_state(self, batch_size):
        hidden_state = torch.zeros(size=[batch_size, self.hidden_size], dtype=torch.float32)
        cell_state = torch.zeros(size=[batch_size, self.hidden_size], dtype=torch.float32)
        return hidden_state, cell_state

    # Forward computation
    def forward(self, inputs, states=None):
        batch_size, seq_len, input_size = inputs.shape  # inputs: batch_size x seq_len x input_size

        if states is None:
            states = self.init_state(batch_size)
        hidden_state, cell_state = states

        # Lists that record the gate and cell-state vectors at every time step
        self.Is = []
        self.Fs = []
        self.Os = []
        self.Cs = []

        # Run the LSTM computation: input, forget and output gates, candidate state,
        # cell state and hidden state
        for step in range(seq_len):
            input_step = inputs[:, step, :]
            I_gate = torch.sigmoid(torch.matmul(input_step, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
            F_gate = torch.sigmoid(torch.matmul(input_step, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
            O_gate = torch.sigmoid(torch.matmul(input_step, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
            C_tilde = torch.tanh(torch.matmul(input_step, self.W_c) + torch.matmul(hidden_state, self.U_c) + self.b_c)
            cell_state = F_gate * cell_state + I_gate * C_tilde
            hidden_state = O_gate * torch.tanh(cell_state)
            # Store the gate and cell-state vectors (detached, so .numpy() also works outside no_grad)
            self.Is.append(I_gate.detach().numpy().copy())
            self.Fs.append(F_gate.detach().numpy().copy())
            self.Os.append(O_gate.detach().numpy().copy())
            self.Cs.append(cell_state.detach().numpy().copy())
        return hidden_state

5.2 Loading the model

        Using the new LSTM module, re-instantiate a runner. We use the model trained on sequences of length 10 for this experiment, so the length-10 checkpoint is loaded.

# Number of training epochs
num_epochs = 500
# Learning rate
lr = 0.001
# Number of digit classes in the input
num_digits = 10
# Dimension of the digit embedding vectors
input_size = 32
# Dimension of the hidden state
hidden_size = 32
# Number of prediction classes (possible sums 0-18, hence 19)
num_classes = 19
# Batch size
batch_size = 8
# Directory where models were saved
save_dir = "./checkpoints"

# Instantiate the model
base_model = LSTM(input_size, hidden_size)
model = Model_RNN4SeqClass(base_model, num_digits, input_size, hidden_size, num_classes)
# Optimizer
optimizer = torch.optim.Adam(lr=lr, params=model.parameters())
# Evaluation metric
metric = Accuracy()
# Loss function
loss_fn = torch.nn.CrossEntropyLoss()
# Instantiate a new Runner from the components above
runner = RunnerV3(model, optimizer, loss_fn, metric)

length = 10
# Load the best model from training
model_path = os.path.join(save_dir, f"best_lstm_model_{length}.pdparams")
runner.load_model(model_path)

        Next, feed the prediction model a digit sequence; running the forward pass stores the corresponding gate and cell-state vectors inside the model. We then fetch these vectors from the model and plot them. The implementation is as follows:

import seaborn as sns

def plot_tensor(inputs, tensor, save_path, vmin=0, vmax=1):
    # Stack the per-step vectors into a matrix: hidden_size x seq_len after the transpose
    tensor = np.stack(tensor, axis=0)
    tensor = np.squeeze(tensor, 1).T

    plt.figure(figsize=(16, 6))
    # vmin and vmax set the lower and upper bounds of the colour map
    ax = sns.heatmap(tensor, vmin=vmin, vmax=vmax)
    ax.set_xticklabels(inputs)
    ax.figure.savefig(save_path)
    plt.show()

# Define the model input
inputs = [6, 7, 0, 0, 1, 0, 0, 0, 0, 0]
X = torch.tensor(inputs.copy())
X = X.unsqueeze(0)
# Run prediction and get the predicted label
logits = runner.predict(X)
predict_label = torch.argmax(logits, dim=-1)
print(f"predict result: {predict_label.numpy()[0]}")

# Input gate
Is = runner.model.rnn_model.Is
plot_tensor(inputs, Is, save_path="./images/6.13_I.pdf")
# Forget gate
Fs = runner.model.rnn_model.Fs
plot_tensor(inputs, Fs, save_path="./images/6.13_F.pdf")
# Output gate
Os = runner.model.rnn_model.Os
plot_tensor(inputs, Os, save_path="./images/6.13_O.pdf")
# Cell state
Cs = runner.model.rnn_model.Cs
plot_tensor(inputs, Cs, save_path="./images/6.13_C.pdf", vmin=-5, vmax=5)

Output:

predict result: 13

Input gate: (heatmap figure)

Forget gate: (heatmap figure)

Output gate: (heatmap figure)

Cell state: (heatmap figure)

[Analysis]

        The heatmaps show how the cell state and gate values change while the LSTM processes the sequence [6, 7, 0, 0, 1, 0, 0, 0, 0, 0]: the x-axis is the input digit, the y-axis is the dimension of the corresponding gate or cell-state vector, and colour intensity encodes magnitude.

        When the input gate meets the digit 0 at different positions, its values stay roughly the same, showing that the same gating filter is applied to 0 elements, so that irrelevant inputs do not disturb the model.

        After the forget gate meets the digit 1, its values shrink in some dimensions, indicating that some information is being forgotten. As the sequence is consumed, the output gate and the cell state decrease in some dimensions and increase in others, showing that the output gate selects what to emit according to its importance, while the cell state keeps holding the information that matters for the prediction.
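To back the visual reading with numbers, one can also print the per-step mean of each gate. This is a small optional addition on top of the lists already extracted above (Is, Fs and Os come from the code in 5.2):

import numpy as np

for name, gates in [("input", Is), ("forget", Fs), ("output", Os)]:
    means = [float(np.mean(g)) for g in gates]  # one scalar per time step
    print(name, ["%.3f" % m for m in means])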

5.3 Complete code

'''
@Function: Observe the gate and cell states while the LSTM processes a digit sequence
@Author: lxy
@Date: 2024/12/7
'''
import torch.nn.functional as F
from torch.nn.init import xavier_uniform_
import os
import random
import torch
import torch.nn as nn
import numpy as np
from Runner import Accuracy,RunnerV3,plot_training_loss
import sys
from model1 import load_data,DataLoader,Model_RNN4SeqClass,DigitSumDataset
import matplotlib.pyplot as plt

# Updated LSTM module: lists record the gate and cell-state vectors at every time step
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, para_attr=xavier_uniform_):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        # Initialize the model parameters (Xavier-uniform by default)
        self.W_i = torch.nn.Parameter(para_attr(torch.empty(size=[input_size, hidden_size], dtype=torch.float32)))
        self.W_f = torch.nn.Parameter(para_attr(torch.empty(size=[input_size, hidden_size], dtype=torch.float32)))
        self.W_o = torch.nn.Parameter(para_attr(torch.empty(size=[input_size, hidden_size], dtype=torch.float32)))
        self.W_c = torch.nn.Parameter(para_attr(torch.empty(size=[input_size, hidden_size], dtype=torch.float32)))
        self.U_i = torch.nn.Parameter(para_attr(torch.empty(size=[hidden_size, hidden_size], dtype=torch.float32)))
        self.U_f = torch.nn.Parameter(para_attr(torch.empty(size=[hidden_size, hidden_size], dtype=torch.float32)))
        self.U_o = torch.nn.Parameter(para_attr(torch.empty(size=[hidden_size, hidden_size], dtype=torch.float32)))
        self.U_c = torch.nn.Parameter(para_attr(torch.empty(size=[hidden_size, hidden_size], dtype=torch.float32)))
        self.b_i = torch.nn.Parameter(para_attr(torch.empty(size=[1, hidden_size], dtype=torch.float32)))
        self.b_f = torch.nn.Parameter(para_attr(torch.empty(size=[1, hidden_size], dtype=torch.float32)))
        self.b_o = torch.nn.Parameter(para_attr(torch.empty(size=[1, hidden_size], dtype=torch.float32)))
        self.b_c = torch.nn.Parameter(para_attr(torch.empty(size=[1, hidden_size], dtype=torch.float32)))

    # Initialize the cell state and the hidden state
    def init_state(self, batch_size):
        hidden_state = torch.zeros(size=[batch_size, self.hidden_size], dtype=torch.float32)
        cell_state = torch.zeros(size=[batch_size, self.hidden_size], dtype=torch.float32)
        return hidden_state, cell_state

    # Forward computation
    def forward(self, inputs, states=None):
        batch_size, seq_len, input_size = inputs.shape  # inputs: batch_size x seq_len x input_size

        if states is None:
            states = self.init_state(batch_size)
        hidden_state, cell_state = states

        # Lists that record the gate and cell-state vectors at every time step
        self.Is = []
        self.Fs = []
        self.Os = []
        self.Cs = []

        # Run the LSTM computation: input, forget and output gates, candidate state,
        # cell state and hidden state
        for step in range(seq_len):
            input_step = inputs[:, step, :]
            I_gate = torch.sigmoid(torch.matmul(input_step, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
            F_gate = torch.sigmoid(torch.matmul(input_step, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
            O_gate = torch.sigmoid(torch.matmul(input_step, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
            C_tilde = torch.tanh(torch.matmul(input_step, self.W_c) + torch.matmul(hidden_state, self.U_c) + self.b_c)
            cell_state = F_gate * cell_state + I_gate * C_tilde
            hidden_state = O_gate * torch.tanh(cell_state)
            # Store the gate and cell-state vectors (detached, so .numpy() also works outside no_grad)
            self.Is.append(I_gate.detach().numpy().copy())
            self.Fs.append(F_gate.detach().numpy().copy())
            self.Os.append(O_gate.detach().numpy().copy())
            self.Cs.append(cell_state.detach().numpy().copy())
        return hidden_state
# Number of training epochs
num_epochs = 500
# Learning rate
lr = 0.001
# Number of digit classes in the input
num_digits = 10
# Dimension of the digit embedding vectors
input_size = 32
# Dimension of the hidden state
hidden_size = 32
# Number of prediction classes (possible sums 0-18, hence 19)
num_classes = 19
# Batch size
batch_size = 8
# Directory where models were saved
save_dir = "./checkpoints"

# ======================= Instantiate the model ===========================
base_model = LSTM(input_size, hidden_size)
model = Model_RNN4SeqClass(base_model, num_digits, input_size, hidden_size, num_classes)
# Optimizer
optimizer = torch.optim.Adam(lr=lr, params=model.parameters())
# Evaluation metric
metric = Accuracy()
# Loss function
loss_fn = torch.nn.CrossEntropyLoss()
# Instantiate a new Runner from the components above
runner = RunnerV3(model, optimizer, loss_fn, metric)

length = 10
# Load the best model from training
model_path = os.path.join(save_dir, f"best_lstm_model_{length}.pdparams")
runner.load_model(model_path)

import seaborn as sns

def plot_tensor(inputs, tensor, save_path, vmin=0, vmax=1):
    # Stack the per-step vectors into a matrix: hidden_size x seq_len after the transpose
    tensor = np.stack(tensor, axis=0)
    tensor = np.squeeze(tensor, 1).T

    plt.figure(figsize=(16, 6))
    # vmin and vmax set the lower and upper bounds of the colour map
    ax = sns.heatmap(tensor, vmin=vmin, vmax=vmax)
    ax.set_xticklabels(inputs)
    ax.figure.savefig(save_path)
    plt.show()

# Define the model input
inputs = [6, 7, 0, 0, 1, 0, 0, 0, 0, 0]
X = torch.tensor(inputs.copy())
X = X.unsqueeze(0)
# Run prediction and get the predicted label
logits = runner.predict(X)
predict_label = torch.argmax(logits, dim=-1)
print(f"predict result: {predict_label.numpy()[0]}")

# Input gate
Is = runner.model.rnn_model.Is
plot_tensor(inputs, Is, save_path="./images/6.13_I.pdf")
# Forget gate
Fs = runner.model.rnn_model.Fs
plot_tensor(inputs, Fs, save_path="./images/6.13_F.pdf")
# Output gate
Os = runner.model.rnn_model.Os
plot_tensor(inputs, Os, save_path="./images/6.13_O.pdf")
# Cell state
Cs = runner.model.rnn_model.Cs
plot_tensor(inputs, Cs, save_path="./images/6.13_C.pdf", vmin=-5, vmax=5)

References

老师的博客园 (the instructor's blog on cnblogs)

飞桨AI Studio星河社区 (PaddlePaddle AI Studio community for AI learning and practice)

LSTM从入门到精通 (CSDN)

如何构建LSTM神经网络模型_lstm模型 (CSDN blog) ----> very detailed, highly recommended!

