当前位置: 首页 > news >正文

知识点11:总线驱动的多Agent调度

知识点11:总线驱动的多Agent调度

核心概念

学习如何用总线调度Agent

详细内容

Kafka/NATS作为消息总线

Kafka简介

Kafka是一个分布式流处理平台,它具有高吞吐量、高可靠性、分布式等特点,非常适合作为多Agent系统的消息总线。

  • 核心概念
    • 主题(Topic):消息的分类标签
    • 分区(Partition):主题的物理分组,可以并行处理消息
    • 生产者(Producer):发送消息的客户端
    • 消费者(Consumer):接收消息的客户端
    • 消费者组(Consumer Group):多个消费者组成的组,共同消费一个主题
  • 特点
    • 高吞吐量:可以处理每秒数百万条消息
    • 持久化存储:消息可以持久化到磁盘,保证数据可靠性
    • 分布式架构:支持水平扩展,具有高可用性
    • 消息顺序:在同一个分区内保证消息的顺序性
  • 适用场景:大规模数据处理、实时数据流、日志收集、事件驱动架构
from kafka import KafkaProducer, KafkaConsumer# Kafka生产者示例
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: json.dumps(x).encode('utf-8')
)# 发送消息
producer.send('agent_tasks', {'task_id': 'task1', 'type': 'analysis', 'data': 'sample data'})
producer.flush()# Kafka消费者示例
consumer = KafkaConsumer('agent_tasks',bootstrap_servers=['localhost:9092'],group_id='agent_group',value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)# 消费消息
for message in consumer:task = message.valueprint(f"Received task: {task}")# 处理任务
NATS简介

NATS是一个轻量级的消息中间件,它具有简单、高性能、低延迟等特点,也是多Agent系统中常用的消息总线。

  • 核心概念
    • 发布者(Publisher):发送消息的客户端
    • 订阅者(Subscriber):接收消息的客户端
    • 主题(Subject):消息的分类标签,支持通配符
    • 服务器(Server):NATS服务器,负责消息的路由和转发
  • 特点
    • 简单:API简洁,易于使用
    • 高性能:延迟低,吞吐量高
    • 轻量级:服务器和客户端都非常轻量
    • 灵活性:支持多种消息模式,如发布订阅、请求回复、队列等
    • 可靠性:支持消息确认、持久化等可靠性机制
  • 适用场景:实时通信、微服务架构、IoT设备通信、边缘计算
import asyncio
from nats.aio.client import Client as NATS# NATS发布者示例
async def publish_message():nc = NATS()await nc.connect(servers=['nats://localhost:4222'])# 发布消息await nc.publish("agent.tasks", json.dumps({'task_id': 'task1', 'type': 'analysis', 'data': 'sample data'}).encode())# 关闭连接await nc.close()# NATS订阅者示例
async def subscribe_messages():nc = NATS()await nc.connect(servers=['nats://localhost:4222'])async def message_handler(msg):task = json.loads(msg.data.decode())print(f"Received task: {task}")# 处理任务# 订阅消息await nc.subscribe("agent.tasks", cb=message_handler)# 保持连接while True:await asyncio.sleep(1)# 运行示例
if __name__ == '__main__':# 发布消息asyncio.run(publish_message())# 订阅消息(在实际应用中,这通常在单独的进程或线程中运行)# asyncio.run(subscribe_messages())

幂等性与重试机制

在分布式系统中,由于网络故障、节点崩溃等原因,消息可能会被重复发送或接收,因此需要实现幂等性和重试机制来保证系统的可靠性。

幂等性

幂等性是指一个操作无论执行多少次,其结果都是相同的。在多Agent系统中,保证任务处理的幂等性非常重要。

  • 实现方式
    1. 唯一标识符:为每个任务分配唯一的ID,Agent根据ID判断任务是否已经处理过
    2. 状态检查:在执行任务前检查目标状态是否已经达到
    3. 幂等操作:设计任务操作本身就是幂等的,无论执行多少次都不会产生副作用
class IdempotentTaskProcessor:def __init__(self):self.processed_tasks = set()self.lock = threading.Lock()def process_task(self, task):# 检查任务是否已经处理过task_id = task.get('task_id')if not task_id:raise ValueError("Task must have an ID")with self.lock:if task_id in self.processed_tasks:print(f"Task {task_id} already processed, skipping")return {'success': True, 'message': 'Task already processed'}# 标记任务为已处理(在实际处理之前标记,防止重复处理)self.processed_tasks.add(task_id)try:# 实际处理任务print(f"Processing task {task_id}")# 模拟任务处理time.sleep(1)return {'success': True, 'result': 'Task processed successfully'}except Exception as e:# 如果处理失败,从已处理集合中移除with self.lock:if task_id in self.processed_tasks:self.processed_tasks.remove(task_id)return {'success': False, 'error': str(e)}
重试机制

