当前位置: 首页 > news >正文

14 - 大语言模型 — 抽取式问答系统 “成长记”:靠 BERT 学本事,从文本里精准 “揪” 答案的全过程(呆瓜版-1号)

目录

1、什么是问答系统?

2、问答系统的核心工作流程

2.1、理解问题:把问题 “翻译” 成机器能懂的形式

2.2、 寻找答案:从信息中定位答案

2.3、生成答案:整理并输出结果

2.4、优化迭代:让系统更 “聪明”

3、主流技术:让系统 “变聪明” 的关键

4、问答系统的常见类型

5、现实挑战:问答系统还需要突破什么?

6、问答系统的应用场景

7、英文版回答系统(显示上下文)

8、中文版回答系统(显示上下文)

9、中文版回答系统(隐藏上下文)

10、中英文版回答系统(隐藏上下文)


1、什么是问答系统?

简单来说,问答系统是一种能 “听懂” 人类问题,并给出准确答案的智能系统。比如我们平时用的智能助手(如 Siri、小爱同学),输入 “今天天气怎么样?” 就能得到答案,这就是最常见的问答系统应用。

从技术角度看,它的核心任务是:接收自然语言问题,结合已有信息(如上下文、知识库),返回简洁准确的答案

2、问答系统的核心工作流程

一个完整的问答系统,无论复杂与否,都离不开以下 4 个关键步骤:

2.1、理解问题:把问题 “翻译” 成机器能懂的形式

人类用自然语言提问(比如 “法国的首都是什么?”),机器首先需要理解问题的核心含义:

  • 识别关键词:提取问题中的关键信息(如 “法国”“首都”)。
  • 判断问题类型:是问事实(“是什么”)、原因(“为什么”),还是方法(“怎么做”)?不同类型的问题,寻找答案的方式不同。
  • 转换为机器语言:通过技术手段(如词向量、预训练模型)将文字转为机器能处理的数字形式(向量)。

比如 “法国的首都是什么?”,机器会识别出 “法国” 是主体,“首都” 是要查询的属性,核心是寻找 “法国” 对应的 “首都” 名称。

2.2、 寻找答案:从信息中定位答案

理解问题后,系统需要从 “信息源” 中找答案。常见的信息源有两种:

  • 给定的上下文:比如阅读一篇文章后回答相关问题(如考试中的阅读理解),答案一定在文章里。
  • 外部知识库:比如回答 “地球自转一周需要多久?”,系统需要调用已有的知识储备(如百科全书、数据库)。

以 “从上下文找答案” 为例,机器需要完成:

  • 定位相关段落:在上下文里筛选出可能包含答案的部分(比如问题问 “法国首都”,就重点看提到 “法国” 的段落)。
  • 精确匹配答案:在相关段落中,找到与问题关键词匹配的内容(比如 “巴黎是法国的首都”,这里 “巴黎” 就是答案)。

2.3、生成答案:整理并输出结果

找到答案后,系统需要把它整理成人类能理解的自然语言:

  • 提取核心信息:如果答案藏在长句子里(如 “法国的首都是巴黎,它是欧洲的重要城市”),需要提炼出 “巴黎” 这个核心答案。
  • 确保答案准确:检查答案是否完全匹配问题(比如问题问 “首都”,就不能返回 “法国的货币是欧元”)。
  • 格式化输出:用简洁的语言呈现(比如直接说 “巴黎”,而不是重复整句话)。

2.4、优化迭代:让系统更 “聪明”

问答系统不是一成不变的,需要通过不断优化提升效果:

  • 学习新数据:用更多的问答样本(如 “问题 - 答案 - 上下文” 组合)训练系统,让它见过更多场景。
  • 修正错误:如果系统答错了(比如把 “法国首都” 说成 “伦敦”),通过人工标注或算法调整纠正错误。
  • 适应复杂场景:逐步处理更难的问题(如需要推理的问题 “小明有 3 个苹果,吃了 1 个,又买了 2 个,现在有几个?”)。

3、主流技术:让系统 “变聪明” 的关键

目前,问答系统的核心技术依赖自然语言处理(NLP) 和机器学习,其中最具代表性的是 “预训练语言模型”(如 BERT、GPT 等):

  • BERT 模型:擅长从上下文中找答案。它能像人类阅读一样,理解句子中每个词的含义,以及词与词之间的关系(比如 “它” 指的是前文的 “巴黎”)。因此,在 “给定上下文找答案” 的场景中表现突出。
  • GPT 模型:擅长生成答案。它不仅能理解问题,还能结合自己 “学过” 的海量知识,生成全新的答案(即使上下文里没有直接提到),更接近人类的 “思考” 过程。

4、问答系统的常见类型

根据应用场景和技术特点,问答系统可以分为几类:

  1. 抽取式问答:答案一定来自给定的上下文(如阅读理解),系统的任务是 “抽取” 出其中的片段。
  2. 生成式问答:答案可以是全新的句子(不一定来自上下文),系统需要 “创造” 答案(如智能助手回答常识问题)。
  3. 知识库问答:专门从结构化的知识库(如百科数据库)中查询答案,适合精准的事实性问题(如 “李白是哪一年出生的?”)。
  4. 对话式问答:能进行多轮对话(比如 “明天会下雨吗?”→“会下小雨。”→“那需要带伞吗?”→“需要。”),需要记住之前的对话内容。

5、现实挑战:问答系统还需要突破什么?

尽管现在的问答系统已经很实用,但仍有不少难题:

  • 复杂问题推理:对于需要多步思考的问题(如 “小明的妈妈是小红的姑姑,小红和小明是什么关系?”),系统往往难以处理。
  • 歧义问题理解:同一个问题可能有多种含义(如 “苹果多少钱?” 可能指水果,也可能指手机),系统需要结合语境判断。
  • 常识与经验依赖:很多问题需要生活常识(如 “为什么天热会出汗?”),而机器缺乏人类的生活经验,容易答错。
  • 对抗性问题干扰:遇到故意设计的 “陷阱问题”(如语法混乱、包含错误信息的问题),系统可能被误导。

6、问答系统的应用场景

除了我们熟悉的智能助手,问答系统还广泛应用在:

  • 客服领域:自动回答用户关于产品的常见问题(如 “退货流程是什么?”),减少人工成本。
  • 教育领域:作为 “智能辅导老师”,解答学生的学科问题(如 “数学公式怎么推导?”)。
  • 医疗领域:辅助医生查询医学知识(如 “这个症状可能是什么病?”),但需严格验证准确性。
  • 搜索引擎:比如百度、谷歌的 “直接回答” 功能,在搜索结果顶部直接显示问题答案(如搜索 “地球半径”,结果顶部会直接给出数字)。

7、英文版回答系统(显示上下文)

