【NLP 34、实践 ⑧ 基于faq知识库和文本匹配算法进行意图识别】
目录
一、demo1_similarity_function.py
二、demo2_bm25.py
三、基于faq知识库和文本匹配算法的意图识别
1.初始化
2.加载BM25模型
3.加载Word2Vec模型
4.文本向量化
5.加载知识库
6.查询方法
7.模型测试
正是江南好时节,落花时节又逢君
—— 25.3.7
一、demo1_similarity_function.py
编辑距离和jaccard距离原理与具体实现请看博主:【NLP 31、文本匹配任务 —— 传统机器学习算法】_文本匹配任务概述-CSDN博客
import json
import numpy as np
import jieba
'''
包含编辑距离和jaccard距离的实现
'''
#编辑距离
def editing_distance(string1, string2):
    matrix = np.zeros((len(string1) + 1, len(string2) + 1))
    for i in range(len(string1) + 1):
        matrix[i][0] = i
    for j in range(len(string2) + 1):
        matrix[0][j] = j
    for i in range(1, len(string1) + 1):
        for j in range(1, len(string2) + 1):
            if string1[i - 1] == string2[j - 1]:
                d = 0
            else:
                d = 1
            matrix[i][j] = min(matrix[i - 1][j] + 1, matrix[i][j - 1] + 1, matrix[i - 1][j - 1] + d)
    edit_distance = matrix[len(string1)][len(string2)]
    return 1 - edit_distance / max(len(string1), len(string2))
#jaccard距离
def jaccard_distance(string1, string2):
    words1 = set(string1)
    words2 = set(string2)
    distance = len(words1 & words2) / len(words1 | words2)
    return distance
if __name__ == "__main__":
    a = "abcd"
    b = "acde"
    print(editing_distance(a, b))
    print(jaccard_distance(a, b)) 
二、demo2_bm25.py
bm25算法原理与具体实现请看博主:【NLP 31、文本匹配任务 —— 传统机器学习算法】_文本匹配任务概述-CSDN博客
import json
import math
import os
import pickle
import sys
from typing import Dict, List
class BM25:
    EPSILON = 0.25
    PARAM_K1 = 1.5  # BM25算法中超参数
    PARAM_B = 0.6  # BM25算法中超参数
    def __init__(self, corpus: Dict):
        """
            初始化BM25模型
            :param corpus: 文档集, 文档集合应该是字典形式,key为文档的唯一标识,val对应其文本内容,文本内容需要分词成列表
        """
        self.corpus_size = 0  # 文档数量
        self.wordNumsOfAllDoc = 0  # 用于计算文档集合中平均每篇文档的词数 -> wordNumsOfAllDoc / corpus_size
        self.doc_freqs = {}  # 记录每篇文档中查询词的词频
        self.idf = {}  # 记录查询词的 IDF
        self.doc_len = {}  # 记录每篇文档的单词数
        self.docContainedWord = {}  # 包含单词 word 的文档集合
        self._initialize(corpus)
    def _initialize(self, corpus: Dict):
        """
            根据语料库构建倒排索引
        """
        # nd = {} # word -> number of documents containing the word
        for index, document in corpus.items():
            self.corpus_size += 1
            self.doc_len[index] = len(document)  # 文档的单词数
            self.wordNumsOfAllDoc += len(document)
            frequencies = {}  # 一篇文档中单词出现的频率
            for word in document:
                if word not in frequencies:
                    frequencies[word] = 0
                frequencies[word] += 1
            self.doc_freqs[index] = frequencies
            # 构建词到文档的倒排索引,将包含单词的和文档和包含关系进行反向映射
            for word in frequencies.keys():
                if word not in self.docContainedWord:
                    self.docContainedWord[word] = set()
                self.docContainedWord[word].add(index)
        # 计算 idf
        idf_sum = 0  # collect idf sum to calculate an average idf for epsilon value
        negative_idfs = []
        for word in self.docContainedWord.keys():
            doc_nums_contained_word = len(self.docContainedWord[word])
            idf = math.log(self.corpus_size - doc_nums_contained_word +
                           0.5) - math.log(doc_nums_contained_word + 0.5)
            self.idf[word] = idf
            idf_sum += idf
            if idf < 0:
                negative_idfs.append(word)
        average_idf = float(idf_sum) / len(self.idf)
        eps = BM25.EPSILON * average_idf
        for word in negative_idfs:
            self.idf[word] = eps
    @property
    def avgdl(self):
        return float(self.wordNumsOfAllDoc) / self.corpus_size
    def get_score(self, query: List, doc_index):
        """
        计算查询 q 和文档 d 的相关性分数
        :param query: 查询词列表
        :param doc_index: 为语料库中某篇文档对应的索引
        """
        k1 = BM25.PARAM_K1
        b = BM25.PARAM_B
        score = 0
        doc_freqs = self.doc_freqs[doc_index]
        for word in query:
            if word not in doc_freqs:
                continue
            score += self.idf[word] * doc_freqs[word] * (k1 + 1) / (
                    doc_freqs[word] + k1 * (1 - b + b * self.doc_len[doc_index] / self.avgdl))
        return [doc_index, score]
    def get_scores(self, query):
        scores = [self.get_score(query, index) for index in self.doc_len.keys()]
        return scores 
