97%精准加密流量检测:CNN+ResNet实战解析

发布于:2025-07-08 ⋅ 阅读:(10) ⋅ 点赞:(0)

实现的功能如下:

实现功能:加密流量检测

数据集:USTC-TFC2016(使用 .pcap 文件)

目标类型:多分类,准确率97%以上,使用4大指标评估。

特征提取:不包含 payload,只提取协议头部特征,按照双向流(五元组)进行划分,不使用全部数据                    包,截取一个双向流前64个包进行研究

模型构建:采用视觉模型,模型采用CNN和ResNet复合而成

下文从五个方面进行讲解,会提供完整可运行的代码。

关于过程中涉及到的名词方面,比如什么是CNN、什么是归一化处理、什么是ResNet等,我放到最后了,有不理解的可以查询,有基础理解的可以只看前面快速上手使用。

若本文对你有帮助,请点个免费的赞谢谢啦!

一、环境配置

vscode中安装以下内容

torch==2.3.1
torchvision
numpy==1.24.3
scapy==2.5.0
scikit-learn==1.2.2
tqdm==4.65.0
matplotlib==3.7.1
pandas==2.0.3


使用USTC-TFC2016数据集中的pcap文件,需从网上自行下载

GitHub - davidyslu/USTC-TFC2016: Traffic dataset USTC-TFC2016

二、代码框架目录

整个项目分支如下

encryption_traffic_detection   根目录
│  eval.py    评估文件
│  requirements.txt        安装环境所需文件
│  train.py       训练文件
├─config
│  │  params.py     训练参数配置
│  │  paths.py      路径配置
├─data
│  │  dataset.py     数据集基类
│  │  preprocess.py     数据预处理
│  │
│  ├─processed     保存预处理后的文件
│  ├─raw_pcaps     存放.pacp文件
├─models
│  │  cnn_model.py      CNN模型
│  │  ensemble.py         合并CNN和ResNet模型
│  │  resnet_model.py   ResNet模型
├─saved_models    保存模型的路径
└─utils
    │  metrics.py        评估指标

三、具体实现

(一)preprocess.py

先从preprocess.py文件说起吧。

# 数据预处理
import os
import sys
# 获取项目根目录(假设preprocess.py在data/目录下)
root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 将根目录添加到Python路径
sys.path.append(root_dir)
import numpy as np
from scapy.all import rdpcap, IP, TCP, UDP
from tqdm import tqdm
from config import paths, params
import pickle

