熵最小化Entropy Minimization (二): 案例实施

发布于:2025-06-11 ⋅ 阅读:(18) ⋅ 点赞:(0)

前面介绍了熵最小化、常用的权重函数汇总、半监督学习:低密度分离假设 (Low-Density Separation Assumption)、标签平滑、信息最大化等相关的知识点,本文采用MNIST 10分类数据集来进一步体会它们的效果。

案例实施

对比方法

  • 纯监督学习方法(“supervised”):仅含10%的标签数据。(损失函数=监督损失)
  • 熵最小化方法(“entropy”):10%的标签数据+无监督数据。(损失函数=监督损失+熵最小化损失)
  • 伪标签方法(“pseudo”):10%的标签数据+无监督数据(伪标签),即将模型预测置信度大的标签作为对应样本的伪标签。(损失函数=监督损失+伪标签监督损失)
  • 熵最小化+伪标签(“pseudo_entropy”):10%的标签数据+无监督数据(生成伪标签)。(损失函数=监督损失+熵最小化+伪标签监督损失)
  • 信息最大化(“inform_max”):10%的标签数据+无监督数据。损失函数=监督损失+信息最大化损失。

代码

说明:

  • create_semi_supervised_dataset():表示从全体训练数据集中随机选取指定比例的带标签和不带标签数据,用于训练。
  • entropy_loss():熵最小化损失函数。
  • LabelSmoothingCrossEntropy():标签平滑交叉熵,效果可能会比交叉熵好,本文用的是timm包中的代码。
  • inform_max_loss():信息最大化损失函数。
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
import random
import argparse
from timm.loss import LabelSmoothingCrossEntropy
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader


# Use the GPU when one is available and fall back to the CPU otherwise, so that
# every later `.to(device)` call also works on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def _str2bool(value):
    """Convert a command-line token to a bool.

    ``argparse`` with ``type=bool`` calls ``bool()`` on the raw string, which is
    True for every non-empty string, so ``--flag False`` would still enable the
    flag. This converter accepts the usual spellings of true/false instead.

    Raises:
        argparse.ArgumentTypeError: if the token is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    token = value.strip().lower()
    if token in ('true', 't', 'yes', 'y', '1'):
        return True
    if token in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_args():
    """Parse the command-line options of the training script.

    Returns:
        argparse.Namespace with the run mode (``compare``, ``method``), dataset
        options (``data_path``, ``download``), optimisation options (``epochs``,
        ``bs``, ``optimizer``, ``lr``, ``label_smooth``) and the semi-supervised
        weights (``labeled_ratio``, ``lambda_et``, ``lambda_div``, ``alpha``,
        ``confidence_threshold``).
    """
    parser = argparse.ArgumentParser(description='train')
    parser.add_argument('--compare', type=_str2bool, default=True, help='比较所有方法')
    parser.add_argument('--method', type=str, default='inform_max', choices=['supervised', 'entropy', 'pseudo', 'pseudo_entropy', 'inform_max'])
    # Dataset options. The default path is a raw string so the backslashes of the
    # Windows path are never interpreted as escape sequences.
    parser.add_argument('--data_path', type=str, default=r'G:\CV_opensource_code\datasets\mnist', help='MNIST数据集路径')
    parser.add_argument('--download', type=_str2bool, default=False, help='是否下载MNIST数据集')
    # Optimisation options.
    parser.add_argument('--epochs', type=int, default=1000, help='训练轮数')
    parser.add_argument('--bs', type=int, default=128, help='batch size')
    parser.add_argument('--optimizer', type=str, default='adam', choices=['adam', 'sgd'], help='优化器类型')
    parser.add_argument('--lr', type=float, default=0.001, help='学习率')
    parser.add_argument('--label_smooth', type=_str2bool, default=True, help='采用标签平滑损失或者交叉熵损失')
    # Semi-supervised options.
    parser.add_argument('--labeled_ratio', type=float, default=0.1, help='整个训练集中带标签样本比率')
    parser.add_argument('--lambda_et', type=float, default=.5, help='IM的熵损失权重')
    parser.add_argument('--lambda_div', type=float, default=.5, help='IM的多样化损失权重')
    parser.add_argument('--alpha', type=float, default=0.5, help='无标签损失的权重')
    parser.add_argument('--confidence_threshold', type=float, default=0.95, help='伪标签的置信度阈值')
    args = parser.parse_args()
    return args


def set_seed(seed):
    """Seed the Python, NumPy and PyTorch RNGs and make cuDNN deterministic.

    Args:
        seed: integer seed handed to every random number generator.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    # Disable cuDNN autotuning and request its deterministic kernels.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True


