langchain学习笔记之基于RAG实现文档问答
langchain学习笔记之基于RAG实现文档问答
- 引言
- RAG基本介绍
- 准备工作
- 代码实现过程
- streamlit页面布局
- 构建检索器
- 基于检索器构建文档检索工具
- 提示模板
- Agent定义、streamlit其它组件、效果展示
- 附:完整代码
引言
本节将介绍使用 langchain \text{langchain} langchain基于 RAG \text{RAG} RAG的文档问答以及具体实现方法。
RAG基本介绍
大模型虽然可以对主题进行推理,但它们的知识仅限于过去时间的公开数据。若想要构建对实时特定信息进行推理的人工智能应用,我们需要:
- 预先准备实时的特定信息;
- 将上述信息插入到模型的提示词当中。
这个过程被称为检索增强生成 ( Retrieval-Augmented Generation,RAG ) (\text{Retrieval-Augmented Generation,RAG}) (Retrieval-Augmented Generation,RAG)。
RAG
\text{RAG}
RAG流程表示如下:
主要分为如下几个步骤:
- 用户输入:即当前
step
用户提出的prompt
信息。 - 知识文本切割:大模型的上下文
token
\text{token}
token输入量是有限的,若导入的文本信息过多,需要将这些信息进行分块操作,将信息划分成若干个
chunk
传递到大模型中; - 嵌入模型:需要将输入的文本信息转化成语义向量,需要处理成语义向量的部分主要有两个:
- 知识文本提供的信息所产生的
chunk
,这部分信息实际上是在用户输入prompt
之前,就已经将知识文本切分以及向量化,最终存储在向量数据库中。 - 当前
step
用户输出的prompt
信息;
- 知识文本提供的信息所产生的
- 向量数据库:将各
chunk
对应的语义向量存储到向量数据库中,通过计算用户输入信息的Embedding
与各chunk
对应的Embedding
进行相似度比对,例如计算向量之间的欧式距离,检索出与用户输入语义相似的chunk
,并将其从向量数据库中召回。 - LLM模型:将用户输入以及召回的
chunk
信息作为LLM model
的输入部分;与此同时,可以将存储在Memory Database
中的历史对话记录同样作为LLM model
的输入,并最终获取当前step
大模型的输出结果。
准备工作
基于上述步骤,需要准备一个知识文本。其中langchain_community.document_loaders
支持各式各样的格式的数据输入。例如:txt,markdown,pdf,csv
等等,这里仅使用txt
作为示例。对应内容表示如下:
# 公司制度.txt
员工每年有多少天年假?
员工每年享有15天带薪年假,具体天数根据工龄有所调整
病假如何申请?
员工需要提供医生证明,并通过人力资源部门的审批流程申请病假
法定节假日有哪些?
公司遵顼国家规定的法定节假日,包括春节、国庆节、中秋节等
公司提供哪些保险福利?
公司为员工提供五险一金,包括养老保险、医疗保险、失业保险、工伤保险、生育保险和住房公积金
是否有员工健康体检
公司每年为员工安排一次免费的健康体检
有哪些员工活动或俱乐部?
公司定期组织团建活动,并有多个兴趣俱乐部,如篮球、书法、摄影等
代码实现过程
streamlit页面布局
这里使用streamlit
实现页面的设计和布局,一个简单布局设置表示如下:
import streamlit as st
st.set_page_config(
page_title="文档问答",
layout="wide"
)
st.title("文档问答")
upload_files = st.sidebar.file_uploader(
label="上传txt文件",
type=["txt"],
accept_multiple_files=True
)
if not upload_files:
st.info("请上传txt文档..")
st.stop()
对应页面效果表示如下:
我们需要点击Browse files
上传预先准备好的文本知识txt
文档。
在上传完txt
文档后,创建一个清空聊天记录
按钮、一个简单的开场白,以及一个用户与大模型交互的对话框。点击清空聊天记录
按钮,会初始化消息记录:
if "messages" not in st.session_state or st.sidebar.button("清空聊天记录"):
st.session_state["messages"] = [
{
"role": "assistant",
"content": "您好,我是你的文档助手"
}
]
user_query = st.chat_input(
placeholder="请开始提问.."
)
最终效果展示如下,此时并没有进行对话,关于清空聊天记录
按钮在后续进行展示。
构建检索器
检索器retriever
是整个RAG
的核心部分之一,基于上传的txt
文档,相关操作展示如下:
- 预设一个临时路径,用于存储文档信息:
import tempfile
temp_dir = tempfile.TemporaryDirectory(
dir="D:\\")
- 使用
TextLoader
对上传文档进行加载,并最终存放到docs
列表中:
一次可以上传若干个文档,并非仅限于一个
import os
from langchain_community.document_loaders import TextLoader
docs = []
for file in upload_files_input:
temp_filepath = os.path.join(temp_dir.name, file.name)
with open(temp_filepath, "wb") as f:
f.write(file.getvalue())
loader = TextLoader(
temp_filepath,
encoding="utf-8"
)
docs.extend(loader.load())
- 文本分割:需要将文本知识分割成若干个
chunk
形式:
from langchain_text_splitters import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=100,
chunk_overlap=10
)
split = text_splitter.split_documents(docs)
# print("split_output: ", split)
以上述txt
文档为例,可以将对应的split
结果打印出来,观察它的格式:
split_output: [
Document(
metadata={
'source': 'D:\\tmpg25q318z\\公司制度.txt'
},
page_content='员工每年有多少天年假?\n员工每年享有15天带薪年假,具体天数根据工龄有所调整\n\n病假如何申请?\n员工需要提供医生证明,并通过人力资源部门的审批流程申请病假'
),
Document(
metadata={
'source': 'D:\\tmpg25q318z\\公司制度.txt'
},
page_content='法定节假日有哪些?\n公司遵顼国家规定的法定节假日,包括春节、国庆节、中秋节等\n\n公司提供哪些保险福利?\n公司为员工提供五险一金,包括养老保险、医疗保险、失业保险、工伤保险、生育保险和住房公积金'
),
Document(
metadata={
'source': 'D:\\tmpg25q318z\\公司制度.txt'
},
page_content='是否有员工健康体检\n公司每年为员工安排一次免费的健康体检\n\n有哪些员工活动或俱乐部?\n公司定期组织团建活动,并有多个兴趣俱乐部,如篮球、书法、摄影等'
)
]
将上述的txt
文档使用RecursiveCharacterTextSplitter
划分成了
3
3
3个chunk
。其中chunk_size
表示划分chunk
的文本长度;chunk_overlap
则表示相邻chunk
之间重合部分的长度;source
路径中的tmpg25q318z
是TemporaryDirectory
创建的临时路径;
由于示例txt
长度较小,因而使用较短的chunk_size
与chunk_overlap
,目的是为了能够分出若干个块。若输入的信息体量较大,可以根据实际情况自行调整chunk_size
与chunk_overlap
。
- 向量生成与向量数据库的构建:由于使用的是
Tongyi()
作为我们的LLM model
,这里选择DashScopeEmbedding
库作为文本转化为向量的方式;并使用DashVector
作为向量数据库。需要注意的是,在使用DashVector
时,需要配置相应的DASHVECTOR_API_KEY
和DASHVECTOR_ENDPOINT
:
# Embedding加载
from langchain_community.embeddings import DashScopeEmbeddings
# 向量数据库
from langchain_community.vectorstores import DashVector
embeddings = DashScopeEmbeddings(
model="text-embedding-v1"
)
vectordb = DashVector.from_documents(
split, embeddings
)
- 定义检索器:
DashScopeEmbeddings
和DashVector
创建结束后,将生成的chunk
转换成相应的Embedding
形式,并存放在向量数据库vectordb
中;最后定义一个检索器与vectordb
进行对接,用于检索与用户输入语义相近的chunk
信息:
retriever_out = vectordb.as_retriever()
至此,我们已经实现:将导入的txt
文档切分、向量化、向量存储、向量检索操作。该部分的完整代码如下:
@st.cache_resource(ttl="1h")
def configure_retriever(upload_files_input):
docs = []
# TemporaryDirectory会自行创建临时路径
temp_dir = tempfile.TemporaryDirectory(
dir="D:\\"
)
# 文档导入
for file in upload_files_input:
temp_filepath = os.path.join(temp_dir.name, file.name)
with open(temp_filepath, "wb") as f:
f.write(file.getvalue())
loader = TextLoader(
temp_filepath,
encoding="utf-8"
)
docs.extend(loader.load())
# 文档分割
# 参数根据文本长度、文本内容自行调整
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=100,
chunk_overlap=10
)
split = text_splitter.split_documents(docs)
print("split_output: ", split)
print("num split_output: ", len(split))
# 向量展示
embeddings = DashScopeEmbeddings(
model="text-embedding-v1"
)
vectordb = DashVector.from_documents(
split, embeddings
)
# 生成检索器
retriever_out = vectordb.as_retriever()
return retriever_out
# 配置检索器retriever
retriever = configure_retriever(upload_files_input=upload_files)
基于检索器构建文档检索工具
引入create_retriever_tool
方法对检索器retriever
进行封装,并创建一个用于文档检索的工具供agent
使用。同样可以创建多个tool
以供agent
执行检索逻辑时选择,并使用tools
列表进行存储:
from langchain.tools.retriever import create_retriever_tool
tool = create_retriever_tool(
retriever,
name="text_retriever",
description="基于检索用户提出的问题,并基于检索到的文档内容进行回复"
)
tools = [tool]
提示模板
该部分同样是RAG
执行的核心模块,我们需要一系列包含格式的提示词来引导agent
与大模型进行交互。具体示例如下:
instructions = """
您是一个设计用于查询魂荡来回答问题的代理;
您可以使用检索工具,并基于检索内容来回答问题;
您可以通过不查询文档就知道答案,但您仍然需要通过查询文档来获取答案;
如果您从文档中找不到任何信息用于回答问题,则只需返回“抱歉,这个问题我还不知道”作为答案。
"""
# 基础提示模板
base_prompt_template = """
{instructions}
TOOLS:
------
You have access to the following tools:
{tools}
To use a tool,please use the following format:
Thought: Do I need to use a tool? Yes
Action: the action to take,should be one of [{tool_names}]
Action Input: {input}
Observations: the result of the action
When you have a response to say to the Human,or if you do not need to use a tool,you MUST use the format:
Thought: Do I need to use a tool: No
Final Answer:[your response here]
Begin!
Previous conversation history:
{chat_history}
New input:{input}
{agent_scratchpad}
"""
print("base_prompt_template: ", base_prompt_template)
其中,一些变量名称是被固定下来的,和ReAct
相关,在后续博客中进行介绍。需要注意模板中的关键词:
agent_scratchpad
tools
tool_names
不可随意修改,否则会出现相应错误:
ValueError: Prompt missing required variables: {'agent_scratchpad', 'tools', 'tool_names'}
同理,一些格式也是被固定下来的,在设计提示模板过程中,我们需要满足这样的格式:
Thought: Do I need to use a tool? Yes
Action: the action to take,should be one of [{tool_names}]
Action Input: {input}
Observations: the result of the action
未按照格式书写可能出现如下错误:
valueError: An output parsing error occurred. In order to pass this error back to the agent and have it try again, pass `handle_parsing_errors=True` to the AgentExecutor. This is the error: Could not parse LLM output: xxx
由于各种被固定模式的信息,因而在书写提示模板时,最好使用英语书写,否则可能会出现类似错误:
这也可能是因为书写不够熟练,后续继续跟进.
Invalid Format: Missing 'Action:' after 'Thought:'
创建提示词模板:将上述提示词指令使用PromptTemplate
进行封装,并赋予agent
一个初始的指令模板:
from langchain_core.prompts import PromptTemplate
# 创建基础提示词模板
base_prompt = PromptTemplate.from_template(
template=base_prompt_template
)
# 创建部分填充的提示词模板
prompt = base_prompt.partial(
instructions=instructions
)
回顾上述指令模板中的instructions
:
instructions = """
您是一个设计用于查询魂荡来回答问题的代理;
您可以使用检索工具,并基于检索内容来回答问题;
您可以通过不查询文档就知道答案,但您仍然需要通过查询文档来获取答案;
如果您从文档中找不到任何信息用于回答问题,则只需返回“抱歉,这个问题我还不知道”作为答案。
"""
实际上,这种包含语义信息的指令相比于代码的确定性而言是抽象的、自由度较高的。该instructions
引导agent
将base_prompt_template
中的缺失信息进行补充。假设user
提出一个prompt
:
公司制度中病假如何申请?
agent
它的思考过程/逻辑执行过程如下:
观察并对比上述信息与base_prompt_template
中描述的信息:
Action
中的tool_names
被替换成了被定义的工具:text_retriever
;Action_input
中的input
被替换成了user
提出的prompt
Final Answer
也被替换成了agent
最终归纳的结果。
Agent定义、streamlit其它组件、效果展示
关于agent
的定义表示如下:
llm = Tongyi(
model_name="tongyi-7b-chinese",
temperature=0.5,
max_tokens=200)
agent = create_react_agent(
llm,
tools,
prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
memory=memory,
verbose=True,
handle_parsing_errors=True)
其中agent
是被prompt
,retriever_tools
,LLM model
共同引导的代理者;而agent_executor
可看作是将agent
封装在内的一个runnable_chain
,从而通过该chain
执行invoke
操作,从而产生相应的response
结果。
关于参数handle_parsing_errors
,在上面的提示模板中的报错也提到了这个参数。源码中关于它的描述如下:
handle_parsing_errors: Union[bool, str, Callable[[OutputParserException], str]] = (
False
)
"""How to handle errors raised by the agent's output parser.
Defaults to `False`, which raises the error.
If `true`, the error will be sent back to the LLM as an observation.
If a string, the string itself will be sent to the LLM as an observation.
If a callable function, the function will be called with the exception
as an argument, and the result of that function will be passed to the agent
as an observation.
"""
在agent
执行过程中若出现了parsing_error
,handle_parsing_errors
设置为True
则意味着:让agent
重新思考,并整理出结果。这种设置方案也存在一定风险:就像上面我们的指令模板出现了类似格式上的问题,可能导致:agent
会无限循环地思考下去,不会停止,也不会产生Final Answer
。
而将handle_parsing_errors
设置为string
是指:设置一种人性化的错误信息,报错时返回string
自身;设置成默认,即false
则返回系统错误信息。
最终步骤的执行过程如下:
if user_query:
st.session_state.messages.append(
{
"role": "user",
"content": user_query
}
)
st.chat_message("user").write(user_query)
with st.chat_message("assistant"):
st_cb = StreamlitCallbackHandler(st.container())
config = {
"callbacks": [st_cb]
}
response = agent_executor.invoke(
{
"input": user_query
}, config=config
)
st.session_state.messages.append(
{
"role": "assistant",
"content": response["output"]
}
)
st.write(response["output"])
这里需要注意的点是:response
部分中的
key
\text{key}
key:input
与提示词模板中的Action Input: {input}
保持一致。
剩余的其他组件中,存在一个StreamlitCallbackHandler
,该模块在源码中的描述表示如下:
Callback Handler that writes to a Streamlit app.
This CallbackHandler is geared towards
use with a LangChain Agent; it displays the Agent's LLM and tool-usage "thoughts"
inside a series of Streamlit expanders.
该模块在与langchain Agent
一起使用时,会记录agent
使用LLM model
和tools
时的想法。具体在streamlit
页面中的表现结果如下:
提出一个txt
文件中不存在的反例。例如:请简单介绍一下林徽因,对应的返回结果如下:
结合instruction
提到的要求,大模型能够精准地按照要求返回结果。只是由于AgentExcutor
中设置handle_parsing_errors=True
,导致其重复了4次后才返回到正确结果。
附:完整代码
import streamlit as st
import tempfile
import os
from langchain.memory import ConversationBufferMemory
from langchain_community.chat_message_histories import StreamlitChatMessageHistory
from langchain_community.document_loaders import TextLoader
# Embedding加载
from langchain_community.embeddings import DashScopeEmbeddings
# 向量数据库
from langchain_community.vectorstores import DashVector
from langchain_core.prompts import PromptTemplate
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 加载检索工具
from langchain.tools.retriever import create_retriever_tool
from langchain.agents import create_react_agent, AgentExecutor
# agent执行结果 需要 动态传输到 streamlit中
from langchain_community.callbacks.streamlit import StreamlitCallbackHandler
from langchain_community.llms import Tongyi
st.set_page_config(
page_title="文档问答",
layout="wide"
)
st.title("文档问答")
upload_files = st.sidebar.file_uploader(
label="上传txt文件",
type=["txt"],
accept_multiple_files=True
)
if not upload_files:
st.info("请上传txt文档..")
st.stop()
@st.cache_resource(ttl="1h")
def configure_retriever(upload_files_input):
docs = []
temp_dir = tempfile.TemporaryDirectory(
dir="D:\\"
)
# 文档导入
for file in upload_files_input:
temp_filepath = os.path.join(temp_dir.name, file.name)
with open(temp_filepath, "wb") as f:
f.write(file.getvalue())
loader = TextLoader(
temp_filepath,
encoding="utf-8"
)
docs.extend(loader.load())
# 文档分割
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=100,
chunk_overlap=10
)
split = text_splitter.split_documents(docs)
print("split_output: ", split)
print("num split_output: ", len(split))
# 向量展示
embeddings = DashScopeEmbeddings(
model="text-embedding-v1"
)
vectordb = DashVector.from_documents(
split, embeddings
)
# 生成检索器
retriever_out = vectordb.as_retriever()
return retriever_out
# 配置检索器retriever
retriever = configure_retriever(upload_files_input=upload_files)
if "messages" not in st.session_state or st.sidebar.button("清空聊天记录"):
st.session_state["messages"] = [
{
"role": "assistant",
"content": "您好,我是你的文档助手"
}
]
# 加载历史聊天记录
for msg in st.session_state.messages:
st.chat_message(
msg["role"],
).write(
msg["content"]
)
tool = create_retriever_tool(
retriever,
name="text_retriever",
description="基于检索用户提出的问题,并基于检索到的文档内容进行回复"
)
tools = [tool]
# 创建历史聊天记录
msgs = StreamlitChatMessageHistory()
# 创建对话缓冲区内存
memory = ConversationBufferMemory(
chat_memory=msgs,
return_messages=True,
memory_key="chat_history",
output_key="output")
instructions = """
您是一个设计用于查询魂荡来回答问题的代理;
您可以使用检索工具,并基于检索内容来回答问题;
您可以通过不查询文档就知道答案,但您仍然需要通过查询文档来获取答案;
如果您从文档中找不到任何信息用于回答问题,则只需返回“抱歉,这个问题我还不知道”作为答案。
"""
# 基础提示模板
base_prompt_template = """
{instructions}
TOOLS:
------
You have access to the following tools:
{tools}
To use a tool,please use the following format:
Thought: Do I need to use a tool? Yes
Action: the action to take,should be one of [{tool_names}]
Action Input: {input}
Observations: the result of the action
When you have a response to say to the Human,or if you do not need to use a tool,you MUST use the format:
Thought: Do I need to use a tool: No
Final Answer:[your response here]
Begin!
Previous conversation history:
{chat_history}
New input:{input}
{agent_scratchpad}
"""
print("base_prompt_template: ", base_prompt_template)
# 创建基础提示词模板
base_prompt = PromptTemplate.from_template(
template=base_prompt_template
)
# 创建部分填充的提示词模板
prompt = base_prompt.partial(
instructions=instructions
)
llm = Tongyi(
model_name="tongyi-7b-chinese",
temperature=0.5,
max_tokens=200,
)
agent = create_react_agent(
llm,
tools,
prompt
)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
memory=memory,
verbose=True,
handle_parsing_errors=True
)
user_query = st.chat_input(
placeholder="请开始提问.."
)
if user_query:
st.session_state.messages.append(
{
"role": "user",
"content": user_query
}
)
st.chat_message("user").write(user_query)
with st.chat_message("assistant"):
st_cb = StreamlitCallbackHandler(st.container())
print("st_cb: ", st_cb)
config = {
"callbacks": [st_cb]
}
response = agent_executor.invoke(
{
"input": user_query
}, config=config
)
st.session_state.messages.append(
{
"role": "assistant",
"content": response["output"]
}
)
st.write(response["output"])