class PCAPPreprocessor:#处理网络数据包捕获(PCAP)文件,将其转换为适合机器学习的特征矩阵
    def __init__(self):#初始化类的参数
        self.max_packets = params.MAX_PACKETS_PER_FLOW#每个流最多保留的数据包数量(从外部参数 params 获取),特征矩阵的行数
        self.feature_dim = params.FEATURE_DIM# 特征向量维度(从外部参数获取),特征矩阵的列数
    
    def process_directory(self, input_dir, output_dir):
        """处理整个目录下的PCAP文件"""
        all_flows = []
        all_labels = []
        
        # 遍历目录中的每个PCAP文件
        for filename in tqdm(os.listdir(input_dir)):    # tqdm用于显示进度条
            if not filename.endswith('.pcap'):
                continue                                # 跳过非PCAP文件
                
            # 从文件名获取标签 (格式: label_xxx.pcap)
            label = filename.split('_')[0]              #以MySQL.pcap为例,没有下划线,直接提取整个文件名。
            #label = os.path.splitext(filename)[0] 

            # 处理单个PCAP文件
            pcap_path = os.path.join(input_dir, filename)
            flows = self.process_pcap(pcap_path)        #将 PCAP 文件解析为多个流(flows),每个流包含最多 64 个数据包,每个数据包会被转换为 6 维特征向量.
            
            # 收集所有流和标签
            all_flows.extend(flows)                 #all_flows存储所有 PCAP 文件中提取的流,每个流是一个 [64, 6] 的特征矩阵
            all_labels.extend([label] * len(flows)) #存储每个流对应的标签,对于 MySql.pcap,所有流的标签均为 MySql.pcap
        
        # 保存处理后的数据
        self.save_processed_data(all_flows, all_labels, output_dir)  #会将特征矩阵和标签保存到指定目录
    
    def process_pcap(self, pcap_path):
        """处理单个PCAP文件"""
        try:
            packets = rdpcap(pcap_path)     # 使用Scapy库读取PCAP文件,将二进制文件转为python对象列表,每个对象代表一个网络数据包。
            flows = self._group_packets(packets)    # 按五元组 分组数据包。源IP、目的IP、源端口、目的端口、传输层协议。区分不同设备间的通信。

            # 为每个流创建特征矩阵
            flow_matrices = [self._create_flow_matrix(flow) for flow in flows.values()]
            return flow_matrices
        except Exception as e:
            print(f"Error processing {pcap_path}: {e}")
            return []
    
    def _group_packets(self, packets):
        """按五元组分组数据包,再将同一网络连接的所有数据包(包括请求和响应)归为一组,形成逻辑上的 “流”。避免只是分析单项流量,缺失响应状态、往返时间等信息。"""
        flows = {}
        for pkt in packets:
            if not (IP in pkt and (TCP in pkt or UDP in pkt)):
                continue  # 跳过非IP或非TCP/UDP数据包
                
            # 获取五元组
            src = pkt[IP].src   # 源IP
            dst = pkt[IP].dst   # 目的IP
            proto = pkt[IP].proto   # 协议号 (6=TCP, 17=UDP)
            sport = pkt.sport if hasattr(pkt, 'sport') else 0   # 源端口
            dport = pkt.dport if hasattr(pkt, 'dport') else 0   # 目的端口
            
            # 创建双向流标识。通过排序源 / 目的地址对,确保请求和响应属于同一流
            #flow_id = tuple(sorted([(src, sport), (dst, dport)]) + (proto,))
            flow_id = tuple(sorted([(src, sport), (dst, dport)]) + [proto]) #通过sorted()函数处理源 IP / 端口对和目的 IP / 端口对,生成方向无关的流 ID。将同一连接的双向流量视为同一个流
            if flow_id not in flows:
                flows[flow_id] = []      #使用字典flows存储分组结果
            flows[flow_id].append(pkt)   # 将数据包添加到对应流
            
        return flows
    
    def _create_flow_matrix(self, packets):
        """创建流特征矩阵 (max_packets x feature_dim)"""
        matrix = np.zeros((self.max_packets, self.feature_dim)) # 初始化零矩阵
        
        for i, pkt in enumerate(packets[:self.max_packets]):    # 限制每个流的最大包数
            # 协议头部特征 (不包含payload,只提取数据包的协议头部特征,而忽略其负载(应用层数据))          pkt.load为payload 数据
            pkt_len = len(pkt)  # 数据包总长度
            tos = pkt[IP].tos if IP in pkt else 0   # 服务类型字段
            ttl = pkt[IP].ttl if IP in pkt else 0   # 生存时间
            sport = pkt.sport if hasattr(pkt, 'sport') else 0 # 源端口
            dport = pkt.dport if hasattr(pkt, 'dport') else 0 # 目的端口
            flags = self._get_flags(pkt) # 获取协议标志位
            
            matrix[i] = [pkt_len, tos, ttl, sport, dport, flags]  # 填充特征矩阵(当前使用6个特征)
            
        return matrix
    
    def _get_flags(self, pkt):
        """获取协议标志位"""
        if TCP in pkt:
            return int(pkt[TCP].flags) #直接使用 Scapy 的 flags 属性(如pkt[TCP].flags返回数值)
        elif UDP in pkt:
            return 0x11  # 自定义UDP标志(0x11对应十进制 17,即 UDP 协议号),因为UDP 协议是无连接的,没有标准的标志位字段,UDP 的协议号是 17(在 IP 头部字段中协议类型为17)
        return 0    # 其他协议返回0
    
    def save_processed_data(self, flows, labels, output_dir):
        """保存处理后的数据"""
        # 创建标签映射 (字符串标签 → 数字索引)
        unique_labels = sorted(set(labels)) # 获取所有唯一标签,根据流的那个对应标签生成的
        label_to_idx = {label: idx for idx, label in enumerate(unique_labels)}
        
        # 转换标签为索引
        label_indices = [label_to_idx[label] for label in labels]
        
        # 保存数据到压缩NPZ文件
        data_path = os.path.join(output_dir, 'processed_data.npz')#processed_data.npz压缩的 NumPy 数组文件,包含特征矩阵和标签索引
        np.savez_compressed(
            data_path,
            flows=np.array(flows, dtype=np.float32),    # 流特征矩阵
            labels=np.array(label_indices, dtype=np.int32)  # 标签索引
        )
        
        # 保存标签映射(用于后续模型预测时转换回原始标签)
        label_map_path = os.path.join(output_dir, 'label_map.pkl')
        with open(label_map_path, 'wb') as f:
            pickle.dump(label_to_idx, f)
        
        print(f"Saved processed data to {data_path}")
        print(f"Total flows: {len(flows)}, Classes: {len(unique_labels)}")

# 主处理函数
def main():
    preprocessor = PCAPPreprocessor()
    preprocessor.process_directory(paths.PCAP_DIR, paths.PROCESSED_DIR)

if __name__ == "__main__":
    main()

在这个文件里,主要是定义了一个数据预处理类,用于处理pcap文件,将pacp文件的二进制数据转为特征矩阵

在这个类里呢,先是初始化一下类的参数,从配置文件中获取,配置一下特征矩阵的行数和列数。

接着定义了几个数据处理函数,process_directory函数遍历处理整个目录下的pcap文件,提取pcap文件的名当做标签,调用process_pcap将单个pcap文件解析为多个流

在process_pcap里使用Scapy库读取pcap文件,将二进制数据转为python对象列表,再将python对象列表传入_group_packets函数按五元组分组数据包

在_group_packets函数里,按五元组分组数据包,将同一连接的双向流量视为一个流,划分多个流。同一网络连接的所有数据包(包括请求和响应)归为一组,形成逻辑上的 “流”。网络流量的数据包里有很多个不同设备间的通信,流就是其中某一组设备互相交换的数据,把所有它们交换的数据包分为一组。

分组完数据包(将单个pcap文件分为多个流)后,回到process_pcap函数,再为每个流创建特征矩阵,使用_create_flow_matrix函数

在_create_flow_matrix函数里,提取流的数据包里的协议头部特征,不提取payload包含的具体传输数据内容,把协议头部特征填充到特征矩阵里。

至此,流被处理为numpy数组(特征矩阵)。数据从二进制变为python列表对象,将其划分为流存入字典中,再提取特征转为numpy数组,方便后面机器学习处理数据。

还剩下保存模型的函数,没什么好讲的,继续下一步

(二)train.py

