分布式部署的 A2A Strands Agents SDK 架构中的最佳选择:使用 OpenSearch 共享模型记忆
对于分布式智能体部署中的动态记忆库而言,OpenSearch 是一个绝佳的选择。实际上,对于许多用例来说,它可能比 DynamoDB/Redis 更胜一筹。让我来说明原因及实现方法:
对于分布式智能体部署中的动态记忆库,OpenSearch 是一个更优越的选择。以下是原因及实现方法:
为何 OpenSearch 更适合作为智能体记忆库
语义记忆搜索
与 DynamoDB 的键值查找不同,OpenSearch 支持语义记忆检索:
import json
import os
import time
from urllib.parse import urlparse

import boto3
from aws_requests_auth.aws_auth import AWSRequestsAuth
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection
from strands import Agent, tool


class OpenSearchMemoryBank:
    """Shared agent memory stored in an OpenSearch k-NN index.

    Each memory is one document whose id is the memory key. Documents carry
    the raw text, metadata (agent id, tags, importance, timestamp) and an
    embedding vector used for semantic search.
    """

    def __init__(self, endpoint: str, region: str = 'us-east-1', service: str = 'es'):
        """Connect to OpenSearch and make sure the memory index exists.

        Args:
            endpoint: Cluster address. May be a bare host name, or a full
                ``http://`` / ``https://`` URL with an optional port.
            region: AWS region used for request signing.
            service: SigV4 service name; ``'es'`` for a managed domain,
                ``'aoss'`` for OpenSearch Serverless.

        Raises:
            ValueError: If ``endpoint`` is empty or has no host part.
        """
        if not endpoint:
            raise ValueError("OpenSearch endpoint is required")

        # A bare host name is treated as HTTPS, which is what the previous
        # "strip https:// and use port 443" logic assumed.
        parsed = urlparse(endpoint if '://' in endpoint else f'https://{endpoint}')
        if not parsed.hostname:
            raise ValueError(f"Cannot parse OpenSearch endpoint: {endpoint!r}")
        use_ssl = parsed.scheme == 'https'
        # 9200 is OpenSearch's own default port for a plain-HTTP node.
        port = parsed.port or (443 if use_ssl else 9200)

        # Requests are SigV4-signed only over TLS; a plain-HTTP endpoint is
        # taken to be a local development node with security disabled.
        http_auth = None
        if use_ssl:
            credentials = boto3.Session().get_credentials()
            http_auth = AWSV4SignerAuth(credentials, region, service)

        self.client = OpenSearch(
            hosts=[{'host': parsed.hostname, 'port': port}],
            http_auth=http_auth,
            use_ssl=use_ssl,
            verify_certs=use_ssl,
            connection_class=RequestsHttpConnection,
        )
        self.memory_index = "agent-shared-memory"
        # Created on first use so constructing the bank does not require
        # Bedrock to be configured.
        self._bedrock = None
        self.create_memory_index()

    def create_memory_index(self):
        """Create the memory index with a k-NN vector field if it is missing.

        The mapping only applies to a newly created index; an index that
        already exists keeps whatever mapping it was created with.
        """
        index_body = {
            "settings": {
                "index": {
                    "knn": True,
                    "knn.algo_param.ef_search": 100,
                }
            },
            "mappings": {
                "properties": {
                    "memory_key": {"type": "keyword"},
                    "content": {"type": "text"},
                    "description": {"type": "text"},
                    "agent_id": {"type": "keyword"},
                    # store_memory writes epoch seconds; without an explicit
                    # format the value would be read as epoch milliseconds.
                    "timestamp": {"type": "date", "format": "epoch_second"},
                    "tags": {"type": "keyword"},
                    "memory_vector": {
                        "type": "knn_vector",
                        # Must match the output size of the embedding model
                        # used in generate_embedding.
                        "dimension": 1536,
                        "method": {
                            "name": "hnsw",
                            # nmslib names cosine similarity "cosinesimil".
                            "space_type": "cosinesimil",
                            "engine": "nmslib",
                        },
                    },
                    "context": {"type": "object"},
                    "importance_score": {"type": "float"},
                }
            },
        }
        if not self.client.indices.exists(index=self.memory_index):
            self.client.indices.create(index=self.memory_index, body=index_body)

    def store_memory(self, key: str, content: str, agent_id: str, description: str = "",
                     tags: list = None, context: dict = None, importance: float = 1.0):
        """Index one memory, replacing any existing document with the same key.

        The embedding is computed from the description followed by the content.
        """
        embedding = self.generate_embedding(f"{description} {content}")
        doc = {
            "memory_key": key,
            "content": content,
            "description": description,
            "agent_id": agent_id,
            # Whole epoch seconds, matching the index mapping's date format.
            "timestamp": int(time.time()),
            "tags": tags or [],
            "memory_vector": embedding,
            "context": context or {},
            "importance_score": importance,
        }
        self.client.index(index=self.memory_index, id=key, body=doc)

    def semantic_search(self, query: str, k: int = 5, agent_filter: str = None):
        """Return up to ``k`` hits nearest to ``query`` in embedding space.

        Hits are ordered by importance, then by recency. If ``agent_filter``
        is given, only memories written by that agent id are returned.
        """
        query_vector = self.generate_embedding(query)
        search_body = {
            "size": k,
            # With an explicit sort, scores are not computed unless asked
            # for; callers read hit['_score'].
            "track_scores": True,
            "query": {
                "bool": {
                    "must": [
                        {"knn": {"memory_vector": {"vector": query_vector, "k": k}}}
                    ]
                }
            },
            "sort": [
                {"importance_score": {"order": "desc"}},
                {"timestamp": {"order": "desc"}},
            ],
        }
        if agent_filter:
            search_body["query"]["bool"]["filter"] = [{"term": {"agent_id": agent_filter}}]
        response = self.client.search(index=self.memory_index, body=search_body)
        return response['hits']['hits']

    def keyword_search(self, query: str, tags: list = None):
        """Return hits whose content or description matches ``query``.

        If ``tags`` is given, hits must also carry at least one of them.
        """
        search_body = {
            "query": {
                "bool": {
                    "should": [
                        {"match": {"content": query}},
                        {"match": {"description": query}},
                    ],
                    # Once a filter clause is present, should-clauses become
                    # optional by default; require one so the text query
                    # still restricts the result set.
                    "minimum_should_match": 1,
                }
            }
        }
        if tags:
            search_body["query"]["bool"]["filter"] = [{"terms": {"tags": tags}}]
        response = self.client.search(index=self.memory_index, body=search_body)
        return response['hits']['hits']

    def hybrid_search(self, query: str, k: int = 5):
        """Return up to ``k`` hits scored by vector similarity plus text match.

        The vector clause is boosted 2.0 against 1.0 for the text clause.
        """
        query_vector = self.generate_embedding(query)
        search_body = {
            "size": k,
            "query": {
                "bool": {
                    "should": [
                        {"knn": {"memory_vector": {"vector": query_vector, "k": k, "boost": 2.0}}},
                        {"multi_match": {"query": query, "fields": ["content", "description"], "boost": 1.0}},
                    ]
                }
            },
        }
        response = self.client.search(index=self.memory_index, body=search_body)
        return response['hits']['hits']

    def generate_embedding(self, text: str):
        """Return the Amazon Bedrock Titan text embedding for ``text``."""
        if self._bedrock is None:
            # One client is reused for every call instead of one per call.
            self._bedrock = boto3.client('bedrock-runtime')
        response = self._bedrock.invoke_model(
            modelId='amazon.titan-embed-text-v1',
            body=json.dumps({"inputText": text}),
        )
        result = json.loads(response['body'].read())
        return result['embedding']