重试机制是指在任务处理失败时,自动重新尝试处理任务,直到任务成功或达到最大重试次数。

  • 实现方式
    1. 固定间隔重试:每次重试之间等待固定的时间
    2. 指数退避重试:每次重试的等待时间呈指数增长
    3. 随机退避重试:在指数退避的基础上增加随机因子,避免多个Agent同时重试
    4. 最大重试次数:设置最大重试次数,防止无限重试
def retry(func, max_retries=3, base_delay=1, max_delay=10):"""重试装饰器,用于添加重试机制到函数参数:func: 要重试的函数max_retries: 最大重试次数base_delay: 基础延迟时间(秒)max_delay: 最大延迟时间(秒)返回:包装后的函数"""def wrapper(*args, **kwargs):retries = 0last_exception = Nonewhile retries <= max_retries:try:# 尝试执行函数return func(*args, **kwargs)except Exception as e:last_exception = eretries += 1if retries > max_retries:# 达到最大重试次数,抛出异常raise# 计算重试延迟(指数退避 + 随机因子)delay = min(base_delay * (2 ** (retries - 1)), max_delay)jitter = random.uniform(0.8, 1.2)  # 添加随机因子,避免雪崩效应actual_delay = delay * jitterprint(f"Operation failed (attempt {retries}/{max_retries}), retrying in {actual_delay:.2f}s: {str(e)}")time.sleep(actual_delay)# 理论上不会到达这里,但为了安全起见raise last_exceptionreturn wrapper# 使用示例
@retry(max_retries=5, base_delay=0.5)
def unstable_operation():"""模拟不稳定的操作,有一定概率失败"""if random.random() < 0.7:raise Exception("Random failure")return "Success"

延迟与优先级队列

在多Agent系统中,不同的任务可能有不同的时间要求和优先级,因此需要实现延迟队列和优先级队列来合理安排任务的执行顺序。

延迟队列

延迟队列允许设置任务的延迟执行时间,任务会在指定的时间后才被处理。

  • 实现方式
    1. 定时检查:定期检查任务的执行时间是否已到
    2. 消息调度:使用支持延迟消息的消息中间件,如RabbitMQ的Delayed Message Exchange、Redis的Sorted Set等
import heapq
import threading
import timeclass DelayedQueue:def __init__(self):self.queue = []  # 优先队列,用于存储延迟任务self.lock = threading.Lock()self.event = threading.Event()self.worker_thread = threading.Thread(target=self._worker, daemon=True)self.worker_thread.start()def _worker(self):"""工作线程,负责处理到达执行时间的任务"""while True:with self.lock:if not self.queue:# 队列为空,等待新任务self.event.clear()else:# 获取最早要执行的任务execute_time, _, task_id, task = self.queue[0]current_time = time.time()if execute_time <= current_time:# 任务已到达执行时间,从队列中移除并执行heapq.heappop(self.queue)# 在锁外执行任务,避免阻塞其他操作self.event.clear()try:self._execute_task(task_id, task)except Exception as e:print(f"Failed to execute task {task_id}: {e}")continueelse:# 任务尚未到达执行时间,计算需要等待的时间wait_time = execute_time - current_timeself.event.clear()# 等待指定时间或直到有新任务加入self.event.wait(wait_time)continue# 等待新任务self.event.wait()def _execute_task(self, task_id, task):"""执行任务的回调函数,需要被子类重写"""raise NotImplementedError("_execute_task must be implemented by subclass")def enqueue(self, task_id, task, delay=0):"""添加任务到延迟队列"""execute_time = time.time() + delaywith self.lock:# 使用heapq插入任务,按照执行时间排序heapq.heappush(self.queue, (execute_time, time.time(), task_id, task))# 通知工作线程有新任务self.event.set()def size(self):"""获取队列中的任务数量"""with self.lock:return len(self.queue)def shutdown(self):"""关闭延迟队列"""self.event.set()self.worker_thread.join(timeout=1.0)# 使用示例
class TaskExecutor(DelayedQueue):def _execute_task(self, task_id, task):print(f"Executing task {task_id}: {task}")# 创建延迟队列
if __name__ == '__main__':executor = TaskExecutor()# 添加延迟任务executor.enqueue("task1", {"action": "send_email"}, delay=2)  # 2秒后执行executor.enqueue("task2", {"action": "generate_report"}, delay=5)  # 5秒后执行executor.enqueue("task3", {"action": "backup_data"}, delay=1)  # 1秒后执行print(f"Queue size: {executor.size()}")# 保持程序运行,直到所有任务执行完成time.sleep(10)executor.shutdown()
优先级队列

