当前位置: 首页 > news >正文

.NET Core 中 RabbitMQ 和 MassTransit 的使用

介绍

随着越来越多的应用程序和系统必须具备高可用性和可扩展性,选择合适的软件架构来构建它们变得至关重要。在当今不断发展变化的技术格局中,人们对效率、速度和可靠性的需求持续增长,架构师和开发人员面临着持续的挑战,即创建能够无缝适应不断变化的需求的弹性解决方案。

为了应对这些挑战,现代应用程序和服务越来越多地被设计为相互通信以完成特定任务的互连服务和组件。在这种动态且快节奏的环境中,选择正确的消息传递基础架构和工具对于确保这些分布式或独立服务之间的顺畅通信和高效协调至关重要。

在本篇博文中,我们将探讨两个在 .NET Core 应用程序中实现此类通信的强大工具:RabbitMQ 和 MassTransit。通过了解它们的功能、差异和最佳使用场景,您可以做出明智的决定,选择最适合您应用程序需求的工具。

如果您喜欢此文章,请收藏、点赞、评论,谢谢,祝您快乐每一天。

RabbitMQ 和 MassTransit

RabbitMQ 是一款开源消息和流媒体代理软件,最初实现了高级消息队列协议 (AMQP)。自诞生以来,它通过插件架构进行了扩展,以支持其他协议。本质上,RabbitMQ 是一款定义队列的软件,应用程序可以向队列发布消息或从队列中消费消息。

MassTransit 是一个适用于 .NET 的开源消息传递库。它提供了一个轻量级但功能丰富的框架,用于实现各种通信模式,例如发布-订阅 (PubSub)、请求-响应和事件驱动架构。MassTransit 的主要目标是在消息传输上提供一致的抽象层,无论是 RabbitMQ 还是其他消息传递平台。

消息队列在微服务和分布式架构中的重要性

在微服务和分布式架构领域,服务间的解耦是一项至关重要的基本原则。通过减少相互依赖,服务可以减少受其他服务可用性或故障影响。消息队列是实现这种解耦的关键。服务可以将消息发布到队列,允许其他服务使用并异步处理这些消息。这种异步通信模型不仅增强了系统的弹性和响应能力,还提高了可扩展性和敏捷性。

此外,在其他服务无法立即响应的情况下,消息队列在分布式系统中的服务间交互中发挥着核心作用。

此外,消息队列有助于实现可靠的通信模式,例如发布-订阅和请求-响应,使服务能够无缝交互,同时保持松散耦合。这种松散耦合不仅增强了容错能力,而且还促进了各个服务的演进和迭代,而不会中断整个系统。

MassTransit简介

在深入探讨实现和代码之前,我们先花几分钟时间快速了解一下 MassTransit。如果您已经熟悉 MassTransit,可以跳过本节。

MassTransit 的架构围绕几个关键构建块展开,这些构建块简化了 .NET Core 应用程序中消息传递模式的实现。这些构建块提供了一个统一的框架,用于定义消息端点、处理消息消费以及协调服务之间的通信流。让我们来探索构成 MassTransit 功能骨干的一些主要组件:

消息

消息服务是 MassTransit 中服务之间通信的主要方式。它们包含端点之间的数据和命令交换。它们是强类型契约,可以通过记录、类或接口定义。消息类型主要有两种:事件和命令。命令指示服务执行某项操作,而事件则表示某项操作已发生。

消费者

消费者负责处理来自消息队列或主题的消息。在 MassTransit 中,消费者会实现消息处理器,用于指定如何处理特定类型的消息。

生产者

负责将消息发送到消息队列或主题,以供其他服务使用。生产者可以通过两种不同的方式生产消息:发布消息或发送消息。两者的区别在于,发送消息时,消息会通过特定地址投递到特定端点;而发布消息时,消息会广播给所有订阅了该特定消息类型的消费者。

请求