def create_semi_supervised_dataset(dataset, labeled_ratio=0.1):
    """Split ``dataset`` class by class into a labeled and an unlabeled subset.

    For every distinct value found in ``dataset.targets`` the indices of that
    class are shuffled with NumPy's global RNG; the first
    ``int(n_class * labeled_ratio)`` of them go to the labeled subset and the
    rest to the unlabeled subset. The classes are discovered from the targets
    rather than assumed to be 0..9, so no sample is dropped for datasets with a
    different label set.

    Args:
        dataset: dataset exposing a ``targets`` attribute that ``np.array`` can
            convert to a 1-D array of class labels.
        labeled_ratio: fraction of each class kept as labeled data, in [0, 1].

    Returns:
        Tuple ``(labeled_subset, unlabeled_subset)`` of ``torch.utils.data.Subset``.

    Raises:
        ValueError: if ``labeled_ratio`` is outside [0, 1]. A negative ratio would
            otherwise turn into a negative slice bound and invert the split.
    """
    if not 0.0 <= labeled_ratio <= 1.0:
        raise ValueError(f"labeled_ratio must be within [0, 1], got {labeled_ratio}")

    labeled_indices = []
    unlabeled_indices = []

    targets = np.array(dataset.targets)  # class label of every sample
    # np.unique returns the classes sorted, so 0..9 labels are visited in the
    # same order as a plain range(10) loop.
    for cls in np.unique(targets):
        class_indices = np.where(targets == cls)[0]
        np.random.shuffle(class_indices)
        n_labeled = int(len(class_indices) * labeled_ratio)
        # Store plain Python ints rather than NumPy scalars as Subset indices.
        labeled_indices.extend(class_indices[:n_labeled].tolist())
        unlabeled_indices.extend(class_indices[n_labeled:].tolist())

    return torch.utils.data.Subset(dataset, labeled_indices), torch.utils.data.Subset(dataset, unlabeled_indices)


