当前位置: 首页 > wzjs >正文

网站对联广告图片济宁建筑人才网

网站对联广告图片,济宁建筑人才网,个人网站首页布局图,重庆网站建设推广服务ABP VNext Cosmos DB Change Feed:搭建实时数据变更流服务 🚀 📚 目录 ABP VNext Cosmos DB Change Feed:搭建实时数据变更流服务 🚀TL;DR ✨🚀 1. 环境与依赖 🏗️2. 服务注册与依赖注入 &am…

ABP VNext + Cosmos DB Change Feed:搭建实时数据变更流服务 🚀


📚 目录

  • ABP VNext + Cosmos DB Change Feed:搭建实时数据变更流服务 🚀
      • TL;DR ✨🚀
    • 1. 环境与依赖 🏗️
    • 2. 服务注册与依赖注入 🔌
    • 3. 封装 Change Feed 为 IHostedService 🔧
      • 3.1 HostedService 生命周期流程图
      • 3.2 `ChangeFeedHostedService` 实现
    • 4. 事务与幂等 🛡️
    • 5. 发布到事件总线 📡
      • MassTransit 示例
    • 6. 容错与监控 🛠️📊
    • 7. 横向扩展 🌐
    • 参考文档 📖


TL;DR ✨🚀

  • 全托管 DI:CosmosClient 由容器单例管理,HostedService 构造注入,优雅释放。
  • 作用域与事务:回调内创建新 Scope,结合 IUnitOfWorkManager 实现事务一致性🛡️。
  • Exactly-Once:通过(DocumentId, ETag)唯一索引 + 手动 Checkpoint,确保不漏不重✅。
  • 容错重试:Polly 指数退避重试与熔断,处理启动与回调中的网络抖动🔄。
  • 监控可扩展:日志、指标、Dead-Letter 容错,中控告警 + 多实例自动分片,助力弹性伸缩📊。

1. 环境与依赖 🏗️

  • .NET 平台:.NET 6 + / ABP VNext 6.x

  • Azure 资源:Cosmos DB Core API(Source 容器 + Lease 容器)

  • 主要 NuGet 包

    dotnet add package Microsoft.Azure.Cosmos
    dotnet add package Volo.Abp.EventBus.MassTransit
    dotnet add package Streamiz.Kafka.Net.Stream        # 可选
    dotnet add package Volo.Abp.EntityFrameworkCore
    dotnet add package Polly
    
  • appsettings.json 配置

    {"Cosmos": {"ConnectionString": "<your-connection-string>","Database": "MyAppDb","SourceContainer": "Docs","LeaseContainer": "Leases"},"RabbitMq": { "Host": "rabbitmq://localhost" },"Kafka":   { "BootstrapServers": "localhost:9092" }
    }
    

2. 服务注册与依赖注入 🔌

MyAppModuleConfigureServices 中:

public override void ConfigureServices(ServiceConfigurationContext context)
{var configuration = context.Services.GetConfiguration();// CosmosClient 单例托管context.Services.AddSingleton(sp =>new CosmosClient(configuration["Cosmos:ConnectionString"]));// Polly 重试策略:3 次指数退避context.Services.AddSingleton(sp => Policy.Handle<Exception>().WaitAndRetryAsync(retryCount: 3,sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),onRetry: (ex, ts, retryCount, ctx) =>{var logger = sp.GetRequiredService<ILogger<ChangeFeedHostedService>>();logger.LogWarning(ex, "⚠️ ChangeFeed 启动重试,第 {RetryCount} 次", retryCount);}));// 注册 HostedServicecontext.Services.AddHostedService<ChangeFeedHostedService>();
}

💡 Tip:将 Cosmos、RabbitMQ、Kafka 等配置抽象到 SettingDefinition,支持动态变更。


3. 封装 Change Feed 为 IHostedService 🔧

3.1 HostedService 生命周期流程图

应用启动
DI 容器构建
触发 IHostedService.StartAsync
启动 ChangeFeedProcessor
监听文档变更
HandleChangesAsync 回调
发布事件 & 写审计 & Checkpoint
准备下一批

⚠️ “触发 StartAsync”更准确地反映了 ASP.NET Core Host 的启动流程。

3.2 ChangeFeedHostedService 实现

