【NLP 37、实践 ⑨ NER 命名实体识别任务 LSTM + CRF 实现】
—— 25.3.13
一、配置文件 config.py
1.模型与数据路径
参数名称 | 路径值 | 作用 |
---|---|---|
"model_path" | "model_output" | 模型训练完成后保存的位置。例如:保存最终的模型权重文件。 |
"schema_path" | "ner_data/schema.json" | 数据结构定义文件,通常用于描述数据的格式(如字段名、标签类型)。 在NER任务中,可能定义实体类别(如 {"PERSON": "人名", "ORG": "组织"} )。 |
"train_data_path" | "ner_data/train" | 训练数据集路径,通常为标注好的文本文件(如 train.txt 或 JSON 格式)。 |
"valid_data_path" | "ner_data/test" | 验证数据集路径,用于模型训练时的性能评估和超参数调优。 |
"vocab_path" | "chars.txt" | 字符词汇表文件,记录模型中使用的字符集(如中文字符、字母、数字等)。 |
2.模型架构
参数名称 | 值 | 作用 |
---|---|---|
"max_length" | 100 | 输入文本的最大序列长度。超过此长度的文本会被截断或填充(如用 [PAD] )。 |
"hidden_size" | 256 | 模型隐藏层神经元的数量,影响模型容量和计算复杂度。 |
"num_layers" | 2 | 模型的堆叠层数(如LSTM、Transformer的编码器/解码器层数)。 |
"class_num" | 9 | 任务类别总数。例如:NER任务中可能有9种实体类型。 |
3.训练配置
参数名称 | 值 | 作用 |
---|---|---|
"epoch" | 20 | 训练轮数。每轮遍历整个训练数据集一次。 |
"batch_size" | 16 | 每次梯度更新所使用的样本数量。较小的批次可能更适合内存受限的环境。 |
"optimizer" | "adam" | 优化器类型,用于调整模型参数。Adam是常用优化器,结合动量梯度下降。 |
"learning_rate" | 1e-3 | 学习率,控制参数更新的步长。值过小可能导致训练缓慢,过大易过拟合。 |
"use_crf" | True | 是否启用条件随机场(CRF)层。在序列标注任务(如NER)中,CRF可捕捉标签间的依赖关系,提升准确性。 |
4.预训练模型
参数名称 | 值 | 作用 |
---|---|---|
"bert_path" | r"F:\人工智能NLP\\NLP资料\week6 语言模型\bert-base-chinese" | 预训练BERT模型的路径。BERT是一种强大的预训练语言模型,此处可能用于微调或特征提取。 |
# -*- coding: utf-8 -*-
"""
配置参数信息
"""
Config = {
"model_path": "model_output",
"schema_path": "ner_data/schema.json",
"train_data_path": "ner_data/train",
"valid_data_path": "ner_data/test",
"vocab_path":"chars.txt",
"max_length": 100,
"hidden_size": 256,
"num_layers": 2,
"epoch": 20,
"batch_size": 16,
"optimizer": "adam",
"learning_rate": 1e-3,
"use_crf": True,
"class_num": 9,
"bert_path": r"F:\人工智能NLP\\NLP资料\week6 语言模型\bert-base-chinese"
}
二、数据加载 loader.py
1.初始化数据加载类
data_path:数据文件存储路径
config:包含训练 / 数据配置的字典
self.config:保存包含训练 / 数据配置的字典
self.path:保存数据文件存储路径
self.vocab:加载字表 / 词表文件存储路径
self.sentences:初始化句子列表
self.schema:加载实体标签与索引的映射关系表
self.load:调用 load()
方法从 data_path
加载原始数据,进行分词、编码、填充/截断等预处理。
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.vocab = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.vocab)
self.sentences = []
self.schema = self.load_schema(config["schema_path"])
self.load()
2.加载数据并预处理
- 文件读取与分段:按段落分割原始数据。
- 逐行解析:提取字符和标签。
- 编码转换:将字符转换为词汇表索引序列。
- 序列标准化:调整序列长度至模型要求。
- 数据存储:保存为张量列表,供训练使用。
self.data:列表,存储预处理后的数据样本,每个样本由输入张量和标签张量组成
sentenece:保存原始文本句子的拼接结果,便于后续可视化或调试。
open():打开文件并返回文件对象,支持读/写/追加等模式。
参数名 | 类型 | 说明 |
---|---|---|
file | 字符串 | 文件路径(绝对/相对路径) |
mode | 字符串 | 打开模式(如 r -只读、w -写入、a -追加) |
encoding | 字符串 | 文件编码(如 utf-8 ,文本模式需指定) |
errors | 字符串 | 编码错误处理方式(如 ignore 、replace ) |
文件对象.read():读取文件内容,返回字符串或字节流
参数名 | 类型 | 说明 |
---|---|---|
size | 整数 | 可选,指定读取的字节数(默认读取全部内容) |
split():按分隔符分割字符串,返回子字符串列表
参数名 | 类型 | 说明 |
---|---|---|
delimiter | 字符串 | 分隔符(默认空格) |
maxsplit | 整数 | 可选,最大分割次数(默认-1表示全部) |
strip():去除字符串首尾指定字符(默认空白字符)
参数名 | 类型 | 说明 |
---|---|---|
chars | 字符串 | 可选,指定需去除的字符集合 |
join():用分隔符连接可迭代对象的元素,返回新字符串
参数名 | 类型 | 说明 |
---|---|---|
iterable | 可迭代对象 | 需连接的元素集合(如列表、元组) |
sep | 字符串 | 分隔符(默认空字符串) |
列表.append():在列表末尾添加元素
参数名 | 类型 | 说明 |
---|---|---|
obj | 任意类型 | 要添加的元素 |
def load(self):
self.data = []
with open(self.path, encoding="utf8") as f:
segments = f.read().split("\n\n")
for segment in segments:
sentenece = []
labels = []
for line in segment.split("\n"):
if line.strip() == "":
continue
char, label = line.split()
sentenece.append(char)
labels.append(self.schema[label])
self.sentences.append("".join(sentenece))
input_ids = self.encode_sentence(sentenece)
labels = self.padding(labels, -1)
self.data.append([torch.LongTensor(input_ids), torch.LongTensor(labels)])
return
3.加载字 / 词表
load_vocab 函数用于从指定路径加载词汇表文件,并将每个词汇项映射到一个从 1 开始的唯一整数索引(索引 0 保留给 Padding 占位符)
token_dict:字典,存储词汇到索引的映射
open():打开文件并返回文件对象,用于读写文件内容
参数名 | 类型 | 默认值 | 说明 |
---|---|---|---|
file_name | str | 无 | 文件路径(需包含扩展名) |
mode | str | 'r' | 文件打开模式: - 'r' : 只读- 'w' : 只写(覆盖原文件)- 'a' : 追加写入- 'b' : 二进制模式- 'x' : 创建新文件(若存在则报错) |
buffering | int | None | 缓冲区大小(仅二进制模式有效) |
encoding | str | None | 文件编码(仅文本模式有效,如 'utf-8' ) |
newline | str | '\n' | 行结束符(仅文本模式有效) |
closefd | bool | True | 是否在文件关闭时自动关闭文件描述符 |
dir_fd | int | -1 | 文件描述符(高级用法,通常忽略) |
flags | int | 0 | Linux 系统下的额外标志位 |
mode | str | 无 | (重复参数,实际使用中只需指定 mode ) |
enumerate():遍历可迭代对象时,同时返回元素的索引和值。
参数名 | 类型 | 默认值 | 说明 |
---|---|---|---|
iterable | 可迭代对象 | 无 | 需要遍历的对象(如列表、元组、字符串等) |
start | int | 0 | 索引的起始值(可自定义,如从 1 开始) |
strip():移除字符串开头和结尾的空白字符或指定字符
参数名 | 类型 | 默认值 | 说明 |
---|---|---|---|
chars | str | None | 需要移除的字符集合(默认为空格、换行、制表符 \t 、换页符 \f 、回车 \r ) |
#加载字表或词表
def load_vocab(vocab_path):
token_dict = {}
with open(vocab_path, encoding="utf8") as f:
for index, line in enumerate(f):
token = line.strip()
token_dict[token] = index + 1 #0留给padding位置,所以从1开始
return token_dict
4.加载映射关系表
加载位于指定路径的 JSON 格式的模式文件,并将其内容解析为 Python 对象以便在数据生成过程中使用。
open():打开文件并返回文件对象,用于读写文件内容。
参数名 | 类型 | 默认值 | 说明 |
---|---|---|---|
file_name | str | 无 | 文件路径(需包含扩展名) |
mode | str | 'r' | 文件打开模式: - 'r' : 只读- 'w' : 只写(覆盖原文件)- 'a' : 追加写入- 'b' : 二进制模式- 'x' : 创建新文件(若存在则报错) |
buffering | int | None | 缓冲区大小(仅二进制模式有效) |
encoding | str | None | 文件编码(仅文本模式有效,如 'utf-8' ) |
newline | str | '\n' | 行结束符(仅文本模式有效) |
closefd | bool | True | 是否在文件关闭时自动关闭文件描述符 |
dir_fd | int | -1 | 文件描述符(高级用法,通常忽略) |
flags | int | 0 | Linux 系统下的额外标志位 |
mode | str | 无 | (重复参数,实际使用中只需指定 mode ) |
json.load():从已打开的 JSON 文件对象中加载数据,并将其转换为 Python 对象(如字典、列表)。
参数名 | 类型 | 默认值 | 说明 |
---|---|---|---|
fp | io.TextIO | 无 | 已打开的文件对象(需处于读取模式) |
indent | int/str | None | 缩进空格数(美化输出,如 4 或 " " ) |
sort_keys | bool | False | 是否对 JSON 键进行排序 |
load_hook | callable | None | 自定义对象加载回调函数 |
object_hook | callable | None | 自定义对象解析回调函数 |
def load_schema(self, path):
with open(path, encoding="utf8") as f:
return json.load(f)
5.封装数据
DataLoader():PyTorch 模型训练的标配工具,通过合理的参数配置(如 batch_size
、num_workers
、shuffle
),可以显著提升数据加载效率,尤其适用于大规模数据集和复杂预处理任务。其与 Dataset
类的配合使用,是构建高效训练管道的核心。
参数名 | 类型 | 默认值 | 说明 |
---|---|---|---|
dataset | Dataset | None | 必须参数,自定义数据集对象(需继承 torch.utils.data.Dataset )。 |
batch_size | int | 1 | 每个批次的样本数量。 |
shuffle | bool | False | 是否在每个 epoch 开始时打乱数据顺序(训练时推荐设为 True )。 |
num_workers | int | 0 | 使用多线程加载数据的工人数量(需大于 0 时生效)。 |
pin_memory | bool | False | 是否将数据存储在 pinned memory 中(加速 GPU 数据传输)。 |
drop_last | bool | False | 如果数据集长度无法被 batch_size 整除,是否丢弃最后一个不完整的批次。 |
persistent_workers | bool | False | 是否保持工作线程在 epoch 之间持续运行(减少多线程初始化开销)。 |
worker_init_fn | callable | None | 自定义工作线程初始化函数。 |
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
dg = DataGenerator(data_path, config)
dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
return dl
6.对于输入文本做截断 / 填充
#补齐或截断输入的序列,使其可以在一个batch内运算
def padding(self, input_id, pad_token=0):
input_id = input_id[:self.config["max_length"]]
input_id += [pad_token] * (self.config["max_length"] - len(input_id))
return input_id
7.对于输入的文本编码
input_id:初始化列表,存储词 / 字符的索引
jieba.cut():将中文句子分割成词语,支持三种分词模式(精确模式、全模式、搜索引擎模式)
参数名 | 类型 | 说明 |
---|---|---|
sentence | 字符串 | 需要分词的中文句子 |
cut_all | 布尔值 | 是否采用全模式(True为全模式,False为精确模式,默认False) |
HMM | 布尔值 | 是否使用隐马尔可夫模型(True为使用,默认True) |
列表.append():在列表末尾添加一个元素,修改原列表
参数名 | 类型 | 说明 |
---|---|---|
obj | 任意类型 | 要添加的元素(支持字符串、数字、列表等) |
字典.get():安全获取字典中指定键的值,键不存在时返回默认值(默认为None
)
参数名 | 类型 | 说明 |
---|---|---|
key | 不可变类型 | 要查询的键 |
default | 任意类型 | 可选,键不存在时返回的默认值(若未指定则返回None) |
def encode_sentence(self, text, padding=True):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
if padding:
input_id = self.padding(input_id)
return input_id
完整代码
# -*- coding: utf-8 -*-
import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader
"""
数据加载
"""
class DataGenerator:
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.vocab = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.vocab)
self.sentences = []
self.schema = self.load_schema(config["schema_path"])
self.load()
def load(self):
self.data = []
with open(self.path, encoding="utf8") as f:
segments = f.read().split("\n\n")
for segment in segments:
sentenece = []
labels = []
for line in segment.split("\n"):
if line.strip() == "":
continue
char, label = line.split()
sentenece.append(char)
labels.append(self.schema[label])
self.sentences.append("".join(sentenece))
input_ids = self.encode_sentence(sentenece)
labels = self.padding(labels, -1)
self.data.append([torch.LongTensor(input_ids), torch.LongTensor(labels)])
return
def encode_sentence(self, text, padding=True):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
if padding:
input_id = self.padding(input_id)
return input_id
#补齐或截断输入的序列,使其可以在一个batch内运算
def padding(self, input_id, pad_token=0):
input_id = input_id[:self.config["max_length"]]
input_id += [pad_token] * (self.config["max_length"] - len(input_id))
return input_id
def __len__(self):
return len(self.data)
def __getitem__(self, index):
return self.data[index]
def load_schema(self, path):
with open(path, encoding="utf8") as f:
return json.load(f)
#加载字表或词表
def load_vocab(vocab_path):
token_dict = {}
with open(vocab_path, encoding="utf8") as f:
for index, line in enumerate(f):
token = line.strip()
token_dict[token] = index + 1 #0留给padding位置,所以从1开始
return token_dict
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
dg = DataGenerator(data_path, config)
dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
return dl
if __name__ == "__main__":
from config import Config
dg = DataGenerator("../ner_data/train.txt", Config)
三、模型建立 model.py
1.模型初始化
2.前向计算
3.选择优化器
# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torchcrf import CRF
"""
建立网络模型结构
"""
class TorchModel(nn.Module):
def __init__(self, config):
super(TorchModel, self).__init__()
hidden_size = config["hidden_size"]
vocab_size = config["vocab_size"] + 1
max_length = config["max_length"]
class_num = config["class_num"]
num_layers = config["num_layers"]
self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
self.layer = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True, num_layers=num_layers)
self.classify = nn.Linear(hidden_size * 2, class_num)
self.crf_layer = CRF(class_num, batch_first=True)
self.use_crf = config["use_crf"]
self.loss = torch.nn.CrossEntropyLoss(ignore_index=-1) #loss采用交叉熵损失
#当输入真实标签,返回loss值;无真实标签,返回预测值
def forward(self, x, target=None):
x = self.embedding(x) #input shape:(batch_size, sen_len)
x, _ = self.layer(x) #input shape:(batch_size, sen_len, input_dim)
predict = self.classify(x) #ouput:(batch_size, sen_len, num_tags) -> (batch_size * sen_len, num_tags)
if target is not None:
if self.use_crf:
mask = target.gt(-1)
return - self.crf_layer(predict, target, mask, reduction="mean")
else:
#(number, class_num), (number)
return self.loss(predict.view(-1, predict.shape[-1]), target.view(-1))
else:
if self.use_crf:
return self.crf_layer.decode(predict)
else:
return predict
def choose_optimizer(config, model):
optimizer = config["optimizer"]
learning_rate = config["learning_rate"]
if optimizer == "adam":
return Adam(model.parameters(), lr=learning_rate)
elif optimizer == "sgd":
return SGD(model.parameters(), lr=learning_rate)
if __name__ == "__main__":
from config import Config
model = TorchModel(Config)
四、模型效果评估 evaluate.py
1.初始化
2.评估模型效果
3.统计模型效果
4.可视化统计模型效果
# -*- coding: utf-8 -*-
import torch
import re
import numpy as np
from collections import defaultdict
from loader import load_data
"""
模型效果测试
"""
class Evaluator:
def __init__(self, config, model, logger):
self.config = config
self.model = model
self.logger = logger
self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
def eval(self, epoch):
self.logger.info("开始测试第%d轮模型效果:" % epoch)
self.stats_dict = {"LOCATION": defaultdict(int),
"TIME": defaultdict(int),
"PERSON": defaultdict(int),
"ORGANIZATION": defaultdict(int)}
self.model.eval()
for index, batch_data in enumerate(self.valid_data):
sentences = self.valid_data.dataset.sentences[index * self.config["batch_size"]: (index+1) * self.config["batch_size"]]
if torch.cuda.is_available():
batch_data = [d.cuda() for d in batch_data]
input_id, labels = batch_data #输入变化时这里需要修改,比如多输入,多输出的情况
with torch.no_grad():
pred_results = self.model(input_id) #不输入labels,使用模型当前参数进行预测
self.write_stats(labels, pred_results, sentences)
self.show_stats()
return
def write_stats(self, labels, pred_results, sentences):
assert len(labels) == len(pred_results) == len(sentences)
if not self.config["use_crf"]:
pred_results = torch.argmax(pred_results, dim=-1)
for true_label, pred_label, sentence in zip(labels, pred_results, sentences):
if not self.config["use_crf"]:
pred_label = pred_label.cpu().detach().tolist()
true_label = true_label.cpu().detach().tolist()
true_entities = self.decode(sentence, true_label)
pred_entities = self.decode(sentence, pred_label)
# print("=+++++++++")
# print(true_entities)
# print(pred_entities)
# print('=+++++++++')
# 正确率 = 识别出的正确实体数 / 识别出的实体数
# 召回率 = 识别出的正确实体数 / 样本的实体数
for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
self.stats_dict[key]["正确识别"] += len([ent for ent in pred_entities[key] if ent in true_entities[key]])
self.stats_dict[key]["样本实体数"] += len(true_entities[key])
self.stats_dict[key]["识别出实体数"] += len(pred_entities[key])
return
def show_stats(self):
F1_scores = []
for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
# 正确率 = 识别出的正确实体数 / 识别出的实体数
# 召回率 = 识别出的正确实体数 / 样本的实体数
precision = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["识别出实体数"])
recall = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["样本实体数"])
F1 = (2 * precision * recall) / (precision + recall + 1e-5)
F1_scores.append(F1)
self.logger.info("%s类实体,准确率:%f, 召回率: %f, F1: %f" % (key, precision, recall, F1))
self.logger.info("Macro-F1: %f" % np.mean(F1_scores))
correct_pred = sum([self.stats_dict[key]["正确识别"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
total_pred = sum([self.stats_dict[key]["识别出实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
true_enti = sum([self.stats_dict[key]["样本实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
micro_precision = correct_pred / (total_pred + 1e-5)
micro_recall = correct_pred / (true_enti + 1e-5)
micro_f1 = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall + 1e-5)
self.logger.info("Micro-F1 %f" % micro_f1)
self.logger.info("--------------------")
return
'''
{
"B-LOCATION": 0,
"B-ORGANIZATION": 1,
"B-PERSON": 2,
"B-TIME": 3,
"I-LOCATION": 4,
"I-ORGANIZATION": 5,
"I-PERSON": 6,
"I-TIME": 7,
"O": 8
}
'''
def decode(self, sentence, labels):
labels = "".join([str(x) for x in labels[:len(sentence)]])
results = defaultdict(list)
for location in re.finditer("(04+)", labels):
s, e = location.span()
results["LOCATION"].append(sentence[s:e])
for location in re.finditer("(15+)", labels):
s, e = location.span()
results["ORGANIZATION"].append(sentence[s:e])
for location in re.finditer("(26+)", labels):
s, e = location.span()
results["PERSON"].append(sentence[s:e])
for location in re.finditer("(37+)", labels):
s, e = location.span()
results["TIME"].append(sentence[s:e])
return results
五、主函数文件 main.py
# -*- coding: utf-8 -*-
import torch
import os
import random
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from evaluate import Evaluator
from loader import load_data
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
"""
模型训练主程序
"""
def main(config):
#创建保存模型的目录
if not os.path.isdir(config["model_path"]):
os.mkdir(config["model_path"])
#加载训练数据
train_data = load_data(config["train_data_path"], config)
#加载模型
model = TorchModel(config)
# 标识是否使用gpu
cuda_flag = torch.cuda.is_available()
if cuda_flag:
logger.info("gpu可以使用,迁移模型至gpu")
model = model.cuda()
#加载优化器
optimizer = choose_optimizer(config, model)
#加载效果测试类
evaluator = Evaluator(config, model, logger)
#训练
for epoch in range(config["epoch"]):
epoch += 1
model.train()
logger.info("epoch %d begin" % epoch)
train_loss = []
for index, batch_data in enumerate(train_data):
optimizer.zero_grad()
if cuda_flag:
batch_data = [d.cuda() for d in batch_data]
input_id, labels = batch_data #输入变化时这里需要修改,比如多输入,多输出的情况
loss = model(input_id, labels)
loss.backward()
optimizer.step()
train_loss.append(loss.item())
if index % int(len(train_data) / 2) == 0:
logger.info("batch loss %f" % loss)
logger.info("epoch average loss: %f" % np.mean(train_loss))
evaluator.eval(epoch)
model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
# torch.save(model.state_dict(), model_path)
return model, train_data
if __name__ == "__main__":
model, train_data = main(Config)