当前位置: 首页 > news >正文

徐州企业网站推广请人做网站买断域名

徐州企业网站推广,请人做网站买断域名,Wordpress如何自定义小工具,职业装定制文章目录发送方确认confirm 确认模式1. 配置 RabbitMQ2. 设置确认回调逻辑并发送消息3. 测试完整代码return 退回模式1. 配置 RabbitMQ2. 设置返回回调逻辑并发送消息3. 测试常见面试题如何保证 RabbitMQ 消息的可靠传输发送方确认 在使用 RabbitMQ 的时候,可以通过…

文章目录

  • 发送方确认
    • confirm 确认模式
      • 1. 配置 RabbitMQ
      • 2. 设置确认回调逻辑并发送消息
      • 3. 测试
      • 完整代码
    • return 退回模式
      • 1. 配置 RabbitMQ
      • 2. 设置返回回调逻辑并发送消息
      • 3. 测试
    • 常见面试题
      • 如何保证 RabbitMQ 消息的可靠传输

发送方确认

在使用 RabbitMQ 的时候,可以通过消息持久化来解决因为服务器的一次崩溃而导致的消息丢失

但是还有一个问题,当消息的生产者将消息发送初期之后,消息到底有没有正确地到达服务器呢?

  • 如果在消息到达服务器之前已经丢失(比如 RabbitMQ 重启,那么 RabbitMQ 重启期间生产者消息投递失败),持久化操作也解决不了这个问题,因为消息根被没有到达服务器,何谈持久化?

RabbitMQ 为我们提供了两种解决方案:

  1. 通过事务机制实现
  2. 通过发送方确认(publisher confirm)机制实现

事务机制比较消耗性能,在实际工作中使用也不多,我们主要极少 confirm 机制来实现发送方的确认。

RabbitMQ 为我们提供了两个方式来控制消息的可靠性投递

  1. confirm 确认模式
  2. return 返回模式

confirm 确认模式

Producer 在发送消息的时候,对发送端设置一个 ConfirmCallback 的监听,无论消息是否到达 Exchange,这个监听都会被执行

  • 如果 Exchange 成功收到,ACKAcknowledge character,确认字符)为 true
  • 如果没有收到消息,ACK 就为 false

步骤如下:

  1. 配置 RabbitMQ
  2. 设置确认回调逻辑并发送消息
  3. 测试

接下来看实现步骤:

1. 配置 RabbitMQ

配置确认机制
spring:  rabbitmq:  addresses: amqp://guest:guest@127.0.0.1:5672/coding  listener:  simple:  acknowledge-mode: manual  # 消息接收确认publisher-confirm-type: correlated  # 消息发送确认

2. 设置确认回调逻辑并发送消息

无论消息确认成功还是失败,都会调用 ConfirmCallbackconfirm 方法。

  • 如果消息成功发送到 Brokeracktrue
  • 如果消息发送失败,ackfalse,并且 cause 提供失败的原因
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  @Override  public void confirm(CorrelationData correlationData, boolean ack, String cause) {  System.out.println("");  if (ack) {  System.out.printf("消息接收成功,id: %s \n", correlationData.getId());  } else {  System.out.printf("消息接收失败,id: %s \n", correlationData.getId());  }  }  });  return rabbitTemplate;  
}
  • ConfirmCallback() 里面的 confirm(@Nullable CorrelationData, boolean ack, @Nullables String cause)
    • 确认回调
    • correlationDate:发送消息时的附加信息,通常用于在确认回调中识别特定的消息
    • ack:交换机是否收到消息,收到为 true,未收到为 false
    • cause:当消息确认失败时,这个字符串参数将提供失败的原因,这个原因可以用于调试和错误处理。成功时,causenull
@Resource(name = "confirmRabbitTemplate")  
private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")  
public String confirm() {  CorrelationData correlationData = new CorrelationData("1");  confirmRabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "confirm", "confirm test...", correlationData);  return "确认成功";  
}

