当前位置: 首页 > news >正文

MQ(RabbitMQ.1)

MQ的含义及面试题

  • MQ
    • MQ的含义
      • MQ之间的调用的方式
      • MQ的作用
      • MQ的几种产品
      • RabbitMQ
        • RabbitMQ的安装
        • RabbitMQ的使用
        • RabbitMQ⼯作流程
      • AMQP
      • Web界面操作
          • 用户相关操作
          • 虚拟主机相关操作
      • RabbitMQ的代码应用
          • 编写生产者代码
          • 编写消费者代码
        • 生产者代码
        • 消费者代码

MQ

MQ的含义

MQ,本质是一个队列,FIFO先入先出,存放的内容是消息(message)。

MQ之间的调用的方式

  1. 同步通信
    直接调⽤对⽅的服务, 数据从⼀端发出后⽴即就可以达到另⼀端.
    嘻嘻

  2. 异步通信
    数据从⼀端发出后,先进⼊⼀个容器进⾏临时存储,当达到某种条件后,再由这个容器发送给另⼀端.
    容器的⼀个具体实现就是MQ( message queue )
    很好

MQ的作用

  1. 异步解耦: 在业务流程中, ⼀些操作可能⾮常耗时, 但并不需要即时返回结果. 可以借助MQ把这些操作异步化, ⽐如 ⽤⼾注册后发送注册短信或邮件通知, 可以作为异步任务处理, ⽽不必等待这些操作完成后才告知⽤⼾注册成功
  2. 流量削峰: 在访问量剧增的情况下, 应⽤仍然需要继续发挥作⽤,。 使⽤MQ能够使关键组件⽀撑突发访问压⼒, 不会因为突发流量⽽崩溃. ⽐如秒杀或者促销活动, 可以使⽤MQ来控制流量, 将请求排队, 然后系统根据⾃⼰的处理能⼒逐步处理这些请求。
  3. 消息分发: 当多个系统需要对同⼀数据做出响应时, 可以使⽤MQ进⾏消息分发. ⽐如⽀付成功后, ⽀付系统可以向MQ发送消息, 其他系统订阅该消息, ⽽⽆需轮询数据库.
  4. 延迟通知: 在需要在特定时间后发送通知的场景中, 可以使⽤MQ的延迟消息功能, ⽐如⽤⼾下单后⼀定时间内未⽀付,可以使⽤延迟队列在超时后⾃动取消订单

MQ的几种产品

  1. Kafka
    Kafka⼀开始的⽬的就是⽤于⽇志收集和传输,追求⾼吞吐量, 性能卓越, 单机吞吐达到⼗万级, 在⽇志领域⽐较成熟, 功能较为简单,主要⽀持简单的 MQ 功能, 如果有⽇志采集需求,肯定是⾸选kafka了。
  2. RocketMQ
    在可⽤性、可靠性以及稳定性等⽅⾯都有出⾊的表现. 适合对于可靠性⽐较⾼,且并发⽐较⼤的场景, ⽐如互联⽹⾦融. 但⽀持的客⼾端语⾔不多, 且社区活跃度⼀般
  3. RabbitMQ
    采⽤Erlang语⾔开发, MQ 功能⽐较完备, 且⼏乎⽀持所有主流语⾔,开源提供的界⾯也⾮常友好, 性能较好, 吞吐量能达到万级, 社区活跃度也⽐较⾼,⽐较适合中⼩型公司, 数据量没那么⼤, 且并发没那么⾼的场景。

RabbitMQ

RabbitMQ是采⽤Erlang语⾔实AMQP(Advanced Message Queuing Protocol,⾼级消息队列协议)的
消息中间件,它最初起源于⾦融系统领域, 为了在分布式系统中存储和转发消息⽽设计的.

RabbitMQ的安装

参考RabbitMQ的安装

RabbitMQ的使用

在这里插入图片描述
RabbitMQ是⼀个消息中间件, 也是⼀个⽣产者消费者模型. 它负责接收, 存储并转发消息.
1.** Producer和Consumer**
在这里插入图片描述
Producer 就类似生产者(发送消息)
Consumer 就是消费者(接收消息)
RabbitMQ就类似于Broker(接收和收发消息)