三、基于faq知识库和文本匹配算法的意图识别
1.初始化
初始化问答系统,加载知识库并根据选择的算法进行模型初始化。
know_base_path:知识库文件路径。
algo:选择的算法(如 "bm25"、"word2vec" 等)。
def __init__(self, know_base_path, algo):
    self.load_know_base(know_base_path)
    self.algo = algo
    if algo == "bm25":
        self.load_bm25()
    elif algo == "word2vec":
        self.load_word2vec()
    else:
        #其余的算法不需要做事先计算
        pass 
2.加载BM25模型
初始化 BM25 模型,将知识库中的问题分词并构建语料库
items():返回字典中所有键值对的视图(dict_items 对象),可以用于遍历字典的键值对。
jieba.lcut():将字符串分词并返回一个列表。
| 参数 | 描述 | 
|---|---|
s | 需要分词的字符串。 | 
cut_all | 是否使用全模式分词,默认为 False。 | 
HMM | 是否使用 HMM 模型,默认为 True。 | 
    def load_bm25(self):
        self.corpus = {}
        for target, questions in self.target_to_questions.items():
            self.corpus[target] = []
            for question in questions:
                self.corpus[target] += jieba.lcut(question)
        self.bm25_model = BM25(self.corpus) 
3.加载Word2Vec模型
加载或训练 Word2Vec 模型,并将知识库中的问题向量化。
- 如果已有训练好的模型(
 model.w2v),则直接加载。- 否则,使用知识库中的问题训练 Word2Vec 模型并保存。
 - 将知识库中的问题转换为向量。
 
os.path.isfile():检查指定路径是否为文件,返回布尔值。
| 参数 | 描述 | 
|---|---|
path | 文件路径(字符串)。 | 
Wodr2Vec.load():加载训练好的 Word2Vec 模型。
| 参数 | 描述 | 
|---|---|
fname | 模型文件路径(字符串)。 | 
字典.values():返回字典中所有值的视图(dict_values 对象),可以用于遍历字典的值。
jieba.lcut():将字符串分词并返回一个列表。
| 参数 | 描述 | 
|---|---|
s | 需要分词的字符串。 | 
cut_all | 是否使用全模式分词,默认为 False。 | 
HMM | 是否使用 HMM 模型,默认为 True。 | 
model.save():将模型保存到指定文件。
| 参数 | 描述 | 
|---|---|
fname | 保存模型的文件路径(字符串)。 | 
items():返回字典中所有键值对的视图(dict_items 对象),可以用于遍历字典的键值对。
列表.append():将元素添加到列表的末尾。
| 参数 | 描述 | 
|---|---|
item | 要添加到列表末尾的元素。 | 
np.array():将输入数据转换为 NumPy 数组。
| 参数 | 描述 | 
|---|---|
object | 输入数据(如列表、元组等)。 | 
dtype | 数组元素的数据类型(可选)。 | 
copy | 是否复制数据,默认为 True。 | 
order | 数组的内存布局(可选,如 'C' 或 'F')。 | 
    #词向量的训练
    def load_word2vec(self):
        #词向量的训练需要一定时间,如果之前训练过,我们就直接读取训练好的模型
        #注意如果数据集更换了,应当重新训练
        #当然,也可以收集一份大量的通用的语料,训练一个通用词向量模型。一般少量数据来训练效果不会太理想
        if os.path.isfile("model.w2v"):
            self.w2v_model = Word2Vec.load("model.w2v")
        else:
            #训练语料的准备,把所有问题分词后连在一起
            corpus = []
            for questions in self.target_to_questions.values():
                for question in questions:
                    corpus.append(jieba.lcut(question))
            #调用第三方库训练模型
            self.w2v_model = Word2Vec(corpus, vector_size=100, min_count=1)
            #保存模型
            self.w2v_model.save("model.w2v")
        #借助词向量模型,将知识库中的问题向量化
        self.target_to_vectors = {}
        for target, questions in self.target_to_questions.items():
            vectors = []
            for question in questions:
                vectors.append(self.sentence_to_vec(question))
            self.target_to_vectors[target] = np.array(vectors) 
