封装 RabbitMQ 消息代理交互的功能
封装了与 RabbitMQ 消息代理交互的功能,包括发送和接收消息,以及管理连接和通道。
主要组件
- 依赖项:
  - 代码使用了多个命名空间,包括 Microsoft.Extensions.Configuration(用于配置管理)、RabbitMQ.Client(用于与 RabbitMQ 的交互)、Newtonsoft.Json(用于 JSON 序列化和反序列化)。
  - 还使用了一些自定义命名空间,可能是更大应用程序框架的一部分(例如 MSEBP.Kernel.Common)。
- 配置模型:RabbitConfigModel 用于保存连接 RabbitMQ 的配置设置,包括 IP、Port、UserName、Password、VirtualHost 等属性。
- 连接管理:
  - 类中维护了一个静态字段 _connection 用于 RabbitMQ 连接,并提供了 GetConnection 方法来建立或获取该连接。
  - 连接使用 IP、端口、用户名、密码和虚拟主机等参数进行建立。
- 接收消息:Receive<T> 方法允许从 RabbitMQ 队列异步消费消息。
  - 使用 EventingBasicConsumer 来处理传入的消息,并通过提供的委托(receiveMethod)进行处理。
  - 消息处理包括错误处理,根据处理是否成功来确认(或否定确认)消息。
- 发送消息:Send<T> 方法将对象序列化为 JSON,并将其作为消息发送到指定的 RabbitMQ 队列。
  - 消息是否持久化由配置项 DurableMessage 决定,方法还会为消息设置适当的属性(如头信息)。
- 通道管理:CreateConsumerChannel 方法创建一个新的通道以与 RabbitMQ 进行交互。
  - 通道用于发送和接收消息,必须从已建立的连接中创建。