import torch
import torch.optim as optim #优化器模块
import torch.nn as nn #神经网络模块,包含层定义和损失函数
from torch.optim.lr_scheduler import ReduceLROnPlateau
from data.dataset import create_dataloaders #自定义数据加载模块,用于处理 PCAP 特征数据
from models.ensemble import ModelEnsemble #自定义模型集成类,可能融合多个子模型
from utils.metrics import calculate_metrics, focal_loss #包含评估指标计算函数和 Focal Loss 实现

from config import paths, params #存储路径配置和训练超参数
import os
import numpy as np
import time



def train_model():
    # 设置设备
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    
    # 创建数据加载器
    train_loader, val_loader, test_loader = create_dataloaders() #从data.dataset模块调用函数创建数据加载器
    
    # 初始化模型
    model = ModelEnsemble(num_classes=params.NUM_CLASSES).to(device)
    
    # 损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(
        model.parameters(), 
        lr=params.LEARNING_RATE,
        weight_decay=1e-5  # L2正则化
    )
    
    # 学习率调度器
    scheduler = ReduceLROnPlateau(
        optimizer, mode='max', factor=0.5, patience=5, verbose=True)
    
    # 训练循环
    best_val_acc = 0.0    #记录验证集最佳准确率
    early_stop_counter = 0     #早停计数器
    max_early_stop = 10         #连续 10 个 epoch 无提升则停止训练
    
    for epoch in range(params.EPOCHS):
        start_time = time.time()
        
        # 训练阶段
        model.train()#将模型设置为训练模式
        train_loss = 0.0
        all_preds = []
        all_labels = []
        
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)  #从训练集加载一批数据到 GPU
            
            # 前向传播,计算模型输出
            outputs = model(inputs)  #输入数据
            loss = focal_loss(outputs, labels)  # 使用Focal Loss处理类别不平衡问题
            
            # 反向传播
            optimizer.zero_grad() #清除上一批次的梯度
            loss.backward()         #计算当前批次的梯度
            optimizer.step()        #更新模型参数
            
            # 记录损失和预测,累积损失和预测结果
            train_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
        
        # 计算训练指标
        train_metrics = calculate_metrics(np.array(all_labels), np.array(all_preds))
        train_loss = train_loss / len(train_loader.dataset)
        
        # 验证阶段
        val_metrics = evaluate_model(model, val_loader, device)  #evaluate_model 函数在验证集上评估模型性能
        
        # 更新学习率
        scheduler.step(val_metrics['accuracy']) #学习率调度器根据验证准确率调整学习率
        
        # 记录时间
        epoch_time = time.time() - start_time  #记录每个 epoch 的训练时间
        
        # 打印日志
        print(
            f"Epoch {epoch+1}/{params.EPOCHS} | Time: {epoch_time:.2f}s | "
            f"Train Loss: {train_loss:.4f} | Train Acc: {train_metrics['accuracy']:.4f} | "
            f"Val Acc: {val_metrics['accuracy']:.4f} | Val F1: {val_metrics['f1']:.4f}"
        )
        
        # 保存最佳模型
        if val_metrics['accuracy'] > best_val_acc:
            best_val_acc = val_metrics['accuracy']
            early_stop_counter = 0
            save_model(model, epoch, val_metrics)
        else:
            early_stop_counter += 1
        
        # 早停检查
        if early_stop_counter >= max_early_stop:
            print(f"Early stopping at epoch {epoch+1}")
            break
    
    # 最终测试
    print("Training completed. Running final test evaluation...")
    test_metrics = evaluate_model(model, test_loader, device)  #在独立测试集上评估最终模型性能
    print(
        f"Test Results: Acc={test_metrics['accuracy']:.4f}, "
        f"Precision={test_metrics['precision']:.4f}, "
        f"Recall={test_metrics['recall']:.4f}, "
        f"F1={test_metrics['f1']:.4f}"
    )

def evaluate_model(model, data_loader, device):
    """评估模型性能"""
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    return calculate_metrics(np.array(all_labels), np.array(all_preds))

def save_model(model, epoch, metrics):
    """保存最佳模型"""
    model_path = os.path.join(
        paths.MODEL_SAVE_DIR, 
        f"best_model_epoch{epoch+1}_acc{metrics['accuracy']:.4f}.pth"
    )
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'val_accuracy': metrics['accuracy']
    }, model_path)
    print(f"Saved best model to {model_path} with accuracy {metrics['accuracy']:.4f}")

if __name__ == "__main__":
    train_model()

train.py里,先选择训练设备为CPU或者GPU,不清楚自己设备是否启用了GPU的,可看我上一篇文章。

接着创建数据加载器,使用dataset.py文件里的create_dataloaders函数,该函数主要使用TrafficDataset类实例化三种数据集。


在TrafficDataset类里,先加载预处理后的特征矩阵,加载标签名到数字索引的映射,因为模型无法处理文字标签,同时因为输出时要便于直观查看分类,还需要添加一个反向的索引到标签名的映射。再往下划分数据集,随机打乱样本的顺序,按照配置参数里的配置将样本划分为70%训练15%验证15%测试。

划分完后,计算均值和标准差,使用transforms.Normalize定义归一化转换器,将输入数据标准化,同时根据有无其他增强函数来按顺序执行,normalize为归一化操作。实际上并未用到transform增强函数,直接执行if分支里的normalize进行归一化

定义__getitem__函数,给特征矩阵添加一路通道,将特征矩阵转换为torch.Tensor类型,将 NumPy 数组flow转换为 PyTorch 浮点型张量,并应用归一化转换,将所有特征映射到均值为0标准差为1的分布里,避免模型过于拟合特征值大的类型。这个函数会在数据加载器DataLoader里被自动调用

随后返回三个数据加载器。


