当前位置: 首页 > news >正文

从零开始学习RabbitMQ

学习RabbitMQ的先导知识

RabbitMQ是一款高性能的异步通讯组件。

1 同步通讯:发送方发送请求后,必须等待接收方处理并返回响应。

2 异步通讯:发送方发送请求后,无需等待接收方的即时响应,可以立即返回并继续处理其他任务。

3 同步调用

同步调用是程序或服务间交互的一种模式,核心特征是调用方发起请求后,必须等待被调用方处理完成并返回结果,期间调用方会处于阻塞状态,无法执行其他操作,直到拿到响应后才继续后续流程。

维度优点缺点
开发成本流程线性,无需处理异步回调,代码简单需处理超时、重试逻辑,否则易导致失败
资源利用率逻辑直观,调试方便等待期间资源(线程、连接)被占用,吞吐量低
容错能力结果即时可知,便于快速发现问题被调用方故障(宕机、超时)会直接导致调用失败
实时性时效性强,能立即获取结果,满足实时业务需求无法应对 “长耗时任务”(如大数据计算),会导致调用方长期阻塞

4 异步调用

异步调用是程序或服务间交互的一种非阻塞模式,核心特征是调用方发起请求后无需等待被调用方返回结果,可立即继续执行其他任务,被调用方处理完成后会通过预设方式(如回调、消息通知)反馈结果(或无需反馈)。

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

· 消息发送者:投递消息的人,就是原来的调用方

· 消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器

· 消息接收者:接收和处理消息的人,就是原来的服务提供方

异步调用的优势是什么?

· 耦合度低,拓展性强

· 异步调用,无需等待,性能好

·故障隔离,下游服务故障不影响上游业务

· 缓存消息,流量削峰填谷

异步调用的问题是什么?

· 不能立即得到调用结果,时效性差

· 不确定下游业务执行是否成功

· 业务安全依赖于Broker(代理)的可靠性

5 MQ技术选型:

MQ(MessageQueue),消息队列,也是异步调用中的Broker

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP, XMPP, SMTP, STOMPOpenWire, STOMP,
REST, XMPP, AMQP
自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网:https://www.rabbitmq.com/

RabbitMQ的整体架构及核心概念

1 publisher:消息发送者

2 consumer:消息消费者

3 queue:队列,存储消息

4 exchange:交换机,负责路由消息给队列(可路由一个或多个)

5 virtual-host:虚拟主机,起到数据隔离的作用

生产者发消息到交换机,交换机路由消息给队列,消费者监听队列拿到消息

交换机不能存储消息,只能转发消息。

若多个队列绑定同一台交换机,交换机转发消息,每个队列都会收到消息。

Java客户端

AMQP:Advanced Message Queuing Protocol(高级消息队列协议),是一种协议,是由于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP:是基于AMQP协议定义的一套API规范(接口),提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象接口,spring-rabbit是底层的默认实现。

SpringAmqp官网:https://spring.io/projects/spring-amqp

SpringAMQP收发消息:

1 引入spring-boot-starter-amqp依赖

2 配置rabbitmq服务端信息

3 利用RabbitTemplate发送消息

4 利用@RabbitListener注解声明要监听的队列,监听消息

Work Queues

Work Queues,任务模型,就是让多个消费者绑定到一个队列,共同消费队列中的消息。