# 导入PyTorch深度学习框架,用于张量运算和模型训练
import torch
# 导入datasets库,用于加载和处理数据集;Dataset类用于构建自定义数据集
from datasets import load_dataset, Dataset
# 导入os库用于文件路径操作,shutil库用于文件删除/移动等操作
import os
import shutil
# 从transformers库导入所需组件:
# BertForQuestionAnswering:用于问答任务的BERT模型
# BertTokenizerFast:快速BERT分词器(支持offset_mapping等功能)
# TrainingArguments:训练参数配置类
# Trainer:模型训练器(封装了训练循环)
# DataCollatorWithPadding:用于批量数据填充的工具
from transformers import (BertForQuestionAnswering,BertTokenizerFast,TrainingArguments,Trainer,DataCollatorWithPadding
)# 设置环境变量:解决protobuf库的潜在兼容性问题(部分环境下可能出现导入错误)
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"# 检查是否有可用GPU,优先使用GPU加速训练/推理,否则使用CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用设备: {device}")# 1. 加载本地JSON格式的SQuAD风格数据集,并展开嵌套结构为单个问答样本
def load_and_expand_squad(json_path):"""加载SQuAD格式的JSON数据集,并将嵌套结构展开为扁平的问答样本列表参数:json_path: 本地JSON文件路径(SQuAD 1.1/2.0格式)返回:Dataset对象:包含"question"(问题)、"context"(上下文)、"answers"(答案)的样本集合"""# 加载原始JSON数据(SQuAD格式的JSON通常包含"data"字段,内嵌套篇章、段落、问答对)try:# 使用load_dataset加载JSON文件,指定field="data"提取数据核心字段,取"train"拆分(实际可根据文件内容调整)raw_dataset = load_dataset("json", data_files=json_path, field="data")["train"]except Exception as e:print(f"加载数据集失败: {e}")print(f"请确保 {json_path} 文件存在且格式正确(SQuAD风格)")exit(1)  # 加载失败则退出程序# 展开嵌套结构:SQuAD格式为 篇章(article)→段落(paragraphs)→问答对(qas),需展开为单个样本expanded = []  # 存储展开后的样本invalid_count = 0  # 统计无效样本数量for article in raw_dataset:  # 遍历每个篇章try:for paragraph in article["paragraphs"]:  # 遍历篇章中的每个段落context = paragraph["context"]  # 提取段落文本作为上下文for qa in paragraph["qas"]:  # 遍历段落中的每个问答对# 构建单个样本:包含问题、上下文、答案(确保答案格式为{"text": [答案文本], "answer_start": [起始位置]})expanded.append({"question": qa["question"],  # 问题文本"context": context,  # 上下文文本"answers": {# 取第一个答案(SQuAD可能有多个答案,此处简化为单答案)"text": [qa["answers"][0]["text"]],"answer_start": [qa["answers"][0]["answer_start"]]}})except Exception as e:print(f"解析样本时出错: {e},跳过该样本")invalid_count += 1continue  # 跳过无效样本print(f"加载完成,有效样本数: {len(expanded)},无效样本数: {invalid_count}")return Dataset.from_list(expanded)  # 转换为Dataset对象返回# 2. 加载并准备训练集和验证集
print("加载训练集...")
# 加载训练集(假设本地有SQuAD格式的train-v2.0.json)
dataset = load_and_expand_squad("train-v2.0.json")
print("加载验证集...")
# 加载验证集(假设本地有SQuAD格式的dev-v2.0.json)
val_dataset = load_and_expand_squad("dev-v2.0.json")# 取1%样本用于快速测试(仅为调试用,实际训练可注释掉以使用全量数据)
# train_test_split(test_size=0.01)表示取1%数据作为测试集(此处用作快速训练样本)
dataset = dataset.train_test_split(test_size=0.01)["test"]
val_dataset = val_dataset.train_test_split(test_size=0.01)["test"]print(f"训练集大小: {len(dataset)},验证集大小: {len(val_dataset)}")# 3. 加载预训练模型和分词器(基于BERT-base-uncased)
model_name = "bert-base-uncased"  # 模型名称:BERT-base无大小写区分版本(适合英文任务)
local_model_dir = "./bert-base-uncased"  # 本地缓存模型的路径# 清理旧缓存(可选,仅在需要强制重新下载模型时启用)
# if os.path.exists(local_model_dir):
#     print(f"清理模型缓存: {local_model_dir}")
#     shutil.rmtree(local_model_dir, ignore_errors=True)  # 删除旧缓存目录# 加载分词器和模型
try:# 加载快速分词器(BertTokenizerFast支持return_offsets_mapping等功能,是QA任务必需的)tokenizer = BertTokenizerFast.from_pretrained(model_name,cache_dir=local_model_dir,  # 缓存到本地目录,避免重复下载use_fast=True  # 强制使用快速分词器)# 加载用于问答任务的BERT模型(输出start_logits和end_logits,用于预测答案位置)model = BertForQuestionAnswering.from_pretrained(model_name,cache_dir=local_model_dir).to(device)  # 将模型移动到指定设备(GPU/CPU)
except Exception as e:print(f"加载模型失败: {e}")print(f"请确保 {local_model_dir} 目录存在或网络连接正常(首次运行需下载模型)")exit(1)# 验证分词器类型(快速分词器是QA任务必需的,否则无法获取offset_mapping)
print(f"加载的分词器类型: {type(tokenizer).__name__}")
if type(tokenizer).__name__ != "BertTokenizerFast":print("警告: 当前使用的不是快速分词器,可能导致性能问题或功能缺失(QA任务需要快速分词器)")# 4. 数据预处理函数(将文本转换为模型可接受的输入格式)
def preprocess_function(examples):"""预处理函数:将原始文本(问题、上下文、答案)转换为模型输入格式参数:examples: 包含"question"、"context"、"answers"的批量样本返回:处理后的字典:包含input_ids、attention_mask、start_positions、end_positions"""# 编码问题和上下文:使用分词器将文本转换为token IDinputs = tokenizer(examples["question"],  # 问题文本(列表)examples["context"],  # 上下文文本(列表)max_length=512,  # 最大序列长度(BERT-base支持最长512token)truncation="only_second",  # 仅截断上下文(question在前,context在后,优先保留问题)return_offsets_mapping=True,  # 返回offset_mapping(token与原始文本字符的映射关系,用于定位答案)padding="max_length"  # 填充到max_length长度)# 提取offset_mapping(每个token对应的原始文本起始/结束字符位置)offset_mapping = inputs.pop("offset_mapping")  # 移除offset_mapping(不输入模型,仅用于计算答案位置)start_positions = []  # 存储每个样本的答案起始token位置end_positions = []  # 存储每个样本的答案结束token位置# 遍历每个样本的offset_mapping,计算答案的start和end token位置for i, offsets in enumerate(offset_mapping):answer = examples["answers"][i]  # 当前样本的答案# 处理无答案情况(SQuAD 2.0支持无答案样本,用(0,0)表示)if len(answer["text"]) == 0:start_positions.append(0)  # 无答案时起始位置设为0([CLS] token)end_positions.append(0)continue# 获取答案在原始上下文的字符位置(start_char:起始字符索引;end_char:结束字符索引)start_char = answer["answer_start"][0]  # 答案起始字符位置end_char = start_char + len(answer["text"][0])  # 答案结束字符位置(起始+长度)# 定位上下文在token序列中的范围(通过sequence_ids区分问题和上下文)# sequence_ids:每个token对应的序列类型(0=问题,1=上下文,None=特殊token如[CLS]、[SEP])sequence_ids = inputs.sequence_ids(i)  # 获取第i个样本的sequence_ids# 找到上下文的起始token索引(第一个sequence_id=1的位置)context_start = 0while sequence_ids[context_start] != 1:context_start += 1# 找到上下文的结束token索引(最后一个sequence_id=1的位置)context_end = len(sequence_ids) - 1while sequence_ids[context_end] != 1:context_end -= 1# 查找答案在token序列中的起始和结束位置start_token = None  # 答案起始token索引end_token = None    # 答案结束token索引# 检查答案是否超出上下文范围(若答案不在上下文中,视为无答案)if offsets[context_start][0] > end_char or offsets[context_end][1] < start_char:start_positions.append(0)end_positions.append(0)else:# 寻找答案的起始token:找到第一个包含start_char的tokenfor idx in range(context_start, context_end + 1):token_start, token_end = offsets[idx]  # 当前token的字符范围if token_start <= start_char <= token_end:start_token = idx  # 记录起始token索引break# 寻找答案的结束token:找到最后一个包含end_char的tokenfor idx in range(context_end, context_start - 1, -1):token_start, token_end = offsets[idx]  # 当前token的字符范围if token_start <= end_char <= token_end:end_token = idx  # 记录结束token索引break# 若找不到答案位置(如tokenization导致的偏移),设为[CLS] token(0)start_positions.append(start_token if start_token is not None else 0)end_positions.append(end_token if end_token is not None else 0)# 将计算好的start和end位置添加到输入中(作为模型训练的标签)inputs["start_positions"] = start_positionsinputs["end_positions"] = end_positionsreturn inputs# 5. 应用预处理函数到训练集和验证集
print("预处理训练数据...")
# 对训练集应用预处理:batched=True表示批量处理,remove_columns移除原始文本列(模型不需要)
tokenized_dataset = dataset.map(preprocess_function, batched=True, remove_columns=dataset.column_names)
print("预处理验证数据...")
# 对验证集应用同样的预处理
tokenized_val_dataset = val_dataset.map(preprocess_function, batched=True, remove_columns=val_dataset.column_names)# 将数据集转换为PyTorch张量格式(模型需要张量输入)
# 只保留模型需要的列:input_ids(token ID)、attention_mask(掩码,区分有效token和填充)、start/end_positions(标签)
tokenized_dataset.set_format("torch", columns=["input_ids", "attention_mask", "start_positions", "end_positions"])
tokenized_val_dataset.set_format("torch", columns=["input_ids", "attention_mask", "start_positions", "end_positions"])# 6. 配置训练参数
training_args = TrainingArguments(output_dir="./qa_model",  # 模型和日志输出目录evaluation_strategy="epoch",  # 每轮结束后进行验证learning_rate=3e-5,  # 学习率(BERT微调常用3e-5)per_device_train_batch_size=4,  # 每个设备的训练批大小(GPU内存小则设小些)per_device_eval_batch_size=4,   # 每个设备的验证批大小num_train_epochs=2,  # 训练轮次(小样本快速测试用2轮,实际训练可增至3-5轮)weight_decay=0.01,  # 权重衰减(L2正则化,防止过拟合)logging_dir="./logs",  # 日志目录(可通过tensorboard查看)logging_steps=10,  # 每10步记录一次日志save_strategy="epoch",  # 每轮结束后保存模型fp16=True if device == "cuda" else False,  # 若使用GPU,启用混合精度训练(加速训练并减少内存占用)
)# 数据整理器:用于批量处理时自动填充(确保同批次样本长度一致)
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)# 7. 初始化Trainer并开始训练
print("开始训练模型...")
trainer = Trainer(model=model,  # 待训练的模型args=training_args,  # 训练参数train_dataset=tokenized_dataset,  # 训练集eval_dataset=tokenized_val_dataset,  # 验证集tokenizer=tokenizer,  # 分词器(用于保存模型时一同保存)data_collator=data_collator,  # 数据整理器
)# 开始训练(封装了完整的训练循环:前向传播、损失计算、反向传播、参数更新)
trainer.train()# 训练完成后保存模型(Trainer会自动保存,这里再次确认)
print(f"训练完成,模型已保存到: {training_args.output_dir}")
trainer.save_model(training_args.output_dir)  # 手动保存模型(包含配置、分词器、权重)# 8. 答案生成函数(使用训练好的模型进行问答推理)
def generate_answer(question, context):"""生成答案:根据问题和上下文,使用模型预测答案参数:question: 输入的问题文本context: 相关的上下文文本返回:模型预测的答案文本"""# 确保模型在正确的设备上,并设置为评估模式(关闭dropout等训练特有的层)model.to(device)model.eval()# 编码输入:将问题和上下文转换为模型输入格式(返回张量)inputs = tokenizer(question,  # 单个问题context,   # 单个上下文max_length=512,truncation="only_second",  # 仅截断上下文return_tensors="pt",  # 返回PyTorch张量padding="max_length"  # 填充到max_length).to(device)  # 移动到指定设备# 模型推理(关闭梯度计算,节省内存)with torch.no_grad():outputs = model(** inputs)  # 输入解包(input_ids、attention_mask等)# 从输出中获取预测的答案起始和结束token索引(取概率最大的位置)start_idx = torch.argmax(outputs.start_logits).item()  # 起始token索引end_idx = torch.argmax(outputs.end_logits).item()      # 结束token索引# 将token ID转换为原始token(用于拼接答案)tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])  # 取第一个样本(仅输入一个样本)answer_tokens = tokens[start_idx:end_idx + 1]  # 截取答案对应的token序列#答案对应的token序列# 处理无答案情况(若start和end都为0,对应[CLS] token,视为无答案)if start_idx == 0 and end_idx == 0:return "No answer found"  # 无答案时返回提示# 将token转换为文本,并清理特殊token(如[CLS]、[SEP])answer = tokenizer.convert_tokens_to_string(answer_tokens)# 移除可能残留的特殊token,并strip去空格return answer.replace("[CLS]", "").replace("[SEP]", "").strip()# 9. 交互式问答测试(方便用户手动输入问题和上下文进行测试)
if __name__ == "__main__":print("\n=== 问答测试 ===")# 示例问题(用于快速验证模型功能)test_questions = [{"question": "What is the capital of France?",  # 问题:法国的首都是什么?"context": "France is a country in Europe. Its capital is Paris, which is also a major cultural center."  # 上下文},{"question": "Who developed BERT?",  # 问题:谁开发了BERT?"context": "BERT is a language model developed by Google in 2018. It uses a Transformer architecture."  # 上下文}]# 运行示例问题测试for i, test in enumerate(test_questions):print(f"\n问题 {i + 1}: {test['question']}")print(f"答案: {generate_answer(test['question'], test['context'])}")# 交互式问答(允许用户自定义输入)while True:print("\n=== 交互式问答 ===")question = input("请输入问题 (输入q退出): ")if question.lower() == "q":  # 输入q则退出breakcontext = input("请输入相关上下文: ")if not context:  # 上下文不能为空print("上下文不能为空!")continue# 生成并打印答案answer = generate_answer(question, context)print(f"模型回答: {answer}")

