当前位置: 首页 > news >正文

【消息队列】BrokerServer的核心概念

1. BrokerServer

在这里插入图片描述
在写代码之前,不妨再次观察这一张图片,无论是 Producer(生产者) 还是 Consumer(消费者) 都是在跟 Broker(中间人)交互,所以本质上呢,整个核心也就是这个 Broker,后续编写代码的时候,最终的产出是一个Broker(中间人)服务器,所以为了见名知意,后续把这个Broker(中间人) 就称为 BrokerServer(中间人服务器)。

BrokerServer 它的内部是如何工作的?如何管理数据的呢?这也是一个复杂的问题,BrokerServer 中有几个核心的概念需要提前了解!

2. 核心概念

虚拟主机(Virtual Host)

类似于 Mysql 中的 databases 数据库,是一个逻辑上的数据集合,一个 BrokerServer 上也可以组织不同类别的数据,可以使用 VirtualHost 作为区分。

交换机(Exchange)

生产者把消息发送给 BrokerServer 时,实际上是在给 BrokerServer 上指定的交换机发送消息,对应的交换机收到消息后,在判断要把消息转发给哪些队列。

队列(Queue)

用来存储消息的队列,消费者订阅的也就是队列,当某个队列有消息时,BrokerServer 就会把那个队列里面的消息取出来推送给订阅了这个队列的消费者。

绑定(Binding)

把队列和交换机绑定起来,这样按照一些交换机类型规则,交换机收到消息之后,就知道要把消息转发给哪些队列了。一个队列可以与多个交换机绑定,一个交换机也可以绑定多个队列。也类似于数据库中多对多这样的概念。本质上就是搞个中间表,记录下哪个队列绑定了哪个交换机就行了。

消息(Message)

此处传递的消息可以是很多类型,但是在传输的过程中,一定是一个二进制的数据,这样才能方便实现在网络中传输,那么比如服务器A想给服务器B发请求,借助这个消息队列进行转发时,这个请求也被视为消息,消息队列收到请求后,返回响应,也是一个消息。所以简单来说,凡是与 BrokerServer 打交道进行网络传输的,都被称为消息。

有了上面概念的认识,那么整个 BrokerServer 当中组织数据就如下图所示:

在这里插入图片描述

至于为什么这样组织,那就需要参考 AMQP 协议了,这里就不详细赘述了。具体后面的项目实现,只实现一个 BrokerServer 对应一个 VirtualHost,至于多个 VirtualHost 的实现就放在扩展(彩蛋)部分。

看到这,BrokerServer 是一个服务器,里面有各种数据,按照上图的方式组织了这些数据,问题来了,如果服务器断电了,或者挂了,数据就丢了,里面创建的队列,交换机,绑定,全没了,或者重新部署 BrokerServer 数据丢失了。

所以此时这个项目是要支持持久化的,至于如何持久化,采取什么样的方式去持久化,这个在后面实现代码的时候具体详细讲解。

BrokerServer 里面的重要部分了解了之后,又有一个问题了!

生产者和消费者如何才能与这个 BrokerServer 交互呢?

3. 远程方法调用

此处采用 远程方法调用 来实现与服务器的交互。问题就来到了什么是远程方法调用?

简单来说,就是与服务器交互时,生产者在本地调用了一个本地方法,比如 queueDeclare - 创建队列,这个 queueDeclare 方法底层其实就是给 BrokerServer 发送一个网络请求,这个请求告诉 BrokerServer 它要创建一个队列,然后 BrokerServer 收到这个创建队列的请求之后,就会在 BrokerServer 内部创建好这个队列并管理好这个队列,创建成功后返回一个网络响应。此时这个响应,也就是作为生产者调用 queueDeclare 方法的返回值了。

在这里插入图片描述

所以通过远程方法调用的形式,使得生产者和消费者能与 BrokerServer 交互,那么具体的 API 有哪些呢?BrokerServer 要提供哪些核心 API 供生产者消费者调用呢?此处就先简单的认识一下后续 BrokerServer 要实现的对外 API。

4. 服务器需要提供的API

BrokerServer 需要提供的方法:

创建交换机 (exchangeDeclare)

在内存中创建一个交换机,并管理起来,根据参数判断是否要持久化。

销毁交换机 (exchangeDelete)

把内存中的交换机销毁掉,如果该交换机持久化了,也要销毁掉持久化的数据。

创建队列 (queueDeclare)

