当前位置: 首页 > news >正文

RabbitMQ-基础-总结

RabbitMQ是开源消息代理软件,采用AMQP协议。通过生产者-交换机-队列-消费者的消息流,实现应用间异步通信和解耦。支持路由、持久化、集群等高可用特性,确保可靠的消息传递。

目录

一、开始

(一) 启动指令

(二) hello word

导入依赖

发送端

接收端

二、工作模式

(一) Work Queues模式

(二) 交换机

(三) Publish 发布订阅模式 Fanout

(四) Routing 路由模式(最常用)Direct

(五) Topics 主题模式 Topic

(六) RPC 同步调用

(七) Publisher Confirms

(八) 总结

三、SpringBoot整合

(一) 依赖导入

(二) RabbitMQ配置文件

(三) 消费端/监听器

(四) 生产端

四、进阶

(一) 消息可靠性投递

1. 需求:当前消息投递的问题

2. 生产端投递确认

一、配置文件功能开启

二、RabbitMQ配置类

三、测试

3. 创建备份交换机

4. 持久化

5. 消费端确认ack

一、配置文件功能开启

二、监听器

(二) 消费端限流

1. 需求

2. 配置文件限流设置

(三) 消息超时

1. 队列超时设置

2. 消息超时设置

(四) 死信

1. 死信绑定

(五) 延迟队列

1. 基于死信的延迟队列

2. 延迟队列插件

一、下载插件:(最好和版本一一对应)

二、创建交换机:

三、测试使用

(六) 消息事务

1. 配置类功能开启

2. 事务注解使用

(七) 惰性队列

(八) 优先级队列

1. 创建队列

2. 测试使用


一、开始

(一) 启动指令

启动:

rabbitmq-service start

关闭:

net stop RabbitMQ

图像化管理页面:(默认是15672端口)

http://{服务器地址}:15672/

(二) hello word

导入依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version>
</dependency>

发送端

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置工厂ipfactory.setHost("localhost");//设置账号factory.setUsername("admin");//设置密码factory.setPassword("123");//使用工厂创建连接Connection connection = factory.newConnection();//获取连接的信道Channel channel = connection.createChannel();/*生成消息队列参数:1.队列名称2.是否持久化队列消息,默认是false,只保存在内存3.消息是否共享(一对多)4.是否自动删除,最后一个消息的最后一个消费者断开后删除5.*其他参数**/channel.queueDeclare("HELLO",false,false,false,null);/*发送消息参数:1.发送到的交换机名2.路由的key3.*配置参数*4.消息内容/消息体*/channel.basicPublish("","HELLO",null,"hello word".getBytes());System.out.println("发送完毕");}
}

接收端

public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置工厂ipfactory.setHost("localhost");//设置账号factory.setUsername("admin");//设置密码factory.setPassword("123");//使用工厂创建连接Connection connection = factory.newConnection();//获取连接的信道Channel channel = connection.createChannel();/*生成消息队列参数:1.队列名称2.是否持久化队列消息,默认是false,只保存在内存3.消息是否共享(一对多)4.是否自动删除,最后一个消息的最后一个消费者断开后删除5.*其他参数**/channel.queueDeclare("HELLO",false,false,false,null);//接收消息的回调DeliverCallback deliverCallback = new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println("接收到的消息:" + new String(delivery.getBody()));}};//取消消息的回调CancelCallback cancelCallback = new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("取消消费");}};/*接收消息参数1.队列名2.消费成功后是否自动应答3.消费者获取消息时的回调 //参数是接口没有实现类,所以我们得用内部类或lamda表达式给接口赋值4.消费者取消消费的回调 //参数是接口没有实现类,所以我们得用内部类或lamda表达式给接口赋值*/channel.basicConsume("HELLO",true,deliverCallback,cancelCallback);}
}

二、工作模式

(一) Work Queues模式

多个消费者监听一个队列,竞争消息接收,谁抢到就归谁

  • 适用于大量任务时,消费者分摊任务,高效处理、

