当前位置: 首页 > news >正文

RabbitMQ可靠和延迟队列

在这里插入图片描述

核心配置类

@Configuration
public class RabbitMQConfig{//通知队列private static final String NOTIFICATION_QUEUE = "notification_queue"; //直接交换机private static final String DIRECT_EXCHANGE = "direct_exchange";//路由键private static final String ROUTING_KEY = "notify.routing.key"; // 创建队列  (控制台Durable参数表示持久化)@Bean  public Queue notificationQueue() {  // 持久化队列(消息可靠性基础)  return new Queue(NOTIFICATION_QUEUE, true);  }  // 创建交换器  (控制台Durability表示持久化)@Bean  public DirectExchange directExchange() {  return new DirectExchange(DIRECT_EXCHANGE, true, false);  }  // 绑定队列与交换器  @Bean  public Binding queueBinding() {  return BindingBuilder.bind(notificationQueue())  .to(directExchange())  .with(ROUTING_KEY);  }  //创建Topic交换器@Beanpublic TopicExchange topicExchange(){return new TopicExchange("topic_exchange",true,false);}//绑定队列@Beanpublic Binding smsBinding(){return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("notify.sms.*");}@Bean  public Binding emailBinding() {  return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("notify.email.#");  }  //创建惰性队列@Beanpublic Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() //开启lazy模式.build()}//开启生产者确认机制@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate remplate = new RabbitTemplate(connectionFactory);template.setConfirmCallback((correlationData,ack,cause)->{if(ack){System.out.println("消息发送成功:" + correlationData.getId());}else{System.out.println("消息发送失败:" + cause)//这里可实现重试或日志记录}});return template;}//配置死信队列@Beanpublic Queue deadLetterQueue(){return new Queue("dead_letter_queue",true);}@Beanpublic DirectExchange deadLetterExchange(){return new DirectExchange("dead_letter_exchange",true,false);}//主队列绑定死信交换器@Beanpublic Queue notificationQueue(){Map<String,Object> args = new HashMap<>();//设置死信交换器和路由键args.put("x-dead-letter-exchange", "dead_letter_exchange");  args.put("x-dead-letter-routing-key", "dead.routing.key"); return new Queue(NOTIFICATION_QUEUE, true, false, false, args);}/**配置失败消息的交换机和队列*/@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.dierect");}@Beanpublic Queue errorQueue(){return new Queue("error.queue",true);}@Beanpublic Binding errorBinding(Queue errorQueue,DirectExchange errorMessageExchange){return BindingBuilder.build(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){returnnew RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

消息生产者

  • 生产者发送消息时连接MQ失败
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • 消息到达MQ后,处理消息的进程发生异常

配置文件中开启重试机制

spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制,并设置MQ异步回调返回回执 simple表示同步阻塞等待MQ的回执publisher-returns: true #开启publisher return 机制connection-timeout: 1s# 设置MQ的连接超时时间template:retry:enabled: true# 开启超时重试机制(阻塞式,虽然提高了消息发送成功率,但消耗性能)initial-interval: 1000ms# 失败后的初始等待时间multiplier: 1# 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3# 最大重试次数

生产者消息确认机制(默认是关闭的,需要通过配置文件开启)

  • Publisher Confirm
  • Publisher Return

MQ会根据消息处理的情况返回不同的回执:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败
/**定义ReturnCallback
*/
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
@Component  
public class NotificationProducer {  private final RabbitTemplate rabbitTemplate;  private static final String EXCHANGE_NAME = "direct_exchange";  private static final String ROUTING_KEY = "notify.routing.key";  public NotificationProducer(RabbitTemplate rabbitTemplate) {  this.rabbitTemplate = rabbitTemplate;  }  // 发送通知消息(支持JSON格式)  public void sendNotification(String message) {  rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);  System.out.println("发送通知消息:" + message);  } //发送短信通知(路由键:notify.sms.10086) public void sendNotification(String message,String routKey){}/**定义ConfirmCallback*/CorrelationData cd = new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.confirm>(){@Overridepublic void onFailure(Throwable ex){//Future发生异常时的处理逻辑,基本不会触发log.error("send message fail",ex);}@Overridepublic void onsuccess(CorrelationData.Confirm result){//Future 接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){log.debug("发送消息成功,收到ack!");}else{log.error("发送消息失败,收到nack,reson:{}",result.getReason());}}});rabbitTemplate.convertAndSend("hmall.direct","q","hello",cd);
}  

消息积压问题:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

  • 添加x-queue-mod=lazy参数即可设置队列为Lazy模式
  • 配置文件中可以通过@Bean创建设置lazy方法设置Lazy模式

在这里插入图片描述

消息消费者

  • 消息接收后尚未处理突然宕机
  • 消息接收后处理过程中抛出异常

消费者确认机制三种回执:

  • ack 成功处理消息,RabbitMQ从队列中删除消息
  • nack 消息处理失败,RabbitMQ需要再次投递消息
  • reject 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
# 配置多线程并发数  
spring:  rabbitmq:  listener:  simple:  concurrency: 10  # 最小并发数  max-concurrency: 20 # 最大并发数  acknowledge-mode: none # 不做处理   manual 手动模式,需要在业务中调用api发送ack或reject  auto 自动模式业务异常会自动返回nack,正常返回ackretry:enable: true #开启消费者失败重试initial-interval: 1000s #初始的失败等待时长为1秒multiplier: 1 # 败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 #最大重试次数stateless: true# true无状态;false有状态。如果业务中包含事务,这里改为false
@Component  
public class NotificationConsumer {  @RabbitListener(queues = "notification_queue")  public void processNotification(Channel channel,String message) throw Exception{  try{String content = new String(message.getBody(),"UTF-8");// 这里添加具体通知逻辑(异步执行,不阻塞队列)}catch(Exception e){//处理失败,拒绝消息并放入死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}  //监听lazy队列@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")))public void listenLazyQueue(String msg){log.info("接收到lazy.queue的消息:{}",msg);}@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg)throws InterruptedException{log.info("spring 消费者收到的消息:" + msg);if(true){throw new MessageConversionException("故意的")}log.info("消息处理完成");}
}  

失败处理策略

业务Controller接口

@RestController  
@RequestMapping("/notify")  
public class NotificationController {  private final NotificationProducer producer;  public NotificationController(NotificationProducer producer) {  this.producer = producer;  }  // 接收通知请求,异步发送消息  @PostMapping  public String triggerNotification(@RequestBody String content) {  producer.sendNotification(content);  return "通知已提交(异步处理中)";  }  
}  

延迟消息

订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。

插件下载地址:GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
在这里插入图片描述

基于Docker安装

docker volume inspect mq-plugins #先查看RabbitMQ的插件目录对应的数据卷

在这里插入图片描述
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:
在这里插入图片描述

//基于注解的方式声明延迟交换机
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue",durable = "true"),exchange = @Exchange(name = "delay.direct",delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}",msg);
}
/**基于配置文件的方式
*/
@Slf4j
@Configuration
publicclass DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){returnnew Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}

发送延迟消息,必须通过x-delay属性设定延迟时间

@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}

在这里插入图片描述
在这里插入图片描述

①、common模块中创建一个记录消息延迟时间的消息体

@Data
publicclass MultiDelayMessage<T> {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}publicstatic <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){returnnew MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否还有下一个延迟时间*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}

②、trade-service中定义一个常量类

public interface MqConstants {String DELAY_EXCHANGE = "trade.delay.topic";String DELAY_ORDER_QUEUE = "trade.order.delay.queue";String DELAY_ORDER_ROUTING_KEY = "order.query";
}

③、nacos中抽取一个共享配置shared-mq.xml

spring:rabbitmq:host: ${hm.mq.host:192.168.150.101}# 主机名port: ${hm.mq.port:5672}# 端口virtual-host: ${hm.mq.vhost:/hmall}# 虚拟主机username: ${hm.mq.un:hmall}# 用户名password: ${hm.mq.pw:123}# 密码listener:simple:prefetch: 1# 每次只能获取一条消息,处理完成才能获取下一个消息

trade-service模块添加共享配置
在这里插入图片描述

④、trade-service下单任务
在这里插入图片描述

⑤、消息监听

@Slf4j
@Component
@RequiredArgsConstructor
publicclass OrderStatusListener {privatefinal IOrderService orderService;privatefinal PayClient payClient;privatefinal RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),key = MqConstants.DELAY_ORDER_ROUTING_KEY))public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) {// 1.获取消息中的订单idLong orderId = msg.getData();// 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭Order order = orderService.getById(orderId);if (order == null || order.getStatus() > 1) {// 订单不存在或交易已经结束,放弃处理return;}// 3.可能是未支付,查询支付服务PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);if (payOrder != null && payOrder.getStatus() == 3) {// 支付成功,更新订单状态orderService.markOrderPaySuccess(orderId);return;}// 4.确定未支付,判断是否还有剩余延迟时间if (msg.hasNextDelay()) {// 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值int delayVal = msg.removeNextDelay().intValue();// 4.2.发送延迟消息rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,message -> {message.getMessageProperties().setDelay(delayVal);return message;});return;}// 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单orderService.cancelOrder(orderId);}
}

相关文章:

  • 接收rabbitmq消息
  • 中心化交易所(CEX)架构:高并发撮合引擎与合规安全体系
  • [蓝桥杯 2024 国 Python B] 设计
  • TripGenie:畅游济南旅行规划助手:个人工作纪实(二十四)
  • Arduino入门教程:1、Arduino硬件介绍
  • LAN、WAN、WLAN、VLAN 、VPN对比
  • Java异步编程深度解析:从基础到复杂场景的难题拆解
  • 动态多目标进化算法:VARE(Vector Autoregressive Evolution)求解DF1-DF14,提供完整MATLAB代码
  • [服务器] Amazon Lightsail SSH连接黑屏的常见原因及解决方案
  • 曼昆《经济学原理》第九版 第十七章寡头垄断
  • 【leetcode】36. 有效的数独
  • 【Axure高保真原型】中继器表格更多操作
  • API:解锁数字化协作的钥匙及开放实现路径深度剖析
  • 产品升级 | 新一代高性能数据采集平台BRICK2 X11,助力ADAS与自动驾驶开发
  • 【AI】模型vs算法(以自动驾驶为例)
  • RPA与Agent技术如何结合,以实现跨系统、跨平台的工作流程自动化?
  • 本地docker部署的dify,不用git命令如何无损升级?
  • Redis分布式缓存(RDB、AOF、主从同步)
  • ArcGIS中坐标系一致但图层无法重叠问题解决
  • JavaWeb期末速成
  • 佛山营销网站建设服务/竞价网络推广外包
  • 可以做盗版漫画网站吗/做关键词优化
  • wordpress在手机版/windows优化大师好用吗
  • 手机上怎么做自己的网站/优化网站排名费用
  • java入门网站/媒体发布平台
  • 上海 网站开发 工作室/搜索引擎排名优化seo