当前位置: 首页 > news >正文

RabbitMQ快速入门指南

1. MQ介绍

1.1、什么是MQ

MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信(异步通信),而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

异步消息中两个重要概念:

  • 消息代理(message broker,MQ服务器)
  • 目的地(destination)

当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目 的地

异步消息主要有两种形式的目的地

  • 队列(queue):点对点消息通信(point-to-point)
  • 主题(topic):发布(publish)/订阅(subscribe)消息通信
1. 2 MQ是干什么用的

应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等…

1.3、应用场景
1.3.1 异步处理:

以用户注册为例,我们将用户信息写入数据库之后,如果再同步调用发送注册右键、发送注册短信方法,假设每一步都是50ms的话,就需要150ms以后,才可以响应用户。

我们可以用多线程的方式,发送注册邮件和发送注册短信,并发执行,假设每一步都是50ms,就需要100ms以后才可以响应用户。

上述两种方式都比较慢,可以引入一个消息队列,当将用户信息写入数据库以后,可以很快地将需要用到的信息写入消息队列,写入消息队列以后,就可以响应用户,而发送注册邮件和短息,就可以异步读取消息队列,获取相应信息,进行法注册邮件和短信。

1.3.2 应用解耦:

实现订单系统与库存系统的应用解耦

1.3.3 流量削峰:

用户发送请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面

秒杀业务根据消息队列中的请求信息,再做后续处理

1.4、常见MQ产品
  • ActiveMQ:基于JMS,apache出品,在中小型企业中应用广泛
  • RabbitMQ:基于AMQP协议,Erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
  • Kafka:分布式消息系统,高吞吐量,归属于Apache顶级项目

2. RabbitMQ

2.2、什么是RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,实现了高级消息队列协议(AMQP)。它充当应用程序之间的消息中间件,允许分布式系统中的不同组件进行异步通信。RabbitMQ使用Erlang语言开发,以其高性能、可靠性和可扩展性而闻名。

2.3、消息队列的核心概念

消息队列是一种异步通信机制,它允许应用程序通过发送和接收消息来进行通信,而不需要直接连接。这种模式带来了以下优势:

  • 解耦:生产者和消费者不需要同时在线
  • 可扩展性:可以独立扩展生产者和消费者
  • 可靠性:消息可以持久化存储,确保不丢失
  • 灵活性:支持多种消息传递模式
2.4、RabbitMQ的优势和应用场景

RabbitMQ在企业级应用中具有以下优势:

  • 多协议支持:支持AMQP、STOMP、MQTT等多种协议
  • 灵活的路由:支持多种Exchange类型和复杂的路由规则
  • 集群支持:可以构建高可用的集群架构
  • 管理界面:提供Web管理控制台
  • 丰富的客户端库:支持多种编程语言

常见应用场景包括:微服务解耦、异步任务处理、系统集成、削峰填谷等

3. RabbitMQ基础概念

3.1、Exchange(交换机)类型详解

Exchange是RabbitMQ的核心组件,负责接收生产者发送的消息并将其路由到相应的队列。主要有四种类型:

Direct Exchange(直连交换器)
  • 根据routing key精确匹配路由消息
  • 适用于单播消息传递
  • 默认的交换器类型
Fanout Exchange(扇形交换器)
  • 将消息广播到所有绑定的队列
  • 忽略routing key
  • 适用于广播场景
Topic Exchange(主题交换器)
  • 基于通配符模式匹配routing key
  • 支持"*“(单个单词)和”#"(零个或多个单词)
  • 灵活的路由规则
Headers Exchange(头交换器)
  • 基于消息头属性进行路由
  • 较少使用,性能相对较低
3.2、Queue(队列)和消息持久化

Queue是存储消息的容器,具有以下特性:

  • FIFO原则:先进先出的消息处理顺序
  • 持久化:可以配置队列和消息的持久化
  • 排他性:可以设置队列只能被一个连接使用
  • 自动删除:当没有消费者时自动删除队列
