当前位置: 首页 > news >正文

RabbitMQ--基础篇

RabbitMQ


简介:RabbitMQ 是一种开源的消息队列中间件,你可以把它想象成一个高效的“邮局”。它专门负责在不同应用程序之间传递消息,让系统各部分能松耦合地协作

优势:

  • 异步处理:比如用户注册后,主程序将发送验证邮件的任务扔进队列就立刻返回,邮件服务后续慢慢处理,避免用户等待。

  • 削峰填谷:突然的流量高峰(如秒杀活动)会被队列缓冲,避免服务器被压垮。

  • 智能路由:通过交换机(Exchange)的四种路由策略(直连/主题/广播/头匹配),实现精准投递,比如将VIP用户的订单定向到专属客服队列。

  • 故障恢复:支持消息持久化和确认机制,即使服务器宕机,消息也不会丢失。

同步VS异步(以实际开发为例子进行说明):

  • 同步业务功能的耦合度高,异步耦合度低,可以达到解耦的效果

  • 同步业务流程响应的时间长,异步响应的时间短

  • 同步模式会导致并发压力向后进行传递,异步可以削峰限流

  • 同步模式下系统结构弹性不足,异步模式下系统弹性强,可扩展性强

注意:在实际开发中并不是说异步模式就完全优与同步模式,在一定的场景下使用异步模式是优化系统的架构,但是在一些其它的业务场景下需要同步来保证流程的完整性。所以说异步还是同步要跟据具体业务进行选择。

底层实现:

  • AMQP(Advanced Message Queuing Protocol):AMQP 是 跨语言的通用消息协议适合异构系统间的复杂通信。

  • JMS(Java Message Service):JMS是 Java 专属的 API 标准适合统一 Java 生态的消息处理。

主流的MQ产品对比

对比项RabbitMQActiveMQRocketMQKafka
开发语言ErlangJavaJavaScala/Java
维护方Rabbit(公司)Apache(社区)阿里(公司)Apache(社区)
核心机制基于 AMQP 协议的生产者-消费者模型基于 JMS 的消息传递模型分布式消息队列(Topic + Tag 分类)分布式流处理平台(发布-订阅模型)
协议支持AMQP、STOMP、MQTT、HTTP 插件AMQP、STOMP、OpenWire、REST、MQTT自定义协议(支持 TCP/HTTP)自定义协议(社区封装 HTTP 支持)
客户端语言官方:Erlang、Java、Ruby;社区:多语言Java、C/C++、.NET、Python、PHP官方:Java;社区:C++(不成熟)官方:Java;社区:Python、Go、Rust 等
可用性镜像队列、仲裁队列(Quorum Queue)主从复制主从复制分区(Partition) + 副本(Replica)
单机吞吐量约 10 万/秒约 5 万/秒10 万+/秒(阿里双十一验证)百万级/秒
消息延迟微秒级毫秒级毫秒级毫秒以内
消息确认完整 ACK/NACK 机制支持 JMS ACK 模式基于数据库持久化的消息表基于副本同步和 ISR 机制
功能特性✅ 低延迟、高并发、管理界面丰富✅ 老牌稳定、支持 JMS 规范✅ 高吞吐、阿里生态集成、事务消息✅ 高吞吐、流处理、大数据场景专用
适用场景复杂路由、实时业务(如支付订单)传统企业级系统(Java 生态)电商高并发场景(如秒杀、订单)日志采集、实时分析、流式计算

原生RabbitMQAPI调用:

