当前位置: 首页 > news >正文

【仿RabbitMQ的发布订阅式消息队列】--- 概念理解

 Welcome to 9ilk's Code World

       

(๑•́ ₃ •̀๑) 个人主页:       9ilk

(๑•́ ₃ •̀๑) 文章专栏:    项目


 本篇博客主要是对消息队列模型的一个需求分析, 引出相关的模型概念。

核心概念

  • 生产者(Producer)
  • 消费者(Consumer)
  • 中间人(Broker)
  • 发布(Publish)
  • 订阅(Subscribe)
  • 一个生产者,一个消费者

  • N个生产者,N个消费者

    其中, Broker Server最核心的部分, 负责消息的存储和转发。而在 AMQP(Advanced Message Queuing Protocol-高级消息队列协议,一个提供统一 消息服务的应用层标准高级消息队列协议,是现代消息中间件的核心协议,广泛用于微服务架构和企业应用中,它使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能)模型中,也就是消息中间件服务器 Broker 中,又存在以下概念:

  • 虚拟机 (VirtualHost): 类似于 MySQL 的 "database", 是一个逻辑上的集合。一个 BrokerServer 上可以存在多个 VirtualHost

  • 交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同 的规则, 把消息转发给不同的 Queue

  • 队列 (Queue) : 真正用来存储消息的部分,每个消费者决定自己从哪个 Queue 上 读取消息。

  • 绑定 (Binding) : ExchangeQueue 之间的关联关系,ExchangeQueue 可以 理解成 "多对多" 关系,使用一个关联表就可以把这两个概念联系起来。

  • 消息 (Message) : 传递的内容。

因此内部的主要工作流程:

1. 接收生产者消息

2. 查找交换机绑定关系

3. 应用路由规则

4. 分发到目标消息队列

5. 处理消息确认

上述数据结构,既需要在内存中存储,也需要在硬盘中存储

内存存储:方便使用

硬盘存储:重启数据不丢失

   Exchange, Queue, Binding, Message 等数据都有持久化需求当程序重启 / 主机重启, 保证上述内容不丢失。

   需要注意的是,Exchange和Queue可以理解为"多对多"的关系,即一个Exchange可以绑定多个 Queue (可以向多个 Queue 中转发消息) ,一个 Queue 也可以被多个 Exchange 绑定 (一个 Queue 中的消息可以来自于多个 Exchange)

核心接口

对于 Broker 来说, 要实现以下核心 API,通过这些 API 来实现消息队列的基本功能:

  1. 创建交换机 (exchangeDeclare)
  2. 销毁交换机 (exchangeDelete)
  3. 创建队列 (queueDeclare)
  4. 销毁队列 (queueDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)
  10. 取消订阅 (basicCancel)

   另一方面, ProducerConsumer通过网络的方式, 远程调用这些 API, 实现生产者
消费者模型。

交换机类型

    对于 RabbitMQ 来说, 主要支持四种交换机类型:

• Direct:生产者发送消息时, 直接指定被该交换机绑定的队列名。

• Fanout:生产者发送的消息会被复制到该交换机绑定的所有队列中。

• Topic:绑定队列到交换机上时, 指定一个字符串为 binding Key。发送消息指定一个字符串为 routing Key。当 routing Keybinding Key满足一定的匹配条件的时候, 则把 消息投递到指定队列

• Header:比较复杂, 比较少见,因此本项目主要用的是前三种。

类比理解:QQ群发红包的例子

• Direct 是发一个专属红包, 只有指定的人能领,像对号入座,必须完全匹配。

• Fanout 是使用了魔法, 发一个 10 块钱红包, 群里的每个人都能领 10 块钱,像大喇叭广播,所有人都听。

•Topic 是发一个画图红包, 发 10 块钱红包, 同时出个题, 得画的像的人, 才能领。也是每个领到的人都能领 10 块钱,像智能标签,按规则分类。

网络通信

   生产者和消费者都是客户端程序,Broker作为服务器,客户端和服务端之间通过网络通信。在网络通信的过程中, 客户端部分要提供对应的 api, 来实现对服务器的操作:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)
  14. 取消订阅(basicCancel)

在Broker基础上,客户端还要增加Connection 操作和 Channel 操作

Connection 对应一个 TCP 连接

Channel 则是 Connection 中的逻辑通道

类比理解:Connection是"路",而Channel是路上的"车道",一条路上可以有多条车道并行通车。

为什么使用Channel:

  • 提高性能:避免频繁建立/断开TCP连接
  • 资源节约:多个操作共享同一个连接资源
  • 并发安全:不同线程 使用不同Channel,避免竞争

消息应答

        RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制,消息应答就是 : 消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了。

        消费者收到的每一条消息都必须进行确认。消息确认后,RabbitMQ才会从队列删除这条消息,RabbitMQ不会为未确认的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

 RabbitMQ中消息的应答,共有两种方式:

  • 自动应答 : 消费者只要消费了消息 , 就算应答完毕了,Broker 直接删除这个消息。

  • 手动应答 : 消费者手动调用应答接口, Broker 收到应答请求之后, 才真正删除这个消息。

http://www.dtcms.com/a/539891.html

相关文章:

  • 图书销售系统数据库设计方案
  • SpringBoot+MybatisPlus+自定义注解+切面实现水平数据隔离功能(附代码下载)
  • Linux小课堂: JavaWeb 应用环境配置与 Tomcat 安装指南
  • Linux小课堂: Tomcat容器中部署Jenkins的完整流程与关键技术要点
  • 本地部署消息中间件 RabbitMQ 并实现外网访问 (Linux 版本)
  • Kafka在Spring Boot生态中的浅析与应用
  • 南京网站建设与维护英文购物网站模板下载
  • Linux网络编程:进程间关系和守护进程
  • 在 Ubuntu 上使用 Docker 部署思源笔记:一份详尽的实践教程以及常见错误汇总
  • 劳务网站有做吗公众号文章采集wordpress
  • Linux中,vi(vim)编辑器大部分快捷键
  • ADUM5201CRWZ-RL双通道数字隔离器 ADI亚德诺半导体 集成电路IC芯片解析
  • Ubuntu texlive安装后无法编译中文论文解决方法
  • UniversalSmartStateFilter:统一状态过滤器的架构设计与实现
  • 四旋翼机器人手臂路径规划
  • 5G专网平台客户案例分享:基于可编程5G的智慧电网巡检原型系统
  • 做网站现在什么尺寸合适深圳刚刚突然宣布
  • 基于深度学习与OCR研发的报关单识别接口技术解析
  • Power Apps:预览SharePoint文档库的PDF文档
  • ElasticSearch-基础
  • 常州市网站制作娶妻为什么不娶外贸女
  • MySQL 窗口函数全解析:NTILE() 函数深度指南
  • 【大模型与智能体论文】REACT:协同语言模型中的推理与行动
  • 攻克兼容、安全、零中断的“不可能三角”:电科金仓异构多活架构交出集团化医院信创最佳答卷!
  • Duckdb rusty_sheet插件使用心得
  • PyTorch torch.ones()张量创建详解
  • 菜鸟教程网站建设lazy load wordpress
  • 湖南 中小企业 网站建设百度做网站推广
  • 基于小波变换的图像阈值去噪MATLAB实现
  • 网站建设怎么收费网站优化有哪些类型