优先级队列允许为任务设置优先级,高优先级的任务会先被处理。

  • 实现方式
    1. 优先队列:使用优先队列数据结构,如Python的heapq模块
    2. 优先级主题:在消息中间件中为不同优先级的任务创建不同的主题
import heapq
import threadingclass PriorityQueue:def __init__(self):self.queue = []  # 优先队列self.lock = threading.Lock()self.not_empty = threading.Condition(self.lock)def put(self, priority, task_id, task):"""添加任务到优先级队列,priority越小,优先级越高"""with self.lock:# 使用heapq插入任务,按照优先级排序# 注意:heapq是最小堆,所以优先级越小的任务会先被处理heapq.heappush(self.queue, (priority, time.time(), task_id, task))# 通知等待的线程有新任务self.not_empty.notify()def get(self, block=True, timeout=None):"""从队列中获取优先级最高的任务"""with self.lock:if block:if timeout is not None:end_time = time.time() + timeout# 等待直到队列非空或超时while not self.queue:remaining = end_time - time.time()if remaining <= 0:raise TimeoutError("Priority queue get timed out")self.not_empty.wait(remaining)else:# 无限等待直到队列非空while not self.queue:self.not_empty.wait()if not self.queue:raise IndexError("Priority queue is empty")# 获取优先级最高的任务_, _, task_id, task = heapq.heappop(self.queue)return task_id, taskdef size(self):"""获取队列中的任务数量"""with self.lock:return len(self.queue)def empty(self):"""检查队列是否为空"""with self.lock:return len(self.queue) == 0# 使用示例
if __name__ == '__main__':pq = PriorityQueue()# 添加不同优先级的任务pq.put(3, "task1", {"action": "send_reminder"})  # 低优先级pq.put(1, "task2", {"action": "handle_emergency"})  # 高优先级pq.put(2, "task3", {"action": "process_order"})  # 中优先级print(f"Queue size: {pq.size()}")# 按优先级顺序获取任务while not pq.empty():task_id, task = pq.get()print(f"Processing task {task_id} (priority: ?): {task}")

案例分析

多Agent任务调度:分析→执行→报告

案例描述:使用消息总线实现一个多Agent任务调度系统,该系统包含三种类型的Agent:分析Agent、执行Agent和报告Agent,它们通过消息总线协作完成复杂任务。

系统架构

提交任务
获取任务
分析结果
任务分配
执行结果
汇总报告
生成报告
反馈结果
任务发起者
任务队列
分析Agent
消息总线
执行Agent
报告Agent
结果存储

实现代码

