当前位置: 首页 > news >正文

【RabbitMq】七种工作模式

目录

RabbitMQ 核心概念快速回顾

RabbitMQ 的七种工作模式详解

1. 简单模式 (Simple Mode)

2. 工作队列模式 (Work Queue Mode)

3. 发布/订阅模式 (Publish/Subscribe Mode)

4. 路由模式 (Routing Mode)

5. 通配符模式 (Topics Mode)

6. RPC 模式 (RPC Mode)

7. 发布确认模式 (Publisher Confirms Mode)

总结与选择指南


RabbitMQ 核心概念快速回顾

在深入工作模式前,先了解几个贯穿所有模式的基础概念:

  • Producer(生产者)​​:发送消息的应用程序。

  • Consumer(消费者)​​:接收并处理消息的应用程序。

  • Broker(服务器)​​:指 RabbitMQ 服务本身,负责接收、存储和转发消息。

  • Connection(连接)​​:生产者/消费者与 Broker 之间的 TCP 长连接。

  • Channel(信道)​​:在 Connection 上建立的轻量级逻辑连接,用于执行具体的 AMQP 指令。多个 Channel 可复用同一个 TCP 连接,减少开销。

  • Virtual Host(虚拟主机)​​:类似于命名空间,用于在单个 Broker 内进行资源逻辑隔离。

  • Exchange(交换机)​​:消息的路由中心,负责接收生产者发送的消息,并根据类型和规则将消息分发到一个或多个队列。本身不存储消息。

  • Queue(队列)​​:存储消息的缓冲区,消费者从中获取消息。

  • Binding(绑定)​​:连接 Exchange 和 Queue 的规则,定义了消息如何从交换机路由到队列。

  • Routing Key(路由键)​​:生产者发送消息时指定的一个关键字,Exchange 根据它来决定消息的路由路径。

  • Binding Key(绑定键)​​:在 Binding 过程中,队列与交换机关联时指定的关键字,用于和 Routing Key 进行匹配。

RabbitMQ 的七种工作模式详解

1. 简单模式 (Simple Mode)
  • 模式简介​:最简单的点对点通信模型。一个生产者直接向一个队列发送消息,一个消费者从该队列消费消息。此模式通常使用 RabbitMQ 默认的匿名交换机(名称为空字符串"")。

  • 工作原理图​:

  • 代码示例(Java)​​:

    • 生产者​:

      import com.rabbitmq.client.*;public class SimpleProducer {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列(持久化、非独占、不自动删除)channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "Hello, Simple Mode!";// 使用默认交换机,路由键为队列名channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
      }
    • 消费者​:

      import com.rabbitmq.client.*;public class SimpleConsumer {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
      }
  • 应用场景​:简单的任务通知,如发送单条短信或邮件,只需要一个消费者处理的场景。

2. 工作队列模式 (Work Queue Mode)
  • 模式简介​:一个生产者,一个队列,但对应多个消费者。队列中的消息会被平均地分配给各个消费者(默认轮询方式),每个消息只能被一个消费者处理。

  • 核心概念​:​消息预取 (QoS)​​:通过 channel.basicQos(prefetchCount)方法限制消费者一次最多预取的消息数量,实现公平分发,防止处理慢的消费者积压消息。

  • 工作原理图​:

  • 代码示例(Java)​​:

    • 生产者​(发送多条消息):

      // ... (连接工厂设置同简单模式)
      try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare("work_queue", true, false, false, null);for (int i = 0; i < 10; i++) {String message = "Task " + i;channel.basicPublish("", "work_queue", null, message.getBytes());}
      }
    • 消费者​(需启动多个实例,并设置 QoS):

      // ... (连接工厂设置同简单模式)
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      channel.queueDeclare("work_queue", true, false, false, null);
      // 设置公平分发,一次只预取一条消息
      channel.basicQos(1);
      DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 模拟耗时任务try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {// 手动确认消息处理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
      };
      // 关闭自动确认,改为手动确认
      channel.basicConsume("work_queue", false, deliverCallback, consumerTag -> { });
  • 应用场景​:任务分发和负载均衡,例如处理大量订单,由多个工作进程并行处理。

3. 发布/订阅模式 (Publish/Subscribe Mode)
  • 模式简介​:引入 ​Fanout 类型的交换机。生产者将消息发送到交换机,交换机会将消息广播到所有与之绑定的队列。每个队列都有一个消费者,因此一条消息会被多个消费者同时收到。

  • 核心概念​:​Fanout Exchange​:不处理任何 Routing Key,会将收到的消息广播到所有绑定到它的队列。

  • 工作原理图​:

  • 代码示例(Java)​​:

    • 生产者​(声明 Fanout 交换机并发送消息):

      // ... (连接工厂设置)
      try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明一个Fanout类型的交换机channel.exchangeDeclare("logs_exchange", BuiltinExchangeType.FANOUT);String message = "Broadcast Message!";// 发送消息到交换机,路由键在Fanout模式下可为空channel.basicPublish("logs_exchange", "", null, message.getBytes());
      }
    • 消费者​(每个消费者创建临时队列并绑定到交换机):

      // ... (连接工厂设置)
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      // 声明一个临时队列(非持久化、独占、自动删除)
      String queueName = channel.queueDeclare().getQueue();
      // 将该队列绑定到指定的Fanout交换机,路由键为空
      channel.queueBind(queueName, "logs_exchange", "");
      DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");
      };
      channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  • 应用场景​:系统广播、事件通知。例如,用户注册后需同时发送邮件、初始化资料、写日志等。