RabbitTemplate.ConfirmCallbackConfirmListener 区别
RabbitMQ 中,ConfirmListenerConfirmCallback 都是用来处理消息确认的机制,但他们属于不同的客户端库,并且使用的场景和方式有所不同

  1. ConfirmListenerRabbitMQ Java Client 库中的接口。这个库是 RabbitMQ 官方提供的一个直接于 RabbitMQ 服务器交互的客户端库。ConfirmListener 接口提供了两个方法:handleAckhandleNack,用于处理消息确认好否定确认的事件
  2. ConfirmCallbackSpring AMQP 框架中的一个接口。专门为 Spring 环境设计,用于简化与 RabbitMQ 交互的过程,它只包含一个 confirm 方法,用于处理消息确认的回调
    SpringBoot 应用中,通常会使用 ConfirmCallback,因为它与 Spring 框架的其他部分更加整合,可以利用 Spring 的配置和依赖注入功能。而在 RabbitMQ Java Client 库时,则可能会直接实现 ConfirmListener 接口,更直接的与 RabbitMQChannel 交互

3. 测试

运行程序,调用接口: http://127.0.0.1:8080/producer/confirm

观察控制台,消息确认成功:image.png|254

接下来把交换机名称改一下,重新运行,会触发另一个结果

// 发送失败
confirmRabbitTemplate.convertAndSend("confirm_exchange1", "confirm", "confirm test...", correlationData);

运行结果:image.png

  • 原因中,明确显示 “no exchange 'confirm_exchange1' in vhost 'bite' “,也就是说,bite 这个虚拟机,没有名字为 confirm_exchange1 的交换机

完整代码

public class Constant {  public static final String ACK_EXCHANGE_NAME = "ack_exchange";  public static final String ACK_QUEUE = "ack_queue";  
}
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Bean("rabbitTemplate")public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.printf("");if (ack){System.out.printf("消息接收成功, id:%s \n", correlationData.getId());} else {System.out.printf("消息接收失败, id:%s, cause: %s", correlationData.getId(), cause);}}});return rabbitTemplate;}
}
import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqConfig {//publish-confirm模式//1. 交换机@Bean("confirmExchange")public Exchange confirmExchange() {return ExchangeBuilder.topicExchange(Constant.CONFIRM_EXCHANGE_NAME).durable(true).build();}//2. 队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constant.CONFIRM_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmExchange") Exchange exchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();}
}
package com.bite.rabbitmq.controller;import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedException {CorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,"confirm", "confirm test...", correlationData1);//发送失败// confirmRabbitTemplate.convertAndSend("confirm_exchange1", "confirm", "confirm test...", correlationData1);Thread.sleep(2000);return "确认成功";}
}

return 退回模式

消息到达 Exchange 之后,会根据路由规则匹配,把消息放入 Queue 中,ExchangeQueue 的过程:

  • 如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等),可以选择把消息退回给发送者
  • 消息返回发送者时,我们可以设置一个返回回调方法,对消息进行处理

步骤如下:

  1. 配置 RabbitMQ
  2. 设置返回回调逻辑并发送信息
  3. 测试

接下来看实现步骤

1. 配置 RabbitMQ

# 配置确认机制
spring:  rabbitmq:  addresses: amqp://guest:guest@127.0.0.1:5672/coding  listener:  simple:  acknowledge-mode: manual  # 消息接收确认publisher-confirm-type: correlated  # 消息发送确认

2. 设置返回回调逻辑并发送消息

  • 消息无法被路由到人恶化队列,它将返回给发送者,这时 setReturnCallback 设置的回调将被触发
