实现的功能如下:
实现功能:加密流量检测 数据集:USTC-TFC2016(使用 .pcap 文件) 目标类型:多分类,准确率97%以上,使用4大指标评估。 特征提取:不包含 payload,只提取协议头部特征,按照双向流(五元组)进行划分,不使用全部数据 包,截取一个双向流前64个包进行研究 模型构建:采用视觉模型,模型采用CNN和ResNet复合而成 |
下文从五个方面进行讲解,会提供完整可运行的代码。
关于过程中涉及到的名词方面,比如什么是CNN、什么是归一化处理、什么是ResNet等,我放到最后了,有不理解的可以查询,有基础理解的可以只看前面快速上手使用。
若本文对你有帮助,请点个免费的赞谢谢啦!
一、环境配置
vscode中安装以下内容
torch==2.3.1
torchvision
numpy==1.24.3
scapy==2.5.0
scikit-learn==1.2.2
tqdm==4.65.0
matplotlib==3.7.1
pandas==2.0.3
使用USTC-TFC2016数据集中的pcap文件,需从网上自行下载
GitHub - davidyslu/USTC-TFC2016: Traffic dataset USTC-TFC2016
二、代码框架目录
整个项目分支如下
encryption_traffic_detection 根目录
│ eval.py 评估文件
│ requirements.txt 安装环境所需文件
│ train.py 训练文件
├─config
│ │ params.py 训练参数配置
│ │ paths.py 路径配置
├─data
│ │ dataset.py 数据集基类
│ │ preprocess.py 数据预处理
│ │
│ ├─processed 保存预处理后的文件
│ ├─raw_pcaps 存放.pacp文件
├─models
│ │ cnn_model.py CNN模型
│ │ ensemble.py 合并CNN和ResNet模型
│ │ resnet_model.py ResNet模型
├─saved_models 保存模型的路径
└─utils
│ metrics.py 评估指标
三、具体实现
(一)preprocess.py
先从preprocess.py文件说起吧。
# 数据预处理
import os
import sys
# 获取项目根目录(假设preprocess.py在data/目录下)
root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 将根目录添加到Python路径
sys.path.append(root_dir)
import numpy as np
from scapy.all import rdpcap, IP, TCP, UDP
from tqdm import tqdm
from config import paths, params
import pickle
class PCAPPreprocessor:#处理网络数据包捕获(PCAP)文件,将其转换为适合机器学习的特征矩阵
def __init__(self):#初始化类的参数
self.max_packets = params.MAX_PACKETS_PER_FLOW#每个流最多保留的数据包数量(从外部参数 params 获取),特征矩阵的行数
self.feature_dim = params.FEATURE_DIM# 特征向量维度(从外部参数获取),特征矩阵的列数
def process_directory(self, input_dir, output_dir):
"""处理整个目录下的PCAP文件"""
all_flows = []
all_labels = []
# 遍历目录中的每个PCAP文件
for filename in tqdm(os.listdir(input_dir)): # tqdm用于显示进度条
if not filename.endswith('.pcap'):
continue # 跳过非PCAP文件
# 从文件名获取标签 (格式: label_xxx.pcap)
label = filename.split('_')[0] #以MySQL.pcap为例,没有下划线,直接提取整个文件名。
#label = os.path.splitext(filename)[0]
# 处理单个PCAP文件
pcap_path = os.path.join(input_dir, filename)
flows = self.process_pcap(pcap_path) #将 PCAP 文件解析为多个流(flows),每个流包含最多 64 个数据包,每个数据包会被转换为 6 维特征向量.
# 收集所有流和标签
all_flows.extend(flows) #all_flows存储所有 PCAP 文件中提取的流,每个流是一个 [64, 6] 的特征矩阵
all_labels.extend([label] * len(flows)) #存储每个流对应的标签,对于 MySql.pcap,所有流的标签均为 MySql.pcap
# 保存处理后的数据
self.save_processed_data(all_flows, all_labels, output_dir) #会将特征矩阵和标签保存到指定目录
def process_pcap(self, pcap_path):
"""处理单个PCAP文件"""
try:
packets = rdpcap(pcap_path) # 使用Scapy库读取PCAP文件,将二进制文件转为python对象列表,每个对象代表一个网络数据包。
flows = self._group_packets(packets) # 按五元组 分组数据包。源IP、目的IP、源端口、目的端口、传输层协议。区分不同设备间的通信。
# 为每个流创建特征矩阵
flow_matrices = [self._create_flow_matrix(flow) for flow in flows.values()]
return flow_matrices
except Exception as e:
print(f"Error processing {pcap_path}: {e}")
return []
def _group_packets(self, packets):
"""按五元组分组数据包,再将同一网络连接的所有数据包(包括请求和响应)归为一组,形成逻辑上的 “流”。避免只是分析单项流量,缺失响应状态、往返时间等信息。"""
flows = {}
for pkt in packets:
if not (IP in pkt and (TCP in pkt or UDP in pkt)):
continue # 跳过非IP或非TCP/UDP数据包
# 获取五元组
src = pkt[IP].src # 源IP
dst = pkt[IP].dst # 目的IP
proto = pkt[IP].proto # 协议号 (6=TCP, 17=UDP)
sport = pkt.sport if hasattr(pkt, 'sport') else 0 # 源端口
dport = pkt.dport if hasattr(pkt, 'dport') else 0 # 目的端口
# 创建双向流标识。通过排序源 / 目的地址对,确保请求和响应属于同一流
#flow_id = tuple(sorted([(src, sport), (dst, dport)]) + (proto,))
flow_id = tuple(sorted([(src, sport), (dst, dport)]) + [proto]) #通过sorted()函数处理源 IP / 端口对和目的 IP / 端口对,生成方向无关的流 ID。将同一连接的双向流量视为同一个流
if flow_id not in flows:
flows[flow_id] = [] #使用字典flows存储分组结果
flows[flow_id].append(pkt) # 将数据包添加到对应流
return flows
def _create_flow_matrix(self, packets):
"""创建流特征矩阵 (max_packets x feature_dim)"""
matrix = np.zeros((self.max_packets, self.feature_dim)) # 初始化零矩阵
for i, pkt in enumerate(packets[:self.max_packets]): # 限制每个流的最大包数
# 协议头部特征 (不包含payload,只提取数据包的协议头部特征,而忽略其负载(应用层数据)) pkt.load为payload 数据
pkt_len = len(pkt) # 数据包总长度
tos = pkt[IP].tos if IP in pkt else 0 # 服务类型字段
ttl = pkt[IP].ttl if IP in pkt else 0 # 生存时间
sport = pkt.sport if hasattr(pkt, 'sport') else 0 # 源端口
dport = pkt.dport if hasattr(pkt, 'dport') else 0 # 目的端口
flags = self._get_flags(pkt) # 获取协议标志位
matrix[i] = [pkt_len, tos, ttl, sport, dport, flags] # 填充特征矩阵(当前使用6个特征)
return matrix
def _get_flags(self, pkt):
"""获取协议标志位"""
if TCP in pkt:
return int(pkt[TCP].flags) #直接使用 Scapy 的 flags 属性(如pkt[TCP].flags返回数值)
elif UDP in pkt:
return 0x11 # 自定义UDP标志(0x11对应十进制 17,即 UDP 协议号),因为UDP 协议是无连接的,没有标准的标志位字段,UDP 的协议号是 17(在 IP 头部字段中协议类型为17)
return 0 # 其他协议返回0
def save_processed_data(self, flows, labels, output_dir):
"""保存处理后的数据"""
# 创建标签映射 (字符串标签 → 数字索引)
unique_labels = sorted(set(labels)) # 获取所有唯一标签,根据流的那个对应标签生成的
label_to_idx = {label: idx for idx, label in enumerate(unique_labels)}
# 转换标签为索引
label_indices = [label_to_idx[label] for label in labels]
# 保存数据到压缩NPZ文件
data_path = os.path.join(output_dir, 'processed_data.npz')#processed_data.npz压缩的 NumPy 数组文件,包含特征矩阵和标签索引
np.savez_compressed(
data_path,
flows=np.array(flows, dtype=np.float32), # 流特征矩阵
labels=np.array(label_indices, dtype=np.int32) # 标签索引
)
# 保存标签映射(用于后续模型预测时转换回原始标签)
label_map_path = os.path.join(output_dir, 'label_map.pkl')
with open(label_map_path, 'wb') as f:
pickle.dump(label_to_idx, f)
print(f"Saved processed data to {data_path}")
print(f"Total flows: {len(flows)}, Classes: {len(unique_labels)}")
# 主处理函数
def main():
preprocessor = PCAPPreprocessor()
preprocessor.process_directory(paths.PCAP_DIR, paths.PROCESSED_DIR)
if __name__ == "__main__":
main()
在这个文件里,主要是定义了一个数据预处理类,用于处理pcap文件,将pacp文件的二进制数据转为特征矩阵
在这个类里呢,先是初始化一下类的参数,从配置文件中获取,配置一下特征矩阵的行数和列数。
接着定义了几个数据处理函数,process_directory函数遍历处理整个目录下的pcap文件,提取pcap文件的名当做标签,调用process_pcap将单个pcap文件解析为多个流
在process_pcap里使用Scapy库读取pcap文件,将二进制数据转为python对象列表,再将python对象列表传入_group_packets函数按五元组分组数据包
在_group_packets函数里,按五元组分组数据包,将同一连接的双向流量视为一个流,划分多个流。同一网络连接的所有数据包(包括请求和响应)归为一组,形成逻辑上的 “流”。网络流量的数据包里有很多个不同设备间的通信,流就是其中某一组设备互相交换的数据,把所有它们交换的数据包分为一组。
分组完数据包(将单个pcap文件分为多个流)后,回到process_pcap函数,再为每个流创建特征矩阵,使用_create_flow_matrix函数
在_create_flow_matrix函数里,提取流的数据包里的协议头部特征,不提取payload包含的具体传输数据内容,把协议头部特征填充到特征矩阵里。
至此,流被处理为numpy数组(特征矩阵)。数据从二进制变为python列表对象,将其划分为流存入字典中,再提取特征转为numpy数组,方便后面机器学习处理数据。
还剩下保存模型的函数,没什么好讲的,继续下一步
(二)train.py
import torch
import torch.optim as optim #优化器模块
import torch.nn as nn #神经网络模块,包含层定义和损失函数
from torch.optim.lr_scheduler import ReduceLROnPlateau
from data.dataset import create_dataloaders #自定义数据加载模块,用于处理 PCAP 特征数据
from models.ensemble import ModelEnsemble #自定义模型集成类,可能融合多个子模型
from utils.metrics import calculate_metrics, focal_loss #包含评估指标计算函数和 Focal Loss 实现
from config import paths, params #存储路径配置和训练超参数
import os
import numpy as np
import time
def train_model():
# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# 创建数据加载器
train_loader, val_loader, test_loader = create_dataloaders() #从data.dataset模块调用函数创建数据加载器
# 初始化模型
model = ModelEnsemble(num_classes=params.NUM_CLASSES).to(device)
# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(
model.parameters(),
lr=params.LEARNING_RATE,
weight_decay=1e-5 # L2正则化
)
# 学习率调度器
scheduler = ReduceLROnPlateau(
optimizer, mode='max', factor=0.5, patience=5, verbose=True)
# 训练循环
best_val_acc = 0.0 #记录验证集最佳准确率
early_stop_counter = 0 #早停计数器
max_early_stop = 10 #连续 10 个 epoch 无提升则停止训练
for epoch in range(params.EPOCHS):
start_time = time.time()
# 训练阶段
model.train()#将模型设置为训练模式
train_loss = 0.0
all_preds = []
all_labels = []
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device) #从训练集加载一批数据到 GPU
# 前向传播,计算模型输出
outputs = model(inputs) #输入数据
loss = focal_loss(outputs, labels) # 使用Focal Loss处理类别不平衡问题
# 反向传播
optimizer.zero_grad() #清除上一批次的梯度
loss.backward() #计算当前批次的梯度
optimizer.step() #更新模型参数
# 记录损失和预测,累积损失和预测结果
train_loss += loss.item() * inputs.size(0)
_, preds = torch.max(outputs, 1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
# 计算训练指标
train_metrics = calculate_metrics(np.array(all_labels), np.array(all_preds))
train_loss = train_loss / len(train_loader.dataset)
# 验证阶段
val_metrics = evaluate_model(model, val_loader, device) #evaluate_model 函数在验证集上评估模型性能
# 更新学习率
scheduler.step(val_metrics['accuracy']) #学习率调度器根据验证准确率调整学习率
# 记录时间
epoch_time = time.time() - start_time #记录每个 epoch 的训练时间
# 打印日志
print(
f"Epoch {epoch+1}/{params.EPOCHS} | Time: {epoch_time:.2f}s | "
f"Train Loss: {train_loss:.4f} | Train Acc: {train_metrics['accuracy']:.4f} | "
f"Val Acc: {val_metrics['accuracy']:.4f} | Val F1: {val_metrics['f1']:.4f}"
)
# 保存最佳模型
if val_metrics['accuracy'] > best_val_acc:
best_val_acc = val_metrics['accuracy']
early_stop_counter = 0
save_model(model, epoch, val_metrics)
else:
early_stop_counter += 1
# 早停检查
if early_stop_counter >= max_early_stop:
print(f"Early stopping at epoch {epoch+1}")
break
# 最终测试
print("Training completed. Running final test evaluation...")
test_metrics = evaluate_model(model, test_loader, device) #在独立测试集上评估最终模型性能
print(
f"Test Results: Acc={test_metrics['accuracy']:.4f}, "
f"Precision={test_metrics['precision']:.4f}, "
f"Recall={test_metrics['recall']:.4f}, "
f"F1={test_metrics['f1']:.4f}"
)
def evaluate_model(model, data_loader, device):
"""评估模型性能"""
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
for inputs, labels in data_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
return calculate_metrics(np.array(all_labels), np.array(all_preds))
def save_model(model, epoch, metrics):
"""保存最佳模型"""
model_path = os.path.join(
paths.MODEL_SAVE_DIR,
f"best_model_epoch{epoch+1}_acc{metrics['accuracy']:.4f}.pth"
)
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'val_accuracy': metrics['accuracy']
}, model_path)
print(f"Saved best model to {model_path} with accuracy {metrics['accuracy']:.4f}")
if __name__ == "__main__":
train_model()
在train.py里,先选择训练设备为CPU或者GPU,不清楚自己设备是否启用了GPU的,可看我上一篇文章。
接着创建数据加载器,使用dataset.py文件里的create_dataloaders函数,该函数主要使用TrafficDataset类实例化三种数据集。
在TrafficDataset类里,先加载预处理后的特征矩阵,加载标签名到数字索引的映射,因为模型无法处理文字标签,同时因为输出时要便于直观查看分类,还需要添加一个反向的索引到标签名的映射。再往下划分数据集,随机打乱样本的顺序,按照配置参数里的配置将样本划分为70%训练15%验证15%测试。
划分完后,计算均值和标准差,使用transforms.Normalize定义归一化转换器,将输入数据标准化,同时根据有无其他增强函数来按顺序执行,normalize为归一化操作。实际上并未用到transform增强函数,直接执行if分支里的normalize进行归一化
定义__getitem__函数,给特征矩阵添加一路通道,将特征矩阵转换为torch.Tensor类型,将 NumPy 数组flow转换为 PyTorch 浮点型张量,并应用归一化转换,将所有特征映射到均值为0标准差为1的分布里,避免模型过于拟合特征值大的类型。这个函数会在数据加载器DataLoader里被自动调用
随后返回三个数据加载器。
接着train.py,下一步是初始化模型,使用ensemble.py中的ModelEnsemble类加权合并CNN和ResNet模型。
在ModelEnsemble类里,创建CNN和ResNet两个模型的实例,然后冻结ResNet的所有层,解冻特征处理用于对特征分类。前向传播函数中按照60%CNN和40%的ResNet模型权重组合进行训练
ResNet模型
使用到的是ResNet18模型,先加载在torchvision.models库里预训练好的ResNet18模型,将输入的一通道(dataset里添加的1路通道)数据拓展为三通道,符合ResNet18的模型输入,随后就是经过第一层分类层,将512维的输入维度,提取特征输出,第二层分类层将第一层的输出映射到类别上。
CNN模型
CNN模型里,先定义的特征提取层,再定义分类层。
特征提取层里,定义了三层卷积,使用一个3x3的卷积核,一开始会随机生成一个3x3的权重矩阵,会随着学习逐步调整,特征经过三层卷积后会被逐步提取出来,卷积操作后使用池化操作,将时间和特征维度减半,由于池化将特征图减半,因此逐层增加输入通道数弥补特征图的信息损失。
进入分类层前需要对特征图进行展平,卷积输出后的多维张量转为二维张量,1024=128通道x8时间同步x1特征。然后经过隐藏层,将1024维度减半为512。在dropout正则化,根据512个特征对应每个类别的匹配分数,输出本次训练得到的数据属于哪个类别。
再回到train.py
定义一下损失函数、学习率调度器、训练参数这些,开始循环训练,遵循标准的机器学习范式,先前向传播,再反向传播计算梯度损失来更新模型参数。然后评估一下训练指标,打印日志,保存模型。
(三)dataset.py
# PyTorch数据集类
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader #PyTorch 数据集基类和数据加载器
from config import paths, params
import pickle #用于读取标签映射的序列化文件
from torchvision import transforms
# 计算数据集的均值和标准差
def calculate_mean_std():
data_path = os.path.join(paths.PROCESSED_DIR, 'processed_data.npz')
data = np.load(data_path)
flows = data['flows']
flows = np.expand_dims(flows, axis=1) # 添加通道维度
mean = np.mean(flows, axis=(0, 2, 3)) #0号轴:样本数 2号:最大包数 3号:特征维度
std = np.std(flows, axis=(0, 2, 3)) #对所有样本的所有包的所有特征维度进行计算
return mean, std
class TrafficDataset(Dataset): #继承 PyTorch 的Dataset基类,实现自定义流量数据集
def __init__(self, mode='train', transform=None): #指定数据集类型(训练 / 验证 / 测试),transform:增强函数
"""
加密流量数据集类
参数:
mode: 'train', 'val', 'test'
transform: 数据增强函数
"""
# 加载处理后的数据,即特征矩阵
data_path = os.path.join(paths.PROCESSED_DIR, 'processed_data.npz')
data = np.load(data_path)
flows = data['flows'] #形状为[样本数, 最大包数, 特征维度]的 numpy 数组
labels = data['labels'] #样本对应的标签索引数组
# 加载标签映射
label_map_path = os.path.join(paths.PROCESSED_DIR, 'label_map.pkl') #标签名→索引映射。模型无法处理文本
with open(label_map_path, 'rb') as f:
self.label_map = pickle.load(f)
self.idx_to_label = {v: k for k, v in self.label_map.items()} #反向生成索引→标签名映射idx_to_label。模型完成后,输出的是索引,将其转为标签
# 划分数据集
n_samples = len(flows)
indices = np.arange(n_samples)
np.random.shuffle(indices) #随机划分,打乱样本顺序,仍包含所有样本的索引。
#按params.TRAIN_RATIO和params.VAL_RATIO划分数据集
train_end = int(n_samples * params.TRAIN_RATIO) # 训练集结束位置
val_end = train_end + int(n_samples * params.VAL_RATIO) # 验证集结束位置
if mode == 'train': #根据初始的mode选择对应子集存储为类属性
self.flows = flows[indices[:train_end]] # 假设1000个索引,使用前700个随机索引
self.labels = labels[indices[:train_end]] #保持数据与标签的对应关系
elif mode == 'val':
self.flows = flows[indices[train_end:val_end]] # # 使用中间200个随机索引
self.labels = labels[indices[train_end:val_end]]
else: # test
self.flows = flows[indices[val_end:]] # 使用最后100个随机索引
self.labels = labels[indices[val_end:]]
# 计算均值和标准差
mean, std = calculate_mean_std()
# 定义归一化转换,归一化的核心目的是将不同特征的取值范围缩放到相同尺度,将所有特征映射到均值为 0、标准差为 1的分布
#归一化主要影响输入特征的尺度大小,而梯度下降算法中,其更新参数的步长与梯度的大小相关,如果输入数据的特征尺度差异较大,不同特征对应的梯度大小也会有很大差异
normalize = transforms.Normalize(mean=mean, std=std)
if transform is None:
self.transform = transforms.Compose([
#transforms.ToTensor(),
normalize
])
else:
self.transform = transforms.Compose([
transform,
#transforms.ToTensor(),
normalize
])
print(f"Loaded {mode} dataset: {len(self.flows)} samples") #打印样本数量
#self.transform = transform
#print(f"Loaded {mode} dataset: {len(self.flows)} samples") #打印样本数量
def __len__(self): #__len__方法返回数据集样本数量
return len(self.flows)
def __getitem__(self, idx): #__getitem__方法实现索引访问
flow = self.flows[idx] #从flows和labels中获取指定索引的特征矩阵和标签
label = self.labels[idx]
# 添加通道维度 (C, H, W) = (1, 64, 6) (从[64,6]变为[1,64,6])以适配 CNN 输入,符合的输入格式(通道数、高度、宽度)
flow = np.expand_dims(flow, axis=0)
if self.transform: #如果指定了transform,则对特征矩阵进行预处理(如归一化)。开头定义的是未指定
#flow = self.transform(flow)
# 先转换为张量,确保维度顺序正确
flow_tensor = torch.tensor(flow, dtype=torch.float32)
# 应用转换(此时不需要 ToTensor,因为已经是张量)
flow_tensor = self.transform(flow_tensor)
else:
flow_tensor = torch.tensor(flow, dtype=torch.float32)
return torch.tensor(flow, dtype=torch.float32), torch.tensor(label, dtype=torch.long) #将 预处理后的NumPy 数组转换为 PyTorch 张量,设置数据类型(特征为float32,标签为long)
def get_label_name(self, idx):
"""获取类别的实际名称"""
return self.idx_to_label.get(idx, "unknown") #将数字索引转换为原始标签名
def get_num_classes(self):
return len(self.label_map) #返回数据集中的类别总数
# 数据增强函数
def random_dropout(flow, dropout_rate=0.1):
"""随机丢弃部分包特征"""
# 确保输入形状为 [1, 64, 6]
assert flow.shape[0] == 1, f"Input shape should be [1, H, W], got {flow.shape}"
# 在数据包维度上应用 dropout(第1维,对应64个包)
mask = np.random.choice([0, 1], size=flow.shape[1], p=[dropout_rate, 1-dropout_rate]) #按dropout_rate概率生成 0-1 掩码(0 表示丢弃,1 表示保留),掩码应用于每个数据包的特征维度(模拟网络丢包或特征缺失)
flow = flow * mask.reshape(1, -1, 1)
return flow
# 创建数据加载器
def create_dataloaders(batch_size=params.BATCH_SIZE):
#实例化三种数据集(训练 / 验证 / 测试)
train_dataset = TrafficDataset(mode='train', transform=random_dropout)#训练集应用random_dropout增强
val_dataset = TrafficDataset(mode='val')
test_dataset = TrafficDataset(mode='test')
#使用DataLoader创建批次数据迭代器
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) #shuffle=True:训练集打乱顺序以提升训练稳定性。batch_size:批次大小
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
return train_loader, val_loader, test_loader
(四)ensemble.py
# 模型集成
import torch.nn as nn
from .cnn_model import EncryptedTrafficCNN #EncryptedTrafficCNN(自定义 CNN)
from .resnet_model import TrafficResNet #TrafficResNet(基于 ResNet 的模型)
from config import params
class ModelEnsemble(nn.Module): #nn.Module是 PyTorch 所有神经网络模块的基类
def __init__(self, num_classes=params.NUM_CLASSES):
"""
模型集成:CNN + ResNet
参数:
weights: 两个模型的权重
"""
super(ModelEnsemble, self).__init__() #调用父类nn.Module的初始化方法
self.cnn_model = EncryptedTrafficCNN(num_classes) #创建两个子模型实例,指定分类类别数num_classes
self.resnet_model = TrafficResNet(num_classes) #num_classes从配置参数获取,流量分类的类别数
# 冻结ResNet所有层,通过设置param.requires_grad = False/True控制参数是否参与梯度更新
for param in self.resnet_model.resnet.parameters(): #ResNet 的基础特征提取层(resnet.parameters())被冻结,不参与训练
param.requires_grad = False #原因:预训练的 ResNet-18 在大规模数据集上学习到了通用的图像特征特征(如边缘、纹理)对各类视觉任务通用,无需重新学习
# 解冻最后两层,因为这些层学习到的是与特定任务相关的高级语义特征(如分类任务中的类别特征)
for param in self.resnet_model.resnet.layer4.parameters(): #layer4:ResNet 的最后一个残差块组,负责提取高级语义特征
param.requires_grad = True
for param in self.resnet_model.resnet.fc.parameters(): #fc:全连接层,负责分类决策,解冻这两层以便针对流量分类任务微调
param.requires_grad = True
#前向传播
def forward(self, x): #输入数据x同时通过 CNN 和 ResNet
cnn_out = self.cnn_model(x)
resnet_out = self.resnet_model(x)
# 加权平均集成,将两个模型的输出按配置权重线性组合,60%CNN、40%ResNet。结合 CNN 的局部特征提取能力和 ResNet 的深层语义表达能力
ensemble_out = (
params.ENSEMBLE_WEIGHTS[0] * cnn_out +
params.ENSEMBLE_WEIGHTS[1] * resnet_out
)
return ensemble_out #集成输出保持与单个模型相同的维度
(五)resnet_model.py
# ResNet模型
import torch.nn as nn
from torchvision.models import resnet18
from config import params
class TrafficResNet(nn.Module):
def __init__(self, num_classes=params.NUM_CLASSES):
super().__init__()
self.resnet = resnet18(pretrained=True) #加载预训练的 ResNet18 模型,可加速训练并提高泛化能力
# 添加1x1卷积扩展通道。将一路输入变为三路。参数矩阵形状:[3, 1, 1, 1](输出通道 = 3,输入通道 = 1,核大小 = 1x1)
self.channel_expander = nn.Conv2d(1, 3, kernel_size=1, bias=False) #要求输入为 3 通道(RGB 图像),,而流量数据通常是 1 通道(单通道特征图)
# self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
# 分类层修改
num_features = self.resnet.fc.in_features#获取原始 ResNet 的全连接层输入维度(通常为 512)
self.resnet.fc = nn.Sequential(
nn.Linear(num_features, params.DENSE_UNITS), #第一个线性层:512 → params.DENSE_UNITS(如 512)
nn.ReLU(inplace=True),
nn.Dropout(p=params.DROPOUT_RATE),
nn.Linear(params.DENSE_UNITS, num_classes) #第二个线性层:params.DENSE_UNITS → num_classes(如 2 类)
)
def forward(self, x):
if x.shape[1] == 1: #检查输入通道数:若为 1 通道,则通过channel_expander扩展为 3 通道
x = self.channel_expander(x) # 1x1卷积扩展到3通道
return self.resnet(x)
(六)cnn_model.py
# CNN模型
import torch.nn as nn
from config import params
class EncryptedTrafficCNN(nn.Module): #定义用于加密流量分类的 CNN 模型
def __init__(self, num_classes=params.NUM_CLASSES):
"""
用于加密流量分类的CNN模型
输入形状: [batch, 1, 64, 6] (通道, 包数, 特征维度)
"""
super(EncryptedTrafficCNN, self).__init__()
# 特征提取层,卷积核在两个维度上滑动,对应特征矩阵的的高度和宽度
self.features = nn.Sequential(
nn.Conv2d(1, params.CNN_FILTERS[0], kernel_size=params.CNN_KERNEL_SIZE, padding=1), #二维卷积,用于提取空间特征,Conv2D的第1/2个参数:输入通道数:1(输入)→ 32 → 64 → 128
nn.BatchNorm2d(params.CNN_FILTERS[0]), #nn.BatchNorm2d加速训练并提高稳定性,属于模型内的归一化。在每一批数据通过卷积层后,对特征图进行归一化(减均值、除标准差),加速训练并提高稳定性。
nn.ReLU(inplace=True), #nn.ReLU引入非线性,允许模型学习不同协议的特征组合。
nn.MaxPool2d(kernel_size=(2, 2)), #前两层使用 (2,2) 池化:将时间和特征维度均缩小一半。
nn.Conv2d(params.CNN_FILTERS[0], params.CNN_FILTERS[1], kernel_size=params.CNN_KERNEL_SIZE, padding=1),#kernel_size卷积核大小, padding=1保持特征图尺寸不变
nn.BatchNorm2d(params.CNN_FILTERS[1]),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=(2, 2)),#前两层使用 (2,2) 池化:将时间和特征维度均缩小一半
nn.Conv2d(params.CNN_FILTERS[1], params.CNN_FILTERS[2], kernel_size=params.CNN_KERNEL_SIZE, padding=1),
nn.BatchNorm2d(params.CNN_FILTERS[2]),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=(2, 1))#最后一层使用 (2,1) 池化:仅缩小时间维度,保留特征维度
)
"""卷积核(3, 3)会同时作用于相邻的 3 个数据包和每个数据包的 3 个特征;池化层(2, 2)会对相邻的 2 个数据包和 2 个特征进行下采样。"""
# 分类层
self.classifier = nn.Sequential(
nn.Linear(params.CNN_FILTERS[2] * 8 * 1, params.DENSE_UNITS), # 输入维度:128(通道数)×8(时间步)×1(特征)= 1024 隐藏层:params.DENSE_UNITS(512,为输入维度1024的一半)
nn.ReLU(inplace=True),
nn.Dropout(p=params.DROPOUT_RATE), #Dropout params.DROPOUT_RATE(为 0.5)训练时随机丢弃 50% 的神经元,防止过拟合
nn.Linear(params.DENSE_UNITS, num_classes) #输出层:num_classes(分类类别数,配置为20)
#最后一层线性变换:相当于一个 “分类器”,根据 512 维特征计算每个水果类别的匹配分数,最终选择分数最高的类别作为预测结果
)
def forward(self, x):
x = self.features(x) #通过卷积层提取特征
x = x.view(x.size(0), -1) # 展平,将多维张量展平为二维([batch_size, 1024])
x = self.classifier(x) #通过全连接层进行分类
return x
(七)params.py
# 特征提取参数
MAX_PACKETS_PER_FLOW = 64 # 每个流取前64个包
FEATURE_DIM = 6 # 每个包的6维特征,从五元组里提取,可调节。
# 训练参数
BATCH_SIZE = 128
EPOCHS = 20
LEARNING_RATE = 0.001
NUM_CLASSES = 20 # USTC-TFC2016有20个类别
# 模型参数
CNN_FILTERS = [32, 64, 128]
CNN_KERNEL_SIZE = (3, 3)
DENSE_UNITS = 512
DROPOUT_RATE = 0.5
# 数据划分
TRAIN_RATIO = 0.7 #70%训练
VAL_RATIO = 0.15 #15%验证
TEST_RATIO = 0.15 #15%测试
# 集成模型权重
ENSEMBLE_WEIGHTS = [0.6, 0.4] # CNN和ResNet的权重
(八)paths.py
import os
# 数据集路径配置
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, 'data')
PCAP_DIR = os.path.join(DATA_DIR, 'raw_pcaps')
PROCESSED_DIR = os.path.join(DATA_DIR, 'processed')
# 确保目录存在
os.makedirs(PCAP_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)
# 模型保存路径
MODEL_SAVE_DIR = os.path.join(BASE_DIR, 'saved_models')
os.makedirs(MODEL_SAVE_DIR, exist_ok=True)
# 日志路径
LOG_DIR = os.path.join(BASE_DIR, 'logs')
os.makedirs(LOG_DIR, exist_ok=True)
四、核心流程梳理
加载配置(参数和路径)→ 定义预处理参数(MAX_PACKETS_PER_FLOW等)
↓
数据预处理(preprocess.py):
遍历PCAP文件 → 按五元组分组 → 提取每个流的前N个包 → 提取每个包的6维特征 → 形成特征矩阵(64x6)→ 从文件名提取标签
↓
保存预处理数据(processed_data.npz和label_map.pkl)
↓
创建PyTorch数据集(dataset.py):
加载预处理数据 → 划分训练/验证/测试集 → 定义数据增强(随机丢弃)→ 创建DataLoader
↓
构建模型(cnn_model.py, resnet_model.py, ensemble.py):
初始化CNN模型 → 初始化ResNet模型(修改输入通道)→ 构建集成模型(加权平均)
↓
训练模型(train.py):
设置设备 → 定义损失函数(Focal Loss)和优化器(Adam)→ 学习率调度器 → 训练循环(前向、损失、反向、优化)→ 验证集评估 → 保存最佳模型 → 早停
↓
评估模型(eval.py):
加载测试集 → 加载保存的模型 → 预测 → 计算四大指标(准确率、精确率、召回率、F1)→ 输出结果
注意:在训练过程中,每个epoch结束后会在验证集上评估,并根据验证集准确率调整学习率。训练完成后会在测试集上做最终评估。
五、名词解释
(一)、USTC-TFC2016
中科大网络安全实验室发布的流量分类数据集,包含安卓和Windows环境下的加密流量。特别之处在于同时提供原始流量包和会话级特征,具有预处理好的CSV特征文件这个亮点。
流量类型: 包含良性(正常应用)流量和恶意软件流量。
良性流量类别 (10类):BitTorrent, Facetime, FTP, Gmail, MySQL, Outlook, Skype, SMB, Weibo, WorldOfWarcraft
恶意软件流量类别 (10类):Cridex, Geodo, Htbot, Miuref, Neris, Nsis-ay, Shifu, Tinba, Virut, Zeus
数据格式:分为pcap和csv文件
原始流量包 (pcap文件): 包含抓取到的原始网络数据包。
会话级特征 (csv文件): 从原始pcap文件中提取了大量的统计特征,并按网络会话/流(通常是基于五元组:源IP、源端口、目的IP、目的端口、传输层协议)组织成结构化的CSV文件。
USTC-TFC2016/
├── Malware/ # 恶意流量
│ ├── Cridex/
│ ├── Geodo/
│ └── ... # 共10个恶意类别
└── Normal/ # 正常流量
├── BitTorrent/
├── Facetime/
└── ... # 共10个正常类别
放置步骤:
- 在项目根目录创建文件夹:data/raw_pcaps
- 将所有PCAP文件复制到该目录:
- 将Malware/和Normal/下的所有.pcap文件复制到data/raw_pcaps
- 文件应直接放在raw_pcaps下,不要保留子目录结构
文件命名示例:Cridex_0001.pcap, BitTorrent_0001.pcap等
(二)、pcap文件
pcap 文件由两部分组成:全局文件头和数据包记录,每个数据包按捕获顺序依次存储,以二进制方式存储,可捕获物理层和数链层的加密流量,无法处理应用层的加密流量,pcap包是加密流量的基础数据载体。
全局文件头:24B,定义文件属性
数据包记录:包头16B+数据包数据,存储数据链路层以上的原始数据,长度不超过全局文件头定义的最大长度。
(三)、流
在网络技术中,“流” 指的是具有相同特征的一组数据包,这些特征通常包括
五元组信息:源 IP 地址、目的 IP 地址、源端口、目的端口、传输层协议(如 TCP/UDP)。
可以选择性的保留每个流里的数据包数量,比如只保存64个数据包,多余的被废弃
(四)、五元组
五元组:源 IP、目的 IP、源端口、目的端口、协议号
按五元组分组数据包:
将一个流里的多个数据包按照它们的五元信息进行归类,同一个流的所有数据包同属于一个五元组。
划分完五元组后,下一步就是根据五元组信息来提取维度特征,生成特征矩阵。
(五)、Dataset
数据集,描述结构化或半结构化的数据。在本项目中,实现自定义的流量数据集,即使用预处理后的特征矩阵
(六)、Transfrom
是用于数据预处理和增强的函数或函数链
创建数据集dataset的时候,可以指定也可以不指定transform。若指定了,则每次从数据集获取样本时都会自动应用transform的变换。
常见的变换包括:标准化、归一化(特征缩放到[0,1]范围)、数据增强(图像翻转、旋转)
(七)、pytorch张量
PyTorch 张量是一种多维数组,是 PyTorch 框架的核心数据结构,其设计灵感源于 NumPy 数组,但增加了以下关键特性:
硬件加速支持:可直接在 GPU 上运算(NumPy 数组仅能在 CPU 上运行)
自动微分支持:内置计算图记录,是深度学习框架的基础
分布式计算支持:可在多设备间高效传输和运算
如果把 NumPy 数组看作 "普通计算器",那么 PyTorch 张量就是 "带编程功能的科学计算器",专为深度学习任务定制
张量作为模型输入输出的标准格式
图像数据:形状通常为(batch_size, channels, height, width),如(32, 3, 224, 224)表示 32 张 3 通道 224×224 的图像
文本数据:形状通常为(batch_size, sequence_length),如(64, 100)表示 64 个长度为 100 的文本序列
标签数据:形状通常为(batch_size,)的长整型张量(分类任务)
(八)、ResNet残差网络
是一种卷积神经网络架构,解决传统CNN在深度增加时面临的梯度消失/爆炸和退化问题,使得可以训练超深层网络
传统CNN是输入x,经过一层层卷积层后输出H(x)
而残差网络是在CNN的基础上,输入x,最后输出的H(x)=F(x)+x。即直接将输入x和输出F(x)相加,形成残差学习。如果x和F(x)不同维,则需要通过1x1卷积(称为 “投影映射”)调整x维度,确保加法可行。
本项目采用ResNet-18模型,共18层
初始卷积层1层
残差块:16层
Layer1:两个残差块(每块2层卷积)--共四层
Layer2:2个残差块—4层
Layer3:2个残差块—4层
Layer4:2个残差块—4层
全局平均池化层:1层
Avgpool:将特征图降维为1x1
全连接层1层
Fc:输出分类所需的类别数(如1000层)
ResNet-18 层结构 | 是否冻结 | 功能描述
---------------------------------------------
conv1 | ✅ | 基础特征提取(边缘、纹理)
layer1 | ✅ | 低级特征(边缘组合)
layer2 | ✅ | 中级特征(简单部件)
layer3 | ✅ | 高级特征(复杂部件)
layer4 | ❌ | 特定任务的语义特征
avgpool | ✅ | 全局特征聚合
fc | ❌ | 分类决策
如上所示,使用ResNet18的时候,冻结以上层,方便直接使用ResNet在大模型上直接训练好的特征提取模型,避免该模型和自己的数据过拟合。解冻这些层,主要使用这些层来进行分类自己的数据。
(九)、CNN
是一种专门为处理具有网格状拓扑结构数据(如特征矩阵)而设计的深度学习模型
主要有:1.卷积层 2.激活函数层 3.池化层 4.全连接层 5.批量归一化层
- 卷积层
功能:通过卷积核(Filter)与输入数据进行局部滑动卷积,提取空间特征(如边缘、纹理、形状等)。一个卷积层可包含多个卷积核,每个卷积核学习检测不同类型的特征,输出的通道等于卷积核数。
核心概念:
局部感知野:每个神经元(输出特征图上的一个点)仅连接输入数据的局部区域,而非全连接,模拟视觉皮层的神经元响应特性。全连接层是每个神经元连接所有输入元素。
权值共享:同一卷积核的参数在整个输入空间中共享,大幅减少参数数量。
步长(Stride):卷积核滑动的间隔,控制输出特征图的尺寸。
padding:在输入边缘填充数据,保持输出特征图尺寸不变
什么是卷积核?本质是一个小的权重矩阵,通过与输入数据(如特征矩阵)进行卷积运算,提取特定的局部特征。之所以称为权重矩阵,因为矩阵中的每个数代表不同的权重,可以调整组合这些权重数,来检测特定类型的特征,权重一开始是随机的,后通过反向传播计算损失函数对权重的梯度,使用优化器迭代权重,使得卷积核逐渐学会识别有意义的特征。
梯度(Gradient) 是损失函数对权重的偏导数,代表损失函数在当前权重下的变化率。
梯度的意义:指示损失函数下降最快的方向
优化目标:找到一组权重,使损失函数值最小(即位于 “山谷” 底部)。因此,需要沿梯度的反方向更新权重。(因为是由权重—>导致损失函数下降,沿反向调整权重)
损失函数(Loss Function) 是衡量模型预测结果与真实标签之间差异的量化指标,值越小越接近真实,性能越好。
卷积运算:
输入图像 卷积核
[7 2 3 4] [1 0]
[4 5 6 1] × [0 1] → 输出: [12 8]
[3 2 1 5] [ 7 6]
[1 9 3 4]
2.激活函数层
为模型引入非线性能力,解决梯度消失问题
3.池化层
对特征图进行下采样,减少数据维度,同时保留关键特征并增强平移不变性
最大池化:取局部区域的最大值,保留显著特征
平均池化:取局部区域的平均值,保留整体特征
4.全连接层
将提取的特征映射到输出空间(如分类任务的类别数),通常位于模型末端。
5.批量归一层
对每个批次的数据进行归一化,稳定训练过程,加速收敛。
CNN工作原理:
CNN 通过多层卷积和池化操作,从底层到高层逐步提取特征:
底层:提取边缘、颜色等基础特征。
中层:组合基础特征形成纹理、简单形状。
高层:抽象出语义信息(如 “眼睛”“车轮” 等部件)
(十)、池化
特征提取过程中对数据进行降维、压缩信息,同时增强特征的鲁棒性(抗干扰性和适应性)
以二维数据为例,假设输入是一个矩阵,池化层会用一个固定大小的窗口(如 2×2)在矩阵上滑动,每次对窗口内的元素进行聚合,得到一个输出值,被聚合完后的数据不再被计算,从而减小数据的尺寸。步长为2时,会直接将64维缩减为32