# Module-level memory bank used by the tool functions; configured from the
# environment (AWS_REGION falls back to us-east-1).
opensearch_memory = OpenSearchMemoryBank(
    endpoint=os.getenv('OPENSEARCH_ENDPOINT'),
    region=os.getenv('AWS_REGION', 'us-east-1'),
)
用于智能体的高级记忆工具
# NOTE(review): the docstrings of the @tool functions below are left in their
# original language on purpose. The tool decorator presumably exposes them to
# the model as the tool description, which would make them runtime text --
# confirm before translating.


@tool
def store_semantic_memory(key: str, content: str, description: str = "", tags: str = "", importance: float = 1.0) -> str:
    """存储可以通过语义搜索找到的记忆"""
    # `tags` is a comma-separated string; blank entries ("a, ,b" or a
    # trailing comma) are dropped instead of being stored as empty tags.
    tag_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else []
    opensearch_memory.store_memory(
        key=key,
        content=content,
        # The writing agent is identified by the AGENT_ID environment variable.
        agent_id=os.getenv('AGENT_ID', 'unknown'),
        description=description,
        tags=tag_list,
        importance=importance,
    )
    return f"已存储语义记忆: {key}"


@tool
def find_similar_memories(query: str, limit: int = 5) -> str:
    """使用 AI 语义搜索查找与查询相似的记忆"""
    results = opensearch_memory.semantic_search(query, k=limit)
    if not results:
        return "未找到相似记忆"
    memories = []
    for hit in results:
        source = hit['_source']
        # A search that sorts on fields can return a null score; formatting
        # None with ':.2f' would raise TypeError.
        score = hit.get('_score')
        relevance = f"{score:.2f}" if score is not None else "N/A"
        memories.append(
            f"键: {source['memory_key']}\n"
            f"描述: {source['description']}\n"
            # Only the first 200 characters of the content are shown.
            f"内容: {source['content'][:200]}...\n"
            f"相关性: {relevance}\n"
            f"智能体: {source['agent_id']}\n---"
        )
    return "\n".join(memories)