//=========================================发送消息的代码示例=================================
public class Producer {  public static void main(String[] args) throws Exception {  // 创建连接工厂  ConnectionFactory connectionFactory = new ConnectionFactory();  // 设置主机地址  connectionFactory.setHost("192.168.200.100");  // 设置连接端口号:默认为 5672connectionFactory.setPort(5672);// 虚拟主机名称:默认为 /connectionFactory.setVirtualHost("/");// 设置连接用户名;默认为guest  connectionFactory.setUsername("guest");// 设置连接密码;默认为guest  connectionFactory.setPassword("123456");// 创建连接  Connection connection = connectionFactory.newConnection();  // 创建频道  Channel channel = connection.createChannel();  // 声明(创建)队列  // queue      参数1:队列名称  // durable    参数2:是否定义持久化队列,当 MQ 重启之后还在  // exclusive  参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列  // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除  // arguments  参数5:队列其它参数  channel.queueDeclare("simple_queue", true, false, false, null);  // 要发送的信息  String message = "你好;小兔子!";  // 参数1:交换机名称,如果没有指定则使用默认Default Exchange  // 参数2:路由key,简单模式可以传递队列名称  // 参数3:配置信息  // 参数4:消息内容  channel.basicPublish("", "simple_queue", null, message.getBytes());  System.out.println("已发送消息:" + message);  // 关闭资源  channel.close();  connection.close();  }  }
//=========================================接收消息的代码示例=================================
public class Consumer {  public static void main(String[] args) throws Exception {  // 1.创建连接工厂  ConnectionFactory factory = new ConnectionFactory();  // 2. 设置参数  factory.setHost("192.168.200.100");  factory.setPort(5672);  factory.setVirtualHost("/");  factory.setUsername("guest");factory.setPassword("123456");  // 3. 创建连接 Connection        Connection connection = factory.newConnection();  // 4. 创建Channel  Channel channel = connection.createChannel();  // 5. 创建队列  // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建  // 参数1. queue:队列名称  // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在  // 参数3. exclusive:是否独占。  // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉  // 参数5. arguments:其它参数。  channel.queueDeclare("simple_queue",true,false,false,null);  // 接收消息  DefaultConsumer consumer = new DefaultConsumer(channel){  // 回调方法,当收到消息后,会自动执行该方法  // 参数1. consumerTag:标识  // 参数2. envelope:获取一些信息,交换机,路由key...  // 参数3. properties:配置信息  // 参数4. body:数据  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("consumerTag:"+consumerTag);  System.out.println("Exchange:"+envelope.getExchange());  System.out.println("RoutingKey:"+envelope.getRoutingKey());  System.out.println("properties:"+properties);  System.out.println("body:"+new String(body));  }  };  // 参数1. queue:队列名称  // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息  // 参数3. callback:回调对象  // 消费者类似一个监听程序,主要是用来监听消息  channel.basicConsume("simple_queue",true,consumer);  }  }

封装RabbitMQ工具类:

public class ConnectionUtil {  //跟据自己服务的具体需求进行相关ip+端口的配置(动态变化)public static final String HOST_ADDRESS = "192.168.200.100";  public static Connection getConnection() throws Exception {  // 定义连接工厂  ConnectionFactory factory = new ConnectionFactory();  // 设置服务地址  factory.setHost(HOST_ADDRESS);  // 端口  factory.setPort(5672);  //设置账号信息,用户名、密码、vhost  factory.setVirtualHost("/");  factory.setUsername("guest");  factory.setPassword("123456");  // 通过工程获取连接  Connection connection = factory.newConnection();  return connection;  }  public static void main(String[] args) throws Exception {  Connection con = ConnectionUtil.getConnection();  // amqp://guest@192.168.200.100:5672/  System.out.println(con);  con.close();  }  }

RabbitMQ体系结构:

  • 生产者(Producer):发送消息到 RabbitMQ 的应用程序。

  • 消费者(Consumer):从队列中接收并处理消息的应用程序。

  • 交换机(Exchange):接收生产者消息,根据类型和路由规则将消息分发到队列。

