当前位置: 首页 > wzjs >正文

dz论坛做分类网站wordpress ux主题

dz论坛做分类网站,wordpress ux主题,哪个程序做下载网站好,互联网企业概念封装了与 RabbitMQ 消息代理交互的功能,包括发送和接收消息,以及管理连接和通道。 主要组件 依赖项: 代码使用了多个命名空间,包括 Microsoft.Extensions.Configuration(用于配置管理)、RabbitMQ.Client&a…

封装了与 RabbitMQ 消息代理交互的功能,包括发送和接收消息,以及管理连接和通道。

主要组件

  1. 依赖项

    • 代码使用了多个命名空间,包括 Microsoft.Extensions.Configuration(用于配置管理)、RabbitMQ.Client(用于与 RabbitMQ 的交互)、Newtonsoft.Json(用于 JSON 序列化和反序列化)。
    • 还使用了一些自定义命名空间,可能是更大应用程序框架的一部分(例如 MSEBP.Kernel.Common)。
  2. 配置模型

    • RabbitConfigModel 用于保存连接 RabbitMQ 的配置设置,包括 IPPortUserNamePasswordVirtualHost 等属性。
  3. 连接管理

    • 类中维护了一个静态字段 _connection 用于 RabbitMQ 连接,并提供了 GetConnection 方法来建立或获取该连接。
    • 连接使用 IP、端口、用户名、密码和虚拟主机等参数进行建立。
  4. 接收消息

    • Receive<T> 方法允许从 RabbitMQ 队列异步消费消息。
    • 使用 EventingBasicConsumer 来处理传入的消息,并通过提供的委托(receiveMethod)进行处理。
    • 消息处理包括错误处理,根据处理是否成功来确认(或否定确认)消息。
  5. 发送消息

    • Send<T> 方法将对象序列化为 JSON,并将其作为消息发送到指定的 RabbitMQ 队列。
    • 此方法允许指定消息是否应为持久化,并为消息设置适当的属性(如头信息)。
  6. 通道管理

    • CreateConsumerChannel 方法创建一个新的通道以与 RabbitMQ 进行交互。
    • 通道用于发送和接收消息,必须从已建立的连接中创建。
  7. 连接工厂

    • GetConnection 方法是一个静态方法,使用 ConnectionFactory 创建新的 RabbitMQ 连接。它配置了心跳和其他参数。
  8. 资源管理

    • Dispose 方法确保在不再需要时正确关闭 RabbitMQ 连接。