接着train.py,下一步是初始化模型,使用ensemble.py中的ModelEnsemble类加权合并CNN和ResNet模型。


在ModelEnsemble类里,创建CNN和ResNet两个模型的实例,然后冻结ResNet的所有层,解冻特征处理用于对特征分类。前向传播函数中按照60%CNN和40%的ResNet模型权重组合进行训练


ResNet模型


使用到的是ResNet18模型,先加载在torchvision.models库里预训练好的ResNet18模型,将输入的一通道(dataset里添加的1路通道)数据拓展为三通道,符合ResNet18的模型输入,随后就是经过第一层分类层,将512维的输入维度,提取特征输出,第二层分类层将第一层的输出映射到类别上。


CNN模型


CNN模型里,先定义的特征提取层,再定义分类层。

特征提取层里,定义了三层卷积,使用一个3x3的卷积核,一开始会随机生成一个3x3的权重矩阵,会随着学习逐步调整,特征经过三层卷积后会被逐步提取出来,卷积操作后使用池化操作,将时间和特征维度减半,由于池化将特征图减半,因此逐层增加输入通道数弥补特征图的信息损失。

进入分类层前需要对特征图进行展平,卷积输出后的多维张量转为二维张量,1024=128通道x8时间同步x1特征。然后经过隐藏层,将1024维度减半为512。在dropout正则化,根据512个特征对应每个类别的匹配分数,输出本次训练得到的数据属于哪个类别。


再回到train.py


定义一下损失函数、学习率调度器、训练参数这些,开始循环训练,遵循标准的机器学习范式,先前向传播,再反向传播计算梯度损失来更新模型参数。然后评估一下训练指标,打印日志,保存模型。


(三)dataset.py

# PyTorch数据集类
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader    #PyTorch 数据集基类和数据加载器
from config import paths, params
import pickle #用于读取标签映射的序列化文件
from torchvision import transforms

# 计算数据集的均值和标准差
def calculate_mean_std():
    data_path = os.path.join(paths.PROCESSED_DIR, 'processed_data.npz')
    data = np.load(data_path)
    flows = data['flows']
    flows = np.expand_dims(flows, axis=1)  # 添加通道维度
    mean = np.mean(flows, axis=(0, 2, 3)) #0号轴:样本数   2号:最大包数  3号:特征维度
    std = np.std(flows, axis=(0, 2, 3))  #对所有样本的所有包的所有特征维度进行计算
    return mean, std

class TrafficDataset(Dataset): #继承 PyTorch 的Dataset基类,实现自定义流量数据集
    def __init__(self, mode='train', transform=None): #指定数据集类型(训练 / 验证 / 测试),transform:增强函数
        """
        加密流量数据集类
        
        参数:
            mode: 'train', 'val', 'test'
            transform: 数据增强函数
        """
        # 加载处理后的数据,即特征矩阵
        data_path = os.path.join(paths.PROCESSED_DIR, 'processed_data.npz')
        data = np.load(data_path)
        flows = data['flows'] #形状为[样本数, 最大包数, 特征维度]的 numpy 数组
        labels = data['labels']  #样本对应的标签索引数组
        
        # 加载标签映射
        label_map_path = os.path.join(paths.PROCESSED_DIR, 'label_map.pkl') #标签名→索引映射。模型无法处理文本
        with open(label_map_path, 'rb') as f:
            self.label_map = pickle.load(f)
            self.idx_to_label = {v: k for k, v in self.label_map.items()} #反向生成索引→标签名映射idx_to_label。模型完成后,输出的是索引,将其转为标签
        
        # 划分数据集
        n_samples = len(flows)
        indices = np.arange(n_samples) 
        np.random.shuffle(indices)  #随机划分,打乱样本顺序,仍包含所有样本的索引。

        #按params.TRAIN_RATIO和params.VAL_RATIO划分数据集
        train_end = int(n_samples * params.TRAIN_RATIO)     # 训练集结束位置
        val_end = train_end + int(n_samples * params.VAL_RATIO) # 验证集结束位置
        
        if mode == 'train':  #根据初始的mode选择对应子集存储为类属性
            self.flows = flows[indices[:train_end]]     # 假设1000个索引,使用前700个随机索引
            self.labels = labels[indices[:train_end]]   #保持数据与标签的对应关系
        elif mode == 'val':
            self.flows = flows[indices[train_end:val_end]] # # 使用中间200个随机索引
            self.labels = labels[indices[train_end:val_end]]
        else:  # test
            self.flows = flows[indices[val_end:]]          # 使用最后100个随机索引
            self.labels = labels[indices[val_end:]]
        
        # 计算均值和标准差
        mean, std = calculate_mean_std()
        # 定义归一化转换,归一化的核心目的是将不同特征的取值范围缩放到相同尺度,将所有特征映射到均值为 0、标准差为 1的分布
        #归一化主要影响输入特征的尺度大小,而梯度下降算法中,其更新参数的步长与梯度的大小相关,如果输入数据的特征尺度差异较大,不同特征对应的梯度大小也会有很大差异
        normalize = transforms.Normalize(mean=mean, std=std)
        if transform is None:
            self.transform = transforms.Compose([
                #transforms.ToTensor(),
                normalize
            ])
        else:
            self.transform = transforms.Compose([
                transform,
                #transforms.ToTensor(),
                normalize
            ])
        print(f"Loaded {mode} dataset: {len(self.flows)} samples") #打印样本数量

        #self.transform = transform
        #print(f"Loaded {mode} dataset: {len(self.flows)} samples") #打印样本数量
    
    def __len__(self):  #__len__方法返回数据集样本数量
        return len(self.flows)
    
    def __getitem__(self, idx): #__getitem__方法实现索引访问
        flow = self.flows[idx]  #从flows和labels中获取指定索引的特征矩阵和标签
        label = self.labels[idx]
        
        
        # 添加通道维度 (C, H, W) = (1, 64, 6)               (从[64,6]变为[1,64,6])以适配 CNN 输入,符合的输入格式(通道数、高度、宽度)
        flow = np.expand_dims(flow, axis=0)  
        
        if self.transform: #如果指定了transform,则对特征矩阵进行预处理(如归一化)。开头定义的是未指定
            #flow = self.transform(flow)
             # 先转换为张量,确保维度顺序正确
            flow_tensor = torch.tensor(flow, dtype=torch.float32)
            # 应用转换(此时不需要 ToTensor,因为已经是张量)
            flow_tensor = self.transform(flow_tensor)
        else:
            flow_tensor = torch.tensor(flow, dtype=torch.float32)
        
        return torch.tensor(flow, dtype=torch.float32), torch.tensor(label, dtype=torch.long) #将 预处理后的NumPy 数组转换为 PyTorch 张量,设置数据类型(特征为float32,标签为long)
    
    def get_label_name(self, idx):
        """获取类别的实际名称"""
        return self.idx_to_label.get(idx, "unknown") #将数字索引转换为原始标签名
    
    def get_num_classes(self):
        return len(self.label_map) #返回数据集中的类别总数