4.文本向量化
将句子转换为向量,通过对句子中所有词的向量求平均并进行归一化。
np.zeros():创建一个全零数组。
| 参数 | 描述 | 
|---|---|
shape | 数组的形状(如整数或元组)。 | 
dtype | 数组元素的数据类型,默认为 float。 | 
order | 数组的内存布局(可选,如 'C' 或 'F')。 | 
jieba.lcut():将字符串分词并返回一个列表。
| 参数 | 描述 | 
|---|---|
s | 需要分词的字符串。 | 
cut_all | 是否使用全模式分词,默认为 False。 | 
HMM | 是否使用 HMM 模型,默认为 True。 | 
np.array():将输入数据转换为 NumPy 数组。
| 参数 | 描述 | 
|---|---|
object | 输入数据(如列表、元组等)。 | 
dtype | 数组元素的数据类型(可选)。 | 
copy | 是否复制数据,默认为 True。 | 
order | 数组的内存布局(可选,如 'C' 或 'F')。 | 
np.sqrt():计算输入数组或数值的平方根。
| 参数 | 描述 | 
|---|---|
x | 输入数组或数值。 | 
np.sum():计算数组元素的和。
| 参数 | 描述 | 
|---|---|
a | 输入数组。 | 
axis | 沿指定轴求和(可选)。 | 
dtype | 返回值的类型(可选)。 | 
np.square():计算输入数组或数值的平方。
| 参数 | 描述 | 
|---|---|
x | 输入数组或数值。 | 
def sentence_to_vec(self, sentence):
    vector = np.zeros(self.w2v_model.vector_size)
    words = jieba.lcut(sentence)
    count = 0
    for word in words:
        if word in self.w2v_model.wv:
            count += 1
            vector += self.w2v_model.wv[word]
    vector = np.array(vector) / count
    vector = vector / np.sqrt(np.sum(np.square(vector)))
    return vector 
5.加载知识库
从知识库文件中加载问题与目标(答案)的映射关系。
open():打开文件并返回文件对象。
| 参数 | 描述 | 
|---|---|
file | 文件路径(字符串)。 | 
mode | 文件打开模式(如 'r'、'w' 等)。 | 
encoding | 文件编码(可选)。 | 
enumerate():返回枚举对象,生成索引和元素的元组。
| 参数 | 描述 | 
|---|---|
iterable | 可迭代对象(如列表、元组等)。 | 
start | 索引的起始值,默认为 0。 | 
json.loads():将 JSON 字符串解析为 Python 对象。
| 参数 | 描述 | 
|---|---|
s | JSON 格式的字符串 | 
    def load_know_base(self, know_base_path):
        self.target_to_questions = {}
        with open(know_base_path, encoding="utf8") as f:
            for index, line in enumerate(f):
                content = json.loads(line)
                questions = content["questions"]
                target = content["target"]
                self.target_to_questions[target] = questions
        return 
