大模型LLMs框架Langchain之链详解
写在前文
所有环境,默认以及安装...。
链,主要展示:基于LCEL版本的新版本链。分为基础版本和进阶版本。
在基础版本中介绍标准链。
在进阶链中,主要介绍:流程控制链(顺序链SequentialChain、路由链RouterChain)、检索增强链(组合链create_retrieval_chain、历史链create_history_aware_retriever),文本摘要create_stuff_documents_chain和load_summarize_chain、SQL执行链create_sql_query_chain
环境初始化
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
import utils
from langchain.chains.llm import LLMChain
from langchain_core.runnables import RunnablePassthrough
from langchain.chains import SequentialChain
from langchain_core.tracers import ConsoleCallbackHandler
from operator import itemgetter
import logging
from logging.handlers import RotatingFileHandler
logging.basicConfig(
filename='app.log',
level=logging.INFO,
encoding='utf-8',
format='%(asctime)s - %(message)s'
)
llm = ChatOpenAI(
base_url='http://localhost:11434/v1',
api_key='ollama',
streaming=False,
temperature=0.8,
model='deepseek-r1:1.5b', # deepseek-r1:1.5b、qwen2:0.5b
#verbose=True # 在链中好像不太生效...不知道为啥.
)
embedding = OllamaEmbeddings(model='nomic-embed-text:latest')
基础链:chains = 参数 | 提示词 | 模型 | 输出格式
新版本
"""
最新版本 --- 基础
"""
chains_1 = llm | StrOutputParser()
chains_1.invoke("请问你叫什么?")
"""
最新版本 --- 进阶
- 使用RunnablePassthrough()动态输入参数、
- 使用“|”格式化要输出的key、
- 使用with_config配置callbacks的回调函数ConsoleCallbackHandler打印日志
- 使用多重链并重命名输出key
"""
prompt_template = """请回答下面问题:
问题:{input_q}
回答:
"""
prompt = ChatPromptTemplate.from_template(template=prompt_template)
# prompt = PromptTemplate(input_variables=["input_q"], template=prompt_template)
chain = (
{"input_q": RunnablePassthrough()}
| prompt
| llm
| {"synopsis": lambda x: x.content} # 提取 LLM 响应内容 --- 可以用做第二个链的输入
).with_config(
{"callbacks": [ConsoleCallbackHandler()]}, # 在控制台打印执行日志
)
print(chain.invoke("1+1等于什么?"))
### ---如果中间有多个链,可以连接
final_chain = RunnablePassthrough() | chain | {"synopsis_final": lambda x: x['synopsis']}
print(final_chain.invoke("1+1等于什么?"))
"""
最新版本 --- 进阶
{"input_q": lambda x: x, "language": RunnablePassthrough(),"style":"幽默"}:此种方法要报错:TypeError: Expected a Runnable, callable or dict.Instead got an unsupported type: <class 'str'>
解决方法就是将style的值修改为Runnable类型即可。方法如下:
{"input_q": lambda x: x, "language": RunnablePassthrough(),"style":lambda x:"幽默"}
如果不想使用lambda,那么可以使用
RunnableParallel(
input_q=itemgetter("input_q"),
language=itemgetter("language"),
style=lambda x: "幽默" # 用函数替代固定值 lambda 或者可以定义一个函数
)
--------
def get_style():
return "幽默"
RunnableParallel(
input_q=itemgetter("input_q"),
language=itemgetter("language"),
style=get_style # 使用自定义一个函数
)
-------------
在with_config中配置output_key="text1":要求链的输出是一个字典,且包含键 text1
但我的链最终输出是 {"synopsis": "内容"},没有 text1 键output_key 配置失效,运行时会报错或忽略该配置!
原始输出格式:
AIMessage(content='<think>\n\n</think>\n\n您好!我是由中国的深度求索(DeepSeek)公司开发的智能助手DeepSeek-R1。如您有任何任何问题,我会尽我所能为您提供帮助。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 40, 'prompt_tokens': 8, 'total_tokens': 48, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'deepseek-r1:1.5b', 'system_fingerprint': 'fp_ollama', 'finish_reason': 'stop', 'logprobs': None}, id='run-6a2e7e9d-741e-4e6a-ae25-d4a64659e3cd-0', usage_metadata={'input_tokens': 8, 'output_tokens': 40, 'total_tokens': 48, 'input_token_details': {}, 'output_token_details': {}})
通过{"synopsis": lambda x: x.content}后的输出格式:{"synopsis": "LLM生成的内容"}
最终通过{"text1": itemgetter("synopsis")}输出:{"text1": "LLM生成的内容"}
"""
prompt_template = """请使用下面指定的语言用我指定的风格回答下面问题:
风格:{style}
语言:{language}
问题:{input_q}
回答:
"""
prompt = ChatPromptTemplate.from_template(template=prompt_template)
# prompt = PromptTemplate(input_variables=["input_q"], template=prompt_template)
chain = (
{"input_q": lambda x: x, "language": RunnablePassthrough(), "style": lambda x: '幽默'}
# RunnableParallel(
# input_q=itemgetter("input_q"),
# language=itemgetter("language"),
# style=lambda x: "幽默" # 用函数替代固定值 lambda 或者可以定义一个函数
# )
| prompt
| llm
| {"synopsis": lambda x: x.content} # 提取 LLM 响应内容 --- 可以用做第二个链的输入
| {"text1": itemgetter("synopsis")} # 重命名键
).with_config(
# output_key="text1",
verbose=True
)
print(chain.invoke({"input_q": "1+1等于什么?", "language": "英语"}))
# final_chain = RunnablePassthrough() | chain | {"synopsis_final": lambda x: x['synopsis']}
# print(final_chain.invoke("1+1等于什么?"))
老版本 --- 已经过时
但是官方的Docs中,很多都是基于LLMChain的.。
"""
过时版本
"""
chain = LLMChain(
llm=llm,
prompt=PromptTemplate.from_template("""
将下面内容翻译成英文:
{content}
"""),
output_key='text1',
output_parser=StrOutputParser(),
)
chain.invoke({"content": "你好"})['text1']
进阶链
路由链
RouterChain:负责选择下一个要调用的链条 1. 路由链RouterChain:其本身就是一个chain应用,能够根据用户输入进行下游子链的选择;Langchain框架提供了多种RouterChain,其中着重介绍了LLMRouterChain 和 EmbeddingRouterChain两种: - LLMRouterChain 将用户输入放进大语言模型,通过Prompt的形式让大语言模型来进行路由 - EmbeddingRouterChain 通过向量搜索的方式,将用户输入通过向量检索后确认其需要调用的子链 2. 子链DestinationChain:直译为目标链,即可路由到的链,按照上图,我们会存在4个目标链,分别是 物理 chain,数学 chain,语言 chain 和 其他(生成) chain
base分类链 --- 用来分类路由
"""
定义base链 --- 即用来分类
"""
base_prompt_template = """
考虑到下面的用户问题,将其分类为“physics”、“math”、“language”或“other”
用一个以上的词来回答。
Question: {input}
Classification分类:
"""
base_prompt = ChatPromptTemplate.from_template(base_prompt_template)
base_chain = base_prompt | llm | StrOutputParser()
# base_chain.invoke({"input": "1+1等于多少?"})
定义处理链
"""
定义链
"""
physics_prompt_template = """你是一位非常聪明的物理学教授。
你擅长以简洁易懂的方式回答有关物理的问题。
当你不知道一个问题的答案时,你承认你不知道。
这里有个问题:
Question: {input}
Answer:
"""
physics_prompt = ChatPromptTemplate.from_template(physics_prompt_template)
physics_chain = physics_prompt | llm | StrOutputParser()
math_prompt_template = """你是个很好的数学家。你很擅长回答数学问题。
你之所以如此优秀,是因为你能够将难题分解为各个组成部分,
回答组成部分,然后把它们放在一起回答更广泛的问题。
这里有个问题:
Question: {input}
Answer:
"""
math_prompt = ChatPromptTemplate.from_template(math_prompt_template)
math_chain = math_prompt | llm | StrOutputParser()
language_prompt_template = """
你是一名语言学家,掌握中文、英文、俄语、西班牙语、日语等多国语言,你可以帮我很轻松的将其他语言翻译成我希望的目标语言:
现在请将下面的内容翻译成指定语言\n
要翻译的内容:{input}
要翻译成的语言:{language}
"""
language_prompt = ChatPromptTemplate.from_template(language_prompt_template)
language_chain = language_prompt | llm | StrOutputParser()
other_chain = PromptTemplate.from_template(
"""请回答以下问题:
Question: {input}
Answer:
"""
) | llm | StrOutputParser()
管理路由:自定义函数管理路由
"""
方法一:使用RunnableLambda+自定义函数管理路由
"""
def route(info):
"""
info:{'topic': '<think>\n嗯,:100乘以100等于多少</think>\n\n100乘以100等于10000,...分类为math。\n\n**答案:math**', 'input': '100*100等于多少?'}
:param info:
:return:
"""
print(info)
if "physics" in info["topic"].lower():
print("调用了physics_chain....")
return physics_chain
elif "math" in info["topic"].lower():
print("调用了math_chain....")
return math_chain
elif "language" in info["topic"].lower():
print("调用了language_chain....")
return language_chain
else:
print("调用了other_chain....")
return other_chain
# ### 测试数学
full_chain = {"topic": base_chain, "input": lambda x: x["input"]} | RunnableLambda(route)
math_result = full_chain.invoke({"input": "100*100等于多少?"})
print(f"math_result={math_result}")
# ### 测试物理
full_chain = {"topic": base_chain, "input": lambda x: x["input"]} | RunnableLambda(route)
physics_result = full_chain.invoke({"input": "什么是光速?"})
print(f"physics_result={physics_result}")
# ### 其他
full_chain = {"topic": base_chain, "input": lambda x: x["input"]} | RunnableLambda(route)
other_result = full_chain.invoke({"input": "你知道阿基米德生活在哪儿么?"})
print(f"other_result={other_result}")
# ### 测试语言
full_chain = {"topic": base_chain, "input": lambda x: x["input"],"language":RunnablePassthrough()} | RunnableLambda(route)
language_result = full_chain.invoke({"input": "帮我翻译:你好奥","language":"日语"})
print(f"language_result={language_result}")
管理路由:使用RunnableBranch来管理路由
"""
方法二:使用RunnableBranch来管理路由
"""
branch = RunnableBranch(
(lambda x: "physics" in x["topic"].lower(), physics_chain),
(lambda x: "math" in x["topic"].lower(), math_chain),
(lambda x: "language" in x["topic"].lower(), language_chain),
other_chain,
)
# ### 测试数学
full_chain = {"topic": base_chain, "input": lambda x: x["input"]} | branch
math_result = full_chain.invoke({"input": "100*100等于多少?"})
print(f"math_result={math_result}")
# ### 测试物理
full_chain = {"topic": base_chain, "input": lambda x: x["input"]} | branch
physics_result = full_chain.invoke({"input": "什么是光速?"})
print(f"physics_result={physics_result}")
# ### 其他
full_chain = {"topic": base_chain, "input": lambda x: x["input"]} | branch
other_result = full_chain.invoke({"input": "你知道阿基米德生活在哪儿么?"})
print(f"other_result={other_result}")
# ### 测试语言
full_chain = {"topic": base_chain, "input": lambda x: x["input"],"language":RunnablePassthrough()} | branch
language_result = full_chain.invoke({"input": "帮我翻译:早上好","language":"日语"})
print(f"language_result={language_result}")
管理路由:使用Embedding语义管理路由
"""
方法三:通过Embedding路由
"""
prompt_templates = [physics_prompt_template, math_prompt_template, language_prompt_template]
prompt_embeddings = embedding.embed_documents(prompt_templates)
def prompt_router(input):
query_embedding = embedding.embed_query(input["input"])
similarity = cosine_similarity([query_embedding], prompt_embeddings)[0]
most_similar = prompt_templates[similarity.argmax()]
if most_similar == math_prompt_template:
print("Using MATH")
elif most_similar == physics_prompt_template:
print("使用物理")
elif most_similar == language_prompt_template:
print("使用语言翻译")
return PromptTemplate.from_template(most_similar)
embedding_router_chain = (
{"input": RunnablePassthrough()}
| RunnableLambda(prompt_router)
| llm
| StrOutputParser()
)
# ### 测试数学
math_result = embedding_router_chain.invoke("在数学中100*100等于多少?")
print(f"math_result={math_result}")
# ### 测试物理
physics_result = embedding_router_chain.invoke("什么是光速?")
print(f"physics_result={physics_result}")
# ### 其他
other_result = embedding_router_chain.invoke("你知道阿基米德生活在哪儿么?")
print(f"other_result={other_result}")
# ### 测试语言
language_result = embedding_router_chain.invoke({"input": "帮我翻译:早上好","language":"日语"})
print(f"language_result={language_result}")
顺序链SequentialChain
SimpleSequentialChain 简单链
最简单的顺序链形式,每个步骤都有一个单一的输入/输出,一个步骤的输出是下一个步骤的输入。
新版本Demo1:当只有一个参数时,可以不指定参数
"""
新版本顺序链:
Demo1:当只有一个参数时,可以不指定参数
如果只有一个参数,那么系统会默认输入的数据就是该参数,所有可以不用指定参数输入名称或者输出参数名称
"""
title_prompt_template = """你是个剧作家。鉴于该剧的标题,你的工作是为该标题写一个梗概。
标题:{title}
剧作家:这是上述剧本的梗概:
"""
title_prompt = PromptTemplate(input_variables=["title"], template=title_prompt_template)
title_chain = (
title_prompt
| llm
).with_config(verbose=True)
synopsis_prompt_template = """你是《纽约时报》的戏剧评论家。鉴于该剧的梗概,你的工作是为该剧写一篇评论。
游戏概要:{synopsis}
《纽约时报》戏剧评论家对上述戏剧的评论:
"""
# synopsis_prompt = PromptTemplate(input_variables=["synopsis"], template=synopsis_prompt_template)
synopsis_prompt = ChatPromptTemplate.from_template(synopsis_prompt_template)
synopsis_chain = (
synopsis_prompt
| llm
| RunnableLambda(lambda x: x.content) # 最终输出纯文本
).with_config(verbose=True)
overall_chain = (
title_chain # 输出 数据会自动填充到synopsis_chain的参数中,但是synopsis_chain链中只有一个参数,所以会默认到该参数
| synopsis_chain
| {"out_put_key": lambda x: x} # synopsis_chain中已经时纯文本,所以不需要再指定key
).with_config(verbose=True)
review = overall_chain.invoke({"title": "月亮与我"})
review
新版本Demo2:指定固定的输入、输出参数名称
"""
新版本顺序链:
Demo2:指定固定的输入、输出参数名称
overall_chain.invoke("月亮与我")输入,会自动填充到 title_chain 的title中;
{"synopsis": lambda x: x.content}:title_chain链输出时会以synopsis为key输出到overall_chain中
overall_chain会自动将输出的填充到synopsis中
使用RunnablePassthrough()或者lambda时,如果只有一个参数,那么系统会默认输入的数据就是该参数,所有可以不用指定参数输入名称或者输出参数名称
"""
title_prompt_template = """你是个剧作家。鉴于该剧的标题,你的工作是为该标题写一个梗概。
标题:{title}
剧作家:这是上述剧本的梗概:
"""
title_prompt = PromptTemplate(input_variables=["title"], template=title_prompt_template)
title_chain = (
{"title": RunnablePassthrough()}
| title_prompt
| llm
| {"synopsis": lambda x: x.content} # 提取 LLM 响应内容 --- 可以用做第二个链的输入
).with_config(verbose=True)
# final_chain = RunnablePassthrough() | title_chain | {"synopsis_final": lambda x: x['synopsis']}
# final_chain.invoke("月亮与狗")
synopsis_prompt_template = """你是《纽约时报》的戏剧评论家。鉴于该剧的梗概,你的工作是为该剧写一篇评论。
游戏概要:{synopsis}
《纽约时报》戏剧评论家对上述戏剧的评论:
"""
# synopsis_prompt = PromptTemplate(input_variables=["synopsis"], template=synopsis_prompt_template)
synopsis_prompt = ChatPromptTemplate.from_template(synopsis_prompt_template)
synopsis_chain = (
synopsis_prompt
| llm
| RunnableLambda(lambda x: x.content) # 最终输出纯文本
).with_config(verbose=True)
overall_chain = (
RunnablePassthrough()
| title_chain # 输出 数据会自动填充到synopsis_chain的参数中,但是synopsis_chain链中只有一个参数,所以会默认到该参数
| synopsis_chain
| {"out_put_key": lambda x: x} # synopsis_chain中已经时纯文本,所以不需要再指定key
).with_config(verbose=True)
review = overall_chain.invoke("月亮与我")
review
新版本Demo3:调试方法
"""
新版本顺序链:
Demo3:调试方法
- 直接访问中间步骤
- 直接通过链的属性访问:RunnableSequence 有一个 .steps 属性,可以按顺序访问所有子组件:
- 插入调试步骤:若需要实时查看中间结果,可以在链中插入 RunnableLambda 打印日志
- 保存中间结果到上下文:将中间结果保留在输出字典中,然后返回:
- 通过logging日志记录
- 使用回调函数记录日志:通过自定义回调自动记录每个链的输出:
- 设置callbacks回调函数ConsoleCallbackHandler() 在控制台打印执行日志 ---- 替代verbose=True
* 可以设置chain.with_config({"callbacks": [ConsoleCallbackHandler()]})
* 可以在chain.invoke(xxx,config={'callbacks': [ConsoleCallbackHandler()]})
"""
title_prompt_template = """你是个剧作家。鉴于该剧的标题,你的工作是为该标题写一个梗概。
标题:{title}
剧作家:这是上述剧本的梗概:
"""
title_prompt = PromptTemplate(input_variables=["title"], template=title_prompt_template)
title_chain1 = (
{"title": RunnablePassthrough()}
| title_prompt
| llm
| {"synopsis": lambda x: x.content} # 提取 LLM 响应内容 --- 可以用做第二个链的输入
).with_config(verbose=True)
# final_chain = RunnablePassthrough() | title_chain1 | {"synopsis_final": lambda x: x['synopsis']}
# final_chain.invoke("月亮与狗")
synopsis_prompt_template = """你是《纽约时报》的戏剧评论家。鉴于该剧的梗概,你的工作是为该剧写一篇评论。
游戏概要:{synopsis}
《纽约时报》戏剧评论家对上述戏剧的评论:
"""
# synopsis_prompt = PromptTemplate(input_variables=["synopsis"], template=synopsis_prompt_template)
synopsis_prompt = ChatPromptTemplate.from_template(synopsis_prompt_template)
synopsis_chain2 = (
{"synopsis": RunnablePassthrough()}
| synopsis_prompt
| llm
| RunnableLambda(lambda x: x.content) # 最终输出纯文本
).with_config(verbose=True)
def direct_access():
"""
直接访问中间步骤 --- 即,单独调用chain1、然后将chain1的结果再单独调用chain2
# 原始链
full_chain = (
RunnablePassthrough()
| step1
| step2
| step3
)
# 拆解出前两步的中间链
partial_chain = RunnablePassthrough() | step1 | step2
# 获取中间结果
intermediate_result = partial_chain.invoke(input_data)
# 继续执行后续步骤
final_result = step3.invoke(intermediate_result)
:return:
"""
print('direct_access。。。')
# 拆解出第一步 --- 即单独使用chain1
step1_chain = RunnablePassthrough() | title_chain1
step1_result = step1_chain.invoke("月亮与狗")
print(f"step1_chain = {step1_result}")
# 拆解出第二步 --- 将第一步的值传递到chain2中
print(f"step2_chain2 = {synopsis_chain2.invoke(step1_result)}")
def chains_steps():
"""
通过 steps 属性访问组件。
直接通过链的属性访问:RunnableSequence 有一个 .steps 属性,可以按顺序访问所有子组件:
# 定义链
chain = step1 | step2 | step3
# 访问第一个组件(step1)
first_step = chain.steps[0]
# 手动调用第一个组件
output_of_step1 = first_step.invoke(input_data)
:return:
"""
print('chains_steps...')
final_chain = (
RunnablePassthrough()
| title_chain1
| synopsis_chain2
| {"out_put_key": lambda x: x}
)
first_step = final_chain.steps[1]
print(first_step.invoke('你与月亮'))
def viewing_middle():
"""
插入调试步骤:若需要实时查看中间结果,可以在链中插入 RunnableLambda 打印日志
chain = (
RunnablePassthrough()
| step1
| RunnableLambda(lambda x: print(f"Step1 Output: {x}") or x) # 打印并传递数据
| step2
| RunnableLambda(lambda x: print(f"Step2 Output: {x}") or x)
| step3
)
:return:
"""
print("viewing_middle")
final_chain = (
RunnablePassthrough()
| title_chain1
| RunnableLambda(lambda x: print(f"title_chain1 Output: {x['synopsis']}") or x['synopsis'])
| synopsis_chain2
| RunnableLambda(lambda x: print(f"Step2 Output: {x}") or x)
| {"out_put_key": lambda x: x}
)
print(f"final_chain={final_chain.invoke("月亮与我")}")
def save_results_context():
"""
保存中间结果到上下文:将中间结果保存到字典中,直接返回
---- 通过
final_chain = (
RunnablePassthrough()
| {"chain1_output": chain1, "original_input": RunnablePassthrough()}
| {"chain2_output": chain2, "chain1_output": lambda x: x["chain1_output"]}
| {
"out_put_key": lambda x: x["chain2_output"],
"chain1_output": lambda x: x["chain1_output"],
"chain2_output": lambda x: x["chain2_output"],
}
)
:return:
"""
print("save_results_context")
final_chain = (
RunnablePassthrough()
| {"chain1_output": title_chain1, "original_input": RunnablePassthrough()}
| {"chain2_output": synopsis_chain2, "chain1_output": lambda x: x["chain1_output"]}
| {
"out_put_key": lambda x: x["chain2_output"],
"chain1_output": lambda x: x["chain1_output"],
"chain2_output": lambda x: x["chain2_output"],
}
)
result = final_chain.invoke("月亮与我")
print(f"chain1_output={result["chain1_output"]}") # 获取chain1结果
print(f"chain2_output={result["chain2_output"]}") # 获取chain2结果
def log_chain():
print("log_chain")
# 定义中间步骤的日志记录
def log_chain1(input):
result = title_chain1.invoke(input)
logging.info(f"log_chain chain1 输出: {result}") # 记录到日志
return result
def log_chain2(input):
result = synopsis_chain2.invoke(input)
logging.info(f"log_chain chain2 输出: {result}")
return result
"""
通过logging日志记录
:return:
"""
# 重新构建链,保留中间结果
final_chain = (
RunnablePassthrough()
| {"chain1_output": log_chain1, "original_input": RunnablePassthrough()}
| {"chain2_output": log_chain2, "chain1_output": lambda x: x["chain1_output"]}
| {
"out_put_key": lambda x: x["chain2_output"],
"chain1_output": lambda x: x["chain1_output"],
"chain2_output": lambda x: x["chain2_output"],
}
)
# 调用并获取结果
result = final_chain.invoke("月亮与我")
print("chain1结果:", result["chain1_output"])
print("chain2结果:", result["chain2_output"])
pass
def callback_function():
"""
使用回调函数记录日志:通过自定义回调自动记录每个链的输出:
:return:
"""
class LoggingCallbackHandler(BaseCallbackHandler):
def on_chain_end(self, outputs, run_id, parent_run_id=None, tags=None, **kwargs):
if "chain1" in tags:
logging.info(f"callback_function chain1 输出: {outputs}")
elif "chain2" in tags:
logging.info(f"callback_function chain2 输出: {outputs}")
print("callback_function")
# 为每个链添加标签
chain1 = title_chain1.with_config(tags=["chain1"])
chain2 = synopsis_chain2.with_config(tags=["chain2"])
# 构建原始链
final_chain = (
RunnablePassthrough()
| chain1
| chain2
| {"out_put_key": lambda x: x}
)
# 调用时传入回调
handler = LoggingCallbackHandler()
print(f"callback_function:{final_chain.invoke("月亮与我", callbacks=[handler])}")
# log_chain()
callback_function()
老版本-过时版本
"""
老版本 --- 过时版本
"""
from langchain.chains.sequential import SimpleSequentialChain
# 第一个链
prompt_template1 = """
帮我给{product}取一个名字。
"""
prompt1 = ChatPromptTemplate.from_template(prompt_template1)
chain1 = LLMChain(
prompt=prompt1,
llm=llm,
verbose=True
)
# 第二个链
prompt_template2 = """
用10个字描述下这个公司:{company_name}
"""
prompt2 = ChatPromptTemplate.from_template(prompt_template2)
chain2 = LLMChain(
prompt=prompt2,
llm=llm,
verbose=True
)
## 合并两个链
simple_two_chain = SimpleSequentialChain(chains=[chain1, chain2], verbose=True)
print(simple_two_chain.run("AI教育培训机构"))
顺序链(Sequential Chain)
新版本 弃用LLMChain改用LCEL版本
"""
新版本 弃用LLMChain改用LCEL版本
RunnablePassthrough.assign
"""
chain1_template = """你是个剧作家。鉴于该剧的标题及其所处的时代,你的工作是为该剧写一个梗概。
标题:{title}
风格:{era}
剧作家:这是上述剧本的梗概:"""
chain1_prompt_template = PromptTemplate(input_variables=["title", 'era'], template=chain1_template)
chain_1 = (
chain1_prompt_template
| llm
| {"synopsis_output_1": lambda x: x.content}
)
chain2_template = """你是《纽约时报》的戏剧评论家。鉴于该剧的梗概,你的工作是为该剧写一篇评论。
游戏概要:
{synopsis}
《纽约时报》戏剧评论家对上述戏剧的评论:
"""
chain2_prompt_template_2 = PromptTemplate(input_variables=["synopsis"], template=chain2_template)
chain_2 = (
chain2_prompt_template_2
| llm
| {"review_output_2": lambda x: x.content}
)
chain_3_template = """根据下面的输入内容,回答下面问题:
输入1: {chain1_content}
输入2: {chain2_content}
问题: {question}
返回值:
"""
chain_3_prompt_template = PromptTemplate(input_variables=["synopsis"], template=chain_3_template)
chain_3 = chain_3_prompt_template | llm | {"result3": lambda x: x.content}
"""
overall_chain = RunnablePassthrough().assign(synopsis_output_1=synopsis_chain_1)
输出:
{'title': '月亮与我',
'era': '宋代李清照',
'synopsis_1': {'synopsis_output_1': '《月亮与我》四幕诗剧梗概\n\n第一幕:....'}}
"""
overall_chain = (
RunnablePassthrough.assign(synopsis_1=chain_1) # 调用synopsis_chain_1第一个链的结果,输出key=synopsis_output_1,然后赋值给synopsis_1
.assign(review_2=(RunnableLambda(lambda d: {"synopsis": d["synopsis_1"]["synopsis_output_1"]}) | chain_2)) # 隐式调用invoke
# .assign(review_2=(RunnablePassthrough() | {"synopsis": lambda x: x["synopsis_1"]["synopsis_output_1"]} | chain_2)) # 隐式调用invoke
# .assign(review_2=lambda d: chain_2.invoke({"synopsis": d["synopsis_1"]['synopsis_output_1']})) # review_2输出key 将第一个的结果传入chain2,获取结果,输出key=review_output_2
| {
"chain1_content": lambda x: x['synopsis_1']['synopsis_output_1'],
"chain2_content": lambda x: x['review_2']['review_output_2'],
"question": RunnablePassthrough()
# "question":lambda x:x['question'] # 也可以采用这种方式传递参数
}
| chain_3
| {"result3_overall": lambda x: x['result3']}
)
overall_chain.invoke({"title": "月亮与我", "era": "宋代李清照", "question": "用鲁迅的语气评价下这个总结"},
config={'callbacks': [ConsoleCallbackHandler()]})
老版本使用LLMChain+SequentialChain
"""
老版本使用LLMChain+SequentialChain
"""
template_1 = """你是个剧作家。鉴于该剧的标题及其所处的时代,你的工作是为该剧写一个梗概。
标题:{title}
风格:{era}
剧作家:这是上述剧本的梗概:"""
prompt_template_1 = PromptTemplate(input_variables=["title", 'era'], template=template_1)
synopsis_chain_1 =LLMChain(llm=llm,prompt=prompt_template_1,output_key='synopsis' ,verbose=True)
template_2 = """你是《纽约时报》的戏剧评论家。鉴于该剧的梗概,你的工作是为该剧写一篇评论。
游戏概要:
{synopsis}
《纽约时报》戏剧评论家对上述戏剧的评论:
"""
prompt_template_2 = PromptTemplate(input_variables=["synopsis"], template=template_2)
review_chain_2 =LLMChain(llm=llm,prompt=prompt_template_2,output_key='review',verbose=True)
# ### 因为LLMChain被弃用,所以我们使用的是LCEL通过管道创建链,所以在使用SequentialChain时会出现不兼容问题。
# ### 弃用...改用RunnablePassthrough.assign类型
overall_chain = SequentialChain(
chains=[synopsis_chain_1, review_chain_2],
input_variables=["era", "title"],
# Here we return multiple variables
output_variables=["synopsis", "review"],
verbose=True
)
overall_chain.invoke({"title": "月亮与我", "era": "宋代李清照"})
并行链(RunnableParallel)
%%time # 可以统计本cell执行时间,但是需要放在行最开始
parallel_prompt_template_1 = """
帮我从多个角度完善优化帮我们丰富问题、并进行多角度的扩展:
扩展数量:{count}
问题:{input_q}
回答:
"""
parallel_template_1 = ChatPromptTemplate.from_template(parallel_prompt_template_1)
parallel_chain_1 = parallel_template_1 | llm | {"output_key_2": lambda x: x.content}
parallel_prompt_template_2 = """
帮我用指定语气风格以及语言完成下面的问题:
语言:{language}
风格:{style}
问题:{input}
回答:
"""
parallel_template_2 = ChatPromptTemplate.from_template(parallel_prompt_template_2)
parallel_chain_2 = (
{'style': RunnablePassthrough(), "input": lambda x: x['XXXX_input'], "language": RunnablePassthrough()}
| parallel_template_2
| llm
| StrOutputParser()
)
# 假设 synopsis_chain_1 和 review_chain_2 已经正确定义,并接受相同的输入变量
overall_chain = RunnableParallel({
"parallel_chain1": parallel_chain_1,
"parallel_chain2": parallel_chain_2
})
# 调用链时使用invoke方法
result = overall_chain.invoke(
{"count": 3, "input_q": "java是什么?", "style": "幽默", "XXXX_input": "你是谁?", "language": "英语"})
print(result)
组合链create_retrieval_chain
为了展示组合链,会采用简化版本的RAG。
直接从web页面加载数据然后向量化,检索...
"""
rag:
加载数据、数据清洗、分割、存储、检索、生成
"""
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "LANGCHAIN_PROJECT项目名称"
os.environ["LANGCHAIN_API_KEY"] = 'LANGCHAIN_PROJECT的KEY'
# Step1 初始化
model = ChatOllama(model='deepseek-r1:1.5b')
embedding = OllamaEmbeddings(model='nomic-embed-text:latest')
# Step2 加载web页面
loader = WebBaseLoader(
web_path='https://blog.csdn.net/ultingCSDN/article/details/145062991?spm=1001.2014.3001.5501',
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(class_=('article-header', 'title-article', 'htmledit_views'))
# 解析这个网页下面的这三个类选择器的文档;
)
)
# 加载数据
docs = loader.load()
# Step3 切割文档
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = splitter.split_documents(docs)
# Step4 向量入库 --- 依然使用FAISS,也可以使用其他
# 使用FAISS
faiss_persist_directory = './faiss_db'
faiss_index_name = 'rag_index'
vector_store = FAISS.from_documents(
documents=splits,
embedding=OllamaEmbeddings(model='nomic-embed-text:latest'),
)
# 使用Chroma
# vector_store = Chroma.from_documents(
# documents=splits,
# embedding=OllamaEmbeddings(model='nomic-embed-text:latest')
# )
# Step5:构建检索器
retriever = vector_store.as_retriever()
# Step6 设置提示词
system_prompt = """
你是一个智能助手,我现在需要你使用我提供的上下文来回答我的问题,如果上下文中没有你就回答不知道即可;回答请保持三句话以内,并且尽可能的精简:\n
{context}
"""
prompt = ChatPromptTemplate.from_messages( ### 提问和回答 历史记录 模板
[
("system", system_prompt),
("human", "{input}")
]
)
# Step7:得到检索chain
"""
维度 方法一 方法二
文档处理 显式提取 page_content 隐式合并文档内容
输入结构 手动定义 context 和 input 自动处理输入格式
代码复杂度 较低层级,更灵活 高层级封装,更简洁
自定义能力 高(可自定义中间步骤) 低(依赖框架默认逻辑)
与 LangChain 生态集成 需要手动兼容 天然兼容其他高阶组件
功能上可以替代:两者最终都能实现 检索文档 → 合并到上下文 → 生成回答 的流程。
实现细节不同:
方法一需要手动确保文档格式与 prompt 的 context 占位符匹配。
方法二通过 create_stuff_documents_chain 自动处理文档合并,可能包含额外逻辑(如文档分页、长度截断等)。
优先方法二:若需求是标准检索-生成场景,推荐使用高阶 API,代码更简洁且符合框架设计模式。
选择方法一:若需要以下能力:
自定义文档处理(如过滤低质量文档、特殊格式拼接)
修改默认的上下文合并逻辑
调试或分析中间数据流
"""
# 方法一:预处理文检索出来的文档 ---- 为了展示,所以没有处理,直接返回检索内容了...
process_docs = RunnableLambda(lambda x: x)
chain_1 = {"context": retriever | process_docs, "input": RunnablePassthrough()} | prompt | model
# 带日志打印功能
result = chain_1.invoke("这篇主要将什么?", config={"callbacks": [ConsoleCallbackHandler()]})
print(result)
print(f"{"-" * 10}")
print(result.content)
print(f"{"-" * 100}")
# 方法二
chain1 = create_stuff_documents_chain(llm=model, prompt=prompt)
chain2 = create_retrieval_chain(retriever=retriever, combine_docs_chain=chain1)
result = chain2.invoke({"input": "这篇主要将什么?"},config={"callbacks":[ConsoleCallbackHandler()]})
print(result)
print(f"{"-" * 10}")
print(result['answer'])
print(f"{"-" * 100}")
历史链:create_history_aware_retriever
注意:
一般情况下,我们构建的链(chain)直接使用输入问答记录来关联上下文。但在此案例中,查询检索器也需要对话上下文才能被理解
解决方法:
添加一个子链,它采用最新用户问题和聊天历史,并在它引用历史信息中的任何信息时重新表述问题。这可以被简单地认为是构建一个新的“历史感知”检索器 这个子链的目的:让检索过程融入了对话的上下文。
"""
rag:
加载数据、数据清洗、分割、存储、检索、生成
"""
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "LANGCHAIN_PROJECT项目名称"
os.environ["LANGCHAIN_API_KEY"] = 'LANGCHAIN_PROJECT的KEY'
# Step1 初始化
model = ChatOllama(model='deepseek-r1:1.5b')
embedding = OllamaEmbeddings(model='nomic-embed-text:latest')
# Step2 加载web页面
loader = WebBaseLoader(
web_path='https://blog.csdn.net/ultingCSDN/article/details/145062991?spm=1001.2014.3001.5501',
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(class_=('article-header', 'title-article', 'htmledit_views'))
# 解析这个网页下面的这三个类选择器的文档;
)
)
# 加载数据
docs = loader.load()
# Step3 切割文档
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = splitter.split_documents(docs)
# Step4 向量入库 --- 依然使用FAISS,也可以使用其他
# 使用FAISS
faiss_persist_directory = './faiss_db'
faiss_index_name = 'rag_index'
vector_store = FAISS.from_documents(
documents=splits,
embedding=OllamaEmbeddings(model='nomic-embed-text:latest'),
)
# 使用Chroma
# vector_store = Chroma.from_documents(
# documents=splits,
# embedding=OllamaEmbeddings(model='nomic-embed-text:latest')
# )
# Step5:构建检索器
retriever = vector_store.as_retriever()
# Step6 设置提示词
system_prompt = """
你是一个智能助手,我现在需要你使用我提供的上下文来回答我的问题,如果上下文中没有你就回答不知道即可;回答请保持三句话以内,并且尽可能的精简:\n
{context}
"""
prompt = ChatPromptTemplate.from_messages( ### 提问和回答 历史记录 模板
[
("system", system_prompt),
MessagesPlaceholder('chat_history'), # 添加历史记录
("human", "{input}")
]
)
# Step7 创建链
chain1 = create_stuff_documents_chain(llm=model, prompt=prompt)
#############################################################################
# Step8 创建历史链子链
contextualize_q_system_prompt = """
给定聊天记录和最新的用户问题,该问题可能参考聊天记录中的上下文,聊天记录的情况下可以理解。不要回答问题,如果需要,只需重新表述,否则返回原样。
制定一个独立的问题,在不参考
"""
retriever_history_template = ChatPromptTemplate.from_messages([
("system", contextualize_q_system_prompt),
MessagesPlaceholder('chat_history'),
("human", "{input}")
])
# 创建历史子链
history_chain = create_history_aware_retriever(
llm=model,
retriever=retriever,
prompt=retriever_history_template
)
# Step9 保持问答的历史记录
# ### 此处可以使用Redis、本地内存....
# ### 此处为了简化直接使用内存
store = {}
def get_history_msg(session_id: str):
if session_id not in store:
store[session_id] = ChatMessageHistory()
return store[session_id]
# Step10 组合链chain1和history_chain
merge_chain = create_retrieval_chain(history_chain, chain1)
result_chain = RunnableWithMessageHistory(
merge_chain,
get_history_msg,
input_messages_key='input',
history_messages_key='chat_history',
output_messages_key='answer',
)
######
### 注意,这样写貌似好像不知道是因为LLMs模型的原因还是啥,好像模型不太智能,和理想的回答好像不太一样....。
######
# 第一轮对话
result1 = result_chain.invoke(
# input= {'input': "What is Task Decomposition?"},
input={'input': "本文主要讲了什么?"},
config={'configurable': {"session_id": '123456'}, "callbacks": [ConsoleCallbackHandler()]}
)
print(result1['answer'])
print(f"{"-" * 100}")
# 第二轮对话
result2 = result_chain.invoke(
# input= {'input': "What are common ways of doing it?"},
input={'input': "我刚刚问了什么?你怎么回答的?"},
config={'configurable': {"session_id": '123456'}, "callbacks": [ConsoleCallbackHandler()]}
)
print(result2['answer'])
print(f"{"-" * 100}")
# 第三轮对话
result2 = result_chain.invoke(
# input= {'input': "What are common ways of doing it?"},
input={'input': "我最开始问了什么?你怎么回答的?"},
config={'configurable': {"session_id": '123456'}, "callbacks": [ConsoleCallbackHandler()]}
)
print(result2['answer'])
print(f"{"-" * 100}")
文本摘要:load_summarize_chain
stuff方式
使用create_stuff_documents_chain或者load_summarize_chain指定chain_type为stuff
import os
import bs4
from boto3.docs.attr import document_attribute
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain.chains.summarize import load_summarize_chain
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate, BasePromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_ollama import OllamaEmbeddings, ChatOllama
# Step1 初始化model和embedding
# model = ChatOpenAI(base_url="http://localhost:11434/v1", api_key="ollama", model='qwen2.5:latest', temperature=0.8)
model = ChatOllama(model='qwen2.5:latest', temperature=0)
embedding = OllamaEmbeddings(model='nomic-embed-text')
# Step2 加载文档
loader = WebBaseLoader(
web_path='https://blog.csdn.net/ultingCSDN/article/details/145062991?spm=1001.2014.3001.5501',
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(class_=('article-header', 'title-article', 'htmledit_views'))
# 解析这个网页下面的这三个类选择器的文档;
)
)
# 加载数据
docs = loader.load()
# Step3.1 直接使用load_summarize_chain
# ### 未结合LLMs,可能不太智能
chain = load_summarize_chain(llm=model, chain_type='stuff') # chain_type:refine
result_no1 = chain.invoke(docs)
print(f'NO1版本:{result_no1}')
print(f'NO1版本:{result_no1['output_text']}')
# Step3.2 结合LLM模板提问
prompt_template = """ 针对下面的内容,写一个简洁的总结摘要:
"{text}"
简洁的总结摘要:
"""
prompt = PromptTemplate.from_template(prompt_template)
### 未使用 stuff 时 的链
chain = ({'text': RunnablePassthrough()}
| prompt
| model
)
result_NO2 = chain.invoke({"text": docs})
print(f'未使用 stuff 时:{result_NO2}')
### 使用 stuff时的链---老版本,已过时
llm_chain = LLMChain(llm=model, prompt=prompt)
stuff_chain = StuffDocumentsChain(
llm_chain=llm_chain,
document_variable_name='text'
)
result_old = stuff_chain.invoke(docs)
print(f'使用 stuff 时的老版本:{result_old}')
print(f'使用 stuff 时的老版本:{result_old['output_text']}')
### 使用 stuff时的链---新版本
stuff_chain = create_stuff_documents_chain(
llm=model,
prompt=prompt,
output_parser=StrOutputParser(),
document_variable_name='text'
)
result_new = stuff_chain.invoke({"text": docs})
print(f'使用 stuff 时的新版本:{result_new}')
Refine方式
import os
import bs4
from langchain.chains.summarize import load_summarize_chain
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_ollama import OllamaEmbeddings, ChatOllama
from langchain_text_splitters import CharacterTextSplitter
from utils.PDFUtils import load_pdf_pdfplumber
"""
文本摘要:Refine阶段
Refine:RefineDocumentsChain,类似于Map-Reduce(映射-归约)
细化文档链通过循环遍历输入文档并逐步更新其答案来构建响应。对于每个文档,它将当前文档和最新的中间答案传递给LLM链,以获得最新的答案
"""
# model = ChatOpenAI(base_url="http://localhost:11434/v1", api_key="ollama", model='qwen2.5:latest', temperature=0.8)
model = ChatOllama(model='qwen2.5:latest', temperature=0)
embedding = OllamaEmbeddings(model='nomic-embed-text')
# 加载本地PDF
# pdf_text = load_pdf_pdfplumber(
# "D://Z20下载缓存//A3谷歌//回话有招.pdf",
# start_page=5
# )
# base_document = Document(page_content=pdf_text)
# 加载网页
# loader = WebBaseLoader(
# web_path='https://www.zhonghuadiancang.com/waiguomingzhu/9528/194318.html',
# bs_kwargs=dict(
# parse_only=bs4.SoupStrainer(class_=('panel-body'))
# )
# )
# docs = loader.load() # 得到整篇文章
loader = WebBaseLoader(
web_path='https://blog.csdn.net/ultingCSDN/article/details/145062991?spm=1001.2014.3001.5501',
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(class_=('article-header', 'title-article', 'htmledit_views'))
# 解析这个网页下面的这三个类选择器的文档;
)
)
# 加载数据
docs = loader.load()
# 第一步:切割阶段
# ### 每一个小的docs大小为1000,然后进入map阶段...
# text_splitter = CharacterTextSplitter.from_tiktoken_encoder(chunk_size=1000, chunk_overlap=0)
text_splitter = CharacterTextSplitter(
chunk_size=2000,
chunk_overlap=0,
separator="\n"
)
# split_docs = text_splitter.split_documents([base_document])
split_docs = text_splitter.split_documents(docs)
# ### 使用方法一:原始没有提示情况下;
# 指定chain_type的类型为“refine”
chain = load_summarize_chain(llm=model, chain_type='refine')
result = chain.invoke(split_docs)
print(result)
print(result['output_text'])
# ### 使用方法二:有提示词模板情况下;
# 初次提示词
prompt_template = """
针对下面的内容,写一个简洁的总结摘要:
"{text}"
简洁的总结摘要:
"""
prompt = PromptTemplate.from_template(prompt_template)
# 继续的提示词
refine_template = (
"你的工作是做出一个最终的总结摘要。\n"
"我们提供了一个到某个点的现有摘要:{existing_answer}\n"
"我们有机会完善现有的摘要,基于下面更多的文本内容\n"
"--------------------------\n"
"{text}\n"
"--------------------------\n"
)
refine_prompt = PromptTemplate.from_template(refine_template)
# 指定chain_type的类型为“refine”
chain = load_summarize_chain(
llm=model,
chain_type='refine',
question_prompt=prompt,
refine_prompt=refine_prompt,
input_key='input_documents',
output_key='output_text',
)
result1 = chain.invoke(split_docs)
print(result1)
print(result1['output_text'])
SQL执行链:create_sql_query_chain
import os
from operator import itemgetter
from langchain.chains.sql_database.query import create_sql_query_chain
from langchain_community.tools import QuerySQLDataBaseTool
from langchain_community.utilities import SQLDatabase
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_ollama.chat_models import ChatOllama
"""
1、直接使用llm+数据库
2、通过langchain的执行链读取数据库
"""
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "xxxxx"
os.environ["LANGCHAIN_API_KEY"] = 'xxxxxx'
os.environ["TAVILY_API_KEY"] = 'tvly-xxxxxx'
model = ChatOllama(model='qwen2.5:latest')
HOSTNAME = 'ip'
PORT = '端口'
# DATABASE = 'xxxxx'
DATABASE = '库名'
USERNAME = '用户名'
PASSWORD = '数据库密码'
MYSQL_URI = 'mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8'.format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)
# db = MySQLdb.connect(MYSQL_URI, USERNAME, PASSWORD, DATABASE)
db = SQLDatabase.from_uri(MYSQL_URI)
### 测试连接是否成功
print(db.get_usable_table_names())
print(db.run('select * from user limit 1;'))
test_chain = create_sql_query_chain(model, db)
# 方法一:直接使用LLM和数据库整合 --- 可以看到,系统只输出SQL语句,不会真正执行SQL;
result = test_chain.invoke({'question': "请问员工表有多少人?"})
print(result)
# 方法二:使用执行链执行
answer_prompt = PromptTemplate.from_template(
"""
给定以下用户问题、SQL语句和SQL执行后的结果,回答用户问题。
Question: {question}
SQL Query: {query}
SQL Result: {result}
回答:
"""
)
excute_sql_tool = QuerySQLDataBaseTool(db=db)
# 1、生成SQL语句;2、执行sql语句
### 在第一个assign中,调用 test_chain 生成sql语句,然后将sql结果传递给query;
### 在第二个assign中,先执行sql(excute_sql_tool),然后将结果取出来,传递给result
chain = (RunnablePassthrough.assign(query=test_chain).assign(result=itemgetter('query') | excute_sql_tool)
| answer_prompt
| model
| StrOutputParser()
)
result1 = test_chain.invoke({'question': "请问在sys_user员工表中有多少条数据?请将员工“nick_name”打印出来;"})
print(result1)