当前位置: 首页 > news >正文

AWS strands agents 当智能体作为独立服务/容器部署时,它们无法共享进程内状态

当智能体作为独立服务/容器部署时,它们无法共享进程内状态。 以下是针对分布式部署中动态内存库的生产就绪解决方案:

1. 基于外部存储的内存库

基于 DynamoDB 的共享内存

import boto3
from strands import Agent, tool
from typing import Dict, Any
import jsonclass DynamoDBMemoryBank:def __init__(self, table_name: str = "agent-shared-memory"):self.dynamodb = boto3.resource('dynamodb')self.table = self.dynamodb.Table(table_name)def store(self, memory_key: str, data: Dict[str, Any], agent_id: str):"""在共享内存中存储数据"""self.table.put_item(Item={'memory_key': memory_key,'data': json.dumps(data),'agent_id': agent_id,'timestamp': int(time.time()),'ttl': int(time.time()) + 86400  # 24 小时 TTL})def retrieve(self, memory_key: str) -> Dict[str, Any]:"""从共享内存检索数据"""try:response = self.table.get_item(Key={'memory_key': memory_key})if 'Item' in response:return json.loads(response['Item']['data'])except Exception as e:print(f"检索内存错误: {e}")return {}def list_keys(self, prefix: str = "") -> list:"""列出所有内存键,可选前缀过滤"""response = self.table.scan()keys = [item['memory_key'] for item in response['Items']]if prefix:keys = [k for k in keys if k.startswith(prefix)]return keys# 创建共享内存工具
memory_bank = DynamoDBMemoryBank()@tool
def store_shared_data(key: str, data: str, description: str = "") -> str:"""在所有智能体可访问的共享内存库中存储数据"""memory_bank.store(key, {'content': data,'description': description}, agent_id=os.getenv('AGENT_ID', 'unknown'))return f"存储数据,键为: {key}"@tool
def retrieve_shared_data(key: str) -> str:"""从共享内存库检索数据"""data = memory_bank.retrieve(key)if data:return f"{data.get('description', '')}: {data.get('content', '')}"return f"未找到键对应的数据: {key}"@tool
def list_memory_keys(prefix: str = "") -> str:"""列出可用的内存键"""keys = memory_bank.list_keys(prefix)return f"可用键: {', '.join(keys)}"

基于 Redis 的高性能内存

import redis
import json
from strands import Agent, toolclass RedisMemoryBank:def __init__(self, host: str = "redis-cluster.company.com", port: int = 6379):self.redis_client = redis.Redis(host=host, port=port, decode_responses=True,health_check_interval=30)def store(self, key: str, data: dict, ttl: int = 3600):"""存储数据并设置 TTL"""self.redis_client.setex(key, ttl, json.dumps(data))def retrieve(self, key: str) -> dict:"""检索数据"""data = self.redis_client.get(key)return json.loads(data) if data else {}def publish_event(self, channel: str, message: dict):"""向其他智能体发布事件"""self.redis_client.publish(channel, json.dumps(message))def subscribe_to_events(self, channels: list):"""订阅来自其他智能体的事件"""pubsub = self.redis_client.pubsub()pubsub.subscribe(channels)return pubsub# 基于 Redis 的内存工具
redis_memory = RedisMemoryBank()@tool
def store_fast_memory(key: str, data: str, ttl_minutes: int = 60) -> str:"""在高速 Redis 内存库中存储数据"""redis_memory.store(key, {'content': data}, ttl_minutes * 60)return f"存储在快速内存中: {key}"@tool
def get_fast_memory(key: str) -> str:"""从高速内存检索"""data = redis_memory.retrieve(key)return data.get('content', '未找到')

2. 基于 S3 的已部署智能体会话管理

python

from strands import Agent
from strands.session.s3_session_manager import S3SessionManager# 每个已部署的智能体使用 S3 进行持久化会话
class DeployedAgent:def __init__(self, agent_id: str, user_id: str = None):self.agent_id = agent_id# 创建带有 S3 后端的会话管理器session_id = f"{agent_id}_{user_id}" if user_id else agent_idself.session_manager = S3SessionManager(session_id=session_id,bucket=os.getenv('AGENT_MEMORY_BUCKET', 'agent-memory-bucket'),prefix=f"agents/{agent_id}/sessions")# 创建具有持久化会话的智能体self.agent = Agent(name=f"已部署智能体 {agent_id}",session_manager=self.session_manager,tools=[store_shared_data, retrieve_shared_data, *other_tools])async def process_request(self, message: str, context: dict = None):"""使用持久化内存处理请求"""if context:# 在共享内存中存储上下文供其他智能体使用await store_shared_data(f"context_{self.agent_id}_{int(time.time())}", json.dumps(context))return await self.agent.run(message)# 部署配置
deployed_orchestrator = DeployedAgent("orchestrator", "global")
deployed_specialist = DeployedAgent("data_specialist", "global")

3. 带有共享内存的容器部署

带有共享服务的 Docker Compose

# docker-compose.yml
version: '3.8'
services:# 共享内存服务redis:image: redis:7-alpineports:- "6379:6379"volumes:- redis_data:/datadynamodb-local:image: amazon/dynamodb-localports:- "8000:8000"command: ["-jar", "DynamoDBLocal.jar", "-sharedDb", "-inMemory"]# 智能体服务orchestrator:build: ./orchestratorenvironment:- AGENT_ID=orchestrator- REDIS_HOST=redis- DYNAMODB_ENDPOINT=http://dynamodb-local:8000- AGENT_MEMORY_BUCKET=shared-agent-memoryports:- "9000:9000"depends_on:- redis- dynamodb-localdata-specialist:build: ./data-specialistenvironment:- AGENT_ID=data_specialist- REDIS_HOST=redis- DYNAMODB_ENDPOINT=http://dynamodb-local:8000- AGENT_MEMORY_BUCKET=shared-agent-memoryports:- "9001:9001"depends_on:- redis- dynamodb-localresearch-agent:build: ./research-agentenvironment:- AGENT_ID=research_agent- REDIS_HOST=redis- DYNAMODB_ENDPOINT=http://dynamodb-local:8000- AGENT_MEMORY_BUCKET=shared-agent-memoryports:- "9002:9002"depends_on:- redis- dynamodb-localvolumes:redis_data:

4. 带有共享内存的 AWS ECS/Fargate 部署

# agent_service.py - 所有已部署智能体的基础服务
import os
import asyncio
from strands import Agent
from strands.multiagent.a2a import A2AServerclass ProductionAgentService:def __init__(self):self.agent_id = os.getenv('AGENT_ID')self.memory_bank = DynamoDBMemoryBank()self.redis_memory = RedisMemoryBank(host=os.getenv('REDIS_HOST', 'redis-cluster.company.com'))# 创建带有共享内存工具的智能体self.agent = Agent(name=f"生产环境智能体 {self.agent_id}",session_manager=S3SessionManager(session_id=f"prod_{self.agent_id}",bucket=os.getenv('AGENT_MEMORY_BUCKET'),prefix=f"production/{self.agent_id}"),tools=[store_shared_data,retrieve_shared_data,store_fast_memory,get_fast_memory,*self.get_specialized_tools()])def get_specialized_tools(self):"""在子类中重写以提供专用工具"""return []async def start_service(self):"""启动 A2A 服务"""port = int(os.getenv('PORT', 9000))server = A2AServer(agent=self.agent, port=port)# 启动后台任务asyncio.create_task(self.memory_sync_task())# 启动 A2A 服务器server.serve()async def memory_sync_task(self):"""同步内存和处理事件的后台任务"""pubsub = self.redis_memory.subscribe_to_events(['agent_events'])while True:try:message = pubsub.get_message(timeout=1.0)if message and message['type'] == 'message':await self.handle_agent_event(json.loads(message['data']))except Exception as e:print(f"内存同步错误: {e}")await asyncio.sleep(1)async def handle_agent_event(self, event: dict):"""处理来自其他智能体的事件"""if event.get('type') == 'memory_update':# 刷新本地缓存或采取行动print(f"内存被 {event.get('agent_id')} 更新: {event.get('key')}")# 专用智能体实现
class OrchestratorService(ProductionAgentService):def get_specialized_tools(self):return [orchestration_tools]class DataSpecialistService(ProductionAgentService):def get_specialized_tools(self):return [data_analysis_tools]# 主入口点
if __name__ == "__main__":agent_type = os.getenv('AGENT_TYPE', 'orchestrator')if agent_type == 'orchestrator':service = OrchestratorService()elif agent_type == 'data_specialist':service = DataSpecialistService()else:service = ProductionAgentService()asyncio.run(service.start_service())

5. 事件驱动的内存更新

# 事件驱动的内存同步
class EventDrivenMemoryBank:def __init__(self):self.dynamodb_memory = DynamoDBMemoryBank()self.redis_memory = RedisMemoryBank()self.sns = boto3.client('sns')self.topic_arn = os.getenv('AGENT_EVENTS_TOPIC_ARN')async def store_with_notification(self, key: str, data: dict, agent_id: str):"""存储数据并通知其他智能体"""# 在持久化存储中存储self.dynamodb_memory.store(key, data, agent_id)# 在快速缓存中存储self.redis_memory.store(key, data, ttl=3600)# 通知其他智能体event = {'type': 'memory_update','key': key,'agent_id': agent_id,'timestamp': time.time()}# 发布到 SNS 进行跨区域/服务通知await self.sns.publish(TopicArn=self.topic_arn,Message=json.dumps(event))# 发布到 Redis 进行实时更新self.redis_memory.publish_event('agent_events', event)@tool
def store_with_broadcast(key: str, data: str, description: str = "") -> str:"""存储数据并广播给所有智能体"""event_memory = EventDrivenMemoryBank()asyncio.create_task(event_memory.store_with_notification(key, {'content': data, 'description': description},os.getenv('AGENT_ID')))return f"已存储并广播: {key}"

此架构的主要优势

  • 可扩展性:智能体可以独立部署并根据需求扩展

  • 持久性:内存可以在容器重启和部署后保留

  • 实时同步:Redis 提供快速内存访问和事件通知

  • 耐用性:DynamoDB/S3 提供持久化存储

  • 事件驱动:智能体可以实时响应内存变化

  • 多区域:可以在 AWS 区域和可用区之间工作

生产部署模式

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   协调者        │    │  数据专家       │    │  研究智能体     │
│   (ECS 任务)    │    │  (ECS 任务)     │    │  (ECS 任务)     │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘│                      │                      │└──────────────────────┼──────────────────────┘│┌─────────────┴─────────────┐│     共享内存层            ││                           ││  ┌─────────┐ ┌─────────┐  ││  │ Redis   │ │DynamoDB │  ││  │(快速)   │ │(持久)   │  ││  └─────────┘ └─────────┘  ││                           ││  ┌─────────┐ ┌─────────┐  ││  │   S3    │ │   SNS   │  ││  │(会话)   │ │(事件)   │  ││  └─────────┘ └─────────┘  │└───────────────────────────┘

这种架构确保您的已部署智能体可以共享动态内存库,同时保持分布式、可扩展部署的优势。


文章转载自:

http://2BrWWrqy.zmyhn.cn
http://UpgDcyEz.zmyhn.cn
http://BZ6d5Lkq.zmyhn.cn
http://eYqpYedE.zmyhn.cn
http://2Ug0UXGJ.zmyhn.cn
http://VAM3ipUg.zmyhn.cn
http://Hx3VJsRH.zmyhn.cn
http://r5FExnfN.zmyhn.cn
http://OPrKZytm.zmyhn.cn
http://v1WbEgHd.zmyhn.cn
http://unUF76P2.zmyhn.cn
http://ynr8qU1s.zmyhn.cn
http://yXQbaGTy.zmyhn.cn
http://iFVKHRia.zmyhn.cn
http://nb4nNqys.zmyhn.cn
http://vFLYOowg.zmyhn.cn
http://Fn8tdzj1.zmyhn.cn
http://DM88U9Y8.zmyhn.cn
http://HPHaSyQ5.zmyhn.cn
http://eJv7mwwF.zmyhn.cn
http://FGIpO5NO.zmyhn.cn
http://wU21OkGU.zmyhn.cn
http://c1fsY5Pi.zmyhn.cn
http://z7TqT9Ce.zmyhn.cn
http://GKh9Scs1.zmyhn.cn
http://xtdw1O8d.zmyhn.cn
http://CQdJOOJX.zmyhn.cn
http://xgZAbr42.zmyhn.cn
http://f3pRC0cj.zmyhn.cn
http://EOxMmxjU.zmyhn.cn
http://www.dtcms.com/a/378749.html

相关文章:

  • 云手机与云游戏之间有什么关系?
  • 数据库学习MySQL系列3、Windows11系统安装MySQL方法二.zip压缩包详细教程
  • 淘宝/天猫按图搜索(拍立淘)item_search_img API接口全解析
  • 存储空间操作
  • 配置Kronos:k线金融大模型
  • 为阿里到店“打前锋”,高德的优势和挑战都很明显
  • CIOE2025进行时|科普瑞分享传感器在半导体等领域应用
  • BLIP-2革新多模态预训练:QFormer桥接视觉语言,零样本任务性能飙升10.7%!
  • WhatWeb-网站安全扫描指纹识别
  • 【LeetCode 每日一题】498. 对角线遍历——(解法一)模拟
  • LeetCode2 两数相加 两个链表相加(C++)
  • 项目1——单片机程序审查,控制系统项目评估总结报告
  • 科技行业新闻发布平台哪家好?多场景推广专业方案服务商推荐
  • 电力基站掉电数据丢失问题该靠天硕工业级SSD固态硬盘解决吗?
  • VSCode 设置和选择conda环境
  • 遗传算法属于机器学习吗?
  • html获取16个随机颜色并不重复
  • 数据库开启ssl
  • 12V转18V/2A车灯方案:宽输入电压、支持PWM调光的车灯驱动芯片FP7208
  • get post 请求
  • 如何在Anaconda中配置你的CUDA Pytorch cuNN环境(2025最新教程)
  • 关于大模型提示词设计的思路探讨
  • 软考-系统架构设计师 信息加解密技术详细讲解
  • 人工鱼群算法AFSA优化支持向量机SVM,提高故障分类精度
  • 《RAD Studio 13.0》 [DELPHI 13.0] [官方原版IOS] 下载
  • 最小曲面问题的欧拉-拉格朗日方程 / 曲面极值问题的变分法推导
  • kotlin的函数前面增加suspend关键字的作用
  • Linux vi/vim
  • 赋能高效设计:12套中后台管理信息系统通用原型框架
  • Spark 核心 RDD详解