当前位置: 首页 > news >正文

构建智能对话系统:Python实现聊天话题管理与摘要生成

在当今信息爆炸的时代,如何有效管理和组织对话记录成为了一个重要挑战。本文将介绍如何使用Python构建一个智能对话系统,该系统能够自动识别对话话题、生成摘要,并提供智能对话管理功能。

系统概述

这个智能对话系统的核心功能包括:

  • 对话记录管理:使用SQLite数据库持久化存储聊天记录
  • 话题自动识别:将相关对话内容聚类成话题
  • 智能摘要生成:对每个话题生成简洁的摘要
  • 多智能体协作:使用多个AI智能体分工处理不同任务

下面我们来详细解析系统的各个组成部分。

环境准备与依赖安装

首先,确保你已安装Python 3.7+,然后安装必要的依赖包:

pip install langchain langchain-community langchain-openai

如果你的OpenAI模型是通过LM Studio本地部署的,还需要配置相应的API端点。

项目结构设计

合理的项目结构是代码可维护性的基础,我们的项目结构如下:

chat_system/
├── chat_manager.py     # 主程序文件
├── requirements.txt    # 依赖列表
└── chat_topics.db     # SQLite数据库(自动生成)

核心代码解析

1. 数据模型设计

我们首先定义两个核心数据模型:话题(Topic)和聊天消息(ChatMessage)。

class Topic:def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):self.topic_id = topic_idself.topic = topicself.summary = summaryself.message_ids = message_idsclass ChatMessage:def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):self.message_id = message_idself.role = roleself.content = contentself.timestamp = timestamp or datetime.now()

2. 数据库管理

我们使用SQLite数据库进行数据持久化存储,通过DatabaseManager类统一管理数据库操作。