6.查询方法
根据用户查询和选择的算法,计算与知识库中问题的匹配度,并返回最匹配的前三个结果。
items():返回字典中所有键值对的视图(dict_items 对象),可以用于遍历字典的键值对。
列表推导式:
| 语法元素 | 描述 | 示例 | 
|---|---|---|
| 表达式 | 对 item 的操作或表达式,生成新列表中的元素。 | x**2 表示计算 x 的平方。 | 
| 变量 | 从 iterable 中遍历的每一个元素。 | x 表示从 range(1, 6) 中遍历的每一个数字。 | 
| 可迭代对象 | 提供数据的可迭代对象(如列表、元组、字符串等)。 | range(1, 6) 生成数字 1 到 5。 | 
| 条件(可选) | 对 item 进行筛选的条件,只有满足条件的 item 才会被处理。 | if x % 2 == 0 表示只选择偶数。 | 
| 基本语法 | [expression for item in iterable if condition] | [x**2 for x in range(1, 6)] 生成 [1, 4, 9, 16, 25]。 | 
| 带条件筛选 | 在基本语法中加入 if 条件,用于筛选数据。 | [x**2 for x in range(1, 6) if x % 2 == 0] 生成 [4, 16]。 | 
| 嵌套列表推导式 | 使用多个 for 循环生成复杂列表。 | [[x * y for y in range(1, 4)] for x in range(1, 4)] 生成 3x3 矩阵。 | 
| 处理字符串 | 对字符串列表中的每个元素进行操作。 | [word.upper() for word in ['hello', 'world']] 生成 ['HELLO', 'WORLD']。 | 
| 注意事项 | 1. 避免过度复杂化,保持代码可读性。 2. 大数据量时考虑使用生成器表达式。  | - | 
max():返回可迭代对象中的最大值。
| 参数 | 描述 | 
|---|---|
iterable | 可迭代对象(如列表、元组等)。 | 
key | 用于比较的函数(可选)。 | 
列表.append():将元素添加到列表的末尾。
| 参数 | 描述 | 
|---|---|
item | 要添加到列表末尾的元素。 | 
dot():计算两个数组或矩阵的点积。
| 参数 | 描述 | 
|---|---|
a | 输入数组或矩阵。 | 
b | 输入数组或矩阵。 | 
transpose():返回数组或矩阵的转置。
| 参数 | 描述 | 
|---|---|
a | 输入数组或矩阵。 | 
axes | 转置的轴顺序(可选)。 | 
np.mean():计算数组元素的均值。
| 参数 | 描述 | 
|---|---|
a | 输入数组。 | 
axis | 沿指定轴计算均值(可选)。 | 
dtype | 返回值的类型(可选)。 | 
sorted():返回排序后的列表
| 参数 | 描述 | 
|---|---|
iterable | 可迭代对象(如列表、元组等)。 | 
key | 用于排序的函数(可选)。 | 
reverse | 是否逆序排序,默认为 False。 | 
assert:用于调试,检查条件是否为 True,否则抛出异常
| 参数 | 描述 | 
|---|---|
condition | 需要检查的条件。 | 
message | 条件为 False 时输出的错误信息(可选)。 | 
    def query(self, user_query):
        results = []
        if self.algo == "editing_distance":
            for target, questions in self.target_to_questions.items():
                scores = [editing_distance(question, user_query) for question in questions]
                score = max(scores)
                results.append([target, score])
        elif self.algo == "jaccard_distance":
            for target, questions in self.target_to_questions.items():
                scores = [jaccard_distance(question, user_query) for question in questions]
                score = max(scores)
                results.append([target, score])
        elif self.algo == "bm25":
            words = jieba.lcut(user_query)
            results = self.bm25_model.get_scores(words)
        elif self.algo == "word2vec":
            query_vector = self.sentence_to_vec(user_query)
            for target, vectors in self.target_to_vectors.items():
                cos = query_vector.dot(vectors.transpose())
                # print(cos)
                results.append([target, np.mean(cos)])
        else:
            assert "unknown algorithm!!"
        sort_results = sorted(results, key=lambda x:x[1], reverse=True)
        return sort_results[:3] 