8、中文版回答系统(显示上下文)

# 导入JSON模块用于处理JSON格式数据(数据集通常为JSON格式)
import json
# 导入os模块用于文件路径操作(创建目录、检查文件是否存在等)
import os
# 导入PyTorch深度学习框架,用于张量运算和模型训练
import torch
# 从torch.utils.data导入Dataset(自定义数据集基类)和DataLoader(数据加载器)
from torch.utils.data import Dataset, DataLoader
# 从transformers库导入中文问答任务所需组件:
# BertTokenizerFast:快速BERT分词器(支持offset_mapping,用于定位答案在原始文本中的位置)
# BertForQuestionAnswering:用于问答任务的BERT模型(输出start_logits和end_logits,预测答案起止位置)
# AdamW:适用于Transformer模型的优化器
# get_linear_schedule_with_warmup:学习率调度器(控制训练过程中学习率的变化)
from transformers import (BertTokenizerFast,BertForQuestionAnswering,AdamW,get_linear_schedule_with_warmup
)
# 导入tqdm用于显示训练进度条
from tqdm import tqdm# 设置计算设备:优先使用GPU(cuda)加速训练/推理,若无则使用CPU
# 说明:GPU的并行计算能力可大幅缩短训练时间,尤其适合BERT等大型模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")# 自定义数据集类:将预处理后的编码数据转换为PyTorch可识别的数据集格式
# 作用:统一数据接口,方便DataLoader按批次加载数据
class QADataset(Dataset):def __init__(self, encodings):# encodings:包含input_ids、attention_mask、start_positions、end_positions的字典self.encodings = encodings# 按索引获取单个样本(PyTorch数据集必需实现的方法)def __getitem__(self, idx):# 将每个字段转换为PyTorch张量(模型输入需为张量格式)return {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}# 返回数据集总样本数(PyTorch数据集必需实现的方法)def __len__(self):# 以input_ids的长度为准(所有字段长度相同)return len(self.encodings.input_ids)# 加载并验证数据集:确保数据格式正确,为训练提供可靠输入
def load_existing_dataset(train_path='data/train_dataset.json', val_path='data/val_dataset.json'):print(f"加载数据集: {train_path} 和 {val_path}")# 内部函数:加载单个数据集并验证格式def validate_dataset(path):# 检查文件是否存在if not os.path.exists(path):raise FileNotFoundError(f"数据集文件不存在: {path}")# 加载JSON数据并解析with open(path, 'r', encoding='utf-8') as f:try:dataset = json.load(f)  # 加载为列表格式(每个元素是一个样本)except json.JSONDecodeError as e:raise ValueError(f"JSON解析错误: {path}, 错误: {e}")# 验证数据集是否为列表(JSON数组)if not isinstance(dataset, list):raise ValueError(f"数据集格式错误: {path} 应为JSON数组(样本列表)")# 验证数据集不为空if not dataset:raise ValueError(f"数据集为空: {path}(至少需要一个样本)")# 验证每个样本的结构是否符合要求required_fields = ['question', 'context', 'answers']  # 每个样本必须包含的字段for i, sample in enumerate(dataset):# 检查是否包含必要字段for field in required_fields:if field not in sample:raise ValueError(f"样本 {i} 缺少必要字段: {field}(必须包含问题、上下文、答案)")# 验证answers字段的结构(必须是字典)answers = sample['answers']if not isinstance(answers, dict):raise TypeError(f"样本 {i} 的answers字段应为字典,实际为: {type(answers).__name__}")# 检查answers是否包含text和answer_start子字段(抽取式问答必需)if 'text' not in answers or 'answer_start' not in answers:raise ValueError(f"样本 {i} 的answers字段缺少必要子字段(需包含text和answer_start)")# 验证text和answer_start是否为列表(允许空列表表示无答案)if not isinstance(answers['text'], list) or not isinstance(answers['answer_start'], list):raise TypeError(f"样本 {i} 的answers.text或answers.answer_start应为列表(存储答案文本和起始位置)")# 关键校验:无答案样本需text和answer_start同时为空,有答案则同时非空if (not answers['text'] and answers['answer_start']) or (answers['text'] and not answers['answer_start']):raise ValueError(f"样本 {i} 的answers.text和answer_start需同时为空(无答案)或同时非空(有答案)")# 对有答案的样本,进一步验证answer_start的类型和位置if answers['answer_start']:  # 仅处理有答案的样本# 验证answer_start中的元素是否为整数(位置必须是整数索引)if not all(isinstance(pos, int) for pos in answers['answer_start']):raise TypeError(f"样本 {i} 的answers.answer_start包含非整数类型(必须为字符起始位置)")# 验证答案文本与上下文的匹配性(确保答案确实在上下文中)context = sample['context']for j, (text, pos) in enumerate(zip(answers['text'], answers['answer_start'])):if not isinstance(text, str):raise TypeError(f"样本 {i} 的answers.text[{j}]不是字符串(答案必须是文本)")# 检查答案起始位置是否超出上下文范围if pos < 0 or pos >= len(context):raise ValueError(f"样本 {i} 的answers.answer_start[{j}]={pos}超出上下文长度(上下文长度为{len(context)})")# 检查答案文本是否与上下文对应位置的内容一致(防止标注错误)if context[pos:pos + len(text)] != text:print(f"警告: 样本 {i} 的答案文本与上下文不匹配")print(f"  上下文: {context}")print(f"  答案文本: '{text}', 起始位置: {pos}(上下文对应位置为'{context[pos:pos + len(text)]}')")print(f"已验证数据集: {path}, 样本数: {len(dataset)}")return dataset, len(dataset)  # 返回验证后的数据集和样本数# 验证训练集和验证集train_data, train_size = validate_dataset(train_path)val_data, val_size = validate_dataset(val_path)# 保存验证后的数据集(确保后续使用的是格式正确的数据)with open(train_path, 'w', encoding='utf-8') as f:json.dump(train_data, f, ensure_ascii=False, indent=2)with open(val_path, 'w', encoding='utf-8') as f:json.dump(val_data, f, ensure_ascii=False, indent=2)print(f"训练集规模: {train_size}, 验证集规模: {val_size}")return train_path, val_path  # 返回验证后的数据集路径# 数据预处理:将原始文本转换为BERT可接受的输入格式(核心步骤)
def prepare_data(dataset_path, tokenizer, max_length=512):# 加载数据集with open(dataset_path, 'r', encoding='utf-8') as f:dataset = json.load(f)# 打印第一个样本的结构(方便调试和理解数据格式)if dataset:print(f"\n样本结构示例:")for key, value in dataset[0].items():if key == 'answers':print(f"  {key}: {type(value).__name__}({list(value.keys())})")  # 显示answers的子字段else:print(f"  {key}: {type(value).__name__}")  # 显示问题和上下文的类型# 过滤无效样本(保留格式正确的样本用于训练)valid_samples = []for i, item in enumerate(dataset):try:# 基本字段类型检查(问题和上下文必须是字符串)if not isinstance(item['question'], str) or not isinstance(item['context'], str):raise TypeError(f"问题或上下文不是字符串类型(必须为文本)")ans = item['answers']if not isinstance(ans, dict):raise TypeError(f"answers字段不是字典类型(格式错误)")# 验证答案字段的格式(必须是列表)if not isinstance(ans.get('text', []), list) or not isinstance(ans.get('answer_start', []), list):raise ValueError(f"answers.text或answer_start格式错误(必须为列表)")# 处理无答案样本(text和answer_start均为空)if not ans['text'] and not ans['answer_start']:valid_samples.append({'question': item['question'],'context': item['context'],'answers': {'text': [], 'answer_start': []}  # 保留无答案样本})continue# 有答案样本必须同时包含text和answer_start(排除格式矛盾的样本)if not ans['text'] or not ans['answer_start']:raise ValueError(f"answers.text和answer_start需同时为空或非空(格式矛盾)")# 取第一个答案(通常抽取式问答每个问题对应一个答案)answer_text = ans['text'][0]answer_start = ans['answer_start'][0]# 确保answer_start是整数(若为字符串则尝试转换)if not isinstance(answer_start, int):try:answer_start = int(answer_start)except:raise TypeError(f"answer_start不是整数(无法转换为位置索引)")# 验证答案起始位置是否在上下文范围内context = item['context']if answer_start < 0 or answer_start >= len(context):raise ValueError(f"answer_start超出上下文范围(上下文长度为{len(context)})")# 修正答案文本与上下文的匹配性(若标注错误,以上下文实际内容为准)actual_text = context[answer_start:answer_start + len(answer_text)]if actual_text != answer_text:print(f"警告: 样本 {i} 的答案文本不匹配")print(f"  预期: '{answer_text}', 实际: '{actual_text}'")answer_text = actual_text  # 修正为上下文实际内容# 将验证通过的样本加入有效列表valid_samples.append({'question': item['question'],'context': context,'answers': {'text': [answer_text],'answer_start': [answer_start]}})except Exception as e:print(f"跳过无效样本 {i}: {e}")  # 打印错误信息并跳过无效样本print(f"有效样本数: {len(valid_samples)}/{len(dataset)}")  # 统计有效样本比例# 提取问题、上下文、答案列表(用于批量处理)questions = [item['question'] for item in valid_samples]contexts = [item['context'] for item in valid_samples]answers = [item['answers'] for item in valid_samples]# 用BERT分词器编码文本:将问题和上下文转换为模型可识别的token ID# 关键参数说明:# - truncation="only_first": 仅截断上下文(保留问题完整,因问题更关键)# - return_offsets_mapping: 返回token与原始文本字符的映射关系(用于定位答案)# - return_token_type_ids: 返回token类型(0表示问题,1表示上下文,BERT需要区分)encodings = tokenizer(contexts,  # 第一个参数:上下文列表questions,  # 第二个参数:问题列表(BERT要求上下文在前,问题在后)truncation="only_first",padding='max_length',  # 填充至max_length(512)max_length=max_length,  # BERT-base模型最大支持512个tokenreturn_tensors='pt',  # 返回PyTorch张量return_offsets_mapping=True,  # 核心:token与原始字符的映射(抽取答案必需)return_token_type_ids=True)# 存储每个样本的答案在token序列中的起始和结束位置(模型训练的标签)start_positions = []end_positions = []for i in range(len(answers)):answer = answers[i]# 无答案样本:将start和end位置设为0(对应[CLS] token,BERT中常用0表示无答案)if not answer['text'] or not answer['answer_start']:start_positions.append(0)end_positions.append(0)continue# 有答案样本:计算答案在token序列中的位置start_char = answer['answer_start'][0]  # 答案在上下文的起始字符索引end_char = start_char + len(answer['text'][0]) - 1  # 答案在上下文的结束字符索引(减1避免超出)# 定位上下文在token序列中的范围(通过token_type_ids区分问题和上下文)sequence_ids = encodings.sequence_ids(i)  # 第i个样本的token类型(0=问题,1=上下文,None=特殊token)context_start = sequence_ids.index(1)  # 上下文的第一个token索引context_end = len(sequence_ids) - 1 - sequence_ids[::-1].index(1)  # 上下文的最后一个token索引# 查找答案对应的token起始和结束位置start_token = None  # 答案起始token索引end_token = None    # 答案结束token索引offsets = encodings.offset_mapping[i]  # 第i个样本的token-字符映射(每个token对应(起始字符, 结束字符))# 寻找答案起始token:找到第一个包含start_char的tokenfor idx in range(context_start, context_end + 1):token_start, token_end = offsets[idx]  # 当前token的字符范围if token_start <= start_char < token_end:  # start_char在当前token范围内start_token = idxbreak# 寻找答案结束token:找到最后一个包含end_char的tokenfor idx in range(context_end, context_start - 1, -1):token_start, token_end = offsets[idx]  # 当前token的字符范围if token_start <= end_char < token_end:  # end_char在当前token范围内end_token = idxbreak# 兜底处理:若未找到答案位置(如tokenization导致偏移),视为无答案if start_token is None or end_token is None:print(f"警告:样本 {i} 的答案无法映射到tokens,设为无答案")start_positions.append(0)end_positions.append(0)else:start_positions.append(start_token)end_positions.append(end_token)# 移除offset_mapping(模型不需要该字段,仅预处理时用)encodings.pop('offset_mapping')# 将start_positions和end_positions加入编码结果(作为训练标签)encodings.update({'start_positions': start_positions,  # 答案起始token索引'end_positions': end_positions      # 答案结束token索引})# 返回自定义数据集对象(封装编码后的数据)return QADataset(encodings)# 模型训练函数:定义训练流程,优化模型参数
def train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, model_save_path):# 将模型设为训练模式(启用dropout等训练特有的层)model.train()# 创建模型保存目录(若不存在)os.makedirs(model_save_path, exist_ok=True)# 遍历训练轮次for epoch in range(epochs):total_loss = 0  # 累计训练损失# 用tqdm显示训练进度progress_bar = tqdm(enumerate(train_loader), total=len(train_loader))# 遍历每个批次的训练数据for step, batch in progress_bar:# 将批次数据移至指定设备(GPU/CPU)input_ids = batch['input_ids'].to(device)  # token ID序列attention_mask = batch['attention_mask'].to(device)  # 注意力掩码(区分有效token和填充)start_positions = batch['start_positions'].to(device)  # 答案起始位置标签end_positions = batch['end_positions'].to(device)      # 答案结束位置标签# 获取token_type_ids(区分问题和上下文,BERT需要)token_type_ids = batch.get('token_type_ids', None)if token_type_ids is not None:token_type_ids = token_type_ids.to(device)# 前向传播:将输入传入模型,得到输出outputs = model(input_ids=input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids,start_positions=start_positions,  # 传入标签用于计算损失end_positions=end_positions)# 提取损失值(BertForQuestionAnswering会自动计算start和end位置的联合损失)loss = outputs.losstotal_loss += loss.item()  # 累计损失(转换为Python数值)# 反向传播:计算梯度optimizer.zero_grad()  # 清空之前的梯度(避免累积)loss.backward()        # 计算当前损失对参数的梯度optimizer.step()       # 根据梯度更新参数scheduler.step()       # 调整学习率(按预设策略)# 更新进度条显示当前轮次和损失progress_bar.set_description(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.4f}')# 计算本轮平均训练损失avg_train_loss = total_loss / len(train_loader)# 在验证集上评估模型性能(计算验证损失)val_loss = evaluate_model(model, val_loader)# 打印本轮训练和验证损失(用于判断模型是否过拟合)print(f'Epoch {epoch + 1}, Train Loss: {avg_train_loss:.4f}, Val Loss: {val_loss:.4f}')# 训练完成后保存模型和分词器(模型权重、配置、分词器词汇表)model.save_pretrained(model_save_path)tokenizer.save_pretrained(model_save_path)print(f"模型已保存至: {model_save_path}")# 模型评估函数:在验证集上计算损失,评估模型泛化能力
def evaluate_model(model, val_loader):# 将模型设为评估模式(关闭dropout等层,确保结果稳定)model.eval()total_loss = 0  # 累计验证损失# 关闭梯度计算(评估时无需更新参数,节省内存)with torch.no_grad():# 遍历验证集批次for batch in val_loader:# 数据移至设备input_ids = batch['input_ids'].to(device)attention_mask = batch['attention_mask'].to(device)start_positions = batch['start_positions'].to(device)end_positions = batch['end_positions'].to(device)token_type_ids = batch.get('token_type_ids', None)if token_type_ids is not None:token_type_ids = token_type_ids.to(device)# 前向传播(仅计算输出,不更新参数)outputs = model(input_ids=input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids,start_positions=start_positions,end_positions=end_positions)# 累计验证损失total_loss += outputs.loss.item()# 返回平均验证损失(越低表示模型在未见过的数据上表现越好)return total_loss / len(val_loader)# 答案预测函数:根据问题和上下文,用训练好的模型生成答案
def predict_answer(question, context, model, tokenizer, confidence_threshold=0.1, debug=True):# 将模型设为评估模式model.eval()# 用分词器编码输入(问题和上下文)inputs = tokenizer(context,  # 上下文在前question,  # 问题在后(符合BERT输入格式)truncation=True,  # 超过最大长度则截断max_length=512,return_offsets_mapping=True,  # 保留token与字符的映射,用于提取答案return_tensors='pt'  # 返回PyTorch张量)# 提取offset_mapping并转换为numpy数组(用于后续映射)offset_mapping = inputs.pop('offset_mapping').numpy()[0]  # 形状:(token数, 2)# 将输入移至模型所在设备inputs = {k: v.to(device) for k, v in inputs.items()}# 关闭梯度计算,进行推理with torch.no_grad():outputs = model(** inputs)  # 传入所有输入字段# 提取模型输出的logits(未归一化的概率)start_logits = outputs.start_logits[0].cpu().numpy()  # 每个token作为答案起始的分数end_logits = outputs.end_logits[0].cpu().numpy()      # 每个token作为答案结束的分数# 找到分数最高的起始和结束token索引start_idx = start_logits.argmax()end_idx = end_logits.argmax()# 调试信息:打印中间结果(帮助分析预测错误原因)if debug:print(f"\n===== 预测调试信息 =====")print(f"问题: {question}")print(f"上下文: {context}")# 打印预测的token索引和对应分数(分数越高表示模型越有把握)# print(f"start_idx={start_idx}, end_idx={end_idx}")# print(f"start_logits[start_idx]={start_logits[start_idx]:.4f}, end_logits[end_idx]={end_logits[end_idx]:.4f}")# 将token ID转换为文字(查看模型关注的token)tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0].cpu().numpy())# print(f"预测的起始token: {tokens[start_idx]}")# print(f"预测的结束token: {tokens[end_idx]}")# print(f"start_idx的offset: {offset_mapping[start_idx]}")# print(f"end_idx的offset: {offset_mapping[end_idx]}")# 过滤低置信度或无效的答案(无答案逻辑)if (start_logits[start_idx] < confidence_threshold or  # 起始位置分数过低end_logits[end_idx] < confidence_threshold or  # 结束位置分数过低start_idx > end_idx or  # 起始位置在结束位置之后(无效)(start_idx == 0 and end_idx == 0)):  # 起始和结束均为0(对应[CLS],视为无答案)if debug:print(f"答案过滤: 置信度不足或无答案")return "未找到答案"# 边界扩展:适当延长答案结束位置(若下一个token的分数较高)max_end_idx = len(end_logits) - 1while end_idx < max_end_idx and end_logits[end_idx + 1] > end_logits[end_idx] * 0.8:end_idx += 1if debug:print(f"扩展end_idx至: {end_idx}, end_logits={end_logits[end_idx]:.4f}")# 根据offset_mapping将token位置映射回原始文本的字符位置start_char = offset_mapping[start_idx][0]  # 答案起始字符索引end_char = offset_mapping[end_idx][1]      # 答案结束字符索引(取token的结束位置)# 调试:打印提取的字符范围和文本if debug:# print(f"提取字符范围: [{start_char}, {end_char})")print(f"提取的答案文本: '{context[start_char:end_char]}'")# 从上下文提取答案文本answer = context[start_char:end_char]return answer if answer else "未找到答案"# 主函数:整合数据加载、模型训练和交互式问答流程
def main():try:# 加载并验证训练集和验证集train_path, val_path = load_existing_dataset()# 加载中文BERT分词器和模型(专为中文优化的抽取式问答模型)# BertTokenizerFast:快速分词器,支持offset_mapping(抽取式任务必需)# BertForQuestionAnswering:在BERT基础上添加了start/end logits输出层,用于预测答案位置tokenizer = BertTokenizerFast.from_pretrained('bert-base-chinese')model = BertForQuestionAnswering.from_pretrained('bert-base-chinese').to(device)# 准备训练数据(转换为模型输入格式)print("\n准备训练数据...")train_dataset = prepare_data(train_path, tokenizer)# 准备验证数据print("\n准备验证数据...")val_dataset = prepare_data(val_path, tokenizer)# 创建数据加载器(按批次加载数据,支持打乱和并行加载)train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)  # 训练集打乱顺序val_loader = DataLoader(val_dataset, batch_size=8)  # 验证集无需打乱# 配置优化器和学习率调度器optimizer = AdamW(model.parameters(), lr=1e-5)  # 学习率1e-5(BERT微调常用)epochs = 20  # 训练轮次(根据数据量调整,20轮适合中小规模数据集)total_steps = len(train_loader) * epochs  # 总训练步数# 线性学习率调度器:从初始学习率线性衰减到0(避免后期波动)scheduler = get_linear_schedule_with_warmup(optimizer, 0, total_steps)# 开始训练模型print("\n开始训练...")train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, './chinese_qa_model')# 在验证集上评估最终模型性能val_loss = evaluate_model(model, val_loader)print(f"\n评估结果: eval_loss={val_loss:.4f}(损失越低越好)")# 交互式问答测试(允许用户输入问题和上下文,查看模型预测结果)print("\n=== 交互式问答 ===")print("提示: 输入q退出,输入d切换调试模式,输入r重新加载模型")debug_mode = True  # 默认开启调试模式(显示中间过程)while True:question = input("\n问题 (输入q退出, d切换调试模式, r重新加载模型): ").strip()# 处理用户指令if question.lower() == 'q':break  # 退出程序if question.lower() == 'd':debug_mode = not debug_mode  # 切换调试模式print(f"调试模式已切换为: {'开启' if debug_mode else '关闭'}")continueif question.lower() == 'r':print("重新加载模型...")  # 重新加载训练好的模型model = BertForQuestionAnswering.from_pretrained('./chinese_qa_model').to(device)print("模型已重新加载")continue# 获取用户输入的上下文context = input("上下文: ").strip()if not context:print("上下文不能为空!")continue# 调用预测函数生成答案answer = predict_answer(question, context, model, tokenizer, debug=debug_mode)print(f"\n答案: {answer}")except Exception as e:print(f"\n错误: {e}")import tracebacktraceback.print_exc()  # 打印完整错误堆栈,方便调试# 程序入口:当脚本直接运行时执行main函数
if __name__ == "__main__":main()

