当前位置: 首页 > news >正文

.Net Core 中RabbitMQ基本使用

文章目录

  • 前言
  • 一、RabbitMQ 是什么?
    • 简介
    • 核心作用/解决的问题
    • 核心概念与工作模型
  • 二、安装环境
    • 1.安装 Erlang(必需依赖)
    • 2.安装 RabbitMQ
    • 3.启动 RabbitMQ 服务
    • 4.访问管理控制台
  • 三、创建生产者
    • 1.新建生产者发送消息控制台
    • 2.安装 NuGet 包:
    • 3.生产者发送消息
    • 4.关键点:
  • 四、创建消费者
    • 1.创建消费者接收消息控制台
    • 2.安装 NuGet 包:
    • 3.消费者接收消息
    • 4.关键点:
  • 五、主要优势
  • 六、最佳实践
  • 总结


前言

`

一、RabbitMQ 是什么?

简介

  • 开源消息代理/消息队列系统: RabbitMQ 的核心是一个消息代理。想象它是一个高效的邮局:应用程序(生产者)把消息(信件)发送给它,它负责将消息安全、可靠地路由和递送给一个或多个目标应用程序(消费者)。

  • 基于 AMQP: RabbitMQ 最初实现了 AMQP (Advanced Message Queuing Protocol) 0-9-1 协议,这是一个开放标准的应用层协议,专为可靠、异步、跨平台的消息传递而设计。这使得不同语言和平台编写的应用程序能够轻松通信。

  • 平台与语言无关: 由于实现了标准协议(并支持其他协议如 MQTT, STOMP),RabbitMQ 可以被几乎所有主流编程语言(Python, Java, .NET, Ruby, JavaScript, Go, PHP 等)使用。客户端库非常丰富。

  • 构建于 Erlang/OTP: RabbitMQ 是用 Erlang 语言编写的,并运行在强大的 OTP (Open Telecom Platform) 框架之上。Erlang/OTP 以其高并发、分布式、容错和热代码升级能力而闻名,这使得 RabbitMQ 天生就非常可靠、稳定且可扩展。

核心作用/解决的问题

  • RabbitMQ 主要用于解决分布式系统或复杂应用程序中的通信问题:

    • 解耦: 将消息的发送者(生产者)和接收者(消费者)分离。生产者不需要知道消费者的存在或状态,只需将消息发送到 RabbitMQ。消费者可以在需要时处理消息。这提高了系统的灵活性和可维护性。

    • 异步通信: 生产者发送消息后可以立即返回,无需等待消费者处理完成。消费者可以在自己方便的时候处理消息。这显著提高了系统的响应速度和吞吐量。

    • 缓冲/削峰填谷: 当生产者发送消息的速度超过消费者处理的速度时,RabbitMQ 的队列可以充当缓冲区,暂存消息,避免系统因瞬时压力过载而崩溃。消费者可以按照自己的节奏处理积压的消息。

    • 负载均衡: 可以启动多个消费者实例从同一个队列消费消息,RabbitMQ 会以轮询或其他方式将消息分发给这些消费者,实现工作负载的分配。

    • 可靠性: RabbitMQ 提供了强大的机制(如消息持久化、发送方确认、消费者确认)来确保消息在传输和处理过程中不丢失,即使在部分系统故障的情况下。

    • 灵活的路由: 通过 Exchange 和 Binding 机制,RabbitMQ 能够根据设定的规则(如路由键、主题匹配、头信息匹配等)将消息精确地投递到不同的队列。

核心概念与工作模型

  • 理解 RabbitMQ 的关键是理解其核心组件和消息流:

    • Producer(生产者): 创建并发送消息的应用程序。

    • Message(消息): 包含要传输的数据(有效载荷)和一些元数据(如路由键、头信息等)。

    • Exchange(交换机): 生产者将消息发送到交换机。交换机相当于邮局的分拣中心,它根据消息的路由键(Routing Key) 和自身的类型以及绑定规则,决定将消息投递到哪些队列。常见的交换机类型有:

      • Direct: 精确匹配路由键。
      • Fanout: 广播到所有绑定的队列(忽略路由键)。
      • Topic: 基于路由键模式匹配(通配符)。
      • Headers: 基于消息头信息匹配。
    • Binding(绑定): 连接交换机和队列的规则。它告诉交换机“哪些消息应该被路由到哪个队列”。绑定通常包含一个路由键(对于 Direct 和 Topic 交换机)或头信息匹配规则(对于 Headers 交换机)。

    • Queue(队列): 消息存储的地方,等待消费者处理。队列是 FIFO(先进先出)的,但可以配置优先级。消息会一直存储在队列中,直到被消费者成功接收并确认。

    • Consumer(消费者): 连接到队列并接收消息进行处理的应用程序。消费者可以主动拉取消息,也可以让 RabbitMQ 在有消息时推送过来。

  • 消息流简述:
    生产者 -> (发送消息到) -> 交换机 -> (根据绑定规则路由到) -> 一个或多个队列 <- (消费者从队列) <- 消费消息

二、安装环境

1.安装 Erlang(必需依赖)

  • RabbitMQ 基于 Erlang 编写,需先安装匹配版本:
    • 访问 Erlang下载页
    • 下载 Windows 安装包(选择与 RabbitMQ 兼容的版本)
    • 运行安装程序(默认设置即可)
    • 添加环境变量:找到系统环境变量→选中变量Path→编辑→添加Erlang安装bin目录(如:C:\Program Files\Erlang OTP\bin)
    • 验证安装:打开命令提示符,输入 erl -version,显示版本号即成功。

2.安装 RabbitMQ

  • 访问 RabbitMQ下载页

  • 在 Windows Installer 部分下载 .exe 安装包

  • 在这里插入图片描述

  • 运行安装程序:安装路径保持默认(C:\Program Files\RabbitMQ Server)

  • 添加环境变量::找到系统环境变量→选中变量Path→编辑→添加RabbitMQ安装sbin目录(如:C:\Program Files\RabbitMQ Server\rabbitmq_server-4.1.1\sbin)

  • 保证 Cookie 文件同步

    • 复制 文件(C:\Windows\System32\config\systemprofile.erlang.cookie)中的Cookie 字符串。
    • 同步Cookie 字符串到文件(C:\Users\雷家饭碗.erlang.cookie)中。

3.启动 RabbitMQ 服务

  • 方法1:通过服务管理器启动

    • 按 Win + R 输入 services.msc
    • 找到 RabbitMQ 服务 → 右键启动
  • 方法2:命令行启动

    • 以管理员身份打开cmd窗口
    • 输入命令启动RabbitMQ:net start RabbitMQ
    • 查看RabbitMQ服务状态:sc query RabbitMQ

4.访问管理控制台

  • 打开浏览器访问:RabbitMQ控制台
  • 使用默认账号登录
    • 用户名: guest
    • 密码: guest

三、创建生产者

1.新建生产者发送消息控制台

2.安装 NuGet 包:

  1. 安装命令:
    Install-Package RabbitMQ.Client
    

3.生产者发送消息

  1. 代码示例:
    using RabbitMQ.Client;
    using System.Text;var factory = new ConnectionFactory()
    {HostName = "localhost",UserName = "guest",Password = "guest"
    };using var connection = await factory.CreateConnectionAsync();
    string exchangeName = "exchangeName1";// 创建初始化通道(用于声明交换机和队列)
    using (var initChannel = await connection.CreateChannelAsync())
    {await initChannel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct,durable: true);
    }while (true)
    {using var channel = await connection.CreateChannelAsync();// 创建消息属性 - 7.1.2 正确方式var properties = new BasicProperties{Persistent = true,  // 设置持久化ContentType = "text/plain",Headers = new Dictionary<string, object>{{ "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }}};string message = DateTime.Now.TimeOfDay.ToString();var body = Encoding.UTF8.GetBytes(message);await channel.BasicPublishAsync(exchange: exchangeName,routingKey: "Key1",mandatory: false,basicProperties: properties,  // 使用创建的消息属性body: body);Console.WriteLine($" [x] Sent {message}");await Task.Delay(1000);  // 使用异步延迟
    }
    

4.关键点:

  • durable: true + Persistent: true 确保消息与服务重启后不丢失。
  • 默认交换机(Key1)使用路由键直连队列。

四、创建消费者

1.创建消费者接收消息控制台

2.安装 NuGet 包:

  1. 安装命令:
    Install-Package RabbitMQ.Client
    

3.消费者接收消息

  1. 代码示例:
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using RabbitMQ.Client.Exceptions;
    using System.Text;var factory = new ConnectionFactory()
    {HostName = "localhost",UserName = "guest",Password = "guest",AutomaticRecoveryEnabled = true,NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
    };Console.WriteLine("正在连接到 RabbitMQ...");
    using var connection = await factory.CreateConnectionAsync();
    using var channel = await connection.CreateChannelAsync();const string exchangeName = "exchangeName1";
    const string queueName = "hello";
    const string routingKey = "Key1";// 1. 声明交换机
    await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct,durable: true
    );
    QueueDeclareOk queueResult = null;
    try
    {// 2. 尝试声明队列(使用所需参数)queueResult=await channel.QueueDeclareAsync(queue: queueName,durable: true, // 保持您需要的持久化设置exclusive: false,autoDelete: false,arguments: null);
    }
    catch (OperationInterruptedException ex) when (ex.Message.Contains("inequivalent arg"))
    {// 3. 处理参数不匹配异常Console.WriteLine($"队列参数不匹配: {ex.Message}");// 删除现有队列并重新创建Console.WriteLine("正在删除现有队列...");await channel.QueueDeleteAsync(queueName);Console.WriteLine("重新创建队列...");queueResult = await channel.QueueDeclareAsync(queue: queueName,durable: true,exclusive: false,autoDelete: false,arguments: null);
    }// 4. 绑定队列到交换机
    await channel.QueueBindAsync(queue: queueName,exchange: exchangeName,routingKey: routingKey
    );Console.WriteLine("队列信息:");
    Console.WriteLine($" 名称: {queueResult.QueueName}");
    Console.WriteLine($" 消息数: {queueResult.MessageCount}");
    Console.WriteLine($" 消费者数: {queueResult.ConsumerCount}");// 设置服务质量(QoS)限制未确认消息数量
    await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);// 创建异步消费者
    var consumer = new AsyncEventingBasicConsumer(channel);// 消息接收事件处理
    consumer.ReceivedAsync += async (model, ea) =>
    {try{// 获取消息内容var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 获取消息属性var properties = ea.BasicProperties;var timestamp = properties.Timestamp.UnixTime;var timestampStr = DateTimeOffset.FromUnixTimeMilliseconds(timestamp).ToString("HH:mm:ss.fff");var headers = properties.Headers != null? string.Join(", ", properties.Headers.Select(kv => $"{kv.Key}={kv.Value}")): "无";Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 收到消息:");Console.WriteLine($"  内容: {message}");Console.WriteLine($"  原始时间戳: {timestampStr}");Console.WriteLine($"  处理延迟: {DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - timestamp} ms");Console.WriteLine($"  头部: {headers}");// 模拟业务处理Console.WriteLine("处理中...");await Task.Delay(1000); // 模拟处理耗时// 业务处理成功后手动确认await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);Console.WriteLine("√ 处理完成并确认");}catch (Exception ex){Console.WriteLine($"× 处理失败: {ex.Message}");// 处理失败:拒绝消息并重新入队await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag,multiple: false,requeue: true // 重新入队以便重试);Console.WriteLine("! 消息已重新入队");}
    };// 启动消费者
    await channel.BasicConsumeAsync(queue: queueName,autoAck: false, // 关闭自动确认consumer: consumer
    );Console.WriteLine(" [*] 等待消息中... 按 Ctrl+C 退出");
    Console.WriteLine("--------------------------------------------------");// 保持程序运行直到收到退出信号
    var exitEvent = new ManualResetEvent(false);
    Console.CancelKeyPress += (sender, eventArgs) => {eventArgs.Cancel = true; // 防止进程立即退出Console.WriteLine("\n正在关闭消费者...");exitEvent.Set(); // 通知主线程可以退出
    };exitEvent.WaitOne(); // 阻塞直到收到退出信号
    

4.关键点:

  • autoAck: false + BasicAck() 避免消息处理失败时丢失。
  • 事件驱动模型EventingBasicConsumer)适合实时处理。

五、主要优势

  • 高可靠性: 通过持久化、确认机制保障消息不丢。
  • 高可用性: 支持集群和镜像队列,避免单点故障。
  • 灵活的路由: 多种交换器类型满足复杂路由需求。
  • 可扩展性: 易于横向扩展(添加节点、消费者)以应对增长。
  • 丰富的协议支持: 原生 AMQP 0-9-1,插件支持 MQTT, STOMP 等。
  • 庞大的社区和生态系统: 开源、成熟、文档完善、客户端库丰富、插件众多。
  • 管理界面: 提供易用的 Web UI 和命令行工具进行监控和管理。

六、最佳实践

场景方案关键配置
消息持久化队列声明 + 消息属性durable: true + Persistent: true
消费者高可用手动确认 + 异常重试autoAck: false + BasicNack
流量控制预取计数(QoS)channel.BasicQos(prefetchCount: 1)
延迟任务死信队列 + TTLx-message-ttl + x-dead-letter-exchange

总结

RabbitMQ 是一个功能强大、稳定可靠、灵活且应用广泛的开源消息代理。它通过异步消息传递,有效地解决了分布式系统中的解耦、异步、削峰、负载均衡和可靠通信等关键问题。其基于标准的协议支持和丰富的特性,使其成为构建现代、可扩展、高可用应用程序的基石之一。

http://www.dtcms.com/a/265807.html

相关文章:

  • [自然语言处理]计算语言的熵
  • 【Python办公】Excel转CSV文件(可指定拆分行数\可批量或单个)
  • 用C#编写一个读取磁盘第一扇区的程序
  • 架空线路云台监控系统应对线路故障的智能化解决方案
  • 深度学习中的逻辑回归:从原理到Python实现
  • leetcode:1049. 最后一块石头的重量 II[01背包][动态规划]
  • 实际开发如何快速定位和解决死锁?
  • PM2.5和PM10分别是什么
  • 基于MATLAB的风力发电机无人机巡检路径优化研究
  • 最新PDF转markdown软件MonkeyOCR整合包,文档图片解析工具
  • 深度解析:Java内部类与外部类的交互机制
  • odoo-057 pgadmin 登录忘记密码
  • 【实时Linux实战系列】实时以太网与 TSN 基础
  • ARM单片机启动流程(二)(详细解析)
  • UDP服务器主要是指什么意思?
  • 提升自动驾驶导航能力:基于深度学习的场景理解技术
  • Centrifugo 深度解析:构建高性能实时应用的开源引擎
  • RocketMQ-Dashboard页面报Failed to fetch ops home page data错误
  • 车载交换机动态MAC学习和静态MAC绑定如何获取MAC地址表
  • BitsAndBytesConfig量化及注意事项
  • 明远智睿H618:开启多场景智慧生活新时代
  • 代码随想录打卡第五天
  • TinyWebserver学习(8)-定时器
  • 深度解析:venv和conda如何解决依赖冲突难题
  • 使用netstat与grep命令结合批量查找特定内容
  • Class3图像分类数据集代码
  • 数学建模_时间序列
  • CTF Web PHP弱类型与进制绕过(过滤)
  • 【云计算】企业项目 策略授权
  • 网络层:ip协议 与数据链路层