# 数据增强函数
def random_dropout(flow, dropout_rate=0.1):
    """随机丢弃部分包特征"""
     # 确保输入形状为 [1, 64, 6]
    assert flow.shape[0] == 1, f"Input shape should be [1, H, W], got {flow.shape}"
    # 在数据包维度上应用 dropout(第1维,对应64个包)
    mask = np.random.choice([0, 1], size=flow.shape[1], p=[dropout_rate, 1-dropout_rate]) #按dropout_rate概率生成 0-1 掩码(0 表示丢弃,1 表示保留),掩码应用于每个数据包的特征维度(模拟网络丢包或特征缺失)
    flow = flow * mask.reshape(1, -1, 1)
    return flow

# 创建数据加载器
def create_dataloaders(batch_size=params.BATCH_SIZE):
    #实例化三种数据集(训练 / 验证 / 测试)
    train_dataset = TrafficDataset(mode='train', transform=random_dropout)#训练集应用random_dropout增强
    val_dataset = TrafficDataset(mode='val')
    test_dataset = TrafficDataset(mode='test')
    #使用DataLoader创建批次数据迭代器
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) #shuffle=True:训练集打乱顺序以提升训练稳定性。batch_size:批次大小
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, val_loader, test_loader

(四)ensemble.py

# 模型集成
import torch.nn as nn
from .cnn_model import EncryptedTrafficCNN #EncryptedTrafficCNN(自定义 CNN)
from .resnet_model import TrafficResNet #TrafficResNet(基于 ResNet 的模型)
from config import params

class ModelEnsemble(nn.Module):         #nn.Module是 PyTorch 所有神经网络模块的基类
    def __init__(self, num_classes=params.NUM_CLASSES):
        """
        模型集成:CNN + ResNet
        
        参数:
            weights: 两个模型的权重
        """
        super(ModelEnsemble, self).__init__()             #调用父类nn.Module的初始化方法
        self.cnn_model = EncryptedTrafficCNN(num_classes) #创建两个子模型实例,指定分类类别数num_classes
        self.resnet_model = TrafficResNet(num_classes)    #num_classes从配置参数获取,流量分类的类别数
        
        # 冻结ResNet所有层,通过设置param.requires_grad = False/True控制参数是否参与梯度更新
        for param in self.resnet_model.resnet.parameters(): #ResNet 的基础特征提取层(resnet.parameters())被冻结,不参与训练
            param.requires_grad = False                     #原因:预训练的 ResNet-18 在大规模数据集上学习到了通用的图像特征特征(如边缘、纹理)对各类视觉任务通用,无需重新学习
        # 解冻最后两层,因为这些层学习到的是与特定任务相关的高级语义特征(如分类任务中的类别特征)
        for param in self.resnet_model.resnet.layer4.parameters():  #layer4:ResNet 的最后一个残差块组,负责提取高级语义特征
            param.requires_grad = True
        for param in self.resnet_model.resnet.fc.parameters():      #fc:全连接层,负责分类决策,解冻这两层以便针对流量分类任务微调
            param.requires_grad = True                          
    
    #前向传播
    def forward(self, x): #输入数据x同时通过 CNN 和 ResNet 
        cnn_out = self.cnn_model(x)
        resnet_out = self.resnet_model(x)
        
        # 加权平均集成,将两个模型的输出按配置权重线性组合,60%CNN、40%ResNet。结合 CNN 的局部特征提取能力和 ResNet 的深层语义表达能力
        ensemble_out = (
            params.ENSEMBLE_WEIGHTS[0] * cnn_out + 
            params.ENSEMBLE_WEIGHTS[1] * resnet_out
        )
        return ensemble_out #集成输出保持与单个模型相同的维度

(五)resnet_model.py

# ResNet模型
import torch.nn as nn
from torchvision.models import resnet18
from config import params

