消息中间件RabbitMQ03:结合WebAPI实现点对点(P2P)推送和发布-订阅推送的Demo
一、模式的区别
- 点对点模式(P2P):开启多个消费者程序(绑定到同一个队列名称)连接RabbitMQ,这些消费者 不会消费到同样的消息 ,每个消息只会被其中一个消费者消费
- 发布/订阅(Publish/Subscribe)模式:开启多个消费者程序(绑定到同一个交换机名称)连接RabbitMQ,这些消费者会消费到同样的消息,类似于消息广播
- 路由模式(Route):和点对点差不多,点对点依靠队列名称入队,路由模式依靠交换机+路由键+队列名称入队(简单了解即可)
- 简单比喻,P2P就是你打电话给某一人告诉它八卦,Pub/Sub就是你播新闻八卦(人人皆知)。
【P2P】
【Pub/Sub】
模式 | 交换机类型 | 路由键 |
点对点 | 默认"" | 默认"" |
发布订阅 | Fanout,需要指定 | 默认"" |
路由模式 | Direct,需要指定 | 需要指定 routingKey |
二、点对点推送
1.文件夹结构
2.连接管理类
我的消息队列是本地的,默认端口5672,账密都是consumer
public static class RabbitMQConnectionManager
{private static readonly Lazy<Task<IConnection>> _connection = new Lazy<Task<IConnection>>(() =>{var factory = new ConnectionFactory(){HostName = "localhost",UserName = "consumer",Password = "consumer",};return factory.CreateConnectionAsync();});// 获取连接public static Task<IConnection> Connection => _connection.Value;// 获取通道public static async Task<IChannel> CreateChannelAsync(){var connection = await Connection;return await connection.CreateChannelAsync();}
}
3.生产者
public class RabbitMQProducerService : IRabbitMQProducerService
{private readonly string queueName = "hello";public async Task<bool> SendMessage(TimeEntity timeEntity){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明队列(不存在则创建,确保消费前队列已就绪)await channel.QueueDeclareAsync(queue: queueName,durable: false,exclusive: false,autoDelete: false,arguments: null);// 03 发送内容到消息队列var message = JsonSerializer.Serialize(timeEntity);var body = Encoding.UTF8.GetBytes(message);await channel.BasicPublishAsync(exchange: "",routingKey: queueName,body: body);Console.WriteLine($"生产者发送了: {message}");return true;}catch (Exception ex){return false;}}
}
4.消费者
ublic class RabbitMQConsumerService : IRabbitMQConsumerService{private readonly string queueName = "hello";public async Task StartConsuming(){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 消费的逻辑方法var consumer = new AsyncEventingBasicConsumer(channel);ConsumeMethod(channel, consumer);// 03 启动消费(禁用自动确认,需在消费逻辑ConsumeMethod中手动确认消息)await channel.BasicConsumeAsync(queue: queueName,autoAck: false,//禁止使用消息自动确认consumer: consumer);Console.WriteLine("点击退出");Console.ReadLine();}catch (Exception ex){Console.WriteLine( ex.Message );}}/// <summary>/// 消费方法/// </summary>private static void ConsumeMethod(IChannel channel, AsyncEventingBasicConsumer consumer){consumer.ReceivedAsync += async (model, ea) =>{// 01 获取入参var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 反序列化消息var timeEntity = JsonSerializer.Deserialize<TimeEntity>(message);// 02 根据入参执行一定逻辑int delayMilliseconds =timeEntity.Hour * 3600000 + // 小时 → 毫秒(1小时=3600×1000毫秒)timeEntity.Minute * 60000 + // 分钟 → 毫秒(1分钟=60×1000毫秒)timeEntity.Second * 1000; // 秒 → 毫秒var guid = Guid.NewGuid();Console.WriteLine($"【{guid}】开始时间:现在是{DateTime.Now},将在 {timeEntity.Hour}时{timeEntity.Minute}分{timeEntity.Second}秒后打印消息...");await Task.Delay(delayMilliseconds); // 关键:异步延迟,不阻塞线程Console.WriteLine($"【{guid}】结束时间: {DateTime.Now}");Console.WriteLine($"===============================");// 03 手动确认消息await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);};}}
5.服务的注册
public class Startup
{public Startup(IConfiguration configuration){Configuration = configuration;}public IConfiguration Configuration { get; }public void ConfigureServices(IServiceCollection services){//省略其他代码// 注册 RabbitMQ 服务services.AddSingleton<IRabbitMQConsumerService, RabbitMQConsumerService>();services.AddSingleton<IRabbitMQProducerService, RabbitMQProducerService>();}public void Configure(IApplicationBuilder app, IWebHostEnvironment env){//省略其他代码// 启动消费者var consumerService = app.ApplicationServices.GetRequiredService<IRabbitMQConsumerService>();Task.Run(() => consumerService.StartConsuming());app.UseRouting();app.UseAuthorization();app.UseEndpoints(endpoints =>{endpoints.MapControllers();});}
}
6.接口调用生产者
public class TimeEntity
{public int Hour { get; set; }public int Minute { get; set; }public int Second { get; set; } = 10;
}
/// <summary>
/// 调用生产者发送消息
/// </summary>
[HttpPost]
[Route(nameof(RabbitMQSend))]
public async Task<string> RabbitMQSend(TimeEntity input)
{try{var r = await _rabbitMQProducerService.SendMessage(input);//依赖注入IRabbitMQProducerService调用即可return r ? "发送成功" : "发送失败";}catch (Exception ex){return "发生错误";}
}
7.调试及结论
调试及结论:
1.开启一个WebAPI连接消息队列名为hello
2.开启一个控制台程序连接消息队列名为hello
3.四条信息入队,分别是ABCD,结果:WebAPI消费了A、C,控制台程序消费了B、D
4.结论:点对点模式(P2P) 场景下,消息会在消费者之间公平分配,每个消费者只处理自己获得的消息
三、发布订阅模式
1.文件夹结构
同上
2.连接管理类
同上
3.生产者
public class RabbitMQProducerService : IRabbitMQProducerService
{private readonly string exchangeName = "myexchange";public async Task<bool> SendMessage(TimeEntity timeEntity){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明交换机(fanout类型,适用于发布-订阅模式)await channel.ExchangeDeclareAsync(exchange:exchangeName,type:ExchangeType.Fanout);// 03 发送内容到交换机(不指定routingKey,fanout交换机会将消息广播到所有绑定的队列)var message = JsonSerializer.Serialize(timeEntity);var body = Encoding.UTF8.GetBytes(message);await channel.BasicPublishAsync(exchange: exchangeName,routingKey: "",body: body);Console.WriteLine($"生产者发送了: {message}");return true;}catch (Exception ex){return false;}}
}
4.消费者
public class RabbitMQConsumerService : IRabbitMQConsumerService
{private readonly string exchangeName = "myexchange";public async Task StartConsuming(){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明交换机(必须与生产者相同)await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout);// 03 声明一个独立的临时队列,用于接收消息(断开链接后自动删除)var queueName = await channel.QueueDeclareAsync();// 04 绑定队列到交换机await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: "");// 05 消费的逻辑方法var consumer = new AsyncEventingBasicConsumer(channel);ConsumeMethod(channel, consumer);// 06 启动消费(禁用自动确认,需在消费逻辑ConsumeMethod中手动确认消息)await channel.BasicConsumeAsync(queue: queueName,autoAck: false,//禁止使用消息自动确认consumer: consumer);Console.WriteLine("点击退出");Console.ReadLine();}catch (Exception ex){Console.WriteLine( ex.Message );}}/// <summary>/// 消费方法/// </summary>private static void ConsumeMethod(IChannel channel, AsyncEventingBasicConsumer consumer){consumer.ReceivedAsync += async (model, ea) =>{// 01 获取入参var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 反序列化消息var timeEntity = JsonSerializer.Deserialize<TimeEntity>(message);// 02 根据入参执行一定逻辑int delayMilliseconds =timeEntity.Hour * 3600000 + // 小时 → 毫秒(1小时=3600×1000毫秒)timeEntity.Minute * 60000 + // 分钟 → 毫秒(1分钟=60×1000毫秒)timeEntity.Second * 1000; // 秒 → 毫秒var guid = Guid.NewGuid();Console.WriteLine($"【{guid}】开始时间:现在是{DateTime.Now},将在 {timeEntity.Hour}时{timeEntity.Minute}分{timeEntity.Second}秒后打印消息...");await Task.Delay(delayMilliseconds); // 关键:异步延迟,不阻塞线程Console.WriteLine($"【{guid}】结束时间: {DateTime.Now}");Console.WriteLine($"===============================");// 03 手动确认消息await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);};}
}
5.服务的注册
同上
6.接口调用生产者
同上
7.调试及结论
调试及结论:
1.开启一个WebAPI连接交换机名为exchangeName
2.开启一个控制台程序连接交换机名为exchangeName
3.两条信息入队,分别是AB,结果:WebAPI消费了A、B,控制台程序消费了A、B
4.结论:发布-订阅(Pub/Sub) 场景下,交换机会将所有消息广播到所有绑定的队列
四、路由模式
1.生产者
public class RabbitMQProducerService : IRabbitMQProducerService
{private readonly string exchangeName = "mydirect";private readonly string routingKey = "myroot";public async Task<bool> SendMessage(TimeEntity timeEntity){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明交换机(Direct类型,适用于路由器模式)await channel.ExchangeDeclareAsync(exchange:exchangeName,type:ExchangeType.Direct);// 03 发送内容到交换机(不指定routingKey,fanout交换机会将消息广播到所有绑定的队列)var message = JsonSerializer.Serialize(timeEntity);var body = Encoding.UTF8.GetBytes(message);await channel.BasicPublishAsync(exchange: exchangeName,routingKey: routingKey,body: body);Console.WriteLine($"生产者发送了: {message}");return true;}catch (Exception ex){return false;}}
}
2.消费者
public class RabbitMQConsumerService : IRabbitMQConsumerService
{private readonly string exchangeName = "mydirect";private readonly string routingKey = "myroot";private readonly string qName = "myqname";public async Task StartConsuming(){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明交换机(Direct类型,适用于路由模式,必须与生产者相同)await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Direct);// 03 声明一个独立的队列,用于接收消息var queueName = await channel.QueueDeclareAsync(qName,false,false);// 04 绑定队列到交换机await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey);// 05 消费的逻辑方法var consumer = new AsyncEventingBasicConsumer(channel);ConsumeMethod(channel, consumer);// 06 启动消费(禁用自动确认,需在消费逻辑ConsumeMethod中手动确认消息)await channel.BasicConsumeAsync(queue: queueName,autoAck: false,//禁止使用消息自动确认consumer: consumer);Console.WriteLine("点击退出");Console.ReadLine();}catch (Exception ex){Console.WriteLine( ex.Message );}}/// <summary>/// 消费方法/// </summary>private static void ConsumeMethod(IChannel channel, AsyncEventingBasicConsumer consumer){consumer.ReceivedAsync += async (model, ea) =>{// 01 获取入参var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 反序列化消息var timeEntity = JsonSerializer.Deserialize<TimeEntity>(message);// 02 根据入参执行一定逻辑int delayMilliseconds =timeEntity.Hour * 3600000 + // 小时 → 毫秒(1小时=3600×1000毫秒)timeEntity.Minute * 60000 + // 分钟 → 毫秒(1分钟=60×1000毫秒)timeEntity.Second * 1000; // 秒 → 毫秒var guid = Guid.NewGuid();Console.WriteLine($"【{guid}】开始时间:现在是{DateTime.Now},将在 {timeEntity.Hour}时{timeEntity.Minute}分{timeEntity.Second}秒后打印消息...");await Task.Delay(delayMilliseconds); // 关键:异步延迟,不阻塞线程Console.WriteLine($"【{guid}】结束时间: {DateTime.Now}");Console.WriteLine($"===============================");// 03 手动确认消息await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);};}
}