【NLP 34、实践 ⑧ 基于faq知识库和文本匹配算法进行意图识别】
目录
一、demo1_similarity_function.py
二、demo2_bm25.py
三、基于faq知识库和文本匹配算法的意图识别
1.初始化
2.加载BM25模型
3.加载Word2Vec模型
4.文本向量化
5.加载知识库
6.查询方法
7.模型测试
正是江南好时节,落花时节又逢君
—— 25.3.7
一、demo1_similarity_function.py
编辑距离和jaccard距离原理与具体实现请看博主:【NLP 31、文本匹配任务 —— 传统机器学习算法】_文本匹配任务概述-CSDN博客
import json
import numpy as np
import jieba
'''
包含编辑距离和jaccard距离的实现
'''
#编辑距离
def editing_distance(string1, string2):
matrix = np.zeros((len(string1) + 1, len(string2) + 1))
for i in range(len(string1) + 1):
matrix[i][0] = i
for j in range(len(string2) + 1):
matrix[0][j] = j
for i in range(1, len(string1) + 1):
for j in range(1, len(string2) + 1):
if string1[i - 1] == string2[j - 1]:
d = 0
else:
d = 1
matrix[i][j] = min(matrix[i - 1][j] + 1, matrix[i][j - 1] + 1, matrix[i - 1][j - 1] + d)
edit_distance = matrix[len(string1)][len(string2)]
return 1 - edit_distance / max(len(string1), len(string2))
#jaccard距离
def jaccard_distance(string1, string2):
words1 = set(string1)
words2 = set(string2)
distance = len(words1 & words2) / len(words1 | words2)
return distance
if __name__ == "__main__":
a = "abcd"
b = "acde"
print(editing_distance(a, b))
print(jaccard_distance(a, b))
二、demo2_bm25.py
bm25算法原理与具体实现请看博主:【NLP 31、文本匹配任务 —— 传统机器学习算法】_文本匹配任务概述-CSDN博客
import json
import math
import os
import pickle
import sys
from typing import Dict, List
class BM25:
EPSILON = 0.25
PARAM_K1 = 1.5 # BM25算法中超参数
PARAM_B = 0.6 # BM25算法中超参数
def __init__(self, corpus: Dict):
"""
初始化BM25模型
:param corpus: 文档集, 文档集合应该是字典形式,key为文档的唯一标识,val对应其文本内容,文本内容需要分词成列表
"""
self.corpus_size = 0 # 文档数量
self.wordNumsOfAllDoc = 0 # 用于计算文档集合中平均每篇文档的词数 -> wordNumsOfAllDoc / corpus_size
self.doc_freqs = {} # 记录每篇文档中查询词的词频
self.idf = {} # 记录查询词的 IDF
self.doc_len = {} # 记录每篇文档的单词数
self.docContainedWord = {} # 包含单词 word 的文档集合
self._initialize(corpus)
def _initialize(self, corpus: Dict):
"""
根据语料库构建倒排索引
"""
# nd = {} # word -> number of documents containing the word
for index, document in corpus.items():
self.corpus_size += 1
self.doc_len[index] = len(document) # 文档的单词数
self.wordNumsOfAllDoc += len(document)
frequencies = {} # 一篇文档中单词出现的频率
for word in document:
if word not in frequencies:
frequencies[word] = 0
frequencies[word] += 1
self.doc_freqs[index] = frequencies
# 构建词到文档的倒排索引,将包含单词的和文档和包含关系进行反向映射
for word in frequencies.keys():
if word not in self.docContainedWord:
self.docContainedWord[word] = set()
self.docContainedWord[word].add(index)
# 计算 idf
idf_sum = 0 # collect idf sum to calculate an average idf for epsilon value
negative_idfs = []
for word in self.docContainedWord.keys():
doc_nums_contained_word = len(self.docContainedWord[word])
idf = math.log(self.corpus_size - doc_nums_contained_word +
0.5) - math.log(doc_nums_contained_word + 0.5)
self.idf[word] = idf
idf_sum += idf
if idf < 0:
negative_idfs.append(word)
average_idf = float(idf_sum) / len(self.idf)
eps = BM25.EPSILON * average_idf
for word in negative_idfs:
self.idf[word] = eps
@property
def avgdl(self):
return float(self.wordNumsOfAllDoc) / self.corpus_size
def get_score(self, query: List, doc_index):
"""
计算查询 q 和文档 d 的相关性分数
:param query: 查询词列表
:param doc_index: 为语料库中某篇文档对应的索引
"""
k1 = BM25.PARAM_K1
b = BM25.PARAM_B
score = 0
doc_freqs = self.doc_freqs[doc_index]
for word in query:
if word not in doc_freqs:
continue
score += self.idf[word] * doc_freqs[word] * (k1 + 1) / (
doc_freqs[word] + k1 * (1 - b + b * self.doc_len[doc_index] / self.avgdl))
return [doc_index, score]
def get_scores(self, query):
scores = [self.get_score(query, index) for index in self.doc_len.keys()]
return scores
三、基于faq知识库和文本匹配算法的意图识别
1.初始化
初始化问答系统,加载知识库并根据选择的算法进行模型初始化。
know_base_path
:知识库文件路径。
algo
:选择的算法(如 "bm25"
、"word2vec"
等)。
def __init__(self, know_base_path, algo):
self.load_know_base(know_base_path)
self.algo = algo
if algo == "bm25":
self.load_bm25()
elif algo == "word2vec":
self.load_word2vec()
else:
#其余的算法不需要做事先计算
pass
2.加载BM25模型
初始化 BM25 模型,将知识库中的问题分词并构建语料库
items():返回字典中所有键值对的视图(dict_items
对象),可以用于遍历字典的键值对。
jieba.lcut():将字符串分词并返回一个列表。
参数 | 描述 |
---|---|
s | 需要分词的字符串。 |
cut_all | 是否使用全模式分词,默认为 False 。 |
HMM | 是否使用 HMM 模型,默认为 True 。 |
def load_bm25(self):
self.corpus = {}
for target, questions in self.target_to_questions.items():
self.corpus[target] = []
for question in questions:
self.corpus[target] += jieba.lcut(question)
self.bm25_model = BM25(self.corpus)
3.加载Word2Vec模型
加载或训练 Word2Vec 模型,并将知识库中的问题向量化。
- 如果已有训练好的模型(
model.w2v
),则直接加载。- 否则,使用知识库中的问题训练 Word2Vec 模型并保存。
- 将知识库中的问题转换为向量。
os.path.isfile():检查指定路径是否为文件,返回布尔值。
参数 | 描述 |
---|---|
path | 文件路径(字符串)。 |
Wodr2Vec.load():加载训练好的 Word2Vec 模型。
参数 | 描述 |
---|---|
fname | 模型文件路径(字符串)。 |
字典.values():返回字典中所有值的视图(dict_values
对象),可以用于遍历字典的值。
jieba.lcut():将字符串分词并返回一个列表。
参数 | 描述 |
---|---|
s | 需要分词的字符串。 |
cut_all | 是否使用全模式分词,默认为 False 。 |
HMM | 是否使用 HMM 模型,默认为 True 。 |
model.save():将模型保存到指定文件。
参数 | 描述 |
---|---|
fname | 保存模型的文件路径(字符串)。 |
items():返回字典中所有键值对的视图(dict_items
对象),可以用于遍历字典的键值对。
列表.append():将元素添加到列表的末尾。
参数 | 描述 |
---|---|
item | 要添加到列表末尾的元素。 |
np.array():将输入数据转换为 NumPy 数组。
参数 | 描述 |
---|---|
object | 输入数据(如列表、元组等)。 |
dtype | 数组元素的数据类型(可选)。 |
copy | 是否复制数据,默认为 True 。 |
order | 数组的内存布局(可选,如 'C' 或 'F' )。 |
#词向量的训练
def load_word2vec(self):
#词向量的训练需要一定时间,如果之前训练过,我们就直接读取训练好的模型
#注意如果数据集更换了,应当重新训练
#当然,也可以收集一份大量的通用的语料,训练一个通用词向量模型。一般少量数据来训练效果不会太理想
if os.path.isfile("model.w2v"):
self.w2v_model = Word2Vec.load("model.w2v")
else:
#训练语料的准备,把所有问题分词后连在一起
corpus = []
for questions in self.target_to_questions.values():
for question in questions:
corpus.append(jieba.lcut(question))
#调用第三方库训练模型
self.w2v_model = Word2Vec(corpus, vector_size=100, min_count=1)
#保存模型
self.w2v_model.save("model.w2v")
#借助词向量模型,将知识库中的问题向量化
self.target_to_vectors = {}
for target, questions in self.target_to_questions.items():
vectors = []
for question in questions:
vectors.append(self.sentence_to_vec(question))
self.target_to_vectors[target] = np.array(vectors)
4.文本向量化
将句子转换为向量,通过对句子中所有词的向量求平均并进行归一化。
np.zeros():创建一个全零数组。
参数 | 描述 |
---|---|
shape | 数组的形状(如整数或元组)。 |
dtype | 数组元素的数据类型,默认为 float 。 |
order | 数组的内存布局(可选,如 'C' 或 'F' )。 |
jieba.lcut():将字符串分词并返回一个列表。
参数 | 描述 |
---|---|
s | 需要分词的字符串。 |
cut_all | 是否使用全模式分词,默认为 False 。 |
HMM | 是否使用 HMM 模型,默认为 True 。 |
np.array():将输入数据转换为 NumPy 数组。
参数 | 描述 |
---|---|
object | 输入数据(如列表、元组等)。 |
dtype | 数组元素的数据类型(可选)。 |
copy | 是否复制数据,默认为 True 。 |
order | 数组的内存布局(可选,如 'C' 或 'F' )。 |
np.sqrt():计算输入数组或数值的平方根。
参数 | 描述 |
---|---|
x | 输入数组或数值。 |
np.sum():计算数组元素的和。
参数 | 描述 |
---|---|
a | 输入数组。 |
axis | 沿指定轴求和(可选)。 |
dtype | 返回值的类型(可选)。 |
np.square():计算输入数组或数值的平方。
参数 | 描述 |
---|---|
x | 输入数组或数值。 |
def sentence_to_vec(self, sentence):
vector = np.zeros(self.w2v_model.vector_size)
words = jieba.lcut(sentence)
count = 0
for word in words:
if word in self.w2v_model.wv:
count += 1
vector += self.w2v_model.wv[word]
vector = np.array(vector) / count
vector = vector / np.sqrt(np.sum(np.square(vector)))
return vector
5.加载知识库
从知识库文件中加载问题与目标(答案)的映射关系。
open():打开文件并返回文件对象。
参数 | 描述 |
---|---|
file | 文件路径(字符串)。 |
mode | 文件打开模式(如 'r' 、'w' 等)。 |
encoding | 文件编码(可选)。 |
enumerate():返回枚举对象,生成索引和元素的元组。
参数 | 描述 |
---|---|
iterable | 可迭代对象(如列表、元组等)。 |
start | 索引的起始值,默认为 0 。 |
json.loads():将 JSON 字符串解析为 Python 对象。
参数 | 描述 |
---|---|
s | JSON 格式的字符串 |
def load_know_base(self, know_base_path):
self.target_to_questions = {}
with open(know_base_path, encoding="utf8") as f:
for index, line in enumerate(f):
content = json.loads(line)
questions = content["questions"]
target = content["target"]
self.target_to_questions[target] = questions
return
6.查询方法
根据用户查询和选择的算法,计算与知识库中问题的匹配度,并返回最匹配的前三个结果。
items():返回字典中所有键值对的视图(dict_items
对象),可以用于遍历字典的键值对。
列表推导式:
语法元素 | 描述 | 示例 |
---|---|---|
表达式 | 对 item 的操作或表达式,生成新列表中的元素。 | x**2 表示计算 x 的平方。 |
变量 | 从 iterable 中遍历的每一个元素。 | x 表示从 range(1, 6) 中遍历的每一个数字。 |
可迭代对象 | 提供数据的可迭代对象(如列表、元组、字符串等)。 | range(1, 6) 生成数字 1 到 5。 |
条件(可选) | 对 item 进行筛选的条件,只有满足条件的 item 才会被处理。 | if x % 2 == 0 表示只选择偶数。 |
基本语法 | [expression for item in iterable if condition] | [x**2 for x in range(1, 6)] 生成 [1, 4, 9, 16, 25] 。 |
带条件筛选 | 在基本语法中加入 if 条件,用于筛选数据。 | [x**2 for x in range(1, 6) if x % 2 == 0] 生成 [4, 16] 。 |
嵌套列表推导式 | 使用多个 for 循环生成复杂列表。 | [[x * y for y in range(1, 4)] for x in range(1, 4)] 生成 3x3 矩阵。 |
处理字符串 | 对字符串列表中的每个元素进行操作。 | [word.upper() for word in ['hello', 'world']] 生成 ['HELLO', 'WORLD'] 。 |
注意事项 | 1. 避免过度复杂化,保持代码可读性。 2. 大数据量时考虑使用生成器表达式。 | - |
max():返回可迭代对象中的最大值。
参数 | 描述 |
---|---|
iterable | 可迭代对象(如列表、元组等)。 |
key | 用于比较的函数(可选)。 |
列表.append():将元素添加到列表的末尾。
参数 | 描述 |
---|---|
item | 要添加到列表末尾的元素。 |
dot():计算两个数组或矩阵的点积。
参数 | 描述 |
---|---|
a | 输入数组或矩阵。 |
b | 输入数组或矩阵。 |
transpose():返回数组或矩阵的转置。
参数 | 描述 |
---|---|
a | 输入数组或矩阵。 |
axes | 转置的轴顺序(可选)。 |
np.mean():计算数组元素的均值。
参数 | 描述 |
---|---|
a | 输入数组。 |
axis | 沿指定轴计算均值(可选)。 |
dtype | 返回值的类型(可选)。 |
sorted():返回排序后的列表
参数 | 描述 |
---|---|
iterable | 可迭代对象(如列表、元组等)。 |
key | 用于排序的函数(可选)。 |
reverse | 是否逆序排序,默认为 False 。 |
assert:用于调试,检查条件是否为 True
,否则抛出异常
参数 | 描述 |
---|---|
condition | 需要检查的条件。 |
message | 条件为 False 时输出的错误信息(可选)。 |
def query(self, user_query):
results = []
if self.algo == "editing_distance":
for target, questions in self.target_to_questions.items():
scores = [editing_distance(question, user_query) for question in questions]
score = max(scores)
results.append([target, score])
elif self.algo == "jaccard_distance":
for target, questions in self.target_to_questions.items():
scores = [jaccard_distance(question, user_query) for question in questions]
score = max(scores)
results.append([target, score])
elif self.algo == "bm25":
words = jieba.lcut(user_query)
results = self.bm25_model.get_scores(words)
elif self.algo == "word2vec":
query_vector = self.sentence_to_vec(user_query)
for target, vectors in self.target_to_vectors.items():
cos = query_vector.dot(vectors.transpose())
# print(cos)
results.append([target, np.mean(cos)])
else:
assert "unknown algorithm!!"
sort_results = sorted(results, key=lambda x:x[1], reverse=True)
return sort_results[:3]
7.模型测试
初始化问答系统,加载知识库并使用 BM25 算法进行查询
import os
import json
import jieba
import numpy as np
from demo2_bm25 import BM25
from demo1_similarity_function import editing_distance, jaccard_distance
from gensim.models import Word2Vec
'''
基于faq知识库和文本匹配算法进行意图识别,完成单轮问答
'''
class QASystem:
def __init__(self, know_base_path, algo):
'''
:param know_base_path: 知识库文件路径
:param algo: 选择不同的算法
'''
self.load_know_base(know_base_path)
self.algo = algo
if algo == "bm25":
self.load_bm25()
elif algo == "word2vec":
self.load_word2vec()
else:
#其余的算法不需要做事先计算
pass
def load_bm25(self):
self.corpus = {}
for target, questions in self.target_to_questions.items():
self.corpus[target] = []
for question in questions:
self.corpus[target] += jieba.lcut(question)
self.bm25_model = BM25(self.corpus)
#词向量的训练
def load_word2vec(self):
#词向量的训练需要一定时间,如果之前训练过,我们就直接读取训练好的模型
#注意如果数据集更换了,应当重新训练
#当然,也可以收集一份大量的通用的语料,训练一个通用词向量模型。一般少量数据来训练效果不会太理想
if os.path.isfile("model.w2v"):
self.w2v_model = Word2Vec.load("model.w2v")
else:
#训练语料的准备,把所有问题分词后连在一起
corpus = []
for questions in self.target_to_questions.values():
for question in questions:
corpus.append(jieba.lcut(question))
#调用第三方库训练模型
self.w2v_model = Word2Vec(corpus, vector_size=100, min_count=1)
#保存模型
self.w2v_model.save("model.w2v")
#借助词向量模型,将知识库中的问题向量化
self.target_to_vectors = {}
for target, questions in self.target_to_questions.items():
vectors = []
for question in questions:
vectors.append(self.sentence_to_vec(question))
self.target_to_vectors[target] = np.array(vectors)
# 将文本向量化
def sentence_to_vec(self, sentence):
vector = np.zeros(self.w2v_model.vector_size)
words = jieba.lcut(sentence)
# 所有词的向量相加求平均,作为句子向量
count = 0
for word in words:
if word in self.w2v_model.wv:
count += 1
vector += self.w2v_model.wv[word]
vector = np.array(vector) / count
#文本向量做l2归一化,方便计算cos距离
vector = vector / np.sqrt(np.sum(np.square(vector)))
return vector
def load_know_base(self, know_base_path):
self.target_to_questions = {}
with open(know_base_path, encoding="utf8") as f:
for index, line in enumerate(f):
content = json.loads(line)
questions = content["questions"]
target = content["target"]
self.target_to_questions[target] = questions
return
def query(self, user_query):
results = []
if self.algo == "editing_distance":
for target, questions in self.target_to_questions.items():
scores = [editing_distance(question, user_query) for question in questions]
score = max(scores)
results.append([target, score])
elif self.algo == "jaccard_distance":
for target, questions in self.target_to_questions.items():
scores = [jaccard_distance(question, user_query) for question in questions]
score = max(scores)
results.append([target, score])
elif self.algo == "bm25":
words = jieba.lcut(user_query)
results = self.bm25_model.get_scores(words)
elif self.algo == "word2vec":
query_vector = self.sentence_to_vec(user_query)
for target, vectors in self.target_to_vectors.items():
cos = query_vector.dot(vectors.transpose())
# print(cos)
results.append([target, np.mean(cos)])
else:
assert "unknown algorithm!!"
sort_results = sorted(results, key=lambda x:x[1], reverse=True)
return sort_results[:3]
if __name__ == '__main__':
qas = QASystem("data/train.json", "bm25")
question = "话费是否包月超了"
res = qas.query(question)
print(question)
print(res)
#
# while True:
# question = input("请输入问题:")
# res = qas.query(question)
# print("命中问题:", res)
# print("-----------")