4. 路由模式 (Routing Mode)
  • 模式简介​:使用 ​Direct 类型的交换机。队列在绑定时会指定一个 ​Binding Key。生产者发送消息时指定 ​Routing Key。交换机只会将消息路由给那些 Binding Key 与 Routing Key ​完全匹配​ 的队列。

  • 核心概念​:​Direct Exchange​:基于 Routing Key 的精确匹配进行路由。

  • 工作原理图​:

  • 代码示例(Java,使用 Spring AMQP 示例)​​:

    • 配置类​(声明 Direct Exchange、队列和绑定):

      @Configuration
      public class RabbitConfig {@Beanpublic DirectExchange directExchange() {return new DirectExchange("exchange.direct");}@Beanpublic Queue queueA() {return new Queue("queue.direct.a");}@Beanpublic Queue queueB() {return new Queue("queue.direct.b");}@Beanpublic Binding bindingA(DirectExchange directExchange, Queue queueA) {// 将队列A绑定到交换机,Binding Key为"error"return BindingBuilder.bind(queueA).to(directExchange).with("error");}@Beanpublic Binding bindingB(DirectExchange directExchange, Queue queueB) {// 将队列B绑定到交换机,Binding Key为"info"return BindingBuilder.bind(queueB).to(directExchange).with("info");}
      }
    • 生产者​(发送消息时指定 Routing Key):

      @Service
      public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendErrorMsg() {String message = "An error occurred!";// 发送到Direct Exchange,指定Routing Key为"error"rabbitTemplate.convertAndSend("exchange.direct", "error", message);}public void sendInfoMsg() {String message = "This is an info message.";// 发送到Direct Exchange,指定Routing Key为"info"rabbitTemplate.convertAndSend("exchange.direct", "info", message);}
      }
    • 消费者​(监听特定队列):

      @Service
      public class MessageReceiver {@RabbitListener(queues = "queue.direct.a")public void receiveErrorMsg(String message) {System.out.println("Error Queue Received: " + message);}@RabbitListener(queues = "queue.direct.b")public void receiveInfoMsg(String message) {System.out.println("Info Queue Received: " + message);}
      }
  • 应用场景​:有选择地接收消息。例如日志系统,将 error、warning、info 日志分别路由到不同队列处理。

5. 通配符模式 (Topics Mode)
  • 模式简介​:路由模式的增强版,使用 ​Topic 类型的交换机。Binding Key 和 Routing Key 都是包含多部分的字符串(用点.分隔)。支持两种通配符进行模糊匹配​:*(匹配一个单词)和#(匹配零个或多个单词)。

  • 核心概念​:​Topic Exchange​:基于通配符匹配的路由机制,非常灵活。

  • 工作原理图​:

  • 代码示例(Java)​​:

    • 生产者​(发送带有复杂 Routing Key 的消息):

      // ... (连接工厂设置)
      try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明一个Topic类型的交换机channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);String routingKey = "quick.orange.rabbit";String message = "A quick orange rabbit message.";channel.basicPublish("topic_logs", routingKey, null, message.getBytes());
      }
    • 消费者​(使用通配符绑定模式):

      // ... (连接工厂设置)
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      String queueName = channel.queueDeclare().getQueue();
      // 绑定使用通配符,接收所有中间单词为orange的消息
      channel.queueBind(queueName, "topic_logs", "*.orange.*");
      DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");
      };
      channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  • 应用场景​:需要高度灵活性的消息路由。例如新闻推送,用户可订阅如 sports.*(所有体育新闻)、*.europe.weather(欧洲天气)等主题。

