当前位置: 首页 > news >正文

RabbitMQ的概述

Rabbit是一个公司名,MQ(Message queue)消息队列的意思,RabbitMQ是Rabbit企业下的一个消息队列产品。

RabbitMQ是一个实现了AMQP的消息队列的服务,是当前主流的消息中间件之一。

什么是MQ:

MQ本质是一个队列,FIFO先入先出,只不过队列中存放的内容是消息(Message)而已,消息是非常简单的,比如只包含文本字符串,JSON,也可以很复杂,比如内嵌对象。

MQ多用于分布式系统之间通信。

系统之间的调用通常有两种方式:

1.同步通信:

直接调用对方的服务,数据从一端出发后立即就可以达到另一端。

2.异步通信:

数据从一端发出后,先进入一个容器进行临时储存,当达到某种条件后,再由这个容器发送给另一端,这个容器的一个具体实现就是MQ(Message queue)

MQ的作用:

1.异步解耦:

在业务流程中,有些操作可能非常耗时,但并不需要立即返回结果,可以借助MQ把这些操作异步化,比如用户注册后发送注册短信或者邮件通知,可以作为异步任务进行处理,而不必等待这些操作完成后才告知用户注册成功。

2.流量削峰:

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样突发流量并不常见,但是如果投入大量资源来解决这类峰值,无疑是巨大的浪费,使用MQ能够使用关键组件支撑突发访问压力,不会因为突发流量而崩溃,比如秒杀或者促销,可以使用MQ来控制流量,将请求排队,然后根据自己的处理能力来消化这些请求。

3.消息分发:

当多个系统需要对同一数据做出响应,可以使用MQ来进行消息分发,比如支付成功,支付系统可以向MQ发送消息,其他系统订阅该消息,而无需轮询数据库。

4.延迟通知:

在需要在特定时间后发送通知场景,可以使用MQ的延迟队列消息功能,比如在电子商务平台中,如果用户下单后在一定时间内未付款,可以使用延迟队列在超时后自动取消订单。

RabbitMQ的工作流程:

先看看RabbitMQ的工作流程图:

Producer和Consumer:

Producer:生产者,是RabbitMQ Server的客户端,向RabbitMQ发送消息

Comsumer:消费者,也是RabbitMQ Server的客户端,从RabbitMQ中接受消息

Broker:就是RabbitMQ Server,主要是接收消息和收发消息。

Connection和Channel:

Connection:连接,是客户端和RabbitMQ服务器之间的一个TCP连接,这个连接是建立消息传递的基础,他负责传输客户端和服务器之间所有的数据和控制信息。

Channel:通道,信道,Channel是在Connection之上的一个抽象层,在RabbitMQ中,一个TCP连接有多个Channel,每个Channel都是独立的的虚拟链接,消息的发送和接收都是基于Channel的。

Virtual host:

Virtual host:虚拟主机,这是一个虚拟的概念,他为消息队列提供了一种逻辑上的隔离,对于RabbitMQ而言,一个BrokerServer上可以存在多个Virtual host,当多个不同用户使用同一个RabbitMQ Server提供服务时,可以虚拟划分出多个vhost,每个用户在自己的Virtual host中创建exchange/queue等。

Queue队列:

Queue:队列,是RabbitMQ内部对象,用于储存消息。

多个订阅者还可以订阅同一个队列。

Exchange:

Exchange:交换机,Message到达Broker的第一站,他负责接受生产者发送到的消息,并根据特定的规则把这些消息路由对应的一个或者多个Queue队列中。

工作流程:

RabbitMQ快速入门:

1.引入依赖:

<dependency><groupId>com.rabbitmq</groupId><artifacId>amqp-client</artifacId><version>5.7</version>
</dependency>

2.生产者代码:

1.建立连接:

//创建工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);//默认端口号
factory.setVirtualHost("虚拟机名称");
factory.setUssername("用户名");
factory.setPassword("密码");
//创建连接Connection
Connection connection =factory.newConnection();

2.创建channel:

Channel channel=connection.createChannel();

3.声明一个队列:

一个队列中有以下参数,但是创建队列的时,并不是每个参数都会用到:

queueDeclare(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments)

queue:队列的名称。

durable:是否可持久化,true-设为持久化,持久化的队列是会存盘的,服务器重启后,消息不会丢失。

exclusive:

是否独占,只能有一个消费者监听队列

当Connection关闭时,是否删除队列。