import json
import time
import threading
import random
from kafka import KafkaProducer, KafkaConsumer# 配置信息
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
TASK_TOPIC = 'task_queue'
ANALYSIS_RESULT_TOPIC = 'analysis_results'
EXECUTION_RESULT_TOPIC = 'execution_results'
REPORT_TOPIC = 'reports'class TaskScheduler:def __init__(self):# 初始化Kafka生产者self.producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,value_serializer=lambda x: json.dumps(x).encode('utf-8'))def submit_task(self, task_id, task_data):# 提交任务到任务队列task = {'task_id': task_id,'task_data': task_data,'timestamp': time.time(),'priority': random.randint(1, 5)  # 随机生成优先级}self.producer.send(TASK_TOPIC, task)self.producer.flush()print(f"任务 {task_id} 已提交到队列")def shutdown(self):# 关闭生产者self.producer.close()class AnalysisAgent:def __init__(self, agent_id):self.agent_id = agent_id# 初始化Kafka消费者和生产者self.consumer = KafkaConsumer(TASK_TOPIC,bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,group_id=f'analysis_agents',value_deserializer=lambda x: json.loads(x.decode('utf-8')))self.producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,value_serializer=lambda x: json.dumps(x).encode('utf-8'))self.running = Truedef start(self):# 启动Agentprint(f"分析Agent {self.agent_id} 已启动")try:for message in self.consumer:if not self.running:breaktask = message.valuetask_id = task['task_id']print(f"分析Agent {self.agent_id} 收到任务: {task_id}")# 分析任务try:analysis_result = self.analyze_task(task)# 发送分析结果到消息总线result_message = {'task_id': task_id,'analysis_agent_id': self.agent_id,'analysis_result': analysis_result,'timestamp': time.time()}self.producer.send(ANALYSIS_RESULT_TOPIC, result_message)self.producer.flush()print(f"分析Agent {self.agent_id} 已完成任务分析: {task_id}")except Exception as e:print(f"分析Agent {self.agent_id} 处理任务 {task_id} 失败: {str(e)}")finally:self.shutdown()def analyze_task(self, task):# 模拟任务分析过程task_data = task['task_data']print(f"分析Agent {self.agent_id} 正在分析任务数据: {task_data}")# 模拟分析耗时time.sleep(random.uniform(1, 3))# 根据任务类型进行不同的分析task_type = task_data.get('type', 'default')if task_type == 'data_analysis':analysis = {'required_resources': ['database', 'cpu'],'estimated_time': random.randint(5, 15),'dependencies': [],'analysis_notes': '需要从多个数据源获取数据并进行整合分析'}elif task_type == 'report_generation':analysis = {'required_resources': ['template_engine', 'printer'],'estimated_time': random.randint(3, 10),'dependencies': ['data_analysis_completed'],'analysis_notes': '需要等待数据分析师提供的分析结果'}else:analysis = {'required_resources': ['general'],'estimated_time': random.randint(2, 8),'dependencies': [],'analysis_notes': '标准处理流程'}return analysisdef shutdown(self):# 关闭Agentself.running = Falseself.consumer.close()self.producer.close()print(f"分析Agent {self.agent_id} 已关闭")class ExecutionAgent:def __init__(self, agent_id):self.agent_id = agent_id# 初始化Kafka消费者和生产者self.consumer = KafkaConsumer(ANALYSIS_RESULT_TOPIC,bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,group_id=f'execution_agents',value_deserializer=lambda x: json.loads(x.decode('utf-8')))self.producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,value_serializer=lambda x: json.dumps(x).encode('utf-8'))self.running = Truedef start(self):# 启动Agentprint(f"执行Agent {self.agent_id} 已启动")try:for message in self.consumer:if not self.running:breakanalysis_result = message.valuetask_id = analysis_result['task_id']print(f"执行Agent {self.agent_id} 收到任务: {task_id}")# 执行任务try:execution_result = self.execute_task(analysis_result)# 发送执行结果到消息总线result_message = {'task_id': task_id,'execution_agent_id': self.agent_id,'execution_result': execution_result,'timestamp': time.time()}self.producer.send(EXECUTION_RESULT_TOPIC, result_message)self.producer.flush()print(f"执行Agent {self.agent_id} 已完成任务执行: {task_id}")except Exception as e:print(f"执行Agent {self.agent_id} 处理任务 {task_id} 失败: {str(e)}")finally:self.shutdown()def execute_task(self, analysis_result):# 模拟任务执行过程task_id = analysis_result['task_id']analysis = analysis_result['analysis_result']print(f"执行Agent {self.agent_id} 正在执行任务: {task_id}")# 模拟执行耗时(根据分析结果估计的时间)execution_time = analysis['estimated_time']time.sleep(execution_time * 0.1)  # 简化模拟# 模拟执行结果success = random.random() > 0.1  # 90%的成功率if success:result = {'status': 'completed','data': f'任务 {task_id} 的执行结果数据','execution_time': execution_time,'resources_used': analysis['required_resources'],'notes': '任务执行成功'}else:result = {'status': 'failed','error': '模拟执行失败','attempts': 1,'notes': '任务执行失败,需要重试'}return resultdef shutdown(self):# 关闭Agentself.running = Falseself.consumer.close()self.producer.close()print(f"执行Agent {self.agent_id} 已关闭")class ReportAgent:def __init__(self, agent_id):self.agent_id = agent_id# 初始化Kafka消费者和生产者self.consumer = KafkaConsumer(EXECUTION_RESULT_TOPIC,bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,group_id=f'report_agents',value_deserializer=lambda x: json.loads(x.decode('utf-8')))self.producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,value_serializer=lambda x: json.dumps(x).encode('utf-8'))self.running = Trueself.task_results = {}def start(self):# 启动Agentprint(f"报告Agent {self.agent_id} 已启动")try:for message in self.consumer:if not self.running:breakexecution_result = message.valuetask_id = execution_result['task_id']print(f"报告Agent {self.agent_id} 收到任务结果: {task_id}")# 生成报告try:report = self.generate_report(execution_result)# 发送报告到消息总线report_message = {'task_id': task_id,'report_agent_id': self.agent_id,'report': report,'timestamp': time.time()}self.producer.send(REPORT_TOPIC, report_message)self.producer.flush()print(f"报告Agent {self.agent_id} 已生成任务报告: {task_id}")except Exception as e:print(f"报告Agent {self.agent_id} 处理任务结果 {task_id} 失败: {str(e)}")finally:self.shutdown()def generate_report(self, execution_result):# 生成任务报告task_id = execution_result['task_id']result = execution_result['execution_result']# 模拟报告生成耗时time.sleep(random.uniform(0.5, 1.5))# 根据执行结果生成不同的报告if result['status'] == 'completed':report = {'task_id': task_id,'status': 'success','summary': f'任务 {task_id} 已成功完成','details': {'execution_time': result['execution_time'],'resources_used': result['resources_used'],'data_preview': result['data'][:50] + ('...' if len(result['data']) > 50 else '')},'recommendations': ['任务完成情况良好,建议定期执行类似任务']}else:report = {'task_id': task_id,'status': 'failure','summary': f'任务 {task_id} 执行失败','details': {'error': result['error'],'attempts': result['attempts']},'recommendations': ['建议检查系统资源,然后重试任务', '考虑增加任务执行的超时时间']}return reportdef shutdown(self):# 关闭Agentself.running = Falseself.consumer.close()self.producer.close()print(f"报告Agent {self.agent_id} 已关闭")class ResultCollector:def __init__(self):# 初始化Kafka消费者self.consumer = KafkaConsumer(REPORT_TOPIC,bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,group_id='result_collectors',value_deserializer=lambda x: json.loads(x.decode('utf-8')))self.results = {}self.running = Truedef start(self):# 启动结果收集器print("结果收集器已启动")try:for message in self.consumer:if not self.running:breakreport_message = message.valuetask_id = report_message['task_id']report = report_message['report']# 存储结果self.results[task_id] = reportprint(f"结果收集器已收到任务 {task_id} 的最终报告")print(f"任务 {task_id} 状态: {report['status']}")print(f"任务 {task_id} 摘要: {report['summary']}")finally:self.shutdown()def get_result(self, task_id):# 获取指定任务的结果return self.results.get(task_id)def shutdown(self):# 关闭结果收集器self.running = Falseself.consumer.close()print("结果收集器已关闭")# 测试代码
def run_test():try:# 创建任务调度器scheduler = TaskScheduler()# 创建并启动各个Agentanalysis_agent = AnalysisAgent("analysis_1")execution_agent = ExecutionAgent("execution_1")report_agent = ReportAgent("report_1")result_collector = ResultCollector()# 启动Agent线程threads = [threading.Thread(target=analysis_agent.start),threading.Thread(target=execution_agent.start),threading.Thread(target=report_agent.start),threading.Thread(target=result_collector.start)]for thread in threads:thread.daemon = Truethread.start()# 等待Agent启动time.sleep(2)# 提交几个测试任务tasks = [{"task_id": "task_001", "task_data": {"type": "data_analysis", "subject": "季度销售数据"}},{"task_id": "task_002", "task_data": {"type": "report_generation", "template": "财务报表"}},{"task_id": "task_003", "task_data": {"type": "default", "action": "system_check"}}]for task in tasks:scheduler.submit_task(task["task_id"], task["task_data"])time.sleep(1)# 等待任务处理完成print("\n等待任务处理完成...\n")time.sleep(30)print("\n测试完成!\n")except KeyboardInterrupt:print("\n测试被用户中断\n")finally:# 关闭所有组件try:scheduler.shutdown()analysis_agent.shutdown()execution_agent.shutdown()report_agent.shutdown()result_collector.shutdown()except:passif __name__ == '__main__':run_test()