(二) 交换机

相当于接线员

(三) Publish 发布订阅模式 Fanout

广播型交换机,只要是订阅了该交换机的队列都能收到一个消息

发送示例:

  • 交换机类型为Fanout
  • 发送前不用绑定路由键,在交换机上广播发送
public class Producer {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangerName = "test_fanout";//交换机名String queueName = "Fanout";//队列名String routingKey = "test";//路由键//创建交换机channel.exchangeDeclare(exchangerName, BuiltinExchangeType.TOPIC,true,false,false,null);/* 生成消息队列参数:1.队列名称2.是否持久化队列消息,默认是false,只保存在内存3.消息是否共享(一对多)4.是否自动删除,最后一个消息的最后一个消费者断开后删除5.*其他参数**/channel.queueDeclare(queueName,false,false,false,null);/* 绑定消息队列与路由键1.队列名2.交换机名3.路由键*/channel.queueBind(queueName, exchangerName, "");/* 发送消息参数:1.发送到的交换机名2.路由的键3.*配置参数*4.消息内容/消息体*/channel.basicPublish(exchangerName,routingKey,null,("hello:"+queueName).getBytes());System.out.println("发送完毕");}
}

(四) Routing 路由模式(最常用)Direct

可以认为是交换机+路由键绑定

发送示例:

  • 交换机类型为DIRECT
  • 发送前要绑定好路由键,发消息时需要交换机和路由键来指定发送(键可以一对多)
public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//交换机名String exchangerName = "test_routing";//创建交换机channel.exchangeDeclare(exchangerName, BuiltinExchangeType.DIRECT,true,false,false,null);/* 生成消息队列*/channel.queueDeclare("ABS",false,false,false,null);/* 绑定消息队列与路由键1.队列名2.交换机名3.路由键*/channel.queueBind("ABS",exchangerName,"test");/* 发送消息参数:1.发送到的交换机名2.路由键3.*配置参数*4.消息内容/消息体*/channel.basicPublish(exchangerName,"ABS","test","hello word".getBytes());System.out.println("发送完毕");
}

(五) Topics 主题模式 Topic

路由模式的模糊化匹配

发送演示:

  • 交换机是TOPIC类型的
  • 使用迷糊匹配作为路由键
public class Producer {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangerName = "test_topics";//交换机名String queueName = "TOPICS";//队列名String routingKey = "test.#";//路由键//创建交换机channel.exchangeDeclare(exchangerName, BuiltinExchangeType.TOPIC,true,false,false,null);/* 生成消息队列参数:1.队列名称2.是否持久化队列消息,默认是false,只保存在内存3.消息是否共享(一对多)4.是否自动删除,最后一个消息的最后一个消费者断开后删除5.*其他参数**/channel.queueDeclare(queueName,false,false,false,null);/* 绑定消息队列与路由键1.队列名2.交换机名3.路由键*/channel.queueBind(queueName,exchangerName,routingKey);/* 发送消息参数:1.发送到的交换机名2.路由的键3.*配置参数*4.消息内容/消息体*/channel.basicPublish(exchangerName,routingKey,null,("hello:"+queueName).getBytes());System.out.println("发送完毕");}
}

(六) RPC 同步调用

(七) Publisher Confirms

(八) 总结

重点在于交换机和路由键与队列绑定,从而衍生出各种模式

三、SpringBoot整合

(一) 依赖导入

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(二) RabbitMQ配置文件

spring:rabbitmq:host: localhost #MQ服务器的网址,这里用的是本机port: 5672 #端口username: admin password: 123virtual-host: /

(三) 消费端/监听器

监听器1:

只有基础的监听功能,能监听RabbitMQ中现有的队列

@Component
@Slf4j
public class MyMessageListener {public static final String QUEUE_NAME = "queue.order";/* 监听器1* 只有基础的监听功能,对MQ中的队列监听*/@RabbitListener(queues = {QUEUE_NAME})public void processMessage1(String dataString, Message message, Channel channel){System.out.println("消费端收到:"+dataString);}
}