9、中文版回答系统(隐藏上下文)

模型和分词器

# 导入JSON处理模块(用于加载和解析数据集)
import json
# 导入OS模块(用于文件路径操作和目录创建)
import os
# 导入PyTorch深度学习框架
import torch
# 导入数据集和数据加载器(用于批量处理训练数据)
from torch.utils.data import Dataset, DataLoader
# 导入transformers库中的自动加载器和模型组件
# AutoTokenizer:自动加载与模型匹配的分词器
# AutoModelForSeq2SeqLM:自动加载适合序列到序列任务的语言模型(如T5)
# AdamW:优化器(带权重衰减的Adam)
# get_linear_schedule_with_warmup:学习率调度器
from transformers import (AutoTokenizer,AutoModelForSeq2SeqLM,AdamW,get_linear_schedule_with_warmup
)
# 导入进度条库(用于显示训练进度)
from tqdm import tqdm# 设置计算设备(优先使用GPU,否则使用CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")# 自定义数据集类(处理问答对,生成模型可接受的输入格式)
class QADataset(Dataset):def __init__(self, data, tokenizer, max_length=128):# 原始问答数据self.data = data# 分词器(将文本转换为模型可接受的token IDs)self.tokenizer = tokenizer# 最大序列长度(超出部分将被截断)self.max_length = max_lengthdef __len__(self):# 返回数据集大小return len(self.data)def __getitem__(self, idx):# 获取单个样本item = self.data[idx]# 提取问题文本question = item['question']# 提取答案文本(取第一个答案,若无答案则为空字符串)answer = item['answers']['text'][0] if item['answers']['text'] else ""# 编码问题(添加"question: "前缀,明确任务类型)# 注:T5模型通过前缀提示任务类型(如"translate English to German: ...")inputs = self.tokenizer(f"question: {question}",  # 添加任务前缀max_length=self.max_length,truncation=True,  # 截断超出长度的文本padding='max_length',  # 填充至最大长度return_tensors='pt'  # 返回PyTorch张量)# 编码答案(作为模型的目标输出)labels = self.tokenizer(answer,max_length=self.max_length,truncation=True,padding='max_length',return_tensors='pt')# 将padding位置的标签设为-100(PyTorch交叉熵损失会忽略-100位置)# 这样在计算损失时会忽略填充部分,只关注有效tokenlabels = labels['input_ids'].squeeze()labels[labels == self.tokenizer.pad_token_id] = -100return {'input_ids': inputs['input_ids'].squeeze(),  # 问题的token IDs'attention_mask': inputs['attention_mask'].squeeze(),  # 注意力掩码(标识有效token)'labels': labels  # 答案的token IDs(padding位置为-100)}# 加载并过滤数据集(仅保留有答案的样本)
def load_data(train_path='data/train_dataset.json', val_path='data/val_dataset.json'):def load_and_filter(path):# 读取JSON格式的数据集with open(path, 'r', encoding='utf-8') as f:data = json.load(f)# 过滤无答案的样本(生成式模型需要明确的答案)filtered = [item for item in data if item['answers']['text']]print(f"加载{path},有效样本数: {len(filtered)}/{len(data)}")return filtered# 加载训练集和验证集train_data = load_and_filter(train_path)val_data = load_and_filter(val_path)return train_data, val_data# 模型训练函数
def train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, save_path):# 设置模型为训练模式(启用dropout等训练特有的层)model.train()# 创建保存模型的目录os.makedirs(save_path, exist_ok=True)# 训练主循环for epoch in range(epochs):total_loss = 0  # 累计训练损失# 使用tqdm显示训练进度progress_bar = tqdm(enumerate(train_loader), total=len(train_loader))for step, batch in progress_bar:# 将数据移至指定设备(GPU/CPU)input_ids = batch['input_ids'].to(device)attention_mask = batch['attention_mask'].to(device)labels = batch['labels'].to(device)# 前向传播(计算模型输出和损失)outputs = model(input_ids=input_ids,attention_mask=attention_mask,labels=labels  # 传入目标答案,模型会自动计算生成损失)loss = outputs.losstotal_loss += loss.item()  # 累计损失值# 反向传播(计算梯度并更新模型参数)optimizer.zero_grad()  # 清空梯度loss.backward()  # 计算梯度optimizer.step()  # 更新参数scheduler.step()  # 更新学习率# 更新进度条显示当前损失progress_bar.set_description(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.4f}')# 计算平均训练损失avg_train_loss = total_loss / len(train_loader)# 在验证集上评估模型val_loss = evaluate_model(model, val_loader)print(f'Epoch {epoch + 1}, 训练损失: {avg_train_loss:.4f}, 验证损失: {val_loss:.4f}')# 保存训练好的模型和分词器model.save_pretrained(save_path)tokenizer.save_pretrained(save_path)print(f"模型已保存至: {save_path}")# 模型评估函数(计算验证集上的平均损失)
def evaluate_model(model, val_loader):# 设置模型为评估模式(关闭dropout等)model.eval()total_loss = 0with torch.no_grad():  # 不计算梯度(节省内存和计算资源)for batch in val_loader:# 将数据移至指定设备input_ids = batch['input_ids'].to(device)attention_mask = batch['attention_mask'].to(device)labels = batch['labels'].to(device)# 前向传播计算损失outputs = model(input_ids=input_ids,attention_mask=attention_mask,labels=labels)total_loss += outputs.loss.item()# 返回平均验证损失return total_loss / len(val_loader)# 答案预测函数(给定问题,生成答案)
def predict_answer(question, model, tokenizer, max_length=128, temperature=0.7):# 设置模型为评估模式model.eval()# 准备输入文本(添加任务前缀)input_text = f"question: {question}"# 编码输入文本inputs = tokenizer(input_text,max_length=max_length,truncation=True,padding='max_length',return_tensors='pt').to(device)# 生成答案(使用beam search生成更连贯的文本)with torch.no_grad():outputs = model.generate(**inputs,  # 传入编码后的输入max_length=max_length,  # 最大生成长度num_beams=5,  # beam search的宽度(生成质量和速度的权衡)temperature=temperature,  # 控制生成的随机性(值越小越确定性)no_repeat_ngram_size=2,  # 避免生成重复的2-gram短语early_stopping=True  # 当所有beam都生成结束标记时停止)# 解码生成的答案(将token IDs转换为文本)answer = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()return answer if answer else "无法生成答案"# 主函数(程序入口点)
def main():# 加载预训练的中文T5模型和分词器# Langboat/mengzi-t5-base:专为中文优化的T5模型,适合生成任务model_name = "Langboat/mengzi-t5-base"tokenizer = AutoTokenizer.from_pretrained(model_name)model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(device)# 加载训练数据和验证数据train_data, val_data = load_data(train_path='data/train_dataset.json',val_path='data/val_dataset.json')# 创建数据集和数据加载器train_dataset = QADataset(train_data, tokenizer)val_dataset = QADataset(val_data, tokenizer)train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)  # 训练集打乱顺序val_loader = DataLoader(val_dataset, batch_size=8)  # 验证集不打乱# 配置优化器和学习率调度器optimizer = AdamW(model.parameters(), lr=5e-5)  # 学习率设置为5e-5epochs = 15  # 训练15个轮次total_steps = len(train_loader) * epochs  # 计算总训练步数scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)# 开始训练模型print("开始训练生成式问答模型...")train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, './chinese_qa_generator')# 交互式问答测试(无需提供上下文,模型直接生成答案)print("\n=== 交互式问答(无需上下文) ===")print("提示: 输入问题(输入q退出)")while True:question = input("\n问题: ").strip()if question.lower() == 'q':break  # 退出程序# 调用预测函数生成答案answer = predict_answer(question, model, tokenizer)print(f"答案: {answer}")# 程序入口点(当脚本直接运行时执行main函数)
if __name__ == "__main__":main()

