当前位置: 首页 > wzjs >正文

福州网站建设免费网站app哪个好

福州网站建设,免费网站app哪个好,高性能网站建设进阶指南pdf,中文域名注册目录 1.添加包 2. 连接配置 2.1.连接字符串 2.2.连接对象 3.创建连接服务 3.1.添加配置获取方法 3.2.服务实现类 3.3.服务接口 4.创建生产者服务 4.1.生产者实现类 4.2.生产者接口 5.创建消费者服务 5.1.消费者服务接口 5.2.消费者接口 6.注册 7.简单使用案例 7.1.实现…

      

目录

1.添加包

2. 连接配置

        2.1.连接字符串

   2.2.连接对象

3.创建连接服务

        3.1.添加配置获取方法

        3.2.服务实现类

        3.3.服务接口

4.创建生产者服务

        4.1.生产者实现类

         4.2.生产者接口

5.创建消费者服务

        5.1.消费者服务接口

5.2.消费者接口

6.注册

7.简单使用案例

        7.1.实现

        7.2.接口

        7.3.控制器


        在 .NET Core 应用程序中使用 RabbitMQ 有许多好处,主要体现在其作为消息队列系统所带来的灵活性、可靠性和扩展性等方面,还能促进微服务架构的实施,是构建现代分布式应用的理想选择之一

1.添加包

        添加 RabbitMQ.Client 包。

2. 连接配置

        2.1.连接字符串

        dbsettings.json文件添加 RabbitMQ 连接配置

//RabbitMQ配置
"RabbitMQSettings": {"HostName": "ip地址",             //地址"Port": "端口",                   //端口"UserName": "RabbitMQ用户名",     //用户名"Password": "RabbitMQ密码",       //密码"VirtualHost": "/",              //本地虚拟地址"RetryCount": 5,                 //最大重试次数"RetryInterval": 5,              //断开重连次数"PrefetchCount": 5,              //预取消息数量"ConsumerCount": 5               //消费者数量
}

   2.2.连接对象

namespace Frame3_DataRepository.RabbitMQRepository.BaseMQ
{/// <summary>/// 消息队列配置类/// </summary>public class RabbitMQSettings{/// <summary>/// RabbitMQ 服务器地址/// </summary>public string HostName { get; set; }/// <summary>/// 端口号,默认5672/// </summary>public int Port { get; set; } = 5672;/// <summary>/// 用户名/// </summary>public string UserName { get; set; }/// <summary>/// 密码/// </summary>public string Password { get; set; }/// <summary>/// 虚拟主机,默认为//// </summary>public string VirtualHost { get; set; } = "/";/// <summary>/// 连接重试次数/// </summary>public int RetryCount { get; set; } = 5;/// <summary>/// 重试间隔(秒)/// </summary>public int RetryInterval { get; set; } = 5;/// <summary>/// 预取消息数量/// </summary>public ushort PrefetchCount { get; set; }/// <summary>/// 消费者数量/// </summary>public int ConsumerCount { get; set; }}/// <summary>/// 持久化/// </summary>public enum DeliveryMode : byte{NonPersistent = 1,Persistent = 2}/// <summary>/// 消费者状态信息/// </summary>public class ConsumerStatus{/// <summary>/// 当前活跃消费者数量/// </summary>public int CurrentCount { get; set; }/// <summary>/// 最大允许消费者数量/// </summary>public int MaxCount { get; set; }/// <summary>/// 活跃消费者标签列表/// </summary>public List<string> ActiveConsumers { get; set; } = new();}
}

        案例如下

3.创建连接服务

        先创建配置获取方法,再创建 RabbitMqClient 服务实现类和 IRabbitMqClient 服务接口来MQ连接。

        3.1.添加配置获取方法

using Microsoft.Extensions.Configuration;namespace Frame4_LibraryCore.BaseConfig
{/// <summary>/// 全局配置/// </summary>public static class Config{/// <summary>/// 从指定的 JSON 配置文件中读取配置,并反序列化为指定类型/// </summary>/// <typeparam name="T">目标配置类型(如 RedisSettings、DatabaseSettings 等)</typeparam>/// <param name="fileName">JSON 配置文件名(如 "appsettings.json")</param>/// <param name="sessions">配置节点名称(如 "RedisSettings")</param>/// <returns>返回绑定后的强类型配置对象</returns>public static T GetSetting<T>(string fileName, string sessions){//创建 ConfigurationBuilder 实例,用于构建配置var builder = new ConfigurationBuilder()//设置配置文件的基础路径为当前程序运行目录.SetBasePath(Directory.GetCurrentDirectory())//添加 JSON 文件作为配置源://- fileName: 指定要加载的 JSON 文件//- optional: false 表示文件必须存在,否则抛出异常//- reloadOnChange: true 表示文件修改时自动重新加载.AddJsonFile(fileName, optional: false, reloadOnChange: true);//构建配置对象(IConfigurationRoot)IConfigurationRoot config = builder.Build();//获取指定配置节点(sessions),并将其反序列化为类型 Tvar conn = config.GetSection(sessions).Get<T>();//返回反序列化后的配置对象return conn;}}
}

         案例如下

 

        3.2.服务实现类