class TrafficResNet(nn.Module):
    def __init__(self, num_classes=params.NUM_CLASSES):
        super().__init__()
        self.resnet = resnet18(pretrained=True)     #加载预训练的 ResNet18 模型,可加速训练并提高泛化能力
        
        # 添加1x1卷积扩展通道。将一路输入变为三路。参数矩阵形状:[3, 1, 1, 1](输出通道 = 3,输入通道 = 1,核大小 = 1x1)
        self.channel_expander = nn.Conv2d(1, 3, kernel_size=1, bias=False)  #要求输入为 3 通道(RGB 图像),,而流量数据通常是 1 通道(单通道特征图)

        
        
        # self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        
        # 分类层修改
        num_features = self.resnet.fc.in_features#获取原始 ResNet 的全连接层输入维度(通常为 512)
        self.resnet.fc = nn.Sequential(
            nn.Linear(num_features, params.DENSE_UNITS), #第一个线性层:512 → params.DENSE_UNITS(如 512)
            nn.ReLU(inplace=True),
            nn.Dropout(p=params.DROPOUT_RATE),
            nn.Linear(params.DENSE_UNITS, num_classes) #第二个线性层:params.DENSE_UNITS → num_classes(如 2 类)
        )
    
    def forward(self, x):
        if x.shape[1] == 1:               #检查输入通道数:若为 1 通道,则通过channel_expander扩展为 3 通道
            x = self.channel_expander(x)  # 1x1卷积扩展到3通道
        return self.resnet(x)

        

(六)cnn_model.py

# CNN模型
import torch.nn as nn
from config import params

class EncryptedTrafficCNN(nn.Module): #定义用于加密流量分类的 CNN 模型
    def __init__(self, num_classes=params.NUM_CLASSES):
        """
        用于加密流量分类的CNN模型
        
        输入形状: [batch, 1, 64, 6] (通道, 包数, 特征维度)
        """
        super(EncryptedTrafficCNN, self).__init__()
        
        # 特征提取层,卷积核在两个维度上滑动,对应特征矩阵的的高度和宽度
        self.features = nn.Sequential(
            nn.Conv2d(1, params.CNN_FILTERS[0], kernel_size=params.CNN_KERNEL_SIZE, padding=1),  #二维卷积,用于提取空间特征,Conv2D的第1/2个参数:输入通道数:1(输入)→ 32 → 64 → 128
            nn.BatchNorm2d(params.CNN_FILTERS[0]),      #nn.BatchNorm2d加速训练并提高稳定性,属于模型内的归一化。在每一批数据通过卷积层后,对特征图进行归一化(减均值、除标准差),加速训练并提高稳定性。
            nn.ReLU(inplace=True),  #nn.ReLU引入非线性,允许模型学习不同协议的特征组合。
            nn.MaxPool2d(kernel_size=(2, 2)), #前两层使用 (2,2) 池化:将时间和特征维度均缩小一半。
            
            nn.Conv2d(params.CNN_FILTERS[0], params.CNN_FILTERS[1], kernel_size=params.CNN_KERNEL_SIZE, padding=1),#kernel_size卷积核大小, padding=1保持特征图尺寸不变
            nn.BatchNorm2d(params.CNN_FILTERS[1]),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=(2, 2)),#前两层使用 (2,2) 池化:将时间和特征维度均缩小一半
            
            nn.Conv2d(params.CNN_FILTERS[1], params.CNN_FILTERS[2], kernel_size=params.CNN_KERNEL_SIZE, padding=1),
            nn.BatchNorm2d(params.CNN_FILTERS[2]),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=(2, 1))#最后一层使用 (2,1) 池化:仅缩小时间维度,保留特征维度
        )
        """卷积核(3, 3)会同时作用于相邻的 3 个数据包和每个数据包的 3 个特征;池化层(2, 2)会对相邻的 2 个数据包和 2 个特征进行下采样。"""


        # 分类层
        self.classifier = nn.Sequential(
            nn.Linear(params.CNN_FILTERS[2] * 8 * 1, params.DENSE_UNITS),  # 输入维度:128(通道数)×8(时间步)×1(特征)= 1024  隐藏层:params.DENSE_UNITS(512,为输入维度1024的一半)
            nn.ReLU(inplace=True),
            nn.Dropout(p=params.DROPOUT_RATE),          #Dropout  params.DROPOUT_RATE(为 0.5)训练时随机丢弃 50% 的神经元,防止过拟合
            nn.Linear(params.DENSE_UNITS, num_classes) #输出层:num_classes(分类类别数,配置为20)
            #最后一层线性变换:相当于一个 “分类器”,根据 512 维特征计算每个水果类别的匹配分数,最终选择分数最高的类别作为预测结果
        )
    def forward(self, x):
        x = self.features(x)   #通过卷积层提取特征
        x = x.view(x.size(0), -1)  # 展平,将多维张量展平为二维([batch_size, 1024])
        x = self.classifier(x)   #通过全连接层进行分类
        return x

(七)params.py


# 特征提取参数
MAX_PACKETS_PER_FLOW = 64  # 每个流取前64个包
FEATURE_DIM = 6            # 每个包的6维特征,从五元组里提取,可调节。

# 训练参数
BATCH_SIZE = 128
EPOCHS = 20
LEARNING_RATE = 0.001
NUM_CLASSES = 20           # USTC-TFC2016有20个类别

# 模型参数
CNN_FILTERS = [32, 64, 128]
CNN_KERNEL_SIZE = (3, 3)
DENSE_UNITS = 512
DROPOUT_RATE = 0.5

# 数据划分
TRAIN_RATIO = 0.7  #70%训练
VAL_RATIO = 0.15    #15%验证
TEST_RATIO = 0.15   #15%测试

# 集成模型权重
ENSEMBLE_WEIGHTS = [0.6, 0.4]  # CNN和ResNet的权重

(八)paths.py


import os

