1.获取数据集
from datasets import load_dataset
data = load_dataset(path="csv",data_files="data/news/train.csv",split="train")
print(data)
for i in data:
print(data["text"])
2.统计数据分布情况
这一步是为了验证数据集的质量。
import pandas as pd
df = pd.read_csv("data/news/train.csv")
#统计每个类别的数据量
category_counts = df["label"].value_counts()
#统计每个类别的比值
total_data = len(df)
category_ratios = (category_counts / total_data) *100
print(category_counts)
print(category_ratios)
使用这行之后,可以看到数据集非常的紊乱,分布非常的不均匀。所有我们需要进行数据清洗。
3.数据采样
一、数据过采样(Oversampling)
定义:通过增加少数类样本的数量,使各类别样本趋于平衡。
适用场景:
数据集较小
当数据集整体规模较小时,欠采样可能导致信息丢失,而过采样可以在不损失数据的情况下平衡类别分布。
例如:医疗诊断数据中,罕见病例的样本数量较少,过采样可以保留这些关键信息。
少数类样本非常重要
当少数类样本对任务至关重要时(如欺诈检测、罕见疾病诊断),过采样可以增强模型对少数类的识别能力。
例如:在信用卡欺诈检测中,欺诈交易占比极低,但漏检代价高昂。
模型对数据量敏感
某些模型(如深度学习模型)需要大量数据才能有效训练,过采样可以提供更多样本,避免模型欠拟合。
常用方法:
随机过采样:简单复制少数类样本。
SMOTE(Synthetic Minority Oversampling Technique):通过插值生成新的少数类样本。
ADASYN(Adaptive Synthetic Sampling):根据样本分布自适应生成新样本。
注意事项:
过采样可能导致过拟合,尤其是随机过采样时,模型可能过度依赖重复样本。
生成新样本时需确保其合理性,避免引入噪声。
二、数据欠采样(Undersampling)
定义:通过减少多数类样本的数量,使各类别样本趋于平衡。
适用场景:
数据集较大
当数据集规模较大时,欠采样可以显著减少计算成本,同时避免过采样带来的过拟合风险。
例如:在文本分类任务中,多数类样本数量庞大,欠采样可以加速训练过程。
多数类样本冗余
当多数类样本中存在大量相似或重复数据时,欠采样可以去除冗余信息,提升模型效率。
例如:在图像分类中,某些类别的图像可能高度相似,欠采样可以减少冗余。
计算资源有限
当计算资源有限时,欠采样可以降低数据规模,使模型训练更加高效。
常用方法:
随机欠采样:随机删除多数类样本。
NearMiss:基于距离选择与少数类样本最接近的多数类样本。
Tomek Links:去除边界上容易导致分类错误的样本。
注意事项:
欠采样可能导致信息丢失,尤其是当多数类样本中包含重要信息时。
可能削弱模型对多数类的识别能力。
三、选择依据
数据集规模
小数据集:优先考虑过采样。
大数据集:优先考虑欠采样。
任务目标
少数类样本重要:优先考虑过采样。
多数类样本冗余:优先考虑欠采样。
计算资源
资源有限:优先考虑欠采样。
资源充足:优先考虑过采样。
模型特性
对数据量敏感:优先考虑过采样。
对噪声敏感:优先考虑欠采样。
四、综合策略
在实际应用中,可以结合过采样和欠采样的优点,采用混合策略:
SMOTE + Tomek Links:先使用SMOTE生成少数类样本,再用Tomek Links清理边界样本。
集成方法:如EasyEnsemble或BalanceCascade,通过多次欠采样构建多个平衡子集,训练集成模型。
总结
过采样适用于小数据集、少数类样本重要或模型对数据量敏感的场景,但需注意过拟合风险。
欠采样适用于大数据集、多数类样本冗余或计算资源有限的场景,但需注意信息丢失问题。
根据具体任务需求和数据特点,灵活选择或结合两种方法,以达到最佳模型性能。
这里我们使用欠采样
import pandas as pd
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler
#读取CSV文件
csv_file_path = "data/Weibo/train.csv"
df = pd.read_csv(csv_file_path)
#定义重采样策略
#如果想要过采样,使用RandomOverSampler
#如果想要欠采样,使用RandomUnderSampler
#我们在这里使用RandomUnderSampler进行欠采样
#random_state控制随机数生成器的种子
rus = RandomUnderSampler(sampling_strategy="auto",random_state=42)
#将特征和标签分开
X = df[["text"]]
Y = df[["label"]]
print(Y)
#应用重采样
X_resampled,Y_resampled = rus.fit_resample(X,Y)
print(Y_resampled)
#合并特征和标签,创建系的DataFrame
df_resampled = pd.concat([X_resampled,Y_resampled],axis=1)
print(df_resampled)
#保存均衡数据到新的csv文件
df_resampled.to_csv("train.csv",index=False)
输出结果
D:\miniconda3\envs\shenduxuexi\python.exe D:\JKAI\demo_7\data_test.py
label
9 5045
6 5040
0 5040
7 5017
3 5000
4 4994
8 4983
1 4981
5 4950
2 4950
Name: count, dtype: int64
label
9 10.090
6 10.080
0 10.080
7 10.034
3 10.000
4 9.988
8 9.966
1 9.962
5 9.900
2 9.900
Name: count, dtype: float64
现在的数据分布非常的均匀
4.数据管理
from torch.utils.data import Dataset
from datasets import load_dataset
class MyDataset(Dataset):
def __init__(self,split):
#从磁盘加载数据
self.dataset = load_dataset(path="csv",data_files=f"train.csv",split="train")
def __len__(self):
return len(self.dataset)
def __getitem__(self, item):
text = self.dataset[item]["text"]
label = self.dataset[item]["label"]
return text,label
if __name__ == '__main__':
dataset = MyDataset("test")
for data in dataset:
print(data)
5.定义网络结构
from transformers import BertModel,BertConfig
import torch
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#加载预训练模型
pretrained = BertModel.from_pretrained(r"D:\JKAI\demo_7\model\bert-base-chinese\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f").to(DEVICE)
# pretrained.embeddings.position_embeddings = torch.nn.Embedding(1024,768).to(DEVICE)
#config = BertConfig.from_pretrained(r"E:\PycharmProjects\demo_7\model\bert-base-chinese\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f")
#config.max_position_embeddings = 1024
#print(config)
#使用配置文件初始化模型
#pretrained = BertModel(config).to(DEVICE)
#print(pretrained)
#定义下游任务
class Model(torch.nn.Module):
def __init__(self):
super().__init__()
#设计全连接网络,实现二分类任务
self.fc = torch.nn.Linear(768,8)
def forward(self,input_ids,attention_mask,token_type_ids):
#冻结Bert模型的参数,让其不参与训练
with torch.no_grad():
out = pretrained(input_ids=input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids)
#增量模型参与训练
out = self.fc(out.last_hidden_state[:,0])
return out
6.模型训练
#模型训练
import torch
from MyData import MyDataset
from torch.utils.data import DataLoader
from net import Model
from transformers import BertTokenizer,AdamW
#定义设备信息
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#定义训练的轮次
EPOCH= 3000
token = BertTokenizer.from_pretrained(r"D:\JKAI\demo_7\model\bert-base-chinese\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f")
def collate_fn(data):
sents = [i[0]for i in data]
label = [i[1] for i in data]
#编码
data = token.batch_encode_plus(
batch_text_or_text_pairs=sents,
truncation=True,
max_length=512,
padding="max_length",
return_tensors="pt",
return_length=True
)
input_ids = data["input_ids"]
attention_mask = data["attention_mask"]
token_type_ids = data["token_type_ids"]
labels = torch.LongTensor(label)
return input_ids,attention_mask,token_type_ids,labels
#创建数据集
train_dataset = MyDataset("train")
train_loader = DataLoader(
dataset=train_dataset,
batch_size=120,
shuffle=True,
#舍弃最后一个批次的数据,防止形状出错
drop_last=True,
#对加载进来的数据进行编码
collate_fn=collate_fn
)
val_dataset = MyDataset("validation")
val_loader = DataLoader(
dataset=val_dataset,
batch_size=2,
shuffle=True,
#舍弃最后一个批次的数据,防止形状出错
drop_last=True,
#对加载进来的数据进行编码
collate_fn=collate_fn
)
if __name__ == '__main__':
#开始训练
print(DEVICE)
model = Model().to(DEVICE)
#定义优化器
optimizer = AdamW(model.parameters())
#定义损失函数
loss_func = torch.nn.CrossEntropyLoss()
#初始化最佳验证准确率
best_val_acc = 0.0
for epoch in range(EPOCH):
for i,(input_ids,attention_mask,token_type_ids,labels) in enumerate(train_loader):
#将数据存放到DEVICE上
input_ids, attention_mask, token_type_ids, labels = input_ids.to(DEVICE),attention_mask.to(DEVICE),token_type_ids.to(DEVICE),labels.to(DEVICE)
#前向计算(将数据输入模型,得到输出)
out = model(input_ids=input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids)
#根据输出,计算损失
loss = loss_func(out,labels)
#根据损失,优化参数
optimizer.zero_grad()
loss.backward()
optimizer.step()
#每隔5个批次输出训练信息
if i%5==0:
out = out.argmax(dim=1)
acc = (out==labels).sum().item()/len(labels)
print(f"epoch:{epoch},i:{i},loss:{loss.item()},acc:{acc}")
#验证模型(判断是否过拟合)
#设置为评估模式
model.eval()
#不需要模型参与训练
with torch.no_grad():
val_acc = 0.0
val_loss = 0.0
for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(val_loader):
# 将数据存放到DEVICE上
input_ids, attention_mask, token_type_ids, labels = input_ids.to(DEVICE), attention_mask.to(
DEVICE), token_type_ids.to(DEVICE), labels.to(DEVICE)
# 前向计算(将数据输入模型,得到输出)
out = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
# 根据输出,计算损失
val_loss += loss_func(out, labels)
out = out.argmax(dim=1)
val_acc+=(out==labels).sum().item()
val_loss /= len(val_loader)
val_acc /= len(val_loader)
print(f"验证集:loss:{val_loss},acc:{val_acc}")
#根据验证准确率保存最优参数
if val_acc > best_val_acc:
best_val_acc = val_acc
torch.save(model.state_dict(),"params/best_bert.pth")
print(f"Epoch:{epoch}:保存最优参数:acc:{best_val_acc}")
#保存最后一轮参数
torch.save(model.state_dict(),f"params/last_bert.pth")
print(epoch,f"Epcot:{epoch}最后一轮参数保存成功!")
7.体验模型
import torch
from net import Model
from transformers import BertTokenizer
#定义设备信息
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(DEVICE)
token = BertTokenizer.from_pretrained(r"D:\JKAI\demo_7\model\bert-base-chinese\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f")
names = [ "like",
"disgust",
"happiness",
"sadness",
"anger",
"surprise",
"fear",
"none"]
model = Model().to(DEVICE)
def collate_fn(data):
sents = []
sents.append(data)
#编码
data = token.batch_encode_plus(
batch_text_or_text_pairs=sents,
truncation=True,
max_length=500,
padding="max_length",
return_tensors="pt",
return_length=True
)
input_ids = data["input_ids"]
attention_mask = data["attention_mask"]
token_type_ids = data["token_type_ids"]
return input_ids, attention_mask, token_type_ids
def test():
#加载训练参数
model.load_state_dict(torch.load("params/best_bert.pth",map_location=DEVICE))
#开启测试模式
model.eval()
while True:
data = input("请输入测试数据(输入‘q’退出):")
if data == 'q':
print("测试结束")
break
input_ids, attention_mask, token_type_ids = collate_fn(data)
input_ids, attention_mask, token_type_ids = input_ids.to(DEVICE), attention_mask.to(DEVICE), \
token_type_ids.to(DEVICE)
with torch.no_grad():
out = model(input_ids, attention_mask, token_type_ids)
out = out.argmax(dim=1)
print("模型判定:",names[out],"\n")
if __name__ == '__main__':
test()
可以准确的判断出,是哪一种类型的评价。