错误处理和重试机制

  • 消息消费逻辑中包含错误处理机制,以决定如何处理失败的消息处理:
    • 确认(BasicAck):在成功处理消息时调用。
    • 否定确认(BasicNack):允许根据情况(例如瞬态错误与永久错误)重试消息或拒绝消息
      using Microsoft.Extensions.Configuration;
      using Newtonsoft.Json;
      using RabbitMQ.Client;
      using RabbitMQ.Client.Events;
      using System;
      using System.Collections.Generic;
      using System.Text;
      using System.Threading;
      using System.Threading.Tasks;namespace MSEBP.External.RabbitMQMsg
      {/// <summary>/// 接受消息的服务 /// </summary>public class RabbitMessageService : IDisposable{private readonly RabbitConfigModel _rabbitConfig;private static IConnection _connection;private readonly IConfiguration _configuration;private readonly ILogger _logger;private readonly object _connectionLock = new object();private const int ReconnectDelaySeconds = 5;public RabbitMessageService(IConfiguration configuration){_configuration = configuration;_logger = MSEApplication.Resolve<ILogger>("CorrelationId");_rabbitConfig = LoadConfig();ValidateConfig(_rabbitConfig);}private RabbitConfigModel LoadConfig(){return new RabbitConfigModel{IP = _configuration["RabbitMQ:IP"],Port = _configuration["RabbitMQ:Port"].AsInt(5672),UserName = _configuration["RabbitMQ:UserName"],Password = _configuration["RabbitMQ:Password"],VirtualHost = _configuration["RabbitMQ:VirtualHost"],DurableQueue = _configuration["RabbitMQ:DurableQueue"].AsBool(),QueueName = _configuration["RabbitMQ:QueueName"],Exchange = _configuration["RabbitMQ:Exchange"],ExchangeType = _configuration["RabbitMQ:ExchangeType"],DurableMessage = _configuration["RabbitMQ:DurableMessage"].AsBool(),RoutingKey = _configuration["RabbitMQ:RoutingKey"],};}private void ValidateConfig(RabbitConfigModel config){if (string.IsNullOrWhiteSpace(config.IP) || config.Port <= 0 || string.IsNullOrWhiteSpace(config.UserName) || string.IsNullOrWhiteSpace(config.Password)){throw new InvalidOperationException("RabbitMQ configuration is invalid.");}}public IConnection GetConnection(string clientProvidedName){if (_connection == null || !_connection.IsOpen){lock (_connectionLock){if (_connection == null || !_connection.IsOpen){_connection = CreateConnection(clientProvidedName);}}}return _connection;}private IConnection CreateConnection(string clientProvidedName){var factory = new ConnectionFactory{ClientProvidedName = clientProvidedName,Endpoint = new AmqpTcpEndpoint(new Uri($"amqp://{_rabbitConfig.IP}/")),Port = _rabbitConfig.Port,UserName = _rabbitConfig.UserName,Password = _rabbitConfig.Password,VirtualHost = _rabbitConfig.VirtualHost,RequestedHeartbeat = TimeSpan.FromSeconds(60)};while (true){try{var connection = factory.CreateConnection();connection.ConnectionShutdown += OnConnectionShutdown;_logger.Info("RabbitMQ connection established.");return connection;}catch (Exception ex){_logger.Error("Failed to create RabbitMQ connection, retrying...", ex);Thread.Sleep(TimeSpan.FromSeconds(ReconnectDelaySeconds)); // 等待5秒后重试}}}private void OnConnectionShutdown(object sender, ShutdownEventArgs e){_logger.Warn("RabbitMQ connection was shut down. Attempting to reconnect...");Reconnect();}private void Reconnect(){lock (_connectionLock){if (_connection != null && !_connection.IsOpen){_logger.Info("Disposing of RabbitMQ connection.");_connection.Dispose();_connection = null;}// 重新建立连接_connection = CreateConnection("Reconnect");}}public async Task Receive<T>(Func<T, Task> receiveMethod, CancellationToken cancellationToken){using (var channel = CreateConsumerChannel("Receive")){try{SetupQueue(channel);var consumer = new EventingBasicConsumer(channel);consumer.Received += async (model, ea) => await ProcessMessage(channel, ea, receiveMethod, cancellationToken);channel.BasicConsume(_rabbitConfig.QueueName, false, consumer);// Wait for cancellationawait Task.Delay(Timeout.Infinite, cancellationToken);}catch (OperationCanceledException){_logger.Info("Message consumption was canceled.");}catch (Exception ex){_logger.Error("Failed to set up message consumer", ex);throw;}}}private void SetupQueue(IModel channel){if (!string.IsNullOrWhiteSpace(_rabbitConfig.Exchange)){channel.ExchangeDeclare(_rabbitConfig.Exchange, _rabbitConfig.ExchangeType.ToString(), _rabbitConfig.DurableQueue);channel.QueueDeclare(_rabbitConfig.QueueName, _rabbitConfig.DurableQueue, false, false, null);channel.QueueBind(_rabbitConfig.QueueName, _rabbitConfig.Exchange, _rabbitConfig.RoutingKey);}else{channel.QueueDeclare(_rabbitConfig.QueueName, _rabbitConfig.DurableQueue, false, false, null);}channel.BasicQos(0, 1, false);}private async Task ProcessMessage<T>(IModel channel, BasicDeliverEventArgs ea, Func<T, Task> receiveMethod, CancellationToken cancellationToken){var deliveryTag = ea.DeliveryTag;var message = Encoding.UTF8.GetString(ea.Body.ToArray());try{_logger.Info($"Received message: {message}");var data = JsonConvert.DeserializeObject<T>(message);await receiveMethod(data);channel.BasicAck(deliveryTag, false); // 确认处理成功}catch (Exception ex){_logger.Error($"Error processing message: {message}", ex);channel.BasicNack(deliveryTag, false, true); // 允许重新处理}}public bool Send<T>(T info){var message = JsonConvert.SerializeObject(info);if (string.IsNullOrWhiteSpace(message)) return false;try{using (var channel = CreateConsumerChannel("Send")){SetupSendChannel(channel);var properties = CreateMessageProperties(channel);var bytes = Encoding.UTF8.GetBytes(message);if (string.IsNullOrWhiteSpace(_rabbitConfig.Exchange)){channel.BasicPublish("", _rabbitConfig.QueueName, properties, bytes);}else{channel.BasicPublish(_rabbitConfig.Exchange, _rabbitConfig.RoutingKey, properties, bytes);}}return true;}catch (Exception ex){_logger.Error("Failed to send message to RabbitMQ", ex);return false;}}private void SetupSendChannel(IModel channel){if (!string.IsNullOrWhiteSpace(_rabbitConfig.Exchange)){channel.ExchangeDeclare(_rabbitConfig.Exchange, _rabbitConfig.ExchangeType.ToString(), _rabbitConfig.DurableQueue, false, null);}else{channel.QueueDeclare(_rabbitConfig.QueueName, _rabbitConfig.DurableQueue, false, false, null);}}private IBasicProperties CreateMessageProperties(IModel channel){var properties = channel.CreateBasicProperties();properties.DeliveryMode = Convert.ToByte(_rabbitConfig.DurableMessage ? 2 : 1);properties.Headers = new Dictionary<string, object>{{ "Content-Type", "application/json" },{ "__TypeId__", _rabbitConfig.QueueName }};return properties;}public IModel CreateConsumerChannel(string clientProvidedName){return GetConnection(clientProvidedName).CreateModel();}public void Dispose(){if (_connection != null && _connection.IsOpen){_connection.Close();_connection.Dispose();}}}
      }
      


