当前位置: 首页 > news >正文

RabbitMQ01——基础概念、docker配置rabbitmq、内部执行流程、五种消息类型、测试第一种消息类型

目录

一、MQ( Message Queue 消息队列)

什么是消息队列

消息队列的作用

消息队列的缺点

MQ的两种实现方式

AMQP和JMS的区别和联系

常见的mq产品

二、docker下安装配置rabbitmq

1、拉取镜像

2、创建并启动容器

3、查看容器状态

4、查看容器日志

5、创建用户 赋权

6、使用浏览器打开RabbitMQ管理界面。

三、rabbitmq 内部执行流程

rabbtimq的5种消息模型

四、测试第一种simple消息模型 

实现过程:

1、引入jar包

2、封装工具类

3、创建生产者对象

消息确认机制(ACK) ACKnowledge

自动确认消息

手动确认消息


一、MQ( Message Queue 消息队列)

什么是消息队列

MQ(Message Queue)消息队列(消息中间件),是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息,来实现进程之间的通信

消息生产者把消息发送给消息队列,消息队列存储转发消息,消息消费者接收消息、处理消息。

消息队列的作用

一、异步

主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。

二、解耦

一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。

三、削峰填谷

高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

消息队列的缺点

  • 系统可用性降低。依赖服务也多,服务越容易挂掉。需要考虑MQ瘫痪的情况。

    系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?(集群)

  • 系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。

    MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

  • 业务一致性。主业务和从属业务一致性的处理。

    A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

MQ的两种实现方式

JMS:即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

AMQP:即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

AMQP和JMS的区别和联系

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

  • JMS规定了两种消息模型(点对点,发布订阅);而AMQP的消息模型更加丰富(常用的有5种)

常见的mq产品

二、docker下安装配置rabbitmq

1、拉取镜像

docker pull rabbitmq:latest
​
docker pull rabbitmq:3.9.0-management

2、创建并启动容器

docker run -id --name rabbitmq -p 5672:5672 -p 15672:15672 -v /etc/rabbimq:/etc/rabbitmq  rabbitmq:3.9.0-management

3、查看容器状态

docker ps

如果容器状态显示为Up,并且端口映射正确,那么RabbitMQ服务已经成功启动。

4、查看容器日志

docker logs rabbitmq

访问RabbitMQ管理界面

5、创建用户 赋权

# 进入容器
docker exec -it rabbitmq bash
​
# 在容器内执行
rabbitmqctl add_user admin 123456
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

 

6、使用浏览器打开RabbitMQ管理界面。

默认情况下,管理界面端口为15672。在浏览器地址栏输入以下URL:

http://<宿主机IP地址>:15672

三、rabbitmq 内部执行流程

rabbtimq的5种消息模型

四、测试第一种simple消息模型 

  • P(producer/ publisher):生产者,一个发送消息的用户应用程序。

  • C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序

  • 队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

  • RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。

实现过程:

创建springboot项目的时候选择以下依赖

1、引入jar包

 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、封装工具类

/*
rabbitmq工具类*/
public class MQUtil {public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("1.94.230.82");factory.setPort(5672);factory.setVirtualHost("/yan3");factory.setUsername("admin");factory.setPassword("123456");return factory.newConnection();}
}

3、创建生产者对象

package com.hl.rabbitmq01.simple;import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*
生产者  javaSE方式简单测试
Simple简单消息模型
生产者----消息队列----消费者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1、创建连接Connection connection = MQUtil.getConnection();//2、基于连接,创建信道Channel channel = connection.createChannel();//3、基于信道,创建队列/*参数:1. queue:队列名称,如果没有一个名字叫simpleQueue01的队列,则会创建该队列,如果有则不会创建2. durable:是否持久化,当mq重启之后,消息还在3. exclusive:* 是否独占。只能有一个消费者监听这队列4。当Connection关闭时,是否删除队列autoDelete:是否自动删除。当没有Consumer时,自动删除掉5. arguments:参数。*/channel.queueDeclare("simpleQueue01", false, false, false, null);//发送消息到消息队列/*参数:1. exchange:交换机名称。简单模式下交换机会使用默认的 ""2. routingKey:路由名称,简单模式下路由名称使用消息队列名称3. props:配置信息4. body:发送消息数据*/channel.basicPublish("","simpleQueue01",null,"Hello World".getBytes());//4、关闭信道,断开连接channel.close();connection.close();}
}

