Week N8: Text Classification with Word2vec
- 🍨 This article is a learning-log post from the 🔗365天深度学习训练营 (365-day deep learning training camp)
- 🍖 Original author: K同学啊
I. Data Preprocessing
1. Loading the Data
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms,datasets
import os, PIL, pathlib, warnings

warnings.filterwarnings('ignore')
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
import pandas as pd
train_data=pd.read_csv('./train (1).csv',sep='\t',header=None)
train_data.head()
def custom_data_iter(texts, labels):
    for x, y in zip(texts, labels):
        yield x, y

x = train_data[0].values[:]   # raw texts
# multi-class labels (kept as raw strings here; mapped to indices later)
y = train_data[1].values[:]
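Since the file is read with sep='\t' and header=None, each row is expected to carry the raw text in column 0 and the intent label in column 1. An illustrative (tab-separated) row, assembled from strings that appear later in this post:

还有双鸭山到淮阴的汽车票吗13号的	Travel-Query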
2. Building the Vocabulary
from gensim.models.word2vec import Word2Vec
import numpy as np

w2v = Word2Vec(vector_size=100,  # dimensionality of the word vectors
               min_count=3)      # ignore words that appear fewer than 3 times
w2v.build_vocab(x)
w2v.train(x,
          total_examples=w2v.corpus_count,
          epochs=20)

# Equivalent one-step construction and training:
w2v = Word2Vec(x, vector_size=100, min_count=3, epochs=20)
# Convert a text into a single vector by averaging its word vectors
def average_vec(text):
    word_vectors = []
    for word in text:
        try:
            # Append the vector of each word to the list
            word_vectors.append(w2v.wv[word])
        except KeyError:
            # Skip words that are not in the vocabulary
            continue
    # If no word was found, return a zero vector
    if len(word_vectors) == 0:
        return np.zeros(w2v.vector_size).reshape((1, w2v.vector_size))
    # Compute the mean of all word vectors
    avg_vector = np.mean(word_vectors, axis=0).reshape((1, w2v.vector_size))
    return avg_vector

# Convert every text to a vector and stack them into one ndarray
x_vec = np.concatenate([average_vec(z) for z in x])

# Save the Word2Vec model and its vectors
w2v.save('./improved_w2v_model.pkl')
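A quick sanity check (the second input string is made up) shows that average_vec always returns a (1, vector_size) array, even when no character is found in the vocabulary:

print(average_vec('你在干嘛').shape)  # (1, 100)
print(average_vec('@@##').shape)      # (1, 100) even for out-of-vocabulary characters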
from torch.utils.data import Dataset

class TextClassificationDataset(Dataset):
    def __init__(self, texts, labels, word2vec_model, label_names):
        self.texts = texts
        self.labels = labels
        self.word2vec_model = word2vec_model
        self.label_names = label_names

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        text_vector = average_vec(text)            # (1, vector_size)
        label_idx = self.label_names.index(label)  # label string -> integer index
        return text_vector.squeeze(0), label_idx
# Note: collate_batch below applies text_pipeline to each sample, so the iterator
# should yield raw texts (not pre-computed vectors); wrapping it in a list gives
# DataLoader a map-style dataset
train_iter = list(custom_data_iter(x, y))
len(x), len(x_vec)

label_name = list(set(train_data[1].values[:]))
print(label_name)
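One caveat: Python set iteration order is not stable across runs, so the label-to-index mapping above can change between executions. If reproducibility matters, sorting is a simple fix (a suggested tweak, not in the original):

# Suggested tweak: deterministic label-to-index mapping
label_name = sorted(set(train_data[1].values[:]))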
3. Generating Data Batches and Iterators
text_pipeline = lambda x: average_vec(x)        # text  -> (1, vector_size) array
label_pipeline = lambda x: label_name.index(x)  # label -> integer index

text_pipeline('你在干嘛')
label_pipeline('Travel-Query')
from torch.utils.data import DataLoader

def collate_batch(batch):
    label_list, text_list = [], []
    for (_text, _label) in batch:
        # label list
        label_list.append(label_pipeline(_label))
        # text list
        processed_text = torch.tensor(text_pipeline(_text), dtype=torch.float32)
        text_list.append(processed_text)
    label_list = torch.tensor(label_list, dtype=torch.int64)
    text_list = torch.cat(text_list)
    return text_list.to(device), label_list.to(device)

# Data loader
dataloader = DataLoader(train_iter,
                        batch_size=8,
                        shuffle=False,
                        collate_fn=collate_batch)
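Pulling a single batch through the loader is a quick way to verify the collate function; with batch_size=8 and vector_size=100 the expected shapes are:

# Quick check of the collate function (shapes assume batch_size=8, vector_size=100)
text_batch, label_batch = next(iter(dataloader))
print(text_batch.shape, label_batch.shape)  # torch.Size([8, 100]) torch.Size([8])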
II. Model Construction
1. Building the Model
from torch.utils.data import DataLoader

# Redefined collate function for TextClassificationDataset samples: here the
# texts are already vectors and the labels are already integer indices
def collate_batch(batch):
    label_list, text_list = [], []
    for (text, label) in batch:
        label_list.append(label)
        text_list.append(torch.tensor(text, dtype=torch.float32))
    label_list = torch.tensor(label_list, dtype=torch.int64)
    text_list = torch.stack(text_list)
    return text_list.to(device), label_list.to(device)
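The class ImprovedTextClassificationModel is instantiated in the next step, but its definition does not appear in the original post. Below is a minimal sketch consistent with the constructor arguments used later (input_dim, hidden_dim, output_dim, dropout_rate), assuming a small MLP over the averaged word vectors; treat it as a reconstruction, not the author's exact code:

import torch.nn as nn

# Hypothetical reconstruction of the missing model definition:
# a two-layer MLP classifier over averaged Word2Vec features
class ImprovedTextClassificationModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate=0.3):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),   # e.g. 100 -> 256
            nn.ReLU(),
            nn.Dropout(dropout_rate),           # regularization
            nn.Linear(hidden_dim, output_dim),  # 256 -> num_class logits
        )

    def forward(self, text):
        return self.net(text)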
2. Initializing the Model
vocab_size = 100000  # left over from the embedding-layer version; unused below
em_size = 12         # likewise unused

num_class = len(label_name)
INPUT_DIM = w2v.vector_size
HIDDEN_DIM = 256  # size of the hidden layer

model = ImprovedTextClassificationModel(
    input_dim=INPUT_DIM,
    hidden_dim=HIDDEN_DIM,
    output_dim=num_class,
    dropout_rate=0.3
).to(device)
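A quick parameter count (an optional check, not in the original) confirms the model was assembled as intended:

# Optional sanity check: number of trainable parameters
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Trainable parameters: {total_params:,}')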
3. Defining the Training and Evaluation Functions
import time

def train(dataloader):
    model.train()
    total_acc, train_loss, total_count = 0, 0, 0
    log_interval = 50
    start_time = time.time()

    for idx, (text, label) in enumerate(dataloader):
        predicted_label = model(text)

        optimizer.zero_grad()
        loss = criterion(predicted_label, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)  # gradient clipping
        optimizer.step()

        total_acc += (predicted_label.argmax(1) == label).sum().item()
        train_loss += loss.item()
        total_count += label.size(0)

        if idx % log_interval == 0 and idx > 0:
            elapsed = time.time() - start_time  # time since the last log
            print('| epoch {:1d} | {:4d}/{:4d} batches '
                  '| train_acc {:4.3f} train_loss {:4.5f}'.format(
                      epoch, idx, len(dataloader),
                      total_acc / total_count, train_loss / total_count))
            total_acc, train_loss, total_count = 0, 0, 0
            start_time = time.time()
def evaluate(dataloader):
    model.eval()
    total_acc, val_loss, total_count = 0, 0, 0

    with torch.no_grad():
        for idx, (text, label) in enumerate(dataloader):
            predicted_label = model(text)
            loss = criterion(predicted_label, label)
            # accumulate evaluation statistics
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            val_loss += loss.item()
            total_count += label.size(0)

    return total_acc / total_count, val_loss / total_count
III. Training the Model
1. Splitting the Dataset and Running the Model
from torch.utils.data import random_split

# Hyperparameters
EPOCHS = 20
BATCH_SIZE = 32
best_model_path = 'best_text_classification_model.pt'

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='max', factor=0.5, patience=2, verbose=True
)
criterion = torch.nn.CrossEntropyLoss()
best_valid_acc = 0

text_dataset = TextClassificationDataset(
    texts=train_data[0].values[:],
    labels=train_data[1].values[:],
    word2vec_model=w2v,
    label_names=label_name
)

# 70% / 15% / 15% train / validation / test split
train_size = int(0.7 * len(text_dataset))
valid_size = int(0.15 * len(text_dataset))
test_size = len(text_dataset) - train_size - valid_size
train_dataset, valid_dataset, test_dataset = random_split(
    text_dataset, [train_size, valid_size, test_size]
)
# Data loaders
train_dataloader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_batch
)
valid_dataloader = DataLoader(
    valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_batch
)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_batch
)

# Training loop
for epoch in range(1, EPOCHS + 1):
    epoch_start_time = time.time()

    # Train
    train(train_dataloader)

    # Validate
    val_acc, val_loss = evaluate(valid_dataloader)

    # Update the learning rate
    scheduler.step(val_acc)

    # Current learning rate
    lr = optimizer.param_groups[0]['lr']

    # Save the best model
    if val_acc > best_valid_acc:
        best_valid_acc = val_acc
        torch.save(model.state_dict(), best_model_path)
        print(f"Best model saved, validation accuracy: {val_acc:.4f}")

    print('-' * 69)
    print('| epoch {:2d} | time: {:4.2f}s | valid_acc {:4.3f} valid_loss {:4.3f} | lr {:4.6f}'.format(
        epoch, time.time() - epoch_start_time, val_acc, val_loss, lr))
    print('-' * 69)
# Load the best model
model.load_state_dict(torch.load(best_model_path))

# Evaluate on the test set
test_acc, test_loss = evaluate(test_dataloader)
print('\nTest set performance:')
print(f'Test accuracy: {test_acc:.4f}, test loss: {test_loss:.4f}')

# Evaluate on the validation set
valid_acc, valid_loss = evaluate(valid_dataloader)
print(f'Validation accuracy: {valid_acc:.4f}, validation loss: {valid_loss:.4f}')
2. Testing on Specified Data
def predict(text, text_pipeline):
    model.eval()  # disable dropout at inference time
    with torch.no_grad():
        text = torch.tensor(text_pipeline(text), dtype=torch.float32).squeeze(0).to(device)  # note the .to(device)
        print(text.shape)
        output = model(text)
        return output.argmax(0).item()

ex_text_str = '还有双鸭山到淮阴的汽车票吗13号的'

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

print('The category of this text is: %s' % label_name[predict(ex_text_str, text_pipeline)])
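In a fresh session, the saved artifacts can be reloaded instead of retraining. A minimal sketch, assuming the file paths used above and that label_name is rebuilt identically (see the sorted() note earlier):

from gensim.models.word2vec import Word2Vec

# Reload the trained Word2Vec model and the best classifier weights
w2v = Word2Vec.load('./improved_w2v_model.pkl')
model.load_state_dict(torch.load('best_text_classification_model.pt'))
model.eval()

print(label_name[predict(ex_text_str, text_pipeline)])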