using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;namespace Frame3_DataRepository.RabbitMQRepository
{/// <summary>/// 队列服务实现类/// 提供消息队列连接和通道管理功能/// </summary>public sealed class RabbitMqClient : BaseServiceSingleton, IRabbitMqClient{/// <summary>/// RabbitMQ 连接工厂实例/// </summary>private readonly IConnectionFactory _connectionFactory;/// <summary>/// 日志记录器/// </summary>private readonly ILogger<RabbitMqClient> _logger;/// <summary>/// 连接重试最大次数/// </summary>private readonly int _retryCount;/// <summary>/// 重试间隔时间(秒)/// </summary>private readonly int _retryInterval;/// <summary>/// RabbitMQ 连接对象/// </summary>private IConnection _connection;/// <summary>/// 标识对象是否已被释放/// </summary>private bool _disposed;/// <summary>/// 连接操作的线程锁/// </summary>private readonly SemaphoreSlim _connectionLock = new(1, 1);/// <summary>/// 心跳检测定时器,用于定期检查连接状态/// </summary>private Timer _heartbeatTimer;/// <summary>/// 心跳检测间隔(秒),默认30秒/// </summary>private const int HeartbeatInterval = 30;/// <summary>/// 预取消息数量/// </summary>private readonly ushort _prefetchCount;/// <summary>/// 最大允许的消费者数量/// </summary>private readonly int _maxConsumerCount;/// <summary>/// 构造函数,初始化RabbitMQ服务/// </summary>/// <param name="logger">日志记录器,从DI容器注入</param>/// <exception cref="ArgumentNullException">当必需参数为null时抛出</exception>public RabbitMqClient(ILogger<RabbitMqClient> logger){//参数校验,确保依赖注入的参数不为null_logger = logger ?? throw new ArgumentNullException(nameof(logger));//读取配置//var settingsValue = settings?.Value ?? throw new ArgumentNullException(nameof(settings));var settingsValue = Config.GetSetting<RabbitMQSettings>("dbsettings.json", "RabbitMQSettings");//从配置中初始化重试参数_retryCount = settingsValue.RetryCount;_retryInterval = settingsValue.RetryInterval;//配置连接工厂参数_connectionFactory = new ConnectionFactory{HostName = settingsValue.HostName,                  // 主机地址Port = settingsValue.Port,                          // 端口号UserName = settingsValue.UserName,                  // 用户名Password = settingsValue.Password,                  // 密码VirtualHost = settingsValue.VirtualHost,            // 虚拟主机AutomaticRecoveryEnabled = true,                    // 启用自动恢复NetworkRecoveryInterval = TimeSpan.FromSeconds(10), // 网络恢复间隔10秒RequestedHeartbeat = TimeSpan.FromSeconds(60)        // 设置心跳间隔60秒};// 初始化心跳检测定时器_heartbeatTimer = new Timer(HeartbeatCheck, null, Timeout.Infinite, Timeout.Infinite);//初始化预取消息数量_prefetchCount = settingsValue.PrefetchCount;//初始化消费者数量_maxConsumerCount = settingsValue.ConsumerCount;}/// <summary>/// 检查当前是否已建立有效连接/// </summary>public bool IsConnected => _connection?.IsOpen == true && !_disposed;/// <summary>/// 预取消息数量/// </summary>public ushort prefetchCount => _prefetchCount;/// <summary>/// 消费者数量/// </summary>public int ConsumerCount => _maxConsumerCount;/// <summary>/// 尝试建立RabbitMQ连接/// </summary>/// <returns>是否连接成功</returns>public async Task<bool> TryConnectAsync(){// 已连接则直接返回成功if (IsConnected) return true;// 加锁防止多线程同时创建连接await _connectionLock.WaitAsync();try{// 双重检查锁定模式if (IsConnected) return true;//记录建立连接_logger.LogInformation("正在建立RabbitMQ连接...");// 带重试机制的连接逻辑for (int i = 0; i < _retryCount; i++){try{//创建新连接_connection = await _connectionFactory.CreateConnectionAsync().ConfigureAwait(false);//订阅连接关闭事件_connection.ConnectionShutdownAsync += OnConnectionShutdown;//验证连接是否成功建立if (IsConnected){//记录连接成功_logger.LogInformation("RabbitMQ连接已成功建立");//连接成功后启动心跳检测 不用心跳检测可注释//StartHeartbeatCheck();return true;}}catch (BrokerUnreachableException ex){//专门处理Broker不可达异常_logger.LogWarning(ex, $"RabbitMQ服务不可达,第{i}次重试...");}catch (Exception ex){//处理其他类型的异常_logger.LogWarning(ex, $"RabbitMQ连接异常,第{i}次重试...");}//如果未达到最大重试次数,等待间隔时间后重试if (i < _retryCount){//等待间隔时间后重试await Task.Delay(_retryInterval * 1000).ConfigureAwait(false);}}//记录连接到最大重试数_logger.LogError($"RabbitMQ连接失败,已达到最大重试次数{_retryCount}");return false;}finally{//确保锁被释放_connectionLock.Release();}}/// <summary>/// 创建通道/// </summary>/// <returns>创建的通道对象</returns>/// <exception cref="InvalidOperationException">当连接不可用时抛出</exception>public async Task<IChannel> CreateChannelAsync(){//确保连接可用if (!IsConnected && !await TryConnectAsync().ConfigureAwait(false)){throw new InvalidOperationException("没有可用的RabbitMQ连接");}try{return await _connection.CreateChannelAsync().ConfigureAwait(false);}catch (OperationInterruptedException ex){//处理RabbitMQ操作中断异常_logger.LogError(ex, "创建RabbitMQ通道时操作被中断");throw;}catch (Exception ex){//处理其他创建通道时的异常_logger.LogError(ex, "创建RabbitMQ通道失败");throw;}}/// <summary>/// 连接关闭事件处理程序,自动尝试重新连接/// </summary>private async Task OnConnectionShutdown(object sender, ShutdownEventArgs args){//记录连接关闭事件,包括关闭原因_logger.LogWarning($"RabbitMQ连接已关闭,原因: {args}");//如果服务未被释放,尝试自动重新连接if (!_disposed){try{//异步尝试重新连接,不阻塞当前线程await TryConnectAsync().ConfigureAwait(false);}catch (Exception ex){//记录重连失败异常_logger.LogError(ex, "重连失败");}}}/// <summary>/// 心跳检测回调方法,定期检查连接状态/// </summary>private async void HeartbeatCheck(object state){try{// 如果连接不存在或已关闭,尝试重新连接if (!IsConnected){//记录检测断开重新连接_logger.LogWarning("心跳检测发现连接断开,尝试重新连接...");//建立连接await TryConnectAsync().ConfigureAwait(false);}}catch (Exception ex){// 记录心跳检测过程中的异常_logger.LogError(ex, "心跳检测异常");}}/// <summary>/// 启动心跳检测定时器/// </summary>private void StartHeartbeatCheck(){// 设置定时器,定期执行心跳检测_heartbeatTimer.Change(TimeSpan.FromSeconds(HeartbeatInterval),TimeSpan.FromSeconds(HeartbeatInterval));}/// <summary>/// 停止心跳检测定时器/// </summary>private void StopHeartbeatCheck(){// 禁用定时器_heartbeatTimer.Change(Timeout.Infinite, Timeout.Infinite);}/// <summary>/// 释放资源/// </summary>public void Dispose(){//释放Dispose(true);//优化垃圾回收GC.SuppressFinalize(this);}/// <summary>/// 受保护的释放方法/// </summary>/// <param name="disposing">是否主动释放</param>private void Dispose(bool disposing){//如果已经释放,直接返回if (_disposed) return;//如果是主动释放,处理托管资源if (disposing){try{//停止心跳检测StopHeartbeatCheck();//释放定时器资源_heartbeatTimer?.Dispose();//取消事件订阅if (_connection != null){_connection.ConnectionShutdownAsync -= OnConnectionShutdown;}//释放连接资源_connection?.Dispose();//释放线程锁资源_connectionLock.Dispose();//记录连接关闭日志_logger.LogInformation("RabbitMQ连接已关闭");}catch (Exception ex){//记录资源释放过程中的异常_logger.LogError(ex, "关闭RabbitMQ连接时出错");}finally{//标记为已释放状态_disposed = true;}}}}
}

        案例如下

        3.3.服务接口

using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;namespace Frame3_DataRepository.RabbitMQRepository
{/// <summary>/// 队列服务接口/// 供生产者和消费者调用/// </summary>public interface IRabbitMqClient //: IDisposable{/// <summary>/// 获取当前连接状态/// true表示已建立有效连接,false表示连接不可用/// </summary>bool IsConnected { get; }/// <summary>/// 预取消息数量/// </summary>ushort prefetchCount { get; }/// <summary>/// 消费者数量/// </summary>int ConsumerCount { get; }/// <summary>/// 尝试建立RabbitMQ连接/// </summary>/// <returns>是否连接成功</returns>Task<bool> TryConnectAsync();/// <summary>/// 异步创建RabbitMQ通道/// 用于消息发布、消费等操作/// </summary>/// <returns>RabbitMQ通道实例</returns>/// <exception cref="InvalidOperationException">当无法创建连接时抛出</exception>/// <exception cref="OperationInterruptedException">当RabbitMQ操作被中断时抛出</exception>Task<IChannel> CreateChannelAsync();void Dispose();}
}

         案例如下

4.创建生产者服务

        创建生产者服务实现类 MQProducerService 和接口 IMQProducerService

        4.1.生产者实现类

