无线通信网络拓扑推理采样率实验(数据生成)

发布于:2025-06-15 ⋅ 阅读:(12) ⋅ 点赞:(0)

一、内容描述

测试信号采样率对无线通信网络拓扑推理的效果

二、MATLAB数据生成程序

%创建时间:2025年6月10日
%zhouzhichao
%遍历生成100Hz到1MHz的无线通信网络拓扑推理数据用于测试



close all
clear
snr = 40;
n = 30;
dataset_n = 3;

% for fs = 1e3:1e3:1e6
for fs = 100e3:-1e3:1e3
    Sampling_M = round(1e6/fs);
    
    nodes_P = ones(n,1);
    
    count_3 = 0;
    count_2 = 0;
    count_1 = 0;
    
    
    %获取一帧信号及对应功率
    [ack,~] = ack_generate(Sampling_M);
    ack_L = length(ack);
    signal = ack;
    P_signal = sum(abs(signal).^2);
    ack_noise = randn(ack_L, 1);
    P_noise_1 = sum(ack_noise.^2);  % 计算当前噪声的能量
    a_slot_N = round(7*ack_L);
    
    ave_edge = round(1.5*n);
    L = ave_edge*3*a_slot_N;
    
    Signals = zeros(dataset_n,n,L);
    Tp = zeros(dataset_n,n,n);
    Tp_list = cell(dataset_n,1);
    
    A = 10^(snr/10);
    P_noise = P_signal/A;
    alpha = sqrt(P_noise / P_noise_1);  % 计算缩放因子
    noise = alpha*randn(1, L);
    
    for k=1:dataset_n
        disp(["fs: ",fs/1e3," kHz  k: ",k])
        pause(0.01)
        
        [tp,tp_list] = large_nodes_tp(nodes_P,n);
        
        n_edge = length(tp_list);
        
        
%         L = n_edge*3*a_slot_N;
        signals = zeros(n,L);
        c = 0;
        
        A = 10^(snr/10);
        P_noise = P_signal/A;
        alpha = sqrt(P_noise / P_noise_1);  % 计算缩放因子
        
        
        for epoch=1:3
            index_list = randperm(n_edge);
            for i =index_list
                this_slot_start_point = c*a_slot_N+1;
                p = tp_list(i,1);
                q = tp_list(i,2);
                P1 = nodes_P(p);
                P2 = nodes_P(q);
                
                
                %三次响应
                for m=1:3
                    r_P = 0.9 + (1.1 - 0.9) * rand;
                    signal = r_P*P1*ack;
                    signals(p,this_slot_start_point+(2*m-2)*ack_L:this_slot_start_point+(2*m-1)*ack_L-1) = signal;
                    
                    r_P = 0.9 + (1.1 - 0.9) * rand;
                    signal = r_P*P2*ack;
                    
                    
                    signals(q,this_slot_start_point+(2*m-1)*ack_L:this_slot_start_point+(2*m)*ack_L-1) = signal;
                    
                end
                
                
                c = c + 1;
            end
        end
        
        for i=1:n
            signals(i,:) = signals(i,:) + alpha*randn(1, L);
        end
        
        Signals(k,:,:) = signals;
        Tp(k,:,:) = tp;
        Tp_list = cell(dataset_n,1);
        
    end
    save("D:\无线通信网络认知\论文1\大修意见\Reviewer2-2 采样率实验\1e3-1e6Hz date\fs_"+num2str(fs/1e3)+"kHz_data.mat","Tp","Tp_list","Signals")
    
    
    
end

 生成数据:

三、PyG转换数据

#作者:zhouzhichao
#创建时间:2025/5/30
#内容:生成残缺数据集用于实验

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import os
import h5py
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch_geometric.data import InMemoryDataset, Data
from torch_geometric.utils import negative_sampling

base_dir = "D:\无线通信网络认知\论文1\大修意见\Reviewer2-2 采样率实验\\1k-100kHz data (mat)\\"

N = 30
grapg_size = N
train_n = 20