请求/响应是分布式系统中一种普遍的消息传递模式,其中一个服务向另一个服务发送请求,并等待响应后再继续处理。虽然这种方法可能会引入延迟,尤其是在服务托管在不同的进程或机器上时,但在某些情况下,它仍然是必要且通常是首选的方法。在 MassTransit 中,开发人员利用请求客户端异步发起请求并处理响应,而不会阻塞应用程序的执行。这种异步特性,加上对“await”关键字的支持,确保在等待远程服务响应时保持系统性能,从而有助于构建更高效、响应更快的分布式架构。

异常

处理是使用 MassTransit 构建弹性分布式系统的关键环节。如果在消息处理过程中出现错误,MassTransit 提供了强大的异常管理机制。其中一种机制是将失败的消息路由到错误队列,以便记录和分析这些消息,以便进行故障排除。
有关 MassTransit 的完整文档,请务必访问在线提供的官方文档。

示例项目:使用 MassTransit 和 RabbitMQ 模拟传感器数据

为了演示如何使用 MassTransit 和 RabbitMQ,我创建了一个示例项目,模拟从传感器收集测量数据。偶尔,这些测量结果会显示异常。

在此模拟中,传感器将其读数(测量值)发布到 RabbitMQ 队列。在服务器端,TelemetryService会使用这些消息。它的主要职责是验证和处理传入的数据。如果检测到异常, TelemetryService 会将异常消息发布到另一个服务AnomalyHandlingService,该服务负责处理和管理异常。

设置环境

在我们开始之前,让我们用 RabbitMQ 和 MassTransit 设置环境。

由于本博文的重点是如何使用 RabbitMQ 和 MassTransit,我们将使用单个节点保持 RabbitMQ 的设置简单而基本。但是,其他集群 RabbitMQ 配置的实现逻辑是相同的。

由于我们将使用 RabbitMQ 作为 docker 镜像,因此请使用以下命令开始:

docker run -d --hostname rmq-host --name rmq-c -p 15672:15672 -p 5672:5672 rabbitmq:3-management
这将在带有管理插件的 docker 容器中启动 RabbitMQ,并公开端口 5672 和 15672。

设置 MassTransit

创建新项目后,下一步是配置 MassTransit 与 RabbitMQ 配合使用。此配置通常在Program.cs文件中完成。为简单起见,我们将使用基本身份验证连接到 RabbitMQ。

添加必要的设置包括初始化 MassTransit、指定 RabbitMQ 作为消息代理,以及定义连接详细信息(例如主机、用户名和密码)。此步骤可确保您的应用程序能够通过 RabbitMQ 可靠地发布和使用消息。

// Add MassTransit services
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<TelemetryDataConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", host =>
{
host.Username("guest");
host.Password("guest");
});

        cfg.ConfigureEndpoints(context);
});
});

发布消息

MassTransit 提供了一种直观的方式来从消息代理(在本例中为 RabbitMQ)发布和使用消息。首先,我们需要定义一个Message类,它表示要发送到队列的信息的数据结构。该类充当发布者和消费者之间的契约,确保数据格式的一致性。

创建消息类后,我们可以使用 MassTransit 的 API 将消息发布到队列。之后,这些消息就可以被相应的订阅者使用。

我们示例中的消息类如下所示:

public class TelemetryDataMessage
{
// Unique identifier for the device sending the telemetry data
public string DeviceId { get; set; }
// Timestamp of when the telemetry data was recorded
public DateTime Timestamp { get; set; }
public WaterMeasurementData WaterMeasurementData { get; set; }
//Include metadata about the message, such as data quality or source
public string DataQuality { get; set; }

    public override string ToString()
{
return $"DeviceId: {DeviceId}, Timestamp: {Timestamp}, WaterMeasurementData: {WaterMeasurementData}";
}
}

在我们的示例项目中,DataInjectorSim是一个简单的控制台应用程序,旨在将传感器读数发布到 RabbitMQ。为此,我们首先需要设置 MassTransit,如前面“设置 MassTransit”部分所述。

设置完成后,应用程序将使用一个工作线程类持续向 RabbitMQ 队列发布消息。该工作线程模拟传感器定期发送测量数据的行为,为我们的系统提供稳定的消息流。

