当前位置: 首页 > news >正文

【NLP 37、实践 ⑨ NER 命名实体识别任务 LSTM + CRF 实现】

                                                                —— 25.3.13

一、配置文件 config.py

1.模型与数据路径

参数名称路径值作用
"model_path""model_output"模型训练完成后保存的位置。例如:保存最终的模型权重文件。
"schema_path""ner_data/schema.json"数据结构定义文件,通常用于描述数据的格式(如字段名、标签类型)。
在NER任务中,可能定义实体类别(如 {"PERSON": "人名", "ORG": "组织"})。
"train_data_path""ner_data/train"训练数据集路径,通常为标注好的文本文件(如 train.txt 或 JSON 格式)。
"valid_data_path""ner_data/test"验证数据集路径,用于模型训练时的性能评估和超参数调优。
"vocab_path""chars.txt"字符词汇表文件,记录模型中使用的字符集(如中文字符、字母、数字等)。

2.模型架构

参数名称作用
"max_length"100输入文本的最大序列长度。超过此长度的文本会被截断或填充(如用 [PAD])。
"hidden_size"256模型隐藏层神经元的数量,影响模型容量和计算复杂度。
"num_layers"2模型的堆叠层数(如LSTM、Transformer的编码器/解码器层数)。
"class_num"9任务类别总数。例如:NER任务中可能有9种实体类型。

3.训练配置

参数名称作用
"epoch"20训练轮数。每轮遍历整个训练数据集一次。
"batch_size"16每次梯度更新所使用的样本数量。较小的批次可能更适合内存受限的环境。
"optimizer""adam"优化器类型,用于调整模型参数。Adam是常用优化器,结合动量梯度下降。
"learning_rate"1e-3学习率,控制参数更新的步长。值过小可能导致训练缓慢,过大易过拟合。
"use_crf"True是否启用条件随机场(CRF)​层。在序列标注任务(如NER)中,CRF可捕捉标签间的依赖关系,提升准确性。

4.预训练模型

参数名称作用
"bert_path"r"F:\人工智能NLP\\NLP资料\week6 语言模型\bert-base-chinese"预训练BERT模型的路径。BERT是一种强大的预训练语言模型,此处可能用于微调或特征提取。
# -*- coding: utf-8 -*-

"""
配置参数信息
"""

Config = {
    "model_path": "model_output",
    "schema_path": "ner_data/schema.json",
    "train_data_path": "ner_data/train",
    "valid_data_path": "ner_data/test",
    "vocab_path":"chars.txt",
    "max_length": 100,
    "hidden_size": 256,
    "num_layers": 2,
    "epoch": 20,
    "batch_size": 16,
    "optimizer": "adam",
    "learning_rate": 1e-3,
    "use_crf": True,
    "class_num": 9,
    "bert_path": r"F:\人工智能NLP\\NLP资料\week6 语言模型\bert-base-chinese"
}


二、数据加载 loader.py

1.初始化数据加载类

data_path:数据文件存储路径

config:包含训练 / 数据配置的字典

self.config:保存包含训练 / 数据配置的字典

self.path:保存数据文件存储路径

self.vocab:加载字表 / 词表文件存储路径

self.sentences:初始化句子列表

self.schema:加载实体标签与索引的映射关系表

self.load:调用 load() 方法从 data_path 加载原始数据,进行分词、编码、填充/截断等预处理。

    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        self.sentences = []
        self.schema = self.load_schema(config["schema_path"])
        self.load()

2.加载数据并预处理

  1. 文件读取与分段:按段落分割原始数据。
  2. 逐行解析:提取字符和标签。
  3. 编码转换:将字符转换为词汇表索引序列。
  4. 序列标准化:调整序列长度至模型要求。
  5. 数据存储:保存为张量列表,供训练使用。

self.data:列表,存储预处理后的数据样本,每个样本由输入张量和标签张量组成

sentenece:保存原始文本句子的拼接结果,便于后续可视化或调试。

open():打开文件并返回文件对象,支持读/写/追加等模式。