/**  * retrun 返回模式  */  
@Primary  
@Bean("confirmRabbitTemplate2")  
public RabbitTemplate confirmRabbitTemplate2(CachingConnectionFactory connectionFactory) {  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  rabbitTemplate.setMandatory(true);  rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {  @Override  public void returnedMessage(ReturnedMessage returnedMessage) {  System.out.printf("消息被退回:%s", returnedMessage);  }  });  return rabbitTemplate;  
}
@Resource(name = "confirmRabbitTemplate2")  
private RabbitTemplate confirmRabbitTemplate2;@RequestMapping("/msgReturn")  
public String msgReturn() {  CorrelationData correlationData = new CorrelationData("2");  confirmRabbitTemplate2.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm1", "message return test...", correlationData);  return "消息发送成功";  
}
  • 使用 RabbitTemplatesetMandatory 方法设置消息的 mandatory 属性为 true(默认为 false
  • 这个属性的作用是告诉 RabbitMQ,如果一条消息无法被任何队列消费,RabbitMQ 应该将消息返回给发送者,此时 ReturnCallback 就会被触发

回调函数中,有一个参数:ReturnedMessage,包含以下属性

public class ReturnedMessage {  // 返回的消息对象,包含了消息体和消息属性private final Message message;  // 由 Broker 提供的回复码,表示消息无法路由的原因,通常是一个数字代码,每个数字代表不同的含义private final int replyCode;// 一个文本字符串,提供了无法路由消息的额外信息或错误描述  private final String replyText;  // 消息被发送到的交换机名称private final String exchange;// 消息的路由键,即发送消息时指定的键  private final String routingKey;
}

3. 测试

运行程序,调用接口: http://127.0.0.1:8080/producer/msgReturn

观察控制台,消息被退回:image.png

常见面试题

#高频面试

如何保证 RabbitMQ 消息的可靠传输

先放一张 RabbitMQ 消息传递图image.png

从这个图中,可以看出,消息可能丢失的场景以及解决方案:

  1. 生产者将消息发送到 RabbitMQ 失败

    1. 可能原因:网络问题等
    2. 解决办法:参考——发送方确认,confirm 模式
  2. 消息在交换机中无法路由到指定队列:

    1. 可能原因:代码或配置层面错误,导致消息路由失败
    2. 解决办法:参考——发送方确认,return 模式
  3. 消息队列自身数据丢失

    1. 可能原因:消息到达 RabbitMQ 之后,RabbitMQ Server 宕机导致消息丢失
    2. 解决办法:参考——持久性。开启 RabbitMQ 持久化,就是消息写入之后会持久化到磁盘,如果 RabbitMQ 挂了,回复之后会自动读取之前存储的数据(极端情况下,RabbitMQ 还未持久化就挂了,可能导致少量的数据丢失,这个概率极低,也可以通过集群的方式提高可靠性)
  4. 消息者异常,导致消息丢失

    1. 可能原因:消息到达消费者,还没来得及消费,消费者宕机、消费者逻辑有问题
    2. 解决办法:参考——消息确认RabbitMQ 提供了消费者应答机制来使 RabbitMQ 能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动答应的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制,当消息消费异常时,通过消息重试确保消息的可靠性
http://www.dtcms.com/a/618833.html

相关文章:

  • 锦州网站制作公司关于网站项目建设的申请
  • 什么是CSV(周期同步速度模式)和PV(轮廓速度模式)
  • 有些人做网站不用钱的 对吗网页界面设计的特点
  • 从开发到部署:Docker 化前端应用全流程指南
  • 网站开发安全小贴士苏州seo网站推广公司
  • 临沂住房和城乡建设局网站打不开男人与女人做视频网站
  • USDT区块链转账 vs SWIFT跨境转账:技术逻辑与场景博弈的深度拆解
  • 网站设计外文文献苏州怎么做网站排名优化
  • 数据结构 2.0
  • 公募基金与私募基金评价指标深度研究
  • volatile和优化
  • 笔试强训练习-8
  • CodexField Wallet:贯穿创作、资产与智能协作的统一账户层
  • 高并发优惠权益聚合接口的优雅实现(含超时控制 + 来源标识 + Fallback 降级)
  • GrokAI9999 | 支持无敏感AI生图,不限次生成视频
  • 算法题 双指针
  • 问答网站怎么做营销建设单位到江川区住房和城乡建设局网站
  • 大数据实用指南:etl + ambari
  • ONAP网络自动化平台介绍与架构
  • 网站开发确认表购物网站后台模板下载
  • 网站模板安全管理系统外贸网站定做
  • 商务礼品网站模板买卖友情链接
  • 解码线程编程
  • 杰恩设计网站是谁做的百度一下百度一下百度一下
  • 哈希表和冲突处理
  • seo网站建设是什么wordpress首页加广告位
  • 网站开发所需硬件山东建设银行官方网站
  • Adversarial AtA学习(第二十三周周报)
  • 阿里云企业网站备案农村建设自己的网站首页
  • Unity UI框架笔记