10、中英文版回答系统(隐藏上下文)(需要调试)

"""
多语言生成式问答模型(修复数据集合并与索引错误)
"""
import torch
import os
import random
import json
from datasets import load_dataset, Dataset, concatenate_datasets  # 新增合并工具
from transformers import (AutoModelForSeq2SeqLM,AutoTokenizer,TrainingArguments,Trainer
)device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用设备: {device}")# 1. 数据加载函数(修复索引错误)
def load_qa_data(json_path, sample_ratio=0.01, is_chinese=False):"""加载问答数据,修复:- 中文样本的list index out of range错误- 数据集合并方法"""try:if is_chinese:# 中文数据:手动加载扁平JSONwith open(json_path, 'r', encoding='utf-8') as f:raw_data = json.load(f)if not isinstance(raw_data, list):print(f"中文数据错误:{json_path} 必须是JSON列表")exit(1)dataset = Dataset.from_list(raw_data)else:# 英文数据:SQuAD格式dataset = load_dataset("json", data_files=json_path, field="data")["train"]except Exception as e:print(f"加载数据失败: {e}")exit(1)# 提取问题和答案(修复索引错误)samples = []for idx, item in enumerate(dataset):try:if is_chinese:  # 中文数据处理(核心修复)question = item.get("question", "").strip()answers = item.get("answers", {})  # 默认为字典,避免列表索引错误# 修复:先判断answers类型,再提取textif isinstance(answers, list):# 处理列表格式:answers = [{"text": "..."}]if len(answers) == 0:print(f"中文样本{idx}错误:answers为空列表")continueanswer_text = answers[0].get("text", "").strip()elif isinstance(answers, dict):# 处理字典格式:answers = {"text": ["..."]}text_list = answers.get("text", [])if not isinstance(text_list, list) or len(text_list) == 0:print(f"中文样本{idx}错误:answers.text为空列表")continueanswer_text = text_list[0].strip()else:print(f"中文样本{idx}错误:answers格式错误(非列表/字典)")continue# 验证有效样本if question and answer_text:samples.append({"question": question, "answer": answer_text})else:print(f"中文样本{idx}无效:问题或答案为空")else:  # 英文数据处理for para in item.get("paragraphs", []):for qa in para.get("qas", []):question = qa.get("question", "").strip()answers = qa.get("answers", [{"text": ""}])if len(answers) == 0:continueanswer_text = answers[0]["text"].strip()if question and answer_text:samples.append({"question": question, "answer": answer_text})except Exception as e:print(f"样本{idx}处理错误: {e},已跳过")continue# 抽样逻辑(安全处理)if len(samples) == 0:print(f"错误:{json_path} 未提取到有效样本")exit(1)sample_size = min(int(len(samples) * sample_ratio), len(samples))sample_size = max(1, sample_size)sampled = samples if len(samples) <= sample_size else random.sample(samples, sample_size)print(f"{ '中文' if is_chinese else '英文' }数据:原始{len(samples)}条,抽样后{len(sampled)}条")# 打印示例if sampled:print(f"样本示例 - 问题: {sampled[0]['question'][:50]}...")print(f"样本示例 - 答案: {sampled[0]['answer'][:50]}...")return Dataset.from_list(sampled)# 2. 预处理函数(保持不变)
def preprocess_data(examples, tokenizer, max_length=128):inputs = tokenizer(examples["question"],max_length=max_length,truncation=True,padding="max_length",return_tensors="pt")labels = tokenizer(examples["answer"],max_length=max_length,truncation=True,padding="max_length",return_tensors="pt")["input_ids"]labels[labels == tokenizer.pad_token_id] = -100return {"input_ids": inputs["input_ids"].squeeze(),"attention_mask": inputs["attention_mask"].squeeze(),"labels": labels.squeeze()}# 3. 训练函数(保持不变)
def train_model(model, tokenizer, train_data, val_data, output_dir):print("预处理训练数据...")tokenized_train = train_data.map(lambda x: preprocess_data(x, tokenizer),batched=True,remove_columns=train_data.column_names,desc="预处理训练集")print("预处理验证数据...")tokenized_val = val_data.map(lambda x: preprocess_data(x, tokenizer),batched=True,remove_columns=val_data.column_names,desc="预处理验证集")training_args = TrainingArguments(output_dir=output_dir,evaluation_strategy="epoch",learning_rate=3e-5,per_device_train_batch_size=4,per_device_eval_batch_size=4,num_train_epochs=3,weight_decay=0.01,logging_steps=5,save_strategy="epoch",fp16=device == "cuda",load_best_model_at_end=True)trainer = Trainer(model=model,args=training_args,train_dataset=tokenized_train,eval_dataset=tokenized_val,tokenizer=tokenizer)print("开始训练模型...")trainer.train()trainer.save_model(output_dir)print(f"模型已保存至: {output_dir}")return model# 4. 预测函数(保持不变)
def predict_answer(question, model, tokenizer):model.eval()input_text = f"answer the question: {question}"inputs = tokenizer(input_text,max_length=128,truncation=True,return_tensors="pt").to(device)with torch.no_grad():outputs = model.generate(**inputs,max_length=128,num_beams=3,temperature=0.7,no_repeat_ngram_size=2,early_stopping=True)return tokenizer.decode(outputs[0].cpu(), skip_special_tokens=True).strip() or "无法生成答案"# 主函数(修复数据集合并)
def main():model_config = {"model_name": "google/mt5-base","train_data": {"english": "train-v2.0.json","chinese": "train_dataset.json"},"val_data": {"english": "dev-v2.0.json","chinese": "val_dataset.json"},"output_dir": "./multilingual_qa_model"}print(f"加载模型: {model_config['model_name']}")try:tokenizer = AutoTokenizer.from_pretrained(model_config["model_name"])model = AutoModelForSeq2SeqLM.from_pretrained(model_config["model_name"]).to(device)except Exception as e:print(f"加载模型失败: {e}")return# 加载英文数据print("\n加载英文训练数据...")english_train = load_qa_data(model_config["train_data"]["english"], sample_ratio=0.05, is_chinese=False)print("加载英文验证数据...")english_val = load_qa_data(model_config["val_data"]["english"], sample_ratio=0.05, is_chinese=False)# 加载中文数据print("\n加载中文训练数据...")chinese_train = load_qa_data(model_config["train_data"]["chinese"], sample_ratio=1.0, is_chinese=True)print("加载中文验证数据...")chinese_val = load_qa_data(model_config["val_data"]["chinese"], sample_ratio=1.0, is_chinese=True)# 修复:使用concatenate_datasets合并数据集(兼容所有版本)print("\n合并中英文数据集...")train_data = concatenate_datasets([english_train, chinese_train])val_data = concatenate_datasets([english_val, chinese_val])print(f"最终训练样本数: {len(train_data)},验证样本数: {len(val_data)}")# 训练模型model = train_model(model, tokenizer, train_data, val_data, model_config["output_dir"])# 交互式测试print("\n=== 双语问答测试 ===")while True:question = input("\n请输入问题 (输入q退出): ").strip()if question.lower() == "q":breakprint(f"答案: {predict_answer(question, model, tokenizer)}")if __name__ == "__main__":main()