3.3、Routing Key和Binding
  • Routing Key:生产者发送消息时指定的路由键
  • Binding:Exchange和Queue之间的绑定关系
  • Binding Key:绑定时指定的键,用于匹配routing key
3.4、Virtual Host(虚拟主机)

Virtual Host提供了逻辑隔离,类似于网络中的虚拟主机概念:

  • 不同vhost中的Exchange、Queue等资源完全隔离
  • 每个vhost有独立的权限控制
  • 默认vhost为"/"

4. 工作模式

RabbitMQ提供了六种消息模型(也可以说是工作模式),但是第六种其实是RPC,不是MQ,第1、2种属于点对点模型,第3、4、5种都属于订阅模型,只不过进行路由的方式不同。

5. Spring Boot集成RabbitMQ

5.1 application.yml配置:
spring:# RabbitMQrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虚拟主机virtual-host: /#开启发送端消息抵达broker的确认publisher-confirm-type: correlated#开启发送端消息抵达队列的确认publisher-returns: truetemplate:#mandatory 默认为FALSE,指定消息在没有被队列接收时是否强行退回还是直接丢弃,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,会直接将消息扔掉mandatory: truelistener:simple:# 设置消费端手动 ackacknowledge-mode: manualconcurrency: 5  # 最小消费者数量max-concurrency: 10  # 最大消费者数量prefetch: 10 # 每个消费者一次最多消费10条消息retry:enabled: true  # 开启重试机制max-attempts: 3  # 最大重试次数initial-interval: 1000  # 重试初始间隔时间,毫秒# 其他已有配置...connection-timeout: 60000  # 连接超时时间,毫秒requested-heartbeat: 60  # 心跳间隔时间,秒cache:connection:mode: channelsize: 50  # 缓存的connection数量channel:size: 50  # 缓存的channel数量
5.2 配置类:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String DIRECT_EXCHANGE = "spring.direct.exchange";public static final String TOPIC_EXCHANGE = "spring.topic.exchange";public static final String DIRECT_QUEUE = "spring.direct.queue";public static final String TOPIC_QUEUE = "spring.topic.queue";// 声明Direct Exchange@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE, true, false);}// 声明Topic Exchange  @Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE, true, false);}// 声明队列@Beanpublic Queue directQueue() {return QueueBuilder.durable(DIRECT_QUEUE).build();}@Beanpublic Queue topicQueue() {return QueueBuilder.durable(TOPIC_QUEUE).build();}// 绑定关系@Beanpublic Binding directBinding() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key");}@Beanpublic Binding topicBinding() {return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.key");}
}
5.3 消息生产者服务
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducerService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDirectMessage(String message) {rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, "direct.routing.key", message);System.out.println("Sent direct message: " + message);}public void sendTopicMessage(String routingKey, String message) {rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, routingKey, message);System.out.println("Sent topic message with key " + routingKey + ": " + message);}// 发送对象消息public void sendObjectMessage(Object obj) {rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,"direct.routing.key",obj);}
}
5.4 消息消费者服务:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;@Service
public class MessageConsumerService {// 基础消费者@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)public void handleDirectMessage(String message) {System.out.println("Received direct message: " + message);}// 手动确认消息@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE)public void handleTopicMessage(@Payload String message,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel) throws Exception {try {System.out.println("Processing topic message: " + message);// 模拟业务处理Thread.sleep(1000);// 手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {System.err.println("Error processing message: " + e.getMessage());// 拒绝消息并重新入队channel.basicNack(deliveryTag, false, true);}}// 接收完整消息对象@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)public void handleCompleteMessage(Message message, Channel channel) throws Exception {String body = new String(message.getBody());String routingKey = message.getMessageProperties().getReceivedRoutingKey();System.out.println("Received message: " + body + " with routing key: " + routingKey);// 手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
5.5 消息转换器配置:
JSON消息转换器:
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageConverterConfig {@Beanpublic Jackson2JsonMessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(jsonMessageConverter());// 配置确认回调template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message sent successfully");} else {System.out.println("Message send failed: " + cause);}});// 配置返回回调template.setReturnsCallback(returned -> {System.out.println("Message returned: " + returned.getMessage());});return template;}
}
使用JSON转换器发送对象:
// 定义消息对象
public class UserMessage {private Long id;private String name;private String email;// 构造函数、getter、setterpublic UserMessage() {}public UserMessage(Long id, String name, String email) {this.id = id;this.name = name;this.email = email;}// getter和setter方法...
}// 发送和接收对象消息
@Service
public class UserMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendUserMessage(UserMessage user) {rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,"user.routing.key",user);}@RabbitListener(queues = "user.queue")public void handleUserMessage(UserMessage user) {System.out.println("Received user: " + user.getName() + " (" + user.getEmail() + ")");}
}