Work模型的使用:(可以解决消息堆积

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

Fanout交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机有3种:

  • Fanout  广播
  • Direct   定向
  • Topic  话题

Fanout Exchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式。

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • FanoutExchange会将消息路由到每个绑定的队列

Direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

● 每一个Queue都与Exchange设置一个BindingKey(不同queue的BindingKey可一致,即实现Fanout广播的作用)

● 发布者发送消息时,指定消息的RoutingKey

● Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

Topic交换机(推荐)

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并以.分割。

Queue与Exchange指定BindingKey时可以使用通配符:

◆#:代指0个或多个单词

◆ *: 代指一个单词

Direct交换机和Topic交换机:

  • · Topic交换机接收的消息RoutingKey可以是多个单词,以.分割
  • · Topic交换机与队列绑定时的bindingKey可以指定通配符
  • · #:代表0个或多个词
  • · *: 代表1个词

声明队列交换机

在实际开发中,我们一般不用控制台来声明交换机和队列,而是使用java代码

1 SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • · Queue:用于声明队列,可以用工厂类QueueBuilder构建也可以new
  • · Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建也可以new
  • · Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建也可以new

在消费者方声明一个Fanout交换机 并基于@Bean创建队列与其绑定

但这种方式太繁琐 不建议使用

2 SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:

消息转换器

建议采用JSON序列化代替默认的JDK序列化 默认JDK序列化在传入Object对象时会将其序列化变为乱码,占用内存且易引发安全问题。

1 在publisher和consumer中都要引入jackson依赖(也可以在父工程中引入)

2 在publisher和consumer的启动类中都要配置MessageConverter

消息可靠性问题

RabbitMQ 的消息可靠性是分布式系统中非常关键的问题,核心目标是确保消息从生产者发送RabbitMQ 服务器存储消费者处理的全链路不丢失。

发送者的可靠性

1 生产者重连

有时候由于网络波动,可能出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

2 生产者确认

侧重于消息发送失败的解决方法

RabbitMQ了Publisher ConfirmPublisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

1 在punblisher的application.yml中添加配置:

publisher-confirm-type有三种:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执消息
  • correlated:MQ异步回调方式返回回执消息

2 每个RabbitTemplate只能配置一个ReturnCallback,所以在项目启动过程中配置:

3 发送消息,指定消息ID,消息ConfirmCallback

如何处理生产者的确认消息?

· 生产者确认需要额外的网络和系统资源开销,尽量不要使用

· 如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题

· 对于nack消息可以有限次数重试,依然失败则记录异常消息

MQ的可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
· 一旦MQ宕机,内存中的消息会丢失
· 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

数据持久化

RabbitMQ实现数据持久化包括3个方面:

· 交换机持久化  durable=true

· 队列持久化  durable=true

· 消息持久化  deliveryMode=2

Lazy Queue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。

惰性队列的特征如下:

  • · 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • · 消费者要消费消息时才会从磁盘中读取并加载到内存
  • · 支持数百万条的消息存储

在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

要设置一个队列为惰性队列,只要在声明队列时,指定x-queue-mode属性为lazy即可

或使用注解

RabbitMQ如何保证消息的可靠性

· 首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样

队列中的消息会持久化到磁盘,MQ重启消息依然存在。

· RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后会称为

队列的默认模式。LazyQueue会将所有消息都持久化。

· 开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才

会给生产者返回ACK回执

消费者的可靠性

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer  Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
· ack:成功处理消息,RabbitMQ从队列中删除该消息
· nack:消息处理失败,RabbitMQ需要再次投递消息
· reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.
当业务出现异常时,根据异常判断返回不同结果:
◆ 如果是业务异常,会自动返回nack
◆ 如果是消息处理或校验异常,自动返回reject

消息失败处理

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列:

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • · RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • · ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • · RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

将失败处理策略改为RepublishMessageRecoverer:

1 首先,定义接收失败消息的交换机、队列及其绑定关系

2 然后,定义RepublishMessageRecoverer:

消费者如何保证消息一定被消费?

· 开启消费者确认机制为auto,由spring确认消息处理成功后返回ack,异常时返回nack

· 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x))。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。我们要保障每个业务的幂等性。

唯一消息ID

方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:

1 每一条消息都生成一个唯一的id,与消息一起投递给消费者。

2 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库

3 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

业务判断

方案二,是结合业务逻辑,基于业务本身做判断。以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理。

如何保证支付服务与交易服务之间的订单状态一致性?

◆ 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
◆ 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
◆最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。

如果交易服务消息处理失败,有没有什么兜底方案?

◆ 我们可以在交易服务设置定时任务(SpringTask),定期查询订单支付状态。这样即
便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

延迟任务:设置在一定时间之后才执行的任务。

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • · 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
  • · 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • · 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

延迟消息插件

RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

发送消息时需要通过消息头x-delay来设置过期时间:

http://www.dtcms.com/a/516509.html

相关文章:

  • 台州市住房和城乡建设局网站做美容美发学校网站公司
  • [答疑]考虑复用,尺度应该怎样把握
  • 注册网站借钱平台犯不犯法个人网站logo需要备案吗
  • 最新电大网站开发维护竞价托管推广代运营
  • 字符串统计
  • Docker与Tomcat:一键部署Java Web应用的完美组合
  • 【同步/异步 日志系统】 --- 前置技术
  • 图论基础和表示
  • 网站建设为了什么怎么看网站谁做的
  • [小白]spring boot接入emqx
  • Spring Boot 实现GZIP压缩优化
  • Spring Boot使用Redis实现消息队列
  • 互联网大厂Java面试实战:以Spring Boot与微服务为核心的技术场景剖析
  • 做网站页面的软件毕业设计网站成品
  • 《一个浏览器多人用?Docker+Neko+cpolar实现跨网共享》
  • design设计网站网站优化方法页面
  • C++基础:(十七)模版进阶:深入探索非类型参数、特化、分离编译与实战技巧
  • 《Git:从入门到精通(五)—— Git:Gitee远程仓库创建与克隆指南》
  • UML学习文档(一)
  • 淘宝放单网站开发网站wordpress错误
  • Latex中的错误汇总
  • huggingface transformers调试问题--加载本地路径模型时pdb断点消失
  • KMP算法详解 -- 串的模式匹配
  • 用php做网站的方法学网站建设前途
  • 网站不用下载免费软件曰本孕妇做爰网站
  • 【微信小程序 + 消息订阅 + 授权】 微信小程序实现消息订阅流程介绍,代码示例(仅前端)
  • 网站开发找哪家什么查网站是否降权
  • 【经典书籍】C++ Primer 第13类继承精华讲解
  • “VMware与vmx86驱动程序版本不匹配:预期为:417,实际为416。”解决步骤,亲测有效!!!
  • 查找组成一个偶数最接近的两个素数