在 PyTorch 中借助 GloVe 词嵌入完成情感分析
一. Glove 词嵌入原理
GloVe是一种学习词嵌入的方法,它希望拟合给定上下文单词i时单词j出现的次数。使用的误差函数为:
其中N是词汇表大小,是线性层参数,
是词嵌入。f(x)是权重项,用于平衡不同频率的单词对误差的影响,并消除log0时式子不成立情况。
GloVe作者提供了官方的预训练词嵌入(https://nlp.stanford.edu/projects/glove/ )。预训练的GloVe有好几个版本,按数据来源,可以分成:
- 维基百科+gigaword(6B)
- 爬虫(42B)
- 爬虫(840B)
- 推特(27B)
按照词嵌入向量的大小分,又可以分成50维,100维,200维等不同维度。
预训练GloVe的文件格式非常简明,一行代表一个单词向量,每行先是一个单词,再是若干个浮点数,表示该单词向量的每一个元素。
在Pytorch里,我们不必自己去下载解析GloVe,而是可以直接调用Pytorch库自动下载解析GloVe。首先我们要安装Pytorch的NLP库-- torchtext。
如上所述,GloVe版本可以由其数据来源和向量维数确定,在构建GloVe类时,要提供这两个参数,我们选择的是6B token,维度100的GloVe
调用glove.get_vecs_by_tokens
,我们能够把token转换成GloVe里的向量。
import torch
from torchtext.vocab import GloVe
glove = GloVe(name='6B', dim=100)
# Get vectors
tensor = glove.get_vecs_by_tokens(['', '1998', '199999998', ',', 'cat'], True)
print(tensor)
PyTorch提供的这个函数非常方便。如果token不在GloVe里的话,该函数会返回一个全0向量。如果你运行上面的代码,可以观察到一些有趣的事:空字符串和199999998这样的不常见数字不在词汇表里,而1998这种常见的数字以及标点符号都在词汇表里。
GloVe
类内部维护了一个矩阵,即每个单词向量的数组。因此,GloVe
需要一个映射表来把单词映射成向量数组的下标。glove.itos
和glove.stoi
完成了下标与单词字符串的相互映射。比如用下面的代码,我们可以知道词汇表的大小,并访问词汇表的前几个单词:
myvocab = glove.itos
print(len(myvocab))
print(myvocab[0], myvocab[1], myvocab[2], myvocab[3])
最后,我们来通过一个实际的例子认识一下词嵌入的意义。词嵌入就是向量,向量的关系常常与语义关系对应。利用词嵌入的相对关系,我们能够回答“x1之于y1,相当于x2之于谁?”这种问题。比如,男人之于女人,相当于国王之于王后。设我们要找的向量为y2,我们想让x1-y1=x2-y2,即找出一个和x2-(x1-y1)最相近的向量y2出来。这一过程可以用如下的代码描述:
def get_counterpart(x1, y1, x2):x1_id = glove.stoi[x1]y1_id = glove.stoi[y1]x2_id = glove.stoi[x2]#print("x1:",x1,"y1:",y1,"x2:",x2)x1, y1, x2 = glove.get_vecs_by_tokens([x1, y1, x2],True)#print("x1:",x1,"y1:",y1,"x2:",x2)target = x2 - x1 + y1max_sim =0 max_id = -1for i in range(len(myvocab)):vector = glove.get_vecs_by_tokens([myvocab[i]],True)[0]cossim = torch.dot(target, vector)if cossim > max_sim and i not in {x1_id, y1_id, x2_id}:max_sim = cossimmax_id = ireturn myvocab[max_id]
print(get_counterpart('man', 'woman', 'king'))
print(get_counterpart('more', 'less', 'long'))
print(get_counterpart('apple', 'red', 'banana'))
运行结果:
queen
short
yellow
二.基于GloVe的情感分析
情感分析任务与数据集
和猫狗分类类似,情感分析任务是一种比较简单的二分类NLP任务:给定一段话,输出这段话的情感是积极的还是消极的。
比如下面这段话:
I went and saw this movie last night after being coaxed to by a few friends of mine. I'll admit that I was reluctant to see it because from what I knew of Ashton Kutcher he was only able to do comedy. I was wrong. Kutcher played the character of Jake Fischer very well, and Kevin Costner played Ben Randall with such professionalism. ......
这是一段影评,大意说,这个观众本来不太想去看电影,因为他认为演员Kutcher只能演好喜剧。但是,看完后,他发现他错了,所有演员都演得非常好。这是一段积极的评论。
1. 读取数据集:
import os
from torchtext.data import get_tokenizerdef read_imdb(dir='aclImdb', split = 'pos', is_train=True):subdir = 'train' if is_train else 'test'dir = os.path.join(dir, subdir, split)lines = []for file in os.listdir(dir):with open(os.path.join(dir, file), 'rb') as f:line = f.read().decode('utf-8')lines.append(line)return lineslines = read_imdb()
print('Length of the file:', len(lines))
print('lines[0]:', lines[0])
tokenizer = get_tokenizer('basic_english')
tokens = tokenizer(lines[0])
print('lines[0] tokens:', tokens)
output:
2.获取经GloVe预处理的数据
在这个作业里,模型其实很简单,输入序列经过词嵌入,送入单层RNN,之后输出结果。作业最难的是如何把token转换成GloVe词嵌入。
torchtext其实还提供了一些更方便的NLP工具类(Field,Vectors),用于管理向量。但是,这些工具需要一定的学习成本,后续学习pytorch时再学习。
Pytorch通常用nn.Embedding来表示词嵌入层。nn.Embedding其实就是一个矩阵,每一行都是一个词嵌入,每一个token都是整型索引,表示该token再词汇表里的序号。有了索引,有了矩阵就可以得到token的词嵌入了。但是有些token在词汇表中并不存在,我们得对输入做处理,把词汇表里没有的token转换成<unk>这个表示未知字符的特殊token。同时为了对齐序列的长度,我们还得添加<pad>这个特殊字符。而用glove直接生成的nn.Embedding里没有<unk>和<pad>字符。如果使用nn.Embedding的话,我们要编写非常复杂的预处理逻辑。
为此,我们可以用GloVe类的get_vecs_by_tokens直接获取token的词嵌入,以代替nn.Embedding。回忆一下前文提到的get_vecs_by_tokens的使用结果,所有没有出现的token都会被转换成零向量。这样,我们就不必操心数据预处理的事了。get_vecs_by_tokens应该发生在数据读取之后,可以直接被写在Dataset的读取逻辑里
from torch.utils.data import DataLoader, Dataset
from torchtext.data import get_tokenizer
from torchtext.vocab import GloVeclass IMDBDataset(Dataset):def __init__(self, is_train=True, dir = 'aclImdb'):super().__init__()self.tokenizer = get_tokenizer('basic_english')pos_lines = read_imdb(dir, 'pos', is_train)neg_lines = read_imdb(dir, 'neg', is_train)self.pos_length = len(pos_lines)self.neg_length = len(neg_lines)self.lines = pos_lines+neg_linesdef __len__(self):return self.pos_length + self.neg_lengthdef __getitem__(self, index):sentence = self.tokenizer(self.lines[index])x = glove.get_vecs_by_tokens(sentence)label = 1 if index < self.pos_length else 0return x, label
数据预处理的逻辑都在__getitem__
里。每一段字符串会先被token化,之后由GLOVE.get_vecs_by_tokens
得到词嵌入数组。
3.对齐输入
使用一个batch的序列数据时常常会碰到序列不等长的问题。实际上利用Pytorch Dataloader的collate_fn机制有更简洁的实现方法。
from torch.nn.utils.rnn import pad_sequencedef get_dataloader(dir='aclImdb'):def collate_fn(batch):x, y = zip(*batch)x_pad = pad_sequence(x, batch_first=True)y = torch.Tensor(y)return x_pad, ytrain_dataloader = DataLoader(IMDBDataset(True, dir),batch_size=32,shuffle=True,collate_fn=collate_fn)test_dataloader = DataLoader(IMDBDataset(False, dir),batch_size=32,shuffle=True,collate_fn=collate_fn)return train_dataloader, test_dataloader
PyTorch DataLoader在获取Dataset的一个batch的数据时,实际上会先吊用Dataset.__getitem__获取若干个样本,再把所有样本拼接成一个batch,比如用__getitem__获取四个[4,3,10,10]这一个batch,可是序列数据通常长度不等,__getitem__
可能会获得[10, 100]
, [15, 100]
这样不等长的词嵌入数组。
为了解决这个问题,我们要手动编写把所有张量拼成一个batch的函数。这个函数就是DataLoader
的collate_fn
函数。我们的collate_fn
应该这样编写:
def collate_fn(batch):x, y = zip(*batch)x_pad = pad_sequence(x, batch_first=True)y = torch.Tensor(y)return x_pad, y
collate_fn
的输入batch
是每次__getitem__
的结果的数组。比如在我们这个项目中,第一次获取了一个长度为10的积极的句子,__getitem__
返回(Tensor[10, 100], 1)
;第二次获取了一个长度为15的消极的句子,__getitem__
返回(Tensor[15, 100], 0)
。那么,输入batch
的内容就是:
[(Tensor[10, 100], 1), (Tensor[15, 100], 0)]
我们可以用x, y = zip(*batch)
把它巧妙地转换成两个元组:
x = (Tensor[10, 100], Tensor[15, 100])
y = (1, 0)
之后,PyTorch的pad_sequence
可以把不等长序列的数组按最大长度填充成一整个batch张量。也就是说,经过这个函数后,x_pad
变成了:
x_pad = Tensor[2, 15, 100]
pad_sequence
的batch_first
决定了batch
是否在第一维。如果它为False
,则结果张量的形状是[15, 2, 100]
。
pad_sequence
还可以决定填充内容,默认填充0。在我们这个项目中,被填充的序列已经是词嵌入了,直接用全零向量表示<pad>
没问题。
有了collate_fn
,构建DataLoader
就很轻松了:
DataLoader(IMDBDataset(True, dir),batch_size=32,shuffle=True,collate_fn=collate_fn)
注意,使用shuffle=True
可以令DataLoader
随机取数据构成batch。由于我们的Dataset
十分工整,前一半的标签是1,后一半是0,必须得用随机的方式去取数据以提高训练效率。
4.模型
import torch.nn as nn
GLOVE_DIM = 100
GLOVE = GloVe(name = '6B', dim=GLOVE_DIM)
class RNN(torch.nn.Module):def __init__(self, hidden_units=64, dropout_rate = 0.5):super().__init__()self.drop = nn.Dropout(dropout_rate)self.rnn = nn.GRU(GLOVE_DIM, hidden_units, 1, batch_first=True)self.linear = nn.Linear(hidden_units,1)self.sigmoid = nn.Sigmoid()def forward(self, x:torch.Tensor):# x: [batch, max_word_length, embedding_length]emb = self.drop(x)output,_ = self.rnn(emb)output = output[:, -1]output = self.linear(output)output = self.sigmoid(output)return output
这里要注意一下,PyTorch的RNN会返回整个序列的输出。而在预测分类概率时,我们只需要用到最后一轮RNN计算的输出。因此,要用output[:, -1]
取最后一次的输出。
5. 训练、测试、推理
train_dataloader, test_dataloader = get_dataloader()
model = RNN()optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
citerion = torch.nn.BCELoss()for epoch in range(100):loss_sum = 0dataset_len = len(train_dataloader.dataset)for x, y in train_dataloader:batchsize = y.shape[0]hat_y = model(x)hat_y = hat_y.squeeze(-1)loss = citerion(hat_y, y)optimizer.zero_grad()loss.backward()torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)optimizer.step()loss_sum += loss * batchsizeprint(f'Epoch{epoch}. loss :{loss_sum/dataset_len}')torch.save(model.state_dict(),'rnn.pth')
output:
model.load_state_dict(torch.load('rnn.pth'))
accuracy = 0
dataset_len = len(test_dataloader.dataset)
model.eval()
for x, y in test_dataloader:with torch.no_grad():hat_y = model(x)hat_y.squeeze_(1)predictions = torch.where(hat_y>0.5,1,0)score = torch.sum(torch.where(predictions==y,1,0))accuracy += score.item()
accuracy /= dataset_lenprint(f'Accuracy:{accuracy}')
Accuracy:0.90516
tokenizer = get_tokenizer('basic_english')
article = "U.S. stock indexes fell Tuesday, driven by expectations for tighter Federal Reserve policy and an energy crisis in Europe. Stocks around the globe have come under pressure in recent weeks as worries about tighter monetary policy in the U.S. and a darkening economic outlook in Europe have led investors to sell riskier assets."x = GLOVE.get_vecs_by_tokens(tokenizer(article)).unsqueeze(0)
with torch.no_grad():hat_y = model(x)
hat_y = hat_y.squeeze_().item()
result = 'positive' if hat_y > 0.5 else 'negative'
print(result)
negative