A2A 中的内存共享方法
非常好的问题!A2A 系统中的内存共享对于维护上下文和协调至关重要。让我解释一下中央控制(协调者)与网络中其他智能体共享内存的不同方法。
绝佳的问题!A2A 系统中的内存共享是一个复杂的话题,有多种方法。 让我详细说明中央控制(协调者)如何与网络中的其他智能体共享内存。
A2A 中的内存共享方法
1. 持久化会话管理(Strands 1.0 特性)
协调者可以维护其他智能体可访问的持久化会话:
python
from strands import Agent from strands.session.s3_session_manager import S3SessionManager from strands.multiagent.a2a import A2AServer# 具有持久化内存的中央协调者 session_manager = S3SessionManager(session_id="global_context_session",bucket="shared-agent-memory",prefix="orchestrator_sessions" )orchestrator = Agent(name="Central Orchestrator",session_manager=session_manager,tools=[a2a_provider.tools] )# 专家智能体可以访问共享的会话数据 specialist_session_manager = S3SessionManager(session_id="global_context_session", # 相同的会话 IDbucket="shared-agent-memory",prefix="orchestrator_sessions" )specialist_agent = Agent(name="Data Specialist", session_manager=specialist_session_manager,tools=[data_analysis_tools] )
2. 共享内存存储模式
创建一个所有智能体都可以访问的集中式内存存储:
python
from strands import Agent, tool import boto3 import json# 共享内存实现 class SharedMemoryStore:def __init__(self, dynamodb_table="agent_shared_memory"):self.dynamodb = boto3.resource('dynamodb')self.table = self.dynamodb.Table(dynamodb_table)def store_context(self, key: str, data: dict, agent_id: str):"""存储共享上下文数据"""self.table.put_item(Item={'memory_key': key,'data': json.dumps(data),'agent_id': agent_id,'timestamp': int(time.time())})def retrieve_context(self, key: str):"""检索共享上下文数据"""response = self.table.get_item(Key={'memory_key': key})if 'Item' in response:return json.loads(response['Item']['data'])return None# 创建共享内存工具 shared_memory = SharedMemoryStore()@tool def store_shared_memory(key: str, data: str, description: str) -> str:"""将信息存储在共享内存中供其他智能体访问"""shared_memory.store_context(key, {'data': data,'description': description}, agent_id="orchestrator")return f"存储数据,键为: {key}"@tool def read_shared_memory(key: str) -> str:"""从共享内存读取信息"""data = shared_memory.retrieve_context(key)if data:return f"检索到: {data['description']} - {data['data']}"return "未找到该键对应的数据"# 具有内存共享能力的协调者 orchestrator = Agent(name="Memory-Sharing Orchestrator",tools=[store_shared_memory, read_shared_memory, *other_tools] )
3. 通过 A2A 协议传递上下文
在 A2A 调用中直接传递相关上下文:
python
from strands import Agent from strands.multiagent.a2a import A2AServer# 将上下文传递给其他智能体的协调者 class ContextAwareOrchestrator(Agent):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.global_context = {}def call_agent_with_context(self, agent_url: str, task: str, context: dict):"""调用另一个智能体并传递当前上下文"""full_prompt = f"""任务: {task}当前上下文:- 用户 ID: {context.get('user_id')}- 会话历史: {context.get('session_summary')}- 先前结果: {context.get('previous_results')}- 偏好设置: {context.get('user_preferences')}请使用此上下文完成任务。"""# 使用丰富上下文的 A2A 调用return self.call_a2a_agent(agent_url, full_prompt)# 使用 orchestrator = ContextAwareOrchestrator(name="Context-Aware Orchestrator",tools=[a2a_tools] )
4. 黑板模式实现
创建一个共享的"黑板",智能体可以在上面发布和读取信息:
python
from strands import Agent, tool from dataclasses import dataclass from typing import Dict, List import threading@dataclass class BlackboardEntry:agent_id: strdata: dicttimestamp: floattags: List[str]class AgentBlackboard:def __init__(self):self.entries: Dict[str, BlackboardEntry] = {}self.lock = threading.Lock()def post(self, key: str, data: dict, agent_id: str, tags: List[str] = None):"""将信息发布到黑板"""with self.lock:self.entries[key] = BlackboardEntry(agent_id=agent_id,data=data,timestamp=time.time(),tags=tags or [])def read(self, key: str = None, tags: List[str] = None):"""从黑板读取信息"""with self.lock:if key:return self.entries.get(key)# 如果提供了标签则进行过滤if tags:return {k: v for k, v in self.entries.items() if any(tag in v.tags for tag in tags)}return dict(self.entries)# 全局黑板实例 blackboard = AgentBlackboard()# 智能体的黑板工具 @tool def post_to_blackboard(key: str, data: str, tags: str = "") -> str:"""将信息发布到共享黑板"""tag_list = [t.strip() for t in tags.split(",")] if tags else []blackboard.post(key, {"content": data}, "agent", tag_list)return f"发布到黑板,键为: {key}"@tool def read_from_blackboard(key: str = "", tags: str = "") -> str:"""从共享黑板读取信息"""if key:entry = blackboard.read(key)return f"找到: {entry.data}" if entry else "未找到条目"if tags:tag_list = [t.strip() for t in tags.split(",")]entries = blackboard.read(tags=tag_list)return f"找到 {len(entries)} 个具有这些标签的条目"all_entries = blackboard.read()return f"黑板共有 {len(all_entries)} 个条目"# 所有智能体都可以访问黑板 orchestrator = Agent(name="Orchestrator",tools=[post_to_blackboard, read_from_blackboard, *other_tools] )specialist_agent = Agent(name="Specialist",tools=[post_to_blackboard, read_from_blackboard, *specialist_tools] )
5. Amazon Bedrock 内存集成
使用 Bedrock 的内置内存功能为 A2A 智能体服务:
python
from strands import Agent from strands.models import BedrockModel# 具有 Bedrock 内存的协调者 orchestrator_model = BedrockModel(model_id="anthropic.claude-3-sonnet-20240229-v1:0",memory_id="shared_agent_memory", # 共享内存 IDsession_ttl=3600 # 1 小时会话超时 )orchestrator = Agent(name="Memory-Enabled Orchestrator",model=orchestrator_model,tools=[a2a_tools] )# 专家智能体可以共享相同的内存 ID specialist_model = BedrockModel(model_id="anthropic.claude-3-haiku-20240307-v1:0", memory_id="shared_agent_memory", # 相同的内存 IDsession_ttl=3600 )specialist = Agent(name="Memory-Enabled Specialist",model=specialist_model,tools=[specialist_tools] )
6. 实际实现示例
这是一个展示内存共享实际应用的完整示例:
python
from strands import Agent, tool from strands.multiagent.a2a import A2AServer, A2AClientToolProvider from strands.session.s3_session_manager import S3SessionManager# 共享上下文存储 class ProjectContext:def __init__(self):self.current_project = Noneself.user_preferences = {}self.task_history = []self.shared_data = {}project_context = ProjectContext()@tool def update_project_context(key: str, value: str) -> str:"""更新共享项目上下文"""project_context.shared_data[key] = valuereturn f"更新上下文: {key} = {value}"@tool def get_project_context(key: str = "") -> str:"""获取共享项目上下文"""if key:return project_context.shared_data.get(key, "未找到")return str(project_context.shared_data)# 具有内存共享的协调者 orchestrator = Agent(name="Project Orchestrator",prompt="""您协调一个专家智能体团队进行项目工作。 始终与其他智能体共享相关上下文并维护项目状态。""",tools=[update_project_context, get_project_context],session_manager=S3SessionManager(session_id="project_session",bucket="project-memory-bucket") )# 具有共享上下文访问权限的研究智能体 research_agent = Agent(name="Research Specialist", prompt="""您进行研究分析。开始工作前始终检查项目上下文,并用您的发现更新它。""",tools=[update_project_context, get_project_context, research_tools] )# 通过 A2A 提供研究智能体服务 A2AServer(agent=research_agent, port=9001).serve()# 将协调者连接到研究智能体 a2a_provider = A2AClientToolProvider(known_agent_urls=["http://localhost:9001"] )orchestrator.tools.extend(a2a_provider.tools)# 具有内存共享的示例工作流 response = orchestrator.run(""" 研究 AI 智能体的最新趋势并创建市场分析报告。 确保: 1. 用研究发现更新项目上下文 2. 与团队共享关键见解 3. 在多个研究会话中保持连续性 """)
内存共享的主要优势
连续性:智能体在交互间保持上下文
协作性:智能体可以相互构建工作成果
高效性:避免重复工作或丢失上下文
协调性:中央协调者维护全局状态
可扩展性:内存模式适用于大型智能体网络
最佳实践
在智能体间使用一致的内存键
对敏感数据实施适当的访问控制
设置适当的 TTL 以进行内存清理
监控内存使用以防止膨胀
为分布式场景设计最终一致性
中央控制变得异常强大,因为它可以在整个专家智能体网络中维护和共享上下文,从而创建一个具有持久化内存和共享智能的真正协作式 AI 系统。