在内存中创建一个队列,并管理起来,根据参数判断是否要持久化。

销毁队列 (queueDelete)

把内存中的队列给销毁掉,如果该队列持久化了,也要销毁掉持久化的数据,同时队列里面放的消息,也要全部销毁。

创建绑定 (queueBind)

在内存中创建队列与交换机之间的关系,判断需要绑定的队列和交换机是否都持久化了,都持久化绑定才持久化,不然绑定就没意义了。

删除绑定 (queueUnbind)

把内存中的绑定给给销毁掉,如果该绑定持久化了,也要销毁掉持久化的数据。

发布消息 (basicPublish)

把消息发给交换机,按照一定的转发规则,转发到对应的队列中,如果对应的队列有消费者订阅,则推送给消费者。根据参数判断是否要持久化。

订阅消息 (basicConsume)

消费者订阅一个队列,当队列有消息了,就会推送给这个消费者。

确认消息 (basicAck)

消费者手动确认消息,将这条未被确认的消息改成已被消费的状态即可。当然也可以否定的确认,就是告诉 BrokerServer 这个消息消费失败,但是此处的消息队列暂时不实现否定确认这个功能。

上述的 BrokerServer 提供的方法,也就是实现具体业务的,正儿八经在服务器上管理数据的。但是仍然需要提供一组供生产者和消费者在它们本地调用的方法,也就是远程方法调用,这个方法里面本质上是通过发请求给 BrokerServer,让 BrokerServer 去实现具体的业务。

此时对于提供给生产者和消费者本地使用的方法,统一称为客户端(client)的方法,也就是类似于 MySQL 的 JDBC 的那一套操作。

4. 客户端需要实现的方法

Client 需要实现的方法:

创建交换机 (exchangeDeclare)

客户端通过网络发请求给 BrokerServer,并带上参数,让 BrokerServer 创建一个交换机,本质就是远程调用 BrokerServer 里面的 exchangeDelcare 方法。

销毁交换机 (exchangeDelete)

客户端通过网络发请求给 BrokerServer,并带上参数,让 BrokerServer 销毁一个交换机,本质就是远程调用 BrokerServer 里面的 exchangeDelete 方法。

创建队列 (queueDeclare)

客户端通过网络发请求给 BrokerServer,并带上参数,让 BrokerServer 创建一个队列,本质就是远程方法调用 BrokerServer 里面的 queueDeclare 方法。

销毁队列 (queueDelete)

客户端通过网络发请求给 BrokerServer,并带上参数,让 BrokerServer 销毁一个队列,本质就是远程方法调用 BrokerServer 里面的 queueDeclare 方法。

创建绑定 (queueBind)

客户端通过网络发请求给 BrokerServer,并带上参数,让 BrokerSever 创建一个绑定,本质就是远程方法调用 BrokerServer 里面的 queueBind 方法。

删除绑定 (queueUnbind)

客户端通过网络发请求给 BrokerServer,并带上参数,让 BrokerSever 删除一个绑定,本质就是远程方法调用 BrokerServer 里面的 queueUnbind 方法。

发布消息 (basicPublish)

客户端通过网络发请求给 BrokerServer,并带上参数,给 BrokerServer 发送一条消息到指定交换机,本质就是远程方法调用 BrokerServer 里面的 basicPublish 方法。

订阅消息 (basicConsume)

客户端通过网络发请求给 BrokerServer,并带上参数,告诉 BrokerServer 要订阅哪个队列,本质就是远程方法调用 BrokerServer 里面的 basicConsume 方法。

确认消息 (basicAck)

客户端通过网络发请求给 BrokerServer,并带上参数,告诉 BrokerServer 哪个消息消费完成了,本质就是远程方法调用 BrokerServer 里面的 basicAck 方法。

除了上述与服务器一一对应的远程调用方法之外,不知道聪明的你是否又发现了一个问题。如何实现远程方法调用呢?

此处采取的方案是,使用 TCP + 自定义应用层协议,实现生产者消费者和 BrokerServer 之间的交互。

至于此处的如何自定义应用层协议,则会放在编写代码时进行讲解。

既然是 TCP,想与服务器通信,就应该建立一个 TCP 连接,于是客户端这边还得提供两个方法,分别是:

创建 Connection

通过自定义一个 Connection 对象,里面使用 Socket 与服务器建立连接,一个 Connection 就代表了一个 TCP 连接。

