当前位置: 首页 > news >正文

一篇文章入门RabbitMQ:基本概念与Java使用

Java配置RabbitMQ

spring:rabbitmq:host: 192.168.88.130port: 5672virtual-host: /hostusername: rootpassword: 123

发送消息

@SpringBootTest
public class SpringAmqpTest {@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() {String queueName = "simple.queue";String msg = "hello,amqp";rabbitTemplate.convertAndSend(queueName,msg);}
}

接收消息

一个队列绑定多个消费者的情形

轮询

默认情况:会将消息轮询投递给消费者

只是简单的分配消息,没有考虑消息的复杂程度,可能会出现消息堆积

能者多劳

修改application.yml设置preFerch为1,确保同一时刻最多投递消费者1条消息,消费者处理完才能获取下一个消息

    listener:simple:prefetch: 1

交换机

接受publisher发送的消息

Fanout交换机

将消息广播到每一个跟其绑定的queue

Direct交换机

每个queue与exchange都会设置BindingKey

发布者发布消息时指定消息的RoutingKey

exchange将消息路由到BindingKey与RoutingKey一致的队列

Topic交换器

Routing Key可以是多个单词的列表,并且以.分割

通配符: xx.# / #.xx

声明队列与交换机

控制台

SpringAMQP

基于Bean
  • Queue:声明队列,用工厂类QueueBuilder构建
  • Exchange:声明交换机,用工厂类ExchangeBuilder构建
  • Binding:声明队列和交换机的绑定关系,用工厂类BindingBuilder构建
基于注解
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "hmall.queue1"),exchange = @Exchange(name = "hmall.exchange1", type = "direct"),key = {"red","blue"}//创建两个队列,Key分别为red和blue
))

消息转换器

当发送对象时,spring会把对象转换为字节

JDK序列化

Spring默认MessageConvert处理,实现是SimpleMessageConverter,基于JDK的ObjectOutputStream序列化

  • JDK的序列化有安全风险
  • 消息太大
  • 消息可读性差

JSON序列化

用JSON序列化代替JDK序列化

<!--        JSON--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

在publisher和consumer启动项/配置类中都要配置MessageConverter

@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}

可靠性问题

生产者可靠性

生产者重连
spring:rabbitmq:connection-timeout: 1s #MQ的连接超时时间template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数max-attempts: 3 #最大重试次数

阻塞式的重试,在多次重试的等待过程中,当前线程是被阻塞的,影响业务性能

生产者确认

开启确认机制后,MQ收到消息会将确认消息返回给生产者

spring:rabbitmq:publisher-confirm-type: correlated #开始publisher confirm机制,#none关闭机制,simple同步阻塞等待回执消息,correlated异步返回回执消息 publisher-returns: true #开启publisher returns机制,返回失败消息

在publisher项目启动项配置ReturnCallback

@Configuration
@Slf4j
public class MqConfirmConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置回调rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){@Overridepublic void returnedMessage(ReturnedMessage returned) {log.debug("收到消息的return callback,{},{},{},{},{}",returned.getExchange(),returned.getReplyCode(),returned.getReplyText(),returned.getRoutingKey(),returned.getMessage());}});}
}

发送消息,指定消息ID,消息ConfirmCallback

