Implementing BERT-Based Chinese Text Sentiment Analysis from Scratch


✨ No pre-trained BERT model is used: everything is trained from scratch!

Dataset: weibo_senti_100k
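train.py (below) reads the data from DATA_PATH = "sentiment_data.csv" and expects a review column with the text and a label column with 1 = positive / 0 = negative. A minimal preparation sketch, assuming the raw dump is a file named weibo_senti_100k.csv that already has those two columns (the filename and the shuffling step are assumptions, not part of the original scripts):

# prepare_data.py (hypothetical helper, not part of the original three scripts)
import pandas as pd

# Assumption: the raw dataset file is weibo_senti_100k.csv with columns 'label' and 'review'.
df = pd.read_csv("weibo_senti_100k.csv")
print(df["label"].value_counts())  # quick look at the class balance

# Shuffle and save under the filename train.py expects (DATA_PATH).
df = df.sample(frac=1.0, random_state=1234).reset_index(drop=True)
df.to_csv("sentiment_data.csv", index=False)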

1. model.py

# model.py
import torch
import torch.nn as nn
import math

class BertEmbeddings(nn.Module):
    def __init__(self, vocab_size, d_model, max_len, dropout_prob, padding_idx):
        super().__init__()
        self.token_embeddings = nn.Embedding(vocab_size, d_model, padding_idx=padding_idx)
        self.position_embeddings = nn.Embedding(max_len, d_model)
        # Segment embeddings (optional; can be simplified or omitted for single-sentence classification)
        # self.segment_embeddings = nn.Embedding(2, d_model)

        self.layer_norm = nn.LayerNorm(d_model, eps=1e-12)
        self.dropout = nn.Dropout(dropout_prob)
        self.register_buffer("position_ids", torch.arange(max_len).expand((1, -1)))
        self.padding_idx = padding_idx

    def forward(self, input_ids, segment_ids=None): # segment_ids are usually all zeros
        seq_length = input_ids.size(1)

        token_embeds = self.token_embeddings(input_ids)

        position_ids = self.position_ids[:, :seq_length]
        position_embeds = self.position_embeddings(position_ids)

        embeddings = token_embeds + position_embeds

        # if segment_ids is not None:
        #     segment_embeds = self.segment_embeddings(segment_ids)
        #     embeddings += segment_embeds

        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

class MultiHeadSelfAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout_prob):
        super().__init__()
        assert d_model % num_heads == 0
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout_prob)
        self.scale = math.sqrt(self.head_dim)  # plain float, so no device handling is needed

    def forward(self, query, key, value, attention_mask=None):
        batch_size = query.shape[0]

        Q = self.query(query)
        K = self.key(key)
        V = self.value(value)

        Q = Q.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)

        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        if attention_mask is not None:
            # attention_mask is [batch_size, 1, 1, seq_len]
            energy = energy.masked_fill(attention_mask == 0, -1e10)

        attention_probs = torch.softmax(energy, dim=-1)
        attention_probs = self.dropout(attention_probs)

        x = torch.matmul(attention_probs, V)
        x = x.permute(0, 2, 1, 3).contiguous()
        x = x.view(batch_size, -1, self.num_heads * self.head_dim)
        x = self.fc_out(x)
        return x

class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout_prob):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout_prob)
        self.activation = nn.GELU()

    def forward(self, x):
        x = self.dropout(self.activation(self.fc1(x)))
        x = self.fc2(x)
        return x

class BertEncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout_prob):
        super().__init__()
        self.self_attn = MultiHeadSelfAttention(d_model, num_heads, dropout_prob)
        self.attn_layer_norm = nn.LayerNorm(d_model, eps=1e-12)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout_prob)
        self.ffn_layer_norm = nn.LayerNorm(d_model, eps=1e-12)
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, src, src_mask):
        _src = self.self_attn(src, src, src, src_mask)
        src = self.attn_layer_norm(src + self.dropout(_src))
        _src = self.ffn(src)
        src = self.ffn_layer_norm(src + self.dropout(_src))
        return src

class BertModel(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, max_len, dropout_prob, padding_idx):
        super().__init__()
        self.embeddings = BertEmbeddings(vocab_size, d_model, max_len, dropout_prob, padding_idx)
        self.encoder_layers = nn.ModuleList([
            BertEncoderLayer(d_model, num_heads, d_ff, dropout_prob)
            for _ in range(num_layers)
        ])

    def forward(self, input_ids, attention_mask):
        embedded = self.embeddings(input_ids)
        extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)
        # extended_attention_mask = extended_attention_mask.to(dtype=next(self.parameters()).dtype) # for fp16
        # extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0 # if mask values are 0 for masked

        encoder_output = embedded
        for layer in self.encoder_layers:
            encoder_output = layer(encoder_output, extended_attention_mask)
        return encoder_output

class SentimentClassifier(nn.Module):
    def __init__(self, bert_config): # Pass config as a dictionary or an object
        super().__init__()
        self.bert = BertModel(
            vocab_size=bert_config['vocab_size'],
            d_model=bert_config['d_model'],
            num_layers=bert_config['num_layers'],
            num_heads=bert_config['num_heads'],
            d_ff=bert_config['d_ff'],
            max_len=bert_config['max_len'],
            dropout_prob=bert_config['dropout_prob'],
            padding_idx=bert_config['padding_idx']
        )
        self.dropout = nn.Dropout(bert_config['dropout_prob'])
        self.classifier = nn.Linear(bert_config['d_model'], bert_config['num_classes'])

    def forward(self, input_ids, attention_mask):
        bert_output = self.bert(input_ids, attention_mask)
        cls_output = bert_output[:, 0, :]
        pooled_output = self.dropout(cls_output)
        logits = self.classifier(pooled_output)
        return logits

if __name__ == '__main__':
    # Simple test for model.py
    bert_config_test = {
        'vocab_size': 100, 'd_model': 128, 'num_layers': 2, 'num_heads': 4,
        'd_ff': 256, 'max_len': 30, 'dropout_prob': 0.1, 'num_classes': 2,
        'padding_idx': 0
    }
    model = SentimentClassifier(bert_config_test)
    print(f"Model instantiated with D_MODEL={bert_config_test['d_model']}")
    # Dummy input
    dummy_input_ids = torch.randint(0, 100, (2, 30)) # batch_size=2, seq_len=30
    dummy_attention_mask = torch.ones((2, 30))
    dummy_attention_mask[0, 15:] = 0 # Example padding
    
    out = model(dummy_input_ids, dummy_attention_mask)
    print("Output shape:", out.shape) # Expected: [2, num_classes]

2. train.py

# train.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import random
import json
import pandas as pd
from sklearn.model_selection import train_test_split
from tqdm import tqdm # progress bars
from model import SentimentClassifier

# --- 0. Configuration & Constants ---
SEED = 1234
MAX_LEN = 128
BATCH_SIZE = 16
D_MODEL = 128
NUM_LAYERS = 2
NUM_HEADS = 4
D_FF = D_MODEL * 2
DROPOUT_PROB = 0.1
NUM_CLASSES = 2
LEARNING_RATE = 1e-4
N_EPOCHS = 10 # a few extra epochs to make the progress bars and checkpointing easier to observe
TOKENIZER_CONFIG_PATH = "tokenizer_config.json"
MODEL_SAVE_PATH = "custom_bert_sentiment.pt"
DATA_PATH = "sentiment_data.csv"
TRAIN_TEST_SPLIT_RATIO = 0.2

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# --- 1. Load and Prepare Data from CSV ---
def load_data_from_csv(csv_path, text_col='review', label_col='label'):
    try:
        df = pd.read_csv(csv_path)
    except FileNotFoundError:
        print(f"Error: Data file not found at {csv_path}")
        exit()
    
    if text_col not in df.columns or label_col not in df.columns:
        print(f"Error: CSV must contain columns '{text_col}' and '{label_col}'. Found: {df.columns.tolist()}")
        exit()
        
    texts_data = df[text_col].astype(str).tolist()
    labels_data = df[label_col].tolist()
    
    unique_labels = sorted(list(set(labels_data)))
    print(f"Unique labels found in data: {unique_labels}")
    
    # --- Important: the labels must end up as 0 and 1 ---
    # weibo_senti_100k already uses 1 = positive and 0 = negative, so no mapping is needed here.
    # If your CSV uses other labels (e.g. -1/1, or strings such as "negative"), map them first, e.g.:
    #   label_map = {-1: 0, 1: 1}
    #   labels_data = [label_map[l] for l in labels_data]

    # Make sure every label is an integer
    try:
        labels_data = [int(l) for l in labels_data]
    except ValueError:
        print("Error: Could not convert all labels to integers. Please check your label column.")
        exit()

    # Final sanity check: for binary classification (NUM_CLASSES == 2) only labels 0 and 1 are allowed
    final_unique_labels = sorted(list(set(labels_data)))
    if NUM_CLASSES == 2 and not set(final_unique_labels).issubset({0, 1}):
        print(f"Error: After processing, labels are {final_unique_labels}, but 0 and 1 were expected for NUM_CLASSES=2.")
        print("Please adjust the label mapping logic in load_data_from_csv.")
        exit()
    print(f"Labels after processing: {final_unique_labels}")

    return texts_data, labels_data

all_texts, all_labels = load_data_from_csv(DATA_PATH)

if len(set(all_labels)) > NUM_CLASSES: # having fewer unique labels than NUM_CLASSES is fine (e.g. the data contains a single class)
    print(f"Warning: Number of unique labels found ({len(set(all_labels))}) is more than NUM_CLASSES ({NUM_CLASSES}).")
    print("Please check your data labels and NUM_CLASSES constant.")
    # exit()

train_texts, test_texts, train_labels, test_labels = train_test_split(
    all_texts, all_labels, test_size=TRAIN_TEST_SPLIT_RATIO, random_state=SEED, stratify=all_labels if len(set(all_labels)) > 1 else None
) # stratify only if more than 1 class
print(f"Total samples: {len(all_texts)}")
print(f"Training samples: {len(train_texts)}")
print(f"Test samples: {len(test_texts)}")


# --- 2. Tokenizer --- (unchanged from the previous version)
class CharTokenizer:
    def __init__(self, max_vocab_size=20000):
        self.pad_token = "[PAD]"
        self.unk_token = "[UNK]"
        self.cls_token = "[CLS]"
        self.sep_token = "[SEP]"
        
        self.special_tokens = [self.pad_token, self.unk_token, self.cls_token, self.sep_token]
        self.vocab = {token: i for i, token in enumerate(self.special_tokens)}
        self.idx_to_token = {i: token for token, i in self.vocab.items()}
        self.max_vocab_size = max_vocab_size
        self.vocab_size = len(self.vocab)

    def build_vocab(self, texts_list):
        char_counts = {}
        for text_item in texts_list:
            for char_item in str(text_item):
                char_counts[char_item] = char_counts.get(char_item, 0) + 1
        
        sorted_chars = sorted(char_counts.items(), key=lambda x: x[1], reverse=True)
        
        for char, _ in sorted_chars:
            if len(self.vocab) >= self.max_vocab_size:
                break
            if char not in self.vocab:
                new_idx = len(self.vocab)
                self.vocab[char] = new_idx
                self.idx_to_token[new_idx] = char
        self.vocab_size = len(self.vocab)

    def tokenize(self, text):
        return [self.cls_token] + list(str(text)) + [self.sep_token]

    def tokens_to_ids(self, tokens):
        return [self.vocab.get(token, self.vocab[self.unk_token]) for token in tokens]

    def encode(self, text, max_len):
        tokens = self.tokenize(text)
        if len(tokens) > max_len:
            tokens = tokens[:max_len-1] + [self.sep_token]
        
        token_ids = self.tokens_to_ids(tokens)
        
        padding_len = max_len - len(token_ids)
        attention_mask = [1] * len(token_ids) + [0] * padding_len
        token_ids = token_ids + [self.vocab[self.pad_token]] * padding_len
        
        return torch.tensor(token_ids), torch.tensor(attention_mask)

    def save_vocab(self, filepath, max_len_to_save):
        config = {
            'vocab': self.vocab,
            'special_tokens': {
                'pad_token': self.pad_token, 'unk_token': self.unk_token,
                'cls_token': self.cls_token, 'sep_token': self.sep_token,
            },
            'pad_token_id': self.vocab[self.pad_token],
            'max_len': max_len_to_save
        }
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(config, f, ensure_ascii=False, indent=4)
        print(f"Tokenizer config saved to {filepath}")

tokenizer = CharTokenizer()
tokenizer.build_vocab(train_texts)
tokenizer.save_vocab(TOKENIZER_CONFIG_PATH, MAX_LEN)
print(f"Vocabulary size: {tokenizer.vocab_size}")

# --- 3. Dataset & DataLoader --- (unchanged from the previous version)
class SentimentDataset(Dataset):
    def __init__(self, texts_list, labels_list, tokenizer_instance, max_len_val):
        self.texts = texts_list
        self.labels = labels_list
        self.tokenizer = tokenizer_instance
        self.max_len = max_len_val

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx] # already an integer 0 or 1 at this point
        input_ids, attention_mask = self.tokenizer.encode(text, self.max_len)
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "label": torch.tensor(label, dtype=torch.long)
        }

train_dataset = SentimentDataset(train_texts, train_labels, tokenizer, MAX_LEN)
test_dataset = SentimentDataset(test_texts, test_labels, tokenizer, MAX_LEN)

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# --- 4. Model Initialization --- (unchanged from the previous version)
bert_config = {
    'vocab_size': tokenizer.vocab_size,
    'd_model': D_MODEL,
    'num_layers': NUM_LAYERS,
    'num_heads': NUM_HEADS,
    'd_ff': D_FF,
    'max_len': MAX_LEN,
    'dropout_prob': DROPOUT_PROB,
    'num_classes': NUM_CLASSES,
    'padding_idx': tokenizer.vocab[tokenizer.pad_token]
}
model = SentimentClassifier(bert_config).to(device)

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'The model has {count_parameters(model):,} trainable parameters')

# --- 5. Training Setup --- (unchanged from the previous version)
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

# --- 6. Training Loop with tqdm ---
def train_epoch(model_inst, dataloader, opt, crit, dev, epoch_num, total_epochs):
    model_inst.train()
    epoch_loss = 0
    epoch_corrects = 0
    total_samples = 0
    
    # wrap the dataloader with tqdm to show a progress bar
    progress_bar = tqdm(dataloader, desc=f"Epoch {epoch_num}/{total_epochs} [Train]", leave=False)
    for batch in progress_bar:
        input_ids = batch['input_ids'].to(dev)
        attention_mask = batch['attention_mask'].to(dev)
        labels_batch = batch['label'].to(dev)
        
        opt.zero_grad()
        predictions = model_inst(input_ids, attention_mask)
        loss = crit(predictions, labels_batch)
        preds_classes = torch.argmax(predictions, dim=1)
        corrects = torch.sum(preds_classes == labels_batch)
        
        loss.backward()
        opt.step()
        
        current_loss = loss.item()
        current_acc = corrects.item() / input_ids.size(0)
        epoch_loss += current_loss * input_ids.size(0)
        epoch_corrects += corrects.item()
        total_samples += input_ids.size(0)

        # update the tqdm postfix with the running loss/accuracy
        progress_bar.set_postfix(loss=f"{current_loss:.3f}", acc=f"{current_acc:.3f}")
        
    return epoch_loss / total_samples, epoch_corrects / total_samples

def evaluate_epoch(model_inst, dataloader, crit, dev, epoch_num, total_epochs):
    model_inst.eval()
    epoch_loss = 0
    epoch_corrects = 0
    total_samples = 0
    
    # wrap the dataloader with tqdm to show a progress bar
    progress_bar = tqdm(dataloader, desc=f"Epoch {epoch_num}/{total_epochs} [Valid]", leave=False)
    with torch.no_grad():
        for batch in progress_bar:
            input_ids = batch['input_ids'].to(dev)
            attention_mask = batch['attention_mask'].to(dev)
            labels_batch = batch['label'].to(dev)
            
            predictions = model_inst(input_ids, attention_mask)
            loss = crit(predictions, labels_batch)
            preds_classes = torch.argmax(predictions, dim=1)
            corrects = torch.sum(preds_classes == labels_batch)
            
            current_loss = loss.item()
            current_acc = corrects.item() / input_ids.size(0)
            epoch_loss += current_loss * input_ids.size(0)
            epoch_corrects += corrects.item()
            total_samples += input_ids.size(0)

            # update the tqdm postfix with the running loss/accuracy
            progress_bar.set_postfix(loss=f"{current_loss:.3f}", acc=f"{current_acc:.3f}")
            
    return epoch_loss / total_samples, epoch_corrects / total_samples

if __name__ == '__main__':
    print(f"\nStarting training on {device}...")
    best_valid_loss = float('inf')
    saved_model_epoch_message = "" # holds the message about the last saved checkpoint

    for epoch in range(N_EPOCHS):
        epoch_num_display = epoch + 1 # 1-based epoch number for display

        train_loss, train_acc = train_epoch(model, train_dataloader, optimizer, criterion, device, epoch_num_display, N_EPOCHS)
        valid_loss, valid_acc = evaluate_epoch(model, test_dataloader, criterion, device, epoch_num_display, N_EPOCHS)
        
        # Clearing the previous progress bar is usually unnecessary: tqdm(leave=False) handles it.

        # print the end-of-epoch summary
        print(f"Epoch {epoch_num_display:02}/{N_EPOCHS}:")
        print(f"\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%")
        print(f"\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%")

        # Save the best model and print the message right after the epoch summary
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), MODEL_SAVE_PATH)
            saved_model_epoch_message = f"\t==> Best validation loss: {best_valid_loss:.3f}. Model saved at epoch {epoch_num_display} to {MODEL_SAVE_PATH}"
            print(saved_model_epoch_message) # print the save message immediately
        elif saved_model_epoch_message: # a checkpoint was saved earlier but this epoch did not improve
            pass # optionally re-print the previous save message: print(saved_model_epoch_message)

    print("\nTraining finished.")
    if best_valid_loss != float('inf'):
        print(f"Best model (Val Loss: {best_valid_loss:.3f}) was saved to {MODEL_SAVE_PATH}")

3. predict.py

# predict.py
import torch
import json
from model import SentimentClassifier # Import from model.py

# --- 0. Configuration (should match training if not loaded from a central config) ---
# These are mainly for model architecture reconstruction if not fully in bert_config
# D_MODEL = 128
# NUM_LAYERS = 2
# NUM_HEADS = 4
# D_FF = D_MODEL * 2
# DROPOUT_PROB = 0.1
# NUM_CLASSES = 2
# MAX_LEN will be loaded from tokenizer_config

TOKENIZER_CONFIG_PATH = "tokenizer_config.json"
MODEL_SAVE_PATH = "custom_bert_sentiment.pt"

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# --- 1. Tokenizer (re-defined for prediction, loads vocab) ---
class CharTokenizerPredict:
    def __init__(self, vocab, special_tokens_info, pad_token_id, max_len):
        self.vocab = vocab
        self.pad_token = special_tokens_info['pad_token']
        self.unk_token = special_tokens_info['unk_token']
        self.cls_token = special_tokens_info['cls_token']
        self.sep_token = special_tokens_info['sep_token']
        self.pad_token_id = pad_token_id # vocab[self.pad_token]
        self.unk_token_id = vocab[self.unk_token]
        self.max_len = max_len

    @classmethod
    def from_config(cls, config_path):
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
        return cls(
            vocab=config['vocab'],
            special_tokens_info=config['special_tokens'],
            pad_token_id=config['pad_token_id'],
            max_len=config['max_len']
        )

    def tokenize(self, text):
        return [self.cls_token] + list(text) + [self.sep_token]

    def tokens_to_ids(self, tokens):
        return [self.vocab.get(token, self.unk_token_id) for token in tokens]

    def encode(self, text): # max_len is now part of the instance
        tokens = self.tokenize(text)
        if len(tokens) > self.max_len:
            tokens = tokens[:self.max_len-1] + [self.sep_token]
        
        token_ids = self.tokens_to_ids(tokens)
        
        padding_len = self.max_len - len(token_ids)
        attention_mask = [1] * len(token_ids) + [0] * padding_len
        token_ids = token_ids + [self.pad_token_id] * padding_len
        
        return torch.tensor(token_ids), torch.tensor(attention_mask)

# --- 2. Load Tokenizer and Model Config ---
try:
    with open(TOKENIZER_CONFIG_PATH, 'r', encoding='utf-8') as f:
        tokenizer_json_config = json.load(f)
except FileNotFoundError:
    print(f"Error: Tokenizer config file not found at {TOKENIZER_CONFIG_PATH}")
    print("Please run train.py first to generate it.")
    exit()

tokenizer = CharTokenizerPredict.from_config(TOKENIZER_CONFIG_PATH)

# Reconstruct bert_config for model instantiation.
# vocab_size, max_len and padding_idx come from tokenizer_config.json; the remaining
# architecture parameters (d_model, num_layers, d_ff, ...) are set as constants below
# and must be identical to the values used in train.py. It is better practice to save
# the full bert_config during training and load it here, which avoids this duplication.
D_MODEL_PREDICT = 128
NUM_LAYERS_PREDICT = 2
NUM_HEADS_PREDICT = 4
DROPOUT_PROB_PREDICT = 0.1
NUM_CLASSES_PREDICT = 2

bert_config_predict = {
    'vocab_size': len(tokenizer_json_config['vocab']), # Use loaded vocab size
    'd_model': D_MODEL_PREDICT,
    'num_layers': NUM_LAYERS_PREDICT,
    'num_heads': NUM_HEADS_PREDICT,
    'd_ff': D_MODEL_PREDICT * 2, # Derived, ensure consistency
    'max_len': tokenizer_json_config['max_len'], # Use loaded max_len
    'dropout_prob': DROPOUT_PROB_PREDICT,
    'num_classes': NUM_CLASSES_PREDICT,
    'padding_idx': tokenizer_json_config['pad_token_id'] # Use loaded padding_idx
}

# --- 3. Load Model ---
model = SentimentClassifier(bert_config_predict).to(device)
try:
    model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=device))
except FileNotFoundError:
    print(f"Error: Model file not found at {MODEL_SAVE_PATH}")
    print("Please run train.py first to train and save the model.")
    exit()
model.eval()
print("Model and tokenizer loaded successfully.")

# --- 4. Prediction Function ---
def predict_sentiment(text, model, tokenizer_instance, device): # Pass tokenizer instance
    model.eval()
    input_ids, attention_mask = tokenizer_instance.encode(text)
    
    input_ids = input_ids.unsqueeze(0).to(device)
    attention_mask = attention_mask.unsqueeze(0).to(device)
    
    with torch.no_grad():
        prediction = model(input_ids, attention_mask)
        
    probabilities = torch.softmax(prediction, dim=1)
    predicted_class = torch.argmax(probabilities, dim=1).item()
    
    return predicted_class, probabilities.squeeze().tolist()

# --- 5. Example Usage ---
if __name__ == '__main__':
    print("\n--- Predictions ---")
    test_sentences = [
        "祝福他们的友谊长存,大家都很开心",
        "天哪,好脏,没想到除了后厨脏乱差、预制菜问题,还有个别运送人员素质问题啊",
        "越想越气,想不通,今天一天没吃饭,想着抢票晚饭也没吃就特地找个地方安静答题,还说答完去吃饭,现在直接没胃口了,到底是个什么给题机制",
        "周董这个生日过得太有意义了 满满的感动",
        "但凡有个脑子就不会说这种脑残话"
    ]

    for sentence in test_sentences:
        pred_class, probs = predict_sentiment(sentence, model, tokenizer, device)
        label = 'Positive' if pred_class == 1 else 'Negative'
        print(f"Sentence: {sentence}")
        print(f"Predicted: {label} (raw: {pred_class})")
        print(f"Probabilities (Neg, Pos): {probs[0]:.4f}, {probs[1]:.4f}\n")

4. predict.py output

root@autodl-container-49c4408e50-605ea07b:~# python predict.py
/root/predict.py:106: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.
  model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=device))
Model and tokenizer loaded successfully.

--- Predictions ---
Sentence: 祝福他们的友谊长存,大家都很开心
Predicted: Positive (raw: 1)
Probabilities (Neg, Pos): 0.0001, 0.9999

Sentence: 天哪,好脏,没想到除了后厨脏乱差、预制菜问题,还有个别运送人员素质问题啊
Predicted: Negative (raw: 0)
Probabilities (Neg, Pos): 0.8593, 0.1407

Sentence: 越想越气,想不通,今天一天没吃饭,想着抢票晚饭也没吃就特地找个地方安静答题,还说答完去吃饭,现在直接没胃口了,到底是个什么给题机制
Predicted: Negative (raw: 0)
Probabilities (Neg, Pos): 0.7079, 0.2921

Sentence: 周董这个生日过得太有意义了 满满的感动
Predicted: Positive (raw: 1)
Probabilities (Neg, Pos): 0.0001, 0.9999

Sentence: 但凡有个脑子就不会说这种脑残话
Predicted: Negative (raw: 0)
Probabilities (Neg, Pos): 0.9773, 0.0227
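The FutureWarning at the top of the log comes from calling torch.load without weights_only. Since only a plain state_dict is loaded here, the stricter mode should work; a one-line adjustment to predict.py (weights_only is a standard torch.load argument, though the script has not been re-run with it here):

# In predict.py, step 3 (Load Model): opt in to the safer loading mode to silence the warning.
model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=device, weights_only=True))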