class graph_data(InMemoryDataset):
    def __init__(self, root, signals=None, tp_list = None, transform=None, pre_transform=None):
        self.signals = signals
        self.tp_list = tp_list
        super().__init__(root, transform, pre_transform)
        self.data = torch.load(self.processed_paths[0])


    # 返回process方法所需的保存文件名。你之后保存的数据集名字和列表里的一致
    @property
    def processed_file_names(self):
        return ['gcn_data.pt']

    # 生成数据集所用的方法
    def process(self):
        signals = self.signals
        tp_list =self.tp_list


        #matlab的index是从1开始的,pytorch的index应当从0开始
        tp_list = tp_list - 1

        X = torch.tensor(signals, dtype=torch.float)

        # 所有的边
        Edge_index = torch.tensor(tp_list, dtype=torch.long)

        # 所有的边1标签
        edge_label = np.ones((tp_list.shape[1]))
        Edge_label = torch.tensor(edge_label, dtype=torch.float)

        neg_edge_index = negative_sampling(
            edge_index=Edge_index, num_nodes=grapg_size,
            num_neg_samples=Edge_index.shape[1], method='sparse')
        # 拼接正负样本索引

        Edge_label_index = Edge_index
        perm = torch.randperm(Edge_index.size(1))
        Edge_index = Edge_index[:, perm]
        Edge_index = Edge_index[:, :train_n]

        # neg_edge_index = neg_edge_index[:, :train_n]

        Edge_label_index = torch.cat(
            [Edge_label_index, neg_edge_index],
            dim=-1,
        )
        # 拼接正负样本
        Edge_label = torch.cat([
            Edge_label,
            Edge_label.new_zeros(neg_edge_index.size(1))
        ], dim=0)

        data = Data(x=X, edge_index=Edge_index, edge_label_index=Edge_label_index, edge_label=Edge_label)
        torch.save(data, self.processed_paths[0])
            # data_list.append(data)
        # data_, slices = self.collate(data_list)  # 将不同大小的图数据对齐,填充

        # torch.save((data_, slices), self.processed_paths[0])


if __name__ =="__main__":
    snr = 40
    for fs_i in range(100):
        fs = fs_i+1
        mat_file = h5py.File(base_dir + 'fs_'+str(fs)+'kHz_data.mat', 'r')
        # 获取数据集
        Signals = mat_file["Signals"][()]
        Tp = mat_file["Tp"][()]
        Tp_list = mat_file["Tp_list"][()]

        n = 3
        for i in range(n):
            signals = Signals[:, :, i]
            tp_list = np.array(mat_file[Tp_list[0, i]])
            root = "1k-100kHz data (pyg)/graph " + "fs-" + str(fs) + "kHz i-" + str(i)
            graph_data(root, signals = signals, tp_list = tp_list)
            print("")

print("...图数据生成完成...")

 生成数据:


优化方案

先对波形进行采样,再随机生成多个样本,这样很难测试出采样率对拓扑推理准确率的影响,因为噪声、拓扑、幅值等都是变化的。

更好的方法是先生成多个样本,再对这些样本进行降采样,这样就对样本进行了控制,噪声、拓扑、幅值不变,只让采样率变化,实现严格的控制变量。

数据生成程序优化:

MATLAB

%创建时间:2025年6月10日
%zhouzhichao
%遍历生成100Hz到1MHz的无线通信网络拓扑推理数据用于测试



close all
clear
snr = 10;
n = 30;
dataset_n = 10;
fs = 100e3;


nodes_P = ones(n,1);


Sampling_M = round(1e6/100e3);


%获取一帧信号及对应功率
[ack,~] = ack_generate(Sampling_M);
ack_L = length(ack);
signal = ack;
P_signal = sum(abs(signal).^2);
ack_noise = randn(ack_L, 1);
P_noise_1 = sum(ack_noise.^2);  % 计算当前噪声的能量
a_slot_N = round(7*ack_L);

ave_edge = round(1.5*n);
L = ave_edge*3*a_slot_N;

Signals = zeros(dataset_n,n,L);
Tp = zeros(dataset_n,n,n);
Tp_list = cell(dataset_n,1);

A = 10^(snr/10);
P_noise = P_signal/A;
alpha = sqrt(P_noise / P_noise_1);  % 计算缩放因子