class CNN(nn.Module):
    """Small convolutional classifier producing 10 logits per image.

    Two unpadded 3x3 convolutions (1 -> 32 -> 64 channels), a 2x2 max-pool and two
    linear layers. ``fc1`` takes 9216 features, which is what a 1x28x28 input
    yields (28 -> 26 -> 24 after the convolutions, 12 after pooling, 64 * 12 * 12).
    """

    def __init__(self):
        super().__init__()
        # Feature extractor.
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        # Regularisation: channel-wise dropout after pooling, plain dropout before the head.
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout(0.5)
        # Classifier head.
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return the raw class logits (no softmax applied) for the batch ``x``."""
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        features = self.dropout1(F.max_pool2d(features, 2))
        hidden = F.relu(self.fc1(torch.flatten(features, 1)))
        return self.fc2(self.dropout2(hidden))


def entropy_loss(predictions):
    """Return the mean Shannon entropy of the softmax of ``predictions``.

    Args:
        predictions: logits; softmax and entropy are taken along dimension 1.

    Returns:
        Scalar tensor: the entropy of each row's softmax distribution, averaged
        over the rows.
    """
    log_p = torch.log_softmax(predictions, dim=1)
    p = torch.softmax(predictions, dim=1)
    per_sample_entropy = -(p * log_p).sum(dim=1)
    return per_sample_entropy.mean()


def inform_max_loss(logits, lambda_div=0.1, lambda_et=1.,  eps=1e-8):
    """Information-maximisation loss computed from a batch of logits.

    The loss is ``lambda_et * H_cond - lambda_div * H_marg`` where ``H_cond`` is
    the mean entropy of the per-sample softmax distributions (minimising it makes
    predictions confident) and ``H_marg`` is the entropy of the batch-averaged
    distribution (it enters with a minus sign, so minimising the loss pushes the
    average prediction towards a uniform class usage).

    Args:
        logits: raw scores; softmax is taken along dimension 1.
        lambda_div: weight of the marginal-entropy (diversity) term.
        lambda_et: weight of the per-sample entropy term.
        eps: constant added inside the logarithms to avoid ``log(0)``.

    Returns:
        Scalar tensor with the combined loss.
    """
    probs = F.softmax(logits, dim=1)

    # 1. Per-sample entropy, averaged over the batch.
    conditional_entropy = (-(probs * (probs + eps).log()).sum(dim=1)).mean()

    # 2. Entropy of the marginal distribution, estimated as the batch mean of the probabilities.
    marginal = probs.mean(dim=0)
    marginal_entropy = -(marginal * (marginal + eps).log()).sum()

    return lambda_et * conditional_entropy - lambda_div * marginal_entropy


def evaluate(model, test_loader):
    """Return the top-1 accuracy of ``model`` on ``test_loader`` as a fraction.

    The model is switched to eval mode and left in that mode; gradients are
    disabled while the loader is consumed.

    Args:
        model: network mapping a batch of inputs to per-class scores.
        test_loader: iterable of ``(inputs, labels)`` batches.

    Returns:
        Number of correct predictions divided by the number of labels seen.
    """
    model.eval()
    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for batch_inputs, batch_labels in test_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            scores = model(batch_inputs)
            # Index of the highest score along dimension 1 is the predicted class.
            predicted = torch.max(scores.data, 1)[1]
            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()
    return n_correct / n_seen


def loss_supervised(model, labeled_inputs, labels, **kwargs):
    """Purely supervised objective: the criterion applied to the labeled batch.

    Args:
        model: network producing logits for ``labeled_inputs``.
        labeled_inputs: batch of labeled inputs.
        labels: targets for ``labeled_inputs``.
        **kwargs: must contain ``criterion``; any other entry is ignored.

    Returns:
        Tuple ``(loss, metrics)`` where ``metrics`` holds the loss as a float
        under the key ``'labeled_loss'``.
    """
    logits = model(labeled_inputs)
    supervised_loss = kwargs['criterion'](logits, labels)
    metrics = {'labeled_loss': supervised_loss.item()}
    return supervised_loss, metrics


def loss_entropy(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss plus a weighted entropy penalty on the unlabeled batch.

    Args:
        model: network producing logits.
        labeled_inputs: batch of labeled inputs.
        labels: targets for ``labeled_inputs``.
        unlabeled_inputs: batch of inputs without labels.
        **kwargs: must contain ``criterion`` and ``alpha`` (weight of the entropy term).

    Returns:
        Tuple ``(total_loss, metrics)``; ``metrics`` reports ``'labeled_loss'`` and
        ``'entropy_loss'`` as floats.
    """
    labeled_logits = model(labeled_inputs)
    supervised_term = kwargs['criterion'](labeled_logits, labels)

    unlabeled_logits = model(unlabeled_inputs)
    entropy_term = entropy_loss(unlabeled_logits)

    metrics = {
        'labeled_loss': supervised_term.item(),
        'entropy_loss': entropy_term.item(),
    }
    return supervised_term + kwargs['alpha'] * entropy_term, metrics