消息发布是通过MassTransit 的核心组件Bus完成的。Bus 封装了配置、传输机制和端点,简化了与消息代理交互的复杂性。它提供了一个易于使用的 API,用于将消息发布到消息代理。

工作者类的代码如下所示:

public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly IBus _bus;

    public Worker(ILogger<Worker> logger, IBus bus)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
}

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
if (_logger.IsEnabled(LogLevel.Information))
{
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
}

            var data = GetTelemetryDataMessage();

            var serializedData = JsonSerializer.Serialize(data);
_logger.LogDebug("Publishing telemetry data message: {telemetryDataMessage}", serializedData);

            await _bus.Publish(data, cancellationToken);

            await Task.Delay(3000, cancellationToken);
}
}

    public TelemetryDataMessage GetTelemetryDataMessage()
{
var random = new Random();
var deviceIdList = new List<string> { "Device-1", "Device-2", "Device-3" };

        var deviceId = deviceIdList[random.Next(0, deviceIdList.Count)];
var timestamp = DateTime.Now;

        var telemteryDataMessage = new TelemetryDataMessage()
{
DeviceId = deviceId,
Timestamp = timestamp,
WaterMeasurementData = GetWaterMeasurementData()
};

        return telemteryDataMessage;
}

    private static WaterMeasurementData GetWaterMeasurementData()
{
var random = new Random();
var waterMeasurementData = new WaterMeasurementData
{
WaterLevel = random.NextDouble() * 100, // 0 to 100 cm
pHLevel = Math.Round(random.NextDouble() * 14, 2), // 0 to 14 pH
Temperature = Math.Round(random.NextDouble() * 40, 2), // 0 to 40 °C
NitrateConcentration = Math.Round(random.NextDouble() * 50, 2), // 0 to 50 mg/L
DataQuality = "High"
};

        // Occasionally inject anomalies
if (random.Next(0, 10) < 3) // 30% chance to inject an anomaly
{
waterMeasurementData = InjectAnomaly(waterMeasurementData);
waterMeasurementData.DataQuality = "Low";
}

        return waterMeasurementData;
}

    private static WaterMeasurementData InjectAnomaly(WaterMeasurementData waterMeasurementData)
{
var random = new Random();
var anomalyType = random.Next(0, 4); // 0 to 3

        switch (anomalyType)
{
case 0:
// Inject an anomaly for WaterLevel (e.g., negative or excessively high value)
waterMeasurementData.WaterLevel = random.NextDouble() > 0.5 ? -random.NextDouble() * 100 : random.NextDouble() * 200;
break;
case 1:
// Inject an anomaly for pHLevel (e.g., negative or above 14)
waterMeasurementData.pHLevel = random.NextDouble() > 0.5 ? -Math.Round(random.NextDouble() * 14, 2) : Math.Round(random.NextDouble() * 28, 2);
break;
case 2:
// Inject an anomaly for Temperature (e.g., negative or excessively high value)
waterMeasurementData.Temperature = random.NextDouble() > 0.5 ? -Math.Round(random.NextDouble() * 40, 2) : Math.Round(random.NextDouble() * 80, 2);
break;
case 3:
// Inject an anomaly for NitrateConcentration (e.g., negative or excessively high value)
waterMeasurementData.NitrateConcentration = random.NextDouble() > 0.5 ? -Math.Round(random.NextDouble() * 50, 2) : Math.Round(random.NextDouble() * 100, 2);
break;
}

        return waterMeasurementData;
}
}
现在我们已经准备好消息和发布者(或生产者),我们将为该消息创建一个消费者类。

MassTransit 中发布的消息是否有自己的队列?

使用 MassTransit 时,您自然会想知道每种已发布的消息类型是否都有各自的队列,还是所有消息都发送到同一个队列。简而言之:不,已发布的消息没有自己的专用队列。相反,路由是基于发布-订阅模型动态处理的,确保消息仅传递给对其感兴趣的消费者。

它是如何工作的?

MassTransit 采用发布-订阅模式进行消息分发。以下是该流程的分解:

交换器和主题:消息发布后,会被发送到 RabbitMQ 中的一个交换器。交换器通常以消息类型命名,负责将消息路由到合适的消费者队列。
消费者队列:每个消费者都有自己的队列。例如:
监听消息的消费者OrderSubmitted 将拥有一个与交换机绑定的队列OrderSubmitted 。
另一个监听PaymentProcessed 消息的消费者将拥有与交换机绑定的不同队列PaymentProcessed 。
动态路由:MassTransit 使用消息类型作为路由键。当您发布消息时,只有订阅该特定消息类型的队列才会收到该消息。这确保了消息仅传递给相关的消费者。

从队列消费消息

要从 RabbitMQ(或任何支持的消息代理)消费消息,您需要创建一个消息消费者。具体来说,消息消费者是一个负责处理一种或多种消息类型的类。对于每种消息类型,该类都会实现并定义处理接收到的消息IConsumer<TMessage>的逻辑。Consume

举个实际的例子,TelemetryService是一个标准的控制台应用程序,它使用来自 RabbitMQ 的消息。为了实现这一点,我们在启动配置期间在方法中注册了消息消费者类AddMassTransit。这样可以确保消费者正确连接并接收和处理来自队列的消息。

builder.Services.AddMassTransit(x =>
{
x.AddConsumer<TelemetryDataConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", host =>
{
host.Username("guest");
host.Password("guest");
});

        cfg.ConfigureEndpoints(context);
});
});

builder.Services.AddLogging();
builder.Services.AddSingleton<ITelemetryDataProcessor, TelemetryDataProcessor>();
builder.Services.AddSingleton<IAnomalyDetector, AnomalyDetector>();

var host = builder.Build();
host.Run();

服务使用来自 RabbitMQ 的消息后,ITelemetryDataProcessor将接管并验证和处理数据。如果在此处理过程中检测到异常,服务会向 RabbitMQ 发送一条新的异常消息。然后,该异常消息会被另一个专为异常管理而设计的服务拾取并处理。

这种方法演示了如何使用 RabbitMQ 和 MassTransit 构建模块化、事件驱动的架构,其中每个服务都有明确定义的职责并通过消息传递进行通信。

消费者类别如下所示:

public sealed class TelemetryDataConsumer : IConsumer<TelemetryDataMessage>
{
private readonly ILogger<TelemetryDataConsumer> _logger;
private readonly ITelemetryDataProcessor _telemtryDataProcessor;

    public TelemetryDataConsumer(ILogger<TelemetryDataConsumer> logger, ITelemetryDataProcessor telemtryDataProcessor)
{
_logger = logger;
_telemtryDataProcessor = telemtryDataProcessor;
}

    public async Task Consume(ConsumeContext<TelemetryDataMessage> context)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Consuming telemetry data message: {telemetryDataMessage}", context.Message);
}

        if (!await ValidateMessageAsync(context))
{
return;
}

        await _telemtryDataProcessor.Process(context.Message);

        if (_logger.IsEnabled(LogLevel.Information))
{
_logger.LogInformation("Telemetry data message processed: {telemetryDataMessage}", context.Message);
}
}

    private async Task<bool> ValidateMessageAsync(ConsumeContext<TelemetryDataMessage> context)
{
if (context.Message == null)
{
_logger.LogError("Received null telemetry data message.");
await context.Publish(new DeadLetterEvent(
Timestamp: DateTime.UtcNow,
Reason: "Null message",
OriginalMessage: null));
return false;
}

        if (string.IsNullOrWhiteSpace(context.Message.DeviceId) ||
context.Message.Timestamp == default ||
context.Message.WaterMeasurementData == null)
{
_logger.LogWarning("Invalid telemetry data for DeviceId: {DeviceId} at {Timestamp}",
context.Message.DeviceId, context.Message.Timestamp);
await context.Publish(new DeadLetterEvent(
Timestamp: DateTime.UtcNow,
Reason: "Invalid telemetry data",
OriginalMessage: context.Message));
return false;
}

        return true;
}
}
本节介绍了使用 MassTransit 和 RabbitMQ 构建事件驱动遥测系统的基础知识。我们设置了一个生产者来发布传感器数据,一个消费者来处理数据,以及一些处理异常的逻辑。打好基础后,我们将探索更高级的用例和技术。