关闭 Connection

通过自定义一个 Connection 对象,断开与服务器的 Socket 连接,一个 Connection 就代表了一个 TCP 连接。

建立了连接,就可以通过 Socket 对象与服务器通信了,但是这里效率不够好,如果一个程序有多个生产者/消费者,也就需要建立多次 TCP 连接,学了网络这个模块的内容就知道,建立一个 TCP 连接开销还是蛮大的,需要三次握手,四次挥手。那能不能整个程序所有的生产者/消费者公用一个 TCP 连接进行通信呢?在逻辑上与他们区分就可以了。这样显然是能够做到的。

此处就可以引入 Channel 这样的一个概念,一个 TCP 连接里面包含多个 Channel,多个 Channel 共用一个 TCP 连接,也就是使用同一个 Socket 对象进行网络传输数据。此时每个 Channel 就可以类似于一个生产者或一个消费者。
在这里插入图片描述
所以 Channel 本质上就是逻辑上进行了隔离,优化了同一个程序中有多个生产者/消费者时需要建立多个 TCP 连接的问题,引入 Channel 之后,一个客户端程序,只需要保持与 BrokerServer 有一个 TCP 连接即可。

那么与客户端交互的代码,比如发布消息,创建队列,订阅队列 …,就交给 Channel 实现就可以了。此时一个 Channel 对象,既可以发布消息,也可以订阅队列,那么它就可以充当一个程序中的其中一个生产者/消费者。

既然引入了 Channel,但是本质上用的是一条连接,所以需要告知 BrokerServer 每次交互用的是哪个 Channel,这样才能实现逻辑上的隔离,也就是 BrokerServer 得存储一份 Channel,当 Channel 关闭,也要告知 BrokerServer 这个 Channel 关闭了,对应 BrokerServer 存储的 Channel 也应当删除。

此时客户端这边就需要增加两个方法,分别是:

创建 Channel

客户端通过网络发请求给 BrokerServer,并带上参数,告诉 BrokerServer 有这样一个 Channel 对象,便于实现逻辑上的隔离。

关闭 Channel

客户端通过网络发请求给 BrokerServer,并带上参数,告诉 BrokerServer 这个 Channel 关闭了,后续不再使用这个 Channel 进行交互了,BrokerServer 随之也会清理掉保存的 Channel。

6. 交换机的类型

这里需要补充一些内容,前面提到 basicPublish 发布消息的时候,说发布的消息本质上先是发送给 BrokerServer 上的交换机,交换机收到这个消息之后,在根据这个交换的类型,得到不同的转发规则,从而将消息转发给满足转发规则的队列。所以在这就需要了解交换机有哪几种类型和哪些转发规则。

前面在说 BrokerServer 里面核心概念的组织的时候提到了 AMQP 协议,那么这个协议也规定了交换机应有如下的四种类型:

  1. Direct 直接交换机

    生产者发送消息的时候,会指定交换机名,同时也会指定一个目标队列的名字,交换机收到消息后,就会去看是否绑定了这个目标队列,如果绑定了,则转发给这个队列(把消息放入到目标队列中),如果没有绑定,消息直接丢弃。
    在这里插入图片描述

    上述就是交换机类型为 Direct 直接交换机时发送消息的流程,这个图一定要仔细看清楚,当生产者发送消息时,消息中包含了交换机的名字,也就是要发往指定交换机,消息到了 BrokerServer 时就会去判断这个交换机的类型,此时 Exchange1 的类型是 Direct,所以这个消息里面是会指定目标队列的名字的,此时 routingKey 就是目标队列的名字。简单来说,直接交换机,消息的 routingKey 就是要转发的队列名。

  2. Fanout 扇出交换机

    生产者发送消息的时候,会指定交换机名,但不用指定一个目标队列的名字,交换机收到消息后,就会找出所有与它绑定的队列,然后转发给这些队列,绑定了几个队列,就转发给几个,每次转发的消息都是单独的消息(唯一key),只是消息内容一样而已。
    在这里插入图片描述

    上述就是交换机类型为 Fanout 扇出交换机时发送消息的流程,这个图一定要仔细看清楚,当生产者发送消息时,消息中包含了交换机的名字,也就是要发往指定交换机,消息到了 BrokerServer 时就会去判断这个交换机的类型,此时 Exchange1 的类型是 Fanout,也就是所有与该交换机绑定的队列,都要被转发消息。那么此时消息中的 routingKey 就没有任何作用了。

  3. Topic 主题交换机

    生产这发送消息的时候,会指定交换机名,会指定一个 routingKey,交换机收到消息后,就会找出所有与它绑定的队列,由于找出与交换机绑定的队列,本质就是去看有没有 Binding 这个对象,一个 Binding 对象记录了一个交换机和一个队列的绑定关系,并且额外有一个 bindingKey 的属性,这个属性值是在创建绑定的时候客户端指定的。此时交换机就会拿着这个消息里面的 routingKey 去与 bindingKey 进行匹配,判断是否匹配成功,匹配成功就把消息转发到这个队列中,如果匹配失败则不转发。至于如何判断是否匹配成功呢?匹配的规则又是什么呢?这个就等后续写代码详细讲解。
    在这里插入图片描述

    上述就是交换机类型为 Topic 主题交换机时发送消息的流程,这个图一定要仔细看清楚,当生产者发送消息时,消息中包含了交换机的名字,也就是要发往指定交换机,消息到了 BrokerServer 时就会去判断这个交换机的类型,此时 Exchange1 的类型是 Topic,也就是只有 routingKey 与 bindingKey 匹配成功才能转发给这个队列,至于匹配规则,后续写代码时才详细讲解,这里由于 routingKey = “bbb”,而 bindingKey = “ccc”,那么这俩一模一样肯定匹配成功嘛。##