public class ChangeFeedHostedService : IHostedService, IDisposable
{private readonly CosmosClient _cosmosClient;private readonly IConfiguration _config;private readonly ILogger<ChangeFeedHostedService> _logger;private readonly IAsyncPolicy _retryPolicy;private readonly IServiceProvider _serviceProvider;private ChangeFeedProcessor _processor;public ChangeFeedHostedService(CosmosClient cosmosClient,IConfiguration config,ILogger<ChangeFeedHostedService> logger,IAsyncPolicy retryPolicy,IServiceProvider serviceProvider){_cosmosClient    = cosmosClient;_config          = config;_logger          = logger;_retryPolicy     = retryPolicy;_serviceProvider = serviceProvider;}public async Task StartAsync(CancellationToken ct){await _retryPolicy.ExecuteAsync(async () =>{_logger.LogInformation("🔄 ChangeFeedHostedService 正在启动...");var dbName = _config["Cosmos:Database"];var src    = _cosmosClient.GetContainer(dbName, _config["Cosmos:SourceContainer"]);var lease  = _cosmosClient.GetContainer(dbName, _config["Cosmos:LeaseContainer"]);_processor = src.GetChangeFeedProcessorBuilder<MyDocument>("abp-processor", HandleChangesAsync).WithInstanceName(Environment.MachineName).WithLeaseContainer(lease).WithStartTime(DateTime.MinValue.ToUniversalTime()).Build();await _processor.StartAsync(ct);_logger.LogInformation("✅ ChangeFeedProcessor 已启动");});}public async Task StopAsync(CancellationToken ct){if (_processor != null){_logger.LogInformation("🛑 ChangeFeedProcessor 正在停止...");await _processor.StopAsync(ct);_logger.LogInformation("✅ ChangeFeedProcessor 已停止");}}public void Dispose() => _processor = null;private async Task HandleChangesAsync(IReadOnlyCollection<MyDocument> docs,CancellationToken ct){if (docs == null || docs.Count == 0) return;_logger.LogInformation("📥 收到 {Count} 条文档变更", docs.Count);// 创建新的 DI Scopeusing var scope = _serviceProvider.CreateScope();var uowManager = scope.ServiceProvider.GetRequiredService<IUnitOfWorkManager>();var eventBus   = scope.ServiceProvider.GetRequiredService<IDistributedEventBus>();var auditRepo  = scope.ServiceProvider.GetRequiredService<IRepository<AuditEntry, Guid>>();// 开始事务using var uow = await uowManager.BeginAsync();foreach (var doc in docs){try{// 发布领域事件await eventBus.PublishAsync(new DocumentChangedEvent(doc.Id, doc), ct);// 审计写入,唯一索引保证幂等var entry = new AuditEntry{DocumentId = doc.Id,ETag       = doc.ETag,Operation  = doc.Operation,Timestamp  = DateTime.UtcNow,Payload    = JsonConvert.SerializeObject(doc)};await auditRepo.InsertAsync(entry, autoSave: true);}catch (DbUpdateException dbEx)when (dbEx.InnerException?.Message.Contains("UNIQUE") ?? false){_logger.LogWarning("⚠️ 文档 {DocumentId}@{ETag} 唯一索引冲突,跳过", doc.Id, doc.ETag);}catch (Exception ex){_logger.LogError(ex, "🔥 写审计失败,写入 Dead-Letter 容器");await WriteToDeadLetterAsync(doc, ex, ct);// 回滚本次事务await uow.RollbackAsync();// 跳过到下一文档continue;}}// 提交事务await uow.CompleteAsync();// 手动 Checkpointawait _processor.CheckpointAsync(ct);_logger.LogInformation("🗸 Checkpoint 完成,位置已记录");}private Task WriteToDeadLetterAsync(MyDocument doc, Exception ex, CancellationToken ct){// TODO: 实现将失败批次写入 Dead-Letter 容器或队列,用于离线补偿return Task.CompletedTask;}
}

4. 事务与幂等 🛡️

HandleChangesAsync
IUnitOfWorkManager.Begin
Publish Event & Insert Audit
异常?
写入 Dead-Letter
Rollback UoW
Complete UoW
Checkpoint

💡 Tip:在 AuditEntry 上建立 (DocumentId, ETag) 唯一索引,捕获 DbUpdateException 后跳过重复。


5. 发布到事件总线 📡

ChangeFeedProcessor
IDistributedEventBus.PublishAsync
MassTransit/RabbitMQ
Streamiz/Kafka
DocumentChangedConsumer
DocumentChangedProcessor

MassTransit 示例

services.AddMassTransit(cfg =>
{cfg.AddConsumer<DocumentChangedConsumer>();cfg.UsingRabbitMq((ctx, rc) =>{rc.Host(Configuration["RabbitMq:Host"]);rc.ReceiveEndpoint("change-feed-queue", e =>e.ConfigureConsumer<DocumentChangedConsumer>(ctx));});
});
public class DocumentChangedConsumer : IConsumer<DocumentChangedEvent>
{public async Task Consume(ConsumeContext<DocumentChangedEvent> ctx){// 下游业务逻辑…}
}

6. 容错与监控 🛠️📊