autoDelete:是否自动删除,当没有Consumer时,自动删除。

argument:一些参数。

channel.queueDeclare("队列名称",true,false,false,null);

4.发送消息:

当一个新的RabbitMQ节点启动时,会预声明几个内置的交换机,内置交换机名称是空字符串(""),生产者发送的消息会根据队列名称直接路由到对应的队列。

basicPublish(String exchange,String routingKey,AMQP.BasicProperties props。byte[]  body);

exchange:交换机的名字,发送消息到哪个交换机中,简单模式下,默认使用""。

routingKey:路由名称,routingKey=队列名称。

props:配置的信息。

body:发送消息的数据。

String msg="Hello world!";
channel.basicPublish("","队列名称",null,msg.getBytes());
System.out.println(msg+"消息发送成功!");

5.释放资源:

channel.close();
connection.close();

3.消费者代码:

1.建立连接。

2.创建channel

3.声明一个队列

上面三个步骤的代码和生产者一样。

4.消费消息:

消费当前队列:

basicConsume(String queue,boolean autoAck,Consumer callback);

queue:队列名称。(从哪个队列获取消息)

autoAck:是否自动确认,消费者收到消息后,自动和MQ确认。

callback:回调对象。

String basicConsume(String queue,boolean autoAck,Consume callback) throws IOException
Consumer:

Consumer用于定义消费者的行为,当我们需要从RabbitMQ接收到消息,需要提供一个实现了Consumer接口的对象。

DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口。

核心方法:

handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body),从队列接收到消息后,会自动执行这个方法。

这个方法可以定义如何处理收到的消息,例如打印消息内容,处理业务逻辑或者将消息储存到数据库等。

consumerTag:消费者标签,通常是消费者在订阅队列时指定的。

envelope:包含消息的封包信息,如队列名称,交换机等。

properties:一些配置文件。

body:消息的具体内容。

DefaultConsumer consumer=new DefaultConsumer(channel){@Overridepublic void headleDelivery (String consumerTag,Envelope envelop,AMQP.BasicProperties properties,byte[] body) throws IOExeception{System.out.println("接收到消息:"+new String(body));
}
};
channel.basicConsume("队列名称",true,consumer);

5.释放资源:

//等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
http://www.dtcms.com/a/485870.html

相关文章:

  • 使用PyTorch实现图像分类任务的全流程详解
  • JAVA代泊车接机送机服务代客泊车系统源码支持小程序+APP+H5
  • 吃谷机主题商城小程序的界面功能设计
  • 创建网站超市网络免费推广平台
  • 【征文计划】码上分享:基于 Rokid CXR-M SDK 构建「AI远程协作助手」实战全记录
  • PortSwigger靶场之CSRF where token is tied to non-session cookie通关秘籍
  • laya报错:GET http://xxx/bin/%22%22 404(Not Found)
  • 兴义市住房和城乡建设局网站莲花网站
  • 标题:Linux 系统中的“保险库管理员”:深入浅出理解 /etc/shadow 文件
  • CSS3》》 transform、transition、translate、animation 区别
  • HTML实现流星雨
  • JavaWeb-html、css-网页正文制作
  • GaussDB 分布式下, 报错concurrent update under Stream mode is not yet support
  • 服务器连接百度网盘并下载文件
  • 云计算实验3——CentOS中storm的安装
  • 一次被“动画关闭”启发的思考:Animate.css 与 prefers-reduced-motion 的无障碍设计
  • 《突破同质化:太空殖民地NPC行为差异化的底层架构》
  • 做网站ppt常见c2c网站有哪些
  • 专业手机网站建设价格明细表wordpress xiu 5.6
  • CSS 组合选择符详解
  • css:`target-before and :target-after 和 scroll-target-group`
  • 项目中执行SQL报错oracle.jdbc.OracleDatabaseException: ORA-00942: 表或视图不存在
  • 上门养老小程序源码 uniapp PHP MySQL
  • 供应链数据分析:Excel+Power BI双引擎打造智能供应链
  • 从零开始部署 GitLab CE 18.4.2:Docker Compose 新手教程
  • 高并发内存池日志
  • 使用PyTorch实现自定义损失函数以FocalLoss为例的详细教程
  • 《彻底理解C语言指针全攻略(4)--数组与指针的关系专题(下)》
  • app模板网站网站的源代码有什么用
  • Vue3的Pinia状态管理库【8】