配置用例

使用 SSL 配置 MassTransit

下面介绍如何在 .NET 应用程序中设置 MassTransit 以通过 SSL 安全地连接到 RabbitMQ:

builder.Services.AddMassTransit(x =>
{
x.AddConsumer<TelemetryDataConsumer>(configure =>
{
configure.UseMessageRetry(r => r.Immediate(5));
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", host =>
{
host.Username("guest");
host.Password("guest");

            host.UseSsl(ssl =>
{
ssl.ServerName = "localhost";
ssl.CertificatePassphrase = "password";
ssl.CertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) =>
{
// Optional: Validate the certificate. Return true to trust the server certificate.
return true;
};
});
});

        cfg.ConfigureEndpoints(context);
});
});

该host.UseSsl方法配置SSL设置,包括:

ServerName:这必须与服务器证书中的通用名称 (CN) 或主题备用名称 (SAN) 匹配。
CertificateValidationCallback:允许自定义处理证书验证,对于自签名证书很有用。
AllowPolicyErrors:放宽验证策略,这在开发中很有用,但在生产中应避免。

配置主机选项

MassTransit 允许我们通过配置来管理公交车的启动和关闭行为MassTransitHostOptions。这些选项可以控制应用程序如何处理代理连接、启动和关闭的时间以及长时间运行的消费者。

可以调整以下选项以满足您的应用程序的需求:

选项描述
等待开始设置为 时true,应用程序会等待总线连接到代理,然后继续启动。默认值false允许异步连接。
开始超时定义应用程序在启动期间等待代理连接建立的时间。如果超时,则启动失败。默认情况下,没有超时。
停止超时设置应用程序等待总线关闭的最长时间,包括处理任何正在进行的消息。默认情况下,它将无限期等待。
消费者停止超时指定关闭期间长时间运行的使用者的超时时间。超过此时间后,ConsumeContext.CancellationToken将被取消。此值必须小于或等于StopTimeout

英文描述:

下面是如何在 .NET 应用程序中使用选项模式配置这些选项的示例:

builder.Services.AddOptions<MassTransitHostOptions>()
.Configure<IServiceProvider>((options, sp) =>
{
options.WaitUntilStarted = false;
options.StartTimeout = TimeSpan.FromSeconds(10);
options.StopTimeout = TimeSpan.FromSeconds(10);
options.ConsumerStopTimeout = TimeSpan.FromSeconds(10);
});

MassTransit过滤器

MassTransit 提供了多种类型的过滤器,以实现类似中间件的消息处理。过滤器是在管道各个阶段拦截消息的组件,用于执行日志记录、验证、转换或其他自定义逻辑等任务。让我们回顾一下 MassTransit 提供了哪些过滤器以及如何使用它们。

KillSwitch

MassTransit 中的 KillSwitch 过滤器是一种特殊类型的过滤器,用于在特定条件下中止消息处理。当“终止开关”被触发时,它会停止进一步的消息处理,并停止消息消费,从而有效地阻止消费者进行任何进一步的工作。

(此处参考官方文档中的示例)

builder.Services.AddMassTransit(x =>
{
x.AddConsumer<TelemetryDataConsumer>(configure =>
{
configure.UseMessageRetry(r => r.Immediate(5));
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", host =>
{
host.Username("guest");
host.Password("guest");
});

        cfg.ConfigureEndpoints(context);

        // filters
cfg.UseKillSwitch(options =>
{
options.SetActivationThreshold(10);
options.SetTripThreshold(0.15);
options.SetRestartTimeout(TimeSpan.FromSeconds(30));
});
});
});

断路器

MassTransit内置了断路器过滤器 (Circuit Breaker Filter),旨在防止下游组件出现故障时系统过载。它会监控消息管道中的故障,当在规定的时间内发生过多错误时,它会“跳闸”,暂时停止消息处理。

这种处理暂停可确保故障组件不会因重试或额外负载而不堪重负,从而有时间恢复。重置间隔过后,电路将“闭合”,并恢复正常的消息处理。

