当前位置: 首页 > wzjs >正文

西安幼儿园网站制作网络营销公司哪家好

西安幼儿园网站制作,网络营销公司哪家好,轻量wordpress主题,哪家代运营公司比较好🎉 ABP vNext Redis Streams:构建实时事件驱动架构 🚀 利用 Redis Streams 作为轻量级、高性能的消息队列,结合 ABP 的领域事件和分布式事件总线,实现微服务间的实时异步通信。 🎯 目录 📚 &am…

🎉 ABP vNext + Redis Streams:构建实时事件驱动架构 🚀

利用 Redis Streams 作为轻量级、高性能的消息队列,结合 ABP 的领域事件和分布式事件总线,实现微服务间的实时异步通信。 🎯


目录 📚

  • 🎉 ABP vNext + Redis Streams:构建实时事件驱动架构 🚀
    • 背景与目标 🎯
    • 环境与依赖 🛠️
    • Redis Streams 概览 📑
    • 🏗️ 系统架构流程图
    • 在 ABP 中封装 IStreamProvider 📦
      • RedisStreamProvider 实现
    • 注册与配置 🔧
    • 使用 Consumer Group 实现横向 扩展 ↔️
    • 领域事件发布订阅最佳实践 📜
    • 异常重试与死信队列策略 🔄
    • 配置示例 🛠️
    • 可观察性 & 监控 📊
    • 测试策略 🧪


背景与目标 🎯

在微服务架构中,服务间的异步通信需满足高吞吐、低延迟,并支持横向扩展与容错。传统消息系统(如 Kafka)运维成本较高。

Redis Streams 作为 Redis 内置日志数据结构,具备轻量、持久、高性能的消息队列能力。本文演示如何在 ABP vNext 中:

  • 封装 IStreamProvider 连接 Redis Streams 🔗;
  • BackgroundService + Consumer Group 实现弹性横向扩展 ↔️;
  • 使用 Polly + DLQ 保障高可用 🔐;
  • 抽取配置并接入 health checks 与监控 📊。

环境与依赖 🛠️

# docker-compose.yml 示例
version: '3.8'
services:redis:image: redis:7ports: ["6379:6379"]volumes:- redis-data:/datacommand: ["redis-server","--save","","--appendonly","yes"]volumes:redis-data:
  • .NET 6+
  • ABP vNext 6.x
  • Redis 6.0+(Streams)
  • Docker & Docker Compose

Redis Streams 概览 📑

XADD           => 追加消息
XREADGROUP     => 读取 Consumer Group 消息
XACK           => 确认消费
XDEL           => 删除已确认消息
XINFO GROUPS   => 查询消费组
XCLAIM         => 转移待处理消息

Redis Streams 支持持久化、Consumer Group、消息按 ID 追踪。


🏗️ 系统架构流程图

XADD
XACK
Domain Service 发布事件
Redis Streams
Consumer Group
BackgroundService 读取
事件处理 Handler

在 ABP 中封装 IStreamProvider 📦

RedisStreamProvider 实现

// 类定义与依赖注入
public class RedisStreamProvider : IStreamProvider, ITransientDependency
{private readonly IConnectionMultiplexer _redis;private readonly JsonSerializerOptions _serializerOptions;private readonly ILogger<RedisStreamProvider> _logger;private readonly IServiceProvider _services;public RedisStreamProvider(IConnectionMultiplexer redis,JsonSerializerOptions serializerOptions,ILogger<RedisStreamProvider> logger,IServiceProvider services){_redis = redis;_serializerOptions = serializerOptions;_logger = logger;_services = services;}// PublishAsync 实现public async Task PublishAsync<T>(string streamName, T message){var db = _redis.GetDatabase();var json = JsonSerializer.Serialize(message, _serializerOptions);await db.StreamAddAsync(streamName, new NameValueEntry[] { new NameValueEntry("data", json) });}// Subscribe 由 BackgroundService 调用,不建议在此处使用 Task.Run
}

注册与配置 🔧

// Startup / Program.cs
builder.Services.AddStackExchangeRedisConnection(o => o.Configuration = "localhost:6379");
builder.Services.AddSingleton<IConnectionMultiplexer>(sp =>ConnectionMultiplexer.Connect(sp.GetRequiredService<RedisConfiguration>().ConnectionString));
builder.Services.AddSingleton(new JsonSerializerOptions
{PropertyNamingPolicy = JsonNamingPolicy.CamelCase,DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
});
builder.Services.AddTransient<IStreamProvider, RedisStreamProvider>();
// 注入配置
builder.Services.Configure<RedisStreamOptions>(configuration.GetSection("Redis:StreamSettings"));
// 注册 BackgroundService
builder.Services.AddHostedService<RedisStreamBackgroundService>();

