当前位置: 首页 > news >正文

分布式部署的A2A strands agents sdk架构中的最佳选择,使用open search共享模型记忆

对于分布式智能体部署中的动态记忆库而言,OpenSearch 是一个绝佳的选择。实际上,对于许多用例来说,它可能比 DynamoDB/Redis 更胜一筹。让我来说明原因及实现方法:

对于分布式智能体部署中的动态记忆库,OpenSearch 是一个更优越的选择。以下是原因及实现方法:

为何 OpenSearch 更适合作为智能体记忆库

  1. 语义记忆搜索
    与 DynamoDB 的键值查找不同,OpenSearch 支持语义记忆检索:

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from aws_requests_auth.aws_auth import AWSRequestsAuth
from strands import Agent, tool
import json
import timeclass OpenSearchMemoryBank:def __init__(self, endpoint: str, region: str = 'us-east-1'):# 用于 OpenSearch 的 AWS 认证credentials = boto3.Session().get_credentials()awsauth = AWSRequestsAuth(credentials, region, 'es')self.client = OpenSearch(hosts=[{'host': endpoint.replace('https://', ''), 'port': 443}],http_auth=awsauth,use_ssl=True,verify_certs=True,connection_class=RequestsHttpConnection)self.memory_index = "agent-shared-memory" # 智能体共享记忆索引self.create_memory_index()def create_memory_index(self):"""创建包含用于语义搜索的向量字段的索引"""index_body = {"settings": {"index": {"knn": True,"knn.algo_param.ef_search": 100}},"mappings": {"properties": {"memory_key": {"type": "keyword"}, # 记忆键"content": {"type": "text"}, # 内容"description": {"type": "text"}, # 描述"agent_id": {"type": "keyword"}, # 智能体ID"timestamp": {"type": "date"}, # 时间戳"tags": {"type": "keyword"}, # 标签"memory_vector": { # 记忆向量"type": "knn_vector","dimension": 1536,  # OpenAI 嵌入维度"method": {"name": "hnsw","space_type": "cosine", # 余弦相似度"engine": "nmslib"}},"context": {"type": "object"}, # 上下文"importance_score": {"type": "float"} # 重要性分数}}}if not self.client.indices.exists(self.memory_index):self.client.indices.create(self.memory_index, body=index_body)def store_memory(self, key: str, content: str, agent_id: str, description: str = "", tags: list = None, context: dict = None, importance: float = 1.0):"""存储带有语义向量的记忆"""# 生成用于语义搜索的嵌入向量embedding = self.generate_embedding(f"{description} {content}")doc = {"memory_key": key,"content": content,"description": description,"agent_id": agent_id,"timestamp": time.time(),"tags": tags or [],"memory_vector": embedding,"context": context or {},"importance_score": importance}self.client.index(index=self.memory_index,id=key,body=doc)def semantic_search(self, query: str, k: int = 5, agent_filter: str = None):"""通过语义相似性搜索记忆"""query_vector = self.generate_embedding(query)search_body = {"size": k,"query": {"bool": {"must": [{"knn": {"memory_vector": {"vector": query_vector,"k": k}}}]}},"sort": [{"importance_score": {"order": "desc"}}, # 按重要性降序{"timestamp": {"order": "desc"}} # 按时间戳降序]}# 如果指定了智能体过滤器,则添加if agent_filter:search_body["query"]["bool"]["filter"] = [{"term": {"agent_id": agent_filter}}]response = self.client.search(index=self.memory_index, body=search_body)return response['hits']['hits']def keyword_search(self, query: str, tags: list = None):"""传统关键字搜索"""search_body = {"query": {"bool": {"should": [{"match": {"content": query}},{"match": {"description": query}}]}}}if tags:search_body["query"]["bool"]["filter"] = [{"terms": {"tags": tags}}]response = self.client.search(index=self.memory_index, body=search_body)return response['hits']['hits']def hybrid_search(self, query: str, k: int = 5):"""结合语义和关键字搜索"""query_vector = self.generate_embedding(query)search_body = {"size": k,"query": {"bool": {"should": [{"knn": {"memory_vector": {"vector": query_vector,"k": k,"boost": 2.0 # 提升权重}}},{"multi_match": {"query": query,"fields": ["content", "description"],"boost": 1.0}}]}}}response = self.client.search(index=self.memory_index, body=search_body)return response['hits']['hits']def generate_embedding(self, text: str):"""使用 Amazon Bedrock 生成嵌入向量"""bedrock = boto3.client('bedrock-runtime')response = bedrock.invoke_model(modelId='amazon.titan-embed-text-v1',body=json.dumps({"inputText": text}))result = json.loads(response['body'].read())return result['embedding']# 初始化 OpenSearch 记忆库
opensearch_memory = OpenSearchMemoryBank(endpoint=os.getenv('OPENSEARCH_ENDPOINT'), # 从环境变量获取端点region=os.getenv('AWS_REGION', 'us-east-1') # 从环境变量获取区域,默认为 us-east-1
)
  1. 用于智能体的高级记忆工具

