Core program:
model = torch.load(path)
calculate_flops = True
if calculate_flops:
    total_params = sum(p.numel() for p in model.parameters())
    print("Total parameters:", total_params)
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print("Trainable parameters:", trainable_params)
    # make sure the model is fully built, then profile it;
    # ptflops prepends the batch dimension (1) itself
    input_shape = (1, x.shape[1])
    macs, params = get_model_complexity_info(model, input_shape, as_strings=True,
                                             print_per_layer_stat=True, verbose=True)
    print(f"MACs: {macs}")  # ptflops reports MACs; FLOPs = 2 * MACs
    print(f"Parameters: {params}")
Per-module complexity results, where 1 MAC = 1 multiplication + 1 addition = 2 FLOPs. Note that ptflops reports MACs, so multiply by 2 to obtain FLOPs.
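Because of that convention, the string output above still needs the factor of 2 to become FLOPs. A minimal sketch of the numeric conversion, with as_strings=False so ptflops returns numbers (the toy nn.Linear stands in for the loaded model; the names here are illustrative, not from the original script):

import torch.nn as nn
from ptflops import get_model_complexity_info

toy = nn.Linear(3600, 1)  # stand-in for the loaded Transformer
macs, params = get_model_complexity_info(toy, (3600,), as_strings=False,
                                         print_per_layer_stat=False, verbose=False)
flops = 2 * macs  # 1 MAC = 1 multiplication + 1 addition = 2 FLOPs
print(f"MACs: {macs}, FLOPs: {flops}, Params: {params}")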
Final result:
Complete program, for reference only:
import argparse
import torch
import h5py
import numpy as np
from transformer.Models import Transformer, MLP
import os
import re
import torch.nn as nn
from torch.nn import functional as F
from tensorboardX import SummaryWriter
import time
from ptflops import get_model_complexity_info
N = 30                  # number of nodes per graph
snr = 40                # unused below
learning_rate = 0.003   # unused below; the Adam lr comes from opt.lr
M = 3000                # unused below
epochs = 10000
dataset_n = 100         # number of graphs in the .mat dataset
def graph_normalize(signals):
    # normalize each node's signal to unit peak amplitude (in place)
    for i in range(signals.shape[0]):
        signals[i, :] = signals[i, :] / np.max(np.abs(signals[i, :]))
def generate_dataset():
    base_dir = "D:\无线通信网络认知\论文1\大修意见\Reviewer2-7 多种深度学习方法对比实验\\test data 30 (mat)\\"
    mat_file = h5py.File(base_dir + '30_nodes_dataset.mat', 'r')
    # load the dataset arrays
    signals = mat_file["Signals"][()]
    tp = mat_file["Tp"][()]
    tp_list = mat_file["Tp_list"][()]  # read but unused below
    Signals = np.swapaxes(signals, 2, 0)
    Tp = np.swapaxes(tp, 2, 0)
    # close the file
    mat_file.close()
    # collect the samples and labels of every graph
    x_list = []
    y_list = []
    for n in range(dataset_n):
        signals = Signals[n, :, :]
        graph_normalize(signals)
        L = 2 * signals.shape[1]  # the two signals under test are concatenated
        tp = Tp[n, :, :]
        x = np.zeros((N * N, L))  # one concatenated signal pair per row
        y = np.zeros((N * N, 1))  # binary label per pair
        index = 0
        # positive samples: ordered pairs (i, j) with an edge, tp[i, j] == 1
        for i in range(N):
            for j in range(N):
                if i != j and tp[i, j] == 1:
                    combined_signal = np.concatenate(
                        (signals[i, :], signals[j, :]))
                    x[index, :] = combined_signal
                    y[index, 0] = 1
                    index += 1
        # enumerate all candidate negative pairs
        n_pair_list = []
        for i in range(N):
            for j in range(N):
                if i != j and tp[i, j] == 0:
                    n_pair_list.append((i, j))
        np.random.seed(42)
        indices = np.arange(len(n_pair_list))
        np.random.shuffle(indices)
        n_pair_list = np.array(n_pair_list)
        n_pair_list = n_pair_list[indices]
        # draw as many negatives as there are positives (class balance)
        n_pair = n_pair_list[:index, :]
        for k in range(n_pair.shape[0]):
            i = n_pair[k, 0]
            j = n_pair[k, 1]
            combined_signal = np.concatenate(
                (signals[i, :], signals[j, :]))
            x[index, :] = combined_signal
            y[index, 0] = 0
            index += 1
        x = x[:index, :]
        y = y[:index, :]
        x_list.append(x)
        y_list.append(y)
    x = np.vstack(x_list)
    y = np.vstack(y_list)
    return x, y
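# Shape sketch (illustrative; assumes 1800-sample signals, so each x row has
# length L = 3600 -- adjust to the actual .mat contents):
#   Signals: (dataset_n, N, 1800)  ->  per-graph x: (#pos + #neg, 3600)
#   negatives are drawn to match the positive count, so y is class-balanced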
def generate_time_dataset():
    base_dir = "D:\无线通信网络认知\论文1\大修意见\Reviewer2-7 多种深度学习方法对比实验\\test data 30 (mat)\\"
    mat_file = h5py.File(base_dir + '30_nodes_dataset.mat', 'r')
    # load the dataset arrays
    signals = mat_file["Signals"][()]
    tp = mat_file["Tp"][()]
    Signals = np.swapaxes(signals, 2, 0)
    Tp = np.swapaxes(tp, 2, 0)
    mat_file.close()
    x_list = []
    for n in range(dataset_n):
        signals = Signals[n, :, :]
        L = 2 * signals.shape[1]  # the two signals under test are concatenated
        x = np.zeros((N * N, L))  # all N*N ordered pairs; no labels needed for timing
        index = 0
        for i in range(N):
            for j in range(N):
                combined_signal = np.concatenate((signals[i, :], signals[j, :]))
                x[index, :] = combined_signal
                index += 1
        x_list.append(x)
    x = np.vstack(x_list)
    return x
def cal_performance(tra_pred, tra_true):
    # unused below; training uses nn.BCELoss instead
    return F.mse_loss(tra_pred, tra_true)
def train(model, data, label, optimizer):
    best_acc = 0
    count = 0  # epochs since the last accuracy improvement
    for epoch in range(epochs):
        count = count + 1
        if count > 50:  # early stop: no improvement for 50 epochs
            break
        optimizer.zero_grad()  # clear old gradients; they accumulate otherwise
        pre = model(data)  # full-batch forward pass
        loss = loss_function(pre, label)
        loss.backward()
        # update the parameters with the backpropagated gradients
        optimizer.step()
        # compute training accuracy
        with torch.no_grad():
            # the model outputs probabilities; threshold into 0/1 predictions
            pred_class = (pre >= 0.5).float()
            correct = (pred_class == label).sum().item()
            total = label.size(0)
            accuracy = correct / total
        if accuracy > best_acc:
            best_acc = accuracy
            torch.save(model, model_dir + 'epoch ' + str(epoch) + ' accuracy ' + str(round(accuracy, 3)) + '.pt')
            count = 0
        print("epoch: ", epoch, " loss: ", loss.item(), "accuracy: ", accuracy)
def find_max_val_acc_file(folder_path):
    # return the checkpoint file with the highest accuracy in its name
    max_val_acc = -1
    max_file = None
    # regex: capture the float that follows 'accuracy ' in the filename
    pattern = re.compile(r'accuracy ([0-9]+\.[0-9]+)')
    for filename in os.listdir(folder_path):
        match = pattern.search(filename)
        if match:
            val_acc = float(match.group(1))
            if val_acc > max_val_acc:
                max_val_acc = val_acc
                max_file = filename
    return max_file, max_val_acc
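# Example: a checkpoint saved by train() as 'epoch 120 accuracy 0.987.pt'
# (hypothetical name) gives pattern.search(...).group(1) == '0.987';
# filenames without an 'accuracy <float>' token are skipped.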
def test(data, label):
    base_path = r'D:\english\WCNA\Transformer\fn 59 8\\'
    max_file, max_val_acc = find_max_val_acc_file(base_path)
    path = base_path + max_file
    model = torch.load(path)
    calculate_flops = True
    if calculate_flops:
        total_params = sum(p.numel() for p in model.parameters())
        print("Total parameters:", total_params)
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        print("Trainable parameters:", trainable_params)
        # profile the loaded model; ptflops prepends the batch dimension itself
        input_shape = (1, data.shape[1])
        macs, params = get_model_complexity_info(model, input_shape, as_strings=True,
                                                 print_per_layer_stat=True, verbose=True)
        print(f"MACs: {macs}")  # ptflops reports MACs; FLOPs = 2 * MACs
        print(f"Parameters: {params}")
    model.eval()  # disable dropout for evaluation
    with torch.no_grad():
        pre = model(data)
        # the model outputs probabilities; threshold into 0/1 predictions
        pred_class = (pre >= 0.5).float()
        correct = (pred_class == label).sum().item()
        total = label.size(0)
        accuracy = correct / total
    print("accuracy: ", accuracy)
def calculate(data):
    base_path = r'D:\englist\WCNA\Transformer\fn 59 8\\'
    max_file, max_val_acc = find_max_val_acc_file(base_path)
    path = base_path + max_file
    model = torch.load(path)
    model.eval()
    with torch.no_grad():
        torch.cuda.synchronize()  # CUDA ops are asynchronous; sync before timing
        t1 = time.time()
        pre = model(data)
        torch.cuda.synchronize()  # wait for the forward pass to finish
        t2 = time.time()
    delta_t = (t2 - t1) / dataset_n  # average time per graph over the 100 graphs
    print("time: ", delta_t)
if __name__ == '__main__':
    x, y = generate_dataset()
    indices = np.arange(y.shape[0])
    np.random.shuffle(indices)
    x = x[indices]
    y = y[indices]
    a = round(0.8 * len(indices))  # 80/20 train/validation split
    x_train = x[:a, :]
    y_train = y[:a, :]
    x_val = x[a:, :]
    y_val = y[a:, :]
    x_train = torch.tensor(x_train).float()
    y_train = torch.tensor(y_train).float()
    x_val = torch.tensor(x_val).float()
    y_val = torch.tensor(y_val).float()
    device = "cuda:0"
    x_train = x_train.to(device)
    y_train = y_train.to(device)
    x_val = x_val.to(device)
    y_val = y_val.to(device)
    log_writer = SummaryWriter()
    model_dir = "D:\englist\WCNA\Transformer\\fn 7 4\\"
    loss_function = nn.BCELoss()  # binary cross-entropy; the model must output probabilities
    parser = argparse.ArgumentParser()
    # argparse's type=bool treats any non-empty string as True,
    # so parse the boolean flags explicitly
    str2bool = lambda s: str(s).lower() in ('true', '1', 'yes')
    parser.add_argument('-epoch', type=int, default=epochs)
    parser.add_argument('-b', '--batch_size', type=int, default=40)
    parser.add_argument('-d_model', type=int, default=7)
    parser.add_argument('-d_inner_hid', type=int, default=236)
    parser.add_argument('-d_k', type=int, default=64)
    parser.add_argument('-d_v', type=int, default=64)
    parser.add_argument('-warmup', '--n_warmup_steps', type=int, default=2000)
    parser.add_argument('-lr_mul', type=float, default=2.0)
    parser.add_argument('-lr', type=float, default=0.001)
    parser.add_argument('-n_head', type=int, default=2)
    parser.add_argument('-n_layers', type=int, default=1)
    parser.add_argument('-dropout', type=float, default=0.1)
    parser.add_argument('-do_train', type=str2bool, default=False)
    parser.add_argument('-do_retrain', type=str2bool, default=False)
    parser.add_argument('-do_eval', type=str2bool, default=True)
    parser.add_argument('-use_mlp', type=str2bool, default=False)
    parser.add_argument('-calculate', type=str2bool, default=False)
    opt = parser.parse_args()
    opt.d_word_vec = opt.d_model
# device = "gpu:0"
# device="cpu"
transformer = Transformer(
2000,
2000,
d_k=opt.d_k,
d_v=opt.d_v,
d_model=opt.d_model,
d_word_vec=opt.d_word_vec,
d_inner=opt.d_inner_hid,
n_layers=opt.n_layers,
n_head=opt.n_head,
dropout=opt.dropout,
n_position = 250
).to(device)
    mlp = MLP(10, 10, 25, 50, use_extra_input=False).to(device)
    model_train = mlp if opt.use_mlp else transformer
    if opt.do_train:
        optimizer = torch.optim.Adam(model_train.parameters(), lr=opt.lr)
        if opt.do_retrain:  # only used for the transformer
            checkpoint = torch.load("./checkpoint/ckpt.pth")
            transformer.load_state_dict(checkpoint['net'])
            optimizer.load_state_dict(checkpoint['optimizer'])
        train(
            model=model_train,
            data=x_train,
            label=y_train,
            optimizer=optimizer
        )
    if opt.do_eval:
        test(data=x_val, label=y_val)
    if opt.calculate:
        x = generate_time_dataset()
        x = torch.tensor(x).float()
        x = x.to(device)
        calculate(data=x)
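With the boolean flags parsed as above, the defaults run evaluation only (do_eval=True). A training run would be launched with something like python <script>.py -do_train True -do_eval False, and -calculate True times inference on the unlabeled pair set; the script filename is not given in the source, so substitute your own.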