Inferring topology relations with a Transformer
Main program:
import argparse
import torch
import h5py
import numpy as np
from transformer.Models import Transformer, MLP
import os
import torch.nn as nn
from torch.nn import functional as F
from tensorboardX import SummaryWriter

N = 30
snr = 40
learning_rate = 0.003
M = 3000
epochs = 10000


def graph_normalize(signals):
    # scale every signal to unit peak amplitude, in place
    for i in range(signals.shape[0]):
        signals[i, :] = signals[i, :] / np.max(np.abs(signals[i, :]))


def generate_dataset():
    dataset_n = 100
    base_dir = "D:\无线通信网络认知\论文1\大修意见\Reviewer2-7 多种深度学习方法对比实验\\test data 30 (mat)\\"
    mat_file = h5py.File(base_dir + '30_nodes_dataset.mat', 'r')
    # read the raw arrays
    signals = mat_file["Signals"][()]
    tp = mat_file["Tp"][()]
    tp_list = mat_file["Tp_list"][()]
    Signals = np.swapaxes(signals, 2, 0)
    Tp = np.swapaxes(tp, 2, 0)
    # tp_list = tp_list - 1
    # close the file
    mat_file.close()
    # collect the samples and labels of every graph
    x_list = []
    y_list = []
    for n in range(0, dataset_n, 1):
        signals = Signals[n, :, :]
        graph_normalize(signals)
        L = 2 * signals.shape[1]  # the two signals under test are concatenated
        tp = Tp[n, :, :]
        # x holds up to N * N signal pairs of length L; y holds the matching 0/1 labels
        x = np.zeros((N * N, L))
        y = np.zeros((N * N, 1))
        index = 0
        # positive samples: node pairs that are connected in the topology matrix
        for i in range(N):
            for j in range(N):
                if i != j and tp[i, j] == 1:
                    combined_signal = np.concatenate((signals[i, :], signals[j, :]))
                    x[index, :] = combined_signal
                    y[index, 0] = 1
                    index += 1
        # enumerate every negative pair
        n_pair_list = []
        for i in range(N):
            for j in range(N):
                if i != j and tp[i, j] == 0:
                    n_pair_list.append((i, j))
        np.random.seed(42)
        indices = np.arange(len(n_pair_list))
        np.random.shuffle(indices)
        n_pair_list = np.array(n_pair_list)
        n_pair_list = n_pair_list[indices]
        # draw as many negative samples as there are positives
        n_pair = n_pair_list[:index, :]
        for k in range(n_pair.shape[0]):
            i = n_pair[k, 0]
            j = n_pair[k, 1]
            combined_signal = np.concatenate((signals[i, :], signals[j, :]))
            x[index, :] = combined_signal
            y[index, 0] = 0
            index += 1
        x = x[:index, :]
        y = y[:index, :]
        # debug plotting (needs matplotlib):
        # x = np.expand_dims(x, axis=-1)
        # plt.plot(np.linspace(0, 1, 2520), np.hstack((x[1, :1260], x[2, 1260:])))
        # c = 70
        # plot_sig = np.vstack((x[c, :1260], x[c, 1260:]))
        # print("y: ", 1 - y[c, 0])
        # plt.plot(np.linspace(0, 1, 1260), plot_sig.T)
        x_list.append(x)
        y_list.append(y)
    x = np.vstack(x_list)
    y = np.vstack(y_list)
    return x, y


def cal_performance(tra_pred, tra_true):
    return F.mse_loss(tra_pred, tra_true)


def train(model, data, label, optimizer):
    best_acc = 0
    count = 0
    # model = torch.load(model_dir + 'model.pt')
    for epoch in range(epochs):
        count = count + 1
        if count > 50:  # early stop after 50 epochs without a new best accuracy
            break
        optimizer.zero_grad()  # clear the optimizer's gradients; otherwise they keep accumulating
        correct_count = 0  # unused leftover
        pre = model(data)  # full-batch forward pass
        loss = loss_function(pre, label)  # compute the loss for this epoch
        # back-propagate the loss; accuracy is monitored separately below
        loss.backward()
        # update the parameters with the back-propagated gradients
        optimizer.step()
        # compute accuracy
        with torch.no_grad():
            pred_class = (pre >= 0.5).float()  # the output is a probability; threshold it to a 0/1 prediction
            correct = (pred_class == label).sum().item()
            total = label.size(0)
            accuracy = correct / total
            if accuracy > best_acc:
                best_acc = accuracy
                torch.save(model, model_dir + 'epoch ' + str(epoch) + ' accuracy ' + str(round(accuracy, 2)) + '.pt')
                count = 0
        print("epoch: ", epoch, " loss: ", loss.item(), "accuracy: ", accuracy)


def test(data, label):
    model = torch.load(model_dir + 'model.pt')
    pre = model(data)
    loss = loss_function(pre, label)
    print("test || loss: ", loss.item())


if __name__ == '__main__':
    x, y = generate_dataset()
    indices = np.arange(y.shape[0])
    np.random.shuffle(indices)
    x = x[indices]
    y = y[indices]
    x_down = x[:, ::2]  # 2x-downsampled copy (unused below)
    a = round(0.8 * len(indices))  # 80/20 train/validation split
    x_train = x[:a, :]
    y_train = y[:a, :]
    x_val = x[a:, :]
    y_val = y[a:, :]
    x_train = torch.tensor(x_train).float()
    y_train = torch.tensor(y_train).float()
    x_val = torch.tensor(x_val).float()
    y_val = torch.tensor(y_val).float()
    device = "cuda:0"
    # device = "cpu"
    x_train = x_train.to(device)
    y_train = y_train.to(device)
    x_val = x_val.to(device)
    y_val = y_val.to(device)
    log_writer = SummaryWriter()
    # os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
    model_dir = "D:\english\wcna\\reviewer2-7\\transformer\\1\\"
    loss_function = nn.BCELoss()  # loss function
    parser = argparse.ArgumentParser()
    parser.add_argument('-epoch', type=int, default=epochs)
    parser.add_argument('-b', '--batch_size', type=int, default=40)
    parser.add_argument('-d_model', type=int, default=40)
    parser.add_argument('-d_inner_hid', type=int, default=1024)
    parser.add_argument('-d_k', type=int, default=64)
    parser.add_argument('-d_v', type=int, default=64)
    parser.add_argument('-warmup', '--n_warmup_steps', type=int, default=2000)
    parser.add_argument('-lr_mul', type=float, default=2.0)
    parser.add_argument('-lr', type=float, default=0.001)
    parser.add_argument('-n_head', type=int, default=2)
    parser.add_argument('-n_layers', type=int, default=1)
    parser.add_argument('-dropout', type=float, default=0.1)
    # caveat: argparse's type=bool turns any non-empty string into True
    parser.add_argument('-do_train', type=bool, default=True)
    parser.add_argument('-do_retrain', type=bool, default=False)
    parser.add_argument('-do_eval', type=bool, default=False)
    parser.add_argument('-use_mlp', type=bool, default=False)
    opt = parser.parse_args()
    opt.d_word_vec = opt.d_model
    transformer = Transformer(
        2000, 2000,
        d_k=opt.d_k,
        d_v=opt.d_v,
        d_model=opt.d_model,
        d_word_vec=opt.d_word_vec,
        d_inner=opt.d_inner_hid,
        n_layers=opt.n_layers,
        n_head=opt.n_head,
        dropout=opt.dropout,
        n_position=250).to(device)
    mlp = MLP(10, 10, 25, 50, use_extra_input=False).to(device)
    model_train = mlp if opt.use_mlp else transformer
    if opt.do_train:
        parameters = mlp.parameters() if opt.use_mlp else transformer.parameters()
        # optimizer = ScheduledOptim(
        #     optim.Adam(parameters, betas=(0.9, 0.98), eps=1e-09),
        #     opt.lr, opt.d_model, opt.n_warmup_steps, opt.use_mlp)
        lr = 0.001
        optimizer = torch.optim.Adam(model_train.parameters(), lr=lr)
        if opt.do_retrain:  # only used for the transformer
            checkpoint = torch.load("./checkpoint/ckpt.pth")
            transformer.load_state_dict(checkpoint['net'])
            optimizer.load_state_dict(checkpoint['optimizer'])
        train(model=model_train, data=x_train, label=y_train, optimizer=optimizer)
    if opt.do_eval:
        test(data=x_val, label=y_val)
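Since generate_dataset() draws, per graph, exactly as many negative pairs as there are positive links, the assembled set should come out roughly class-balanced. A quick sanity check before training (the shapes follow from the code above):

x, y = generate_dataset()
print("x:", x.shape, "y:", y.shape)  # (n_pairs, 2 * signal_length) and (n_pairs, 1)
print("positive ratio:", y.mean())   # should be ~0.5 by construction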
Lessons learned
After wrestling with ResNet, MobileNet, and a Transformer for wireless-network topology inference, my biggest takeaway is this: when debugging a model, you must know how to run ablation experiments. An ablation study does more than demonstrate your model's gain over alternatives; when accuracy keeps hovering around 0.5 and the model refuses to learn anything, it can save your life.
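To make the pattern concrete, here is a minimal sketch of such an ablation switch. The AblationNet wrapper and its use_encoder flag are hypothetical illustrations of the idea (the fpn/encoder/fdn names mirror the modules discussed below), not this project's actual code:

import torch
import torch.nn as nn

class AblationNet(nn.Module):
    # hypothetical wrapper: turn use_encoder off to bypass the component under suspicion
    def __init__(self, fpn, encoder, fdn, use_encoder=True):
        super().__init__()
        self.fpn = fpn          # CNN feature extractor
        self.encoder = encoder  # Transformer encoder (the component being ablated)
        self.fdn = fdn          # fully connected classifier head
        self.use_encoder = use_encoder

    def forward(self, x):
        h = self.fpn(x).squeeze(-1)
        if self.use_encoder:
            h = self.encoder(h)
        pooled = torch.mean(h, dim=1)  # global average pooling
        return torch.sigmoid(self.fdn(pooled))

If accuracy climbs off 0.5 with use_encoder=False but not with it on, the defect lives inside the encoder.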
At first I could not get the Transformer topology-inference model to work no matter what; accuracy kept fluctuating around 0.5. So I stripped the forward pass down to just the fpn and the fdn:
def forward(self, input_data):
    src_seq = self.fpn(input_data).squeeze(-1)
    pooled = torch.mean(src_seq, dim=1)  # global average pooling -> [batch, d_model]
    out = self.fdn(pooled)               # classifier head -> [batch, 1]
    return torch.sigmoid(out)
With this reduced forward pass, the model trained normally.
The fpn is a CNN and the fdn is a fully connected network, so the problem had to sit in the Transformer encoder:
def forward(self, src_seq, return_attns=False):
    enc_slf_attn_list = []
    # -- Forward
    # embedding lookup: the continuous signal is cast to long so it can index the table
    enc_output = self.src_word_emb(src_seq.long())
    if self.scale_emb:
        enc_output *= self.d_model ** 0.5
    enc_output = self.dropout(self.position_enc(enc_output))
    enc_output = self.layer_norm(enc_output)
    for enc_layer in self.layer_stack:
        enc_output, enc_slf_attn = enc_layer(enc_output)
        enc_slf_attn_list += [enc_slf_attn] if return_attns else []
    if return_attns:
        return enc_output, enc_slf_attn_list
    return enc_output
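A closing observation on the front end above: self.src_word_emb is an embedding lookup, so src_seq.long() truncates the continuous, peak-normalized signal to integer token ids before indexing the table. As a hedged sketch only (my assumption about a possible fix, not necessarily what this project ended up doing), a learned linear projection is the usual way to map continuous samples into d_model-dimensional encoder inputs:

import torch
import torch.nn as nn

class ContinuousFrontEnd(nn.Module):
    # hypothetical replacement for src_word_emb: no integer cast required
    def __init__(self, d_model):
        super().__init__()
        self.proj = nn.Linear(1, d_model)  # map each scalar sample to d_model features

    def forward(self, src_seq):
        # src_seq: [batch, seq_len] float signal -> [batch, seq_len, d_model]
        return self.proj(src_seq.unsqueeze(-1))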