构建智能对话系统:Python实现聊天话题管理与摘要生成
在当今信息爆炸的时代,如何有效管理和组织对话记录成为了一个重要挑战。本文将介绍如何使用Python构建一个智能对话系统,该系统能够自动识别对话话题、生成摘要,并提供智能对话管理功能。
系统概述
这个智能对话系统的核心功能包括:
- 对话记录管理:使用SQLite数据库持久化存储聊天记录
- 话题自动识别:将相关对话内容聚类成话题
- 智能摘要生成:对每个话题生成简洁的摘要
- 多智能体协作:使用多个AI智能体分工处理不同任务
下面我们来详细解析系统的各个组成部分。
环境准备与依赖安装
首先,确保你已安装Python 3.7+,然后安装必要的依赖包:
pip install langchain langchain-community langchain-openai
如果你的OpenAI模型是通过LM Studio本地部署的,还需要配置相应的API端点。
项目结构设计
合理的项目结构是代码可维护性的基础,我们的项目结构如下:
chat_system/
├── chat_manager.py # 主程序文件
├── requirements.txt # 依赖列表
└── chat_topics.db # SQLite数据库(自动生成)
核心代码解析
1. 数据模型设计
我们首先定义两个核心数据模型:话题(Topic)和聊天消息(ChatMessage)。
class Topic:def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):self.topic_id = topic_idself.topic = topicself.summary = summaryself.message_ids = message_idsclass ChatMessage:def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):self.message_id = message_idself.role = roleself.content = contentself.timestamp = timestamp or datetime.now()
2. 数据库管理
我们使用SQLite数据库进行数据持久化存储,通过DatabaseManager类统一管理数据库操作。
class DatabaseManager:def __init__(self, db_path: str = "chat_topics.db"):self.db_path = db_pathself.init_db()def init_db(self):"""初始化数据库表"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 创建topics表cursor.execute("""CREATE TABLE IF NOT EXISTS topics (topic_id TEXT PRIMARY KEY,topic TEXT NOT NULL,summary TEXT,message_ids TEXT)""")# 创建chat_messages表cursor.execute("""CREATE TABLE IF NOT EXISTS chat_messages (message_id TEXT PRIMARY KEY,session_id TEXT NOT NULL,role TEXT NOT NULL,content TEXT NOT NULL,timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)""")conn.commit()conn.close()
3. 核心功能工具
系统提供了三个核心工具函数,分别负责话题选择、历史记录获取和话题压缩。
def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:"""根据topic_ids选择话题的工具"""# 实现细节...def get_history(session_id: str) -> List[Dict[str, Any]]:"""获取当前会话的聊天记录"""# 实现细节...def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:"""将聊天记录压缩成话题"""# 实现细节...
4. 多智能体系统
系统采用多智能体架构,每个智能体负责不同的任务:
- 历史信息检索智能体:负责检索相关的历史话题
- 问答智能体:基于上下文信息回答用户问题
- 话题总结智能体:将对话记录压缩成结构化话题
# 创建智能体执行器
history_retrieval_agent = AgentExecutor(agent=history_retrieval_agent,tools=tools,verbose=True,handle_parsing_errors=True
)# 类似创建其他智能体...
5. 主协调函数
super_agent函数作为系统的主入口,协调各个智能体协作处理用户输入。
def super_agent(user_input: str, session_id: str):"""超级智能体主函数"""if not session_id:session_id = str(uuid.uuid4())# 保存用户输入user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)db_manager.save_chat_message(session_id, user_message)# 1. 历史信息检索智能体print("=== 历史信息检索智能体 ===")# ... 检索相关话题# 2. 使用检测信息回答用户问题智能体print("=== 问答智能体 ===")# ... 生成回答# 3. 处理历史信息 话题总结智能体print("=== 话题总结智能体 ===")# ... 压缩对话为话题return {"session_id": session_id,"qa_response": qa_response,"new_topics": new_topic}
系统特色与优势
1. 模块化设计
系统采用高度模块化的设计,各个组件职责分明,便于维护和扩展。
2. 智能话题识别
系统能够自动识别对话中的主题,并将相关对话内容聚类,大大提高了对话记录的可管理性。
3. 灵活的数据持久化
使用SQLite数据库,既保证了数据的持久化,又避免了复杂数据库的部署成本。
4. 多智能体协作
通过多个专门化的智能体分工合作,提高了系统的整体性能和准确性。
实际应用示例
下面是一个简单的使用示例:
if __name__ == "__main__":result = super_agent("你好,能告诉我天气怎么样吗?", "session_1")print(f"最终结果: {result}")
系统会自动处理用户输入,检索相关历史话题,生成回答,并将当前对话压缩成新话题。
扩展方向
这个基础系统有很多可能的扩展方向:
- 用户认证系统:添加用户登录和权限管理
- 话题可视化:提供图形化界面展示话题关系
- 高级摘要算法:使用更先进的NLP技术生成质量更高的摘要
- 多语言支持:扩展系统以支持多种语言
- 实时协作:支持多用户同时使用和协作
总结
本文详细介绍了一个基于Python的智能对话系统的设计与实现。系统利用LangChain框架和SQLite数据库,实现了对话记录管理、话题识别和摘要生成等核心功能。通过多智能体协作架构,系统能够高效地处理用户查询并管理对话历史。
这种系统可以广泛应用于客服系统、个人知识管理、团队协作等多种场景,帮助用户更好地组织和利用对话信息。希望本文能为你在构建智能对话系统方面提供有益的参考和启发。
提示:在实际部署时,建议添加错误处理、日志记录和性能监控等生产环境需要的功能,以确保系统的稳定性和可维护性。
import os
import uuid
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Unionfrom langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.tools import Tool# 为避免弃用警告,使用条件导入
try:from langchain_openai import ChatOpenAI
except ImportError:from langchain_community.chat_models import ChatOpenAI# 配置连接到LM Studio的LLM
llm = ChatOpenAI(model="qwen/qwen3-1.7b",openai_api_key="lm-studio",openai_api_base="http://127.0.0.1:1234/v1",temperature=0.7
)class Topic:def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):self.topic_id = topic_idself.topic = topicself.summary = summaryself.message_ids = message_idsclass ChatMessage:def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):self.message_id = message_idself.role = roleself.content = contentself.timestamp = timestamp or datetime.now()class DatabaseManager:def __init__(self, db_path: str = "chat_topics.db"):self.db_path = db_pathself.init_db()def init_db(self):"""初始化数据库表"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 创建topics表cursor.execute("""CREATE TABLE IF NOT EXISTS topics (topic_id TEXT PRIMARY KEY,topic TEXT NOT NULL,summary TEXT,message_ids TEXT)""")# 创建chat_messages表cursor.execute("""CREATE TABLE IF NOT EXISTS chat_messages (message_id TEXT PRIMARY KEY,session_id TEXT NOT NULL,role TEXT NOT NULL,content TEXT NOT NULL,timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)""")conn.commit()conn.close()def save_topic(self, topic: Topic):"""保存话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",(topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids)))conn.commit()conn.close()def get_all_topics(self) -> List[Topic]:"""获取所有话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")rows = cursor.fetchall()conn.close()topics = []for row in rows:topic_id, topic_str, summary, message_ids_str = rowmessage_ids = json.loads(message_ids_str) if message_ids_str else []topics.append(Topic(topic_id, topic_str, summary, message_ids))return topicsdef save_chat_message(self, session_id: str, message: ChatMessage):"""保存聊天消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 使用INSERT OR IGNORE避免重复插入cursor.execute("INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",(message.message_id, session_id, message.role, message.content))conn.commit()conn.close()def get_session_messages(self, session_id: str) -> List[ChatMessage]:"""获取会话消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",(session_id,))rows = cursor.fetchall()conn.close()messages = []for row in rows:message_id, role, content, timestamp = row# 处理时间戳格式if isinstance(timestamp, str):try:timestamp = datetime.fromisoformat(timestamp)except ValueError:timestamp = datetime.now()messages.append(ChatMessage(message_id, role, content, timestamp))return messages# 创建数据库管理器实例
db_manager = DatabaseManager()def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:"""根据topic_ids选择话题的工具,支持为每个topic_id指定summary_only参数"""topics = db_manager.get_all_topics()# 如果topic_ids为空,处理所有话题if not topic_ids:selected_topics = topics# 如果summary_only是布尔值,则应用于所有话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用elif isinstance(summary_only, list):summary_flags = summary_onlyelse:summary_flags = [False] * len(selected_topics)else:# 根据指定的topic_ids筛选话题selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]# 如果summary_only是布尔值,则应用于所有选定话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用(如果长度不够则用False填充)elif isinstance(summary_only, list):summary_flags = summary_only[:len(selected_topics)]# 如果summary_only列表长度不足,用False填充summary_flags.extend([False] * (len(selected_topics) - len(summary_flags)))else:summary_flags = [False] * len(selected_topics)result = []for i, topic in enumerate(selected_topics):# 获取对应的summary_only标志show_summary_only = summary_flags[i] if i < len(summary_flags) else Falseif show_summary_only:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary})else:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary,"message_ids": topic.message_ids})return resultdef get_history(session_id: str) -> List[Dict[str, Any]]:"""获取当前会话的聊天记录"""messages = db_manager.get_session_messages(session_id)return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:"""将新的聊天记录压缩成话题"""# 这里应该调用LLM来生成话题摘要,为了简化,我们使用简单的方法if not new_messages:return []# 创建新话题topic_id = str(uuid.uuid4())topic_content = new_messages[0]['content'][:20] + "..." if len(new_messages[0]['content']) > 20 else new_messages[0]['content']message_ids = [msg.get('message_id', str(uuid.uuid4())) for msg in new_messages]# 保存消息到数据库for msg in new_messages:if 'message_id' not in msg:msg['message_id'] = str(uuid.uuid4())chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])db_manager.save_chat_message(session_id, chat_msg)# 创建话题对象topic = Topic(topic_id=topic_id,topic=topic_content,summary=f"包含{len(new_messages)}条消息的话题",message_ids=message_ids)# 保存话题到数据库db_manager.save_topic(topic)# 返回话题信息return [{"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary,"message_ids": topic.message_ids}]# 定义工具列表
tools = [Tool.from_function(func=get_history,name="get_history",description="获取指定会话的聊天历史记录"),Tool.from_function(func=select_topic_tool,name="select_topic_tool",description="根据topic_ids选择话题,支持为每个topic指定是否只显示摘要"),Tool.from_function(func=compress_history_to_topic,name="compress_history_to_topic",description="将聊天记录压缩成话题")
]# 创建提示模板
template = """Answer the following questions as best you can. You have access to the following tools:{tools}Use the following format:Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input questionBegin!Question: {input}
Thought:{agent_scratchpad}"""prompt = PromptTemplate.from_template(template)# 创建智能体
history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)# 创建智能体执行器
history_retrieval_agent = AgentExecutor(agent=history_retrieval_agent,tools=tools,verbose=True,handle_parsing_errors=True
)qa_agent = AgentExecutor(agent=qa_agent,tools=tools,verbose=True,handle_parsing_errors=True
)topic_summary_agent = AgentExecutor(agent=topic_summary_agent,tools=tools,verbose=True,handle_parsing_errors=True
)def super_agent(user_input: str, session_id: str):"""超级智能体主函数"""if not session_id:session_id = str(uuid.uuid4())# 保存用户输入user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)db_manager.save_chat_message(session_id, user_message)# 1. 历史信息检索智能体print("=== 历史信息检索智能体 ===")# 检索相关话题all_topics = db_manager.get_all_topics()topic_ids = [topic.topic_id for topic in all_topics]if topic_ids:# 为每个topic_id指定不同的summary_only值summary_flags = [True, False] * (len(topic_ids) // 2 + 1)summary_flags = summary_flags[:len(topic_ids)]related_topics = select_topic_tool(topic_ids, summary_flags)print(f"相关话题: {related_topics}")else:print("未找到相关话题")related_topics = []# 2. 使用检测信息回答用户问题智能体print("=== 问答智能体 ===")history = get_history(session_id)print(f"会话历史: {history}")# 构造问答上下文qa_context = f"用户问题: {user_input}\n相关话题: {related_topics}\n会话历史: {history}"try:qa_response = qa_agent.invoke({"input": f"基于以下信息回答用户问题:\n{qa_context}\n\n用户问题: {user_input}","tools": tools,"tool_names": ", ".join([t.name for t in tools])})print(f"问答结果: {qa_response}")except Exception as e:print(f"问答智能体执行出错: {e}")qa_response = {"output": "抱歉,我无法生成回答。"}# 3. 处理历史信息 话题总结智能体print("=== 话题总结智能体 ===")# 将当前会话消息压缩成话题session_messages = get_history(session_id)if session_messages:new_topic = compress_history_to_topic(session_id, session_messages)print(f"新话题: {new_topic}")else:print("没有新消息需要压缩成话题")new_topic = []return {"session_id": session_id,"qa_response": qa_response.get("output", qa_response.get("result", "无回答")),"new_topics": new_topic}# 示例使用
if __name__ == "__main__":result = super_agent("你好,能告诉我天气怎么样吗?", "session_1")print(f"最终结果: {result}")
并行
import os
import uuid
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Union
from concurrent.futures import ThreadPoolExecutor, as_completedfrom langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.tools import Tool# 为避免弃用警告,使用条件导入
try:from langchain_openai import ChatOpenAI
except ImportError:from langchain_community.chat_models import ChatOpenAI# 配置连接到LM Studio的LLM
llm = ChatOpenAI(model="qwen/qwen3-1.7b",openai_api_key="lm-studio",openai_api_base="http://127.0.0.1:1234/v1",temperature=0.7
)class Topic:def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):self.topic_id = topic_idself.topic = topicself.summary = summaryself.message_ids = message_idsclass ChatMessage:def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):self.message_id = message_idself.role = roleself.content = contentself.timestamp = timestamp or datetime.now()class DatabaseManager:def __init__(self, db_path: str = "chat_topics.db"):self.db_path = db_pathself.init_db()def init_db(self):"""初始化数据库表"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 创建topics表cursor.execute("""CREATE TABLE IF NOT EXISTS topics (topic_id TEXT PRIMARY KEY,topic TEXT NOT NULL,summary TEXT,message_ids TEXT)""")# 创建chat_messages表cursor.execute("""CREATE TABLE IF NOT EXISTS chat_messages (message_id TEXT PRIMARY KEY,session_id TEXT NOT NULL,role TEXT NOT NULL,content TEXT NOT NULL,timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)""")conn.commit()conn.close()def save_topic(self, topic: Topic):"""保存话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",(topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids)))conn.commit()conn.close()def get_all_topics(self) -> List[Topic]:"""获取所有话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")rows = cursor.fetchall()conn.close()topics = []for row in rows:topic_id, topic_str, summary, message_ids_str = rowmessage_ids = json.loads(message_ids_str) if message_ids_str else []topics.append(Topic(topic_id, topic_str, summary, message_ids))return topicsdef save_chat_message(self, session_id: str, message: ChatMessage):"""保存聊天消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 使用INSERT OR IGNORE避免重复插入cursor.execute("INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",(message.message_id, session_id, message.role, message.content))conn.commit()conn.close()def get_session_messages(self, session_id: str) -> List[ChatMessage]:"""获取会话消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",(session_id,))rows = cursor.fetchall()conn.close()messages = []for row in rows:message_id, role, content, timestamp = row# 处理时间戳格式if isinstance(timestamp, str):try:timestamp = datetime.fromisoformat(timestamp)except ValueError:timestamp = datetime.now()messages.append(ChatMessage(message_id, role, content, timestamp))return messages# 创建数据库管理器实例
db_manager = DatabaseManager()def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:"""根据topic_ids选择话题的工具,支持为每个topic_id指定summary_only参数"""try:topics = db_manager.get_all_topics()# 如果topic_ids为空,处理所有话题if not topic_ids:selected_topics = topics# 如果summary_only是布尔值,则应用于所有话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用elif isinstance(summary_only, list):summary_flags = summary_onlyelse:summary_flags = [False] * len(selected_topics)else:# 根据指定的topic_ids筛选话题selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]# 如果summary_only是布尔值,则应用于所有选定话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用(如果长度不够则用False填充)elif isinstance(summary_only, list):summary_flags = summary_only[:len(selected_topics)]# 如果summary_only列表长度不足,用False填充summary_flags.extend([False] * (len(selected_topics) - len(summary_flags)))else:summary_flags = [False] * len(selected_topics)result = []for i, topic in enumerate(selected_topics):# 获取对应的summary_only标志show_summary_only = summary_flags[i] if i < len(summary_flags) else Falseif show_summary_only:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary})else:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary,"message_ids": topic.message_ids})return resultexcept Exception as e:print(f"select_topic_tool执行出错: {e}")return []def get_history(session_id: str) -> List[Dict[str, Any]]:"""获取当前会话的聊天记录"""try:messages = db_manager.get_session_messages(session_id)return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]except Exception as e:print(f"get_history执行出错: {e}")return []def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]] = None) -> List[Dict[str, Any]]:"""将新的聊天记录压缩成话题,并合并相似话题"""try:# 如果没有提供new_messages,则从数据库获取会话消息if new_messages is None:new_messages = get_history(session_id)# 这里应该调用LLM来生成话题摘要,为了简化,我们使用简单的方法if not new_messages:return []# 创建新话题topic_id = str(uuid.uuid4())topic_content = new_messages[0]['content'][:20] + "..." if len(new_messages[0]['content']) > 20 else new_messages[0]['content']message_ids = [msg.get('message_id', str(uuid.uuid4())) for msg in new_messages]# 保存消息到数据库for msg in new_messages:if 'message_id' not in msg:msg['message_id'] = str(uuid.uuid4())chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])db_manager.save_chat_message(session_id, chat_msg)# 创建话题对象new_topic = Topic(topic_id=topic_id,topic=topic_content,summary=f"包含{len(new_messages)}条消息的话题",message_ids=message_ids)# 检查是否已存在相似话题all_topics = db_manager.get_all_topics()merged = Falsefor existing_topic in all_topics:# 简单的相似性检查:比较话题标题的前几个字符if existing_topic.topic[:10] == topic_content[:10] and existing_topic.topic_id != topic_id:# 合并到现有话题existing_topic.message_ids.extend(message_ids)existing_topic.summary = f"包含{len(existing_topic.message_ids)}条消息的话题"db_manager.save_topic(existing_topic)merged = Truebreak# 如果没有合并,则保存新话题if not merged:db_manager.save_topic(new_topic)# 返回话题信息return [{"topic_id": new_topic.topic_id,"topic": new_topic.topic,"summary": new_topic.summary,"message_ids": new_topic.message_ids}]except Exception as e:print(f"compress_history_to_topic执行出错: {e}")return []# 定义工具列表
tools = [Tool.from_function(func=get_history,name="get_history",description="获取指定会话的聊天历史记录"),Tool.from_function(func=select_topic_tool,name="select_topic_tool",description="根据topic_ids选择话题,支持为每个topic指定是否只显示摘要"),Tool.from_function(func=lambda session_id: compress_history_to_topic(session_id),name="compress_history_to_topic",description="将聊天记录压缩成话题")
]# 创建提示模板
template = """Answer the following questions as best you can. You have access to the following tools:{tools}Use the following format:Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input questionBegin!Question: {input}
Thought: {agent_scratchpad}"""prompt = PromptTemplate.from_template(template)# 创建智能体
history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)# 创建智能体执行器
history_retrieval_agent = AgentExecutor(agent=history_retrieval_agent,tools=tools,verbose=True,handle_parsing_errors=True
)qa_agent = AgentExecutor(agent=qa_agent,tools=tools,verbose=True,handle_parsing_errors=True
)topic_summary_agent = AgentExecutor(agent=topic_summary_agent,tools=tools,verbose=True,handle_parsing_errors=True
)def super_agent(user_input: str, session_id: str):"""超级智能体主函数"""if not session_id:session_id = str(uuid.uuid4())# 保存用户输入user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)db_manager.save_chat_message(session_id, user_message)# 1. 历史信息检索智能体(并行搜索)print("=== 历史信息检索智能体(并行搜索) ===")# 检索相关话题all_topics = db_manager.get_all_topics()topic_ids = [topic.topic_id for topic in all_topics]related_topics = []if topic_ids:# 将话题ID分组,每组10个topic_groups = [topic_ids[i:i+10] for i in range(0, len(topic_ids), 10)]# 并行执行多个搜索任务,每组话题作为一个任务with ThreadPoolExecutor(max_workers=min(5, len(topic_groups))) as executor:# 提交多个任务future_to_topic = {}# 为每组话题创建一个任务for i, group in enumerate(topic_groups):# 交替使用摘要和完整信息模式summary_mode = (i % 2 == 0) # 偶数索引组使用摘要模式future = executor.submit(select_topic_tool, group, summary_mode)future_to_topic[future] = f"话题组{i}搜索({'摘要' if summary_mode else '完整'})"# 任务: 获取会话历史future3 = executor.submit(get_history, session_id)future_to_topic[future3] = "会话历史"# 收集并行任务结果search_results = {}for future in as_completed(future_to_topic):task_name = future_to_topic[future]try:result = future.result()search_results[task_name] = resultprint(f"{task_name} 完成,结果数: {len(result) if isinstance(result, list) else 'N/A'}")except Exception as exc:print(f'{task_name} 执行时发生异常: {exc}')search_results[task_name] = []# 合并所有话题搜索结果for task_name, result in search_results.items():if "话题组" in task_name and isinstance(result, list):related_topics.extend(result)print(f"合并后相关话题数: {len(related_topics)}")else:print("未找到相关话题")related_topics = []# 2. 使用检测信息回答用户问题智能体print("=== 问答智能体 ===")history = get_history(session_id)print(f"会话历史: {history}")# 构造问答上下文qa_context = f"用户问题: {user_input}\n相关话题: {related_topics}\n会话历史: {history}"try:qa_response = qa_agent.invoke({"input": f"基于以下信息回答用户问题:\n{qa_context}\n\n用户问题: {user_input}"})print(f"问答结果: {qa_response}")except Exception as e:print(f"问答智能体执行出错: {e}")qa_response = {"output": "抱歉,我无法生成回答。"}# 3. 处理历史信息 话题总结智能体print("=== 话题总结智能体 ===")# 将当前会话消息压缩成话题new_topic = compress_history_to_topic(session_id)if new_topic:print(f"新话题: {new_topic}")else:print("没有新消息需要压缩成话题")# 确保qa_response不为None并且有合适的键if qa_response is None:qa_response = {"output": "抱歉,我无法生成回答。"}# 处理可能缺少output或result键的情况if "output" in qa_response:qa_result = qa_response["output"]elif "result" in qa_response:qa_result = qa_response["result"]else:qa_result = "无回答"return {"session_id": session_id,"qa_response": qa_result,"new_topics": new_topic}# 示例使用
if __name__ == "__main__":result = super_agent("你好,能告诉我天气怎么样吗?", "session_1")print(f"最终结果: {result}")
