.Net Core 中RabbitMQ基本使用
文章目录
- 前言
- 一、RabbitMQ 是什么?
- 简介
- 核心作用/解决的问题
- 核心概念与工作模型
- 二、安装环境
- 1.安装 Erlang(必需依赖)
- 2.安装 RabbitMQ
- 3.启动 RabbitMQ 服务
- 4.访问管理控制台
- 三、创建生产者
- 1.新建生产者发送消息控制台
- 2.安装 NuGet 包:
- 3.生产者发送消息
- 4.关键点:
- 四、创建消费者
- 1.创建消费者接收消息控制台
- 2.安装 NuGet 包:
- 3.消费者接收消息
- 4.关键点:
- 五、主要优势
- 六、最佳实践
- 总结
前言
`
一、RabbitMQ 是什么?
简介
-
开源消息代理/消息队列系统: RabbitMQ 的核心是一个消息代理。想象它是一个高效的邮局:应用程序(生产者)把消息(信件)发送给它,它负责将消息安全、可靠地路由和递送给一个或多个目标应用程序(消费者)。
-
基于 AMQP: RabbitMQ 最初实现了 AMQP (Advanced Message Queuing Protocol) 0-9-1 协议,这是一个开放标准的应用层协议,专为可靠、异步、跨平台的消息传递而设计。这使得不同语言和平台编写的应用程序能够轻松通信。
-
平台与语言无关: 由于实现了标准协议(并支持其他协议如 MQTT, STOMP),RabbitMQ 可以被几乎所有主流编程语言(Python, Java, .NET, Ruby, JavaScript, Go, PHP 等)使用。客户端库非常丰富。
-
构建于 Erlang/OTP: RabbitMQ 是用 Erlang 语言编写的,并运行在强大的 OTP (Open Telecom Platform) 框架之上。Erlang/OTP 以其高并发、分布式、容错和热代码升级能力而闻名,这使得 RabbitMQ 天生就非常可靠、稳定且可扩展。
核心作用/解决的问题
-
RabbitMQ 主要用于解决分布式系统或复杂应用程序中的通信问题:
-
解耦: 将消息的发送者(生产者)和接收者(消费者)分离。生产者不需要知道消费者的存在或状态,只需将消息发送到 RabbitMQ。消费者可以在需要时处理消息。这提高了系统的灵活性和可维护性。
-
异步通信: 生产者发送消息后可以立即返回,无需等待消费者处理完成。消费者可以在自己方便的时候处理消息。这显著提高了系统的响应速度和吞吐量。
-
缓冲/削峰填谷: 当生产者发送消息的速度超过消费者处理的速度时,RabbitMQ 的队列可以充当缓冲区,暂存消息,避免系统因瞬时压力过载而崩溃。消费者可以按照自己的节奏处理积压的消息。
-
负载均衡: 可以启动多个消费者实例从同一个队列消费消息,RabbitMQ 会以轮询或其他方式将消息分发给这些消费者,实现工作负载的分配。
-
可靠性: RabbitMQ 提供了强大的机制(如消息持久化、发送方确认、消费者确认)来确保消息在传输和处理过程中不丢失,即使在部分系统故障的情况下。
-
灵活的路由: 通过 Exchange 和 Binding 机制,RabbitMQ 能够根据设定的规则(如路由键、主题匹配、头信息匹配等)将消息精确地投递到不同的队列。
-
核心概念与工作模型
-
理解 RabbitMQ 的关键是理解其核心组件和消息流:
-
Producer(生产者): 创建并发送消息的应用程序。
-
Message(消息): 包含要传输的数据(有效载荷)和一些元数据(如路由键、头信息等)。
-
Exchange(交换机): 生产者将消息发送到交换机。交换机相当于邮局的分拣中心,它根据消息的路由键(Routing Key) 和自身的类型以及绑定规则,决定将消息投递到哪些队列。常见的交换机类型有:
- Direct: 精确匹配路由键。
- Fanout: 广播到所有绑定的队列(忽略路由键)。
- Topic: 基于路由键模式匹配(通配符)。
- Headers: 基于消息头信息匹配。
-
Binding(绑定): 连接交换机和队列的规则。它告诉交换机“哪些消息应该被路由到哪个队列”。绑定通常包含一个路由键(对于 Direct 和 Topic 交换机)或头信息匹配规则(对于 Headers 交换机)。
-
Queue(队列): 消息存储的地方,等待消费者处理。队列是 FIFO(先进先出)的,但可以配置优先级。消息会一直存储在队列中,直到被消费者成功接收并确认。
-
Consumer(消费者): 连接到队列并接收消息进行处理的应用程序。消费者可以主动拉取消息,也可以让 RabbitMQ 在有消息时推送过来。
-
-
消息流简述:
生产者 -> (发送消息到) -> 交换机 -> (根据绑定规则路由到) -> 一个或多个队列 <- (消费者从队列) <- 消费消息
二、安装环境
1.安装 Erlang(必需依赖)
- RabbitMQ 基于 Erlang 编写,需先安装匹配版本:
- 访问 Erlang下载页
- 下载 Windows 安装包(选择与 RabbitMQ 兼容的版本)
- 运行安装程序(默认设置即可)
- 添加环境变量:找到系统环境变量→选中变量Path→编辑→添加Erlang安装bin目录(如:C:\Program Files\Erlang OTP\bin)
- 验证安装:打开命令提示符,输入 erl -version,显示版本号即成功。
2.安装 RabbitMQ
-
访问 RabbitMQ下载页
-
在 Windows Installer 部分下载 .exe 安装包
-
-
运行安装程序:安装路径保持默认(C:\Program Files\RabbitMQ Server)
-
添加环境变量::找到系统环境变量→选中变量Path→编辑→添加RabbitMQ安装sbin目录(如:C:\Program Files\RabbitMQ Server\rabbitmq_server-4.1.1\sbin)
-
保证 Cookie 文件同步:
- 复制 文件(C:\Windows\System32\config\systemprofile.erlang.cookie)中的Cookie 字符串。
- 同步Cookie 字符串到文件(C:\Users\雷家饭碗.erlang.cookie)中。
3.启动 RabbitMQ 服务
-
方法1:通过服务管理器启动
- 按 Win + R 输入 services.msc
- 找到 RabbitMQ 服务 → 右键启动
-
方法2:命令行启动
- 以管理员身份打开cmd窗口
- 输入命令启动RabbitMQ:net start RabbitMQ
- 查看RabbitMQ服务状态:sc query RabbitMQ
4.访问管理控制台
- 打开浏览器访问:RabbitMQ控制台
- 使用默认账号登录:
- 用户名: guest
- 密码: guest
三、创建生产者
1.新建生产者发送消息控制台
2.安装 NuGet 包:
- 安装命令:
Install-Package RabbitMQ.Client
3.生产者发送消息
- 代码示例:
using RabbitMQ.Client; using System.Text;var factory = new ConnectionFactory() {HostName = "localhost",UserName = "guest",Password = "guest" };using var connection = await factory.CreateConnectionAsync(); string exchangeName = "exchangeName1";// 创建初始化通道(用于声明交换机和队列) using (var initChannel = await connection.CreateChannelAsync()) {await initChannel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct,durable: true); }while (true) {using var channel = await connection.CreateChannelAsync();// 创建消息属性 - 7.1.2 正确方式var properties = new BasicProperties{Persistent = true, // 设置持久化ContentType = "text/plain",Headers = new Dictionary<string, object>{{ "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }}};string message = DateTime.Now.TimeOfDay.ToString();var body = Encoding.UTF8.GetBytes(message);await channel.BasicPublishAsync(exchange: exchangeName,routingKey: "Key1",mandatory: false,basicProperties: properties, // 使用创建的消息属性body: body);Console.WriteLine($" [x] Sent {message}");await Task.Delay(1000); // 使用异步延迟 }
4.关键点:
- durable: true + Persistent: true 确保消息与服务重启后不丢失。
- 默认交换机(Key1)使用路由键直连队列。
四、创建消费者
1.创建消费者接收消息控制台
2.安装 NuGet 包:
- 安装命令:
Install-Package RabbitMQ.Client
3.消费者接收消息
- 代码示例:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using System.Text;var factory = new ConnectionFactory() {HostName = "localhost",UserName = "guest",Password = "guest",AutomaticRecoveryEnabled = true,NetworkRecoveryInterval = TimeSpan.FromSeconds(10) };Console.WriteLine("正在连接到 RabbitMQ..."); using var connection = await factory.CreateConnectionAsync(); using var channel = await connection.CreateChannelAsync();const string exchangeName = "exchangeName1"; const string queueName = "hello"; const string routingKey = "Key1";// 1. 声明交换机 await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct,durable: true ); QueueDeclareOk queueResult = null; try {// 2. 尝试声明队列(使用所需参数)queueResult=await channel.QueueDeclareAsync(queue: queueName,durable: true, // 保持您需要的持久化设置exclusive: false,autoDelete: false,arguments: null); } catch (OperationInterruptedException ex) when (ex.Message.Contains("inequivalent arg")) {// 3. 处理参数不匹配异常Console.WriteLine($"队列参数不匹配: {ex.Message}");// 删除现有队列并重新创建Console.WriteLine("正在删除现有队列...");await channel.QueueDeleteAsync(queueName);Console.WriteLine("重新创建队列...");queueResult = await channel.QueueDeclareAsync(queue: queueName,durable: true,exclusive: false,autoDelete: false,arguments: null); }// 4. 绑定队列到交换机 await channel.QueueBindAsync(queue: queueName,exchange: exchangeName,routingKey: routingKey );Console.WriteLine("队列信息:"); Console.WriteLine($" 名称: {queueResult.QueueName}"); Console.WriteLine($" 消息数: {queueResult.MessageCount}"); Console.WriteLine($" 消费者数: {queueResult.ConsumerCount}");// 设置服务质量(QoS)限制未确认消息数量 await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);// 创建异步消费者 var consumer = new AsyncEventingBasicConsumer(channel);// 消息接收事件处理 consumer.ReceivedAsync += async (model, ea) => {try{// 获取消息内容var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 获取消息属性var properties = ea.BasicProperties;var timestamp = properties.Timestamp.UnixTime;var timestampStr = DateTimeOffset.FromUnixTimeMilliseconds(timestamp).ToString("HH:mm:ss.fff");var headers = properties.Headers != null? string.Join(", ", properties.Headers.Select(kv => $"{kv.Key}={kv.Value}")): "无";Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 收到消息:");Console.WriteLine($" 内容: {message}");Console.WriteLine($" 原始时间戳: {timestampStr}");Console.WriteLine($" 处理延迟: {DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - timestamp} ms");Console.WriteLine($" 头部: {headers}");// 模拟业务处理Console.WriteLine("处理中...");await Task.Delay(1000); // 模拟处理耗时// 业务处理成功后手动确认await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);Console.WriteLine("√ 处理完成并确认");}catch (Exception ex){Console.WriteLine($"× 处理失败: {ex.Message}");// 处理失败:拒绝消息并重新入队await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag,multiple: false,requeue: true // 重新入队以便重试);Console.WriteLine("! 消息已重新入队");} };// 启动消费者 await channel.BasicConsumeAsync(queue: queueName,autoAck: false, // 关闭自动确认consumer: consumer );Console.WriteLine(" [*] 等待消息中... 按 Ctrl+C 退出"); Console.WriteLine("--------------------------------------------------");// 保持程序运行直到收到退出信号 var exitEvent = new ManualResetEvent(false); Console.CancelKeyPress += (sender, eventArgs) => {eventArgs.Cancel = true; // 防止进程立即退出Console.WriteLine("\n正在关闭消费者...");exitEvent.Set(); // 通知主线程可以退出 };exitEvent.WaitOne(); // 阻塞直到收到退出信号
4.关键点:
- autoAck: false + BasicAck() 避免消息处理失败时丢失。
- 事件驱动模型(EventingBasicConsumer)适合实时处理。
五、主要优势
- 高可靠性: 通过持久化、确认机制保障消息不丢。
- 高可用性: 支持集群和镜像队列,避免单点故障。
- 灵活的路由: 多种交换器类型满足复杂路由需求。
- 可扩展性: 易于横向扩展(添加节点、消费者)以应对增长。
- 丰富的协议支持: 原生 AMQP 0-9-1,插件支持 MQTT, STOMP 等。
- 庞大的社区和生态系统: 开源、成熟、文档完善、客户端库丰富、插件众多。
- 管理界面: 提供易用的 Web UI 和命令行工具进行监控和管理。
六、最佳实践
场景 | 方案 | 关键配置 |
---|---|---|
消息持久化 | 队列声明 + 消息属性 | durable: true + Persistent: true |
消费者高可用 | 手动确认 + 异常重试 | autoAck: false + BasicNack |
流量控制 | 预取计数(QoS) | channel.BasicQos(prefetchCount: 1) |
延迟任务 | 死信队列 + TTL | x-message-ttl + x-dead-letter-exchange |
总结
RabbitMQ 是一个功能强大、稳定可靠、灵活且应用广泛的开源消息代理。它通过异步消息传递,有效地解决了分布式系统中的解耦、异步、削峰、负载均衡和可靠通信等关键问题。其基于标准的协议支持和丰富的特性,使其成为构建现代、可扩展、高可用应用程序的基石之一。