6. 高级特性

6.1 消息确认机制(ACK):

RabbitMQ提供了多种消息确认机制来保证消息的可靠传递:

自动确认(Auto ACK):
// 消息被消费者接收后立即确认
@RabbitListener(queues = "auto.ack.queue", ackMode = "AUTO")
public void handleAutoAck(String message) {System.out.println("Auto ACK: " + message);
}
手动确认(Manual ACK):
@RabbitListener(queues = "manual.ack.queue", ackMode = "MANUAL")
public void handleManualAck(String message,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel) throws Exception {try {// 处理业务逻辑processMessage(message);// 确认消息channel.basicAck(deliveryTag, false);} catch (Exception e) {// 拒绝消息,重新入队channel.basicNack(deliveryTag, false, true);}
}
发布确认:
@Configuration
public class PublisherConfirmConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);// 启用发布确认template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message published successfully");} else {System.out.println("Message publish failed: " + cause);}});// 启用消息返回template.setReturnsCallback(returned -> {System.out.println("Message returned: " + returned.getMessage().toString());System.out.println("Reply code: " + returned.getReplyCode());System.out.println("Reply text: " + returned.getReplyText());});return template;}
}
6.2 死信队列(DLX)处理

死信队列用于处理无法正常消费的消息,常见的死信场景包括:

  • 消息被拒绝且不重新入队
  • 消息TTL过期
  • 队列达到最大长度