http://www.dtcms.com/a/303934.html

相关文章:

  • “非参数化”大语言模型与RAG的关系?
  • 云原生MySQL Operator开发实战(五):扩展与生态系统集成
  • python使用ffmpeg录制rtmp/m3u8推流视频并按ctrl+c实现优雅退出
  • DateTime::ToString 日期时间文本格式化深度解析(C++)
  • Mysql InnoDB存储引擎
  • 2.快速开始
  • Windows下基于 SenseVoice模型的本地语音转文字工具
  • 【Linux我做主】探秘进程状态
  • 聚铭安全管家平台2.0实战解码 | 安服篇(三):配置保障 自动核查
  • 从单机架构到分布式:Redis为何成为架构升级的关键一环?
  • OpenLayers 综合案例-底图换肤(变色)
  • DevOps 详解
  • Linux -- 文件【中】
  • CVE-2022-46169漏洞复现
  • DNS污染与劫持
  • 《林景媚与命运协议》
  • 服务器数据恢复—RAID上层部署的oracle数据库数据恢复案例
  • logtrick 按位或最大的最小子数组长度
  • JavaWeb(苍穹外卖)--学习笔记15(分页查询PageHelper)
  • Unity_UI_NGUI_DrawCall
  • Mac安装Navicat步骤Navicat Premium for Mac v17.1.9【亲测】
  • 【腾讯云】EdgeOne网站安全防护的配置方法 防范盗刷流量 附恶意IP和UA黑名单
  • 学习网址备份(二)
  • Linux 启动流程、密码破解、引导修复完全手册
  • 【智能协同云图库】智能协同云图库第八弹:基于阿里云百炼大模型—实现 AI 扩图功能
  • haproxy应用详解
  • 创建型设计模式-工厂方法模式和抽象工厂方法模式
  • 云端文档管理新纪元:Paperless-ngx与cpolar打造的无边界文件生态
  • Ext JS极速项目之 Coworkee
  • 在WSL中配置VS Code C++开发环境完整教程