文章转载自:

http://VC2MegKv.jxLtk.cn
http://aWdGIXS8.jxLtk.cn
http://6faYJbBW.jxLtk.cn
http://3PbYQAW8.jxLtk.cn
http://6bbIFx19.jxLtk.cn
http://ukDJTGl6.jxLtk.cn
http://27C7HERG.jxLtk.cn
http://V1v9IMz0.jxLtk.cn
http://V1BTQkQq.jxLtk.cn
http://vO3UswXp.jxLtk.cn
http://nLxvaYJu.jxLtk.cn
http://MR13tdIi.jxLtk.cn
http://9AN1phQX.jxLtk.cn
http://YtZxwECK.jxLtk.cn
http://dALau5ro.jxLtk.cn
http://HBqtPy25.jxLtk.cn
http://0xKFAyZd.jxLtk.cn
http://w87KYiqo.jxLtk.cn
http://Uwu7UWRT.jxLtk.cn
http://oQpSO9nS.jxLtk.cn
http://X56eqcNZ.jxLtk.cn
http://l6C0lElU.jxLtk.cn
http://06GGoKzL.jxLtk.cn
http://3bIYrfJS.jxLtk.cn
http://ppU4Gf1b.jxLtk.cn
http://A757ZjLV.jxLtk.cn
http://Pd4vHgwY.jxLtk.cn
http://soOncCtG.jxLtk.cn
http://PJG9HIjd.jxLtk.cn
http://7eUKW6MM.jxLtk.cn
http://www.dtcms.com/wzjs/721250.html

相关文章:

  • 淘宝客建立网站推广怎么做seo对网店的作用有哪些
  • 网站推广分为哪几个部分重庆烤鱼制作
  • 代发新闻稿的网站wordpress.
  • linux下网站建设如何招聘软件网站开发人员
  • 优速网站建设免费高清图片素材网站有哪些
  • 内销常用网站.net可以做网站做游戏 博客园
  • 基层建设期刊网站wordpress 点击加载
  • 猎聘网招聘官方网站国际婚恋网站做翻译合法吗
  • 网站怎么做免费做网站的过程
  • 5昌平区网站建设怎么找app开发公司
  • 网站创建流程企业建设网站注意点
  • 陕西华伟建设有限公司网站广东手机微信网站制作
  • 自己做网站买宁波做网站的
  • vs2013做的网站品牌网站建设 1蝌蚪小
  • 网站模板怎样使用长泰微新闻
  • 安徽教育机构网站建设途牛网站建设的特点
  • 为什么做网站越早越好网站 改版
  • 商业网站建站wordpress cad插件大全
  • 北京高端建站公司企业展示厅设计效果图
  • 网站底部版权信息格式携程网站建设
  • 山西集团网站建设实验室网站建设
  • 做财经直播网站做PS的赚钱的网站
  • 企业网站数据库设计表青州市住房和城乡建设局网站
  • 自己创网站购物网站设计欣赏
  • 武进网站建设怎么样深圳最好的网站制作哪家公司好
  • 定制网站本地企业那非西
  • 开一家做网站的公司百度搜索工具
  • 赤峰建设银行网站如何用php制作网页
  • 哪个网站做视频钱多360站长工具
  • 网站建设的工期拖延如何解决网站套餐方案