当前位置: 首页 > news >正文

构建智能对话系统:基于LangChain的超级智能体架构解析

在人工智能快速发展的今天,构建能够理解和维护长期对话上下文的智能系统变得越来越重要。本文将深入解析一个基于LangChain的超级智能体架构,该系统能够有效管理对话历史、自动归纳话题,并提供智能问答服务。

项目概述

这个智能对话系统的核心目标是解决长期对话中的上下文管理问题。传统聊天机器人往往只能处理单轮对话,而我们的系统通过以下创新特性实现了更智能的对话管理:

  • 话题自动归纳:将相关对话内容自动归类为话题
  • 智能历史检索:基于关键词预测的相关话题搜索
  • DAG工具执行:支持有向无环图的工具执行计划
  • 多智能体协作: specialized智能体各司其职,协同工作

核心架构解析

数据模型设计

系统采用清晰的数据模型来管理对话内容:

class Topic:def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):self.topic_id = topic_idself.topic = topicself.summary = summaryself.message_ids = message_idsclass ChatMessage:def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):self.message_id = message_idself.role = roleself.content = contentself.timestamp = timestamp or datetime.now()

这种设计使得系统能够将离散的对话消息组织成有意义的话题单元,每个话题包含相关的消息ID列表和内容摘要。

数据库管理层

DatabaseManager 类负责所有数据持久化操作,采用SQLite作为存储后端:

class DatabaseManager:def __init__(self, db_path: str = "chat_topics.db"):self.db_path = db_pathself.init_db()def init_db(self):# 初始化数据库表结构# topics表:存储话题信息# chat_messages表:存储具体消息内容

数据库设计支持高效的话题检索和消息查询,特别是search_topics_by_keywords方法实现了基于关键词的智能话题搜索。

DAG执行器:智能化工具调度

DAGExecutor 类是系统的调度核心,实现了有向无环图的工具执行策略:

class DAGExecutor:def __init__(self, skip_threshold: int = 3):self.execution_history = {}self.skip_threshold = skip_thresholddef should_skip_agent(self, user_question: str) -> bool:# 智能判断是否跳过agent直接执行# 基于历史执行频率的优化策略

该执行器具备执行记忆功能,对于频繁执行的相同问题,可以跳过智能体分析直接调用缓存执行计划,显著提升响应效率。

超级智能体架构

SuperAgent基类设计

SuperAgent 作为所有智能体的基类,提供了统一的智能体框架:

class SuperAgent:def __init__(self, name: str, llm: Optional[Any] = None, tools: Optional[List[Tool]] = None,prompt_template: Optional[str] = None,max_workers: int = 5):# 初始化LLM连接、工具集、提示模板等

智能体的核心处理流程包含三个关键步骤:

  1. 历史信息检索:基于用户输入预测关键词,搜索相关话题
  2. 智能问答处理:结合上下文信息生成回答
  3. 话题总结归纳:将新对话内容压缩为话题

容错机制与降级策略

系统设计了完善的容错机制,当LLM服务不可用时,自动降级到基于规则的响应生成:

def _generate_rule_based_response(self, user_input: str, related_topics: List, history: List) -> str:# 基于关键词匹配的简单响应生成# 确保系统在LLM服务异常时仍能提供基本服务

专业化智能体分工

系统通过继承机制实现了智能体的专业化分工:

历史检索专家
class HistoryRetrievalAgent(SuperAgent):"""专注于历史信息检索的专用智能体"""
问答处理专家
class QAAgent(SuperAgent):"""具备DAG规划能力的问答专用智能体"""
话题总结专家
class TopicSummaryAgent(SuperAgent):"""专注于对话内容总结的智能体"""

智能体协作模式

系统支持智能体间的工具化调用,实现协同工作:

def as_tool(self) -> Tool:"""将当前智能体作为工具,供其他智能体调用"""

这种设计使得通用协调智能体可以调度专业化智能体,形成层次化的智能体协作网络。

核心工作流程

1. 用户输入处理流程

def process_user_input(self, user_input: str, session_id: str) -> Dict[str, Any]:# 检查是否跳过智能体直接执行(频率阈值判断)# 保存用户消息到数据库# 预测相关关键词并检索历史话题# 生成智能回答# 压缩新对话为话题# 返回处理结果

2. 话题压缩算法

def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]] = None):# 提取对话核心内容生成话题标题# 检查并合并相似话题# 保存新话题到数据库