实践练习

用Kafka实现一个Agent任务流

练习要求

  1. 搭建一个简单的Kafka环境(可以使用Docker快速部署)
  2. 实现至少3个不同类型的Agent,它们通过Kafka进行通信
  3. 设计一个任务流,例如:数据收集→数据处理→结果存储
  4. 实现任务的发布、订阅、处理和结果反馈
  5. 添加错误处理和重试机制
  6. 可以使用Python或其他你熟悉的语言实现

参考答案提示

以下是一个简单的基于Kafka的Agent任务流实现方案:

# 安装必要的库
# pip install kafka-pythonimport json
import time
import threading
import random
from kafka import KafkaProducer, KafkaConsumer# 配置信息
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']# 主题定义
RAW_DATA_TOPIC = 'raw_data'
PROCESSED_DATA_TOPIC = 'processed_data'
STORED_DATA_TOPIC = 'stored_data'
ERROR_TOPIC = 'errors'class DataCollectorAgent:def __init__(self, agent_id):self.agent_id = agent_idself.producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,value_serializer=lambda x: json.dumps(x).encode('utf-8'))self.running = Truedef start(self):print(f"数据收集Agent {self.agent_id} 已启动")counter = 0while self.running:# 模拟收集数据data_id = f"data_{counter}"raw_data = self.collect_data()# 发送数据到Kafkamessage = {'data_id': data_id,'data': raw_data,'collector_id': self.agent_id,'timestamp': time.time()}try:self.producer.send(RAW_DATA_TOPIC, message)self.producer.flush()print(f"数据收集Agent {self.agent_id} 已收集并发送数据: {data_id}")except Exception as e:# 处理发送失败error_message = {'source': 'collector','agent_id': self.agent_id,'data_id': data_id,'error': str(e),'timestamp': time.time()}self.producer.send(ERROR_TOPIC, error_message)self.producer.flush()print(f"数据收集Agent {self.agent_id} 发送数据 {data_id} 失败: {str(e)}")counter += 1# 每2-5秒收集一次数据time.sleep(random.uniform(2, 5))def collect_data(self):# 模拟数据收集# 在实际应用中,这里可能是从API、数据库、文件等获取数据data_type = random.choice(['sensor', 'log', 'user_activity'])if data_type == 'sensor':return {'type': 'temperature','value': round(random.uniform(20, 30), 2),'unit': 'Celsius','location': f'room_{random.randint(1, 10)}'}elif data_type == 'log':return {'type': 'system','level': random.choice(['INFO', 'WARNING', 'ERROR']),'message': f'System event occurred at {time.strftime("%Y-%m-%d %H:%M:%S")}','component': f'component_{random.randint(1, 5)}'}else:return {'type': 'click','user_id': f'user_{random.randint(100, 999)}','page': f'page_{random.randint(1, 20)}','timestamp': time.time()}def shutdown(self):self.running = Falseself.producer.close()print(f"数据收集Agent {self.agent_id} 已关闭")class DataProcessorAgent:def __init__(self, agent_id):self.agent_id = agent_idself.consumer = KafkaConsumer(RAW_DATA_TOPIC,bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,group_id=f'data_processors',value_deserializer=lambda x: json.loads(x.decode('utf-8')))self.producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,value_serializer=lambda x: json.dumps(x).encode('utf-8'))self.running = Truedef start(self):print(f"数据处理Agent {self.agent_id} 已启动")try:for message in self.consumer:if not self.running:breakraw_data_message = message.valuedata_id = raw_data_message['data_id']print(f"数据处理Agent {self.agent_id} 收到数据: {data_id}")# 处理数据try:processed_data = self.process_data(raw_data_message['data'])# 发送处理后的数据到Kafkaprocessed_message = {'data_id': data_id,'processed_data': processed_data,'processor_id': self.agent_id,'timestamp': time.time()}self.producer.send(PROCESSED_DATA_TOPIC, processed_message)self.producer.flush()print(f"数据处理Agent {self.agent_id} 已处理数据: {data_id}")except Exception as e:# 处理失败error_message = {'source': 'processor','agent_id': self.agent_id,'data_id': data_id,'error': str(e),'timestamp': time.time()}self.producer.send(ERROR_TOPIC, error_message)self.producer.flush()print(f"数据处理Agent {self.agent_id} 处理数据 {data_id} 失败: {str(e)}")finally:self.shutdown()def process_data(self, raw_data):# 模拟数据处理data_type = raw_data.get('type')if data_type == 'temperature':# 温度数据处理:转换单位,添加状态value_celsius = raw_data['value']value_fahrenheit = (value_celsius * 9/5) + 32if value_celsius > 26:status = 'high'elif value_celsius < 22:status = 'low'else:status = 'normal'return {'original_value': value_celsius,'value_fahrenheit': round(value_fahrenheit, 2),'status': status,'location': raw_data['location'],'processed_at': time.strftime("%Y-%m-%d %H:%M:%S")}elif data_type in ['INFO', 'WARNING', 'ERROR']:# 日志数据处理:提取关键信息,添加严重程度severity_map = {'INFO': 1, 'WARNING': 2, 'ERROR': 3}return {'level': data_type,'severity': severity_map[data_type],'message_summary': raw_data['message'][:50] + ('...' if len(raw_data['message']) > 50 else ''),'component': raw_data['component'],'processed_at': time.strftime("%Y-%m-%d %H:%M:%S")}else:# 其他数据处理return {'original_type': data_type,'processed_data': raw_data,'status': 'processed','processed_at': time.strftime("%Y-%m-%d %H:%M:%S")}def shutdown(self):self.running = Falseself.consumer.close()self.producer.close()print(f"数据处理Agent {self.agent_id} 已关闭")class DataStorageAgent:def __init__(self, agent_id):self.agent_id = agent_idself.consumer = KafkaConsumer(PROCESSED_DATA_TOPIC,bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,group_id=f'data_storages',value_deserializer=lambda x: json.loads(x.decode('utf-8')))self.producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,value_serializer=lambda x: json.dumps(x).encode('utf-8'))self.running = True# 模拟存储self.storage = {}def start(self):print(f"数据存储Agent {self.agent_id} 已启动")try:for message in self.consumer:if not self.running:breakprocessed_data_message = message.valuedata_id = processed_data_message['data_id']print(f"数据存储Agent {self.agent_id} 收到处理后的数据: {data_id}")# 存储数据try:storage_result = self.store_data(data_id, processed_data_message['processed_data'])# 发送存储结果到Kafkastored_message = {'data_id': data_id,'storage_result': storage_result,'storage_id': self.agent_id,'timestamp': time.time()}self.producer.send(STORED_DATA_TOPIC, stored_message)self.producer.flush()print(f"数据存储Agent {self.agent_id} 已存储数据: {data_id}")except Exception as e:# 存储失败error_message = {'source': 'storage','agent_id': self.agent_id,'data_id': data_id,'error': str(e),'timestamp': time.time()}self.producer.send(ERROR_TOPIC, error_message)self.producer.flush()print(f"数据存储Agent {self.agent_id} 存储数据 {data_id} 失败: {str(e)}")finally:self.shutdown()def store_data(self, data_id, processed_data):# 模拟数据存储# 在实际应用中,这里可能是存储到数据库、文件系统、云存储等time.sleep(random.uniform(0.5, 1.5))  # 模拟存储耗时# 存储到内存(模拟)self.storage[data_id] = processed_data# 生成存储结果return {'status': 'success','storage_location': f'database/collection/{data_id}','size_bytes': len(str(processed_data)),'storage_time': time.strftime("%Y-%m-%d %H:%M:%S")}def shutdown(self):self.running = Falseself.consumer.close()self.producer.close()print(f"数据存储Agent {self.agent_id} 已关闭")# 打印存储统计信息print(f"总共存储了 {len(self.storage)} 条数据")class ErrorHandler:def __init__(self):self.consumer = KafkaConsumer(ERROR_TOPIC,bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,group_id='error_handlers',value_deserializer=lambda x: json.loads(x.decode('utf-8')))self.running = Trueself.errors = []def start(self):print("错误处理器已启动")try:for message in self.consumer:if not self.running:breakerror_message = message.valueself.errors.append(error_message)print(f"捕获到错误: {error_message}")# 这里可以添加错误处理逻辑,如重试、通知管理员等finally:self.shutdown()def shutdown(self):self.running = Falseself.consumer.close()print("错误处理器已关闭")print(f"总共处理了 {len(self.errors)} 个错误")# 启动系统
def start_system():try:# 创建各个Agentcollector = DataCollectorAgent("collector_1")processor = DataProcessorAgent("processor_1")storage = DataStorageAgent("storage_1")error_handler = ErrorHandler()# 启动Agent线程threads = [threading.Thread(target=collector.start),threading.Thread(target=processor.start),threading.Thread(target=storage.start),threading.Thread(target=error_handler.start)]for thread in threads:thread.daemon = Truethread.start()print("\n系统已成功启动!\n")print("数据流向: 数据收集 → 数据处理 → 数据存储")print("按 Ctrl+C 停止系统...\n")# 保持主程序运行while True:time.sleep(1)except KeyboardInterrupt:print("\n正在停止系统...\n")finally:# 关闭所有组件try:collector.shutdown()processor.shutdown()storage.shutdown()error_handler.shutdown()except:passprint("\n系统已完全停止。")if __name__ == '__main__':start_system()

