AWS Strands Agents:当智能体作为独立服务/容器部署时,如何共享内存
当智能体作为独立服务/容器部署时,它们运行在各自的进程中,无法共享进程内状态,因此必须借助外部存储(DynamoDB、Redis、S3)和消息服务(SNS)来共享内存。以下是在分布式部署中实现动态共享内存库的参考方案:
1. 基于外部存储的内存库
基于 DynamoDB 的共享内存
import json
import logging
import os
import time
from typing import Any, Dict, List, Optional

import boto3
from strands import Agent, tool

logger = logging.getLogger(__name__)


class DynamoDBMemoryBank:
    """Shared key/value memory backed by a DynamoDB table.

    Each record is keyed by ``memory_key``. It holds the JSON-encoded payload in
    ``data``, the writing agent's id, a write timestamp and a ``ttl`` epoch attribute.
    """

    def __init__(self, table_name: str = "agent-shared-memory", endpoint_url: Optional[str] = None):
        """Bind to ``table_name``.

        Args:
            table_name: DynamoDB table holding the shared memory.
            endpoint_url: Optional endpoint override. Falls back to the
                ``DYNAMODB_ENDPOINT`` environment variable (e.g. DynamoDB Local);
                ``None`` means the regular AWS endpoint.
        """
        endpoint_url = endpoint_url or os.getenv("DYNAMODB_ENDPOINT")
        self.dynamodb = boto3.resource("dynamodb", endpoint_url=endpoint_url)
        self.table = self.dynamodb.Table(table_name)

    def store(self, memory_key: str, data: Dict[str, Any], agent_id: str, ttl_seconds: int = 86400) -> None:
        """Write ``data`` under ``memory_key``, overwriting any existing record.

        Args:
            memory_key: Partition key of the record.
            data: JSON-serialisable payload.
            agent_id: Id of the agent performing the write.
            ttl_seconds: Lifetime written to the ``ttl`` attribute (default 24 hours).
        """
        now = int(time.time())  # single clock read so timestamp and ttl agree
        self.table.put_item(
            Item={
                "memory_key": memory_key,
                "data": json.dumps(data),
                "agent_id": agent_id,
                "timestamp": now,
                "ttl": now + ttl_seconds,
            }
        )

    def retrieve(self, memory_key: str) -> Dict[str, Any]:
        """Return the payload stored under ``memory_key``.

        Returns ``{}`` when the key is missing, already expired, or the read fails.
        Reads are best-effort so that tools degrade gracefully.
        """
        try:
            response = self.table.get_item(Key={"memory_key": memory_key})
            item = response.get("Item")
            if not item:
                return {}
            expires_at = int(item.get("ttl", 0))
            if expires_at and expires_at <= time.time():
                # DynamoDB TTL deletion is asynchronous: expired items can still be
                # returned by reads until the background sweeper removes them.
                return {}
            return json.loads(item["data"])
        except Exception as e:
            logger.warning("检索内存错误: %s", e, exc_info=True)
            return {}

    def list_keys(self, prefix: str = "") -> list:
        """List every memory key, optionally keeping only those starting with ``prefix``.

        Follows ``LastEvaluatedKey`` so that tables larger than one scan page
        (1 MB) are listed completely.
        """
        keys: List[str] = []
        scan_kwargs: Dict[str, Any] = {"ProjectionExpression": "memory_key"}
        while True:
            response = self.table.scan(**scan_kwargs)
            keys.extend(item["memory_key"] for item in response.get("Items", []))
            last_key = response.get("LastEvaluatedKey")
            if not last_key:
                break
            scan_kwargs["ExclusiveStartKey"] = last_key
        if prefix:
            keys = [k for k in keys if k.startswith(prefix)]
        return keys


# Shared-memory tools. The tool docstrings below are kept verbatim because Strands
# uses them as the tool descriptions sent to the model.
memory_bank = DynamoDBMemoryBank()


@tool
def store_shared_data(key: str, data: str, description: str = "") -> str:
    """在所有智能体可访问的共享内存库中存储数据"""
    memory_bank.store(
        key,
        {"content": data, "description": description},
        agent_id=os.getenv("AGENT_ID", "unknown"),
    )
    return f"存储数据,键为: {key}"


@tool
def retrieve_shared_data(key: str) -> str:
    """从共享内存库检索数据"""
    data = memory_bank.retrieve(key)
    if data:
        return f"{data.get('description', '')}: {data.get('content', '')}"
    return f"未找到键对应的数据: {key}"


@tool
def list_memory_keys(prefix: str = "") -> str:
    """列出可用的内存键"""
    keys = memory_bank.list_keys(prefix)
    return f"可用键: {', '.join(keys)}"
基于 Redis 的高性能内存
import json
import os
from typing import Optional

import redis
from strands import Agent, tool


