nlp(6) -- Building a Pattern-Finding Model Task

Published: 2024-04-25

Preface

This is just a record of my learning process; questions and discussion are welcome.

This post contains two examples:
the first is a 5-class classification task;
the second is a binary (2-class) task, solved in a regression style.
Demo 1 is a bit harder than Demo 2, so it is placed first for easier later reference.
Suggested practice order: Demo 2 -> Demo 1.

Code

DEMO1:
"""
自定义一个模型
解决 5分类问题
问题如下:
给定5维向量,0-4下标哪个值对应最大,为对应分类
如 [1,3,4,1,7] 为 5 分类
如 [9,3,1,6,2] 为 1 分类

"""
import numpy as np
import torch
import torch.nn as nn
import torch.utils.data as Data
import matplotlib.pyplot as plt


class TorchModel(nn.Module):
    def __init__(self, input_size):
        super(TorchModel, self).__init__()
        # 5-class classification: y takes values 0-4
        self.linear = nn.Linear(input_size, 5)
        # No activation here: cross_entropy expects raw logits (it applies
        # log_softmax internally); adding a sigmoid would squash the scores
        # into (0, 1) and keep the loss from decreasing.
        # self.activation = torch.sigmoid
        # loss: cross entropy
        self.loss = nn.functional.cross_entropy

    # forward pass: with a label y, return the loss; otherwise return predictions
    def forward(self, x, y=None):
        x = self.linear(x)
        # y_pred = self.activation(x)
        y_pred = x  # raw logits go straight to cross_entropy
        if y is None:
            return y_pred
        else:
            return self.loss(y_pred, y.long())


# def test():
#     x = torch.tensor(np.random.random(5), dtype=torch.float32)
#     y = torch.tensor(np.array(1), dtype=torch.long)
#     print(x.dtype)
#     print(y.dtype)
#     ce_loss = nn.CrossEntropyLoss()
#     loss = ce_loss(x, y)
#     print(loss)
#
# test()
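
# Following up on the commented-out test above: the reason the model skips an
# activation is that nn.functional.cross_entropy applies log_softmax itself.
# A minimal sanity check of that equivalence (helper and values are illustrative):
def check_cross_entropy_equivalence():
    logits = torch.tensor([[2.0, 0.5, 0.1, -1.0, 0.3]])  # raw scores, shape (1, 5)
    target = torch.tensor([0])  # true class index
    loss_a = nn.functional.cross_entropy(logits, target)
    loss_b = nn.functional.nll_loss(torch.log_softmax(logits, dim=1), target)
    print(torch.allclose(loss_a, loss_b))  # True

# check_cross_entropy_equivalence()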

def build_dataset(num):
    X = []
    Y = []
    for i in range(num):
        x = np.random.random(5)
        X.append(x)
        # the index of the largest value is the label
        Y.append(np.argmax(x))
    return torch.FloatTensor(np.array(X)), torch.FloatTensor(np.array(Y))
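
# Usage sketch for build_dataset (illustrative): X has shape (num, 5) and Y
# holds the argmax index of each row as a float (cast to long inside forward).
# X, Y = build_dataset(3)
# print(X.shape, Y.shape)  # torch.Size([3, 5]) torch.Size([3])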


# evaluate accuracy
def evaluate(model):
    # switch to eval mode
    model.eval()
    test_sample_num = 100
    x, y_true = build_dataset(test_sample_num)
    # count how many test samples fall into each of the 5 classes
    y_sum = np.bincount(y_true.long().numpy(), minlength=5)
    print("Class distribution in this round: %s" % y_sum)
    correct, wrong = 0, 0
    # run the model
    with torch.no_grad():
        y_pred = model(x)
        for y_p, y_t in zip(y_pred, y_true):
            # predicted class = index of the largest logit
            if int(torch.argmax(y_p)) == int(y_t):
                correct += 1
            else:
                wrong += 1
    print("Correct predictions: %d / %d, accuracy: %f" % (correct, test_sample_num, correct / (correct + wrong)))
    return correct / (correct + wrong)


def main():
    batch_size = 10
    lr = 0.002
    input_size = 5
    train_sample_num = 5000
    epoch_size = 40
    # build model
    model = TorchModel(input_size)
    # optimizer
    optim = torch.optim.Adam(model.parameters(), lr=lr)
    # training data
    X, Y = build_dataset(train_sample_num)
    # wrap the tensors in a Dataset and batch them with a DataLoader
    dataset = Data.TensorDataset(X, Y)
    log = []
    data_loader = Data.DataLoader(dataset, batch_size, shuffle=True)
    for epoch in range(epoch_size):
        # start training
        model.train()
        epoch_loss = []
        # x.shape == (batch_size, 5), y_true.shape == (batch_size,)
        for x, y_true in data_loader:
            # cross entropy takes the whole batch of x and y at once,
            # not one sample at a time
            loss = model(x, y_true)
            # backward pass: computes the gradient of the loss for every parameter
            loss.backward()
            # update the weights; every optimizer implements step(),
            # which updates all the parameters it manages
            optim.step()
            # zero the gradients so this batch does not leak into the next one
            optim.zero_grad()

            epoch_loss.append(loss.item())
        print("========\nEpoch %d average loss: %f" % (epoch + 1, np.mean(epoch_loss)))
        # evaluate this epoch's accuracy
        acc = evaluate(model)
        log.append([acc, float(np.mean(epoch_loss))])
    # save model
    torch.save(model.state_dict(), "model_work.pt")
    # plot the curves
    # print(log)
    plt.plot(range(len(log)), [l[0] for l in log], label="acc")  # accuracy curve
    plt.plot(range(len(log)), [l[1] for l in log], label="loss")  # loss curve
    plt.legend()
    plt.show()
    return


# predict with a saved model
def predict(model_path, test_vec_x):
    # input dimensionality
    input_size = 5
    model = TorchModel(input_size)
    # load the trained weights
    model.load_state_dict(torch.load(model_path))
    # eval mode
    model.eval()
    with torch.no_grad():  # no gradient computation
        # model predictions (raw logits, one row per input vector)
        result = model.forward(torch.FloatTensor(test_vec_x))
    for vec, res in zip(test_vec_x, result):
        print("Input: %s, predicted class: %s, raw scores: %s" % (vec, torch.argmax(res), res))


if __name__ == '__main__':
    main()
    # test_vec_x = [[0.27889086, 0.15229675, 0.41082123, 0.03504317, 0.18920843],
    #               [0.04963533, 0.5524256, 0.95758807, 0.95520434, 0.84890681],
    #               [0.98797868, 0.67482528, 0.13625847, 0.34675372, 0.19871392],
    #               [0.99349776, 0.59416669, 0.12579291, 0.41567412, 0.7358894]]
    #
    # predict("model_work.pt", test_vec_x)

DEMO2:
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt

"""

基于pytorch框架编写模型训练
实现一个自行构造的找规律(机器学习)任务
规律:x是一个5维向量,如果第1个数>第5个数,则为正样本,反之为负样本

"""


# custom model
class TorchModel(nn.Module):
    def __init__(self, input_size):
        super(TorchModel, self).__init__()
        # linear layer mapping 5 inputs to 1 output
        self.linear = nn.Linear(input_size, 1)
        # sigmoid activation, squashes the output into (0, 1)
        self.activation = torch.sigmoid
        # mean squared error loss
        self.loss = nn.functional.mse_loss

    # with a ground-truth label, return the loss; without one, return the
    # prediction (y defaults to None)
    def forward(self, x, y=None):
        x = self.linear(x)  # (batch_size, input_size) -> (batch_size, 1)
        y_pred = self.activation(x)
        if y is not None:
            return self.loss(y_pred, y)
        else:
            return y_pred
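
# Note: sigmoid + MSE does work for this toy task, but binary cross entropy is
# the more conventional pairing with a sigmoid output. Swapping it in would be
# a one-line change in __init__ (a sketch, not used below):
# self.loss = nn.functional.binary_cross_entropy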


# build a dataset
def build_dataset(size):
    X = []
    Y = []
    for i in range(size):
        x, y = build_sample()
        X.append(x)
        # wrap the label so Y ends up with shape (size, 1), matching the
        # model output; see the note below
        Y.append([y])
    return torch.FloatTensor(np.array(X)), torch.FloatTensor(np.array(Y))
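
# Why the label is wrapped as [y] above: the model's output has shape
# (batch_size, 1), and mse_loss with a (batch_size,) target would silently
# broadcast to (batch_size, batch_size) and compute the wrong loss (PyTorch
# also emits a UserWarning). A quick illustration with dummy values:
# pred = torch.tensor([[0.0], [1.0]])
# nn.functional.mse_loss(pred, torch.tensor([0.0, 1.0]))      # 0.5 -- broadcast, wrong
# nn.functional.mse_loss(pred, torch.tensor([[0.0], [1.0]]))  # 0.0 -- shapes match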


# generate one sample; the way samples are generated defines the pattern we
# want the model to learn: a random 5-dimensional vector is positive if its
# first value is greater than its fifth value, negative otherwise
def build_sample():
    x = np.random.random(5)
    if x[0] > x[4]:
        return x, 1
    else:
        return x, 0


# evaluate the current model
def evaluate(model):
    # switch the model to eval mode!
    model.eval()
    test_sample_num = 100
    x, y = build_dataset(test_sample_num)
    print("This test set has %d positive and %d negative samples" % (sum(y), test_sample_num - sum(y)))
    correct, wrong = 0, 0
    # no gradients needed
    with torch.no_grad():
        y_pred = model(x)  # model predictions
        for y_p, y_t in zip(y_pred, y):  # compare against the true labels
            if float(y_p) < 0.5 and int(y_t) == 0:
                correct += 1  # negative sample classified correctly
            elif float(y_p) >= 0.5 and int(y_t) == 1:
                correct += 1  # positive sample classified correctly
            else:
                wrong += 1
    print("Correct predictions: %d, accuracy: %f" % (correct, correct / (correct + wrong)))
    return correct / (correct + wrong)


def main():
    # configuration
    # number of training epochs
    epoch_num = 30
    # batch size
    batch_size = 20
    # total number of training samples
    train_sample_num = 5000
    # dimensionality of each sample
    input_size = 5
    # learning rate
    lr = 0.002
    # build the model
    model = TorchModel(input_size)
    # choose an optimizer
    optim = torch.optim.Adam(model.parameters(), lr=lr)
    log = []
    # create the training set
    train_x, train_y = build_dataset(train_sample_num)
    # training loop
    for epoch in range(epoch_num):
        model.train()
        # losses for this epoch, mainly to check that the loss is decreasing
        epoch_loss = []
        # // is Python's floor-division operator: it returns the integer part
        # of the quotient, so this gives the number of full batches
        for batch_index in range(train_sample_num // batch_size):
            # slice out the current batch of x and y_true
            x = train_x[batch_index * batch_size: (batch_index + 1) * batch_size]
            y = train_y[batch_index * batch_size: (batch_index + 1) * batch_size]
            loss = model(x, y)
            loss.backward()  # compute gradients (differentiate the loss)
            optim.step()  # update the weights (the actual learning step)
            optim.zero_grad()  # zero the gradients (don't leak into the next batch)
            epoch_loss.append(loss.item())
        # np.mean averages the collected batch losses
        print("=========\nEpoch %d average loss: %f" % (epoch + 1, np.mean(epoch_loss)))
        # evaluate this epoch's accuracy
        acc = evaluate(model)
        log.append([acc, float(np.mean(epoch_loss))])
    # save the model (only the weights are saved!)
    torch.save(model.state_dict(), "model.pt")
    # plot the curves
    print(log)
    plt.plot(range(len(log)), [l[0] for l in log], label="acc")  # accuracy curve
    plt.plot(range(len(log)), [l[1] for l in log], label="loss")  # loss curve
    plt.legend()
    plt.show()
    return


# make predictions with the trained model
def predict(model_path, input_vec):
    input_size = 5
    model = TorchModel(input_size)
    model.load_state_dict(torch.load(model_path))  # load the trained weights
    # print(model.state_dict())

    model.eval()  # eval mode
    with torch.no_grad():  # no gradient computation
        result = model.forward(torch.FloatTensor(input_vec))  # model predictions
    for vec, res in zip(input_vec, result):
        print("Input: %s, predicted class: %d, probability: %f" % (vec, round(float(res)), res))  # print results

if __name__ == "__main__":
    main()
    # test_vec = [[0.27889086,0.15229675,0.31082123,0.03504317,0.18920843],
    #             [0.04963533,0.5524256,0.95758807,0.95520434,0.84890681],
    #             [0.08797868,0.67482528,0.13625847,0.34675372,0.19871392],
    #             [0.99349776,0.59416669,0.92579291,0.41567412,0.7358894]]
    # predict("model.pt", test_vec)