3. 智能工具执行

系统内置了多种工具函数,支持灵活的对话管理:

  • get_history(): 获取会话历史
  • select_topic_tool(): 选择特定话题
  • compress_history_to_topic(): 对话压缩
  • execute_dag_tool(): DAG图执行

实际应用示例

# 创建专业化智能体团队
history_agent = HistoryRetrievalAgent("历史检索专家")
qa_agent = QAAgent("问答专家") 
topic_agent = TopicSummaryAgent("话题总结专家")# 构建智能体工具集
general_agent_tools = [history_agent.as_tool(),qa_agent.as_tool(), topic_agent.as_tool()
]# 创建协调智能体
general_agent = SuperAgent(name="通用协调者", tools=general_agent_tools)# 处理用户查询
result = general_agent.process_user_input("请分析历史对话并回答相关问题", "session_1"
)

技术亮点与创新

1. 自适应执行优化

系统通过执行历史记忆,对高频问题实现直接执行跳过,大幅提升响应速度。

2. 分层容错设计

从LLM服务检测到规则降级响应,确保系统在各种异常情况下保持可用性。

3. 模块化智能体架构

专业分工的智能体设计,支持功能扩展和个性化定制。

4. 智能话题管理

基于内容的相似性检测和话题合并算法,实现对话内容的有效组织。

未来展望

当前系统为进一步扩展提供了良好的基础架构,未来可以在以下方向进行增强:

  1. 多模态支持:扩展处理图像、音频等非文本内容
  2. 分布式智能体:支持跨网络节点的智能体协作
  3. 强化学习优化:基于用户反馈的对话策略优化
  4. 知识图谱集成:与外部知识库的深度整合

结语

本文介绍的智能对话系统架构展示了一种实用的长期对话管理解决方案。通过话题归纳、历史检索和多智能体协作等创新设计,系统能够有效理解对话上下文,提供更加连贯和智能的交互体验。

该架构不仅适用于聊天机器人场景,还可以扩展到客服系统、个人助手、知识管理等多个领域,为构建下一代智能对话系统提供了有价值的技术参考。

关键词: 智能对话系统, LangChain, 智能体架构, 话题归纳, DAG执行, 多智能体协作, 对话管理