6. RPC 模式 (RPC Mode)
  • 模式简介​:利用 RabbitMQ 实现类似远程过程调用。客户端发送请求消息,并在消息属性中设置一个回调队列 (replyTo) 和唯一标识 (correlationId)。服务端处理请求后,将响应发送到回调队列,客户端通过 correlationId匹配响应。

  • 核心概念​:​回调队列 (Reply-To Queue)​​ 和 ​关联ID (Correlation ID)​​:用于匹配请求和响应。

  • 工作原理图​:

  • 代码示例(Java,关键步骤概览)​​:

    • 客户端​:

      // 生成唯一的correlationId和回调队列名
      String corrId = java.util.UUID.randomUUID().toString();
      String replyQueueName = channel.queueDeclare().getQueue();
      // 设置消息属性
      AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
      // 发送请求到请求队列
      channel.basicPublish("", "rpc_queue", props, message.getBytes());
      // 监听回调队列,等待响应
    • 服务端​:

      // 从请求队列消费消息
      DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 处理请求...String response = processRequest(new String(delivery.getBody()));// 获取客户端指定的回调队列名String replyTo = delivery.getProperties().getReplyTo();// 发送响应到回调队列,并设置相同的correlationIdAMQP.BasicProperties respProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();channel.basicPublish("", replyTo, respProps, response.getBytes());
      };
      channel.basicConsume("rpc_queue", true, deliverCallback, consumerTag -> { });
  • 应用场景​:需要等待服务端处理结果的后端交互,如计算密集型任务。

7. 发布确认模式 (Publisher Confirms Mode)
  • 模式简介​:一种可靠性投递机制,而非新的通信拓扑。生产者通过将信道设置为 Confirm 模式,可以确保消息成功发送到 RabbitMQ 服务器。服务器会异步返回一个确认 (ACK) 或未确认 (NACK) 回执。

  • 核心概念​:​发布者确认 (Publisher Confirm)​​:RabbitMQ 提供的消息可靠投递机制。

  • 工作原理图​:

  • 代码示例(Java,异步确认)​​:

    // 将通道设置为Confirm模式
    channel.confirmSelect();
    // 添加异步确认监听器
    channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 消息成功到达BrokerSystem.out.println("Message with tag " + deliveryTag + " ACK'd");}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 消息可能未到达Broker,需要重发System.out.println("Message with tag " + deliveryTag + " NACK'd");}
    });
    // 发送消息
    String message = "Important message!";
    channel.basicPublish("", "confirmed_queue", null, message.getBytes());
  • 应用场景​:对消息可靠性要求极高的场景,如金融交易、订单支付等。

总结与选择指南

下表总结了七种模式的核心特点,助您按需选择:

工作模式

核心组件

路由规则

典型应用场景

简单模式

默认交换机

队列名精确匹配

单任务通知

工作队列模式

默认交换机

轮询分发

任务分发、负载均衡

发布/订阅模式

Fanout Exchange

广播

事件广播、系统通知

路由模式

Direct Exchange

Routing Key 精确匹配

有选择的消息路由(如日志分级)

通配符模式

Topic Exchange

Routing Key 通配符匹配

灵活的主题订阅(如新闻推送)

RPC模式

两个队列

请求/响应配对

需要同步结果的远程调用

发布确认模式

Confirm机制

可靠性确认

确保消息可靠投递

建议​:

  1. 消息持久化​:对于重要消息,将队列 (durable=true) 和消息 (deliveryMode=2) 都设置为持久化。

  2. 手动ACK​:在消费者端使用手动确认,确保消息被成功处理后再从队列中移除。

  3. QoS预取​:在工作队列模式中,使用 channel.basicQos(1)实现公平分发。

  4. 连接复用​:使用连接池和每个线程独立的 Channel 来优化性能。

http://www.dtcms.com/a/435550.html

相关文章:

  • 官方网站下载cad建设部监理协会网站
  • 万方智能体投票火热进行中~
  • 不可见系统(Invisibility)
  • 建设读书网站的意义黄冈网站推广平台
  • SpringAI-Alibaba 快速开始
  • 网站制作费用一览表自己怎么设计公主房
  • 西安网站建设缑阳建中文搜索引擎排名
  • 五种IO模型,同步IO和异步IO
  • 网站开发环境安装程序nodejs wordpress
  • wordpress跨站脚本攻击漏洞网站风格的表现形式
  • html个人网站怎么做搜狗推广管家
  • 【读书笔记】《Linux内核设计与实现》(第1章-第5章)
  • C++中继承的理解与应用
  • 深圳有哪些网站是做餐饮沙龙的如何进行网站制作
  • Linux基本使用(Ubuntu)
  • 张家港网站制作公司专业建网站 成都
  • 线性代数 · SVD | 几何本质、求解方法与应用
  • 网站建设 phpwordpress 文章参数
  • 网站建设ppt答辩安徽六安有哪些区县
  • 《操作系统真象还原》 第九章 第二部分
  • 网站开发服务器的选择wordpress自动添加视频
  • 外贸网站源码自己建站模板
  • 网站开发制作公司宁波商城网站开发设计
  • 做网站的主要内容公司图标设计logo
  • 郑州七彩网站建设公司网络架构设计方案
  • 408之二叉树(一)
  • 阳泉市住房保障和城乡建设管理局网站建设协会网站的公司
  • 【密码学实战】openHiTLS CRL命令行:证书吊销列表
  • 烟台网站建设方案报价国开行网站毕业申请怎么做
  • js中异步回调函数的执行机制与事件循环