@tool
def search_memory_by_context(query: str, context_type: str = "") -> str:
    """使用混合(语义 + 关键字)搜索记忆"""
    results = opensearch_memory.hybrid_search(query, k=5)
    memories = []
    for hit in results:
        source = hit['_source']
        # When a context type is given, keep only hits tagged with it. The
        # filter runs after the search, so fewer than 5 results may remain.
        if context_type and context_type not in source.get('tags', []):
            continue
        memories.append(
            f"📝 {source['description']}\n"
            f"💭 {source['content']}\n"
            f"🏷️ 标签: {', '.join(source.get('tags', []))}\n"
            f"🤖 智能体: {source['agent_id']}\n"
        )
    return "\n\n".join(memories) if memories else "未找到相关记忆"


@tool
def update_memory_importance(key: str, new_importance: float) -> str:
    """更新记忆的重要性分数"""
    # Partial update: only importance_score of the document with id `key`
    # is changed.
    opensearch_memory.client.update(
        index=opensearch_memory.memory_index,
        id=key,
        body={"doc": {"importance_score": new_importance}},
    )
    return f"已将 {key} 的重要性更新为 {new_importance}"
使用 OpenSearch 进行生产环境部署
python
# production_agent_with_opensearch.py
import asyncio
import os
import time

from strands import Agent
from strands.multiagent.a2a import A2AServer
from strands.session.s3_session_manager import S3SessionManager

# NOTE(review): OpenSearchMemoryBank and the four memory tool functions used
# below are defined elsewhere in this document and must be in scope here;
# the module they should be imported from is not shown.


class ProductionAgentWithOpenSearch:
    """Agent served over A2A that uses the shared OpenSearch memory bank.

    Conversation history goes to S3 through the session manager; long-term
    memories go to OpenSearch through the memory tools.
    """

    def __init__(self, agent_id: str):
        """Build the memory bank, the session manager and the agent.

        Reads OPENSEARCH_ENDPOINT, AWS_REGION (default us-east-1) and
        AGENT_MEMORY_BUCKET from the environment.
        """
        self.agent_id = agent_id
        # Keeps a reference to the background maintenance task so it is not
        # garbage-collected while it is running.
        self._maintenance_task = None

        self.memory_bank = OpenSearchMemoryBank(
            endpoint=os.getenv('OPENSEARCH_ENDPOINT'),
            region=os.getenv('AWS_REGION', 'us-east-1'),
        )

        self.session_manager = S3SessionManager(
            session_id=f"prod_{agent_id}",
            bucket=os.getenv('AGENT_MEMORY_BUCKET'),
            prefix=f"sessions/{agent_id}",
        )

        self.agent = Agent(
            name=f"生产环境智能体 {agent_id}",
            session_manager=self.session_manager,
            tools=[
                store_semantic_memory,
                find_similar_memories,
                search_memory_by_context,
                update_memory_importance,
                *self.get_specialized_tools(),
            ],
        )

    def get_specialized_tools(self):
        """Return extra tools for this agent; subclasses override this.

        Called from __init__, so an override must not depend on attributes
        that a subclass sets after calling super().__init__().
        """
        return []

    async def start_service(self):
        """Serve the agent over A2A and run memory maintenance alongside it.

        The port comes from the PORT environment variable (default 9000).
        """
        port = int(os.getenv('PORT', 9000))
        server = A2AServer(agent=self.agent, port=port)

        self._maintenance_task = asyncio.create_task(self.memory_maintenance_task())
        try:
            # serve() is a blocking call; running it in a worker thread keeps
            # this event loop free for the maintenance task.
            await asyncio.to_thread(server.serve)
        finally:
            self._maintenance_task.cancel()

    async def memory_maintenance_task(self):
        """Run cleanup and score updates once per hour until cancelled."""
        while True:
            try:
                await self.cleanup_old_memories()
                await self.update_memory_scores()
            except Exception as e:
                print(f"内存维护错误: {e}")
            # Sleep after failures too; otherwise a persistent error would
            # retry in a tight loop.
            await asyncio.sleep(3600)

    async def update_memory_scores(self):
        """Hook for adjusting importance scores from usage; no-op by default.

        Subclasses may override this. It exists so the maintenance loop has
        something to call.
        """
        return None

    async def cleanup_old_memories(self):
        """Delete memories older than 30 days whose importance is below 0.3."""
        # Epoch seconds, the same unit store_memory writes to `timestamp`.
        cutoff_time = time.time() - (30 * 24 * 3600)
        search_body = {
            "query": {
                "bool": {
                    "must": [
                        {"range": {"timestamp": {"lt": cutoff_time}}},
                        {"range": {"importance_score": {"lt": 0.3}}},
                    ]
                }
            }
        }
        # delete_by_query removes every match, where a plain search would
        # only return the first page of hits. The blocking client call runs
        # in a worker thread so it does not stall the event loop.
        await asyncio.to_thread(
            self.memory_bank.client.delete_by_query,
            index=self.memory_bank.memory_index,
            body=search_body,
            conflicts="proceed",
        )