@tool
def store_semantic_memory(key: str, content: str, description: str = "", tags: str = "", importance: float = 1.0) -> str:"""存储可以通过语义搜索找到的记忆"""tag_list = [t.strip() for t in tags.split(",")] if tags else []opensearch_memory.store_memory(key=key,content=content,agent_id=os.getenv('AGENT_ID', 'unknown'), # 从环境变量获取智能体ID,默认为 unknowndescription=description,tags=tag_list,importance=importance)return f"已存储语义记忆: {key}"@tool
def find_similar_memories(query: str, limit: int = 5) -> str:"""使用 AI 语义搜索查找与查询相似的记忆"""results = opensearch_memory.semantic_search(query, k=limit)if not results:return "未找到相似记忆"memories = []for hit in results:source = hit['_source']score = hit['_score']memories.append(f"键: {source['memory_key']}\n"f"描述: {source['description']}\n"f"内容: {source['content'][:200]}...\n" # 截取前200个字符f"相关性: {score:.2f}\n"f"智能体: {source['agent_id']}\n---")return "\n".join(memories)@tool
def search_memory_by_context(query: str, context_type: str = "") -> str:"""使用混合(语义 + 关键字)搜索记忆"""results = opensearch_memory.hybrid_search(query, k=5)memories = []for hit in results:source = hit['_source']if context_type and context_type not in source.get('tags', []):continuememories.append(f"📝 {source['description']}\n"f"💭 {source['content']}\n"f"🏷️ 标签: {', '.join(source.get('tags', []))}\n"f"🤖 智能体: {source['agent_id']}\n")return "\n\n".join(memories) if memories else "未找到相关记忆"@tool
def update_memory_importance(key: str, new_importance: float) -> str:"""更新记忆的重要性分数"""opensearch_memory.client.update(index=opensearch_memory.memory_index,id=key,body={"doc": {"importance_score": new_importance}})return f"已将 {key} 的重要性更新为 {new_importance}"
  1. 使用 OpenSearch 进行生产环境部署

python

# production_agent_with_opensearch.py
import os
import asyncio
from strands import Agent
from strands.multiagent.a2a import A2AServer
from strands.session.s3_session_manager import S3SessionManagerclass ProductionAgentWithOpenSearch:def __init__(self, agent_id: str):self.agent_id = agent_id# OpenSearch 记忆库self.memory_bank = OpenSearchMemoryBank(endpoint=os.getenv('OPENSEARCH_ENDPOINT'),region=os.getenv('AWS_REGION', 'us-east-1'))# 用于会话历史的 S3 会话管理器self.session_manager = S3SessionManager(session_id=f"prod_{agent_id}",bucket=os.getenv('AGENT_MEMORY_BUCKET'), # 智能体记忆存储桶prefix=f"sessions/{agent_id}" # 前缀)# 创建带有 OpenSearch 记忆工具的智能体self.agent = Agent(name=f"生产环境智能体 {agent_id}",session_manager=self.session_manager,tools=[store_semantic_memory,find_similar_memories,search_memory_by_context,update_memory_importance,*self.get_specialized_tools() # 获取特定工具])def get_specialized_tools(self):"""重写此方法以提供智能体特定工具"""return []async def start_service(self):"""启动带有内存同步的 A2A 服务"""port = int(os.getenv('PORT', 9000)) # 端口,默认9000server = A2AServer(agent=self.agent, port=port)# 启动内存维护任务asyncio.create_task(self.memory_maintenance_task())server.serve()async def memory_maintenance_task(self):"""用于内存管理的后台任务"""while True:try:# 清理旧的、低重要性的记忆await self.cleanup_old_memories()# 根据使用情况更新记忆重要性await self.update_memory_scores()await asyncio.sleep(3600)  # 每小时运行一次except Exception as e:print(f"内存维护错误: {e}")async def cleanup_old_memories(self):"""移除旧的、不重要的记忆"""cutoff_time = time.time() - (30 * 24 * 3600)  # 30 天前search_body = {"query": {"bool": {"must": [{"range": {"timestamp": {"lt": cutoff_time}}}, # 时间戳早于 cutoff_time{"range": {"importance_score": {"lt": 0.3}}} # 重要性分数低于 0.3]}}}response = self.memory_bank.client.search(index=self.memory_bank.memory_index,body=search_body)for hit in response['hits']['hits']:self.memory_bank.client.delete(index=self.memory_bank.memory_index,id=hit['_id'])
  1. 多智能体记忆协作

class CollaborativeMemorySystem:def __init__(self):self.opensearch = OpenSearchMemoryBank(endpoint=os.getenv('OPENSEARCH_ENDPOINT'))async def cross_agent_memory_sharing(self, source_agent: str, target_agents: list, memory_query: str):"""在智能体之间共享相关记忆"""# 从源智能体查找相关记忆memories = self.opensearch.semantic_search(query=memory_query,agent_filter=source_agent,k=10)# 为目标智能体创建共享记忆条目for memory in memories:source = memory['_source']shared_key = f"shared_{source['memory_key']}_{int(time.time())}" # 共享键名for target_agent in target_agents:self.opensearch.store_memory(key=shared_key,content=source['content'],agent_id=target_agent,description=f"从 {source_agent} 共享: {source['description']}",tags=source['tags'] + ['shared_memory'], # 添加共享记忆标签importance=source['importance_score'] * 0.8  # 重要性稍低)# 具有协作记忆功能的增强型智能体
class CollaborativeAgent(ProductionAgentWithOpenSearch):def __init__(self, agent_id: str):super().__init__(agent_id)self.memory_system = CollaborativeMemorySystem()@tooldef share_memories_with_agents(self, topic: str, target_agents: str) -> str:"""与其他智能体共享关于某个主题的相关记忆"""agent_list = [a.strip() for a in target_agents.split(",")] # 智能体列表asyncio.create_task(self.memory_system.cross_agent_memory_sharing(source_agent=self.agent_id,target_agents=agent_list,memory_query=topic))return f"正在与 {len(agent_list)} 个智能体共享关于 '{topic}' 的记忆"
  1. 用于自动扩展的 OpenSearch Serverless

# terraform/opensearch.tf
resource "aws_opensearchserverless_collection" "agent_memory" {name = "agent-shared-memory" # 集合名称type = "VECTORSEARCH" # 类型为向量搜索tags = {Environment = "production" # 环境标签Purpose     = "agent-memory-bank" # 用途标签}
}resource "aws_opensearchserverless_access_policy" "agent_memory_access" {name = "agent-memory-access" # 策略名称type = "data" # 策略类型为数据policy = jsonencode([{Rules = [{ResourceType = "index" # 资源类型为索引Resource     = ["index/agent-shared-memory/*"] # 资源名称Permission   = ["aoss:*"] # 权限}]Principal = [aws_iam_role.agent_execution_role.arn] # 主体(执行角色ARN)}])
}
  1. 完整的 Docker 部署

# docker-compose.yml
version: '3.8'
services:opensearch:image: opensearchproject/opensearch:2.11.0 # OpenSearch 镜像environment:- discovery.type=single-node # 单节点发现- plugins.security.disabled=true # 禁用安全插件(仅用于开发)- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # JVM 选项ports:- "9200:9200" # 端口映射volumes:- opensearch_data:/usr/share/opensearch/data # 数据卷orchestrator: # 协调器服务build: ./orchestrator # 构建上下文environment:- AGENT_ID=orchestrator # 智能体ID- OPENSEARCH_ENDPOINT=http://opensearch:9200 # OpenSearch 端点- AGENT_MEMORY_BUCKET=shared-agent-memory # 记忆存储桶ports:- "9000:9000" # 端口映射depends_on:- opensearch # 依赖 OpenSearch 服务data-specialist: # 数据专家服务build: ./data-specialist # 构建上下文environment:- AGENT_ID=data_specialist # 智能体ID- OPENSEARCH_ENDPOINT=http://opensearch:9200 # OpenSearch 端点- AGENT_MEMORY_BUCKET=shared-agent-memory # 记忆存储桶ports:- "9001:9001" # 端口映射depends_on:- opensearch # 依赖 OpenSearch 服务volumes:opensearch_data: # 定义数据卷

OpenSearch 作为智能体记忆库的主要优势

  • 语义搜索 (Semantic Search): 按含义而非仅关键字查找记忆

  • 向量存储 (Vector Storage): 原生支持嵌入向量和相似性搜索

  • 混合搜索 (Hybrid Search): 结合语义和传统搜索

  • 可扩展性 (Scalability): 可处理数十亿条记忆条目

  • 实时性 (Real-time): 即时索引和搜索

  • 分析能力 (Analytics): 内置仪表板和监控

  • 成本效益 (Cost-Effective): 提供按使用量付费的无服务器选项

  • AWS 集成 (AWS Integration): 与 Bedrock、Lambda 等原生集成

对于分布式智能体部署中的动态记忆库,尤其是当您需要智能记忆检索和语义理解时,OpenSearch 绝对是更优越的选择!

资料来源

  • Vector search - Amazon OpenSearch Service (向量搜索 - Amazon OpenSearch Service)

  • Unlock the power of Amazon OpenSearch Service: Your learning guide for search, analytics, and generative AI solutions | AWS Training and Certification Blog (释放 Amazon OpenSearch Service 的力量:您的搜索、分析和生成式 AI 解决方案学习指南 | AWS 培训与认证博客)

  • Vector Embeddings for Search, RAG, Chatbots, Agents, and Generative AI - Vector Database for Amazon OpenSearch Service - AWS (用于搜索、RAG、聊天机器人、智能体和生成式 AI 的向量嵌入 - Amazon OpenSearch Service 的向量数据库 - AWS)

  • Open-Source Search Engine - Amazon OpenSearch Service Managed Service - AWS (开源搜索引擎 - Amazon OpenSearch Service 托管服务 - AWS)

  • Meta-tools - AWS Prescriptive Guidance (元工具 - AWS 规范性指南)


文章转载自:

http://utlC0npz.ydgzj.cn
http://aCG1aBSz.ydgzj.cn
http://epdIXttz.ydgzj.cn
http://vpibBcmt.ydgzj.cn
http://lEMkM1tr.ydgzj.cn
http://wbjbEtQB.ydgzj.cn
http://zh032Fu4.ydgzj.cn
http://45lxvR62.ydgzj.cn
http://qDA598Ir.ydgzj.cn
http://dGr0tx7i.ydgzj.cn
http://6JEluVm6.ydgzj.cn
http://yP9CaEP2.ydgzj.cn
http://rv3SSrY1.ydgzj.cn
http://sqD7qqb3.ydgzj.cn
http://iHQ7phkw.ydgzj.cn
http://w2oHCjvB.ydgzj.cn
http://GvteAfjR.ydgzj.cn
http://ezJXBPv2.ydgzj.cn
http://o2BSr9lW.ydgzj.cn
http://DXzjJAcr.ydgzj.cn
http://QJJENrGd.ydgzj.cn
http://tZYHoaZz.ydgzj.cn
http://DZtwZZtg.ydgzj.cn
http://Iz7Zs9O2.ydgzj.cn
http://dhqS8eER.ydgzj.cn
http://DyIribbx.ydgzj.cn
http://SiSJDjHk.ydgzj.cn
http://rp5zDc0A.ydgzj.cn
http://REJN8LHu.ydgzj.cn
http://r3Nk7Ijt.ydgzj.cn
http://www.dtcms.com/a/377976.html

相关文章:

  • 【设计模式】抽象工厂模式
  • LeetCode 刷题【72. 编辑距离】
  • gitlab流水线与k8s集群的联通
  • 关于神经网络中回归的概念
  • 前后端接口调试提效:Postman + Mock Server 的工作流
  • Cesium---1.133版本不修改源码支持arcgis MapServer 4490切片
  • express 框架基础和 EJS 模板
  • 多楼层室内定位可视化 Demo(A*路径避障)
  • python将pdf转txt,并切割ai
  • 可视化图解算法60: 矩阵最长递增路径
  • 4、幽络源微服务项目实战:后端公共模块创建与引入多租户模块
  • 用Next.js 构建一个简单的 CRUD 应用:集成 API 路由和数据获取
  • 如何通过url打开本地文件文件夹
  • Swagger隐藏入参中属性字段
  • JavaEE--8.网络编程
  • linux系统搭建nacos集群,并通过nginx实现负载均衡
  • 论文阅读:openai 2025 Why Language Models Hallucinate
  • Rail开发日志_9
  • opencv特征检测
  • 科普:环境隔离的工具:虚拟环境与容器Docker
  • 小迪安全v2023学习笔记(八十一讲)—— 框架安全ThinkPHPLaravelStruts2SpringBootCVE复现
  • ubuntu22.04 安装Docker
  • OpenCV 开发 -- 图像阈值处理
  • [Ubuntu][mount]ubuntu电脑挂载新硬盘
  • Maven中optional的作用
  • 使用pdfjs-dist 预览pdf,并添加文本层的实现
  • 操作系统应用开发(五)智能浏览器开发——东方仙盟元婴期
  • 蓝桥杯算法之基础知识(7)---排序题的快排和归并排序
  • leetcode-python-2154将找到的值乘以 2
  • Nginx 实战系列(十)—— LVS+Keepalived 高可用集群技术详解