def loss_pseudo(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss plus a weighted pseudo-label loss on confident unlabeled samples.

    The unlabeled batch is passed through the model once; samples whose highest
    softmax probability reaches ``confidence_threshold`` are kept and the
    criterion is applied to their logits with the arg-max class as the target.

    Args:
        model: network producing logits.
        labeled_inputs: batch of labeled inputs.
        labels: targets for ``labeled_inputs``.
        unlabeled_inputs: batch of inputs without labels.
        **kwargs: must contain ``criterion``, ``alpha`` (weight of the pseudo-label
            term) and ``confidence_threshold``.

    Returns:
        Tuple ``(total_loss, metrics)``; ``metrics`` reports ``'labeled_loss'``,
        ``'pseudo_loss'`` and ``'pseudo_ratio'`` (share of the unlabeled batch that
        passed the threshold) as floats.
    """
    criterion = kwargs['criterion']
    outputs_labeled = model(labeled_inputs)
    labeled_loss = criterion(outputs_labeled, labels)

    outputs_unlabeled = model(unlabeled_inputs)
    unlabeled_probs = torch.softmax(outputs_unlabeled, dim=1)
    max_probs, pseudo_labels = torch.max(unlabeled_probs, dim=1)
    mask = max_probs.ge(kwargs['confidence_threshold'])
    # Read the count once instead of re-evaluating mask.sum() at every use.
    n_selected = int(mask.sum().item())

    if n_selected > 0:
        pl_loss = criterion(outputs_unlabeled[mask], pseudo_labels[mask])
    else:
        # Zero on the same device/dtype as the model output, so this function
        # does not depend on a module-level device matching the model.
        pl_loss = outputs_unlabeled.new_zeros(())

    total_loss = labeled_loss + kwargs['alpha'] * pl_loss
    n_unlabeled = unlabeled_inputs.size(0)
    return total_loss, {
        'labeled_loss': labeled_loss.item(),
        'pseudo_loss': pl_loss.item(),
        'pseudo_ratio': n_selected / n_unlabeled if n_unlabeled else 0.0
    }


def loss_pseudo_entropy(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss plus weighted pseudo-label and entropy terms on the unlabeled batch.

    The unlabeled batch is passed through the model once. Samples whose highest
    softmax probability reaches ``confidence_threshold`` contribute a pseudo-label
    loss (criterion against their arg-max class); the entropy term is computed by
    ``entropy_loss`` over the whole unlabeled batch. Both unlabeled terms share
    the weight ``alpha``.

    Args:
        model: network producing logits.
        labeled_inputs: batch of labeled inputs.
        labels: targets for ``labeled_inputs``.
        unlabeled_inputs: batch of inputs without labels.
        **kwargs: must contain ``criterion``, ``alpha`` and ``confidence_threshold``.

    Returns:
        Tuple ``(total_loss, metrics)``; ``metrics`` reports ``'labeled_loss'``,
        ``'pseudo_loss'``, ``'entropy_loss'`` and ``'pseudo_ratio'`` as floats.
    """
    criterion = kwargs['criterion']
    outputs_labeled = model(labeled_inputs)
    labeled_loss = criterion(outputs_labeled, labels)

    outputs_unlabeled = model(unlabeled_inputs)
    unlabeled_probs = torch.softmax(outputs_unlabeled, dim=1)
    max_probs, pseudo_labels = torch.max(unlabeled_probs, dim=1)
    mask = max_probs.ge(kwargs['confidence_threshold'])
    # Read the count once instead of re-evaluating mask.sum() at every use.
    n_selected = int(mask.sum().item())

    if n_selected > 0:
        pl_loss = criterion(outputs_unlabeled[mask], pseudo_labels[mask])
    else:
        # Zero on the same device/dtype as the model output, so this function
        # does not depend on a module-level device matching the model.
        pl_loss = outputs_unlabeled.new_zeros(())

    ent_loss = entropy_loss(outputs_unlabeled)

    total_loss = labeled_loss + kwargs['alpha'] * (pl_loss + ent_loss)
    n_unlabeled = unlabeled_inputs.size(0)
    return total_loss, {
        'labeled_loss': labeled_loss.item(),
        'pseudo_loss': pl_loss.item(),
        'entropy_loss': ent_loss.item(),
        'pseudo_ratio': n_selected / n_unlabeled if n_unlabeled else 0.0
    }


def loss_inform_max(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss plus the information-maximisation loss on the unlabeled batch.

    The unlabeled term is added without an extra weight; its two components are
    weighted inside ``inform_max_loss`` by ``lambda_et`` and ``lambda_div``.

    Args:
        model: network producing logits.
        labeled_inputs: batch of labeled inputs.
        labels: targets for ``labeled_inputs``.
        unlabeled_inputs: batch of inputs without labels.
        **kwargs: must contain ``criterion``, ``lambda_et`` and ``lambda_div``.

    Returns:
        Tuple ``(total_loss, metrics)``; ``metrics`` reports ``'labeled_loss'`` and
        ``'inform_max_loss'`` as floats.
    """
    labeled_logits = model(labeled_inputs)
    supervised_term = kwargs['criterion'](labeled_logits, labels)

    unlabeled_logits = model(unlabeled_inputs)
    im_term = inform_max_loss(
        unlabeled_logits,
        lambda_et=kwargs['lambda_et'],
        lambda_div=kwargs['lambda_div'],
    )

    metrics = {
        'labeled_loss': supervised_term.item(),
        'inform_max_loss': im_term.item(),
    }
    return supervised_term + im_term, metrics


def train(model, optimizer, criterion, train_loaders, test_loader, epochs, loss_function, **loss_kwargs):
    """Run the (semi-)supervised training loop and return per-epoch metrics.

    One epoch is one pass over the labeled loader. When an unlabeled loader is
    given, every labeled batch is paired with the next unlabeled batch; the
    unlabeled iterator is recreated at the start of each epoch and restarted if
    it runs out before the labeled loader does.

    Args:
        model: network to optimise.
        optimizer: optimizer stepping the model parameters.
        criterion: supervised criterion, forwarded to ``loss_function``.
        train_loaders: dict with the keys ``'labeled'`` and ``'unlabeled'``; the
            latter may be None for purely supervised training.
        test_loader: loader handed to ``evaluate`` after every epoch.
        epochs: number of epochs to run.
        loss_function: callable invoked with keyword arguments ``model``,
            ``labeled_inputs``, ``labels``, ``criterion``, the entries of
            ``loss_kwargs`` and, when available, ``unlabeled_inputs``; it must
            return ``(total_loss, metrics_dict)``.
        **loss_kwargs: extra keyword arguments forwarded to ``loss_function``.

    Returns:
        Dict mapping each metric name to a list with one entry per epoch. Loss
        entries and ``pseudo_ratio`` are averages weighted by the labeled batch
        size; metrics the loss function does not report stay at 0.0.
        ``accuracy`` holds the value returned by ``evaluate``.

    Raises:
        ValueError: if the labeled loader yields no samples, or if an unlabeled
            loader is given but yields no batches.
    """
    history = {'labeled_loss': [], 'entropy_loss': [], 'pseudo_loss': [], 'inform_max_loss': [], 'total_loss': [], 'accuracy': [], 'pseudo_ratio': []}
    labeled_loader = train_loaders['labeled']
    unlabeled_loader = train_loaders['unlabeled']
    metric_names = [name for name in history if name != 'accuracy']

    for epoch in range(epochs):
        model.train()
        epoch_metrics = dict.fromkeys(metric_names, 0.0)
        n_labeled = 0

        # Purely supervised runs have no unlabeled loader at all.
        unlabeled_iter = iter(unlabeled_loader) if unlabeled_loader is not None else None

        for labeled_inputs, labels in labeled_loader:
            labeled_inputs, labels = labeled_inputs.to(device), labels.to(device)
            batch_size = labeled_inputs.size(0)
            n_labeled += batch_size

            loss_args = {'model': model, 'labeled_inputs': labeled_inputs, 'labels': labels, 'criterion': criterion, **loss_kwargs}

            if unlabeled_iter is not None:
                try:
                    unlabeled_data, _ = next(unlabeled_iter)
                except StopIteration:
                    # Unlabeled data ran out mid-epoch: start another pass over it.
                    unlabeled_iter = iter(unlabeled_loader)
                    try:
                        unlabeled_data, _ = next(unlabeled_iter)
                    except StopIteration:
                        raise ValueError("the unlabeled loader yields no batches") from None
                loss_args['unlabeled_inputs'] = unlabeled_data.to(device)

            optimizer.zero_grad()
            total_loss, loss_metrics = loss_function(**loss_args)
            total_loss.backward()
            optimizer.step()

            # Accumulate every reported metric weighted by the labeled batch size.
            for key, value in loss_metrics.items():
                if key in epoch_metrics:
                    epoch_metrics[key] += value * batch_size

            epoch_metrics['total_loss'] += total_loss.item() * batch_size

        if n_labeled == 0:
            raise ValueError("the labeled loader yields no samples")

        # Turn the weighted sums into per-sample averages.
        for key, accumulated in epoch_metrics.items():
            history[key].append(accumulated / n_labeled)

        # Evaluate on the test set.
        test_acc = evaluate(model, test_loader)
        history['accuracy'].append(test_acc)

        # Report progress.
        print(f"Epoch {epoch + 1}/{epochs}:", end=' ')
        for key, value in history.items():
            if key != 'accuracy' and value:
                print(f"{key}: {value[-1]:.4f}", end=' ')
        print(f"Test Acc: {test_acc:.2%}")

    return history


def Trainer(args):
    """Build data, model, optimizer and criterion for ``args.method`` and train.

    Args:
        args: namespace providing ``data_path``, ``download``, ``labeled_ratio``,
            ``bs``, ``method``, ``optimizer``, ``lr``, ``label_smooth``, ``epochs``,
            ``alpha``, ``lambda_et``, ``lambda_div`` and ``confidence_threshold``.

    Returns:
        The history dict returned by ``train``.

    Raises:
        ValueError: if ``args.optimizer`` is neither ``'adam'`` nor ``'sgd'``.
    """
    # Datasets: MNIST train split divided into labeled/unlabeled parts, plus the test split.
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    mnist_options = {'root': args.data_path, 'download': args.download, 'transform': transform}

    train_ds = torchvision.datasets.MNIST(train=True, **mnist_options)
    labeled_train_ds, unlabeled_train_ds = create_semi_supervised_dataset(train_ds, args.labeled_ratio)
    test_ds = torchvision.datasets.MNIST(train=False, **mnist_options)

    print(f'labeled train data: {len(labeled_train_ds)}, unlabeled train data: {len(unlabeled_train_ds)}, test data: {len(test_ds)}')

    # Data loaders; the supervised baseline gets no unlabeled loader.
    train_loaders = {
        'labeled': DataLoader(labeled_train_ds, batch_size=args.bs, shuffle=True),
        'unlabeled': None if args.method == 'supervised' else DataLoader(unlabeled_train_ds, batch_size=args.bs, shuffle=True),
    }
    test_loader = DataLoader(test_ds, batch_size=args.bs, shuffle=False)

    # Loss function for the requested method.
    loss_function = {
        'supervised': loss_supervised,
        'entropy': loss_entropy,
        'pseudo': loss_pseudo,
        'pseudo_entropy': loss_pseudo_entropy,
        'inform_max': loss_inform_max,
    }[args.method]

    # Model.
    model = CNN().to(device)

    # Optimizer.
    if args.optimizer not in ('adam', 'sgd'):
        raise ValueError(f"未知优化器: {args.optimizer}")
    if args.optimizer == 'adam':
        optimizer = optim.Adam(model.parameters(), lr=args.lr)
    else:
        optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=0.9)

    # Supervised criterion: label-smoothed or plain cross entropy.
    criterion = (LabelSmoothingCrossEntropy() if args.label_smooth else nn.CrossEntropyLoss()).to(device)

    # Weights and thresholds forwarded to the loss function.
    loss_kwargs = {
        'alpha': args.alpha,
        'lambda_et': args.lambda_et,
        'lambda_div': args.lambda_div,
        'confidence_threshold': args.confidence_threshold,
    }
    return train(model, optimizer, criterion, train_loaders, test_loader, args.epochs, loss_function, **loss_kwargs)


def _snapshot_rng():
    """Capture the Python, NumPy and PyTorch (CPU and, when available, CUDA) RNG states."""
    return {
        'python': random.getstate(),
        'numpy': np.random.get_state(),
        'torch': torch.get_rng_state(),
        'cuda': torch.cuda.get_rng_state_all() if torch.cuda.is_available() else None,
    }


def _restore_rng(snapshot):
    """Rewind every RNG to the states captured by ``_snapshot_rng``."""
    random.setstate(snapshot['python'])
    np.random.set_state(snapshot['numpy'])
    torch.set_rng_state(snapshot['torch'])
    if snapshot['cuda'] is not None:
        torch.cuda.set_rng_state_all(snapshot['cuda'])


def compare_methods(args):
    """Train every method in turn and plot a side-by-side comparison.

    Each method is trained through ``Trainer`` starting from the same RNG state.
    The per-class shuffle that picks the labeled samples and the weight
    initialisation both draw from the global RNGs, so without rewinding them
    every method would get a different labeled split and different initial
    weights, and the comparison would mix method and sampling effects.

    ``args.method`` is overwritten while the methods run and restored afterwards,
    also when training raises. The figure is saved to ``comparison_results.png``
    and shown.

    Args:
        args: namespace accepted by ``Trainer``.
    """
    methods = ["supervised", "entropy", "pseudo", "pseudo_entropy", "inform_max"]
    results = {'best_acc': [], 'last_acc': [], 'final_loss': [], 'pseudo_ratio': []}
    histories = {}

    original_method = args.method
    rng_snapshot = _snapshot_rng()

    try:
        for method in methods:
            print(f"\n训练方法: {method} method")
            _restore_rng(rng_snapshot)
            args.method = method
            history = Trainer(args)
            histories[method] = history

            # Collect the summary numbers of this run.
            results['last_acc'].append(history['accuracy'][-1]*100)
            results['best_acc'].append(max(history['accuracy'])*100)
            results['final_loss'].append(history['total_loss'][-1])
            ratios = history.get('pseudo_ratio')
            results['pseudo_ratio'].append(ratios[-1] if ratios else 0.0)
    finally:
        args.method = original_method

    # Figure setup.
    plt.rcParams['axes.unicode_minus'] = False
    plt.rcParams['font.family'] = 'serif'
    plt.rcParams['font.serif'] = 'Times New Roman'
    plt.rcParams['font.weight'] = 'normal'
    plt.rcParams['font.size'] = 10
    plt.figure(figsize=(14, 10))
    colors = ['red', 'black', 'blue', 'g', 'magenta']
    line_st = ['-', '--', '-.', ':', (0, (3, 9, 1, 9))]

    # Training-loss curves.
    plt.subplot(2, 2, 1)
    for i, method in enumerate(methods):
        plt.plot(histories[method]['total_loss'], color=colors[i], linestyle=line_st[i], label=method, linewidth=1.3)
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)

    # Test-accuracy curves.
    plt.subplot(2, 2, 2)
    for i, method in enumerate(methods):
        plt.plot(histories[method]['accuracy'], color=colors[i], linestyle=line_st[i], label=method)
    plt.title('Test Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)

    # Grouped bars: last-epoch versus best accuracy per method.
    plt.subplot(2, 2, 3)
    last_acc = results['last_acc']
    best_acc = results['best_acc']
    x = np.arange(len(methods))  # bar group positions
    width = 0.35  # width of one bar

    bar1 = plt.bar(x - width / 2, last_acc, width, label='Last Acc')
    bar2 = plt.bar(x + width / 2, best_acc, width, label='Best Acc')
    plt.ylabel('Accuracy')
    plt.ylim(90, 100)
    plt.xticks(x, methods)
    plt.legend()

    # Write the value above every bar.
    def add_labels(bars):
        for bar in bars:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width() / 2, height, f'{height:.2f}',
                     ha='center', va='bottom', fontsize=8)

    add_labels(bar1)
    add_labels(bar2)

    # Share of unlabeled samples used as pseudo-labels, for the pseudo-label methods only.
    plt.subplot(2, 2, 4)
    pseudo_idx = [i for i, name in enumerate(methods) if name.startswith('pseudo')]
    pseudo_names = [methods[i] for i in pseudo_idx]
    pseudo_ratios = [results['pseudo_ratio'][i] for i in pseudo_idx]
    plt.bar(pseudo_names, pseudo_ratios, color=[colors[i] for i in pseudo_idx])
    plt.title('Pseudo Label Usage')
    plt.ylabel('Ratio')
    plt.ylim(0, 1)
    for i, v in enumerate(pseudo_ratios):
        plt.text(i, v + 0.02, f"{v:.2%}", ha='center')

    plt.tight_layout()
    plt.savefig('comparison_results.png', dpi=500)
    plt.show()


if __name__ == "__main__":
    # Seed the RNGs before anything else runs.
    set_seed(2025)
    args = parse_args()

    if not args.compare:
        # Single run of the method selected on the command line.
        print(f'方法:{args.method} method')
        history = Trainer(args)
        print(f"\nBest acc (test): {max(history['accuracy']):.2%}, Last acc (test): {history['accuracy'][-1]:.2%}")
    else:
        # Train all methods and plot them against each other.
        compare_methods(args)

在这里插入图片描述

结论

  • 观察发现,一般标签平滑LabelSmoothingCrossEntropy比CrossEntropy的效果有一定的提升。
  • 对比五种方法,联合伪标签+熵最小化效果有微弱的提升,其余方法对比纯监督方法没有竞争力。
  • 本案例没有任何调参,直接采用随机或者默认的参数,实际中可以采用学习率退火、变权重等技巧,可能会涨点。

最后,上述源代码第一版是由deepseek生成,本人做了部分修改。因此,代码仅供参考。