    • 四大类型

类型路由规则典型场景
Direct精确匹配 Routing Key(如 order.pay一对一精准投递(如支付成功通知)
Topic通配符匹配(如 order.**.pay多服务订阅同一类消息(如日志分类)
Fanout广播到所有绑定队列(无视 Routing Key群发通知(如系统公告)
Headers通过消息头(Headers)键值对匹配复杂条件路由(需灵活匹配时)
  • 队列(Queue):定义交换机与队列之间的映射关系,指定路由规则

  • 信道(Channel):复用 TCP 连接的轻量级虚拟链路,减少资源消耗。

  • 虚拟主机(Virtual Host):逻辑隔离的“消息域”,不同 vhost 间资源(交换机、队列)互不干扰。

总结:RabbitMQ 通过 生产者-交换机-队列-消费者 模型实现异步通信,核心在于灵活的路由规则(交换机类型)和可靠性保障(持久化、确认机制)。

基础篇

工作模式

  • 简单模式:最简单的消息队列模型,包含一个生产者、一个队列和一个消费者。生产者直接将消息发送到队列,消费者从队列中接收消息。

  • 工作队列模式(Work Queues):使用默认的交换机,一个队列对应多个消费者,消息按轮询(Round-Robin)或公平分发(Fair Dispatch)分配给消费者,避免单个消费者过载。

  • 发布/订阅模式(Publish/Subscribe):使用 扇形交换机(Fanout Exchange)生产者将消息发送到交换机,交换机将消息广播到所有绑定的队列,每个消费者独立接收一份消息副本。

  • 路由模式(Routing):使用 直接交换机(Direct Exchange)生产者指定消息的 路由键(Routing Key),交换机根据路由键将消息精确匹配到绑定的队列

  • 主题模式(Topics):使用 主题交换机(Topic Exchange)路由键支持通配符匹配(* 匹配一个词,# 匹配多个词)。例如路由键 stock.usd.nyse 可被 *.nysestock.# 订阅。

  • 远程过程调用(RPC):通过消息队列实现远程调用。客户端发送请求消息时附带回调队列和唯一ID,服务端处理完成后将响应发送到回调队列,客户端通过ID匹配响应。

  • 发布者确认(Publisher Confirms):生产者发送消息后,RabbitMQ会异步返回确认(ACK)或未确认(NACK),确保消息成功到达交换机或队列。

工作队列模式(Work Queues)
  1. 并行处理能力

  • 多消费者竞争消费一个队列可绑定多个消费者,消息被并发处理消息只会被其中的一个消费者拿到。

  • 横向扩展:通过增加消费者数量,轻松应对高并发或大流量场景。

  1. 负载均衡机制

  • 轮询分发(Round-Robin)默认策略,均摊消息到所有消费者简单但可能因消费者性能差异导致负载不均。

  • 公平分发(Fair Dispatch):通过 prefetch_count 限制消费者同时处理的消息数,确保“能者多劳”,避免慢消费者堆积任务。

  1. 消息可靠性保障

  • ACK确认机制:消费者处理完成后需手动发送确认(ACK),若处理失败或消费者宕机,消息自动重新入队,确保任务不丢失。

  • 持久化支持:队列和消息均可设置为持久化(durable=true),防止RabbitMQ服务重启后数据丢失。

//================================生产端代码循环发送10次消息================================  
public class Producer {  public static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  for (int i = 1; i <= 10; i++) {  String body = i+"hello rabbitmq~~~";  channel.basicPublish("",QUEUE_NAME,null,body.getBytes());  }  channel.close();  connection.close();  }  }
 
//================================消费端代码竞争消息================================public class Consumer1/2 {  static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("Consumer1 body:"+new String(body));  }  };  channel.basicConsume(QUEUE_NAME,true,consumer);  }  }

发布/订阅模式(Publish/Subscribe)
  1. 消息广播机制

  • 扇形交换机(Fanout Exchange)驱动生产者将消息发送到交换机,交换机会将消息无条件广播到所有与其绑定的队列,每个队列的消费者都能收到一份消息副本。

  • 一对多分发一条消息可被多个消费者同时接收,适用于需要广泛触达的场景(如系统通知、日志收集)。

//====================================生产者代码====================================
public class Producer {  public static void main(String[] args) throws Exception {  // 1、获取连接  Connection connection = ConnectionUtil.getConnection();  // 2、创建频道  Channel channel = connection.createChannel();  // 参数1. exchange:交换机名称  // 参数2. type:交换机类型  //     DIRECT("direct"):定向  //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。  //     TOPIC("topic"):通配符的方式  //     HEADERS("headers"):参数匹配  // 参数3. durable:是否持久化  // 参数4. autoDelete:自动删除  // 参数5. internal:内部使用。一般false  // 参数6. arguments:其它参数  String exchangeName = "test_fanout";  // 3、创建交换机  channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);  // 4、创建队列  String queue1Name = "test_fanout_queue1";  String queue2Name = "test_fanout_queue2";  channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 5、绑定队列和交换机  // 参数1. queue:队列名称  // 参数2. exchange:交换机名称  // 参数3. routingKey:路由键,绑定规则  //     如果交换机的类型为fanout,routingKey设置为""  channel.queueBind(queue1Name,exchangeName,"");  channel.queueBind(queue2Name,exchangeName,"");  String body = "日志信息:张三调用了findAll方法...日志级别:info...";  // 6、发送消息  channel.basicPublish(exchangeName,"",null,body.getBytes());  // 7、释放资源  channel.close();  connection.close();  }  }
 
//====================================消费者1代码===================================
public class Consumer1 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue1Name = "test_fanout_queue1";  channel.queueDeclare(queue1Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");  }  };  channel.basicConsume(queue1Name,true,consumer);  }  }
​
//====================================消费者2代码===================================
public class Consumer2 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue2Name = "test_fanout_queue2";  channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");  }  };  channel.basicConsume(queue2Name,true,consumer);  }  }

路由模式(Routing)
  1. 基于路由键的精确分发

  • 直接交换机(Direct Exchange)驱动生产者发送消息时需指定路由键(Routing Key),交换机会将消息精确匹配到绑定相同路由键的队列。

  • 条件性路由:仅当队列绑定的路由键与消息的路由键完全一致时,消息才会被投递,实现按条件分发。

  1. 灵活的消息过滤

  • 多队列绑定不同路由键可为同一交换机绑定多个队列,每个队列声明不同的路由键(例如 errorinfowarning,实现消息分类处理。

  • 生产者可控性:生产者通过指定路由键决定消息的目标队列,无需消费者干预。

//================================生产者代码========================================  
public class Producer {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String exchangeName = "test_direct";  // 创建交换机  channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);  // 创建队列  String queue1Name = "test_direct_queue1";  String queue2Name = "test_direct_queue2";  // 声明(创建)队列  channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 队列绑定交换机  // 队列1绑定error  channel.queueBind(queue1Name,exchangeName,"error");  // 队列2绑定info error warning  channel.queueBind(queue2Name,exchangeName,"info");  channel.queueBind(queue2Name,exchangeName,"error");  channel.queueBind(queue2Name,exchangeName,"warning");  String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";  // 发送消息  channel.basicPublish(exchangeName,"warning",null,message.getBytes());  System.out.println(message);  // 释放资源  channel.close();  connection.close();  }  }
 
//===============================消费者1代码========================================
public class Consumer1 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue1Name = "test_direct_queue1";  channel.queueDeclare(queue1Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("Consumer1 将日志信息打印到控制台.....");  }  };  channel.basicConsume(queue1Name,true,consumer);  }  }
​
//===============================消费者2代码========================================
public class Consumer2 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue2Name = "test_direct_queue2";  channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("Consumer2 将日志信息存储到数据库.....");  }  };  channel.basicConsume(queue2Name,true,consumer);  }  }

主题模式(Topics)
  1. 基于通配符的灵活路由

  • 主题交换机(Topic Exchange)驱动:生产者发送消息时指定带层级的路由键(Routing Key,如 order.europe.paid),消费者通过绑定键(Binding Key)使用通配符* 匹配一个词,# 匹配零或多个词)订阅消息。

    • 示例:绑定键 *.europe.* 可匹配 order.europe.paidshipment.europe.delayed

    • 绑定键 stock.# 可匹配 stock.usd.nysestock.eur.london.close

  • 多维度匹配支持复杂的分层路由逻辑,适用于需要按多条件分类的场景。

  1. 高度动态的消息过滤

  • 灵活订阅:消费者可动态定义绑定键的通配规则,按需订阅特定模式的消息,无需修改生产者逻辑。

  • 精准与模糊匹配结合既能精确匹配固定路由键,也能通过通配符覆盖一类消息。

//================================生产者代码========================================
public class Producer {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String exchangeName = "test_topic";  channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);  String queue1Name = "test_topic_queue1";  String queue2Name = "test_topic_queue2";  channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 绑定队列和交换机  // 参数1. queue:队列名称  // 参数2. exchange:交换机名称  // 参数3. routingKey:路由键,绑定规则  //      如果交换机的类型为fanout ,routingKey设置为""  // routing key 常用格式:系统的名称.日志的级别。  // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库  channel.queueBind(queue1Name,exchangeName,"#.error");  channel.queueBind(queue1Name,exchangeName,"order.*");  channel.queueBind(queue2Name,exchangeName,"*.*");  // 分别发送消息到队列:order.info、goods.info、goods.error  String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";  channel.basicPublish(exchangeName,"order.info",null,body.getBytes());  body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";  channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());  body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";  channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());  channel.close();  connection.close();  }  }
 
//================================消费者1代码=======================================
public class Consumer1 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String QUEUE_NAME = "test_topic_queue1";  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  }  };  channel.basicConsume(QUEUE_NAME,true,consumer);  }  }
​
//================================消费者2代码=======================================
public class Consumer2 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String QUEUE_NAME = "test_topic_queue2";  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  }  };  channel.basicConsume(QUEUE_NAME,true,consumer);  }  }