- 连接工厂:CreateConnection 方法使用 ConnectionFactory 创建新的 RabbitMQ 连接,并配置了心跳和其他参数;GetConnection 方法在需要时调用它来建立连接。
- 资源管理:Dispose 方法确保在不再需要时正确关闭 RabbitMQ 连接。
错误处理和重试机制
- 消息消费逻辑中包含错误处理机制,用于决定消息处理失败时的应对方式:
  - 确认(BasicAck):在成功处理消息时调用。
  - 否定确认(BasicNack):允许根据情况(例如瞬态错误与永久错误)重试消息或拒绝消息。
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MSEBP.External.RabbitMQMsg
{
    /// <summary>
    /// Sends and receives JSON messages through RabbitMQ using a single connection
    /// that is shared (via a static field) by every instance of this class.
    /// </summary>
    /// <remarks>
    /// Connection settings are read from the <c>RabbitMQ:*</c> configuration keys.
    /// NOTE(review): the connection factory's automatic-recovery setting is left at the
    /// library default while this class also reconnects manually on shutdown; confirm
    /// the two mechanisms are not meant to be mutually exclusive.
    /// </remarks>
    public class RabbitMessageService : IDisposable
    {
        private const int ReconnectDelaySeconds = 5;

        // The connection is static, so the lock guarding it must be static as well;
        // an instance-level lock would not exclude other service instances.
        private static readonly object _connectionLock = new object();
        private static IConnection _connection;

        private readonly RabbitConfigModel _rabbitConfig;
        private readonly IConfiguration _configuration;
        private readonly ILogger _logger;

        /// <summary>
        /// Creates the service, loading and validating the RabbitMQ settings.
        /// </summary>
        /// <param name="configuration">Configuration source holding the <c>RabbitMQ:*</c> keys.</param>
        /// <exception cref="InvalidOperationException">
        /// Thrown when IP, port, user name or password is missing or invalid.
        /// </exception>
        public RabbitMessageService(IConfiguration configuration)
        {
            _configuration = configuration;
            _logger = MSEApplication.Resolve<ILogger>("CorrelationId");
            _rabbitConfig = LoadConfig();
            ValidateConfig(_rabbitConfig);
        }

        /// <summary>
        /// Builds the configuration model from the <c>RabbitMQ:*</c> configuration keys.
        /// </summary>
        private RabbitConfigModel LoadConfig()
        {
            return new RabbitConfigModel
            {
                IP = _configuration["RabbitMQ:IP"],
                Port = _configuration["RabbitMQ:Port"].AsInt(5672),
                UserName = _configuration["RabbitMQ:UserName"],
                Password = _configuration["RabbitMQ:Password"],
                VirtualHost = _configuration["RabbitMQ:VirtualHost"],
                DurableQueue = _configuration["RabbitMQ:DurableQueue"].AsBool(),
                QueueName = _configuration["RabbitMQ:QueueName"],
                Exchange = _configuration["RabbitMQ:Exchange"],
                ExchangeType = _configuration["RabbitMQ:ExchangeType"],
                DurableMessage = _configuration["RabbitMQ:DurableMessage"].AsBool(),
                RoutingKey = _configuration["RabbitMQ:RoutingKey"],
            };
        }

        /// <summary>
        /// Rejects configurations that lack the values required to open a connection.
        /// </summary>
        /// <exception cref="InvalidOperationException">Thrown when a required value is missing.</exception>
        private void ValidateConfig(RabbitConfigModel config)
        {
            if (string.IsNullOrWhiteSpace(config.IP)
                || config.Port <= 0
                || string.IsNullOrWhiteSpace(config.UserName)
                || string.IsNullOrWhiteSpace(config.Password))
            {
                throw new InvalidOperationException("RabbitMQ configuration is invalid.");
            }
        }

        /// <summary>
        /// Returns the shared connection, creating it first when it is missing or closed.
        /// </summary>
        /// <param name="clientProvidedName">Name passed to the broker when a new connection is created.</param>
        /// <returns>The shared connection.</returns>
        public IConnection GetConnection(string clientProvidedName)
        {
            if (_connection == null || !_connection.IsOpen)
            {
                lock (_connectionLock)
                {
                    // Re-check under the lock: another thread may have connected meanwhile.
                    if (_connection == null || !_connection.IsOpen)
                    {
                        _connection = CreateConnection(clientProvidedName);
                    }
                }
            }

            return _connection;
        }

        /// <summary>
        /// Opens a new connection, retrying every <see cref="ReconnectDelaySeconds"/> seconds
        /// until one succeeds. This call blocks the calling thread while retrying.
        /// </summary>
        /// <param name="clientProvidedName">Name passed to the broker for this connection.</param>
        private IConnection CreateConnection(string clientProvidedName)
        {
            var factory = new ConnectionFactory
            {
                ClientProvidedName = clientProvidedName,
                Endpoint = new AmqpTcpEndpoint(new Uri($"amqp://{_rabbitConfig.IP}/")),
                // Assigned after Endpoint so the configured port is the one that is used.
                Port = _rabbitConfig.Port,
                UserName = _rabbitConfig.UserName,
                Password = _rabbitConfig.Password,
                VirtualHost = _rabbitConfig.VirtualHost,
                RequestedHeartbeat = TimeSpan.FromSeconds(60)
            };

            while (true)
            {
                try
                {
                    var connection = factory.CreateConnection();
                    connection.ConnectionShutdown += OnConnectionShutdown;
                    _logger.Info("RabbitMQ connection established.");
                    return connection;
                }
                catch (Exception ex)
                {
                    _logger.Error("Failed to create RabbitMQ connection, retrying...", ex);
                    // Wait before the next attempt.
                    Thread.Sleep(TimeSpan.FromSeconds(ReconnectDelaySeconds));
                }
            }
        }

        /// <summary>
        /// Reconnects after an unexpected shutdown. A shutdown requested by the application
        /// itself (for example through <see cref="Dispose"/>) is not followed by a reconnect.
        /// </summary>
        private void OnConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            if (e != null && e.Initiator == ShutdownInitiator.Application)
            {
                // Reconnecting here would re-open a connection the caller just closed.
                _logger.Info("RabbitMQ connection was closed by the application.");
                return;
            }

            _logger.Warn("RabbitMQ connection was shut down. Attempting to reconnect...");
            Reconnect();
        }

        /// <summary>
        /// Replaces a closed shared connection with a new one. Does nothing when the shared
        /// connection is already open, so a live connection is never overwritten and leaked.
        /// </summary>
        private void Reconnect()
        {
            lock (_connectionLock)
            {
                if (_connection != null)
                {
                    if (_connection.IsOpen)
                    {
                        // Already re-established (e.g. by GetConnection or a stale event).
                        return;
                    }

                    _logger.Info("Disposing of RabbitMQ connection.");
                    ReleaseConnection(_connection);
                    _connection = null;
                }

                // Re-establish the connection.
                _connection = CreateConnection("Reconnect");
            }
        }

        /// <summary>
        /// Detaches this instance's shutdown handler from the connection and disposes it,
        /// logging instead of throwing if disposal fails.
        /// </summary>
        private void ReleaseConnection(IConnection connection)
        {
            connection.ConnectionShutdown -= OnConnectionShutdown;
            try
            {
                connection.Dispose();
            }
            catch (Exception ex)
            {
                _logger.Error("Failed to dispose RabbitMQ connection", ex);
            }
        }

        /// <summary>
        /// Consumes messages from the configured queue until <paramref name="cancellationToken"/>
        /// is cancelled, passing each deserialized message to <paramref name="receiveMethod"/>.
        /// </summary>
        /// <typeparam name="T">Type the JSON message body is deserialized into.</typeparam>
        /// <param name="receiveMethod">Handler invoked for every received message.</param>
        /// <param name="cancellationToken">Token that ends consumption when cancelled.</param>
        /// <returns>A task that completes once consumption has been cancelled.</returns>
        public async Task Receive<T>(Func<T, Task> receiveMethod, CancellationToken cancellationToken)
        {
            using (var channel = CreateConsumerChannel("Receive"))
            {
                try
                {
                    SetupQueue(channel);

                    var consumer = new EventingBasicConsumer(channel);
                    // The event signature makes this lambda async void, so ProcessMessage
                    // must never let an exception escape.
                    consumer.Received += async (model, ea) =>
                        await ProcessMessage(channel, ea, receiveMethod, cancellationToken);

                    // autoAck is false: messages are acknowledged explicitly in ProcessMessage.
                    channel.BasicConsume(_rabbitConfig.QueueName, false, consumer);

                    // Wait for cancellation.
                    await Task.Delay(Timeout.Infinite, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    _logger.Info("Message consumption was canceled.");
                }
                catch (Exception ex)
                {
                    _logger.Error("Failed to set up message consumer", ex);
                    throw;
                }
            }
        }

        /// <summary>
        /// Declares the queue (and, when an exchange is configured, the exchange and binding)
        /// and limits the consumer to one unacknowledged message at a time.
        /// </summary>
        private void SetupQueue(IModel channel)
        {
            if (!string.IsNullOrWhiteSpace(_rabbitConfig.Exchange))
            {
                channel.ExchangeDeclare(_rabbitConfig.Exchange, _rabbitConfig.ExchangeType.ToString(), _rabbitConfig.DurableQueue);
                channel.QueueDeclare(_rabbitConfig.QueueName, _rabbitConfig.DurableQueue, false, false, null);
                channel.QueueBind(_rabbitConfig.QueueName, _rabbitConfig.Exchange, _rabbitConfig.RoutingKey);
            }
            else
            {
                channel.QueueDeclare(_rabbitConfig.QueueName, _rabbitConfig.DurableQueue, false, false, null);
            }

            channel.BasicQos(0, 1, false);
        }

        /// <summary>
        /// Deserializes one delivery, runs the handler and acknowledges the outcome.
        /// Never throws, because it is awaited from an async void event handler.
        /// </summary>
        /// <remarks>
        /// A body that cannot be deserialized is rejected without requeue, since redelivering
        /// it would fail the same way every time. A failure in the handler or in the ack is
        /// nacked with requeue so the message can be retried.
        /// </remarks>
        private async Task ProcessMessage<T>(IModel channel, BasicDeliverEventArgs ea, Func<T, Task> receiveMethod, CancellationToken cancellationToken)
        {
            var deliveryTag = ea.DeliveryTag;
            string message = null;
            T data;

            try
            {
                message = Encoding.UTF8.GetString(ea.Body.ToArray());
                _logger.Info($"Received message: {message}");
                data = JsonConvert.DeserializeObject<T>(message);
            }
            catch (Exception ex)
            {
                _logger.Error($"Discarding message that cannot be deserialized: {message}", ex);
                TryNack(channel, deliveryTag, false); // permanent failure: do not requeue
                return;
            }

            try
            {
                await receiveMethod(data);
                channel.BasicAck(deliveryTag, false); // processed successfully
            }
            catch (Exception ex)
            {
                _logger.Error($"Error processing message: {message}", ex);
                TryNack(channel, deliveryTag, true); // allow the message to be processed again
            }
        }

        /// <summary>
        /// Negatively acknowledges a delivery, logging instead of throwing when the nack
        /// itself fails.
        /// </summary>
        private void TryNack(IModel channel, ulong deliveryTag, bool requeue)
        {
            try
            {
                channel.BasicNack(deliveryTag, false, requeue);
            }
            catch (Exception ex)
            {
                _logger.Error("Failed to nack RabbitMQ message", ex);
            }
        }

        /// <summary>
        /// Serializes <paramref name="info"/> to JSON and publishes it to the configured
        /// exchange, or directly to the configured queue when no exchange is set.
        /// </summary>
        /// <typeparam name="T">Type of the object to send.</typeparam>
        /// <param name="info">Object to serialize and publish.</param>
        /// <returns><c>true</c> when the message was published; <c>false</c> when the
        /// serialized message is blank or publishing failed (the failure is logged).</returns>
        public bool Send<T>(T info)
        {
            var message = JsonConvert.SerializeObject(info);
            if (string.IsNullOrWhiteSpace(message))
            {
                return false;
            }

            try
            {
                using (var channel = CreateConsumerChannel("Send"))
                {
                    SetupSendChannel(channel);
                    var properties = CreateMessageProperties(channel);
                    var bytes = Encoding.UTF8.GetBytes(message);

                    if (string.IsNullOrWhiteSpace(_rabbitConfig.Exchange))
                    {
                        // Default exchange: the routing key is the queue name.
                        channel.BasicPublish("", _rabbitConfig.QueueName, properties, bytes);
                    }
                    else
                    {
                        channel.BasicPublish(_rabbitConfig.Exchange, _rabbitConfig.RoutingKey, properties, bytes);
                    }
                }

                return true;
            }
            catch (Exception ex)
            {
                _logger.Error("Failed to send message to RabbitMQ", ex);
                return false;
            }
        }

        /// <summary>
        /// Declares the publish target: the exchange when one is configured, otherwise the queue.
        /// </summary>
        private void SetupSendChannel(IModel channel)
        {
            if (!string.IsNullOrWhiteSpace(_rabbitConfig.Exchange))
            {
                channel.ExchangeDeclare(_rabbitConfig.Exchange, _rabbitConfig.ExchangeType.ToString(), _rabbitConfig.DurableQueue, false, null);
            }
            else
            {
                channel.QueueDeclare(_rabbitConfig.QueueName, _rabbitConfig.DurableQueue, false, false, null);
            }
        }

        /// <summary>
        /// Builds the message properties: delivery mode 2 when <c>DurableMessage</c> is set,
        /// otherwise 1, plus the content-type and type-id headers.
        /// </summary>
        private IBasicProperties CreateMessageProperties(IModel channel)
        {
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = Convert.ToByte(_rabbitConfig.DurableMessage ? 2 : 1);
            properties.Headers = new Dictionary<string, object>
            {
                { "Content-Type", "application/json" },
                { "__TypeId__", _rabbitConfig.QueueName }
            };
            return properties;
        }

        /// <summary>
        /// Creates a new channel on the shared connection. The caller owns the channel
        /// and is responsible for disposing it.
        /// </summary>
        /// <param name="clientProvidedName">Connection name used if a new connection has to be created.</param>
        /// <returns>A new channel.</returns>
        public IModel CreateConsumerChannel(string clientProvidedName)
        {
            return GetConnection(clientProvidedName).CreateModel();
        }

        /// <summary>
        /// Closes and disposes the shared connection. Because the connection is static,
        /// this affects every instance of the service. Does not throw.
        /// </summary>
        public void Dispose()
        {
            lock (_connectionLock)
            {
                var connection = _connection;
                if (connection == null)
                {
                    return;
                }

                _connection = null;

                try
                {
                    if (connection.IsOpen)
                    {
                        connection.Close();
                    }
                }
                catch (Exception ex)
                {
                    _logger.Error("Failed to close RabbitMQ connection", ex);
                }

                // Dispose even when the connection was already closed.
                ReleaseConnection(connection);
            }
        }
    }
}