将 RabbitMQ 与 .NET Core Web API 和 Worker Services 结合使用
我不会深入探讨微服务与单体架构之间的区别,因为这个问题很大很复杂,远远超出了我在这里分享的范围。简而言之,每种架构都有其优缺点,具体取决于组织的需求、技能组合等。
但我认为创建并比较这两种方法的简单示例可能会很有趣。接下来,我将首先介绍更传统的“单体”方法,即向数据库添加新项目。然后,我将介绍一个非常精简的微服务方法。这两种方法的结果相同:从 Web UI 向数据库插入新项目。
如果您喜欢此文章,请收藏、点赞、评论,谢谢,祝您快乐每一天。
前端
我有一个简单的Angular网站,它模拟了一个同样简单且虚构的制药厂ERP界面。它包含这类应用中常见的CRUD操作。接下来我将以“物料”为例,它是生物制药制造业的核心实体。在这里,我们可以查看物料、添加新物料、编辑现有物料,以及将物料标记为“非活动状态”(在我的任何用例中,我们都不会删除物料)。
这是显示材料列表的网格:
假设我们需要添加一种新物料。该公司正在生产一种新产品,需要的物料目前在其 ERP 数据库中尚不存在。数据库是SQL Server。以下是使用 SQL Management Studio 查看物料数据的视图。
让我们转到“添加新材料”表单并添加将用于制造的新材料。
该材料已成功插入数据库:
后端
在本例中,材质的插入由一个非常典型的 .NET Core API 控制器模式处理。Angular 服务组件将序列化的 Material 对象发送到远程控制器方法:
控制器调用 CreateMaterialUseCase 的服务级实例。其 Execute 方法将材料对象传递给存储库方法:
存储库方法 CreateMaterialAsync 使用实体框架插入材料。
到目前为止,这都是非常简单的样板控制器架构。在我们添加新材料的例子中,实际生活中可能会发生哪些问题导致数据插入数据库失败?如果数据库由于某种原因处于离线状态,插入操作显然会失败。在这种情况下,UI 很可能会返回 500 内部服务器错误。
我们还可以问一下,就可扩展性而言,这是否是理想的模式,例如,考虑一个在一小时内直接向数据库插入数千次的站点。
考虑离线数据库的情况是一个很好的过渡,让我们看看 RabbitMQ 等消息代理框架如何为我们刚刚研究的更单一的模式提供替代方案。
消息代理
首先,我们来简单介绍一下消息代理框架的工作原理。它的核心是消息队列。队列是一个用于存储消息的内存缓冲区。作为队列,它遵循 FIFO(先进先出)的数据结构。如果您是一位长期使用 Windows 和 Microsoft 开发栈的开发人员,您可能已经熟悉Microsoft 消息队列(MMQ)。
消息生产者通常将消息发布为实体或类对象的序列化(Json)实例。这些消息被放置在队列中,如果没有消费者,它们将无限期地停留在那里。
消费者是接收并处理来自队列的消息的应用程序或服务。通常(但并非总是如此)消费者是一个持续运行的进程,例如 Windows 服务。
在我们的案例中,实现生产者-消费者关系的中间件是RabbitMQ。它是免费的开源产品,并且似乎拥有出色的支持和文档。其网站在这里。
让我们再看一下我们的 API 控制器应用程序,并了解我将用于插入新材料的消息生产者的实现。
RabbitMQ 服务
我想在我的 API 应用程序中创建一个替代插入数据库的替代方案。因此,我做的第一件事就是创建一个类,该类使用 .NET Core 的 RabbitMQ 客户端包(当然可以通过 NuGet 获取)来接收传入的类对象,然后将序列化版本推送到消息队列。
这个类是GenericRabbitMQService<T>。我之所以要处理通用对象,是因为以后我需要将来自不同实体(Material、User、Product 等)的消息推送到一个方法中。我不想每次需要发布新的实体类型时都重写那些几乎重复的代码。
这是课程:
如您所见,只有一种方法,即PublishAsync,它接受两个参数:一个T 项和一个MessageBus实例。 MessageBus 是一个简单的模型类,它存储了 RabbitMQ 所需的两项信息:主机名(在本例中为localhost)和队列名称(在本例中为CreateMaterial)。
GenericRabbitMQService中有三个部分值得关注。通过channel.QueueDeclareAsync调用,我们实例化了消息队列。在本例中,该队列名为CreateMaterial。如果该队列存在,我们的消息将被推送到该队列。如果不存在,则创建该队列并将消息推送到该队列。
其次,我们使用 Newtonsoft.Json 将传入的 Material 对象序列化为 Json 字符串。可以将此步骤视为将 C# 对象转换为可发布、队列友好的 Json 对象。
最后,使用channel.BasicPublishAsync将 Json 对象推送到队列。对于总共十行代码来说,还不错!
我从另一个服务级别类PublishCreateMaterialUseCase调用PublishAsync:
最后要做的是创建一个新的 HttpPost 控制器方法。此方法调用 PublishCreateMaterialUseCase 中实现的 Execute 方法。
再次讨论前端
因此,我们假设我们已经使用Swagger、Postman等成功测试了新版 RabbitMQ 版本的 CreateMaterial 控制器。现在我们需要更新 Angular 前端以将新材料提交给 RabbitMQ 控制器。
我希望此更新是可配置的。也就是说,我希望能够通过更改配置设置来在新旧控制器之间进行选择,而不是手动更改组件服务代码中的(硬编码)端点 URL。
因此我添加了一个 config.json 文件,其中包含满足此要求所需的设置。
如果useMessageQueue为 true,则新素材将发布到 RabbitMQ 控制器。否则,将发布到原始控制器。
这是我的材质服务组件的更新版本。您可以看到检查新设置的条件逻辑。
在添加新内容来测试 Angular 更新之前,我们先快速浏览一下 RabbitMQ 的 UI。该 UI 方便您在本地查看队列和消息。我的版本如下所示:
目前我的 CreateMaterial 队列是空的。让我们添加一种新材料——淀粉乙醇酸钠,看看会发生什么:
如果我们回到 RabbitMQ UI,我们现在可以看到 CreateMaterial 队列中有一条消息:
我们可以深入查看该消息:
很酷,但现在怎么办呢?我们希望这条现在位于队列中的消息成为我们 ERP 数据库中一条新的物料记录。这该怎么做呢?我们需要一个服务来消费这条消息,所以我现在来描述一下消费者是如何工作的。
.NET Core 工作服务
正如我之前提到的,像 Windows 服务这样的正在运行的进程是消息消费者的理想选择。在我的示例中,我将介绍一个 .NET Core Worker 服务的简单示例。我们将了解它如何监视 RabbitMQ 队列中的新消息以及如何处理消息。
这是解决方案资源管理器中目录和文件结构的视图。我将介绍其中的一些类。
Program.cs是服务配置和运行的地方。在这里,我们将 Worker 类作为托管服务添加到应用程序中。
Worker.cs是所有工作发生的地方。它是一个进程,按照配置的时间间隔运行并执行某些操作。在本例中,我们希望它监视两个 RabbitMQ 队列:CreateMaterial和UpdateMaterial,并消费它们中的任何消息。
在我们的例子中,我们希望工作进程每 5 秒检查一次队列。以下代码配置了时间跨度:
私有只读 TimeSpan _stoppingCheckInterval = TimeSpan.FromSeconds(5);
RabbitMQ 提供了在发现消息时引发的事件处理程序:
_createConsumer = 新的 EventingBasicConsumer(_channel);
_createConsumer.Received += CreateMaterialReceivedHandler;
CreateMaterialReceivedHandler非常简单。
执行时,消息字符串被反序列化为 Material 实体,然后传递给存储库方法。此方法使用ADO.NET和存储过程将 Material 插入数据库。
我们要做的最后一件事是运行工作服务,并确认它获取了新的材料消息并将其插入数据库。首先,我们可以看到服务的控制台输出。
如果我们转到 RabbitMQ UI,我们可以看到 CreateMaterial 队列中没有消息。
最后,我们确认材料在数据库中并出现在 ERP UI 中。
概括
以上我们介绍了两个简单的例子,分别从 UI 表单获取一个材料实体并将其插入数据库。在第一种情况下,数据库操作与控制器发生在同一个应用程序中。
在第二种情况下,我们不是直接插入数据库,而是将材料实体序列化为 Json 字符串,并使用 .NET Core 的 RabbitMQ 客户端将其推送到消息队列。该消息由正在运行的工作服务应用程序使用,该应用程序将反序列化的消息插入数据库。
我不会通过表达对某一种方法的偏好来结束本文。我认为两种方法各有利弊。根据您组织的需求、SDLC方法、团队成员的技能组合以及许多其他因素,您可能会觉得其中一种方法比另一种更合适。
如果您喜欢此文章,请收藏、点赞、评论,谢谢,祝您快乐每一天。