using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Text;
using System.Text.Json;namespace Frame3_DataRepository.RabbitMQRepository.Producer
{/// <summary>/// 生产者服务实现类/// RabbitMQ/// </summary>public sealed class MQProducerService : BaseService, IMQProducerService{/// <summary>/// 基础连接服务/// </summary>private readonly IRabbitMqClient _iRabbitMQService;/// <summary>/// 日志记录器/// </summary>private readonly ILogger<MQProducerService> _logger;/// <summary>/// 构造函数,注入依赖/// </summary>/// <param name="iRabbitMQService"></param>/// <param name="logger"></param>public MQProducerService(IRabbitMqClient iRabbitMQService, ILogger<MQProducerService> logger){//参数校验,确保依赖注入的参数不为null_iRabbitMQService = iRabbitMQService ?? throw new ArgumentNullException(nameof(iRabbitMQService));_logger = logger ?? throw new ArgumentNullException(nameof(logger));}/// <summary>/// 发布消息-点对点模式(Point-to-Point)/// </summary>/// <typeparam name="T">消息类型,必须是class类型</typeparam>/// <param name="queueName">目标队列名称</param>/// <param name="message">要发布的消息对象</param>/// <param name="messageId">可选的消息ID,未提供时自动生成</param>/// <param name="exchange">可选交换机名称,默认使用直接交换机</param>/// <param name="headers">可选的消息头字典</param>/// <param name="withDLX">是否启用死信队列</param>/// <param name="maxRetryCount">最大重试次数(仅当启用死信队列时有效)</param>/// <returns>异步任务</returns>/// <exception cref="ArgumentNullException">当必要参数为空时抛出</exception>/// <exception cref="InvalidOperationException">当消息序列化失败时抛出</exception>public async Task PublishByPTPAsync<T>(string queueName, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class{//参数校验if (string.IsNullOrWhiteSpace(queueName))throw new ArgumentNullException(nameof(queueName), "队列名称不能为空");if (message == null)throw new ArgumentNullException(nameof(message), "消息内容不能为空");//生成消息ID(如果未提供则使用Guid)messageId = messageId.IsEmpty() ? messageId = Guid.NewGuid().ToString() : messageId;//声明变量用于存储序列化后的消息,便于错误处理var jsonMessage = string.Empty;//声明RabbitMQ通道IChannel? channel = null;try{//创建RabbitMQ通道channel = await _iRabbitMQService.CreateChannelAsync();// 死信队列配置var arguments = new Dictionary<string, object>();if (withDLX){// 死信交换机配置var dlxExchange = $"{queueName}.DLX";var dlxQueue = $"{queueName}.DLQ";// 声明死信交换机和队列await channel.ExchangeDeclareAsync(dlxExchange, ExchangeType.Direct, durable: true);await channel.QueueDeclareAsync(queue: dlxQueue,durable: true,exclusive: false,autoDelete: false,arguments: null);await channel.QueueBindAsync(dlxQueue, dlxExchange, dlxQueue);// 设置死信队列参数arguments.Add("x-dead-letter-exchange", dlxExchange);arguments.Add("x-dead-letter-routing-key", dlxQueue);arguments.Add("x-max-retry-count", maxRetryCount); // 自定义属性,记录最大重试次数arguments.Add("x-max-length", 100000);arguments.Add("x-queue-mode", "lazy");}// 添加消费者数量限制arguments["x-max-consumers"] = _iRabbitMQService.ConsumerCount;//声明队列(确保队列存在)var queueDeclareOk = await channel.QueueDeclareAsync(queue: queueName,       //队列名称durable: true,          //队列持久化(服务器重启后仍然存在)exclusive: false,       //非独占队列autoDelete: false,      //不自动删除arguments: arguments//new Dictionary<string, object>//{//    // 只允许一个活跃消费者//    //["x-single-active-consumer"] = true,//    ["x-max-consumers"] = _iRabbitMQService.ConsumerCount,//});//序列化消息为JSON格式jsonMessage = JsonSerializer.Serialize(message);//将消息内容转换为UTF-8字节数组var body = Encoding.UTF8.GetBytes(jsonMessage);//创建消息属性var properties = new BasicProperties{Persistent = true,                                                          //消息持久化(需要队列也是持久化的才有效)MessageId = messageId,                                                      //设置唯一消息ID用于追踪ContentType = "application/json",                                           //明确指定内容类型为JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())    //添加时间戳};//设置消息头(如果提供)if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}// 发布消息到队列await channel.BasicPublishAsync(exchange: exchange ?? string.Empty,         //交换机名称(空字符串表示默认direct交换机)routingKey: queueName,                      //路由键(对于默认交换机就是队列名)mandatory: false,                           //不强制要求消息被路由到队列basicProperties: properties,                //消息属性body: body                                  //消息体);//记录成功日志(结构化日志)_logger.LogInformation("消息发布成功。\r\n交换机: {Exchange}\r\n消息ID: {MessageId}\r\n队列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默认)", messageId, queueName, jsonMessage);}catch (JsonException jsonEx){// 专门处理JSON序列化错误_logger.LogError(jsonEx, "消息序列化失败。\r\n类型: {MessageType}", typeof(T).Name);throw new InvalidOperationException("消息序列化失败", jsonEx);}catch (OperationInterruptedException opEx){// 处理RabbitMQ操作中断异常_logger.LogError(opEx, "RabbitMQ操作中断。\r\n消息: {jsonMessage}\r\n队列: {queueName}", jsonMessage, queueName);throw;}catch (Exception ex){// 处理其他所有异常_logger.LogError(ex, "消息发布失败。\r\n交换机: {Exchange}\r\n队列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默认)", queueName, jsonMessage);throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 发布消息-发布/订阅模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息类型,必须是 class 类型</typeparam>/// <param name="exchangeName">目标 Exchange 名称</param>/// <param name="message">要发布的消息对象</param>/// <param name="messageId">可选的消息唯一标识符,默认自动生成</param>/// <param name="headers">可选的消息头字典</param>/// <returns>异步任务</returns>public async Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class{//校验 Exchange 名称是否为空或空白字符串if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//校验消息内容是否为 nullif (message == null)throw new ArgumentNullException(nameof(message), "消息内容不能为空");//如果未提供消息ID,则使用 Guid 生成唯一的 IDmessageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;//用于存储序列化后的 JSON 消息(便于日志和异常处理)var jsonMessage = string.Empty;//声明 RabbitMQ 通道变量,初始为 nullIChannel? channel = null;try{//创建 RabbitMQ 通道channel = await _iRabbitMQService.CreateChannelAsync();//声明 Fanout 类型的 Exchange(广播模式)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Fanout,   //扇出类型,广播给所有绑定队列durable: true,               //可持久化autoDelete: false);          //不自动删除//将消息对象序列化为 JSON 字符串jsonMessage = JsonSerializer.Serialize(message);//将 JSON 消息转换为 UTF-8 编码的字节数组var body = Encoding.UTF8.GetBytes(jsonMessage);//创建消息属性var properties = new BasicProperties{Persistent = true,                                                //消息持久化//DeliveryMode = (DeliveryModes)DeliveryMode.Persistent,MessageId = messageId,                                            //设置唯一消息IDContentType = "application/json",                                 //内容类型为 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //添加时间戳};//如果提供了 Headers,则设置到消息属性中if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}//向 Exchange 发送消息(不指定 Routing Key,Fanout 类型忽略此参数)await channel.BasicPublishAsync(exchange: exchangeName,      //目标 Exchange 名称routingKey: string.Empty,    //Fanout 类型不需要路由键mandatory: false,            //不要求消息必须被投递basicProperties: properties, //消息属性body: body);                 //消息体字节数据//记录消息发布成功的日志信息_logger.LogInformation("消息已发布到Exchange。\r\nExchange: {Exchange}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}", exchangeName, messageId, jsonMessage);}catch (JsonException jsonEx){//捕获 JSON 序列化异常并记录错误日志_logger.LogError(jsonEx, "消息序列化失败。\r\n类型: {MessageType}", typeof(T).Name);//抛出自定义异常,包含原始异常信息throw new InvalidOperationException("消息序列化失败", jsonEx);}catch (Exception ex){//捕获其他所有异常并记录错误日志_logger.LogError(ex, "消息发布到Exchange失败。\r\nExchange: {Exchange}\r\n消息: {jsonMessage}", exchangeName, jsonMessage);//抛出异常throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 发布消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息体的类型,必须为引用类型</typeparam>/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>/// <param name="routingKey">消息的路由键,用于匹配绑定队列。不能为空或空白字符串。</param>/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>/// <returns>异步任务</returns>public async Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class{//检查 Exchange 名称是否为空或空白字符if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//检查路由键是否为空或空白字符if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");//检查消息对象是否为 nullif (message == null)throw new ArgumentNullException(nameof(message), "消息内容不能为空");//如果没有提供 MessageId,则生成一个 Guid 字符串作为唯一标识messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;//用于记录日志的消息 JSON 字符串var jsonMessage = string.Empty;//声明一个 IChannel 对象,初始为 nullIChannel? channel = null;try{//创建 RabbitMQ Channel(通道)channel = await _iRabbitMQService.CreateChannelAsync();//声明一个 Direct 类型的 Exchange(如果不存在则创建)await channel.ExchangeDeclareAsync(exchange: exchangeName,     //指定 Exchange 的名称type: ExchangeType.Direct,  //指定 Exchange 的类型为 Direct(直连模式)durable: true,              //设置为持久化 Exchange,RabbitMQ 重启后不会丢失autoDelete: false           //不自动删除 Exchange,即使最后一个队列被解绑也不会自动删除);//将消息对象序列化为 JSON 字符串jsonMessage = JsonSerializer.Serialize(message);//将 JSON 字符串转换为 UTF-8 编码的字节数组var body = Encoding.UTF8.GetBytes(jsonMessage);//创建并初始化 BasicProperties(消息属性)var properties = new BasicProperties{Persistent = true, //设置消息持久化MessageId = messageId, //设置消息 IDContentType = "application/json", //内容类型为 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //时间戳};//如果提供了自定义 Headers,则复制到消息属性中if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}//发布消息到指定 Exchange 和路由键await channel.BasicPublishAsync(exchange: exchangeName,      //指定消息要发送到的 Exchange 名称routingKey: routingKey,      //指定消息的路由键,用于 Exchange 路由决策mandatory: false,            //如果为 true,当消息无法路由到任何队列时会返回给生产者;false 则直接丢弃basicProperties: properties, //消息的属性,如持久化、内容类型、消息 ID、时间戳等body: body                   //消息的实际内容(字节数组),通常是序列化后的 JSON 数据);//记录日志:消息发送成功_logger.LogInformation("消息已通过路由键发送。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",exchangeName, routingKey, messageId, jsonMessage);}catch (Exception ex){//捕获异常并记录日志_logger.LogError(ex, "消息发送失败。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息: {jsonMessage}",exchangeName, routingKey, jsonMessage);//抛出异常以便上层处理throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 发布消息-主题模式(Topic)/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>/// <param name="routingKey">消息的路由键,用于匹配 Topic 类型 Exchange 的绑定规则。不能为空或空白字符串。</param>/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>/// <returns>异步任务</returns>public async Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class{//检查 Exchange 名称是否为空或空白字符if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//检查路由键是否为空或空白字符if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");//检查消息对象是否为 nullif (message == null)throw new ArgumentNullException(nameof(message), "消息内容不能为空");//如果没有提供 MessageId,则生成一个 Guid 字符串作为唯一标识messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;//用于记录日志的消息 JSON 字符串var jsonMessage = string.Empty;//声明一个 IChannel 对象,初始为 nullIChannel? channel = null;try{//创建 RabbitMQ Channel(通道)channel = await _iRabbitMQService.CreateChannelAsync();// 删除已存在的 Exchange(如果不需要保留消息)//await channel.ExchangeDeleteAsync("TopicTest");//声明一个 Topic 类型的 Exchange(如果不存在则创建)await channel.ExchangeDeclareAsync(exchange: exchangeName,     //指定要声明的 Exchange 名称,名称由外部传入的 exchangeName 变量指定type: ExchangeType.Topic,   //设置 Exchange 的类型为 Topic(主题模式),支持通配符路由键匹配durable: true,              //设置 Exchange 为持久化,即使 RabbitMQ 重启也不会丢失autoDelete: false           //设置 Exchange 不自动删除,即使最后一个绑定被移除后仍保留);//将消息对象序列化为 JSON 字符串jsonMessage = JsonSerializer.Serialize(message);//将 JSON 字符串转换为 UTF-8 编码的字节数组var body = Encoding.UTF8.GetBytes(jsonMessage);//创建并初始化 BasicProperties(消息属性)var properties = new BasicProperties{Persistent = true, //设置消息持久化MessageId = messageId, //设置消息 IDContentType = "application/json", //内容类型为 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //时间戳};//如果提供了自定义 Headers,则复制到消息属性中if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}//发布消息到指定 Exchange 和路由键await channel.BasicPublishAsync(exchange: exchangeName,      //指定消息要发送到的 Exchange 名称routingKey: routingKey,      //指定消息的路由键,用于 Exchange 路由决策mandatory: false,            //如果为 true,当消息无法路由到任何队列时会返回给生产者;false 则直接丢弃basicProperties: properties, //消息的属性,如持久化、内容类型、消息 ID、时间戳等body: body                   //消息的实际内容(字节数组),通常是序列化后的 JSON 数据);//记录日志:消息发送成功_logger.LogInformation("消息已通过路由键发送。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",exchangeName, routingKey, messageId, jsonMessage);}catch (Exception ex){//捕获异常并记录日志_logger.LogError(ex, "消息发送失败。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息: {jsonMessage}",exchangeName, routingKey, jsonMessage);//抛出异常以便上层处理throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 发布消息-请求/响应(RPC)/// </summary>/// <typeparam name="TRequest">请求消息的类型,必须为引用类型</typeparam>/// <typeparam name="TResponse">期望的响应消息类型,必须为引用类型</typeparam>/// <param name="exchangeName">要发送请求的目标 Exchange 名称。不能为空或空白字符串。</param>/// <param name="routingKey">用于路由请求消息的路由键。不能为空或空白字符串。</param>/// <param name="request">请求对象,将被序列化为 JSON 并作为消息体发送。</param>/// <param name="timeout">等待响应的超时时间。默认为 default(可能无限期等待)。</param>/// <returns>异步任务</returns>public async Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class{// 参数校验if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentException("Exchange名称不能为空", nameof(exchangeName));if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentException("路由键不能为空", nameof(routingKey));if (request == null)throw new ArgumentNullException(nameof(request));// 设置默认超时时间(30秒)var actualTimeout = timeout == default ? TimeSpan.FromSeconds(5) : timeout;if (actualTimeout <= TimeSpan.Zero)throw new ArgumentException("超时时间必须大于0", nameof(timeout));// 生成唯一 CorrelationIdvar correlationId = Guid.NewGuid().ToString();// 创建 TaskCompletionSourcevar tcs = new TaskCompletionSource<TResponse>();// 创建独立 Channelvar channel = await _iRabbitMQService.CreateChannelAsync();try{// 在 PublishByPRCAsync 方法中,发送请求前添加:await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct, // RPC通常使用Directdurable: true,            // 持久化autoDelete: false);// 声明临时队列用于接收响应var replyQueue = await channel.QueueDeclareAsync(queue: "",durable: false,exclusive: true,autoDelete: true);// 创建消费者var consumer = new AsyncEventingBasicConsumer(channel);consumer.ReceivedAsync += (model, ea) =>{try{if (ea.BasicProperties?.CorrelationId == correlationId){var response = JsonSerializer.Deserialize<TResponse>(ea.Body.Span);tcs.TrySetResult(response);}}catch (Exception ex){tcs.TrySetException(ex);}return Task.CompletedTask;};// 开始监听回复队列var consumerTag = await channel.BasicConsumeAsync(queue: replyQueue,autoAck: true,consumer: consumer);// 构建消息属性var props = new BasicProperties();props.ReplyTo = replyQueue;props.CorrelationId = correlationId;props.ContentType = "application/json";// 序列化请求体var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request));// 发送请求await channel.BasicPublishAsync(exchange: exchangeName,routingKey: routingKey,mandatory: false,basicProperties: props,body: body);// 设置超时取消using var cts = new CancellationTokenSource(actualTimeout);cts.Token.Register(() =>{if (!tcs.Task.IsCompleted){tcs.TrySetException(new TimeoutException($"RPC请求超时({actualTimeout.TotalSeconds}秒)"));channel.BasicCancelAsync(consumerTag);}});return await tcs.Task;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 将死信队列中的消息重新发布到原始队列(泛型版本)/// </summary>/// <typeparam name="T">消息体的类型(如 DTO 类)</typeparam>/// <param name="queueName">原始队列名称</param>/// <param name="batchSize">每次处理的消息批大小</param>/// <param name="delay">重发延迟时间(毫秒)</param>/// <returns>成功处理的消息数量</returns>public async Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class{// 检查传入的队列名是否为空或空白字符串,若为空则抛出异常if (string.IsNullOrWhiteSpace(queueName))throw new ArgumentNullException(nameof(queueName));// 构造死信队列(DLQ)名称,格式为:{原始队列名}.DLQvar dlxQueueName = $"{queueName}.DLQ";// 声明一个 RabbitMQ 的 Channel 对象,用于后续操作IChannel? channel = null;// 记录已处理的消息数量int processedCount = 0;try{// 创建一个新的 Channel 实例(通过服务注入的 _iRabbitMQService)channel = await _iRabbitMQService.CreateChannelAsync();// 检查死信队列是否存在(被动声明方式)try{// 如果不存在会抛出异常,catch 中捕获并记录日志后返回 0await channel.QueueDeclarePassiveAsync(dlxQueueName);}catch{// 日志记录:如果 DLQ 不存在,则直接返回 0_logger.LogWarning("死信队列 {DLXQueueName} 不存在", dlxQueueName);return 0;}// 循环获取最多 batchSize 条消息for (int i = 0; i < batchSize; i++){// 使用 BasicGet 从 DLQ 获取一条消息(不自动确认)var result = await channel.BasicGetAsync(dlxQueueName, autoAck: false);// 如果没有更多消息了,跳出循环if (result == null)break;// 获取消息体内容,并转为 byte[] 数组var body = result.Body.ToArray();// 获取原始消息属性(BasicProperties),用于后续操作var originalProperties = result.BasicProperties;// 获取当前消息的 DeliveryTag,用于确认或拒绝消息var deliveryTag = result.DeliveryTag;try{// 如果设置了 delay > 0,则等待指定时间(模拟延迟重试)if (delay > 0)await Task.Delay(delay);// 生成新的唯一 MessageId,用于追踪消息var messageId = Guid.NewGuid().ToString();// 创建新的 BasicProperties 实例,用于新消息的属性设置var properties = new BasicProperties{Persistent = true,                                                          // 设置消息持久化(需队列也持久化才生效)MessageId = messageId,                                                      // 设置唯一消息 IDContentType = "application/json",                                           // 明确内容类型为 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())    // 添加当前时间戳};// 将消息重新发布到原始队列,使用默认交换机(exchange 为空)await channel.BasicPublishAsync(exchange: string.Empty,routingKey: queueName,mandatory: false,basicProperties: properties,body: body);// 确认 DLQ 中该消息已被正确处理(ACK)await channel.BasicAckAsync(deliveryTag, multiple: false);// 成功处理计数器 +1processedCount++;// 日志记录:消息已成功重新发布_logger.LogInformation("已重新发布死信消息 {MessageId} 到队列 {QueueName}",properties.MessageId ?? "未知ID", queueName);}catch (Exception ex){// 日志记录:消息重发失败_logger.LogError(ex, "重新发布死信消息失败,DeliveryTag={DeliveryTag}", deliveryTag);// 拒绝消息并重新入队(回到 DLQ),requeue: true 表示重新入队await channel.BasicNackAsync(deliveryTag, multiple: false, requeue: true);}}// 返回成功处理的消息数量return processedCount;}catch (Exception ex){// 日志记录:整个处理过程中发生错误_logger.LogError(ex, "处理死信队列 {DLXQueueName} 时发生错误", dlxQueueName);// 抛出异常,供上层调用者捕获处理throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}}
}

        案例如下

         4.2.生产者接口

