当前位置: 首页 > news >正文

ABP VNext + NATS JetStream:高性能事件流处理

🌟 ABP VNext + NATS JetStream:高性能事件流处理 🚀


📚 目录

  • 🌟 ABP VNext + NATS JetStream:高性能事件流处理 🚀
    • 1. 引言 ✨
    • 2. 环境与依赖 🛠️
    • 3. 系统架构 🏗️
    • 4. 配置与依赖注入 🏷️
      • 4.1 `appsettings.json`
      • 4.2 模块注册
    • 5. 发布消息 📤
    • 6. 消费消息 📥
      • 6.1 Push-Consumer(Queue Group) 🤝
      • 6.2 Pull-Consumer(可控拉取) 🔄
    • 7. 死信队列消费示例 💀
    • 8. 集成测试示例(Testcontainers) 🧪
    • 9. 性能测试与对比 📊
    • 10. 实践与注意事项 💡


1. 引言 ✨

ABP VNext 8.x + .NET 8 中集成 NATS.Client v1 JetStream,构建一条“发布 → 推送/拉取 → 死信”全流程的低延迟高可靠可回溯事件流系统。


2. 环境与依赖 🛠️

  • .NET 8
  • ABP VNext 8.x
  • NATS.Server ≥ 2.2(推荐 ≥ 2.9)
dotnet add package NATS.Client            # 核心 NATS 客户端 v1
dotnet add package NATS.Client.JetStream  # JetStream 扩展 v1
dotnet add package Volo.Abp.EventBus      # 可选,ABP 事件总线
dotnet add package AspNetCore.HealthChecks.Nats # NATS 健康检查

⚙️ 启动本地 NATS Server(JetStream 模式):

nats-server --jetstream --store_dir ./data

确保 ./data 目录具有读写权限,否则流无法持久化。


3. 系统架构 🏗️

NATS
Producer
Publish
Consumers
DeadLetterService Handler
Stream: ORDERS
BillingService Handler
AnalyticsService Handler
Stream: ORDERS_DLQ
JetStream
OrderAppService
  • Producer:同步发布,获取 PublishAck

  • Stream:按主题存储消息,支持回溯与限流 ⏳

  • Consumers

    • Push(Queue Group)模式,自动负载均衡 🔄
    • Pull(Durable)模式,可控拉取 🔧
    • Dead-letter 流,处理重试失败消息 💀

4. 配置与依赖注入 🏷️

4.1 appsettings.json

{"Nats": {"Url": "nats://localhost:4222","ConnectionName": "MyAppNats"}
}

4.2 模块注册

public class MyNatsModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){var configuration = context.Services.GetConfiguration();// 📝 绑定 NatsOptions,并支持运行时刷新context.Services.Configure<NatsOptions>(configuration.GetSection("Nats"));context.Services.AddOptions<NatsOptions>().BindConfiguration("Nats").ValidateDataAnnotations();// 🔌 注入 IConnection(Singleton)context.Services.AddSingleton<IConnection>(sp =>{var opts = sp.GetRequiredService<IOptionsMonitor<NatsOptions>>().CurrentValue;var cf = new ConnectionFactory();var connOpts = ConnectionFactory.GetDefaultOptions();connOpts.Url  = opts.Url;connOpts.Name = opts.ConnectionName;connOpts.ReconnectHandler += (_, __) => Console.WriteLine("🔄 NATS reconnecting...");connOpts.ClosedHandler    += (_, __) => Console.WriteLine("🔒 NATS closed.");return cf.CreateConnection(connOpts);});// 💬 注入 JetStream 发布/订阅上下文context.Services.AddSingleton<IJetStream>(sp =>sp.GetRequiredService<IConnection>().CreateJetStreamContext());// 🛠️ 注入 JetStream 管理上下文context.Services.AddSingleton<IJetStreamManagement>(sp =>sp.GetRequiredService<IConnection>().CreateJetStreamManagementContext());// 📊 注册 NATS 健康检查context.Services.AddHealthChecks().AddNats(options =>{options.ConnectionFactory = sp =>sp.GetRequiredService<IConnection>();}, name: "nats-jetstream");}public override void OnApplicationInitialization(ApplicationInitializationContext ctx){var jsm = ctx.ServiceProvider.GetRequiredService<IJetStreamManagement>();// 1️⃣ 创建 ORDERS Stream(幂等)jsm.AddStream(new StreamConfiguration{Name         = "ORDERS",Subjects     = new[] { "orders.*" },StorageType  = StorageType.File,Retention    = RetentionPolicy.Limits,MaxMsgs      = 1_000_000,MaxConsumers = 20});// 2️⃣ Billing Push Consumer (Queue Group + DLQ)var billingCfg = ConsumerConfiguration.Builder().WithDurable("billing-durable").WithFilterSubject("orders.created").WithAckPolicy(AckPolicy.Explicit).WithMaxDeliver(5).WithBackOff(new[] { TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(30) }).WithDeliverSubject("ORDERS.DLQ.billing")  // DLQ 投递主题.Build();jsm.AddOrUpdateConsumer("ORDERS", billingCfg);// 3️⃣ Analytics Pull Consumer (回溯全部)var analyticsCfg = ConsumerConfiguration.Builder().WithDurable("analytics-durable").WithFilterSubject("orders.created").WithAckPolicy(AckPolicy.Explicit).WithDeliverPolicy(DeliverPolicy.All).Build();jsm.AddOrUpdateConsumer("ORDERS", analyticsCfg);// 4️⃣ Dead-letter Streamjsm.AddStream(new StreamConfiguration{Name        = "ORDERS_DLQ",Subjects    = new[] { "ORDERS.DLQ.*" },StorageType = StorageType.File});}public override async Task OnApplicationShutdownAsync(ApplicationShutdownContext ctx){// 🔌 优雅关闭 NATS 连接var conn = ctx.ServiceProvider.GetRequiredService<IConnection>();await conn.DrainAsync();   // 等待未完成的消息处理conn.Close();conn.Dispose();}
}