相关文章:

  • Crawl4AI:高效的开源 Python 网页爬取与数据提取库
  • 【5G通信】redcap和bwp 随手记
  • 论文速读《DARE:基于扩散模型的自主机器人探索新范式》
  • debian12 安装docker
  • 多模态大语言模型arxiv论文略读(六十四)
  • 美团二面:使用分布式调度框架该考虑哪些问题?
  • 【Java ee 初阶】文件IO和操作(下)
  • 【Java ee 初阶】文件操作和IO(上)
  • 企业级可观测性实现:OpenObserve云原生平台的本地化部署与远程访问解析
  • COLT_CMDB_linux_userInfo_20250508.sh修复历史脚本输出指标信息中userName与输出信息不一致问题
  • 解构语言模型推理过程,超越最终答案:通过分析子思考路径提升大语言模型推理准确性的方法研究
  • Python3正则表达式:字符串魔法师的指南[特殊字符]‍♂️
  • 《Scala基础》
  • flink超时未揽收单量统计
  • 华为首款鸿蒙电脑正式亮相,开启国产操作系统新篇章
  • 多线程初阶(2)
  • 长难句。。
  • Kafka消息队列之 【消费者分组】 详解
  • maven 安装 本地 jar
  • 紫禁城多语言海外投资理财返利源码带前端uniapp纯工程文件
  • 以军总参谋长:已进入“决定性打击计划的第二阶段”
  • 上海如何为街镇营商环境赋能?送政策、配资源、解难题、强活力
  • 一季度全国消协组织为消费者挽回经济损失23723万元
  • 吴清稳市场稳预期发布会十要点:谈平准基金、股市稳定、公募改革和巴菲特
  • 特朗普称美军舰商船应免费通行苏伊士运河,外交部:反对任何霸凌言行
  • 李云泽:将尽快推出支持小微企业民营企业融资一揽子政策