namespace Frame3_DataRepository.RabbitMQRepository.Producer
{/// <summary>/// 生产者服务接口/// RabbitMQ/// </summary>public interface IMQProducerService{/// <summary>/// 发布消息-点对点模式(Point-to-Point)/// </summary>/// <param name="queue">队列名称</param>/// <typeparam name="T">消息类型,必须是引用类型</typeparam>/// <param name="message">要发布的消息对象</param>/// <param name="messageId">消息ID(可选,未提供时自动生成GUID)</param>/// <param name="exchange">交换机名称(空字符串表示默认交换机)</param>/// <returns>异步任务</returns>/// <exception cref="ArgumentNullException">当队列名或消息为空时抛出</exception>Task PublishByPTPAsync<T>(string queue, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class;/// <summary>/// 发布消息-发布/订阅模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息类型,必须是 class 类型</typeparam>/// <param name="exchangeName">目标 Exchange 名称</param>/// <param name="message">要发布的消息对象</param>/// <param name="messageId">可选的消息唯一标识符,默认自动生成</param>/// <param name="headers">可选的消息头字典</param>/// <returns>异步任务</returns>Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;/// <summary>/// 发布消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息体的类型,必须为引用类型</typeparam>/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>/// <param name="routingKey">消息的路由键,用于匹配绑定队列。不能为空或空白字符串。</param>/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>/// <returns>异步任务</returns>Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;/// <summary>/// 发布消息-主题模式(Topic)/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>/// <param name="routingKey">消息的路由键,用于匹配 Topic 类型 Exchange 的绑定规则。不能为空或空白字符串。</param>/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>/// <returns>异步任务</returns>Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;/// <summary>/// 发布消息-请求/响应(RPC)/// </summary>/// <typeparam name="TRequest">请求消息的类型,必须为引用类型</typeparam>/// <typeparam name="TResponse">期望的响应消息类型,必须为引用类型</typeparam>/// <param name="exchangeName">要发送请求的目标 Exchange 名称。不能为空或空白字符串。</param>/// <param name="routingKey">用于路由请求消息的路由键。不能为空或空白字符串。</param>/// <param name="request">请求对象,将被序列化为 JSON 并作为消息体发送。</param>/// <param name="timeout">等待响应的超时时间。默认为 default(可能无限期等待)。</param>/// <returns>异步任务</returns>Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class;/// <summary>/// 将死信队列中的消息重新发布到原始队列(泛型版本)/// </summary>/// <typeparam name="T">消息体的类型(如 DTO 类)</typeparam>/// <param name="queueName">原始队列名称</param>/// <param name="batchSize">每次处理的消息批大小</param>/// <param name="delay">重发延迟时间(毫秒)</param>/// <returns>成功处理的消息数量</returns>Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class;}
}

        案例如下