在这里插入图片描述

  1. Connection和Channel
    Connection: 连接. 是客⼾端和RabbitMQ服务器之间的⼀个TCP连接. 这个连接是建⽴消息传递的基础, 它负责传输客⼾端和服务器之间的所有数据和控制信息.
    Channel: 通道, 信道. Channel是在Connection之上的⼀个抽象层. 在 RabbitMQ 中, ⼀个TCP连接可以有多个Channel, 每个Channel 都是独⽴的虚拟连接. 消息的发送和接收都是基于 Channel的.
    通道的主要作⽤是将消息的读写操作复⽤到同⼀个TCP连接上,这样可以减少建⽴和关闭连接的开销,提⾼性能.
    在这里插入图片描述
  2. Virtual host
    Virtual host:虚拟主机. 这是⼀个虚拟概念. 它为消息队列提供了⼀种逻辑上的隔离机制
  3. Queue
    Queue: 队列, 是RabbitMQ的内部对象, ⽤于存储消息
    在这里插入图片描述
    多个消费者, 可以订阅同⼀个队列
    在这里插入图片描述
  4. Exchange
    Exchange: 交换机. message 到达 broker 的第⼀站, 它负责接收⽣产者发送的消息, 并根据特定的规则把这些消息路由到⼀个或多个Queue列中
    在这里插入图片描述
RabbitMQ⼯作流程

在这里插入图片描述

  1. Producer生产了一条消息
  2. Producer连接到RabbitMQBroker,建立一个连接(Connection),开启一个信道(Channel)
  3. Producer声明一个交换机(Exchange),路由消息
  4. Producer声明一个队列(Queue),存放信息
  5. Producer发送消息至RabbitMQBroker
  6. RabbitMQBroker接收消息,并存入相应的队列(Queue)中,如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者

AMQP

AMQP(Advanced Message Queuing Protocol)是⼀种⾼级消息队列协议, AMQP定义了⼀套确定的消息交换功能, 包括交换器(Exchange), 队列(Queue) 等. 这些组件共同⼯作, 使得⽣产者能够将消息发送到交换器. 然后由队列接收并等待消费者接收. AMQP还定义了⼀个⽹络协议, 允许客⼾端应⽤通过该
协议与消息代理和AMQP模型进⾏交互通信
在这里插入图片描述

Web界面操作

用户相关操作

添加用户
在这里插入图片描述
输入相关用户名密码以及选择角色
在这里插入图片描述
观察用户是否添加成功
在这里插入图片描述
⽤⼾相关操作
在这里插入图片描述
在用户详情页面可以进行更新或者删除的操作
在这里插入图片描述
在这里插入图片描述

虚拟主机相关操作

设置虚拟主机名称
在这里插入图片描述
观察设置结果
在这里插入图片描述

RabbitMQ的代码应用

编写生产者代码
  1. 创建连接
//1。创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("110.41.51.65");//ip 默认值localhost
factory.setPort(15673);//默认值5672
factory.setVirtuaLHost("bite");//虚拟机名称,默认)
factory.setUsername("study");//用户名,默认guest
factory.setPassword("study");//密码,默认guest
//3。创建连接Connection
Connection connection = factory.newConnection() ;
  1. 创建Channel
Channel channel = connection.createChannel();
  1. 声明一个队列Queue