for k=1:dataset_n
    c = 0;
    %待生成内容:tp,tp_list,signals
    [tp,tp_list] = large_nodes_tp(nodes_P,n);
    signals = zeros(n,L);
    
    n_edge = length(tp_list);

    
    
    for epoch=1:3
        index_list = randperm(n_edge);
        for i =index_list
            this_slot_start_point = c*a_slot_N+1;
            p = tp_list(i,1);
            q = tp_list(i,2);
            P1 = nodes_P(p);
            P2 = nodes_P(q);
            
            
            %三次响应
            for m=1:3
                r_P = 0.9 + (1.1 - 0.9) * rand;
                signal = r_P*P1*ack;
                signals(p,this_slot_start_point+(2*m-2)*ack_L:this_slot_start_point+(2*m-1)*ack_L-1) = signal;
                
                r_P = 0.9 + (1.1 - 0.9) * rand;
                signal = r_P*P2*ack;
                
                
                signals(q,this_slot_start_point+(2*m-1)*ack_L:this_slot_start_point+(2*m)*ack_L-1) = signal;
                
            end
            
            
            c = c + 1;
        end
    end
    
    for i=1:n
        signals(i,:) = signals(i,:) + alpha*randn(1, L);
    end
      
    
    Signals(k,:,:) = signals;
    Tp(k,:,:) = tp;
    Tp_list{k} = tp_list;
          
end
save("D:\无线通信网络认知\论文1\大修意见\Reviewer2-2 采样率实验\1k-100kHz data (mat)\fs_"+num2str(fs/1e3)+"kHz_"+num2str(snr)+"dB_data.mat","Tp","Tp_list","Signals")

Python

#作者:zhouzhichao
#创建时间:2025/5/30
#内容:生成残缺数据集用于实验

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import os
import h5py
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch_geometric.data import InMemoryDataset, Data
from torch_geometric.utils import negative_sampling

base_dir = "D:\无线通信网络认知\论文1\大修意见\Reviewer2-2 采样率实验\\1k-100kHz data (mat)\\"

N = 30
grapg_size = N
train_n = 20


class graph_data(InMemoryDataset):
    def __init__(self, root, signals=None, tp_list = None, transform=None, pre_transform=None):
        self.signals = signals
        self.tp_list = tp_list
        super().__init__(root, transform, pre_transform)
        self.data = torch.load(self.processed_paths[0])


    # 返回process方法所需的保存文件名。你之后保存的数据集名字和列表里的一致
    @property
    def processed_file_names(self):
        return ['gcn_data.pt']

    # 生成数据集所用的方法
    def process(self):
        signals = self.signals
        tp_list =self.tp_list


        #matlab的index是从1开始的,pytorch的index应当从0开始
        tp_list = tp_list - 1

        X = torch.tensor(signals, dtype=torch.float)

        # 所有的边
        Edge_index = torch.tensor(tp_list, dtype=torch.long)

        # 所有的边1标签
        edge_label = np.ones((tp_list.shape[1]))
        Edge_label = torch.tensor(edge_label, dtype=torch.float)

        neg_edge_index = negative_sampling(
            edge_index=Edge_index, num_nodes=grapg_size,
            num_neg_samples=Edge_index.shape[1], method='sparse')
        # 拼接正负样本索引

        Edge_label_index = Edge_index
        perm = torch.randperm(Edge_index.size(1))
        Edge_index = Edge_index[:, perm]
        Edge_index = Edge_index[:, :train_n]

        # neg_edge_index = neg_edge_index[:, :train_n]

        Edge_label_index = torch.cat(
            [Edge_label_index, neg_edge_index],
            dim=-1,
        )
        # 拼接正负样本
        Edge_label = torch.cat([
            Edge_label,
            Edge_label.new_zeros(neg_edge_index.size(1))
        ], dim=0)

        data = Data(x=X, edge_index=Edge_index, edge_label_index=Edge_label_index, edge_label=Edge_label)
        torch.save(data, self.processed_paths[0])
            # data_list.append(data)
        # data_, slices = self.collate(data_list)  # 将不同大小的图数据对齐,填充

        # torch.save((data_, slices), self.processed_paths[0])


if __name__ =="__main__":
    snr = 10
    fs = 100
    mat_file = h5py.File(base_dir + 'fs_'+str(fs)+'kHz_'+str(snr)+'dB_data.mat', 'r')
    # 获取数据集
    Signals = mat_file["Signals"][()]
    Tp = mat_file["Tp"][()]
    Tp_list = mat_file["Tp_list"][()]

    n = 10
    for i in range(n):
        signals = Signals[:, :, i]
        tp_list = np.array(mat_file[Tp_list[0, i]])
        root = "1k-100kHz data (pyg)/graph  " + "fs-" + str(fs) + "kHz snr-"+str(snr)+"db i-" + str(i)
        graph_data(root, signals = signals, tp_list = tp_list)
        print("")

print("...图数据生成完成...")


网站公告

今日签到

点亮在社区的每一天
去签到