class DatabaseManager:def __init__(self, db_path: str = "chat_topics.db"):self.db_path = db_pathself.init_db()def init_db(self):"""初始化数据库表"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 创建topics表cursor.execute("""CREATE TABLE IF NOT EXISTS topics (topic_id TEXT PRIMARY KEY,topic TEXT NOT NULL,summary TEXT,message_ids TEXT)""")# 创建chat_messages表cursor.execute("""CREATE TABLE IF NOT EXISTS chat_messages (message_id TEXT PRIMARY KEY,session_id TEXT NOT NULL,role TEXT NOT NULL,content TEXT NOT NULL,timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)""")conn.commit()conn.close()

3. 核心功能工具

系统提供了三个核心工具函数,分别负责话题选择、历史记录获取和话题压缩。

def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:"""根据topic_ids选择话题的工具"""# 实现细节...def get_history(session_id: str) -> List[Dict[str, Any]]:"""获取当前会话的聊天记录"""# 实现细节...def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:"""将聊天记录压缩成话题"""# 实现细节...

4. 多智能体系统

系统采用多智能体架构,每个智能体负责不同的任务:

  • 历史信息检索智能体:负责检索相关的历史话题
  • 问答智能体:基于上下文信息回答用户问题
  • 话题总结智能体:将对话记录压缩成结构化话题
# 创建智能体执行器
history_retrieval_agent = AgentExecutor(agent=history_retrieval_agent,tools=tools,verbose=True,handle_parsing_errors=True
)# 类似创建其他智能体...

5. 主协调函数

super_agent函数作为系统的主入口,协调各个智能体协作处理用户输入。

def super_agent(user_input: str, session_id: str):"""超级智能体主函数"""if not session_id:session_id = str(uuid.uuid4())# 保存用户输入user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)db_manager.save_chat_message(session_id, user_message)# 1. 历史信息检索智能体print("=== 历史信息检索智能体 ===")# ... 检索相关话题# 2. 使用检测信息回答用户问题智能体print("=== 问答智能体 ===")# ... 生成回答# 3. 处理历史信息 话题总结智能体print("=== 话题总结智能体 ===")# ... 压缩对话为话题return {"session_id": session_id,"qa_response": qa_response,"new_topics": new_topic}

系统特色与优势

1. 模块化设计

系统采用高度模块化的设计,各个组件职责分明,便于维护和扩展。

2. 智能话题识别

系统能够自动识别对话中的主题,并将相关对话内容聚类,大大提高了对话记录的可管理性。

3. 灵活的数据持久化

使用SQLite数据库,既保证了数据的持久化,又避免了复杂数据库的部署成本。

4. 多智能体协作

通过多个专门化的智能体分工合作,提高了系统的整体性能和准确性。

实际应用示例

下面是一个简单的使用示例:

if __name__ == "__main__":result = super_agent("你好,能告诉我天气怎么样吗?", "session_1")print(f"最终结果: {result}")

系统会自动处理用户输入,检索相关历史话题,生成回答,并将当前对话压缩成新话题。

扩展方向

这个基础系统有很多可能的扩展方向:

  1. 用户认证系统:添加用户登录和权限管理
  2. 话题可视化:提供图形化界面展示话题关系
  3. 高级摘要算法:使用更先进的NLP技术生成质量更高的摘要
  4. 多语言支持:扩展系统以支持多种语言
  5. 实时协作:支持多用户同时使用和协作

总结

本文详细介绍了一个基于Python的智能对话系统的设计与实现。系统利用LangChain框架和SQLite数据库,实现了对话记录管理、话题识别和摘要生成等核心功能。通过多智能体协作架构,系统能够高效地处理用户查询并管理对话历史。

这种系统可以广泛应用于客服系统、个人知识管理、团队协作等多种场景,帮助用户更好地组织和利用对话信息。希望本文能为你在构建智能对话系统方面提供有益的参考和启发。

提示:在实际部署时,建议添加错误处理、日志记录和性能监控等生产环境需要的功能,以确保系统的稳定性和可维护性。

import os
import uuid
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Unionfrom langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.tools import Tool# 为避免弃用警告,使用条件导入
try:from langchain_openai import ChatOpenAI
except ImportError:from langchain_community.chat_models import ChatOpenAI# 配置连接到LM Studio的LLM
llm = ChatOpenAI(model="qwen/qwen3-1.7b",openai_api_key="lm-studio",openai_api_base="http://127.0.0.1:1234/v1",temperature=0.7
)class Topic:def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):self.topic_id = topic_idself.topic = topicself.summary = summaryself.message_ids = message_idsclass ChatMessage:def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):self.message_id = message_idself.role = roleself.content = contentself.timestamp = timestamp or datetime.now()class DatabaseManager:def __init__(self, db_path: str = "chat_topics.db"):self.db_path = db_pathself.init_db()def init_db(self):"""初始化数据库表"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 创建topics表cursor.execute("""CREATE TABLE IF NOT EXISTS topics (topic_id TEXT PRIMARY KEY,topic TEXT NOT NULL,summary TEXT,message_ids TEXT)""")# 创建chat_messages表cursor.execute("""CREATE TABLE IF NOT EXISTS chat_messages (message_id TEXT PRIMARY KEY,session_id TEXT NOT NULL,role TEXT NOT NULL,content TEXT NOT NULL,timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)""")conn.commit()conn.close()def save_topic(self, topic: Topic):"""保存话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",(topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids)))conn.commit()conn.close()def get_all_topics(self) -> List[Topic]:"""获取所有话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")rows = cursor.fetchall()conn.close()topics = []for row in rows:topic_id, topic_str, summary, message_ids_str = rowmessage_ids = json.loads(message_ids_str) if message_ids_str else []topics.append(Topic(topic_id, topic_str, summary, message_ids))return topicsdef save_chat_message(self, session_id: str, message: ChatMessage):"""保存聊天消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 使用INSERT OR IGNORE避免重复插入cursor.execute("INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",(message.message_id, session_id, message.role, message.content))conn.commit()conn.close()def get_session_messages(self, session_id: str) -> List[ChatMessage]:"""获取会话消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",(session_id,))rows = cursor.fetchall()conn.close()messages = []for row in rows:message_id, role, content, timestamp = row# 处理时间戳格式if isinstance(timestamp, str):try:timestamp = datetime.fromisoformat(timestamp)except ValueError:timestamp = datetime.now()messages.append(ChatMessage(message_id, role, content, timestamp))return messages# 创建数据库管理器实例
db_manager = DatabaseManager()def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:"""根据topic_ids选择话题的工具,支持为每个topic_id指定summary_only参数"""topics = db_manager.get_all_topics()# 如果topic_ids为空,处理所有话题if not topic_ids:selected_topics = topics# 如果summary_only是布尔值,则应用于所有话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用elif isinstance(summary_only, list):summary_flags = summary_onlyelse:summary_flags = [False] * len(selected_topics)else:# 根据指定的topic_ids筛选话题selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]# 如果summary_only是布尔值,则应用于所有选定话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用(如果长度不够则用False填充)elif isinstance(summary_only, list):summary_flags = summary_only[:len(selected_topics)]# 如果summary_only列表长度不足,用False填充summary_flags.extend([False] * (len(selected_topics) - len(summary_flags)))else:summary_flags = [False] * len(selected_topics)result = []for i, topic in enumerate(selected_topics):# 获取对应的summary_only标志show_summary_only = summary_flags[i] if i < len(summary_flags) else Falseif show_summary_only:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary})else:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary,"message_ids": topic.message_ids})return resultdef get_history(session_id: str) -> List[Dict[str, Any]]:"""获取当前会话的聊天记录"""messages = db_manager.get_session_messages(session_id)return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:"""将新的聊天记录压缩成话题"""# 这里应该调用LLM来生成话题摘要,为了简化,我们使用简单的方法if not new_messages:return []# 创建新话题topic_id = str(uuid.uuid4())topic_content = new_messages[0]['content'][:20] + "..." if len(new_messages[0]['content']) > 20 else new_messages[0]['content']message_ids = [msg.get('message_id', str(uuid.uuid4())) for msg in new_messages]# 保存消息到数据库for msg in new_messages:if 'message_id' not in msg:msg['message_id'] = str(uuid.uuid4())chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])db_manager.save_chat_message(session_id, chat_msg)# 创建话题对象topic = Topic(topic_id=topic_id,topic=topic_content,summary=f"包含{len(new_messages)}条消息的话题",message_ids=message_ids)# 保存话题到数据库db_manager.save_topic(topic)# 返回话题信息return [{"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary,"message_ids": topic.message_ids}]# 定义工具列表
tools = [Tool.from_function(func=get_history,name="get_history",description="获取指定会话的聊天历史记录"),Tool.from_function(func=select_topic_tool,name="select_topic_tool",description="根据topic_ids选择话题,支持为每个topic指定是否只显示摘要"),Tool.from_function(func=compress_history_to_topic,name="compress_history_to_topic",description="将聊天记录压缩成话题")
]# 创建提示模板
template = """Answer the following questions as best you can. You have access to the following tools:{tools}Use the following format:Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input questionBegin!Question: {input}
Thought:{agent_scratchpad}"""prompt = PromptTemplate.from_template(template)# 创建智能体
history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)# 创建智能体执行器
history_retrieval_agent = AgentExecutor(agent=history_retrieval_agent,tools=tools,verbose=True,handle_parsing_errors=True
)qa_agent = AgentExecutor(agent=qa_agent,tools=tools,verbose=True,handle_parsing_errors=True
)topic_summary_agent = AgentExecutor(agent=topic_summary_agent,tools=tools,verbose=True,handle_parsing_errors=True
)def super_agent(user_input: str, session_id: str):"""超级智能体主函数"""if not session_id:session_id = str(uuid.uuid4())# 保存用户输入user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)db_manager.save_chat_message(session_id, user_message)# 1. 历史信息检索智能体print("=== 历史信息检索智能体 ===")# 检索相关话题all_topics = db_manager.get_all_topics()topic_ids = [topic.topic_id for topic in all_topics]if topic_ids:# 为每个topic_id指定不同的summary_only值summary_flags = [True, False] * (len(topic_ids) // 2 + 1)summary_flags = summary_flags[:len(topic_ids)]related_topics = select_topic_tool(topic_ids, summary_flags)print(f"相关话题: {related_topics}")else:print("未找到相关话题")related_topics = []# 2. 使用检测信息回答用户问题智能体print("=== 问答智能体 ===")history = get_history(session_id)print(f"会话历史: {history}")# 构造问答上下文qa_context = f"用户问题: {user_input}\n相关话题: {related_topics}\n会话历史: {history}"try:qa_response = qa_agent.invoke({"input": f"基于以下信息回答用户问题:\n{qa_context}\n\n用户问题: {user_input}","tools": tools,"tool_names": ", ".join([t.name for t in tools])})print(f"问答结果: {qa_response}")except Exception as e:print(f"问答智能体执行出错: {e}")qa_response = {"output": "抱歉,我无法生成回答。"}# 3. 处理历史信息 话题总结智能体print("=== 话题总结智能体 ===")# 将当前会话消息压缩成话题session_messages = get_history(session_id)if session_messages:new_topic = compress_history_to_topic(session_id, session_messages)print(f"新话题: {new_topic}")else:print("没有新消息需要压缩成话题")new_topic = []return {"session_id": session_id,"qa_response": qa_response.get("output", qa_response.get("result", "无回答")),"new_topics": new_topic}# 示例使用
if __name__ == "__main__":result = super_agent("你好,能告诉我天气怎么样吗?", "session_1")print(f"最终结果: {result}")

并行

import os
import uuid
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Union
from concurrent.futures import ThreadPoolExecutor, as_completedfrom langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.tools import Tool# 为避免弃用警告,使用条件导入
try:from langchain_openai import ChatOpenAI
except ImportError:from langchain_community.chat_models import ChatOpenAI# 配置连接到LM Studio的LLM
llm = ChatOpenAI(model="qwen/qwen3-1.7b",openai_api_key="lm-studio",openai_api_base="http://127.0.0.1:1234/v1",temperature=0.7
)class Topic:def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):self.topic_id = topic_idself.topic = topicself.summary = summaryself.message_ids = message_idsclass ChatMessage:def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):self.message_id = message_idself.role = roleself.content = contentself.timestamp = timestamp or datetime.now()class DatabaseManager:def __init__(self, db_path: str = "chat_topics.db"):self.db_path = db_pathself.init_db()def init_db(self):"""初始化数据库表"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 创建topics表cursor.execute("""CREATE TABLE IF NOT EXISTS topics (topic_id TEXT PRIMARY KEY,topic TEXT NOT NULL,summary TEXT,message_ids TEXT)""")# 创建chat_messages表cursor.execute("""CREATE TABLE IF NOT EXISTS chat_messages (message_id TEXT PRIMARY KEY,session_id TEXT NOT NULL,role TEXT NOT NULL,content TEXT NOT NULL,timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)""")conn.commit()conn.close()def save_topic(self, topic: Topic):"""保存话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",(topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids)))conn.commit()conn.close()def get_all_topics(self) -> List[Topic]:"""获取所有话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")rows = cursor.fetchall()conn.close()topics = []for row in rows:topic_id, topic_str, summary, message_ids_str = rowmessage_ids = json.loads(message_ids_str) if message_ids_str else []topics.append(Topic(topic_id, topic_str, summary, message_ids))return topicsdef save_chat_message(self, session_id: str, message: ChatMessage):"""保存聊天消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 使用INSERT OR IGNORE避免重复插入cursor.execute("INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",(message.message_id, session_id, message.role, message.content))conn.commit()conn.close()def get_session_messages(self, session_id: str) -> List[ChatMessage]:"""获取会话消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",(session_id,))rows = cursor.fetchall()conn.close()messages = []for row in rows:message_id, role, content, timestamp = row# 处理时间戳格式if isinstance(timestamp, str):try:timestamp = datetime.fromisoformat(timestamp)except ValueError:timestamp = datetime.now()messages.append(ChatMessage(message_id, role, content, timestamp))return messages# 创建数据库管理器实例
db_manager = DatabaseManager()def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:"""根据topic_ids选择话题的工具,支持为每个topic_id指定summary_only参数"""try:topics = db_manager.get_all_topics()# 如果topic_ids为空,处理所有话题if not topic_ids:selected_topics = topics# 如果summary_only是布尔值,则应用于所有话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用elif isinstance(summary_only, list):summary_flags = summary_onlyelse:summary_flags = [False] * len(selected_topics)else:# 根据指定的topic_ids筛选话题selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]# 如果summary_only是布尔值,则应用于所有选定话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用(如果长度不够则用False填充)elif isinstance(summary_only, list):summary_flags = summary_only[:len(selected_topics)]# 如果summary_only列表长度不足,用False填充summary_flags.extend([False] * (len(selected_topics) - len(summary_flags)))else:summary_flags = [False] * len(selected_topics)result = []for i, topic in enumerate(selected_topics):# 获取对应的summary_only标志show_summary_only = summary_flags[i] if i < len(summary_flags) else Falseif show_summary_only:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary})else:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary,"message_ids": topic.message_ids})return resultexcept Exception as e:print(f"select_topic_tool执行出错: {e}")return []def get_history(session_id: str) -> List[Dict[str, Any]]:"""获取当前会话的聊天记录"""try:messages = db_manager.get_session_messages(session_id)return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]except Exception as e:print(f"get_history执行出错: {e}")return []def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]] = None) -> List[Dict[str, Any]]:"""将新的聊天记录压缩成话题,并合并相似话题"""try:# 如果没有提供new_messages,则从数据库获取会话消息if new_messages is None:new_messages = get_history(session_id)# 这里应该调用LLM来生成话题摘要,为了简化,我们使用简单的方法if not new_messages:return []# 创建新话题topic_id = str(uuid.uuid4())topic_content = new_messages[0]['content'][:20] + "..." if len(new_messages[0]['content']) > 20 else new_messages[0]['content']message_ids = [msg.get('message_id', str(uuid.uuid4())) for msg in new_messages]# 保存消息到数据库for msg in new_messages:if 'message_id' not in msg:msg['message_id'] = str(uuid.uuid4())chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])db_manager.save_chat_message(session_id, chat_msg)# 创建话题对象new_topic = Topic(topic_id=topic_id,topic=topic_content,summary=f"包含{len(new_messages)}条消息的话题",message_ids=message_ids)# 检查是否已存在相似话题all_topics = db_manager.get_all_topics()merged = Falsefor existing_topic in all_topics:# 简单的相似性检查:比较话题标题的前几个字符if existing_topic.topic[:10] == topic_content[:10] and existing_topic.topic_id != topic_id:# 合并到现有话题existing_topic.message_ids.extend(message_ids)existing_topic.summary = f"包含{len(existing_topic.message_ids)}条消息的话题"db_manager.save_topic(existing_topic)merged = Truebreak# 如果没有合并,则保存新话题if not merged:db_manager.save_topic(new_topic)# 返回话题信息return [{"topic_id": new_topic.topic_id,"topic": new_topic.topic,"summary": new_topic.summary,"message_ids": new_topic.message_ids}]except Exception as e:print(f"compress_history_to_topic执行出错: {e}")return []# 定义工具列表
tools = [Tool.from_function(func=get_history,name="get_history",description="获取指定会话的聊天历史记录"),Tool.from_function(func=select_topic_tool,name="select_topic_tool",description="根据topic_ids选择话题,支持为每个topic指定是否只显示摘要"),Tool.from_function(func=lambda session_id: compress_history_to_topic(session_id),name="compress_history_to_topic",description="将聊天记录压缩成话题")
]# 创建提示模板
template = """Answer the following questions as best you can. You have access to the following tools:{tools}Use the following format:Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input questionBegin!Question: {input}
Thought: {agent_scratchpad}"""prompt = PromptTemplate.from_template(template)# 创建智能体
history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)# 创建智能体执行器
history_retrieval_agent = AgentExecutor(agent=history_retrieval_agent,tools=tools,verbose=True,handle_parsing_errors=True
)qa_agent = AgentExecutor(agent=qa_agent,tools=tools,verbose=True,handle_parsing_errors=True
)topic_summary_agent = AgentExecutor(agent=topic_summary_agent,tools=tools,verbose=True,handle_parsing_errors=True
)def super_agent(user_input: str, session_id: str):"""超级智能体主函数"""if not session_id:session_id = str(uuid.uuid4())# 保存用户输入user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)db_manager.save_chat_message(session_id, user_message)# 1. 历史信息检索智能体(并行搜索)print("=== 历史信息检索智能体(并行搜索) ===")# 检索相关话题all_topics = db_manager.get_all_topics()topic_ids = [topic.topic_id for topic in all_topics]related_topics = []if topic_ids:# 将话题ID分组,每组10个topic_groups = [topic_ids[i:i+10] for i in range(0, len(topic_ids), 10)]# 并行执行多个搜索任务,每组话题作为一个任务with ThreadPoolExecutor(max_workers=min(5, len(topic_groups))) as executor:# 提交多个任务future_to_topic = {}# 为每组话题创建一个任务for i, group in enumerate(topic_groups):# 交替使用摘要和完整信息模式summary_mode = (i % 2 == 0)  # 偶数索引组使用摘要模式future = executor.submit(select_topic_tool, group, summary_mode)future_to_topic[future] = f"话题组{i}搜索({'摘要' if summary_mode else '完整'})"# 任务: 获取会话历史future3 = executor.submit(get_history, session_id)future_to_topic[future3] = "会话历史"# 收集并行任务结果search_results = {}for future in as_completed(future_to_topic):task_name = future_to_topic[future]try:result = future.result()search_results[task_name] = resultprint(f"{task_name} 完成,结果数: {len(result) if isinstance(result, list) else 'N/A'}")except Exception as exc:print(f'{task_name} 执行时发生异常: {exc}')search_results[task_name] = []# 合并所有话题搜索结果for task_name, result in search_results.items():if "话题组" in task_name and isinstance(result, list):related_topics.extend(result)print(f"合并后相关话题数: {len(related_topics)}")else:print("未找到相关话题")related_topics = []# 2. 使用检测信息回答用户问题智能体print("=== 问答智能体 ===")history = get_history(session_id)print(f"会话历史: {history}")# 构造问答上下文qa_context = f"用户问题: {user_input}\n相关话题: {related_topics}\n会话历史: {history}"try:qa_response = qa_agent.invoke({"input": f"基于以下信息回答用户问题:\n{qa_context}\n\n用户问题: {user_input}"})print(f"问答结果: {qa_response}")except Exception as e:print(f"问答智能体执行出错: {e}")qa_response = {"output": "抱歉,我无法生成回答。"}# 3. 处理历史信息 话题总结智能体print("=== 话题总结智能体 ===")# 将当前会话消息压缩成话题new_topic = compress_history_to_topic(session_id)if new_topic:print(f"新话题: {new_topic}")else:print("没有新消息需要压缩成话题")# 确保qa_response不为None并且有合适的键if qa_response is None:qa_response = {"output": "抱歉,我无法生成回答。"}# 处理可能缺少output或result键的情况if "output" in qa_response:qa_result = qa_response["output"]elif "result" in qa_response:qa_result = qa_response["result"]else:qa_result = "无回答"return {"session_id": session_id,"qa_response": qa_result,"new_topics": new_topic}# 示例使用
if __name__ == "__main__":result = super_agent("你好,能告诉我天气怎么样吗?", "session_1")print(f"最终结果: {result}")
http://www.dtcms.com/a/533254.html

相关文章:

  • 在ps做网站分辨率96可以吗xml网站地图怎么做
  • 如何建造网站视频教程icon psd下载网站
  • ArkTS省市区三级联动选择器实现详解
  • 自己代码做网站做商城的网站用什么框架好
  • 怎么上传自己的网站红豆网桂林论坛
  • 西乡移动网站建设网投网站怎么做
  • 【第十九周】机器学习笔记08
  • 文件管理百度seo学院
  • 成都网站建设 龙兵做网站应该注意什么
  • CDC(Communication Device Class)是什么?
  • 东台建设网站的公司成都纯手工seo
  • fopen 函数实现追踪(glibc 2.9)(了解和选学)
  • 国产数据库之XuguDB:虚怀若谷
  • 湖南移动官网网站建设微信营销的优缺点
  • 自建网站推广的最新发展河北网络建站
  • 精品课程网站建设意义最近免费韩国电影hd无
  • 怎么建设小说网站中国企业500强排行榜
  • 建设银行企业网站首页wordpress图像缩放插件
  • 奇偶分家:破解n^4+4^n的合数身份
  • js微任务输出
  • Linux小课堂: 守护进程与初始化进程服务详解
  • synchronized 和 Lock
  • 2.2.1.2 大数据方法论与实践指南-基于飞书项目的项目管理规范
  • 做防腐木网站优秀网站设计流程
  • LangChain最详细教程之Model I/O(二)Prompt Template
  • STM32F103C8T6_UART串口通信完整教程
  • Gorm(一)查询方法
  • 网站管理工具wordpress中文版和英文版区别
  • 新网网站空间到期停了 咋续费北海哪里做网站建设
  • 百日挑战-单词篇(第四天)