使用 Consumer Group 实现横向 扩展 ↔️

# 幂等创建消费组
redis-cli --raw XINFO GROUPS mystream | grep -q 'mygroup' || redis-cli XGROUP CREATE mystream mygroup $ MKSTREAM
  • 多实例启动时,注入不同 consumerName,自动均摊消息;
  • 闲置消息可通过 XCLAIM 转移。

领域事件发布订阅最佳实践 📜

public class OrderCreatedEvent : EventData
{ public Guid OrderId { get; set; } }public class OrderDomainService : DomainService
{private readonly IStreamProvider _stream;public OrderDomainService(IStreamProvider stream) => _stream = stream;public async Task CreateOrderAsync(OrderDto dto){// 业务逻辑await _stream.PublishAsync("order-stream", new OrderCreatedEvent { OrderId = dto.Id });}
}

异常重试与死信队列策略 🔄

// 在 BackgroundService 中
var options = // IOptions<RedisStreamOptions>
var policy = Policy.Handle<Exception>().WaitAndRetryAsync(retryCount: options.Value.RetryCount,sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),onRetry: (ex, time) => _logger.LogWarning(ex, "Retry after {Delay}", time));await policy.ExecuteAsync(async () =>
{// 处理消息
});// DLQ 处理
await db.StreamAddAsync(options.Value.DlqStream,new NameValueEntry[] { /* data, metadata ... */ });

配置示例 🛠️

// appsettings.json
{"Redis": {"ConnectionString": "localhost:6379","StreamSettings": {"GroupName": "mygroup","DlqStream": "dlq-stream","RetryCount": 3,"BatchSize": 10}}
}

可观察性 & 监控 📊

Domain Service Redis Streams BackgroundService Event Handler XADD message XREADGROUP messages Process message XACK message Domain Service Redis Streams BackgroundService Event Handler
  • 健康检查:实现 IHealthCheck,检查 Redis 连接和消费组状态;
  • 指标打点:使用 OpenTelemetryPrometheus 收集:
    • 消息处理耗时
    • 失败率
    • Pending 队列长度

测试策略 🧪

  1. 单元测试:Mock IConnectionMultiplexerIDatabase,验证序列化、StreamAdd、重试逻辑;
  2. 集成测试:使用 Testcontainers Redis,验证端到端 Publish → Consume → Ack 流程;
  3. 模拟故障:断开 Redis 或注入异常,确保 DLQ 与重试策略生效。

http://www.dtcms.com/wzjs/309685.html

相关文章:

  • 网页网站的区别是什么微信seo排名优化软件
  • 浙江省建设厅继续教育网站优化网站页面
  • 服务专业的网站建站公司常见的网络营销方式有哪些
  • 漏惹网站做百度最贵关键词排名
  • 黑河做网站的上海seo
  • 北京市建设委员会网站资质办理百度指数支持数据下载吗
  • 邢台多地划为高风险区简述seo的应用范围
  • 专业模板网站制作服务如何免费自己创建网站
  • html生成惠州seo全网营销
  • 做网站珠海公司网络推广
  • 兴义做网站的东莞网站推广哪里找
  • 免费新闻网站建设关键词优化公司前十排名
  • 大业工业设计公司官网seo的搜索排名影响因素有哪些
  • 学术会议网站怎么做可以进入任何网站的浏览器
  • wordpress发文章设置文字大小抖音矩阵排名软件seo
  • ps切图做网站腾讯体育nba
  • 制作网站建设规划书的结构为资深seo顾问
  • 网站建设有趣名称信息流优化师面试常见问题
  • 怎么让百度多收录网站国家大事新闻近三天
  • 怎么评价一个网站设计做的好坏公司网站制作模板
  • 网站 验证码错误北京seo方法
  • .net网站开发过程电商网站开发
  • 衡阳做网站的公司360优化大师官方下载最新版
  • 河北邯郸网站制作今日热搜榜排名
  • 新疆生产建设兵团六师网站关键词都有哪些
  • 开办网站原因网络营销推广及优化方案
  • 什么是网络营销产品组合策略百度seo优
  • github做网站空间seo发包技术教程
  • 政府旅游网站建设seo页面内容优化
  • 东莞公司网站建设营销型网站建设免费推广