MassTransit(一)
Program.cs
// Register MassTransit: fetch the "RabbitMq" entry through AppSettings.Get, then
// hook the bus and its consumer into the service collection.
// NOTE(review): rabbitmqInfo is not referenced anywhere in this snippet — confirm
// whether it was meant to be handed to the bus configuration.
var rabbitmqInfo = AppSettings.Get<RabbitMqConfig>("RabbitMq");
ConsumerListenerExtensions.AddConsumerListener(builder.Services);
// Section: model
using System;

namespace ZR.Common.Model
{
    /// <summary>
    /// Message type exchanged between the publishing controller action and
    /// the order consumer.
    /// </summary>
    public class SubmitOrder
    {
        /// <summary>Identifier of the order.</summary>
        public Guid OrderId { get; set; }

        /// <summary>Amount of the order.</summary>
        public decimal Amount { get; set; }

        /// <summary>Name of the user attached to the message.</summary>
        public string username { get; set; }

        /// <summary>Identifier of the user attached to the message.</summary>
        public int userId { get; set; }
    }
}
// Section: ConsumerListenerExtensions.cs
using MassTransit;
/// <summary>
/// Service-collection extensions that register MassTransit on top of RabbitMQ
/// together with the receive endpoint for <see cref="OrderConsumer"/>.
/// </summary>
public static class ConsumerListenerExtensions
{
    /// <summary>
    /// Registers MassTransit with a RabbitMQ transport, the delayed message
    /// scheduler, and the "priority-orders" receive endpoint.
    /// </summary>
    /// <param name="service">The service collection to register into.</param>
    /// <param name="host">RabbitMQ host name; defaults to the previously hard-coded "localhost".</param>
    /// <param name="username">RabbitMQ user name; defaults to the previously hard-coded value.</param>
    /// <param name="password">
    /// RabbitMQ password; defaults to the previously hard-coded value. Outside of a
    /// demo, pass the value from configuration instead of relying on the default.
    /// </param>
    /// <returns>The same <paramref name="service"/> instance, to allow chaining.</returns>
    public static IServiceCollection AddConsumerListener(
        this IServiceCollection service,
        string host = "localhost",
        string username = "pony",
        string password = "123456")
    {
        service.AddMassTransit(cfg =>
        {
            cfg.AddConsumer<OrderConsumer>();

            cfg.UsingRabbitMq((ctx, rb) =>
            {
                // Configure the RabbitMQ connection.
                rb.Host(host, h =>
                {
                    h.Username(username);
                    h.Password(password);
                });

                rb.UseDelayedMessageScheduler();
                ListenPriority(rb, ctx);
            });
        });

        return service;
    }

    /// <summary>
    /// Declares the "priority-orders" receive endpoint, attaches
    /// <see cref="OrderConsumer"/> to it, and binds it to the "submitorder" exchange.
    /// </summary>
    /// <param name="cfg">The RabbitMQ bus factory configurator.</param>
    /// <param name="ctx">The bus registration context used to resolve the consumer.</param>
    private static void ListenPriority(IRabbitMqBusFactoryConfigurator cfg, IBusRegistrationContext ctx)
    {
        cfg.ReceiveEndpoint("priority-orders", e =>
        {
            // Turn off the automatic consume topology so the explicit Bind below
            // is the only exchange binding on this endpoint.
            e.ConfigureConsumeTopology = false;
            e.Consumer<OrderConsumer>(ctx);

            e.Bind("submitorder", s =>
            {
                s.ExchangeType = "fanout";
                // NOTE(review): RabbitMQ fanout exchanges ignore the routing key —
                // confirm whether a direct/topic exchange was intended.
                s.RoutingKey = "PRIORITY";
            });
        });
    }
}
// Section: publishing a message — CommunityUserController
/// <summary>
/// Test action that sends a sample <c>SubmitOrder</c> message through MassTransit.
/// </summary>
/// <remarks>
/// NOTE(review): the action is marked <c>[AllowAnonymous]</c> and is reachable via GET;
/// confirm that exposing it without authentication is intended outside of a demo.
/// </remarks>
/// <returns>A success result after the message has been handed to the send endpoint.</returns>
[HttpGet("testRabbitMq")]
[Log(Title = "测试rabbitmq", BusinessType = BusinessType.EXPORT, IsSaveResponseData = false)]
[AllowAnonymous]
public async Task<IActionResult> TestRabbitMq()
{
    var message = new SubmitOrder
    {
        OrderId = Guid.NewGuid(),
        Amount = 100.0m,
        username = "河汉清",
        userId = 4
    };

    // Alternative kept from the original post: await _bus.Publish(message);
    // Here the message is sent to the "submitorder" address explicitly instead.
    var endpoint = await _bus.GetSendEndpoint(new Uri("rabbitmq://localhost/submitorder"));
    await endpoint.Send(message);

    return SUCCESS("Message published successfully");
}
// Section: consumer — OrderConsumer
using MassTransit;
using ZR.Common.Model;

/// <summary>
/// MassTransit consumer for <see cref="SubmitOrder"/> messages.
/// </summary>
public class OrderConsumer : IConsumer<SubmitOrder>
{
    private readonly ISysUserService _sysUser;

    /// <summary>
    /// Creates the consumer with the user service it uses to look up and modify users.
    /// </summary>
    /// <param name="sysUser">The user service.</param>
    public OrderConsumer(ISysUserService sysUser)
    {
        _sysUser = sysUser;
    }

    /// <summary>
    /// Handles one <see cref="SubmitOrder"/> message: logs the order id, looks up the
    /// user whose id matches the message, and, when found, deletes that user.
    /// </summary>
    /// <param name="context">The consume context carrying the message.</param>
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        // Handle the message here.
        Console.WriteLine($"Order ID: {context.Message.OrderId}");

        var user = await _sysUser.GetFirstAsync(x => x.UserId == context.Message.userId);
        if (user == null)
        {
            Console.WriteLine("没消费");
            return;
        }

        // NOTE(review): the name is changed and the entity is then deleted, so the
        // rename is never persisted — confirm whether an update was intended here.
        user.UserName = "这是我的第一次消费消息";
        await _sysUser.DeleteAsync(user);

        Console.WriteLine(user + "消费了");
    }
}