当前位置: 首页 > wzjs >正文

wordpress 可视化建站企业服务类网站

wordpress 可视化建站,企业服务类网站,新密郑州网站建设,wordpress公告模板一.发送者的可靠性 1.生产者重试机制 修改publisher模块的application.yaml文件 spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下…

一.发送者的可靠性

1.生产者重试机制

修改publisher模块的application.yaml文件

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

注意:

①当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是 阻塞式 的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
②如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。 

2.生产者确认机制

RabbitMQ提供了生产者消息确认机制,包括 Publisher Confirm 和 Publisher Return 两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的 回执

如何返回基本内容如下:

① 当消息投递到MQ,但是路由失败时,通过 Publisher Return 返回异常信息,同时通过 Publisher Confirm 返回ACK 的确认信息,代表投递成功。
② 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
③ 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功。
④ 其它情况都会返回NACK,告知投递失败。

其中 ack 和 nack 属于 Publisher Confirm 机制,ack 是投递成功;nack 是投递失败。而return 则属于 Publisher Return 机制。默认两种机制都是关闭状态,需要通过配置文件来开启。

①开启生产者确认机制

在publisher模块的 application.yaml 中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型  publisher-returns: true # 开启publisher return机制

这里 publisher-confirm-type 有三种模式可选:

① none:关闭confirm机制
② simple:同步阻塞等待MQ的回执
③ correlated:MQ异步回调返回回执

②定义ReturnCallback

每个 RabbitTemplate 只能配置一个 ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
③定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

总结:Publisher Confirm 用来确认消息是否发送到MQ,而Publish Return 用来通知生产者哪些消息由于路由失败没有被接收

注意:

开启生产者确认比较消耗MQ性能,一般不建议开启。 

二.MQ的可靠性

1.数据持久化

交换机持久化,队列持久化,消息持久化(先保存到内存在写入磁盘),这三个持久化都是默认开启的。如果消息类型是非持久化的,只有在消息队列满了后会被迫写入磁盘。

总结:

持久化是持续将消息写入磁盘,非持久化是当mq内存被使用完毕后才将消息写入磁盘,因此性能较差。

2.LazyQueue

① 接收到(不论临时还是持久的消息)消息后直接存入磁盘而非内存

② 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)(如果消费者的速度很快也会把消息提前缓存到内存)
③ 支持数百万条的消息存储

这种模式属于数据持久化的升级版。

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。

三.消费者的可靠性

1.消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  - ack:成功处理消息,RabbitMQ从队列中删除该消息
  - nack:消息处理失败,RabbitMQ需要再次投递消息
  - reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

     ①none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
     ②manual:手动模式。需要自己在业务代码中调用api,发送 ack 或 reject ,存在业务入侵,但更灵活。
     ③auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack.  当业务出现异常时,根据异常判断返回不同结果:

  • 如果是 业务异常,会自动返回 nack,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。
  • 如果是 消息处理或校验异常,自动返回 reject。

 通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理

2.失败重试机制(对消费者确认机制的增强)

如果上面的代码一直返回nack会导致无线循环。

所以我们配置消费者自己重试,如果超过了配置重试的次数,就会返回reject

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

配置之后出现的现象:

- 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
- 本地重试3次以后,抛出了 AmqpRejectAndDontRequeueException 异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是 reject 

3.自定义失败处理策略(对消费者重试机制的增强)

有上面失败之后是直接返回的reject,这可能并不是我们想要返回的结果,于是我们有了失败处理策略。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery 接口来定义的,它有3个不同实现:

  ①RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject ,丢弃消息。默认就是这种方式 (黑马说不可选,下面两个可选)
  ②ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack ,消息重新入队 
  ③RepublishMessageRecoverer :重试耗尽后,将失败消息投递到指定的交换机,然后转入到指定的队列,后续由人工集中处理。

代码实现:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); //这里的with是routingkey}// 这个是配置消息处理失败之后投入到那个交换机@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

4.确保业务幂等性(解决消费者重复消费的问题)

幂等性:指同一个业务,执行一次或多次对业务状态的影响是一致的。

数据的删除,查询一般是幂等的,但是修改和新增不是幂等的,案例如下:

所以,我们要尽可能避免业务被重复执行。

为了解决上面图中的问题我们有了如下的解决方案:

①使用唯一消息id

给每一个消息都设置一个唯一ID,用ID区分是否被消费过。当我们消费一个消息的时候,先在数据库中查询是否存在这个数据的ID,如果不存在就消费。如果存在就说明这个消息之前被消费过。

给消息设置唯一id:这就是在配置消息转换器的时候添加了一点代码

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

 获取消息的id:

@RabbitListener(queues = "simple.queue")
public void listensimpleQueue(Message message) {log.info("监听到simple.queue的消息:ID:【{}】", message.getMessageProperties().getMessageId());log.info("监听到simple.queue的消息:【{}】", new String(message.getBody()));
}

缺点:

 但是这种方法也有自己的缺点,也就是对业务逻辑有侵入性,而且还有额外的数据库操作。 

②根据业务判断

根据上面出现非幂等性问题,我们可以把业务逻辑改成这样就可以保证业务的幂等性了。

http://www.dtcms.com/wzjs/609204.html

相关文章:

  • 西安企业网站设计公司wordpress网址域名
  • 一些做的好的网站域名创建
  • 南通seo网站建设费用适合小公司的记账软件
  • 做头像的网站有哪些wordpress会员等级
  • 北京cms建站模板易无忧建站
  • 新浪博客怎么给自己网站做链接百度四川建设厅网站
  • 网站空间文件夹视频网站建设 知乎
  • 定制网站开发方案wordpress自定义文章代码和样式
  • 网站搜索引擎优化的基本内容微擎商城
  • 作品展示的网站做网站下载哪个软件
  • 微网站好制作吗北京公司地址推荐
  • 山东建设报网站中企动力邮箱登录网址
  • 手机 网站内 搜索网站建设哪个空间比较好
  • 商丘网站建设运营公司怎么区分用vs和dw做的网站
  • 做网站值钱吗网站安全监测预警平台建设成效
  • j2ee 建设简单网站域名访问wordpress
  • 图书馆网站设计方案抖音搜索关键词排名查询
  • 网站建设方案书doc模板网站文件上传wordpress修改
  • 网站建设管理的建议开发一个页面多少钱
  • 一起做的网站重庆顶呱呱网站建设
  • 嘉兴市南湖区建设街道网站建设网站需要了解些什么东西
  • 什么是网站实施太湖县城乡建设局网站
  • 做体育直播网站抓好门户网站 建设
  • 全国建设造价信息网站网站加ico
  • 景区网站建设要求企业网站建设 价格
  • wordpress网站二次开发学网络工程师
  • zencart 官方网站少儿编程加盟店8
  • 做自媒体可以搬运国外网站新闻吗一个公司的网址怎么弄
  • 网站 数据库+1html5网页设计教程
  • 西宁网站建设优化百度seo推广怎么做