死信队列配置:
@Configuration
public class DeadLetterConfig {public static final String BUSINESS_EXCHANGE = "business.exchange";public static final String BUSINESS_QUEUE = "business.queue";public static final String DEAD_LETTER_EXCHANGE = "dlx.exchange";public static final String DEAD_LETTER_QUEUE = "dlx.queue";// 业务交换器@Beanpublic DirectExchange businessExchange() {return new DirectExchange(BUSINESS_EXCHANGE);}// 死信交换器@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 业务队列(配置死信交换器)@Beanpublic Queue businessQueue() {return QueueBuilder.durable(BUSINESS_QUEUE).withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", "dead.letter.routing.key").withArgument("x-message-ttl", 10000) // 消息TTL 10秒.build();}// 死信队列@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}// 绑定关系@Beanpublic Binding businessBinding() {return BindingBuilder.bind(businessQueue()).to(businessExchange()).with("business.routing.key");}@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter.routing.key");}
}
死信处理服务:
@Service
public class DeadLetterService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送业务消息public void sendBusinessMessage(String message) {rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS_EXCHANGE,"business.routing.key",message);}// 业务消息处理@RabbitListener(queues = DeadLetterConfig.BUSINESS_QUEUE)public void handleBusinessMessage(String message,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel) throws Exception {try {System.out.println("Processing business message: " + message);// 模拟处理失败if (message.contains("error")) {throw new RuntimeException("Business processing failed");}channel.basicAck(deliveryTag, false);} catch (Exception e) {System.err.println("Business processing failed: " + e.getMessage());// 拒绝消息,不重新入队,进入死信队列channel.basicNack(deliveryTag, false, false);}}// 死信消息处理@RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE)public void handleDeadLetterMessage(String message) {System.out.println("Handling dead letter message: " + message);// 记录日志、发送告警、人工处理等logDeadLetterMessage(message);}private void logDeadLetterMessage(String message) {// 实现日志记录逻辑System.out.println("Dead letter logged: " + message);}
}
6.3 消息TTL和队列过期:
消息TTL配置:
@Service
public class TTLMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送带TTL的消息public void sendTTLMessage(String message, int ttlSeconds) {MessageProperties properties = new MessageProperties();properties.setExpiration(String.valueOf(ttlSeconds * 1000)); // 毫秒Message msg = new Message(message.getBytes(), properties);rabbitTemplate.send("ttl.exchange", "ttl.routing.key", msg);}// 使用MessagePostProcessor设置TTLpublic void sendTTLMessageWithProcessor(String message, int ttlSeconds) {rabbitTemplate.convertAndSend("ttl.exchange","ttl.routing.key", message,msg -> {msg.getMessageProperties().setExpiration(String.valueOf(ttlSeconds * 1000));return msg;});}
}
队列TTL配置:
@Bean
public Queue ttlQueue() {return QueueBuilder.durable("ttl.queue").withArgument("x-message-ttl", 60000) // 队列中消息的默认TTL.withArgument("x-expires", 300000)    // 队列没有消费者时的过期时间.build();
}
6.4 优先级队列:     
优先级队列配置:
@Bean
public Queue priorityQueue() {return QueueBuilder.durable("priority.queue").withArgument("x-max-priority", 10) // 最大优先级为10.build();
}
发送优先级消息:
@Service
public class PriorityMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendPriorityMessage(String message, int priority) {rabbitTemplate.convertAndSend("priority.exchange","priority.routing.key",message,msg -> {msg.getMessageProperties().setPriority(priority);return msg;});}@RabbitListener(queues = "priority.queue")public void handlePriorityMessage(String message, @Header("priority") Integer priority) {System.out.println("Received priority " + priority + " message: " + message);}
}

http://www.dtcms.com/a/390370.html

相关文章:

  • 在项目中通过LangChain4j框架接入AI大模型
  • c语言9:从内存到实践深入浅出理解数组
  • sglang使用笔记
  • 本地大模型编程实战(36)使用知识图谱增强RAG(2)生成知识图谱
  • clip——手写数字识别
  • commons-numbers
  • MySqL-day4_01(内置函数、存储过程、视图)
  • 用html5写一个手机ui
  • 2.canvas学习
  • 【系统架构设计(34)】计算机网络架构与技术基础
  • 计网1.2 计算机网络体系结构与参考模型
  • ML-Watermelonbook
  • E/E架构新课题的解决方案
  • 【CVPR 2025】用于密集图像预测的频率动态卷积
  • 整体设计 语言拼凑/逻辑拆解/词典缝合 之 1 表达词项的散列/序列/行列 (豆包助手)
  • FPGA学习篇——Verilog学习之半加器的实现
  • Python快速入门专业版(三十五):函数实战2:文件内容统计工具(统计行数/单词数/字符数)
  • CSS的文本样式二【文本布局】
  • redis配置与优化
  • STM32 单片机 - 中断
  • 【网络工程师】ACL基础实验
  • 小实验--LCD1602显示字符和字符串
  • Java 的双亲委派模型(Parent Delegation Model)
  • ​​[硬件电路-249]:LDO(低压差线性稳压器)专用于线性电源,其核心设计逻辑与线性电源高度契合,而与开关电源的工作原理存在本质冲突。
  • conda命令行指令大全
  • TCP三次握手与四次挥手
  • Python读取Excel中指定列的所有单元格内容
  • 【DMA】DMA入门:理解DMA与CPU的并行
  • Redis数据库(一)—— 初步理解Redis:从基础配置到持久化机制
  • Salesforce中的事件驱动架构:构建灵活可扩展的企业应用