5. 发布消息 📤

public class OrderCreated
{public Guid   OrderId    { get; set; }public decimal Amount    { get; set; }public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}public class OrderAppService : ApplicationService
{private readonly IJetStream _jetStream;public OrderAppService(IJetStream jetStream) => _jetStream = jetStream;public Task CreateOrderAsync(CreateOrderInput input){// 1️⃣ 业务落库(略)// 2️⃣ 同步发布并捕获异常var evt  = new OrderCreated { OrderId = Guid.NewGuid(), Amount = input.Amount };var data = JsonSerializer.SerializeToUtf8Bytes(evt);try{_jetStream.Publish("orders.created", data);}catch (JetStreamApiException ex){Logger.LogError(ex, "❌ NATS publish failed");throw;}return Task.CompletedTask;}
}

6. 消费消息 📥

6.1 Push-Consumer(Queue Group) 🤝

public class BillingService : ITransientDependency
{public BillingService(IJetStream js){js.SubscribeAsync(subject: "orders.created",queue:   "billing-queue",msgHandler: async msg =>{try{var evt = JsonSerializer.Deserialize<OrderCreated>(msg.Data)!;await HandleAsync(evt);msg.Ack();}catch (Exception ex){Logger.LogError(ex, "⚠️ Billing handler failed");// 不 Ack → 根据 BackOff/MaxDeliver 重试或送入 DLQ}});}private Task HandleAsync(OrderCreated evt){// 账单处理逻辑return Task.CompletedTask;}
}

6.2 Pull-Consumer(可控拉取) 🔄

public class AnalyticsService : BackgroundService
{private readonly IJetStream _js;public AnalyticsService(IJetStream js) => _js = js;protected override Task ExecuteAsync(CancellationToken stoppingToken){var sub = _js.PullSubscribe("orders.created", "analytics-durable");return Task.Run(() =>{while (!stoppingToken.IsCancellationRequested){var msgs = sub.Fetch(50, TimeSpan.FromSeconds(1));foreach (var m in msgs){var evt = JsonSerializer.Deserialize<OrderCreated>(m.Data)!;// 分析写库(略)m.Ack();}}}, stoppingToken);}
}

7. 死信队列消费示例 💀

public class DeadLetterService : ITransientDependency
{public DeadLetterService(IJetStream js){js.SubscribeAsync("ORDERS.DLQ.billing", msg =>{var deadEvt = JsonSerializer.Deserialize<OrderCreated>(msg.Data)!;Logger.LogWarning("🚨 DLQ received for OrderId {OrderId}", deadEvt.OrderId);// 执行人工补偿或报警msg.Ack();});}
}

8. 集成测试示例(Testcontainers) 🧪

public class NatsJetStreamTests : IAsyncLifetime
{private NatsContainer _nats;private IConnection   _conn;private IJetStreamManagement _jsm;public async Task InitializeAsync(){// 启动 NATS 容器并开启 JetStream_nats = new TestcontainersBuilder<NatsContainer>().WithImage("nats:latest").WithJetStream(true).WithPortBinding(4222, 4222).WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(4222)).Build();await _nats.StartAsync();_conn = new ConnectionFactory().CreateConnection($"nats://localhost:4222");_jsm = _conn.CreateJetStreamManagementContext();// 幂等创建测试 Stream_jsm.AddStream(new StreamConfiguration{Name     = "TEST",Subjects = new[] { "test.*" }});}public async Task DisposeAsync(){await _conn.DrainAsync();_conn.Close();_nats.Dispose();}[Fact]public void PublishAndConsume_Test(){var js = _conn.CreateJetStreamContext();js.Publish("test.foo", Encoding.UTF8.GetBytes("hello"));var sub  = js.PullSubscribe("test.foo", "durable");var msgs = sub.Fetch(1, TimeSpan.FromSeconds(1));Assert.Single(msgs);Assert.Equal("hello", Encoding.UTF8.GetString(msgs[0].Data));msgs[0].Ack();}
}

9. 性能测试与对比 📊

[SimpleJob(RuntimeMoniker.NetCoreApp80)]
public class NatsBenchmark
{private IJetStream _js;[GlobalSetup]public void Setup(){var conn = new ConnectionFactory().CreateConnection("nats://localhost:4222");_js = conn.CreateJetStreamContext();}[Benchmark(Description = "Publish 100k messages sync")]public void Publish100k(){var data = new byte[256];for (int i = 0; i < 100_000; i++){_js.Publish("orders.created", data);}}
}
测试环境平均延迟 (ms)吞吐 (msg/s)
NATS JetStream(单节点,2 核 4GB)2.148 000
RabbitMQ(同配置)6.516 000
Kafka(同配置)4.035 000

说明:以上数据为本地单节点测试,仅供参考,实际场景请根据硬件/网络配置自行 Benchmark。


10. 实践与注意事项 💡

  • 客户端库统一:统一使用 NATS.Client v1,避免 v2 API 混用
  • 错误处理Publish 捕获 JetStreamApiException,管理操作捕获 JetStreamApiExceptionIOException
  • 资源管理await conn.DrainAsync()Close()Dispose()
  • 管理 API 异步化:可使用 AddStreamAsync / CreateOrUpdateConsumerAsync 优化启动性能
  • 队列组:Push 模式下使用 Queue Group 实现水平扩缩容
  • 消息幂等:基于 OrderId 或业务唯一键去重
  • 监控与回溯:定期调用 jsm.StreamInfojsm.GetConsumerInfo 上报 Prometheus/Grafana
  • 性能数据声明:附上测试环境说明,避免误导

http://www.dtcms.com/a/311411.html

相关文章:

  • FPGA kernel 仿真器调试环境搭建
  • 分类任务当中常见指标 F1分数、recall、准确率分别是什么含义
  • 「iOS」————SideTable
  • 基于Dockerfile 部署一个 Flask 应用
  • WAIC引爆AI,智元机器人收购上纬新材,Geek+上市,157起融资撑起热度|2025年7月人工智能投融资观察 · 极新月报
  • 【传奇开心果系列】Flet框架流式输出和实时滚动页面的智能聊天机器人自定义模板
  • github在界面创建tag
  • 性能测试-性能测试中的经典面试题二
  • 超级人工智能+无人机操控系统,振兴乡村经济的加速器,(申请专利应用),严禁抄袭!
  • spring-ai-alibaba 学习(十九)——graph之条件边、并行节点、子图节点
  • linux编译基础知识-库文件标准路径
  • Docker 的网络模式
  • 3 使用 Jenkins 构建镜像:将你的应用打包成镜像
  • 【20min 急速入门】使用Demucs进行音轨分离
  • ffmpeg命令和ffplay命令详解
  • Java高性能编程实践指南
  • ARM Cortex-M异常处理高级特性详解
  • OpenCV 全解读:核心、源码结构与图像/视频渲染能力深度对比
  • [硬件电路-121]:模拟电路 - 信号处理电路 - 模拟电路中常见的难题
  • 网络编程之原始套接字
  • Anthropic:跨越生产效能拐点的AI增长飞轮
  • [硬件电路-123]:模拟电路 - 信号处理电路 - 常见的高速运放芯片、典型电路、电路实施注意事项
  • 淘宝小程序的坑
  • 阿里云部署微调chatglm3
  • 音视频学习(四十七):模数转换
  • 文心4.5开源测评:国产大模型的轻量化革命与全栈突破
  • Unity_数据持久化_C#处理XML文件
  • Ubuntu18网络连接不上也ping不通网络配置问题排查与解决方法
  • Pyspark的register方法自定义udf函数
  • Android13文件管理USB音乐无专辑图片显示的是同目录其他图片