多智能体记忆协作
class CollaborativeMemorySystem:
    """Copies memories from one agent to other agents in the shared bank."""

    def __init__(self):
        """Connect to the bank named by OPENSEARCH_ENDPOINT.

        The region comes from AWS_REGION (default us-east-1).
        """
        self.opensearch = OpenSearchMemoryBank(
            endpoint=os.getenv('OPENSEARCH_ENDPOINT'),
            region=os.getenv('AWS_REGION', 'us-east-1'),
        )

    async def cross_agent_memory_sharing(self, source_agent: str, target_agents: list, memory_query: str):
        """Copy the source agent's memories that match a query to each target.

        Up to 10 memories of ``source_agent`` that are semantically close to
        ``memory_query`` are re-stored once per target agent, tagged
        ``shared_memory`` and with importance scaled by 0.8.
        """
        memories = self.opensearch.semantic_search(
            query=memory_query,
            agent_filter=source_agent,
            k=10,
        )

        for memory in memories:
            source = memory['_source']
            stamp = int(time.time())
            for target_agent in target_agents:
                # The key doubles as the document id, so it must differ per
                # target; with one key for all targets each copy overwrote
                # the previous one.
                shared_key = f"shared_{source['memory_key']}_{target_agent}_{stamp}"
                self.opensearch.store_memory(
                    key=shared_key,
                    content=source['content'],
                    agent_id=target_agent,
                    description=f"从 {source_agent} 共享: {source['description']}",
                    tags=list(source.get('tags') or []) + ['shared_memory'],
                    # Shared copies rank slightly below the original.
                    importance=source.get('importance_score', 1.0) * 0.8,
                )


class CollaborativeAgent(ProductionAgentWithOpenSearch):
    """Production agent that can also share its memories with other agents."""

    def __init__(self, agent_id: str):
        super().__init__(agent_id)
        self.memory_system = CollaborativeMemorySystem()
        # Strong references to in-flight sharing tasks; the event loop itself
        # only keeps weak references to tasks.
        self._background_tasks = set()

    def get_specialized_tools(self):
        """Register the memory-sharing tool; without this it is unreachable."""
        return [self.share_memories_with_agents]

    # NOTE(review): the docstring below is left in its original language; the
    # tool decorator presumably exposes it to the model as the tool
    # description -- confirm before translating.
    @tool
    def share_memories_with_agents(self, topic: str, target_agents: str) -> str:
        """与其他智能体共享关于某个主题的相关记忆"""
        # `target_agents` is a comma-separated string; blank entries are
        # dropped so no memory is stored under an empty agent id.
        agent_list = [a.strip() for a in target_agents.split(",") if a.strip()]
        sharing = self.memory_system.cross_agent_memory_sharing(
            source_agent=self.agent_id,
            target_agents=agent_list,
            memory_query=topic,
        )
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No event loop in this thread: run the sharing to completion
            # here instead of failing in create_task.
            asyncio.run(sharing)
        else:
            task = loop.create_task(sharing)
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)
        return f"正在与 {len(agent_list)} 个智能体共享关于 '{topic}' 的记忆"
