如何让Agent停止输出,使用strands agents sdk
停止请求(也称为取消或中断)允许你在智能体执行过程中中止运行。这对于长时间运行的任务、用户发起的取消操作和资源管理至关重要。让我展示几种实现方法:
获取知识
完美!让我为你展示为 Strands 智能体实现停止请求(取消/中断)的全面方法。这对于需要取消长时间运行操作的生产环境应用至关重要。
1. 基础 Asyncio 取消功能
1.1 简单任务取消
import asyncio from strands import Agent, tool import time@tool def long_running_task(duration: int = 10) -> str:"""需要长时间运行的任务"""for i in range(duration):time.sleep(1) # 模拟工作print(f"工作中... {i+1}/{duration}")return f"任务在 {duration} 秒后完成"agent = Agent(name="可取消智能体",tools=[long_running_task],prompt="你执行可取消的长时间运行任务。" )class CancellableAgent:def __init__(self, agent: Agent):self.agent = agentself.current_task = Noneself.is_cancelled = Falseasync def run_with_cancellation(self, message: str, timeout: float = None):"""运行支持取消的智能体"""try:# 创建可取消任务self.current_task = asyncio.create_task(self.agent.run_async(message))if timeout:# 带超时运行result = await asyncio.wait_for(self.current_task, timeout=timeout)else:# 无超时运行result = await self.current_taskreturn resultexcept asyncio.CancelledError:print("❌ 智能体执行被取消")return "任务被用户取消"except asyncio.TimeoutError:print("⏰ 智能体执行超时")self.cancel()return "任务超时"except Exception as e:print(f"❌ 错误: {e}")return f"错误: {e}"def cancel(self):"""取消正在运行的智能体"""if self.current_task and not self.current_task.done():print("🛑 取消智能体执行...")self.current_task.cancel()self.is_cancelled = Truereturn Truereturn False# 使用示例 async def cancellation_example():"""智能体取消示例"""cancellable = CancellableAgent(agent)# 启动长时间运行任务task = asyncio.create_task(cancellable.run_with_cancellation("启动一个10秒的任务"))# 等待一会,然后取消await asyncio.sleep(3)print("用户要求取消...")cancelled = cancellable.cancel()if cancelled:print("✅ 取消请求已发送")# 等待任务完成(或被取消)result = await taskprint(f"最终结果: {result}")# 运行示例 asyncio.run(cancellation_example())
1.2 支持取消的流式处理
python
import asyncio from strands import Agent, tool@tool def streaming_task(steps: int = 20) -> str:"""可在流式处理期间取消的任务"""import timeresults = []for i in range(steps):time.sleep(0.5) # 模拟工作results.append(f"步骤 {i+1} 完成")return f"所有 {steps} 个步骤完成: {', '.join(results)}"streaming_agent = Agent(name="流式处理智能体",tools=[streaming_task],prompt="你逐步执行流式处理任务。" )class CancellableStreamingAgent:def __init__(self, agent: Agent):self.agent = agentself.current_stream = Noneself.is_cancelled = Falseasync def stream_with_cancellation(self, message: str):"""支持取消的流式处理"""try:self.is_cancelled = Falseasync for chunk in self.agent.stream_async(message):# 检查取消if self.is_cancelled:print("\n🛑 流被用户取消")breakprint(chunk, end="", flush=True)# 小延迟以允许取消检查await asyncio.sleep(0.1)if not self.is_cancelled:print("\n✅ 流完成")except asyncio.CancelledError:print("\n❌ 流被取消")except Exception as e:print(f"\n❌ 流错误: {e}")def cancel_stream(self):"""取消当前流"""self.is_cancelled = Trueprint("🛑 取消请求已发送...")# 使用示例 async def streaming_cancellation_example():"""流式处理取消示例"""streaming_cancellable = CancellableStreamingAgent(streaming_agent)# 启动流式处理任务stream_task = asyncio.create_task(streaming_cancellable.stream_with_cancellation("启动一个20步的流式处理任务"))# 3秒后取消await asyncio.sleep(3)streaming_cancellable.cancel_stream()# 等待流结束await stream_taskasyncio.run(streaming_cancellation_example())
2. 支持停止请求的 Web API
2.1 支持取消的 FastAPI
python
from fastapi import FastAPI, BackgroundTasks, HTTPException from fastapi.responses import StreamingResponse from strands import Agent, tool import asyncio import uuid from typing import Dict import jsonapp = FastAPI()# 全局任务注册表 running_tasks: Dict[str, asyncio.Task] = {}@tool def long_computation(complexity: int = 10) -> str:"""执行长时间计算"""import timetotal = 0for i in range(complexity * 1000000):total += iif i % 100000 == 0:time.sleep(0.01) # 允许取消点return f"计算结果: {total}"web_agent = Agent(name="Web智能体",tools=[long_computation],prompt="你执行可取消的计算任务。" )@app.post("/start-task") async def start_task(message: str):"""启动可取消任务"""task_id = str(uuid.uuid4())async def run_agent():try:result = await web_agent.run_async(message)return {"status": "completed", "result": str(result)}except asyncio.CancelledError:return {"status": "cancelled", "result": "任务被取消"}except Exception as e:return {"status": "error", "result": str(e)}# 启动任务task = asyncio.create_task(run_agent())running_tasks[task_id] = taskreturn {"task_id": task_id, "status": "started"}@app.post("/cancel-task/{task_id}") async def cancel_task(task_id: str):"""取消运行中的任务"""if task_id not in running_tasks:raise HTTPException(status_code=404, detail="任务未找到")task = running_tasks[task_id]if task.done():return {"status": "already_completed"}# 取消任务task.cancel()try:await taskexcept asyncio.CancelledError:pass# 清理del running_tasks[task_id]return {"status": "cancelled", "task_id": task_id}@app.get("/task-status/{task_id}") async def get_task_status(task_id: str):"""获取任务状态"""if task_id not in running_tasks:raise HTTPException(status_code=404, detail="任务未找到")task = running_tasks[task_id]if task.done():try:result = await taskdel running_tasks[task_id] # 清理已完成任务return resultexcept asyncio.CancelledError:del running_tasks[task_id]return {"status": "cancelled"}else:return {"status": "running"}@app.get("/stream-task") async def stream_task(message: str):"""流式处理可取消任务"""task_id = str(uuid.uuid4())async def generate_stream():try:async for chunk in web_agent.stream_async(message):yield f"data: {json.dumps({'type': 'chunk', 'data': chunk, 'task_id': task_id})}\n\n"await asyncio.sleep(0.1) # 允许取消yield f"data: {json.dumps({'type': 'complete', 'task_id': task_id})}\n\n"except asyncio.CancelledError:yield f"data: {json.dumps({'type': 'cancelled', 'task_id': task_id})}\n\n"return StreamingResponse(generate_stream(),media_type="text/event-stream",headers={"X-Task-ID": task_id})# 客户端使用示例: """ # 启动任务 response = requests.post("http://localhost:8000/start-task", params={"message": "执行复杂度为50的复杂计算"}) task_id = response.json()["task_id"]# 检查状态 status = requests.get(f"http://localhost:8000/task-status/{task_id}")# 如果需要则取消 cancel_response = requests.post(f"http://localhost:8000/cancel-task/{task_id}") """if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
2.2 支持实时取消的 WebSocket
python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect from strands import Agent, tool import asyncio import json import uuidapp = FastAPI()@tool def interactive_task(steps: int = 10) -> str:"""可取消的交互式任务"""import timefor i in range(steps):time.sleep(1)print(f"交互步骤 {i+1}/{steps}")return f"交互任务完成,共 {steps} 步"ws_agent = Agent(name="WebSocket智能体",tools=[interactive_task],prompt="你通过WebSocket执行交互式任务。" )@app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket):"""支持取消的WebSocket"""await websocket.accept()current_task = Nonetry:while True:# 接收消息data = await websocket.receive_text()message_data = json.loads(data)if message_data["type"] == "start":# 如果之前有任务在运行,则取消if current_task and not current_task.done():current_task.cancel()# 启动新任务async def run_task():try:async for chunk in ws_agent.stream_async(message_data["message"]):await websocket.send_text(json.dumps({"type": "chunk","data": chunk}))await asyncio.sleep(0.1)await websocket.send_text(json.dumps({"type": "complete"}))except asyncio.CancelledError:await websocket.send_text(json.dumps({"type": "cancelled"}))current_task = asyncio.create_task(run_task())elif message_data["type"] == "cancel":if current_task and not current_task.done():current_task.cancel()await websocket.send_text(json.dumps({"type": "cancelled"}))else:await websocket.send_text(json.dumps({"type": "no_task_running"}))except WebSocketDisconnect:if current_task and not current_task.done():current_task.cancel()# 客户端JavaScript示例: """ const ws = new WebSocket('ws://localhost:8000/ws');// 启动任务 ws.send(JSON.stringify({type: 'start',message: '启动一个15步的交互式任务' }));// 取消任务 ws.send(JSON.stringify({type: 'cancel' }));ws.onmessage = function(event) {const data = JSON.parse(event.data);console.log('收到:', data); }; """
3. 支持停止请求的 A2A
3.1 可取消的 A2A 客户端
python
from strands_tools.a2a_client import A2AClientToolProvider from strands import Agent import asyncio import httpxclass CancellableA2AClient:"""支持取消的A2A客户端"""def __init__(self, agent_urls: list):self.provider = A2AClientToolProvider(known_agent_urls=agent_urls)self.client_agent = Agent(name="可取消A2A客户端",tools=self.provider.tools,prompt="你与远程智能体协调,并且可以被取消。")self.current_request = Noneasync def call_with_cancellation(self, message: str, timeout: float = 30):"""调用支持取消的A2A智能体"""try:# 创建可取消任务self.current_request = asyncio.create_task(self.client_agent.run_async(message))# 带超时等待result = await asyncio.wait_for(self.current_request, timeout=timeout)return {"status": "success", "result": str(result)}except asyncio.CancelledError:return {"status": "cancelled", "result": "请求被取消"}except asyncio.TimeoutError:self.cancel_current_request()return {"status": "timeout", "result": f"请求在 {timeout} 秒后超时"}except Exception as e:return {"status": "error", "result": str(e)}def cancel_current_request(self):"""取消当前A2A请求"""if self.current_request and not self.current_request.done():self.current_request.cancel()return Truereturn False# 使用示例 async def a2a_cancellation_example():"""A2A取消示例"""a2a_client = CancellableA2AClient(["http://localhost:9000","http://localhost:9001"])# 启动A2A请求request_task = asyncio.create_task(a2a_client.call_with_cancellation("执行复杂的多智能体计算", timeout=10))# 3秒后取消await asyncio.sleep(3)cancelled = a2a_client.cancel_current_request()if cancelled:print("🛑 A2A请求已取消")result = await request_taskprint(f"结果: {result}")# asyncio.run(a2a_cancellation_example())
3.2 可取消的 A2A 服务器
python
from strands import Agent, tool from strands.multiagent.a2a import A2AServer import asyncio import threading import timeclass CancellableA2AServer:"""支持取消的A2A服务器"""def __init__(self, port: int):self.current_operations = {} # 跟踪运行中的操作self.agent = Agent(name="可取消A2A服务器",tools=[self.cancellable_operation,self.cancel_operation,self.list_operations],prompt="你执行可通过操作ID取消的操作。")self.server = A2AServer(agent=self.agent, port=port)@tooldef cancellable_operation(self, operation_name: str, duration: int = 10) -> str:"""启动可取消操作"""import uuidoperation_id = str(uuid.uuid4())# 创建取消事件cancel_event = threading.Event()self.current_operations[operation_id] = {"name": operation_name,"cancel_event": cancel_event,"status": "running"}def long_operation():"""实际长时间运行的操作"""for i in range(duration):if cancel_event.is_set():self.current_operations[operation_id]["status"] = "cancelled"return f"操作 {operation_name} 在第 {i} 步被取消"time.sleep(1) # 模拟工作self.current_operations[operation_id]["status"] = "completed"return f"操作 {operation_name} 成功完成"# 在后台线程中启动操作thread = threading.Thread(target=long_operation, daemon=True)thread.start()return f"已启动操作 '{operation_name}',ID: {operation_id}"@tooldef cancel_operation(self, operation_id: str) -> str:"""取消运行中的操作"""if operation_id not in self.current_operations:return f"操作 {operation_id} 未找到"operation = self.current_operations[operation_id]if operation["status"] != "running":return f"操作 {operation_id} 未在运行 (状态: {operation['status']})"# 发送取消信号operation["cancel_event"].set()operation["status"] = "cancelling"return f"已请求取消操作 {operation_id}"@tooldef list_operations(self) -> str:"""列出所有操作及其状态"""if not self.current_operations:return "没有运行中的操作"operations = []for op_id, op_info in self.current_operations.items():operations.append(f"ID: {op_id}, 名称: {op_info['name']}, 状态: {op_info['status']}")return "操作:\n" + "\n".join(operations)def start_server(self):"""启动可取消A2A服务器"""print(f"🚀 在端口 {self.server.port} 上启动可取消A2A服务器")self.server.serve()# 使用示例 if __name__ == "__main__":server = CancellableA2AServer(9000)server.start_server()
4. 高级取消模式
4.1 带清理的优雅关闭
python
import asyncio import signal from strands import Agent, tool import atexitclass GracefulAgent:"""支持优雅关闭和清理的智能体"""def __init__(self):self.running_tasks = set()self.is_shutting_down = False# 注册信号处理器signal.signal(signal.SIGINT, self.signal_handler)signal.signal(signal.SIGTERM, self.signal_handler)atexit.register(self.cleanup)self.agent = Agent(name="优雅智能体",tools=[self.managed_task],prompt="你执行支持优雅关闭的任务。")@tooldef managed_task(self, task_name: str, duration: int = 5) -> str:"""支持优雅关闭的任务"""import timefor i in range(duration):if self.is_shutting_down:return f"任务 '{task_name}' 在第 {i} 步优雅停止"time.sleep(1)print(f"任务 '{task_name}' - 步骤 {i+1}/{duration}")return f"任务 '{task_name}' 成功完成"async def run_with_tracking(self, message: str):"""运行带任务跟踪的智能体"""task = asyncio.create_task(self.agent.run_async(message))self.running_tasks.add(task)try:result = await taskreturn resultfinally:self.running_tasks.discard(task)def signal_handler(self, signum, frame):"""处理关闭信号"""print(f"\n🛑 收到信号 {signum},启动优雅关闭...")self.is_shutting_down = True# 取消所有运行中的任务for task in self.running_tasks:if not task.done():task.cancel()def cleanup(self):"""退出时清理"""print("🧹 执行清理...")self.is_shutting_down = True# 使用示例 async def graceful_shutdown_example():"""优雅关闭示例"""graceful_agent = GracefulAgent()try:# 启动多个任务tasks = [graceful_agent.run_with_tracking(f"启动受管任务 'Task-{i}',时长10秒")for i in range(3)]# 等待所有任务results = await asyncio.gather(*tasks, return_exceptions=True)for i, result in enumerate(results):print(f"任务 {i} 结果: {result}")except KeyboardInterrupt:print("被用户中断")# 使用 Ctrl+C 运行以测试优雅关闭 # asyncio.run(graceful_shutdown_example())
4.2 基于超时的取消
python
import asyncio from strands import Agent, tool from typing import Optionalclass TimeoutAgent:"""可配置超时的智能体"""def __init__(self, default_timeout: float = 30):self.default_timeout = default_timeoutself.agent = Agent(name="超时智能体",tools=[self.timed_operation],prompt="你执行支持超时的操作。")@tooldef timed_operation(self, operation: str, duration: int = 5) -> str:"""可能超时的操作"""import timestart_time = time.time()for i in range(duration):time.sleep(1)elapsed = time.time() - start_timeprint(f"操作 '{operation}' - 已用时 {elapsed:.1f}秒")return f"操作 '{operation}' 在 {duration} 秒内完成"async def run_with_timeout(self, message: str, timeout: Optional[float] = None) -> dict:"""带超时运行智能体"""timeout = timeout or self.default_timeouttry:result = await asyncio.wait_for(self.agent.run_async(message),timeout=timeout)return {"status": "success","result": str(result),"timeout": timeout}except asyncio.TimeoutError:return {"status": "timeout","result": f"操作在 {timeout} 秒后超时","timeout": timeout}except Exception as e:return {"status": "error","result": str(e),"timeout": timeout}# 使用示例 async def timeout_example():"""基于超时的取消示例"""timeout_agent = TimeoutAgent(default_timeout=5)# 这个会超时result1 = await timeout_agent.run_with_timeout("启动定时操作 'LongTask',时长10秒",timeout=3)print(f"结果 1: {result1}")# 这个会完成result2 = await timeout_agent.run_with_timeout("启动定时操作 'ShortTask',时长2秒",timeout=5)print(f"结果 2: {result2}")asyncio.run(timeout_example())
关键要点:
使用
asyncio.create_task()
创建可取消任务调用
task.cancel()
请求取消在代码中处理
asyncio.CancelledError
使用
asyncio.wait_for()
实现基于超时的取消在长时间运行的操作中实现协作式取消
跟踪运行中的任务以进行适当清理
处理信号以实现优雅关闭
向用户提供取消状态的反馈
这些模式让你完全控制智能体执行,并允许用户在需要时停止操作!🛑