5.创建消费者服务

         创建消费者服务实现类 MQConsumerService 和接口 IMQConsumerService

        5.1.消费者服务接口

using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{/// <summary>/// 消费者服务实现类/// 提供可靠的消息消费功能,支持自动重试和错误处理/// </summary>public sealed class MQConsumerService : BaseServiceSingleton, IMQConsumerService{/// <summary>/// RabbitMQ 基础服务/// </summary>private readonly IRabbitMqClient _rabbitMQService;/// <summary>/// 日志记录器/// </summary>private readonly ILogger<MQConsumerService> _logger;/// <summary>/// 最大消费者数量限制/// </summary>private readonly int _maxConsumerCount;/// <summary>/// 当前消费者数量计数器(线程安全)/// </summary>private int _currentConsumerCount;/// <summary>/// 用于限制消费者数量的信号量/// </summary>private readonly SemaphoreSlim _consumerLimitSemaphore;/// <summary>/// 当前活跃的消费者字典,线程安全集合/// Key: 消费者标签/// Value: (通道对象, 消费者对象)/// </summary>private readonly ConcurrentDictionary<string, (IChannel Channel, AsyncEventingBasicConsumer Consumer)> _activeConsumers;/// <summary>/// 构造函数,依赖注入初始化/// </summary>/// <param name="rabbitMQService">RabbitMQ基础服务</param>/// <param name="logger">日志记录器</param>/// <exception cref="ArgumentNullException">当参数为null时抛出</exception>public MQConsumerService(IRabbitMqClient rabbitMQService, ILogger<MQConsumerService> logger){_rabbitMQService = rabbitMQService ?? throw new ArgumentNullException(nameof(rabbitMQService));_logger = logger ?? throw new ArgumentNullException(nameof(logger));_activeConsumers = new ConcurrentDictionary<string, (IChannel, AsyncEventingBasicConsumer)>();_maxConsumerCount = _rabbitMQService.ConsumerCount;_currentConsumerCount = 0;_consumerLimitSemaphore = new SemaphoreSlim(_maxConsumerCount, _maxConsumerCount);}/// <summary>/// 消费消息-点对点(Point-to-Point)/// </summary>/// <typeparam name="T">消息类型</typeparam>/// <param name="queueName">要消费的队列名称</param>/// <param name="messageHandler">消息处理委托</param>/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>/// <param name="autoAck">是否自动确认消息,建议设为false实现可靠消费</param>/// <param name="withDLX">是否启用死信队列</param>/// <returns>取消令牌源,用于停止消费</returns>public async Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class{//参数校验if (string.IsNullOrWhiteSpace(queueName))throw new ArgumentNullException(nameof(queueName), "队列名称不能为空");if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");//等待获取消费者槽位(带超时防止死锁)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}//当前消费者数量+1Interlocked.Increment(ref _currentConsumerCount);//创建取消令牌源var cancellationTokenSource = new CancellationTokenSource();//创建通道IChannel? channel = null;try{//赋值预读取数量if (prefetchCount == 0){prefetchCount = _rabbitMQService.prefetchCount;}//创建通道channel = await _rabbitMQService.CreateChannelAsync();//设置QoS(服务质量),控制预取消息数量await channel.BasicQosAsync(prefetchSize: 0,                //不限制预取消息总大小prefetchCount: prefetchCount,   //prefetchCount > 0 ? prefetchCount : _rabbitMQService.prefetchCount,   //每次预取的消息数量global: false                   //应用于当前消费者而非整个通道);//检查队列是否存在try{await channel.QueueDeclarePassiveAsync(queueName);}catch{_logger.LogError($"队列 {queueName} 不存在");throw;}//创建消费者var consumer = new AsyncEventingBasicConsumer(channel);//注册消息接收事件consumer.ReceivedAsync += async (model, ea) =>{try{//反序列化消息var message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//记录接收日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\n队列: {QueueName}\r\n消息体:{message}", ea.BasicProperties.MessageId, queueName, message?.ToJson());//处理消息await messageHandler(message);//如果不是自动确认模式,手动确认消息if (!autoAck){await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag,    //消息投递标签multiple: false                 //不批量确认);}}catch (JsonException jsonEx){//处理反序列化错误_logger.LogError(jsonEx, "消息反序列化失败。\r\n队列: {QueueName}", queueName);//拒绝消息,不重新入队if (!autoAck){await channel.BasicRejectAsync(deliveryTag: ea.DeliveryTag, requeue: false);}}catch (Exception ex){//处理业务逻辑错误_logger.LogError(ex, "消息处理失败。\r\n队列: {QueueName}", queueName);//如果不是自动确认模式(autoAck=false),需要手动处理消息确认和重试逻辑if (!autoAck){//获取当前消息的属性对象var properties = ea.BasicProperties;//获取消息头,如果headers为null则创建新的字典var headers = properties.Headers ?? new Dictionary<string, object>();//获取当前重试次数,如果不存在x-retry-count头则默认为0int retryCount = headers.TryGetValue("x-retry-count", out var retryObj) ? Convert.ToInt32(retryObj) : 0;//获取最大重试次数,如果不存在x-max-retry-count头则默认为1int maxRetryCount = headers.TryGetValue("x-max-retry-count", out var maxRetryObj) ? Convert.ToInt32(maxRetryObj) : 1;//如果启用了死信队列(withDLX=true)且当前重试次数已达最大重试次数if (withDLX && retryCount >= maxRetryCount){//记录警告日志,说明消息已达到最大重试次数_logger.LogWarning("消息已达到最大重试次数 {MaxRetryCount},将被移入死信队列", maxRetryCount);//拒绝消息,requeue=false表示不重新入队,消息将被路由到死信队列await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}else{//创建新的消息属性对象,复制原始消息的所有属性var newProperties = new BasicProperties{ContentType = properties.ContentType,       //复制内容类型ContentEncoding = properties.ContentEncoding,   //复制内容编码DeliveryMode = properties.DeliveryMode,         //复制投递模式(1-非持久化,2-持久化)Priority = properties.Priority,                 //复制消息优先级CorrelationId = properties.CorrelationId,       //复制关联ID(用于请求 - 响应模式)ReplyTo = properties.ReplyTo,                   //复制回复队列名称Expiration = properties.Expiration,             //复制消息过期时间MessageId = properties.MessageId,               //复制消息IDTimestamp = properties.Timestamp,               //复制时间戳Type = properties.Type,                         //复制消息类型UserId = properties.UserId,                     //复制用户IDAppId = properties.AppId,                       //复制应用IDClusterId = properties.ClusterId,               //复制集群ID//复制消息头,并更新重试次数Headers = new Dictionary<string, object>(headers){["x-retry-count"] = retryCount + 1          //重试次数+1}};//重新发布消息到原始队列await channel.BasicPublishAsync(exchange: string.Empty,             //exchange: 空字符串表示默认direct交换机routingKey: queueName,              //使用原始队列名称mandatory: false,                   //false表示如果无法路由则丢弃消息basicProperties: newProperties,     //使用更新后的属性(包含新的重试次数)body: ea.Body                       //原始消息体);//确认原始消息已被处理(从队列中移除)//- deliveryTag: 消息投递标签//- multiple: false表示只确认单条消息await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}}}};//开始消费队列消息var consumerTag = await channel.BasicConsumeAsync(queue: queueName,   //队列名称autoAck: autoAck,   //自动确认设置consumer: consumer  //消费者实例);//将消费者添加到活跃集合_activeConsumers.TryAdd(consumerTag, (channel, consumer));//注册取消令牌回调,当取消时停止消费者cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//记录消费者/总消费者数量_logger.LogInformation("成功启动消费者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount},当前消费者标记: {consumerTag}", _currentConsumerCount, _maxConsumerCount, consumerTag);return cancellationTokenSource;}catch (Exception ex){//清理创建失败的通道资源channel?.Dispose();//当前消费者数量-1Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();//记录启动失败日志_logger.LogError(ex, "启动消费者失败。队列: {QueueName}", queueName);throw;}//finally//{//    // 如果出错或任务完成,确保 channel 被释放//    channel?.Dispose();//}}/// <summary>/// 消费消息-发布/订阅模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息类型</typeparam>/// <param name="exchangeName">要订阅的Exchange名称</param>/// <param name="messageHandler">消息处理委托</param>/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>取消令牌源,用于停止消费</returns>public async Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class{//校验exchange名称是否为空if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//校验消息处理器是否为空if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");//等待获取消费者槽位(防止并发消费者过多)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){//获取失败则抛出超时异常throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}//原子增加当前消费者数量Interlocked.Increment(ref _currentConsumerCount);//创建取消令牌源用于后续停止消费var cancellationTokenSource = new CancellationTokenSource();IChannel? channel = null;try{//如果未指定prefetchCount,则使用默认值if (prefetchCount == 0){prefetchCount = _rabbitMQService.prefetchCount;}//创建 RabbitMQ 通道channel = await _rabbitMQService.CreateChannelAsync();//设置QoS(服务质量),限制预取的消息数量await channel.BasicQosAsync(prefetchSize: 0,prefetchCount: prefetchCount,global: false);//声明一个 fanout 类型的 Exchange(广播模式)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Fanout,durable: true,   //可持久化autoDelete: false); //不自动删除//自定义临时队列名称var queueName = "PubSub-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");//创建临时队列(由RabbitMQ自动生成名字)//var queueResult = await channel.QueueDeclareAsync();var queueResult = await channel.QueueDeclareAsync(queue: queueName,   //队列名称durable: false,     //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)exclusive: true,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)autoDelete: true    //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值));//将队列绑定到Exchange(fanout类型忽略routingKey)await channel.QueueBindAsync(queue: queueName,exchange: exchangeName,routingKey: "");//创建异步消费者对象var consumer = new AsyncEventingBasicConsumer(channel);//注册消息接收事件处理逻辑consumer.ReceivedAsync += async (model, ea) =>{try{//反序列化消息体为泛型对象Tvar message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//记录收到消息的日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, message?.ToJson());//调用用户定义的消息处理方法await messageHandler(message);//如果不是自动确认,则手动发送 Ack 确认消息已处理if (!autoAck){await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}}catch (JsonException jsonEx){//消息反序列化失败,记录错误日志_logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,不重入队列if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}}catch (Exception ex){//消息处理过程中发生其他异常,记录错误日志_logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,并尝试重新入队if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);}}};//启动消费,开始监听消息var consumerTag = await channel.BasicConsumeAsync(queue: queueName,autoAck: autoAck,consumer: consumer);//将消费者和通道保存起来以便后续取消操作_activeConsumers.TryAdd(consumerTag, (channel, consumer));//注册取消回调,当取消令牌被触发时调用StopConsumingcancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//记录启动成功日志_logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);//返回取消令牌源,供外部控制消费终止return cancellationTokenSource;}catch (Exception ex){//出现异常时释放资源channel?.Dispose();//原子减少消费者计数Interlocked.Decrement(ref _currentConsumerCount);//释放信号量槽位_consumerLimitSemaphore.Release();//记录启动失败日志_logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);//抛出异常throw;}}/// <summary>/// 消费消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息反序列化的目标类型</typeparam>/// <param name="exchangeName">要绑定的 Exchange 名称</param>/// <param name="routingKey">用于绑定队列和 Exchange 的路由键</param>/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>public async Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class{//校验exchange名称是否为空if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//校验路由键是否为空if (routingKey == null)throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");//校验消息处理器是否为空if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");//等待获取消费者槽位(防止并发消费者过多)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){//获取失败则抛出超时异常throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}//原子增加当前消费者数量Interlocked.Increment(ref _currentConsumerCount);// 创建 CancellationTokenSource,用于后续控制取消消费var cancellationTokenSource = new CancellationTokenSource();IChannel? channel = null;try{// 创建一个新的 RabbitMQ Channelchannel = await _rabbitMQService.CreateChannelAsync();// 声明一个 Direct 类型的 Exchange(如果不存在则创建)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct,durable: true,          // Exchange 持久化autoDelete: false);     // 不自动删除//自定义临时队列名称var queueName = "Routing-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");//创建临时队列(由RabbitMQ自动生成名字)//var queueResult = await channel.QueueDeclareAsync();var queueResult = await channel.QueueDeclareAsync(queue: queueName,   //队列名称durable: false,     //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)exclusive: true,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)autoDelete: true    //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值));// 将队列绑定到指定的 Exchange,并使用给定的 routingKeyawait channel.QueueBindAsync(queueName, exchangeName, routingKey);// 创建异步消费者var consumer = new AsyncEventingBasicConsumer(channel);// 注册消息接收事件处理逻辑consumer.ReceivedAsync += async (model, ea) =>{try{// 反序列化消息体为泛型 T 对象var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//记录收到消息的日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());// 调用用户定义的消息处理函数await messageHandler(msg);// 如果不是自动确认,则手动发送 Ack 确认消息已处理成功if (!autoAck)await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}catch (JsonException jsonEx){//消息反序列化失败,记录错误日志_logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,不重入队列if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}}catch (Exception ex){//消息处理过程中发生其他异常,记录错误日志_logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,并尝试重新入队if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);}}};// 开始消费队列中的消息var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);// 记录当前消费者信息以便后续取消或释放资源_activeConsumers.TryAdd(consumerTag, (channel, consumer));// 注册取消令牌,在取消时调用 StopConsuming 方法停止消费cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//记录启动成功日志_logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);// 返回 CancellationTokenSource,供外部控制取消return cancellationTokenSource;}catch (Exception ex){//出现异常时释放资源channel?.Dispose();//原子减少消费者计数Interlocked.Decrement(ref _currentConsumerCount);//释放信号量槽位_consumerLimitSemaphore.Release();//记录启动失败日志_logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);//抛出异常throw;}}/// <summary>/// 消费消息-主题模式(Topic)/// </summary>/// <typeparam name="T">消息反序列化的目标类型</typeparam>/// <param name="exchangeName">要绑定的 Exchange 名称</param>/// <param name="topicPattern">用于绑定队列的 Topic 匹配规则(如 user.*)</param>/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>public async Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class{//校验exchange名称是否为空if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//校验匹配规则是否为空if (topicPattern == null)throw new ArgumentNullException(nameof(topicPattern), "topicPattern不能为空");//校验消息处理器是否为空if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");//等待获取消费者槽位(防止并发消费者过多)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){//获取失败则抛出超时异常throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}//原子增加当前消费者数量Interlocked.Increment(ref _currentConsumerCount);// 创建 CancellationTokenSource,用于后续控制取消消费var cancellationTokenSource = new CancellationTokenSource();IChannel? channel = null;try{// 创建一个新的 RabbitMQ Channelchannel = await _rabbitMQService.CreateChannelAsync();// 声明一个 Topic 类型的 Exchange(如果不存在则创建)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Topic,durable: true,          // Exchange 持久化autoDelete: false);     // 不自动删除//自定义临时队列名称var queueName = "Topic-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");//创建临时队列(由RabbitMQ自动生成名字)//var queueResult = await channel.QueueDeclareAsync();var queueResult = await channel.QueueDeclareAsync(queue: queueName,   //队列名称durable: false,     //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)exclusive: true,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)autoDelete: true    //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值));// 将队列绑定到指定的 Exchange,并使用 Topic 模式匹配规则await channel.QueueBindAsync(queueName, exchangeName, topicPattern);// 创建异步消费者var consumer = new AsyncEventingBasicConsumer(channel);// 注册消息接收事件处理逻辑consumer.ReceivedAsync += async (model, ea) =>{try{// 反序列化消息体为泛型 T 对象var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//记录收到消息的日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());// 调用用户定义的消息处理函数await messageHandler(msg);// 如果不是自动确认,则手动发送 Ack 确认消息已处理成功if (!autoAck)await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}catch (JsonException jsonEx){//消息反序列化失败,记录错误日志_logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,不重入队列if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}}catch (Exception ex){//消息处理过程中发生其他异常,记录错误日志_logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,并尝试重新入队if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);}}};// 开始消费队列中的消息var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);// 记录当前消费者信息以便后续取消或释放资源_activeConsumers.TryAdd(consumerTag, (channel, consumer));// 注册取消令牌,在取消时调用 StopConsuming 方法停止消费cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//记录启动成功日志_logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);// 返回 CancellationTokenSource,供外部控制取消return cancellationTokenSource;}catch (Exception ex){//出现异常时释放资源channel?.Dispose();//原子减少消费者计数Interlocked.Decrement(ref _currentConsumerCount);//释放信号量槽位_consumerLimitSemaphore.Release();//记录启动失败日志_logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);//抛出异常throw;}}/// <summary>/// 消费消息-请求/响应(RPC)/// </summary>/// <typeparam name="TRequest">请求消息的类型</typeparam>/// <typeparam name="TResponse">响应消息的类型</typeparam>/// <param name="exchangeName">Exchange 名称,通常为空字符串表示默认 Exchange</param>/// <param name="routingKey">用于监听的队列名称(同时也是 routingKey)</param>/// <param name="handler">处理请求并返回响应的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>public async Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class{// 参数校验if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentException("Exchange名称不能为空", nameof(exchangeName));if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentException("路由键不能为空", nameof(routingKey));if (handler == null)throw new ArgumentNullException(nameof(handler), "消息处理器不能为空");// 等待获取消费者槽位if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}try{Interlocked.Increment(ref _currentConsumerCount);// 创建新的 RabbitMQ Channelvar channel = await _rabbitMQService.CreateChannelAsync();try{// 设置预取数量(控制并发处理能力)if (prefetchCount > 0){await channel.BasicQosAsync(0, prefetchCount, false);}// 声明队列(与生产者保持一致)var queueDeclareOk = await channel.QueueDeclareAsync(queue: routingKey,   //队列名称durable: true,       //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)exclusive: false,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)autoDelete: false,   //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)arguments: null);// 绑定队列到Exchangeawait channel.QueueBindAsync(queue: routingKey,exchange: exchangeName,routingKey: routingKey);// 创建异步消费者var consumer = new AsyncEventingBasicConsumer(channel);// 消息处理逻辑consumer.ReceivedAsync += async (model, ea) =>{try{// 反序列化请求var request = JsonSerializer.Deserialize<TRequest>(ea.Body.Span);// 处理请求var response = await handler(request);// 准备响应属性var replyProps = new BasicProperties();replyProps.CorrelationId = ea.BasicProperties.CorrelationId;replyProps.ContentType = "application/json";// 发送响应await channel.BasicPublishAsync(exchange: "", // 默认ExchangeroutingKey: ea.BasicProperties.ReplyTo,mandatory: false,basicProperties: replyProps,body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(response)));// 确认消息await channel.BasicAckAsync(ea.DeliveryTag, false);}catch (Exception ex){_logger.LogError(ex, "处理RPC请求失败: {CorrelationId}",ea.BasicProperties?.CorrelationId);// 拒绝消息且不重新入队await channel.BasicNackAsync(ea.DeliveryTag, false, false);}};// 开始消费var consumerTag = await channel.BasicConsumeAsync(queue: routingKey,autoAck: false, // 手动确认consumer: consumer);// 创建取消令牌var cts = new CancellationTokenSource();// 注册取消回调cts.Token.Register(async () =>{try{await channel.BasicCancelAsync(consumerTag);await channel.CloseAsync();}catch (Exception ex){_logger.LogWarning(ex, "取消消费者时发生错误");}finally{channel.Dispose();Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();}});return cts;}catch{// 发生异常时确保通道被关闭channel?.Dispose();throw;}}catch{// 发生异常时释放信号量Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();throw;}}/// <summary>/// 停止指定消费者的消息消费/// </summary>/// <param name="consumerTag">消费者标签</param>/// <returns>异步任务</returns>public async Task StopConsuming(string consumerTag){//从活跃集合中移除消费者if (_activeConsumers.TryRemove(consumerTag, out var consumerInfo)){try{//取消消费者订阅await consumerInfo.Channel.BasicCancelAsync(consumerTag);//异步释放通道资源await consumerInfo.Channel.DisposeAsync();//记录停止成功日志_logger.LogInformation("已停止消费者。消费者标签: {ConsumerTag}", consumerTag);}catch (OperationInterruptedException opEx){//记录操作中断警告日志_logger.LogWarning(opEx, "消费者取消操作被中断。消费者标签: {ConsumerTag}", consumerTag);}catch (Exception ex){//记录停止失败错误日志_logger.LogError(ex, "停止消费者时出错。消费者标签: {ConsumerTag}", consumerTag);throw;}finally{Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();_logger.LogInformation("当前消费者数: {CurrentCount}/{MaxCount}", _currentConsumerCount, _maxConsumerCount);}}else{// 记录未找到消费者警告日志_logger.LogWarning("未找到对应的消费者。消费者标签: {ConsumerTag}", consumerTag);}}/// <summary>/// 停止所有消费者的消息消费/// </summary>/// <returns>异步任务</returns>public async Task StopAllConsuming(){// 遍历所有消费者标签并停止foreach (var consumerTag in _activeConsumers.Keys.ToList()){try{await StopConsuming(consumerTag).ConfigureAwait(false);}catch (Exception ex){// 记录单个消费者停止失败日志,继续停止其他消费者_logger.LogError(ex, "停止消费者时出错。消费者标签: {ConsumerTag}", consumerTag);}}}/// <summary>/// 获取当前消费者状态/// </summary>public ConsumerStatus GetConsumerStatus(){// 创建并返回一个新的 ConsumerStatus 对象,用于封装当前消费者的运行状态return new ConsumerStatus{// 设置当前消费者数量CurrentCount = _currentConsumerCount,// 设置最大消费者数量MaxCount = _maxConsumerCount,// 获取当前所有活跃消费者的标识符列表ActiveConsumers = _activeConsumers.Keys.ToList()};}}}

        案例如下

5.2.消费者接口

using Frame3_DataRepository.RabbitMQRepository.BaseMQ;namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{/// <summary>/// 消费者服务接口/// RabbitMQ/// </summary>public interface IMQConsumerService{/// <summary>/// 消费消息-点对点(Point-to-Point)/// </summary>/// <typeparam name="T">消息类型</typeparam>/// <param name="queueName">要消费的队列名称</param>/// <param name="messageHandler">消息处理委托</param>/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>/// <param name="autoAck">是否自动确认消息,建议设为false实现可靠消费</param>/// <returns>取消令牌源,用于停止消费</returns>/// <exception cref="ArgumentNullException">当必要参数为空时抛出</exception>Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消费消息-发布/订阅模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息类型</typeparam>/// <param name="exchangeName">要订阅的Exchange名称</param>/// <param name="messageHandler">消息处理委托</param>/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>取消令牌源,用于停止消费</returns>Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消费消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息反序列化的目标类型</typeparam>/// <param name="exchangeName">要绑定的 Exchange 名称</param>/// <param name="routingKey">用于绑定队列和 Exchange 的路由键</param>/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消费消息-主题模式(Topic)/// </summary>/// <typeparam name="T">消息反序列化的目标类型</typeparam>/// <param name="exchangeName">要绑定的 Exchange 名称</param>/// <param name="topicPattern">用于绑定队列的 Topic 匹配规则(如 user.*)</param>/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消费消息-请求/响应(RPC)/// </summary>/// <typeparam name="TRequest">请求消息的类型</typeparam>/// <typeparam name="TResponse">响应消息的类型</typeparam>/// <param name="exchangeName">Exchange 名称,通常为空字符串表示默认 Exchange</param>/// <param name="routingKey">用于监听的队列名称(同时也是 routingKey)</param>/// <param name="handler">处理请求并返回响应的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class;/// <summary>/// 停止指定消费者的消息消费/// </summary>/// <param name="consumerTag">消费者标签</param>/// <returns>异步任务</returns>Task StopConsuming(string consumerTag);/// <summary>/// 停止所有消费者的消息消费/// </summary>/// <returns>异步任务</returns>Task StopAllConsuming();/// <summary>/// 获取当前消费者状态/// </summary>/// <returns></returns>ConsumerStatus GetConsumerStatus();}
}

        案例如下

6.注册

         在 Program 或 Startup 中注册队列。

 // 注册 RabbitMQ 连接服务为单例(Singleton)// IRabbitMqClient 是一个接口,代表 RabbitMQ 客户端连接的抽象// RabbitMqClient 是其具体实现类// 单例生命周期意味着在整个应用程序生命周期中只创建一次该实例,所有请求共享同一个实例builder.Services.AddSingleton<IRabbitMqClient, RabbitMqClient>();// 注册 RabbitMQ 消息生产者服务为作用域(Scoped)// IMQProducerService 是用于发送消息的接口// MQProducerService 是其实现类// Scoped 生命周期表示在同一个请求上下文中使用同一个实例(适用于 Web 应用场景)builder.Services.AddScoped<IMQProducerService, MQProducerService>();// 注册 RabbitMQ 消息消费者服务为单例(Singleton)// IMQConsumerService 是用于消费消息(接收并处理消息)的接口// MQConsumerService 是其实现类// 使用 Singleton 是因为消费者通常需要长时间运行、持续监听队列,适合整个应用周期内保持一个实例builder.Services.AddSingleton<IMQConsumerService, MQConsumerService>();

         案例如下

7.简单使用案例

        下面是 实现、接口和控制器的使用案例

        7.1.实现

using Frame1_Service.IService.Product;
using Frame2_DataModel.Entity.Products;
using Frame3_DataRepository.RabbitMQRepository.Consumer;
using Frame3_DataRepository.RabbitMQRepository.Producer;
using Frame6_LibraryUtility;
using RabbitMQ.Client.Exceptions;namespace Frame1_Service.Service.Product
{public class RabbitMQTestSvr : BaseService, IRabbitMQTestSvr{/// <summary>/// 生产者/// </summary>private readonly IMQProducerService _iRabbitMQProducer;/// <summary>/// 消费者/// </summary>private readonly IMQConsumerService _iRabbitMQConsumer;/// <summary>/// 构造/// </summary>/// <param name="iRabbitMQProducer"></param>/// <param name="iRabbitMQConsumer"></param>public RabbitMQTestSvr(IMQProducerService iRabbitMQProducer, IMQConsumerService iRabbitMQConsumer){_iRabbitMQConsumer = iRabbitMQConsumer;_iRabbitMQProducer = iRabbitMQProducer;}/// <summary>/// 模拟消费逻辑/// </summary>/// <param name="model"></param>/// <returns></returns>private async Task ProcessOrderAsync(ProductsEntity model){Console.WriteLine("消费成功:" + model.ToJson());}/// <summary>/// 生产者-点对点(Point-to-Point)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByPTPAsync<ProductsEntity>("ProducerTest", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消费者-点对点(Point-to-Point)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerTest(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByPTPAsync<ProductsEntity>("ProducerTest", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生产者-发布订阅(Pub/Sub)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByPubSubAsync<ProductsEntity>("PubSubTest", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消费者-发布订阅(Pub/Sub)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerPubSub(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByPubSubAsync<ProductsEntity>("PubSubTest", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生产者-路由模式(Routing)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消费者-路由模式(Routing)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerRouting(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生产者-主题模式(Topic)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByTopicAsync<ProductsEntity>("TopicTest", "Topic.test", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消费者-主题模式(Topic)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerTopic(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByTopicAsync<ProductsEntity>("TopicTest", "Topic.*", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生产者-请求响应模式(RPC)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model){var result = new ResultModel<CalculateResponse>();var request = new CalculateRequest { X = 5, Y = 7 };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);var response = await _iRabbitMQProducer.PublishByPRCAsync<CalculateRequest, CalculateResponse>("RPCTest", "RPC", request);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = response;return result;}/// <summary>/// 消费者-请求响应模式(RPC)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerRPC(){var result = new ResultModel<bool> { Data = false };try{//示例:模拟一个计算器服务(可替换为真实的 ICalculatorService)Func<CalculateRequest, Task<CalculateResponse>> handler = async req =>{await Task.Delay(10); //模拟异步处理return new CalculateResponse { Result = req.X + req.Y };};//启动消费者var cts = await _iRabbitMQConsumer.StartConsumingByPRCAsync<CalculateRequest, CalculateResponse>(exchangeName: "RPCTest", routingKey: "RPC", handler);result.Data = true;result.Code = ResultCodeEnum.Success;result.Msg = "RPC消费者已启动";}catch (OperationInterruptedException ex){result.Msg = "消息队列服务不可用";}catch (Exception ex){result.Msg = "消费者初始化失败";}return result;}/// <summary>/// 死信队列重抛/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> Republish(string queueName){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQProducer.RepublishDeadLetterMessagesAsync<ProductsEntity>(queueName);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 停止消费者/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag){var result = new ResultModel<bool>() { Data = false };if (consumerTag.Equals("0")){await _iRabbitMQConsumer.StopAllConsuming();}else{await _iRabbitMQConsumer.StopConsuming(consumerTag);}result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}}
}

        案例如下

 

        7.2.接口

using Frame2_DataModel.Entity.Products;
using Frame6_LibraryUtility;namespace Frame1_Service.IService.Product
{public interface IRabbitMQTestSvr{/// <summary>/// 生产者-点对点(Point-to-Point)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerTest(ProductsEntity model);/// <summary>/// 消费者-点对点(Point-to-Point)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerTest();/// <summary>/// 生产者-发布订阅(Pub/Sub)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model);/// <summary>/// 消费者-发布订阅(Pub/Sub)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerPubSub();/// <summary>/// 生产者-路由模式(Routing)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerRouting(ProductsEntity model);/// <summary>/// 消费者-路由模式(Routing)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerRouting();/// <summary>/// 生产者-主题模式(Topic)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerTopic(ProductsEntity model);/// <summary>/// 消费者-主题模式(Topic)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerTopic();/// <summary>/// 生产者-请求响应模式(RPC)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model);/// <summary>/// 消费者-请求响应模式(RPC)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerRPC();/// <summary>/// 死信队列重抛/// </summary>/// <returns></returns>Task<ResultModel<bool>> Republish(string queueName);/// <summary>/// 停止消费者/// </summary>/// <returns></returns>Task<ResultModel<bool>> StopAllConsumer(string consumerTag);}
}

        案例如下

        7.3.控制器

using Frame1_Service.IService.Product;
using Frame1_Service.Service.Product;
using Frame2_DataModel.Entity.Products;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.AspNetCore.Mvc;namespace DemoAPI.Controllers
{/// <summary>/// 消息队列控制器 -RabbitMQ/// </summary>//[Authorize]// 保护整个控制器[Route("api/[controller]/[action]")]//标记路由地址规格[ApiController] // 标记该类为 API 控制器,启用一些默认的行为,如模型绑定、输入验证等[ApiExplorerSettings(GroupName = nameof(ApiVersionInfo.V1))]//设置控制器的API版本public class RabbitMQTestController : BaseController{private IRabbitMQTestSvr _iRabbitMQTestSvr;/// <summary>/// 构造/// </summary>/// <param name="iRabbitMQTestSvr"></param>public RabbitMQTestController(IRabbitMQTestSvr iRabbitMQTestSvr) {_iRabbitMQTestSvr = iRabbitMQTestSvr;}/// <summary>/// 生产者-点对点(Point-to-Point)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTest(model);/// <summary>/// 消费者-点对点(Point-to-Point)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerTest() => await _iRabbitMQTestSvr.ConsumerTest();/// <summary>/// 生产者-发布订阅(Pub/Sub)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerPubSub(model);/// <summary>/// 消费者-发布订阅(Pub/Sub)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerPubSub() => await _iRabbitMQTestSvr.ConsumerPubSub();/// <summary>/// 生产者-路由模式(Routing)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRouting(model);/// <summary>/// 消费者-路由模式(Routing)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerRouting() => await _iRabbitMQTestSvr.ConsumerRouting();/// <summary>/// 生产者-主题模式(Topic)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTopic(model);/// <summary>/// 消费者-主题模式(Topic)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerTopic() => await _iRabbitMQTestSvr.ConsumerTopic();/// <summary>/// 生产者-请求响应模式(RPC)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRPC(model);/// <summary>/// 消费者-请求响应模式(RPC)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerRPC() => await _iRabbitMQTestSvr.ConsumerRPC();/// <summary>/// 死信队列重抛/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> Republish(string queueName) => await _iRabbitMQTestSvr.Republish(queueName);/// <summary>/// 停止消费者/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag) => await _iRabbitMQTestSvr.StopAllConsumer(consumerTag);}
}

        案例如下

http://www.dtcms.com/wzjs/808029.html

相关文章:

  • 怎么用手机做网站平台南京鼓楼做网站的公司
  • 网站建设静态代码猪八戒包装设计
  • 廊坊公司快速建站爱采购卖家版app下载
  • 网站开发与维护总结wordpress静态页面
  • 一站式商家服务平台石碣镇网站仿做
  • 网站域名注销流程哈尔滨手机网页制作
  • 四川同风源建设工程有限公司网站适合网站开发的python
  • 淘宝做网站很便宜福州云建站模版
  • 如何在学校内网建立网站寮步网站建设价钱
  • 商业网站的后缀三合一模板网站
  • 如何针对你的网站做搜索优化陕西机械加工网
  • 为什么网站需要静态化生成html网站产品内容在数据库
  • 吴中区做网站的公司做电影网站需要注意什么
  • 什么网站做的好看又便宜账户竞价托管费用
  • 做网站能赚钱吗软件app开发公司哪家好
  • 哪里有个人卖房网站仿制网站建设
  • ui素材网站移动应用开发实训报告
  • 网站建设与管理课程用Wordpress建的网站有
  • 各网站的网络联盟如何自己创建一个网页
  • 梅林网站建设公司广告免费发布信息
  • 装修设计装饰电脑系统优化软件十大排名
  • 网站建设主要包括两个方面威海哪里可以建设企业网站
  • 重庆网站推广优化软件业务黄冈网络推广服务平台
  • 模板商城建站网站有限公司免费
  • 网站改版建设公司桂林网站制作培训班
  • 石家庄住房建设局网站wordpress 上传附件
  • 广州网站建设技术外包网站建设网站建设的
  • tor网站建设自己制作一个网站的软件
  • 网站站内推广计划书手机版房屋3d效果图设计软件
  • 简历模板网站免费建站开发软件