  • Polly 重试:启动与回调均受重试策略保护🔁。
  • Dead-Letter 容错:异常时写入专用容器/队列,离线补偿。
  • 日志ILogger<ChangeFeedHostedService> 记录启动/停止、批次数量、Checkpoint、异常详情。
  • 监控指标:集成 Application Insights 或 Prometheus,暴露 Lease 分片数、消费延迟、批量大小、错误率等。

7. 横向扩展 🌐

  • 多实例分片:同一 ProcessorName 启动 N 实例,Cosmos DB 自动均衡 Lease 分片。
  • 弹性伸缩:结合监控告警,自动扩缩 Kubernetes Deployment 或 VMSS,实现高峰应对。

参考文档 📖

  • Azure Cosmos DB Change Feed 官方文档
  • ABP 事件总线指南
  • MassTransit 文档

文章转载自:

http://lRbYppxy.xkjnj.cn
http://aVX4R9jM.xkjnj.cn
http://Xk1BAxO9.xkjnj.cn
http://i3EPNVYl.xkjnj.cn
http://BhgIFCQn.xkjnj.cn
http://yHichJaC.xkjnj.cn
http://RIjHVcdf.xkjnj.cn
http://rma2I8H2.xkjnj.cn
http://X8RCYZFW.xkjnj.cn
http://ziDGiHeS.xkjnj.cn
http://gAbwhXVR.xkjnj.cn
http://MkALbxkP.xkjnj.cn
http://RqjAIfyb.xkjnj.cn
http://zcp4mkrt.xkjnj.cn
http://MMxcb0Yr.xkjnj.cn
http://zB16i6Yr.xkjnj.cn
http://4PRrlqh8.xkjnj.cn
http://0PsTw5vQ.xkjnj.cn
http://dau8yocJ.xkjnj.cn
http://URG6gvv8.xkjnj.cn
http://dkjKRhfL.xkjnj.cn
http://AfR8ViAs.xkjnj.cn
http://KQGOv5qw.xkjnj.cn
http://jadsouP4.xkjnj.cn
http://oRtvFNrF.xkjnj.cn
http://0JOVdM3Z.xkjnj.cn
http://dSSnIJOV.xkjnj.cn
http://Ero3cFXb.xkjnj.cn
http://vH7HvIpq.xkjnj.cn
http://qIzthzmY.xkjnj.cn
http://www.dtcms.com/wzjs/606959.html

相关文章:

  • 织梦二次开发手机网站企业建设网站有哪些费用
  • 易营宝智能建站平台在线做网站教程
  • 全国新农村建设中心网站广西智能网站建设设计
  • 全球做的比较好的网站有哪些南昌网站建设招聘
  • 专业的网站建设公司青岛网站建设培训学校
  • 有没有可以做兼职的网站吗wordpress 搬瓦工
  • 古德设计官网网站seo诊断评分45
  • 可以做动漫的网站门户网站营销怎么做
  • 北京企业网站开发公司哪家好公司品牌宣传
  • 免费手机个人网站茶叶企业建设网站
  • 网站关键词优化公司黄骅市天气预报
  • 什么网站做英语翻译练习网站建立基本流程
  • 检测一个网站用什么软件做的方法汕头企业建站系统
  • 网站制作软件 aws崇明建设镇网站
  • 除了速卖通还有什么网站做外贸重庆公共交通最新消息
  • 综合网站系统怎么装字体到wordpress
  • 设计品牌网站大公司网页设计用什么软件好
  • 长兴住房和城乡建设局网站帮别人做网站多少钱合适
  • 深圳做生鲜食材的网站叫什么网站建设和邮箱的关联
  • 网站seo外链给别人做ppt的网站
  • 公司网站页面加密开网店的基础知识
  • 个人音乐网站建设免费网站怎么建立
  • 贵阳监理建设网站汶上网站建设公司
  • 淮南做网站的公司商业计划的网站建设费用
  • html 旅游网站重庆网站开发公司
  • 网站 数据库 关系城乡建设官网
  • 广东省建设教育协会官方网站苏州市优化网站推广哪家好
  • 自己做网站的网址定制微信网站
  • 亚马逊网站运营怎么做重庆市工程招标信息网
  • 怎样做士产品销售网站衡阳seo优化公司