# 数据集路径配置
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, 'data')
PCAP_DIR = os.path.join(DATA_DIR, 'raw_pcaps')
PROCESSED_DIR = os.path.join(DATA_DIR, 'processed')

# 确保目录存在
os.makedirs(PCAP_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)

# 模型保存路径
MODEL_SAVE_DIR = os.path.join(BASE_DIR, 'saved_models')
os.makedirs(MODEL_SAVE_DIR, exist_ok=True)

# 日志路径
LOG_DIR = os.path.join(BASE_DIR, 'logs')
os.makedirs(LOG_DIR, exist_ok=True)

四、核心流程梳理

加载配置(参数和路径)→ 定义预处理参数(MAX_PACKETS_PER_FLOW等)

数据预处理(preprocess.py):

遍历PCAP文件 → 按五元组分组 → 提取每个流的前N个包 → 提取每个包的6维特征 → 形成特征矩阵(64x6)→ 从文件名提取标签

保存预处理数据(processed_data.npz和label_map.pkl)

创建PyTorch数据集(dataset.py):

加载预处理数据 → 划分训练/验证/测试集 → 定义数据增强(随机丢弃)→ 创建DataLoader

构建模型(cnn_model.py, resnet_model.py, ensemble.py):

初始化CNN模型 → 初始化ResNet模型(修改输入通道)→ 构建集成模型(加权平均)

训练模型(train.py):

设置设备 → 定义损失函数(Focal Loss)和优化器(Adam)→ 学习率调度器 → 训练循环(前向、损失、反向、优化)→ 验证集评估 → 保存最佳模型 → 早停

评估模型(eval.py):

加载测试集 → 加载保存的模型 → 预测 → 计算四大指标(准确率、精确率、召回率、F1)→ 输出结果

注意:在训练过程中,每个epoch结束后会在验证集上评估,并根据验证集准确率调整学习率。训练完成后会在测试集上做最终评估。

五、名词解释

(一)、USTC-TFC2016        

中科大网络安全实验室发布的流量分类数据集,包含安卓和Windows环境下的加密流量。特别之处在于同时提供原始流量包和会话级特征,具有预处理好的CSV特征文件这个亮点。

流量类型: 包含良性(正常应用)流量和恶意软件流量。

良性流量类别 (10类):BitTorrent, Facetime, FTP, Gmail, MySQL, Outlook, Skype, SMB, Weibo, WorldOfWarcraft

恶意软件流量类别 (10类):Cridex, Geodo, Htbot, Miuref, Neris, Nsis-ay, Shifu, Tinba, Virut, Zeus

数据格式:分为pcap和csv文件

原始流量包 (pcap文件): 包含抓取到的原始网络数据包。

会话级特征 (csv文件): 从原始pcap文件中提取了大量的统计特征,并按网络会话/流(通常是基于五元组:源IP、源端口、目的IP、目的端口、传输层协议)组织成结构化的CSV文件。

USTC-TFC2016/

├── Malware/          # 恶意流量

│   ├── Cridex/

│   ├── Geodo/

│   └── ...           # 共10个恶意类别

└── Normal/           # 正常流量

    ├── BitTorrent/

    ├── Facetime/

    └── ...           # 共10个正常类别

放置步骤

  1. 在项目根目录创建文件夹:data/raw_pcaps
  2. 将所有PCAP文件复制到该目录:
    • Malware/Normal/下的所有.pcap文件复制到data/raw_pcaps
    • 文件应直接放在raw_pcaps下,不要保留子目录结构

文件命名示例:Cridex_0001.pcapBitTorrent_0001.pcap等 

(二)、pcap文件

pcap 文件由两部分组成:全局文件头和数据包记录,每个数据包按捕获顺序依次存储,以二进制方式存储,可捕获物理层和数链层的加密流量,无法处理应用层的加密流量,pcap包是加密流量的基础数据载体。

全局文件头:24B,定义文件属性

数据包记录:包头16B+数据包数据,存储数据链路层以上的原始数据,长度不超过全局文件头定义的最大长度。

(三)、流

在网络技术中,“流” 指的是具有相同特征的一组数据包,这些特征通常包括

五元组信息:源 IP 地址、目的 IP 地址、源端口、目的端口、传输层协议(如 TCP/UDP)。

可以选择性的保留每个流里的数据包数量,比如只保存64个数据包,多余的被废弃

(四)、五元组

五元组:源 IP、目的 IP、源端口、目的端口、协议号

按五元组分组数据包:

将一个流里的多个数据包按照它们的五元信息进行归类,同一个流的所有数据包同属于一个五元组。

划分完五元组后,下一步就是根据五元组信息来提取维度特征,生成特征矩阵。

(五)、Dataset

数据集,描述结构化或半结构化的数据。在本项目中,实现自定义的流量数据集,即使用预处理后的特征矩阵

(六)、Transfrom

是用于数据预处理和增强的函数或函数链

创建数据集dataset的时候,可以指定也可以不指定transform。若指定了,则每次从数据集获取样本时都会自动应用transform的变换。

常见的变换包括:标准化、归一化(特征缩放到[0,1]范围)、数据增强(图像翻转、旋转)

(七)、pytorch张量

PyTorch 张量是一种多维数组,是 PyTorch 框架的核心数据结构,其设计灵感源于 NumPy 数组,但增加了以下关键特性:

硬件加速支持:可直接在 GPU 上运算(NumPy 数组仅能在 CPU 上运行)

自动微分支持:内置计算图记录,是深度学习框架的基础

分布式计算支持:可在多设备间高效传输和运算