在实际练习中,你需要:

  1. 首先安装和启动Kafka(可以使用Docker快速部署)
  2. 安装必要的Python库(kafka-python)
  3. 根据你的需求修改上面的代码,添加更多的功能和错误处理
  4. 运行代码并观察数据在Agent之间的流动
  5. 可以尝试添加更多的Agent类型,如监控Agent、分析Agent等
  6. 考虑添加持久化存储、监控指标收集等功能

文章转载自:

http://JUaJ5VMq.sjmxh.cn
http://ToRR935v.sjmxh.cn
http://vKbrZOYT.sjmxh.cn
http://3srnRB6C.sjmxh.cn
http://9PyTXb3E.sjmxh.cn
http://ivKN34St.sjmxh.cn
http://FJsF5SSb.sjmxh.cn
http://AHk3Lj6E.sjmxh.cn
http://UgBufW2P.sjmxh.cn
http://Qv7LotBg.sjmxh.cn
http://2V8wyY0o.sjmxh.cn
http://nqsh7jWk.sjmxh.cn
http://lfYMAok1.sjmxh.cn
http://HJOilj6T.sjmxh.cn
http://SQ1oRU78.sjmxh.cn
http://H9tFwdiB.sjmxh.cn
http://3rACssPw.sjmxh.cn
http://GWL1QUcz.sjmxh.cn
http://1rv5CONZ.sjmxh.cn
http://ZRfAFHpi.sjmxh.cn
http://on3Qgu31.sjmxh.cn
http://vxxzzltt.sjmxh.cn
http://SE1X4yvo.sjmxh.cn
http://XaG0Pz9C.sjmxh.cn
http://AZBVEaA7.sjmxh.cn
http://EWcdO5dc.sjmxh.cn
http://pDSlUwO7.sjmxh.cn
http://ZhPrjejE.sjmxh.cn
http://iDgqHIVS.sjmxh.cn
http://gAUCM2xL.sjmxh.cn
http://www.dtcms.com/a/385006.html