7. 简单总结

介绍到这,前期简单的 BrokerServer 了解就完成的差不多了。下面就来简单总结下,BrokerServer 这里主要讲了哪些知识点。

① 聊到了 BrokerServer 的核心几个概念,VirtualHost,Exchange,Queue,Binding,Message。对于 BrokerServer 讨论到了要考虑持久化的问题。(详细见4.1章节)。

② 聊到了 生产者/消费者 也就是客户端如何与 BrokerServer 进行通信,本项目采用自定义应用层协议配合远程方法调用实现与 BrokerServer 交互(详细见4.2章节)。

③ 聊到了 BrokerServer 需要提供给客户端调用的 API (详细见4.3章节)。

④ 聊到了客户端为了与 BrokerServer 交互(远程调用BrokerServer的 API)的方法(详细见4.4章节)。

⑤ 聊到了交换机的类型,分别有 Direct,Topic,Fanout,交换机的类型不同,涉及到的转发方式也是不同的(详细见4.5章节)。

所以最终我们要实现的目标,就是如下思维导图所呈现的:
在这里插入图片描述
后面的具体代码实现环节,就会按照上述思维导图从上往下进行讲解以及编写代码。

相关文章:

  • 在Electron中通过Node-API调用DLL导出函数的完整指南
  • 神经网络前向微分和后向微分区别
  • 面试题汇总(一)
  • 机器学习4-PCA降维
  • CMake学习笔记(一):工程的新建和如何将源文件生成二进制文件
  • conda 更换镜像究极方法
  • 新品速递 | 多通道可编程衰减器+矩阵系统,如何破解复杂通信测试难题?
  • YOLO11改进-模块-引入多域学习MDL(Multi-Domain Learning) 使用频域增强图像特征
  • jQuery UI 简介
  • IntelliJ IDEA集成MarsCode AI
  • Java开发的AI应用框架简述——LangChain4j、Spring AI、Agent-Flex
  • 将PDF转为Word的在线工具
  • 从@Param注解开始,深入了解 MyBatis 参数映射的原理
  • 3.6V-30V宽压输入降压同步IC内置MOS,电流4A/5A/6A,可以满足汽车应急电源,BMS电池,电池组USB口输出等储能应用
  • SpringBoot 校园新闻网站
  • python网络爬虫开发实战之基本库使用
  • 基于qt的桌面宠物——纳西妲源码纯享
  • CS144 Lab Checkpoint 5: down the stack (the network interface)
  • http status是什么?常见的http状态码指的是什么意思?
  • FPGA开发,使用Deepseek V3还是R1(9):FPGA的全流程(详细版)
  • 进口食品销售销售在那个网站做/搜索seo优化
  • 站长seo计费系统/怎么制作网页推广
  • 物流网站建设重要性/微信推广多少钱一次
  • b站视频推广费用一般多少/网页开发用什么软件
  • 口碑营销的前提及好处有哪些/做seo需要用到什么软件
  • 聊城做网站哪里好/百度开发平台