class RedisMemoryBank:
    """Low-latency shared memory and pub/sub event bus backed by Redis."""

    def __init__(self, host: Optional[str] = None, port: int = 6379):
        """Create the Redis client.

        Args:
            host: Redis host. When omitted, reads ``REDIS_HOST`` (as set in the
                docker-compose services) and falls back to the cluster hostname.
            port: Redis port.
        """
        if host is None:
            host = os.getenv("REDIS_HOST", "redis-cluster.company.com")
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            decode_responses=True,
            health_check_interval=30,
        )

    def store(self, key: str, data: dict, ttl: int = 3600):
        """Store ``data`` as JSON under ``key``, expiring after ``ttl`` seconds."""
        self.redis_client.setex(key, ttl, json.dumps(data))

    def retrieve(self, key: str) -> dict:
        """Return the decoded payload for ``key``, or ``{}`` if absent/expired."""
        data = self.redis_client.get(key)
        return json.loads(data) if data else {}

    def publish_event(self, channel: str, message: dict):
        """Publish ``message`` as JSON on ``channel`` for other agents."""
        self.redis_client.publish(channel, json.dumps(message))

    def subscribe_to_events(self, channels: list):
        """Subscribe to ``channels`` and return the PubSub handle to poll."""
        pubsub = self.redis_client.pubsub()
        pubsub.subscribe(channels)
        return pubsub


# Redis-backed memory tools (tool docstrings are the model-facing descriptions;
# kept verbatim).
redis_memory = RedisMemoryBank()


@tool
def store_fast_memory(key: str, data: str, ttl_minutes: int = 60) -> str:
    """在高速 Redis 内存库中存储数据"""
    redis_memory.store(key, {"content": data}, ttl_minutes * 60)
    return f"存储在快速内存中: {key}"


@tool
def get_fast_memory(key: str) -> str:
    """从高速内存检索"""
    data = redis_memory.retrieve(key)
    return data.get("content", "未找到")
2. 基于 S3 的已部署智能体会话管理
示例代码(Python):
import asyncio
import json
import os
import time
from typing import Optional, Sequence

from strands import Agent
from strands.session.s3_session_manager import S3SessionManager


# Each deployed agent persists its conversation session in S3.
class DeployedAgent:
    """A Strands agent whose session is persisted in S3 and which can use the shared-memory tools.

    Relies on ``store_shared_data`` / ``retrieve_shared_data`` from the DynamoDB
    section being importable in the same module.
    """

    def __init__(self, agent_id: str, user_id: Optional[str] = None, extra_tools: Optional[Sequence] = None):
        """Build the session manager and agent.

        Args:
            agent_id: Identifier of this deployed agent; also used in the S3 prefix.
            user_id: Optional user scope. When given, the session id becomes
                ``{agent_id}_{user_id}``.
            extra_tools: Additional tools appended after the shared-memory tools.
        """
        self.agent_id = agent_id

        session_id = f"{agent_id}_{user_id}" if user_id else agent_id
        self.session_manager = S3SessionManager(
            session_id=session_id,
            bucket=os.getenv("AGENT_MEMORY_BUCKET", "agent-memory-bucket"),
            prefix=f"agents/{agent_id}/sessions",
        )

        self.agent = Agent(
            name=f"已部署智能体 {agent_id}",
            session_manager=self.session_manager,
            tools=[store_shared_data, retrieve_shared_data, *(extra_tools or [])],
        )

    async def process_request(self, message: str, context: Optional[dict] = None):
        """Handle ``message``, first publishing ``context`` (if any) to shared memory.

        Returns the result of the agent's asynchronous invocation.
        """
        if context:
            key = f"context_{self.agent_id}_{int(time.time())}"
            # store_shared_data is a synchronous tool doing blocking DynamoDB I/O;
            # awaiting its str result directly would raise TypeError, so run it
            # in a worker thread to keep the event loop free.
            await asyncio.to_thread(store_shared_data, key, json.dumps(context))
        # Strands Agent exposes invoke_async (not run) for async invocation.
        return await self.agent.invoke_async(message)


# Deployment configuration
deployed_orchestrator = DeployedAgent("orchestrator", "global")
deployed_specialist = DeployedAgent("data_specialist", "global")
3. 带有共享内存的容器部署
带有共享服务的 Docker Compose
# docker-compose.yml
version: '3.8'

