多智能体篇:智能体的“语言”——ACL协议与消息队列实现
多智能体篇:智能体的“语言”——ACL协议与消息队列实现
引言:多智能体通信的核心
在单智能体开发阶段,我们的智能体主要与用户进行交互,处理任务逻辑并利用记忆和工具调用实现个性化响应。然而,一旦系统扩展为 多智能体系统(MAS,Multi-Agent System),通信成为系统能否高效运行的关键因素。
-
问题:如果每个智能体独立运行,没有统一通信机制,它们无法协作、共享信息或协调任务。
-
解决方案:
- ACL(Agent Communication Language):定义智能体之间通信的规范,明确消息结构和意图
- 消息队列(Message Queue):作为智能体通信的骨架,提供异步、可靠、解耦的消息传输
本文将带你从 ACL理论入手,逐步到 Python实现ACL封装类,再到 消息队列构建智能体总线,最终完成一个 能发送和接收带意图消息的智能体原型。
第一章:ACL协议原理
1.1 什么是ACL?
ACL(Agent Communication Language,智能体通信语言)是多智能体系统中定义消息格式、语义与交互规则的协议。它确保智能体之间能够明确表达意图、理解消息内容并做出响应。
ACL核心概念:
-
Performative(行为)
- 描述消息类型或意图,例如:
inform
(告知)、request
(请求)、query
(查询)、confirm
(确认)、refuse
(拒绝)
- 描述消息类型或意图,例如:
-
Sender & Receiver(发送者与接收者)
- 消息的发送者与接收者标识
-
Content(内容)
- 消息的具体载荷,可以是自然语言文本或结构化数据
-
Ontology(本体)
- 定义消息内容的语义结构,使智能体能够一致理解内容
-
Conversation ID(会话ID)
- 用于标识一轮对话或任务交互,支持多轮消息跟踪
示例ACL消息(JSON格式):
{"performative": "inform","sender": "agent_1","receiver": "agent_2","content": {"task": "document_analysis","status": "completed","result": "核心摘要信息"},"ontology": "DocumentProcessingOntology","conversation_id": "conv_12345"
}
1.2 ACL的作用
- 明确意图:避免智能体误解消息内容
- 支持多轮对话:通过Conversation ID管理任务生命周期
- 跨智能体一致性:Ontology保证语义统一
- 任务追踪与监控:系统可以根据ACL消息日志追踪任务执行状态
1.3 ACL与多智能体交互模式
常见多智能体交互模式:
-
点对点通信(Peer-to-Peer)
- 两个智能体直接交换ACL消息
-
发布-订阅模式(Publish-Subscribe)
- 通过消息总线广播消息,多个智能体可订阅特定主题
-
请求-响应模式(Request-Reply)
- 请求智能体发送任务,响应智能体返回结果
这些模式在消息队列中都有相应的实现方式。
第二章:消息队列概览
2.1 消息队列的作用
消息队列(MQ,Message Queue)是多智能体系统的通信骨架。它可以:
- 异步通信:发送者无需等待接收者响应
- 解耦系统:智能体之间不直接依赖
- 支持并发:多智能体可以同时发送、接收消息
- 可靠传输:消息可以持久化,确保在系统故障时不丢失
2.2 常用消息队列技术
技术 | 特点 | 使用场景 |
---|---|---|
Redis | 内存型,支持发布/订阅(Pub/Sub) | 高速短消息、状态同步 |
RabbitMQ | AMQP协议,持久化消息、复杂路由 | 任务分发、可靠通信 |
Kafka | 高吞吐量、日志存储 | 海量事件流处理、多智能体日志 |
2.3 消息队列与ACL结合
在多智能体系统中:
- ACL定义消息内容和结构
- 消息队列负责传输和分发
二者结合,形成 可扩展的多智能体通信总线。
第三章:ACL消息封装类实现
为了简化ACL消息创建和解析,我们可以封装一个 Python 类。
import json
import uuid
from datetime import datetimeclass ACLMessage:def __init__(self, performative, sender, receiver, content, ontology="DefaultOntology", conversation_id=None):self.performative = performativeself.sender = senderself.receiver = receiverself.content = contentself.ontology = ontologyself.conversation_id = conversation_id or str(uuid.uuid4())self.timestamp = datetime.now().isoformat()def to_json(self):return json.dumps({"performative": self.performative,"sender": self.sender,"receiver": self.receiver,"content": self.content,"ontology": self.ontology,"conversation_id": self.conversation_id,"timestamp": self.timestamp})@staticmethoddef from_json(json_str):data = json.loads(json_str)return ACLMessage(data["performative"],data["sender"],data["receiver"],data["content"],data.get("ontology", "DefaultOntology"),data.get("conversation_id"))# 测试
msg = ACLMessage("inform", "agent_1", "agent_2", {"task": "analyze_document"})
print(msg.to_json())
-
优势:
- 标准化消息结构
- 自动生成会话ID和时间戳
- 支持序列化与反序列化
第四章:消息队列集成
4.1 Redis Pub/Sub 实现智能体通信
安装依赖
pip install redis
发布消息(Producer)
import redisr = redis.Redis(host='localhost', port=6379, db=0)
channel = "agent_channel"msg = ACLMessage("inform", "agent_1", "agent_2", {"task": "document_analysis"})
r.publish(channel, msg.to_json())
订阅消息(Consumer)
import redisr = redis.Redis(host='localhost', port=6379, db=0)
pubsub = r.pubsub()
pubsub.subscribe("agent_channel")for message in pubsub.listen():if message['type'] == 'message':acl_msg = ACLMessage.from_json(message['data'])print(f"接收到消息: {acl_msg.content} 来自: {acl_msg.sender}")
-
特点:
- 简单快速,适合小型多智能体系统
- 支持异步通信
- 不适合复杂路由和消息持久化
4.2 RabbitMQ 实现可靠消息通信
安装依赖
pip install pika
发送消息
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='agent_queue')msg = ACLMessage("request", "agent_1", "agent_2", {"task": "generate_summary"})
channel.basic_publish(exchange='', routing_key='agent_queue', body=msg.to_json())
connection.close()
接收消息
import pikadef callback(ch, method, properties, body):acl_msg = ACLMessage.from_json(body)print(f"接收到消息: {acl_msg.content} 来自: {acl_msg.sender}")connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='agent_queue')
channel.basic_consume(queue='agent_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
-
优势:
- 消息持久化
- 支持复杂路由
- 更适合企业级多智能体系统
第五章:多智能体通信原型设计
5.1 系统架构
- 智能体A:任务发起者
- 智能体B:任务执行者
- 消息总线:Redis或RabbitMQ
- ACL消息:标准化任务和结果
5.2 流程示例
- A生成ACL消息(performative=“request”)
- 发送到消息总线
- B订阅消息,解析内容,执行任务
- B生成ACL消息(performative=“inform”)反馈结果
- A接收结果并继续处理
5.3 Python示例整合
# agent_a.py
msg = ACLMessage("request", "agent_a", "agent_b", {"task": "analyze_document"})
r.publish("agent_channel", msg.to_json())# agent_b.py
for message in pubsub.listen():if message['type'] == 'message':acl_msg = ACLMessage.from_json(message['data'])if acl_msg.performative == "request":result = {"summary": "文档核心摘要"}reply = ACLMessage("inform", "agent_b", acl_msg.sender, result, conversation_id=acl_msg.conversation_id)r.publish("agent_channel", reply.to_json())
-
特点:
- 支持异步通信
- 支持任务追踪(Conversation ID)
- 易于扩展多个智能体