为什么要使用它?

想象一下这样的场景:您的一个消费者依赖于一个突然失去响应的数据库。如果没有断路器,MassTransit 可能会继续向该消费者发送消息,从而导致反复出现故障,并可能在整个系统中引发连锁问题。断路器过滤器会介入,通过暂停进一步的处理,直到数据库稳定下来,从而阻止这种循环。

cfg.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(1);
cb.TripThreshold = 20;
cb.ActiveThreshold = 10;
cb.ResetInterval = TimeSpan.FromMinutes(5);
});
有关 MassTransit 过滤器的更多信息和详细信息,请参阅官方文档。

使用 MassTransit 与 RabbitMQ 的最佳实践

隔离消息模型:为消息契约维护一个单独的项目或程序集。这可确保发布者和消费者之间的一致性,从而更轻松地进行更新和版本控制。

实现单例总线实例:将 MassTransit 总线初始化为单例,以防止与 RabbitMQ 的多个连接,从而减少资源消耗和潜在的连接问题。

设计周全的路由拓扑:精心规划您的交换器和队列结构,以确保高效的消息路由。实施死信队列以妥善处理消息故障,并避免将所有消息路由到单个队列,从而避免出现瓶颈。
利用 RabbitMQ 的功能(例如发布确认和持久队列)来增强消息的可靠性和持久性。

结论

MassTransit 与 RabbitMQ 强强联手,打造可扩展、可靠且易于维护的分布式系统。通过理解断路器等模式,并遵循配置、可观察性和资源管理方面的最佳实践,您可以确保您的消息传递基础架构稳健且能够抵御故障。

请记住,成功的关键在于规划和迭代——设计周到的消息契约,利用内置功能,并持续监控系统运行状况。借助这些工具和技术,您将能够应对现代消息系统的挑战,同时确保应用程序平稳运行。

如果您喜欢此文章,请收藏、点赞、评论,谢谢,祝您快乐每一天。

http://www.dtcms.com/a/390390.html

相关文章:

  • 使用QT进行3D开发建模
  • 阿里云开源DeepResearch:轻量化AI推理框架技术解析与实践指南
  • Visual Studio 2026 Insiders 重磅发布:AI 深度集成、性能飞跃、全新设计
  • 大模型初识(基础模型 业务集成+智能体Agent+Prompt提示词优化)
  • 【4/20】Node.js 入门:设置后端服务器,实现一个简单 API 端点
  • Kafka事务:构建可靠的分布式消息处理系统
  • 补环境-JS原型链检测:在Node.js中完美模拟浏览器原型环境
  • TCP端口号的作用
  • 笔记本电脑维修指南(芯片级)
  • Burpsuite进行暴力破解
  • 虚拟现实CAVE系统中的光学跟踪技术,1:1呈现CAD模型沉浸式交互
  • 2025拍照手机综合排名与场景化选购指南
  • TCP 抓包分析:tcp抓包工具、 iOS/HTTPS 流量解析全流程
  • 从电商API到数据分析的全流程教程
  • 【踩坑】ELK日志解析优化实战:解决多行合并与字段提取问题
  • 大数据高校舆情分析系统 snownlp情感分析 数据分析 可视化 Flask框架 大数据实战(源码)✅
  • 【12/20】数据库高级查询:MongoDB 聚合管道在用户数据分析中的应用,实现报告生成
  • Oceanbase tablegroup表组与负载均衡实践
  • 什么是批量剪辑矩阵源码,支持OEM!
  • RabbitMQ快速入门指南
  • 在项目中通过LangChain4j框架接入AI大模型
  • c语言9:从内存到实践深入浅出理解数组
  • sglang使用笔记
  • 本地大模型编程实战(36)使用知识图谱增强RAG(2)生成知识图谱
  • clip——手写数字识别
  • commons-numbers
  • MySqL-day4_01(内置函数、存储过程、视图)
  • 用html5写一个手机ui
  • 2.canvas学习
  • 【系统架构设计(34)】计算机网络架构与技术基础