监听器2:

可以创建临时交换机与队列,关闭spring服务后临时交换机与队列也会删除

@Component
@Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT =  "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME = "queue.order";/* 监听器2* 除了基础的监听功能,还能创建交换机、队列(而且只有Spring项目运行时才能在MQ图像界面看到)*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME,durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}))public void processMessage2(String dataString, Message message, Channel channel){System.out.println("消费端收到:"+dataString);}
}

(四) 生产端

利用rabbitTemplate.convertAndSend方法,指定交换机、路由键、队列名

@SpringBootTest
public class RabbitMQTest {public static final String EXCHANGE_DIRECT =  "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME = "queue.order";@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid test1(){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Tis is TEST");}
}

四、进阶

(一) 消息可靠性投递

1. 需求:当前消息投递的问题

ACK:应答,用于表示数据已经成功接收

幂等性:一个操作无论执行一次还是多次,所产生的结果都是相同的

2. 生产端投递确认

一、配置文件功能开启
spring:rabbitmq:host: localhostport: 5672username: adminpassword: 123virtual-host: /publisher-confirm-type: CORRELATED #交换机确认publisher-returns: true #队列确认
二、RabbitMQ配置类

继承并重写ConfirmCallback、ReturnsCallback和其中的方法,并手动设置RabbitTemplate的对应方法类为本类(利用多态)

  • confirm:交换机确认,无论成功与否都会调用
  • returnedMessage:队列确认,失败才会发送
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {// 交换机确认@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {log.info("rabbitMQ confirm 方法发送:correlationData:{},ack:{},cause:{}",correlationData,b,s);}// 队列确认@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("rabbitMQ returnedMessage 方法回调:{}",returnedMessage);}@Resourceprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void initRabbitTemplate(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}}

扩展:@PosterConstruct注解,在对象创建完成后自动执行标识的方法,可以确保一些设置生效

@PosterConstruct的使用条件:

  • 方法无参
  • 方法非静态
  • 方法无返回值 /返回值为void
三、测试

测试1:交换机不存在

@Test
void test1(){rabbitTemplate.convertAndSend("aaa",ROUTING_KEY,"Tis is TEST");
}
rabbitMQ confirm 方法回调:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aaa' in vhost '/', class-id=60, method-id=40)

测试2:路由键无对应队列

@Test
void test1(){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,"555555","Tis is TEST");
}
rabbitMQ returnedMessage 方法回调:ReturnedMessage [message=(Body:'Tis is TEST' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=exchange.direct.order, routingKey=555555]

3. 创建备份交换机

备份交换机相当于转发消息,但路由键可能有问题无法完全匹配,所以使用fanout广播型交换机,就不用在意路由键了

添加备份到主交换机:(要在主机创建时用额外参数来添加)

4. 持久化

Spring中的RabbitMQ已经为值设置好了持久化

直译:指定此队列是否应该是持久的。默认情况下,如果提供队列名称,则它是持久的。

5. 消费端确认ack

对于spring监听器的升级,可以处理读取失败的数据,并选择是否重读

一、配置文件功能开启
spring:rabbitmq:listener:simple:acknowledge-mode: manual #开启消息手动ack模式
二、监听器
@RabbitListener(queues = {QUEUE_NAME})
public void processMessage1(String dataString, Message message, Channel channel) throws Exception {// 获取标签,1 2 3... ,暂时不知道是啥long deliveryTag = message.getMessageProperties().getDeliveryTag();// 核心操作try{log.info("消费端信息内容:"+ dataString);// 返回ackchannel.basicAck(deliveryTag,false);//第二个参数:是否批量处理} catch (IOException e) {// 错误后的重投递逻辑// 获取是否已经给过重投机会了 true:给过机会了 false:没有重试过Boolean redelivered = message.getMessageProperties().getRedelivered();if (!redelivered) {// 第一次错误/* basicNack1.信息2.是否批量处理3.是否要重新投递到队列*/channel.basicNack(deliveryTag,false,true);}else {// 已经重试过了channel.basicNack(deliveryTag,false,false);}throw new RuntimeException(e);}// 也能传nack,但basicReject 与 basicNack的区别是:basicNack可以批量处理//channel.basicReject(deliveryTag,false);
}

主要api

  • channel.basicAck(deliveryTag,false);,返回ack,可以设置是否批处理
  • channel.basicNack(deliveryTag,false,true);,返回nack,可以设置是否批处理,且可以设置是否重投到队列
  • channel.basicReject(deliveryTag,false);返回nack,无法设置是否批处理,且可以设置是否重投到队列

(二) 消费端限流

1. 需求

默认情况下消费端会一口气把所有队列中的消息取出,效率低、消耗大,可以设置消费限流,减缓消费速度

ready在消费端开机后直接清零,unacked卡住过多

2. 配置文件限流设置

spring:rabbitmq:listener:simple:prefetch: 1 #设置每秒只读取1个消息

ready在消费端开机后逐渐减少,unacked只有一个

(三) 消息超时

哪个时间短,哪个生效

1. 队列超时设置

创建队列时在参数表里设置,单位为 /ms

2. 消息超时设置

对消息进行后置处理时才能单独设置超时时间,单位也是 /ms

@Test
void test3(){// 对消息进行后置处理时才能单独设置超时时间MessagePostProcessor postProcessor = message -> {message.getMessageProperties().setExpiration("5000");// 单位也是 /msreturn message;};rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_TIME,ROUTING_KEY,"Tis is TEST",postProcessor);
}

(四) 死信

1. 死信绑定

死信和死信交换机其实和一般的交换机队列一样,但要为别的队列绑定上死信交换机就需要额外的操作

参数解释:

  • 死信交换机名
  • 死信用的路由键
  • 队列容量
  • 超时时间

对应快捷键

(五) 延迟队列

1. 基于死信的延迟队列

将需要延迟响应的数据发到正常队列,设置好过期时间,如果超时就能被监听死信队列的消费者发现

2. 延迟队列插件

本质上是延迟了交换机发送到队列的时间

一、下载插件:(最好和版本一一对应)

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v4.2.0-rc.1

二、创建交换机:

交换机类型为 x-delayed-message

实际类型写在交换机参数里 x-delayed-type :***

三、测试使用
// 延迟插测试
@Test
void test4(){// 对消息进行后置处理时,添加插件参数x-delay,指定延迟时间MessagePostProcessor postProcessor = message -> {message.getMessageProperties().setHeader("x-delay","10000");// 单位也是 /msreturn message;};rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_DELAY,ROUTING_KEY,"Tis is TEST",postProcessor);
}

小知识:

前面的生产端投递确认时有交换机与队列的两个确认方法,如果使用了延迟功能,因为交换机迟迟不发送至消息队列,所以队列的确认回调无论成功与否都会出现

(六) 消息事务

只有生产端业务如果出错,那么提交缓存池才会回滚。无法控制后续网络、确认等服务的原子性

1. 配置类功能开启

@Configuration
public class RabbitTransactionConfig {@Bean// 事务管理器public RabbitTransactionManager transactionManager(CachingConnectionFactory cachingConnectionFactory){return new RabbitTransactionManager(cachingConnectionFactory);}@Bean// RabbitTemplate开启Transactedpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}// 如果只有这两个配置,可能会无法使用,可以提供 CachingConnectionFactory 的Bean定义@Beanpublic CachingConnectionFactory cachingConnectionFactory() {// 最简单的本地配置return new CachingConnectionFactory("localhost");}
}

如果只有这两个配置,可能会无法使用,可以提供 CachingConnectionFactory 的Bean定义

2. 事务注解使用

@SpringBootTest
@Slf4j
public class RabbitTransactionTest {public static final String EXCHANGE_DIRECT =  "exchange.direct.order";public static final String ROUTING_KEY = "order";@Resourceprivate RabbitTemplate rabbitTemplate;@Test@Transactional@Rollback(value = false)// 为了观测结果才写的public void test1(){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Tis is TEST----1----");log.info("aaa"+10/0);rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Tis is TEST----2----");}
}

添加@Rollback 注解的原因:

junit的测试类中,如果有事务,那么测试消息永远只在内存中回滚,因为测试用消息如果真的上传可能会影响到正常业务。所以我们要手动把这种回滚关闭

(七) 惰性队列

惰性队列是指消息尽可能存储在磁盘上,而不是内存中。只有在消费者准备消费时,消息才会被加载到内存。

特性

普通队列

惰性队列

存储位置

优先内存,内存满才刷磁盘

直接写入磁盘

内存占用

性能

读写速度快

读写速度较慢

适用场景

消息量小,实时性要求高

消息量大,消费缓慢,允许延迟

消息堆积

容易因内存不足崩溃

支持海量消息堆积

适用于大容量慢消费的情况

(八) 优先级队列

利用优先级来控制重要消息的快速传递

1. 创建队列

最大优先级 x-max-proproty

  • 消息优先级不能超过该值
  • 默认为0,所以默认无法设置优先级

2. 测试使用

@Test
void test5(){MessagePostProcessor postProcessor = message -> {message.getMessageProperties().setPriority(1);// 优先级数值return message;};rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Tis is TEST",postProcessor);
}
http://www.dtcms.com/a/600618.html

相关文章:

  • 学习react第二天
  • 【JVS更新日志】低代码、APS排产、物联网、企业计划11.12更新说明!
  • 前端注释规范:如何写“后人能看懂”的注释(附示例)
  • C语言编译器下载地址 | 如何选择适合自己的C语言编译器
  • HarmonyOS之深入解析如何实现语音朗读能力
  • 台州企业网站的建设做网站能挣多少钱
  • 网站开发内容包括哪些wordpress 统计代码
  • 【昇腾CANN工程实践】BERT情感分析API性能优化实录:从CPU到NPU的15倍加速
  • 【Linux基础开发工具 (二)】详解Linux文本编辑器:Vim从入门到精通——完整教程与实战指南(上)
  • 使用 BR 备份 TiDB 到阿里云 OSS 存储
  • 机器学习项目——基于集成学习提升树情绪分类(代码/论文)
  • C++ 抽象类与多态原理深度解析:从纯虚函数到虚表机制(附高频面试题)
  • 尚硅谷 SpringCloud 01 分布式概念-工程创建-nacos安装-nacos服务注册与发现 -远程调用
  • C# Sqlite帮助类
  • 传统方式部署 Hadoop 高可用集群
  • 微软 Win11 经典版 Outlook 曝 BUG,加速 SSD 损耗
  • C++在边缘AI加速中的硬件优化:结合位运算与SIMD提升推理效率
  • 网站开发文档撰写作业牡丹江整站优化
  • QT:ItemView视图控件
  • 让UI完全按屏幕比例变化的方法
  • 结项报告完整版:Apache SeaTunnel 支持 Flink 引擎 Schema Evolution 功能
  • 微服务生态组件之Spring Cloud LoadBalancer详解和源码分析
  • 重庆长寿网站设计公司哪家专业网站跳转微信链接
  • 阿里云域名DNS解析URL转发不支持HTTPS?
  • leetcode 2654. 使数组所有元素变成 1 的最少操作次数 中等
  • AI取名大师 | PM2 部署 Bun.js 应用及配置 Let‘s Encrypt 免费 HTTPS 证书
  • 结项报告完整版 | Apache SeaTunnel支持metalake开发
  • 【cursor】进阶技巧Rules
  • WebServer05
  • 【数据分析-Excel】常用函数汇总