queueDeclare(String queue, boolean durable, boolean exclusive, boolean
autoDelete, Map<String, Object> arguments)
1.queue:队列名称
2.durable:是否持久化。true-设置队列为持久化,待久化的队列会存盘,服务器重启之后,消息不
丢失。
3.exclusive:
*是否独占,只能有一个消费者监听队列
*当Connection关闭时,是否删除队列
4.autoDelete:是否自动删除,当没有Consumer时,自动删除掉
5.arguments:一些参数
//如果没有一个hello_world这样的一个队列,会自动创建,如果有,则不创建/*
channel.queueDeclare("hello",true,false,false,null);`
  1. 发送消息
    当一个新的RabbitMQ节点启动时,它会预声明(declare)几个内置的交换机,内置交换机名称是空字符串(“”).生产者发送的消息会根据队列名称直接路由到对应的队列.
    在这里插入图片描述
//6.通过channel发送消息到队列中
/*
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props,
byte[] body)
1.exchange:交换机名称,简单模式下,交换机会使用默认的""
2.routingKey:路由名称,routingKey=队列名称
3.props:配置信息
4.body:发送消息的数据
String msg = "Hello World";
//使用的是内置交换机。使用内置交换机时,routingKey要和队列名称一样,才可以路由到对应的队
列上去
channel.basicPublish("", "hello",null,msg·getBytes());
System.out.println(msg +"消息发送成功");
  1. 释放资源
channel.close();
connection.close();

运行后就出现了Connections以及Channels的相关信息了
在这里插入图片描述
在这里插入图片描述

编写消费者代码

1.创建连接
2.创建Channel
3.声明一个队列Queue
4.消费消息
5.释放资源
消费当前队列

  1. basicConsume
basicConsume(String queue,boolean autoAck,Consumer callback)
参数:
1.queue:队列名称
2。autoAck:是否自动确认,消费者收到消息之后,自动和MQ确认
3.callback:回调对象
String basicConsume(String queue, boolean autoAck, Consumer callback) throws
IOException;
  1. Consumer
    Consumer用于定义消息消费者的行为.当我们需要从RabbitMQ接收消息时,需要提供一个实现了Consumer接口的对象.
    DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口.
consumerTag:消费者标签,通常是消费者在订阅队列时指定的.
envelope:包含消息的封包信息,如队列名称,交换机等.
properties:一些配置信息
body:消息的具体内容
handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,byte[]body):

在这个方法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息存储到数据库等,

/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1。queue:队列名称
2。autoAck:是否自动确认,消费者收到消息之后,自动和MQ确认
3.callback:回调对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
回调方法,当收到消息后,会自动执行该方法
1.consumerTag:标识
2.envelope:获取一些信息,交换机,路由key
3。properties:配置信息
4.body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
3;
channel.basicConsume("hello"true. consumer);

释放资源

//等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
//7.释放资源消费者相当于是一个监听程序,不需要关闭资源
channel.close();
connection.close();

在这里插入图片描述
在这里插入图片描述

生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitProducer {
 public static void main(String[] args) throws Exception {
 // 1. 创建连接⼯⼚
 ConnectionFactory factory = new ConnectionFactory();
 //2. 设置参数
 factory.setHost("110.41.51.65");//ip 默认值localhost
 factory.setPort(15673); //默认值5672
 factory.setVirtualHost("bite");//虚拟机名称, 默认 /
 factory.setUsername("study");//⽤⼾名,默认guest
 factory.setPassword("study");//密码, 默认guest
 //3. 创建连接Connection
 Connection connection = factory.newConnection();
 //4. 创建channel通道
 Channel channel = connection.createChannel();
 //5. 声明队列
 /*
 queueDeclare(String queue, boolean durable, boolean exclusive, 
boolean autoDelete, Map<String, Object> arguments)
 1.queue: 队列名称
 2.durable: 是否持久化, 当mq重启之后, 消息还在
 3.exclusive:
 * 是否独占, 只能有⼀个消费者监听队列
 * 当Connection关闭时, 是否删除队列
 4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉
 5.arguments: ⼀些参数
 */
 //如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
 channel.queueDeclare("hello", true, false, false, null);
 //6. 通过channel发送消息到队列中
 /*
 basicPublish(String exchange, String routingKey, AMQP.BasicProperties 
props, byte[] body)
 1. exchange: 交换机名称, 简单模式下, 交换机会使⽤默认的""
 2.routingKey: 路由名称, routingKey = 队列名称
 3.props: 配置信息
 4.body: 发送消息的数据
 */
 String msg = "Hello World";
 //使⽤的是内置交换机. 使⽤内置交换机时, routingKey要和队列名称⼀样, 才可以路由
到对应的队列上去
 channel.basicPublish("", "hello", null, msg.getBytes());
 //7.释放资源
 System.out.println(msg + "消息发送成功");
 channel.close();
 connection.close();
 }
}
消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class RabbitmqConsumer {
 public static void main(String[] args) throws Exception {
// 1. 创建连接⼯⼚
 ConnectionFactory factory = new ConnectionFactory();
 //2. 设置参数
 factory.setHost("110.41.51.65");//ip 默认值localhost
 factory.setPort(15673); //默认值5672
 factory.setVirtualHost("bite");//虚拟机名称, 默认 /
 factory.setUsername("study");//⽤⼾名,默认guest
 factory.setPassword("study");//密码, 默认guest
 //3. 创建连接Connection
 Connection connection = factory.newConnection();
 //4. 创建channel通道
 Channel channel = connection.createChannel();
 //5. 声明队列
 /*
 queueDeclare(String queue, boolean durable, boolean exclusive, 
boolean autoDelete, Map<String, Object> arguments)
 1.queue: 队列名称
 2.durable: 是否持久化, 当mq重启之后, 消息还在
 3.exclusive:
 * 是否独占, 只能有⼀个消费者监听队列
 * 当Connection关闭时, 是否删除队列
 4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉
 5.arguments: ⼀些参数
 */
 //如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
 channel.queueDeclare("hello", true, false, false, null);
 //6. 接收消息, 并消费
 /*
 basicConsume(String queue, boolean autoAck, Consumer callback)
 参数:
 1. queue: 队列名称
 2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认
 3. callback: 回调对象
 */
 DefaultConsumer consumer = new DefaultConsumer(channel) {
 /*
 回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法
 1. consumerTag: 标识
 2. envelope: 获取⼀些信息, 交换机, 路由key
 3. properties:配置信息
 4. body:数据
 */
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
 System.out.println("接收到消息: " + new String(body));
 }
 };
 channel.basicConsume("hello", true, consumer);
 //等待回调函数执⾏完毕之后, 关闭资源
 TimeUnit.SECONDS.sleep(5);
 //7. 释放资源 消费者相当于是⼀个监听程序, 不需要关闭资源
 //顺序不可改变
// channel.close();
// connection.close();
 }
}

相关文章:

  • CSS 字体学习笔记
  • C#里使用MaterialDesign时在VS2022里出错
  • vdso概念及原理,vdso_fault缺页异常,vdso符号的获取
  • 【Raqote】 1.1.2 路径填充ShaderClipMaskBlitter结构体(blitter.rs)
  • AI(学习笔记第一课) 在vscode中配置continue
  • LVS(Linux虚拟服务器)
  • Go语言从零构建SQL数据库(7):实现ALTER TABLE语句的解析
  • Open-TeleVision源码解析——宇树摇操方案的重要参考:VR控制人形机器人采集数据
  • 【Docker基础】Compose 使用手册:场景、文件与命令详解
  • 数据结构第五版【李春葆】
  • AWS出海合规解决方案:全球业务扩张的技术指南
  • 深度学习理论-直观理解 Attention
  • 【语音识别】vLLM 部署 Whisper 语音识别模型指南
  • 理解 MCP 协议的数据传递:HTTP 之上的一层“壳子
  • Spring State Machine入门实践
  • 算法思想之位运算(二)
  • C语言编写的线程池
  • 【Mybatis-plus】应用笔记及用例(持续更新)
  • esp32-idf Linux 环境安装教程
  • 【Code】《代码整洁之道》笔记-Chapter9-单元测试
  • 讲座预告|政府在人工智能研究和应用领域的作用
  • 人民日报:上海“模速空间”何以汇聚超百家大模型企业
  • 国家卫健委对近日肖某引发舆情问题开展调查
  • 停电催生商机,中国品牌 “照亮” 西班牙
  • 美商界报告:全美超86万岗位依赖对华出口,关税将重创美国出口商
  • 秦洪看盘|资金切换主线,重构市场风格