如果把 NumPy 数组看作 "普通计算器",那么 PyTorch 张量就是 "带编程功能的科学计算器",专为深度学习任务定制

张量作为模型输入输出的标准格式

图像数据:形状通常为(batch_size, channels, height, width),如(32, 3, 224, 224)表示 32 张 3 通道 224×224 的图像

文本数据:形状通常为(batch_size, sequence_length),如(64, 100)表示 64 个长度为 100 的文本序列

标签数据:形状通常为(batch_size,)的长整型张量(分类任务)

(八)、ResNet残差网络

是一种卷积神经网络架构,解决传统CNN在深度增加时面临的梯度消失/爆炸和退化问题,使得可以训练超深层网络

传统CNN是输入x,经过一层层卷积层后输出H(x)

残差网络是在CNN的基础上,输入x,最后输出的H(x)=F(x)+x。即直接将输入x和输出F(x)相加,形成残差学习。如果x和F(x)不同维,则需要通过1x1卷积(称为 “投影映射”)调整x维度,确保加法可行。

本项目采用ResNet-18模型,共18层

初始卷积层1层

残差块:16层

Layer1:两个残差块(每块2层卷积)--共四层

Layer2:2个残差块—4层

Layer3:2个残差块—4层

Layer4:2个残差块—4层

全局平均池化层:1层

                  Avgpool:将特征图降维为1x1

全连接层1层

                  Fc:输出分类所需的类别数(如1000层)

ResNet-18 层结构       | 是否冻结 | 功能描述

---------------------------------------------

conv1                  | ✅      | 基础特征提取(边缘、纹理)

layer1                 | ✅      | 低级特征(边缘组合)

layer2                 | ✅      | 中级特征(简单部件)

layer3                 | ✅      | 高级特征(复杂部件)

layer4                 | ❌      | 特定任务的语义特征

avgpool                | ✅      | 全局特征聚合

fc                     | ❌      | 分类决策

如上所示,使用ResNet18的时候,冻结以上层,方便直接使用ResNet在大模型上直接训练好的特征提取模型,避免该模型和自己的数据过拟合。解冻这些层,主要使用这些层来进行分类自己的数据。

 (九)、CNN

是一种专门为处理具有网格状拓扑结构数据(如特征矩阵)而设计的深度学习模型

主要有:1.卷积层  2.激活函数层   3.池化层   4.全连接层   5.批量归一化层  

  1. 卷积层

功能:通过卷积核(Filter)与输入数据进行局部滑动卷积,提取空间特征(如边缘、纹理、形状等)。一个卷积层可包含多个卷积核,每个卷积核学习检测不同类型的特征,输出的通道等于卷积核数。

核心概念

局部感知野:每个神经元(输出特征图上的一个点)仅连接输入数据的局部区域,而非全连接,模拟视觉皮层的神经元响应特性。全连接层是每个神经元连接所有输入元素。

权值共享:同一卷积核的参数在整个输入空间中共享,大幅减少参数数量。

步长(Stride):卷积核滑动的间隔,控制输出特征图的尺寸。

padding:在输入边缘填充数据,保持输出特征图尺寸不变

什么是卷积核?本质是一个小的权重矩阵,通过与输入数据(如特征矩阵)进行卷积运算,提取特定的局部特征。之所以称为权重矩阵,因为矩阵中的每个数代表不同的权重,可以调整组合这些权重数,来检测特定类型的特征,权重一开始是随机的,后通过反向传播计算损失函数对权重的梯度,使用优化器迭代权重,使得卷积核逐渐学会识别有意义的特征

梯度(Gradient) 是损失函数对权重的偏导数,代表损失函数在当前权重下的变化率

梯度的意义:指示损失函数下降最快的方向

优化目标:找到一组权重,使损失函数值最小(即位于 “山谷” 底部)。因此,需要沿梯度的反方向更新权重。(因为是由权重>导致损失函数下降,沿反向调整权重)

损失函数(Loss Function) 是衡量模型预测结果与真实标签之间差异的量化指标,值越小越接近真实,性能越好。

卷积运算

输入图像         卷积核

[7 2 3 4]             [1 0]

[4 5 6 1]      ×      [0 1]   →   输出: [12  8]

[3 2 1 5]                                         [ 7  6]

[1 9 3 4]

2.激活函数层

为模型引入非线性能力,解决梯度消失问题

3.池化层

对特征图进行下采样,减少数据维度,同时保留关键特征并增强平移不变性

最大池化:取局部区域的最大值,保留显著特征

平均池化:取局部区域的平均值,保留整体特征

4.全连接层

将提取的特征映射到输出空间(如分类任务的类别数),通常位于模型末端。

5.批量归一层

对每个批次的数据进行归一化,稳定训练过程,加速收敛。

CNN工作原理:

CNN 通过多层卷积和池化操作,从底层到高层逐步提取特征:

底层:提取边缘、颜色等基础特征。

中层:组合基础特征形成纹理、简单形状。

高层:抽象出语义信息(如 “眼睛”“车轮” 等部件)

(十)、池化

特征提取过程中对数据进行降维、压缩信息,同时增强特征的鲁棒性(抗干扰性和适应性)

以二维数据为例,假设输入是一个矩阵,池化层会用一个固定大小的窗口(如 2×2)在矩阵上滑动,每次对窗口内的元素进行聚合,得到一个输出值,被聚合完后的数据不再被计算,从而减小数据的尺寸。步长为2时,会直接将64维缩减为32

六、git仓代码

 XingYueQiong/encryption_traffic_detection: 加密流量监测


网站公告

今日签到

点亮在社区的每一天
去签到