services:
  # Shared memory services
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  # DynamoDB Local runs in-memory: the "agent-shared-memory" table (partition key
  # "memory_key") must be created after every start, e.g. by an init script.
  dynamodb-local:
    image: amazon/dynamodb-local
    ports:
      - "8000:8000"
    command: ["-jar", "DynamoDBLocal.jar", "-sharedDb", "-inMemory"]

  # Agent services. AGENT_TYPE selects the service class and PORT must match the
  # published port: agent_service.py otherwise defaults to "orchestrator" on 9000.
  orchestrator:
    build: ./orchestrator
    environment:
      - AGENT_ID=orchestrator
      - AGENT_TYPE=orchestrator
      - PORT=9000
      - REDIS_HOST=redis
      - DYNAMODB_ENDPOINT=http://dynamodb-local:8000
      - AGENT_MEMORY_BUCKET=shared-agent-memory
      # boto3 needs a region (and credentials) even when talking to DynamoDB Local.
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-us-east-1}
    ports:
      - "9000:9000"
    depends_on:
      - redis
      - dynamodb-local

  data-specialist:
    build: ./data-specialist
    environment:
      - AGENT_ID=data_specialist
      - AGENT_TYPE=data_specialist
      - PORT=9001
      - REDIS_HOST=redis
      - DYNAMODB_ENDPOINT=http://dynamodb-local:8000
      - AGENT_MEMORY_BUCKET=shared-agent-memory
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-us-east-1}
    ports:
      - "9001:9001"
    depends_on:
      - redis
      - dynamodb-local

  research-agent:
    build: ./research-agent
    environment:
      - AGENT_ID=research_agent
      - AGENT_TYPE=research_agent
      - PORT=9002
      - REDIS_HOST=redis
      - DYNAMODB_ENDPOINT=http://dynamodb-local:8000
      - AGENT_MEMORY_BUCKET=shared-agent-memory
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-us-east-1}
    ports:
      - "9002:9002"
    depends_on:
      - redis
      - dynamodb-local

volumes:
  redis_data:
4. 带有共享内存的 AWS ECS/Fargate 部署
# agent_service.py - base service for all deployed agents
import asyncio
import json
import os
from typing import Optional

from strands import Agent
from strands.multiagent.a2a import A2AServer
from strands.session.s3_session_manager import S3SessionManager

# NOTE(review): DynamoDBMemoryBank, RedisMemoryBank and the shared-memory tools are
# assumed to be importable from the modules shown in the sections above.


class ProductionAgentService:
    """Base A2A service for one deployed agent, wired to the shared memory layer.

    Reads AGENT_ID, REDIS_HOST, AGENT_MEMORY_BUCKET and PORT from the environment.
    """

    def __init__(self):
        self.agent_id = os.getenv("AGENT_ID")
        if not self.agent_id:
            # Without an id every service would write to the same "prod_None" session.
            raise RuntimeError("AGENT_ID environment variable must be set")
        self.memory_bank = DynamoDBMemoryBank()
        self.redis_memory = RedisMemoryBank(host=os.getenv("REDIS_HOST", "redis-cluster.company.com"))
        self._sync_task: Optional[asyncio.Task] = None

        # Agent with the shared-memory tools plus any subclass-specific tools.
        self.agent = Agent(
            name=f"生产环境智能体 {self.agent_id}",
            session_manager=S3SessionManager(
                session_id=f"prod_{self.agent_id}",
                bucket=os.getenv("AGENT_MEMORY_BUCKET"),
                prefix=f"production/{self.agent_id}",
            ),
            tools=[
                store_shared_data,
                retrieve_shared_data,
                store_fast_memory,
                get_fast_memory,
                *self.get_specialized_tools(),
            ],
        )

    def get_specialized_tools(self):
        """Override in subclasses to contribute specialised tools."""
        return []

    async def start_service(self):
        """Run the A2A server alongside the background memory-sync task."""
        port = int(os.getenv("PORT", 9000))
        server = A2AServer(agent=self.agent, port=port)

        # Keep a reference so the background task is not garbage-collected.
        self._sync_task = asyncio.create_task(self.memory_sync_task())
        try:
            # A2AServer.serve() blocks and starts its own event loop; calling it
            # directly inside this running loop fails and would also starve
            # memory_sync_task, so run it in a worker thread.
            await asyncio.to_thread(server.serve)
        finally:
            self._sync_task.cancel()

    async def memory_sync_task(self):
        """Poll the "agent_events" Redis channel and dispatch incoming events."""
        pubsub = self.redis_memory.subscribe_to_events(["agent_events"])
        while True:
            try:
                # get_message(timeout=...) blocks the calling thread; poll it off
                # the event loop so other coroutines keep running.
                message = await asyncio.to_thread(pubsub.get_message, timeout=1.0)
                if message and message["type"] == "message":
                    await self.handle_agent_event(json.loads(message["data"]))
            except Exception as e:
                print(f"内存同步错误: {e}")
                await asyncio.sleep(1)

    async def handle_agent_event(self, event: dict):
        """React to an event published by another agent."""
        if event.get("type") == "memory_update":
            # Refresh local caches or take other action here.
            print(f"内存被 {event.get('agent_id')} 更新: {event.get('key')}")