@Testvoid testConfirmCallback(){//创建cdCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());//添加confirmcallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.info("消息发送成功");if(result.isAck()){log.info("消息发送成功");//收到ack}else{//收到nacklog.error("消息发送失败");}}@Overridepublic void onFailure(Throwable ex) {log.error("消息发送失败,原因{}",result.getReason);}});rabbitTemplate.convertAndSend("confirm.exchange","1","hello,confirm",cd);}

MQ可靠性

默认情况下,RabbitMQ会将收到的消息保存在内存中以降低收发消息的延迟,会导致两个问题

  • MQ宕机,内存中的消息会消失
  • 内存空间有限,消费者故障或消息处理过慢,会导致消息堆积,引发MQ阻塞
数据持久化
  • 持久化队列(Persistent Queue)
    队列本身的元数据(如名称、配置)和队列中的消息会被持久化存储(通常写入磁盘或数据库)。即使消息队列服务重启、崩溃或服务器断电,队列和消息也不会丢失,恢复服务后可继续处理。
  • 交换机持久化
  • 消息持久化 delivery_mode = 2
Lazy Queue
  • 接收到消息直接存入磁盘而非内存(内存只保留最近的消息,默认2048条)
  • 消费者要消费信息时才会从磁盘中读取到内存
  • 支持数百万条消息存储

消费者可靠性

消费者确认机制

消费者处理消息结束后,应该向RabbitMQ发送回执,说明信息处理状态

  • ack: 成功处理消息,RabbitMQ从队列中删除该消息
  • nack: 失败,需要再次投递消息
  • reject: 失败并拒绝该消息,RabbitMQ从队列中删除该消息
spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: auto #none关闭, auto自动, manual手动
none:消息投递到消费者立刻返回ack
消息失败处理

当消费者出现异常时,消息会不断requeue,无限循环,使mq的消息处理飙升

解决方法:使用retry机制

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true #开启消费者失败重试max-attempts: 3 #最大重试次数initial-interval: 1000 #第一次重试间隔multiplier: 1 #每次重试间隔倍数stateless: true #无状态;false有状态。默认无状态,如果业务中包含事务改为false

如果重试次数耗尽消息依然失败,则需要有MessageRecoverer接口来处理

  • RejectAndDontRequeueRecoverer:重试耗尽后直接reject,默认
  • ImmediateRequeueMessageRecovere: 返回nack,消息重新入队
  • RepublishMessageRecoverer: 将失败消息投递到指定的交换机,如error.direct,之后由人工处理(由该交换机路由到对应的错误队列(如 error.queue)。)
业务幂等性

幂等 指的是f(x) = f(f(x))。在程序开发中指同一个业务执行一次或多次对业务状态的影响是一致的。

  1. 给每个消息设置唯一的id,业务处理成功后将id放入数据库,下次接受消息在数据库查询id是否存在
@Bean
public MessageConverter jacksonMessageConvertor(){Jackson2JsonMessageConverter jjmc = new Jackson2JasonMessageConverter();jjmc.setCreateMessageIds(true);//发送消息时附带UUIDreturn jjmc;
}
  1. 结合业务本身

延迟消息

生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息

应用:延迟任务

死信交换机

成为死信(dead letter条件:

  • 消费者使用reject,nack声明消费失败,并且requeue为false
  • 消息是一个过期消息,超时无人消费
  • 消息队列堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,死信就会投递到这个交换机中,这个交换机为死信交换机(Dead Letter Exchange,DLX)

延迟消息插件

原理是设计了一种支持延迟消息功能的交换机,消息投递到交换机后可以暂存一定时间,到期后再投递到队列

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "hmall.queue1",durable = "true"),exchange = @Exchange(name = "hmall.exchange1", type = "direct",delayer = "true"),key = {"red","blue"}//创建两个队列,Key分别为red和blue,延迟消息设为true,持久化队列
))

延迟订单支付

http://www.dtcms.com/a/430555.html

相关文章:

  • @ResponseStatus 注解详解
  • Linux--权限
  • 【连载3】MySQL 的 MVCC 机制剖析
  • C++封装和继承特性
  • Linux(操作系统)文件系统--对打开文件的管理
  • 【Unity笔记】Unity XR 模式下 Point Light 不生效的原因与解决方法
  • 图片设计网站推荐wordpress下载的主题怎么安装
  • 分布式存储分片核心:从哈希取模到Redis哈希槽,从哈希类到非哈希类
  • C++ 操作 Redis
  • 旅游网站开发文献综述沈阳做网站大约要多少钱
  • 精美个人网站wordpress设置网站主题
  • PyCharm保姆级详细使用手册(Python新手快速上手篇)
  • 3.springboot-容器功能-@注解
  • python开发手机网站开发今天时政新闻热点是什么
  • 【网络编程】深入 HTTP:从报文交互到服务构建,洞悉核心机制
  • java面试0119-java中创建对象的方式?
  • 线程中互斥锁和读写锁相关区别应用示例
  • 网站开发logo绍兴网页设计
  • 2017主流网站风格win7 iis配置网站 视频教程
  • wordpress同步微信公众号seo外包是什么
  • 如何评价一个网站做的好不好展厅网站
  • wordpress站点克隆vip影视建设网站官网
  • 网站免费申请注册软件开发人员犯罪
  • 优秀个人网站设计模板互联网技术发展现状
  • 云南做网站价格网站的策划书
  • 做本地网站要服务器吗自动化毕设题目网站开发
  • 网站后端技术有哪些文学网站做编辑
  • 做淘客应该知道的网站咸阳学校网站建设费用
  • 适合女生做的网站投资公司网站设计
  • 专业网站维护如何免费建立自己的网页