7.模型测试
初始化问答系统,加载知识库并使用 BM25 算法进行查询
import os
import json
import jieba
import numpy as np
from demo2_bm25 import BM25
from demo1_similarity_function import editing_distance, jaccard_distance
from gensim.models import Word2Vec
'''
基于faq知识库和文本匹配算法进行意图识别,完成单轮问答
'''
class QASystem:
    def __init__(self, know_base_path, algo):
        '''
        :param know_base_path: 知识库文件路径
        :param algo: 选择不同的算法
        '''
        self.load_know_base(know_base_path)
        self.algo = algo
        if algo == "bm25":
            self.load_bm25()
        elif algo == "word2vec":
            self.load_word2vec()
        else:
            #其余的算法不需要做事先计算
            pass
    def load_bm25(self):
        self.corpus = {}
        for target, questions in self.target_to_questions.items():
            self.corpus[target] = []
            for question in questions:
                self.corpus[target] += jieba.lcut(question)
        self.bm25_model = BM25(self.corpus)
    #词向量的训练
    def load_word2vec(self):
        #词向量的训练需要一定时间,如果之前训练过,我们就直接读取训练好的模型
        #注意如果数据集更换了,应当重新训练
        #当然,也可以收集一份大量的通用的语料,训练一个通用词向量模型。一般少量数据来训练效果不会太理想
        if os.path.isfile("model.w2v"):
            self.w2v_model = Word2Vec.load("model.w2v")
        else:
            #训练语料的准备,把所有问题分词后连在一起
            corpus = []
            for questions in self.target_to_questions.values():
                for question in questions:
                    corpus.append(jieba.lcut(question))
            #调用第三方库训练模型
            self.w2v_model = Word2Vec(corpus, vector_size=100, min_count=1)
            #保存模型
            self.w2v_model.save("model.w2v")
        #借助词向量模型,将知识库中的问题向量化
        self.target_to_vectors = {}
        for target, questions in self.target_to_questions.items():
            vectors = []
            for question in questions:
                vectors.append(self.sentence_to_vec(question))
            self.target_to_vectors[target] = np.array(vectors)
    # 将文本向量化
    def sentence_to_vec(self, sentence):
        vector = np.zeros(self.w2v_model.vector_size)
        words = jieba.lcut(sentence)
        # 所有词的向量相加求平均,作为句子向量
        count = 0
        for word in words:
            if word in self.w2v_model.wv:
                count += 1
                vector += self.w2v_model.wv[word]
        vector = np.array(vector) / count
        #文本向量做l2归一化,方便计算cos距离
        vector = vector / np.sqrt(np.sum(np.square(vector)))
        return vector
    def load_know_base(self, know_base_path):
        self.target_to_questions = {}
        with open(know_base_path, encoding="utf8") as f:
            for index, line in enumerate(f):
                content = json.loads(line)
                questions = content["questions"]
                target = content["target"]
                self.target_to_questions[target] = questions
        return
    def query(self, user_query):
        results = []
        if self.algo == "editing_distance":
            for target, questions in self.target_to_questions.items():
                scores = [editing_distance(question, user_query) for question in questions]
                score = max(scores)
                results.append([target, score])
        elif self.algo == "jaccard_distance":
            for target, questions in self.target_to_questions.items():
                scores = [jaccard_distance(question, user_query) for question in questions]
                score = max(scores)
                results.append([target, score])
        elif self.algo == "bm25":
            words = jieba.lcut(user_query)
            results = self.bm25_model.get_scores(words)
        elif self.algo == "word2vec":
            query_vector = self.sentence_to_vec(user_query)
            for target, vectors in self.target_to_vectors.items():
                cos = query_vector.dot(vectors.transpose())
                # print(cos)
                results.append([target, np.mean(cos)])
        else:
            assert "unknown algorithm!!"
        sort_results = sorted(results, key=lambda x:x[1], reverse=True)
        return sort_results[:3]
if __name__ == '__main__':
    qas = QASystem("data/train.json", "bm25")
    question = "话费是否包月超了"
    res = qas.query(question)
    print(question)
    print(res)
    #
    # while True:
    #     question = input("请输入问题:")
    #     res = qas.query(question)
    #     print("命中问题:", res)
    #     print("-----------")
 