相关文章:

  • 使用 Docker 搭建私有 PyPI 镜像仓库:支持多平台二进制包同步
  • HarmonyOS实现快递APP自动识别地址(国际版)
  • IPsec实验笔记
  • 工业IOT平台助力水泥集团实现数字化转型
  • 【CSS】图片自适应等比例缩放
  • Java 21 虚拟线程高并发落地全指南:中间件适配、场景匹配与细节优化的技术实践
  • 设计模式(C++)详解—适配器模式(1)
  • 圆周点生成的数学原理与Python实现
  • 牛客:校门外的树
  • JavaScript数据网格方案AG Grid 34.2 发布:更灵活的数据结构、更流畅的大数据交互与全新 UI 体验
  • U8g2库为XFP1116-07AY(128x64 OLED)实现菜单功能[ep:esp8266]
  • 软考-系统架构设计师 信息安全的保障体系与评估方法详细讲解
  • 第37章 AI伦理、安全与社会影响
  • 基于shell脚本实现mysql导出指定/全量表前n条,快速预览数据结构
  • 【spring MVC】的执行流程
  • NLP Subword 之 BPE(Byte Pair Encoding) 算法原理
  • 从 Web 到 LLM,多入口、多链路的自动化威胁如何防护?
  • Roo Code代码库索引功能
  • 以太网链路聚合实验
  • 机理流程图绘制,如此简单 !
  • 从按钮到接口:权限系统设计的艺术与实践 —— 打造细粒度可扩展的权限架构
  • 3D 打印在道具制作领域的应用调研与轻资产介入策略创意报告
  • Python多进程通信完全指南:打破进程隔离的壁垒
  • webrtc之语音活动下——VAD人声判定原理以及源码详解
  • S32K3平台RTC应用笔记
  • 开源收银系统_大型收银系统源码_OctShop
  • UE5 蓝图接口函数类型知多少?
  • 【MySQL分库分表:海量数据架构的终极解决方案】
  • 深入解析 Apache RocketMQ架构组成与核心组件作用
  • Tomcat下载和安装教程(图文并茂,适合新手)