用于自动扩展的 OpenSearch Serverless
# terraform/opensearch.tf

# A Serverless collection can only be created once an encryption policy that
# matches its name exists, so this policy is declared first.
resource "aws_opensearchserverless_security_policy" "agent_memory_encryption" {
  name = "agent-memory-encryption"
  type = "encryption"

  policy = jsonencode({
    Rules = [
      {
        ResourceType = "collection"
        Resource     = ["collection/agent-shared-memory"]
      }
    ]
    AWSOwnedKey = true
  })
}

# Vector-search collection that holds the shared agent memory.
resource "aws_opensearchserverless_collection" "agent_memory" {
  name = "agent-shared-memory"
  type = "VECTORSEARCH"

  tags = {
    Environment = "production"
    Purpose     = "agent-memory-bank"
  }

  depends_on = [aws_opensearchserverless_security_policy.agent_memory_encryption]
}

# Data access policy: gives the agent execution role access to every index
# in the collection.
resource "aws_opensearchserverless_access_policy" "agent_memory_access" {
  name = "agent-memory-access"
  type = "data"

  policy = jsonencode([
    {
      Rules = [
        {
          ResourceType = "index"
          Resource     = ["index/agent-shared-memory/*"]
          Permission   = ["aoss:*"]
        }
      ]
      Principal = [aws_iam_role.agent_execution_role.arn]
    }
  ])
}
完整的 Docker 部署
# docker-compose.yml
version: '3.8'

services:
  opensearch:
    image: opensearchproject/opensearch:2.11.0
    environment:
      - discovery.type=single-node
      # Security plugin disabled: development only.
      - plugins.security.disabled=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - opensearch_data:/usr/share/opensearch/data
    # The agents connect to OpenSearch as soon as they start, so they wait
    # until the node answers HTTP requests.
    healthcheck:
      test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 12

  # Orchestrator agent
  orchestrator:
    build: ./orchestrator
    environment:
      - AGENT_ID=orchestrator
      - OPENSEARCH_ENDPOINT=http://opensearch:9200
      - AGENT_MEMORY_BUCKET=shared-agent-memory
      # Port the agent listens on; must match the container side of `ports`.
      - PORT=9000
    ports:
      - "9000:9000"
    depends_on:
      opensearch:
        condition: service_healthy

  # Data specialist agent
  data-specialist:
    build: ./data-specialist
    environment:
      - AGENT_ID=data_specialist
      - OPENSEARCH_ENDPOINT=http://opensearch:9200
      - AGENT_MEMORY_BUCKET=shared-agent-memory
      # Without this the agent listens on its default 9000 and the 9001
      # mapping below reaches nothing.
      - PORT=9001
    ports:
      - "9001:9001"
    depends_on:
      opensearch:
        condition: service_healthy

volumes:
  opensearch_data:
OpenSearch 作为智能体记忆库的主要优势
语义搜索 (Semantic Search): 按含义而非仅关键字查找记忆
向量存储 (Vector Storage): 原生支持嵌入向量和相似性搜索
混合搜索 (Hybrid Search): 结合语义和传统搜索
可扩展性 (Scalability): 可处理数十亿条记忆条目
实时性 (Real-time): 即时索引和搜索
分析能力 (Analytics): 内置仪表板和监控
成本效益 (Cost-Effective): 提供按使用量付费的无服务器选项
AWS 集成 (AWS Integration): 与 Bedrock、Lambda 等原生集成
对于分布式智能体部署中的动态记忆库,尤其是当您需要智能记忆检索和语义理解时,OpenSearch 绝对是更优越的选择!
资料来源
Vector search - Amazon OpenSearch Service (向量搜索 - Amazon OpenSearch Service)
Unlock the power of Amazon OpenSearch Service: Your learning guide for search, analytics, and generative AI solutions | AWS Training and Certification Blog (释放 Amazon OpenSearch Service 的力量:您的搜索、分析和生成式 AI 解决方案学习指南 | AWS 培训与认证博客)
Vector Embeddings for Search, RAG, Chatbots, Agents, and Generative AI - Vector Database for Amazon OpenSearch Service - AWS (用于搜索、RAG、聊天机器人、智能体和生成式 AI 的向量嵌入 - Amazon OpenSearch Service 的向量数据库 - AWS)
Open-Source Search Engine - Amazon OpenSearch Service Managed Service - AWS (开源搜索引擎 - Amazon OpenSearch Service 托管服务 - AWS)
Meta-tools - AWS Prescriptive Guidance (元工具 - AWS 规范性指南)