4、创建消费者 

package com.hl.rabbitmq01.simple;import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*
简单消息模型---消费者
生产者---消息队列---消费者*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接Connection connection = MQUtil.getConnection();//2.创建信道Channel channel = connection.createChannel();//3.创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};//4.绑定消费者对象和消息队列/*参数1:消息队列名称参数2:是否自动确认消息参数3:消费者对象*/channel.basicConsume("simpleQueue01", true, consumer);}
}

消息确认机制(ACK) ACKnowledge

  • 自动ACK:消息一旦被接收,消费者自动发送ACK

  • 手动ACK:消息接收后,不会发送ACK,需要手动调用

  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便

  • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

自动确认消息

  DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};//4.绑定消费者对象和消息队列/*参数1:消息队列名称参数2:是否自动确认消息  参数3:消费者对象*/channel.basicConsume("simpleQueue01", true, consumer);

手动确认消息

channel.basicAck(envelope.getDeliveryTag(), false);

   //3.创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {System.out.println("Received: " + new String(body));//手动确认消息//参数1:消息id// 参数2:是否批量确认消息  false:只确认当前消息  true:批量确认删除 (小于当前消息id的消息,批量确认)channel.basicAck(envelope.getDeliveryTag(), false);}};//4.绑定消费者对象和消息队列/*参数1:消息队列名称参数2:是否自动确认消息  true自动确认消息  false:手动确认消息参数3:消费者对象*/channel.basicConsume("simpleQueue01", false, consumer);

http://www.dtcms.com/a/287440.html

相关文章:

  • 西门子 S7-1500 PLC 电源选型指南:系统电源与负载电源的核心区别
  • SPARKLE:深度剖析强化学习如何提升语言模型推理能力
  • 商业秘密的法律属性与保护路径探析
  • Vue的路由模式的区别和原理
  • RTSP推流客户端-ffmpeg和live555对比
  • Oracle Database 23ai 技术细节与医疗 AI 应用
  • windows docker-01-desktop install windows10 + wls2 启用
  • 加法速算之尾数法
  • 语音识别技术:从声音到文字的 AI 魔法
  • 多云联邦集群管理(1)(集群联邦管理)
  • 基于大模型打造故障预警服务器巡检机器人
  • World of Warcraft [CLASSIC] The Ruby Sanctum [RS] Halion
  • 【C# in .NET】19. 探秘抽象类:具体实现与抽象契约的桥梁
  • 用逻辑回归(Logistic Regression)处理鸢尾花(iris)数据集
  • 【专业扫盲】源极退化电阻
  • min_25筛学习笔记+牛客多校02E
  • NumPy 30分钟速成计划
  • 【运维心得】老旧系统迁移到虚拟机的另类解决
  • 力扣刷题(第九十二天)
  • nodejs值process.kill
  • 【RK3576】【Android14】固件烧录
  • 13.多种I/O函数
  • WPF为启动界面(Splash Screen)添加背景音乐
  • simulink系列之汽车应用层信号处理
  • android studio libs.versions.toml 配置
  • 计算机网络——IPv4(25王道最新版)
  • 从丢包到恢复:TCP重传机制的底层逻辑全解
  • Java-77 深入浅出 RPC Dubbo 负载均衡全解析:策略、配置与自定义实现实战
  • nginx.conf模版
  • 使用DataGrip连接安装在Linux上的Redis