参数名类型说明
file字符串文件路径(绝对/相对路径)
mode字符串打开模式(如 r-只读、w-写入、a-追加)
encoding字符串文件编码(如 utf-8,文本模式需指定)
errors字符串编码错误处理方式(如 ignorereplace

文件对象.read():读取文件内容,返回字符串或字节流

参数名类型说明
size整数可选,指定读取的字节数(默认读取全部内容)

split():按分隔符分割字符串,返回子字符串列表

参数名类型说明
delimiter字符串分隔符(默认空格)
maxsplit整数可选,最大分割次数(默认-1表示全部)

strip():去除字符串首尾指定字符(默认空白字符)

参数名类型说明
chars字符串可选,指定需去除的字符集合

join():用分隔符连接可迭代对象的元素,返回新字符串

参数名类型说明
iterable可迭代对象需连接的元素集合(如列表、元组)
sep字符串分隔符(默认空字符串)

列表.append():在列表末尾添加元素

参数名类型说明
obj任意类型要添加的元素
    def load(self):
        self.data = []
        with open(self.path, encoding="utf8") as f:
            segments = f.read().split("\n\n")
            for segment in segments:
                sentenece = []
                labels = []
                for line in segment.split("\n"):
                    if line.strip() == "":
                        continue
                    char, label = line.split()
                    sentenece.append(char)
                    labels.append(self.schema[label])
                self.sentences.append("".join(sentenece))
                input_ids = self.encode_sentence(sentenece)
                labels = self.padding(labels, -1)
                self.data.append([torch.LongTensor(input_ids), torch.LongTensor(labels)])
        return

3.加载字 / 词表 

        load_vocab 函数用于从指定路径加载词汇表文件,并将每个词汇项映射到一个从 1 开始的唯一整数索引​(索引 0 保留给 Padding 占位符)

token_dict:字典,存储词汇到索引的映射

open():打开文件并返回文件对象,用于读写文件内容

参数名类型默认值说明
file_namestr文件路径(需包含扩展名)
modestr'r'文件打开模式:
'r': 只读
'w': 只写(覆盖原文件)
'a': 追加写入
'b': 二进制模式
'x': 创建新文件(若存在则报错)
bufferingintNone缓冲区大小(仅二进制模式有效)
encodingstrNone文件编码(仅文本模式有效,如 'utf-8'
newlinestr'\n'行结束符(仅文本模式有效)
closefdboolTrue是否在文件关闭时自动关闭文件描述符
dir_fdint-1文件描述符(高级用法,通常忽略)
flagsint0Linux 系统下的额外标志位
modestr(重复参数,实际使用中只需指定 mode

enumerate():遍历可迭代对象时,同时返回元素的索引

参数名类型默认值说明
iterable可迭代对象需要遍历的对象(如列表、元组、字符串等)
startint0索引的起始值(可自定义,如从 1 开始)

strip():移除字符串开头和结尾的空白字符或指定字符

参数名类型默认值说明
charsstrNone需要移除的字符集合(默认为空格、换行、制表符 \t、换页符 \f、回车 \r
#加载字表或词表
def load_vocab(vocab_path):
    token_dict = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            token = line.strip()
            token_dict[token] = index + 1  #0留给padding位置,所以从1开始
    return token_dict

4.加载映射关系表

        加载位于指定路径的 JSON 格式的模式文件,并将其内容解析为 Python 对象以便在数据生成过程中使用。

open():打开文件并返回文件对象,用于读写文件内容。

参数名类型默认值说明
file_namestr文件路径(需包含扩展名)
modestr'r'文件打开模式:
'r': 只读
'w': 只写(覆盖原文件)
'a': 追加写入
'b': 二进制模式
'x': 创建新文件(若存在则报错)
bufferingintNone缓冲区大小(仅二进制模式有效)
encodingstrNone文件编码(仅文本模式有效,如 'utf-8'
newlinestr'\n'行结束符(仅文本模式有效)
closefdboolTrue是否在文件关闭时自动关闭文件描述符
dir_fdint-1文件描述符(高级用法,通常忽略)
flagsint0Linux 系统下的额外标志位
modestr(重复参数,实际使用中只需指定 mode

json.load():从已打开的 JSON 文件对象中加载数据,并将其转换为 Python 对象(如字典、列表)。

参数名类型默认值说明
fpio.TextIO已打开的文件对象(需处于读取模式)
indentint/strNone缩进空格数(美化输出,如 4 或 " "
sort_keysboolFalse是否对 JSON 键进行排序
load_hookcallableNone自定义对象加载回调函数
object_hookcallableNone自定义对象解析回调函数
    def load_schema(self, path):
        with open(path, encoding="utf8") as f:
            return json.load(f)

5.封装数据

DataLoader():PyTorch 模型训练的标配工具,通过合理的参数配置(如 batch_sizenum_workersshuffle),可以显著提升数据加载效率,尤其适用于大规模数据集和复杂预处理任务。其与 Dataset 类的配合使用,是构建高效训练管道的核心。

参数名类型默认值说明
datasetDatasetNone必须参数,自定义数据集对象(需继承 torch.utils.data.Dataset)。
batch_sizeint1每个批次的样本数量。
shuffleboolFalse是否在每个 epoch 开始时打乱数据顺序(训练时推荐设为 True)。
num_workersint0使用多线程加载数据的工人数量(需大于 0 时生效)。
pin_memoryboolFalse是否将数据存储在 pinned memory 中(加速 GPU 数据传输)。
drop_lastboolFalse如果数据集长度无法被 batch_size 整除,是否丢弃最后一个不完整的批次。
persistent_workersboolFalse是否保持工作线程在 epoch 之间持续运行(减少多线程初始化开销)。
worker_init_fncallableNone自定义工作线程初始化函数。
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl

6.对于输入文本做截断 / 填充

    #补齐或截断输入的序列,使其可以在一个batch内运算
    def padding(self, input_id, pad_token=0):
        input_id = input_id[:self.config["max_length"]]
        input_id += [pad_token] * (self.config["max_length"] - len(input_id))
        return input_id

7.对于输入的文本编码

input_id:初始化列表,存储词 / 字符的索引

jieba.cut():将中文句子分割成词语,支持三种分词模式(精确模式、全模式、搜索引擎模式)

参数名类型说明
sentence字符串需要分词的中文句子
cut_all布尔值是否采用全模式(True为全模式,False为精确模式,默认False)
HMM布尔值是否使用隐马尔可夫模型(True为使用,默认True)

列表.append():在列表末尾添加一个元素,修改原列表

参数名类型说明
obj任意类型要添加的元素(支持字符串、数字、列表等)

字典.get():安全获取字典中指定键的值,键不存在时返回默认值(默认为None

参数名类型说明
key不可变类型要查询的键
default任意类型可选,键不存在时返回的默认值(若未指定则返回None)
    def encode_sentence(self, text, padding=True):
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        if padding:
            input_id = self.padding(input_id)
        return input_id

完整代码 

# -*- coding: utf-8 -*-

import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader

"""
数据加载
"""


class DataGenerator:
    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        self.sentences = []
        self.schema = self.load_schema(config["schema_path"])
        self.load()

    def load(self):
        self.data = []
        with open(self.path, encoding="utf8") as f:
            segments = f.read().split("\n\n")
            for segment in segments:
                sentenece = []
                labels = []
                for line in segment.split("\n"):
                    if line.strip() == "":
                        continue
                    char, label = line.split()
                    sentenece.append(char)
                    labels.append(self.schema[label])
                self.sentences.append("".join(sentenece))
                input_ids = self.encode_sentence(sentenece)
                labels = self.padding(labels, -1)
                self.data.append([torch.LongTensor(input_ids), torch.LongTensor(labels)])
        return

    def encode_sentence(self, text, padding=True):
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        if padding:
            input_id = self.padding(input_id)
        return input_id

    #补齐或截断输入的序列,使其可以在一个batch内运算
    def padding(self, input_id, pad_token=0):
        input_id = input_id[:self.config["max_length"]]
        input_id += [pad_token] * (self.config["max_length"] - len(input_id))
        return input_id

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

    def load_schema(self, path):
        with open(path, encoding="utf8") as f:
            return json.load(f)

#加载字表或词表
def load_vocab(vocab_path):
    token_dict = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            token = line.strip()
            token_dict[token] = index + 1  #0留给padding位置,所以从1开始
    return token_dict

#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl



if __name__ == "__main__":
    from config import Config
    dg = DataGenerator("../ner_data/train.txt", Config)


三、模型建立 model.py

1.模型初始化

2.前向计算

3.选择优化器

# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torchcrf import CRF
"""
建立网络模型结构
"""

class TorchModel(nn.Module):
    def __init__(self, config):
        super(TorchModel, self).__init__()
        hidden_size = config["hidden_size"]
        vocab_size = config["vocab_size"] + 1
        max_length = config["max_length"]
        class_num = config["class_num"]
        num_layers = config["num_layers"]
        self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        self.layer = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True, num_layers=num_layers)
        self.classify = nn.Linear(hidden_size * 2, class_num)
        self.crf_layer = CRF(class_num, batch_first=True)
        self.use_crf = config["use_crf"]
        self.loss = torch.nn.CrossEntropyLoss(ignore_index=-1)  #loss采用交叉熵损失

    #当输入真实标签,返回loss值;无真实标签,返回预测值
    def forward(self, x, target=None):
        x = self.embedding(x)  #input shape:(batch_size, sen_len)
        x, _ = self.layer(x)      #input shape:(batch_size, sen_len, input_dim)
        predict = self.classify(x) #ouput:(batch_size, sen_len, num_tags) -> (batch_size * sen_len, num_tags)

        if target is not None:
            if self.use_crf:
                mask = target.gt(-1) 
                return - self.crf_layer(predict, target, mask, reduction="mean")
            else:
                #(number, class_num), (number)
                return self.loss(predict.view(-1, predict.shape[-1]), target.view(-1))
        else:
            if self.use_crf:
                return self.crf_layer.decode(predict)
            else:
                return predict


def choose_optimizer(config, model):
    optimizer = config["optimizer"]
    learning_rate = config["learning_rate"]
    if optimizer == "adam":
        return Adam(model.parameters(), lr=learning_rate)
    elif optimizer == "sgd":
        return SGD(model.parameters(), lr=learning_rate)


if __name__ == "__main__":
    from config import Config
    model = TorchModel(Config)

四、模型效果评估 evaluate.py

1.初始化

2.评估模型效果

3.统计模型效果

4.可视化统计模型效果

# -*- coding: utf-8 -*-
import torch
import re
import numpy as np
from collections import defaultdict
from loader import load_data

"""
模型效果测试
"""

class Evaluator:
    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)


    def eval(self, epoch):
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.stats_dict = {"LOCATION": defaultdict(int),
                           "TIME": defaultdict(int),
                           "PERSON": defaultdict(int),
                           "ORGANIZATION": defaultdict(int)}
        self.model.eval()
        for index, batch_data in enumerate(self.valid_data):
            sentences = self.valid_data.dataset.sentences[index * self.config["batch_size"]: (index+1) * self.config["batch_size"]]
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            input_id, labels = batch_data   #输入变化时这里需要修改,比如多输入,多输出的情况
            with torch.no_grad():
                pred_results = self.model(input_id) #不输入labels,使用模型当前参数进行预测
            self.write_stats(labels, pred_results, sentences)
        self.show_stats()
        return

    def write_stats(self, labels, pred_results, sentences):
        assert len(labels) == len(pred_results) == len(sentences)
        if not self.config["use_crf"]:
            pred_results = torch.argmax(pred_results, dim=-1)
        for true_label, pred_label, sentence in zip(labels, pred_results, sentences):
            if not self.config["use_crf"]:
                pred_label = pred_label.cpu().detach().tolist()
            true_label = true_label.cpu().detach().tolist()
            true_entities = self.decode(sentence, true_label)
            pred_entities = self.decode(sentence, pred_label)
            # print("=+++++++++")
            # print(true_entities)
            # print(pred_entities)
            # print('=+++++++++')
            # 正确率 = 识别出的正确实体数 / 识别出的实体数
            # 召回率 = 识别出的正确实体数 / 样本的实体数
            for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
                self.stats_dict[key]["正确识别"] += len([ent for ent in pred_entities[key] if ent in true_entities[key]])
                self.stats_dict[key]["样本实体数"] += len(true_entities[key])
                self.stats_dict[key]["识别出实体数"] += len(pred_entities[key])
        return

    def show_stats(self):
        F1_scores = []
        for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
            # 正确率 = 识别出的正确实体数 / 识别出的实体数
            # 召回率 = 识别出的正确实体数 / 样本的实体数
            precision = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["识别出实体数"])
            recall = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["样本实体数"])
            F1 = (2 * precision * recall) / (precision + recall + 1e-5)
            F1_scores.append(F1)
            self.logger.info("%s类实体,准确率:%f, 召回率: %f, F1: %f" % (key, precision, recall, F1))
        self.logger.info("Macro-F1: %f" % np.mean(F1_scores))
        correct_pred = sum([self.stats_dict[key]["正确识别"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
        total_pred = sum([self.stats_dict[key]["识别出实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
        true_enti = sum([self.stats_dict[key]["样本实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
        micro_precision = correct_pred / (total_pred + 1e-5)
        micro_recall = correct_pred / (true_enti + 1e-5)
        micro_f1 = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall + 1e-5)
        self.logger.info("Micro-F1 %f" % micro_f1)
        self.logger.info("--------------------")
        return

    '''
    {
      "B-LOCATION": 0,
      "B-ORGANIZATION": 1,
      "B-PERSON": 2,
      "B-TIME": 3,
      "I-LOCATION": 4,
      "I-ORGANIZATION": 5,
      "I-PERSON": 6,
      "I-TIME": 7,
      "O": 8
    }
    '''
    def decode(self, sentence, labels):
        labels = "".join([str(x) for x in labels[:len(sentence)]])
        results = defaultdict(list)
        for location in re.finditer("(04+)", labels):
            s, e = location.span()
            results["LOCATION"].append(sentence[s:e])
        for location in re.finditer("(15+)", labels):
            s, e = location.span()
            results["ORGANIZATION"].append(sentence[s:e])
        for location in re.finditer("(26+)", labels):
            s, e = location.span()
            results["PERSON"].append(sentence[s:e])
        for location in re.finditer("(37+)", labels):
            s, e = location.span()
            results["TIME"].append(sentence[s:e])
        return results



五、主函数文件 main.py

# -*- coding: utf-8 -*-

import torch
import os
import random
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from evaluate import Evaluator
from loader import load_data

logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""
模型训练主程序
"""

def main(config):
    #创建保存模型的目录
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    #加载训练数据
    train_data = load_data(config["train_data_path"], config)
    #加载模型
    model = TorchModel(config)
    # 标识是否使用gpu
    cuda_flag = torch.cuda.is_available()
    if cuda_flag:
        logger.info("gpu可以使用,迁移模型至gpu")
        model = model.cuda()
    #加载优化器
    optimizer = choose_optimizer(config, model)
    #加载效果测试类
    evaluator = Evaluator(config, model, logger)
    #训练
    for epoch in range(config["epoch"]):
        epoch += 1
        model.train()
        logger.info("epoch %d begin" % epoch)
        train_loss = []
        for index, batch_data in enumerate(train_data):
            optimizer.zero_grad()
            if cuda_flag:
                batch_data = [d.cuda() for d in batch_data]
            input_id, labels = batch_data   #输入变化时这里需要修改,比如多输入,多输出的情况
            loss = model(input_id, labels)
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())
            if index % int(len(train_data) / 2) == 0:
                logger.info("batch loss %f" % loss)
        logger.info("epoch average loss: %f" % np.mean(train_loss))
        evaluator.eval(epoch)
    model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
    # torch.save(model.state_dict(), model_path)
    return model, train_data

if __name__ == "__main__":
    model, train_data = main(Config)

相关文章:

  • Language Models are Few-Shot Learners,GPT-3详细讲解
  • petalinxu 在zynq的FPGA下的ST7735S的驱动配置
  • 射频辐射干扰:变频器电缆的电磁天线效应
  • 9-1 USART串口协议
  • C语言高级进阶4
  • WinSW-x64(2.12.0)将nginx注册为服务可能有bug
  • 【区块链】btc
  • C语言 第四章 数组(4)
  • scanf() 函数:C语言中的数据输入桥梁
  • SAP FI模块之付款管理开发
  • 理解langchain langgraph 官方文档示例代码中的MemorySaver
  • 稀疏矩阵(信息学奥赛一本通-2042)
  • 芯谷D8563TS实时时钟/日历芯片详解可替代PCF8563
  • notion enhancer 新版工作方法
  • torch_geometric 安装
  • Conda+jupyterlab
  • 第九节:哈希表(初阶)
  • uni-app App 端分段导出 JSON 数据为文件
  • 解决进入Oracle11g的OEM显示网站不安全问题
  • 使用easyexcel实现单元格样式设置和下拉框设置
  • 上海市政府党组赴全面从严治党警示教育基地参观学习,推进作风建设走深走实
  • 锚定建设“中国樱桃第一县”目标,第六届澄城樱桃营销季启动
  • “高原笑匠”、西藏著名表演艺术家扎西顿珠去世
  • “当代阿炳”甘柏林逝世,创办了国内第一所残疾人高等学府
  • 国家统计局:4月全国城镇调查失业率为5.1%,比上月下降0.1个百分点
  • 聘期三年已至:37岁香港青年叶家麟卸任三亚市旅游发展局局长