import os
import uuid
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Union, Optional, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed# 添加网络连接相关的导入
import requests
from requests.exceptions import RequestExceptionfrom langchain_core.prompts import PromptTemplate
from langchain_core.tools import Tool# 为避免弃用警告,使用条件导入
try:from langchain_openai import ChatOpenAI
except ImportError:try:from langchain_community.chat_models import ChatOpenAIexcept ImportError:from langchain.chat_models import ChatOpenAIclass Topic:def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):self.topic_id = topic_idself.topic = topicself.summary = summaryself.message_ids = message_idsclass ChatMessage:def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):self.message_id = message_idself.role = roleself.content = contentself.timestamp = timestamp or datetime.now()class DatabaseManager:def __init__(self, db_path: str = "chat_topics.db"):self.db_path = db_pathself.init_db()def init_db(self):"""初始化数据库表"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 创建topics表cursor.execute("""CREATE TABLE IF NOT EXISTS topics(topic_idTEXTPRIMARYKEY,topicTEXTNOTNULL,summaryTEXT,message_idsTEXT)""")# 创建chat_messages表cursor.execute("""CREATE TABLE IF NOT EXISTS chat_messages(message_idTEXTPRIMARYKEY,session_idTEXTNOTNULL,roleTEXTNOTNULL,contentTEXTNOTNULL,timestampDATETIMEDEFAULTCURRENT_TIMESTAMP)""")conn.commit()conn.close()def save_topic(self, topic: Topic):"""保存话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",(topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids)))conn.commit()conn.close()def get_all_topics(self) -> List[Topic]:"""获取所有话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")rows = cursor.fetchall()conn.close()topics = []for row in rows:topic_id, topic_str, summary, message_ids_str = rowmessage_ids = json.loads(message_ids_str) if message_ids_str else []topics.append(Topic(topic_id, topic_str, summary, message_ids))return topicsdef search_topics_by_keywords(self, keywords: List[str]) -> List[Topic]:"""根据关键词搜索话题"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 构建查询条件,关键词之间是"或"的关系,支持模糊搜索conditions = []params = []for keyword in keywords:conditions.append("(topic LIKE ? OR summary LIKE ?)")params.extend([f"%{keyword}%", f"%{keyword}%"])query = "SELECT DISTINCT topic_id, topic, summary, message_ids FROM topics"if conditions:query += " WHERE " + " OR ".join(conditions)cursor.execute(query, params)rows = cursor.fetchall()conn.close()topics = []for row in rows:topic_id, topic_str, summary, message_ids_str = rowmessage_ids = json.loads(message_ids_str) if message_ids_str else []topics.append(Topic(topic_id, topic_str, summary, message_ids))return topicsdef save_chat_message(self, session_id: str, message: ChatMessage):"""保存聊天消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 使用INSERT OR IGNORE避免重复插入cursor.execute("INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",(message.message_id, session_id, message.role, message.content))conn.commit()conn.close()def get_session_messages(self, session_id: str) -> List[ChatMessage]:"""获取会话消息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",(session_id,))rows = cursor.fetchall()conn.close()messages = []for row in rows:message_id, role, content, timestamp = row# 处理时间戳格式if isinstance(timestamp, str):try:timestamp = datetime.fromisoformat(timestamp)except ValueError:timestamp = datetime.now()messages.append(ChatMessage(message_id, role, content, timestamp))return messages# 创建数据库管理器实例
db_manager = DatabaseManager()def predict_relevant_keywords(user_input: str) -> List[str]:"""根据用户输入预测相关话题关键词"""# 这里应该使用LLM来预测关键词,为了简化,我们使用简单的关键词提取方法# 在实际应用中,可以使用更复杂的NLP技术或调用LLM来提取关键词common_words = ['的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好', '自己', '这', '请', '分析', '历史', '对话', '回答', '相关', '问题']# 简单的关键词提取import rewords = re.findall(r'[\w]+', user_input)# 过滤掉停用词和长度小于2的词keywords = [word for word in words if word not in common_words and len(word) > 1]# 去重并返回最多5个关键词return list(set(keywords))[:5]  # 返回最多5个不重复的关键词def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:"""根据topic_ids选择话题的工具,支持为每个topic_id指定summary_only参数"""try:topics = db_manager.get_all_topics()# 如果topic_ids为空,处理所有话题if not topic_ids:selected_topics = topics# 如果summary_only是布尔值,则应用于所有话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用elif isinstance(summary_only, list):summary_flags = summary_onlyelse:summary_flags = [False] * len(selected_topics)else:# 根据指定的topic_ids筛选话题selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]# 如果summary_only是布尔值,则应用于所有选定话题if isinstance(summary_only, bool):summary_flags = [summary_only] * len(selected_topics)# 如果summary_only是列表,则按顺序应用(如果长度不够则用False填充)elif isinstance(summary_only, list):summary_flags = summary_only[:len(selected_topics)]# 如果summary_only列表长度不足,用False填充summary_flags.extend([False] * (len(selected_topics) - len(summary_flags)))else:summary_flags = [False] * len(selected_topics)result = []for i, topic in enumerate(selected_topics):# 获取对应的summary_only标志show_summary_only = summary_flags[i] if i < len(summary_flags) else Falseif show_summary_only:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary})else:result.append({"topic_id": topic.topic_id,"topic": topic.topic,"summary": topic.summary,"message_ids": topic.message_ids})return resultexcept Exception as e:print(f"select_topic_tool执行出错: {e}")return []def get_history(session_id: str) -> List[Dict[str, Any]]:"""获取当前会话的聊天记录"""try:messages = db_manager.get_session_messages(session_id)return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]except Exception as e:print(f"get_history执行出错: {e}")return []def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]] = None) -> List[Dict[str, Any]]:"""将新的聊天记录压缩成话题,并合并相似话题"""try:# 如果没有提供new_messages,则从数据库获取会话消息if new_messages is None:new_messages = get_history(session_id)# 这里应该调用LLM来生成话题摘要,为了简化,我们使用简单的方法if not new_messages:return []# 创建新话题topic_id = str(uuid.uuid4())topic_content = new_messages[0]['content'][:20] + "..." if len(new_messages[0]['content']) > 20 else \new_messages[0]['content']message_ids = [msg.get('message_id', str(uuid.uuid4())) for msg in new_messages]# 保存消息到数据库for msg in new_messages:if 'message_id' not in msg:msg['message_id'] = str(uuid.uuid4())chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])db_manager.save_chat_message(session_id, chat_msg)# 创建话题对象new_topic = Topic(topic_id=topic_id,topic=topic_content,summary=f"包含{len(new_messages)}条消息的话题",message_ids=message_ids)# 检查是否已存在相似话题all_topics = db_manager.get_all_topics()merged = Falsefor existing_topic in all_topics:# 简单的相似性检查:比较话题标题的前几个字符if existing_topic.topic[:10] == topic_content[:10] and existing_topic.topic_id != topic_id:# 合并到现有话题existing_topic.message_ids.extend(message_ids)existing_topic.summary = f"包含{len(existing_topic.message_ids)}条消息的话题"db_manager.save_topic(existing_topic)merged = Truebreak# 如果没有合并,则保存新话题if not merged:db_manager.save_topic(new_topic)# 返回话题信息return [{"topic_id": new_topic.topic_id,"topic": new_topic.topic,"summary": new_topic.summary,"message_ids": new_topic.message_ids}]except Exception as e:print(f"compress_history_to_topic执行出错: {e}")return []class DAGExecutor:"""DAG执行器,用于执行工具执行计划,具有独立的记忆能力"""def __init__(self, skip_threshold: int = 3):"""初始化DAG执行器Args:skip_threshold: 跳过智能体直接执行的阈值"""self.execution_history = {}self.skip_threshold = skip_thresholddef should_skip_agent(self, user_question: str) -> bool:"""判断是否应该跳过智能体直接执行Args:user_question: 用户问题Returns:bool: 是否应该跳过智能体"""if user_question in self.execution_history:execution_count = self.execution_history[user_question].get("count", 0)return execution_count >= self.skip_thresholdreturn Falsedef get_cached_dag_params(self, user_question: str) -> Optional[Dict[str, Any]]:"""获取缓存的DAG参数Args:user_question: 用户问题Returns:Dict[str, Any]: 缓存的DAG参数,如果没有则返回None"""if user_question in self.execution_history:return self.execution_history[user_question].get("last_dag_params")return Nonedef execute_dag(self, dag_json: str, user_question: str, dag_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:"""执行DAG工具图Args:dag_json: DAG图的JSON表示user_question: 用户原始问题dag_params: DAG执行参数Returns:执行结果"""try:dag_data = json.loads(dag_json)dag_id = str(uuid.uuid4())# 更新执行历史if user_question in self.execution_history:self.execution_history[user_question]["count"] += 1self.execution_history[user_question]["last_executed"] = datetime.now().isoformat()else:self.execution_history[user_question] = {"count": 1,"first_executed": datetime.now().isoformat(),"last_executed": datetime.now().isoformat(),"last_dag_params": dag_params}# 保存执行历史详情execution_detail = {"user_question": user_question,"dag_structure": dag_data,"dag_params": dag_params,"start_time": datetime.now().isoformat(),"status": "executing"}# 模拟DAG执行过程result = {"dag_id": dag_id,"result": f"已执行DAG任务,包含{len(dag_data.get('nodes', []))}个节点","user_question": user_question,"execution_details": execution_detail}# 更新执行历史execution_detail.update({"status": "completed","end_time": datetime.now().isoformat(),"result": result})return resultexcept Exception as e:error_result = {"error": f"DAG执行失败: {str(e)}","user_question": user_question}return error_result# 创建全局DAG执行器实例
dag_executor = DAGExecutor()def execute_dag_tool(dag_json: str, user_question: str, dag_params: Optional[str] = None) -> str:"""执行DAG图的工具函数Args:dag_json: DAG图的JSON表示user_question: 用户原始问题dag_params: DAG执行参数的JSON字符串(可选)Returns:执行结果的JSON字符串"""params = Noneif dag_params:try:params = json.loads(dag_params)except:passresult = dag_executor.execute_dag(dag_json, user_question, params)return json.dumps(result, ensure_ascii=False, indent=2)class SuperAgent:"""超级智能体类,可以通过初始化不同的配置来创建具有不同功能的智能体"""def __init__(self, name: str,llm: Optional[Any] = None,tools: Optional[List[Tool]] = None,prompt_template: Optional[str] = None,max_workers: int = 5):"""初始化超级智能体Args:name: 智能体名称llm: 语言模型实例tools: 工具列表prompt_template: 提示模板max_workers: 最大工作线程数"""self.name = name# 检查LM Studio服务是否可用llm_available = self._check_llm_service()# 配置连接到LM Studio的LLMif llm_available:try:self.llm = llm or ChatOpenAI(model="qwen/qwen3-1.7b",openai_api_key="lm-studio",openai_api_base="http://127.0.0.1:1234/v1",temperature=0.7,streaming=False,request_timeout=100  # 减少超时时间)except Exception as e:print(f"初始化LLM时出错: {e}")self.llm = Noneelse:# 如果LLM服务不可用,使用模拟的响应机制print("警告: 无法连接到LLM服务,将使用模拟响应模式")self.llm = None# 工具配置self.tools = tools or self._create_default_tools()# 提示模板self.prompt_template = prompt_template or self._create_default_prompt()self.prompt = PromptTemplate.from_template(self.prompt_template)# 并行处理配置self.max_workers = max_workers# 只有在LLM可用时才创建智能体和执行器if self.llm:# 创建智能体from langchain_classic.agents import AgentExecutor, create_react_agentself.agent = create_react_agent(llm=self.llm, tools=self.tools, prompt=self.prompt)# 创建智能体执行器self.agent_executor = AgentExecutor(agent=self.agent,tools=self.tools,verbose=True,handle_parsing_errors=True)else:self.agent = Noneself.agent_executor = Nonedef _check_llm_service(self) -> bool:"""检查LLM服务是否可用Returns:bool: 服务是否可用"""try:response = requests.get("http://127.0.0.1:1234/v1/models", timeout=30)return response.status_code == 200except RequestException as e:print(f"检查LLM服务时出错: {e}")return Falsedef _create_default_tools(self) -> List[Tool]:"""创建默认工具列表"""tools = [Tool.from_function(func=get_history,name="get_history",description="获取指定会话的聊天历史记录"),Tool.from_function(func=select_topic_tool,name="select_topic_tool",description="根据topic_ids选择话题,支持为每个topic指定是否只显示摘要"),Tool.from_function(func=lambda session_id: compress_history_to_topic(session_id),name="compress_history_to_topic",description="将聊天记录压缩成话题"),Tool.from_function(func=execute_dag_tool,name="execute_dag",description="执行工具DAG图,参数为dag_json(工具执行计划的JSON表示)、user_question(用户原始问题)和dag_params(DAG执行参数,可选)")]return toolsdef _create_default_prompt(self) -> str:"""创建默认提示模板"""return """Answer the following questions as best you can. You have access to the following tools:{tools}Use the following format:Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input questionBegin!Question: {input}
Thought: {agent_scratchpad}"""def process_user_input(self, user_input: str, session_id: str) -> Dict[str, Any]:"""处理用户输入的主函数Args:user_input: 用户输入session_id: 会话IDReturns:处理结果字典"""# 检查是否应该跳过智能体直接执行if dag_executor.should_skip_agent(user_input):print(f"=== {self.name} 检测到重复问题,跳过智能体直接执行 ===")cached_params = dag_executor.get_cached_dag_params(user_input)if cached_params:# 这里应该根据缓存参数生成一个DAG执行计划# 为简化,我们使用一个示例DAGsample_dag = {"nodes": [{"id": "1", "tool": "get_history", "parameters": {"session_id": session_id}},{"id": "2", "tool": "select_topic_tool", "parameters": {"topic_ids": []}}],"edges": [{"from": "1", "to": "2"}]}dag_json = json.dumps(sample_dag, ensure_ascii=False)result = dag_executor.execute_dag(dag_json, user_input, cached_params)return {"session_id": session_id,"qa_response": f"直接执行结果: {result.get('result', '执行完成')}","new_topics": [],"direct_execution": True}if not session_id:session_id = str(uuid.uuid4())# 保存用户输入user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)db_manager.save_chat_message(session_id, user_message)# 1. 历史信息检索(智能搜索)print(f"=== {self.name} 历史信息检索(智能搜索) ===")# 根据用户问题预测相关关键词predicted_keywords = predict_relevant_keywords(user_input)print(f"预测的关键词: {predicted_keywords}")# 根据关键词搜索相关话题related_topics = []if predicted_keywords:related_topics = db_manager.search_topics_by_keywords(predicted_keywords)print(f"通过关键词找到 {len(related_topics)} 个相关话题")else:# 如果没有预测到关键词,则获取所有话题(保持向后兼容)all_topics = db_manager.get_all_topics()topic_ids = [topic.topic_id for topic in all_topics]if topic_ids:related_topics = all_topicsprint(f"未预测到关键词,获取所有 {len(related_topics)} 个话题")else:print("未找到任何话题")# 2. 使用检测信息回答用户问题print(f"=== {self.name} 问答处理 ===")history = get_history(session_id)print(f"会话历史: {history}")# 构造问答上下文qa_context = f"用户问题: {user_input}\n相关话题: {[{'topic_id': t.topic_id, 'topic': t.topic, 'summary': t.summary} for t in related_topics]}\n会话历史: {history}"# 如果LLM不可用,使用模拟响应if not self.llm or not self.agent_executor:print("LLM服务不可用,使用模拟响应")# 创建一个基于规则的简单响应response_text = self._generate_rule_based_response(user_input, related_topics, history)qa_response = {"output": response_text}else:try:qa_response = self.agent_executor.invoke({"input": f"基于以下信息回答用户问题:\n{qa_context}\n\n用户问题: {user_input}"})print(f"问答结果: {qa_response}")except Exception as e:print(f"问答智能体执行出错: {e}")# 出错时也提供基于规则的响应response_text = self._generate_rule_based_response(user_input, related_topics, history)qa_response = {"output": response_text}# 3. 处理历史信息 话题总结print(f"=== {self.name} 话题总结 ===")# 将当前会话消息压缩成话题new_topic = compress_history_to_topic(session_id)if new_topic:print(f"新话题: {new_topic}")else:print("没有新消息需要压缩成话题")# 确保qa_response不为None并且有合适的键if qa_response is None:qa_response = {"output": "抱歉,我无法生成回答。"}# 处理可能缺少output或result键的情况if "output" in qa_response:qa_result = qa_response["output"]elif "result" in qa_response:qa_result = qa_response["result"]else:qa_result = "无回答"return {"session_id": session_id,"qa_response": qa_result,"new_topics": new_topic}def as_tool(self) -> Tool:"""将当前智能体作为工具返回,可以被其他智能体调用Returns:Tool: 可以被其他智能体使用的工具"""def agent_tool_func(input_data: str) -> str:# 为工具调用创建一个唯一的会话IDsession_id = f"tool_call_{self.name}_{str(uuid.uuid4())[:8]}"# 处理输入result = self.process_user_input(input_data, session_id)# 返回结果的文本表示return result.get("qa_response", "无回答")return Tool.from_function(func=agent_tool_func,name=self.name,description=f"智能体工具: {self.name},用于处理: {self.__class__.__name__}")def _generate_rule_based_response(self, user_input: str, related_topics: List, history: List) -> str:"""基于规则生成响应(当LLM不可用时的降级方案)Args:user_input: 用户输入related_topics: 相关话题history: 历史记录Returns:str: 生成的响应"""# 简单的关键词匹配规则if "天气" in user_input:return "我无法获取实时天气信息。建议您查看天气应用或网站获取最新的天气预报。"elif "历史" in user_input or "对话" in user_input:if related_topics:topics_summary = "\n".join([f"- {topic.topic}: {topic.summary}" for topic in related_topics[:3]])return f"根据您的问题,我找到了以下相关话题:\n{topics_summary}"else:return "我没有找到与您的问题直接相关的历史对话记录。"else:# 默认响应return f"我理解您的问题是关于'{user_input}'。虽然我无法提供详细的回答,但我找到了{len(related_topics)}个相关话题和{len(history)}条历史记录。请尝试提出更具体的问题,或者确保LM Studio服务正在运行以获得更好的体验。"# 专门的智能体示例
class HistoryRetrievalAgent(SuperAgent):"""历史信息检索专用智能体"""def __init__(self, name: str = "HistoryRetrievalAgent", llm=None):# 自定义提示模板,专注于历史信息检索prompt_template = """You are a history retrieval specialist. Your main task is to retrieve and organize historical conversation topics.Available tools: {tools}Use the following format:
Question: the input question you must answer
Thought: think about how to retrieve relevant historical information
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: Provide a well-organized summary of the relevant historical topicsQuestion: {input}
Thought: {agent_scratchpad}"""super().__init__(name=name, llm=llm, prompt_template=prompt_template)class QAAgent(SuperAgent):"""问答专用智能体,具有规划工具执行DAG图的能力"""def __init__(self, name: str = "QAAgent", llm=None):# 自定义提示模板,专注于问答和DAG规划prompt_template = """You are a question answering specialist with DAG planning capabilities. 
Your main task is to answer user questions by planning and executing tool workflows.
When appropriate, you can create a DAG (Directed Acyclic Graph) plan of tools to execute.Available tools: {tools}Use the following format:
Question: the input question you must answer
Thought: think about how to analyze the question and plan tool execution
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: Provide a clear and comprehensive answer to the user's questionWhen creating a DAG plan, output it as a JSON with nodes and edges representing the tool execution workflow.Question: {input}
Thought: {agent_scratchpad}"""super().__init__(name=name, llm=llm, prompt_template=prompt_template)def plan_dag_execution(self, user_question: str) -> Dict[str, Any]:"""规划工具执行的DAG图Args:user_question: 用户问题Returns:DAG图的字典表示"""# 这里应该调用LLM来生成DAG计划,为了简化,我们使用示例数据sample_dag = {"nodes": [{"id": "1", "tool": "get_history", "parameters": {"session_id": "sample_session"}},{"id": "2", "tool": "select_topic_tool", "parameters": {"topic_ids": []}},{"id": "3", "tool": "compress_history_to_topic", "parameters": {"session_id": "sample_session"}}],"edges": [{"from": "1", "to": "2"},{"from": "2", "to": "3"}],"user_question": user_question}return sample_dagclass TopicSummaryAgent(SuperAgent):"""话题总结专用智能体"""def __init__(self, name: str = "TopicSummaryAgent", llm=None):# 自定义提示模板,专注于话题总结prompt_template = """You are a topic summarization specialist. Your main task is to analyze conversations and create meaningful topic summaries.Available tools: {tools}Use the following format:
Question: the input question you must answer
Thought: think about how to analyze the conversation and create meaningful summaries
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: Provide a well-structured topic summary with key pointsQuestion: {input}
Thought: {agent_scratchpad}"""super().__init__(name=name, llm=llm, prompt_template=prompt_template)# 示例使用
if __name__ == "__main__":# 创建专门的智能体history_agent = HistoryRetrievalAgent("历史检索专家")qa_agent = QAAgent("问答专家")topic_agent = TopicSummaryAgent("话题总结专家")# 将专门的智能体作为工具添加到通用智能体中general_agent_tools = [history_agent.as_tool(),qa_agent.as_tool(),topic_agent.as_tool()]# 创建一个可以调用其他智能体的通用智能体general_agent = SuperAgent(name="通用协调者",tools=general_agent_tools)# 使用通用智能体处理用户输入,它可以调用其他专门的智能体result = general_agent.process_user_input("请分析历史对话并回答天气相关问题", "session_1")print(f"最终结果: {result}")
http://www.dtcms.com/a/535672.html

相关文章:

  • 幸福指数数据分析与预测:从数据预处理到模型构建完整案例
  • 做网站要费用多少wordpress注册美化
  • 城建亚泰建设集团网站手机网站建设教程视频教程
  • 产品定制网站开发网站建设分析案例
  • 总结企业网站建设的流程网站没备案可以上线吗
  • 公司核名在哪个网站专门做字体设计的网站
  • 潍坊医院网站建设酒生产企业网站建设的目的
  • 购物网站排名前100物流行业网站源码
  • 苏州公司做变更网站免费咨询法律问题的网站
  • 医生可以自己做网站吗服务类网站建设策划书
  • phpcms 网站名称标签松原网站建设公司
  • 打折网站建设教程下载松江网站建设公司怎么样
  • 网站运营计划书网络营销服务商有哪些
  • 网网站开发和设计wordpress 海量数据
  • 买房子上哪个网站最好什么网站做h5没有广告
  • 大型行业门户网站开发建设wordpress自定义首页布局
  • wordpress评论ip网站推广及seo方案
  • 为什么网站找不到了网络营销的内容
  • 深圳做自适应网站公司网站搭建与网站建设
  • 个人网站盈利模式合肥站建设
  • 天津网站建设制作排名点样用外网访问自己做的网站
  • 阿里云做网站要几天泉港做网站公司
  • 网站积分方案如何帮人做网站赚钱
  • 青海营销型网站建设青岛微信网站制作
  • 学校网站框架网页翻译在线翻译
  • 新型h5网站建设学做网站快吗
  • 如何制作导航网站洛阳霞光只做有效果的网站
  • 做网站什么叫电商
  • 网站建设中是因为没有ftp上传吗个人注什么域名的网站
  • 给公司建网站 深圳怎样在网站上做专栏