# Specialised agent implementations.
# NOTE(review): orchestration_tools / data_analysis_tools are placeholders not
# defined in this document; each is assumed to be a single tool object.
class OrchestratorService(ProductionAgentService):
    def get_specialized_tools(self):
        return [orchestration_tools]


class DataSpecialistService(ProductionAgentService):
    def get_specialized_tools(self):
        return [data_analysis_tools]


# Main entry point: AGENT_TYPE selects the service class; unknown types use the base service.
if __name__ == "__main__":
    service_classes = {
        "orchestrator": OrchestratorService,
        "data_specialist": DataSpecialistService,
    }
    agent_type = os.getenv("AGENT_TYPE", "orchestrator")
    service = service_classes.get(agent_type, ProductionAgentService)()
    asyncio.run(service.start_service())
5. 事件驱动的内存更新
# Event-driven memory synchronisation
import asyncio
import json
import os
import time

import boto3
from strands import tool

# NOTE(review): DynamoDBMemoryBank and RedisMemoryBank are assumed to be
# importable from the modules shown in the sections above.


class EventDrivenMemoryBank:
    """Writes to durable and fast memory, then announces each write to other agents.

    SNS fan-out is used only when AGENT_EVENTS_TOPIC_ARN is set; Redis pub/sub is always used.
    """

    def __init__(self):
        self.dynamodb_memory = DynamoDBMemoryBank()
        self.redis_memory = RedisMemoryBank(host=os.getenv("REDIS_HOST", "redis-cluster.company.com"))
        self.sns = boto3.client("sns")
        self.topic_arn = os.getenv("AGENT_EVENTS_TOPIC_ARN")

    def store_with_notification_sync(self, key: str, data: dict, agent_id: str) -> None:
        """Blocking implementation: store ``data`` under ``key`` and broadcast a memory_update event."""
        # Durable store
        self.dynamodb_memory.store(key, data, agent_id)
        # Fast cache
        self.redis_memory.store(key, data, ttl=3600)

        event = {
            "type": "memory_update",
            "key": key,
            "agent_id": agent_id,
            "timestamp": time.time(),
        }
        # Cross-region/service notification. boto3 clients are synchronous, so
        # publish is a plain call; skip it when no topic is configured rather
        # than failing parameter validation with TopicArn=None.
        if self.topic_arn:
            self.sns.publish(TopicArn=self.topic_arn, Message=json.dumps(event))
        # Real-time notification for agents polling Redis.
        self.redis_memory.publish_event("agent_events", event)

    async def store_with_notification(self, key: str, data: dict, agent_id: str):
        """Async wrapper that runs the blocking store/broadcast in a worker thread."""
        await asyncio.to_thread(self.store_with_notification_sync, key, data, agent_id)


# One shared instance (like memory_bank / redis_memory) instead of new clients per call.
event_memory = EventDrivenMemoryBank()


@tool
def store_with_broadcast(key: str, data: str, description: str = "") -> str:
    """存储数据并广播给所有智能体"""
    # Synchronous tools have no running event loop, so asyncio.create_task would
    # raise here; perform the store/broadcast directly and surface any error.
    event_memory.store_with_notification_sync(
        key,
        {"content": data, "description": description},
        os.getenv("AGENT_ID", "unknown"),
    )
    return f"已存储并广播: {key}"
此架构的主要优势
可扩展性:智能体可以独立部署并根据需求扩展
持久性:内存可以在容器重启和部署后保留
实时同步:Redis 提供快速内存访问和事件通知
可靠性:DynamoDB/S3 提供高持久性(durability)的托管存储,单个容器或节点故障不会导致数据丢失
事件驱动:智能体可以实时响应内存变化
多区域:DynamoDB、S3 和 SNS 均为跨可用区的托管服务;如需跨 AWS 区域共享内存,可启用 DynamoDB 全局表,并通过 SNS 跨区域订阅分发事件(Redis Pub/Sub 通常仅在单个集群内生效)
生产部署模式
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│     协调者      │    │    数据专家     │    │   研究智能体    │
│   (ECS 任务)    │    │   (ECS 任务)    │    │   (ECS 任务)    │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
          │                      │                      │
          └──────────────────────┼──────────────────────┘
                                 │
                   ┌─────────────┴─────────────┐
                   │        共享内存层         │
                   │                           │
                   │  ┌─────────┐ ┌─────────┐  │
                   │  │  Redis  │ │DynamoDB │  │
                   │  │ (快速)  │ │ (持久)  │  │
                   │  └─────────┘ └─────────┘  │
                   │                           │
                   │  ┌─────────┐ ┌─────────┐  │
                   │  │   S3    │ │   SNS   │  │
                   │  │ (会话)  │ │ (事件)  │  │
                   │  └─────────┘ └─────────┘  │
                   └───────────────────────────┘
这种架构确保您的已部署智能体可以共享动态内存库,同时保持分布式、可扩展部署的优势。