当前位置: 首页 > news >正文

OPC UA + ABP vNext 企业级实战:高可用数据采集框架指南

🚀📊 OPC UA + ABP vNext 企业级实战:高可用数据采集框架指南 🚀


📑 目录

  • 🚀📊 OPC UA + ABP vNext 企业级实战:高可用数据采集框架指南 🚀
    • 一、前言 🎯
    • 二、系统架构 🏗️
    • 三、配置与校验 🔧
      • `appsettings.json`
      • 校验示例
      • 📜 配置校验流程
    • 四、OpcUaService 增强:线程安全 + Polly 重试 🔐🔄
      • 🔒 OPC UA 会话重连流程
    • 五、数据采集作业:异常隔离 + 告警上报 🚨
      • 📥 数据采集 & 缓存流程
    • 六、模块注册补全 🎛️
      • `OpcUaHealthCheck` 示例
    • 七、证书 & Kubernetes 部署 ☸️
      • 1. 生成并信任证书(Linux)
      • 📦 证书生成与挂载流程
      • 2. Kubernetes Secret 示例
      • 3. Pod 挂载
      • 4. Liveness/Readiness Probes
      • ☸️ K8s 探针配置流程
    • 八、日志采集与可观测性 🔍
    • 九、结语 🎉


一、前言 🎯

本文基于企业级生产环境需求,全面重构 OPC UAABP vNext 集成框架,涵盖:

  • 配置集中化 & 校验
  • 安全封装 & Polly 重试 🔄
  • 原生作业调度 (BackgroundWorkerBase) ⏱️
  • 分布式缓存 & 更新 幂等 🔒
  • 健康检查 & 告警事件 🚨
  • OpenTelemetry 跟踪 🕵️
  • 证书管理 & Kubernetes 部署 ☸️

实现「即克隆、即运行、即监控」的工业数据平台!✨


二、系统架构 🏗️

🔍 读数据
OPC UA Server
OpcUaService
OpcUaCollectorWorker
ApplicationService
EF Core / PostgreSQL
Redis 缓存 IDistributedCache
UI Layer

三、配置与校验 🔧

appsettings.json

"OpcUa": {"Endpoint": "opc.tcp://localhost:4840","NodeIds": ["ns=2;s=Device1", "ns=2;s=Device2"],"CacheDurationSeconds": 120,"AutoAcceptUntrusted": false,"Certificate": {"StorePath": "/etc/opcua/certs","SubjectName": "CN=OpcAbpIntegration"}
}

校验示例

public override void OnApplicationInitialization(ApplicationInitializationContext context)
{var config = context.ServiceProvider.GetRequiredService<IConfiguration>();var section = config.GetSection("OpcUa");if (!section.Exists())throw new ConfigurationErrorsException("🔴 OpcUa 配置节缺失!");var endpoint = section["Endpoint"];if (string.IsNullOrWhiteSpace(endpoint))throw new ConfigurationErrorsException("🔴 OpcUa.Endpoint 不能为空!");var nodeIds = section.GetSection("NodeIds").Get<string[]>();if (nodeIds == null || nodeIds.Length == 0)throw new ConfigurationErrorsException("🔴 OpcUa.NodeIds 至少配置一个!");
}

📜 配置校验流程

读取 appsettings.json OpcUa 节
节是否存在?
抛出 “配置节缺失” 异常
读取 Endpoint 与 NodeIds
Endpoint 非空?
抛出 “Endpoint 不能为空” 异常
NodeIds 数组长度 > 0?
抛出 “NodeIds 至少配置一个” 异常
配置校验通过 🎉

四、OpcUaService 增强:线程安全 + Polly 重试 🔐🔄

public class OpcUaService : IOpcUaService, ISingletonDependency
{private readonly IOptions<OpcUaOptions> _options;private readonly ILogger<OpcUaService> _logger;private Session? _session;private readonly SemaphoreSlim _lock = new(1, 1);public OpcUaService(IOptions<OpcUaOptions> options, ILogger<OpcUaService> logger){_options = options;_logger = logger;}public async Task<Session> EnsureSessionAsync(){await _lock.WaitAsync();try{if (_session?.Connected == true) return _session;var config = new ApplicationConfiguration{ApplicationName         = "OpcAbpIntegration",ApplicationUri          = "urn:abp:opcua",ApplicationType         = ApplicationType.Client,SecurityConfiguration   = new SecurityConfiguration{ApplicationCertificate = new CertificateIdentifier{StoreType   = "Directory",StorePath   = _options.Value.Certificate.StorePath,SubjectName = _options.Value.Certificate.SubjectName},AutoAcceptUntrustedCertificates = _options.Value.AutoAcceptUntrusted},ClientConfiguration    = new ClientConfiguration { DefaultSessionTimeout = 60000 },TransportQuotas        = new TransportQuotas { OperationTimeout = 15000, MaxMessageSize = 4_194_304 }};await config.Validate(ApplicationType.Client);var endpointDesc = CoreClientUtils.SelectEndpoint(_options.Value.Endpoint, false);var endpoint     = new ConfiguredEndpoint(null, endpointDesc, EndpointConfiguration.Create(config));_session = await Session.Create(config, endpoint, false, "OPC UA", 60000, new UserIdentity(), null);_logger.LogInformation("✅ OPC UA 会话已连接:{Endpoint}", _options.Value.Endpoint);return _session;}finally{_lock.Release();}}public async Task<string> ReadNodeAsync(string nodeId){return await Policy.Handle<Exception>().WaitAndRetryAsync(retryCount: 3,sleepDurationProvider: attempt => TimeSpan.FromSeconds(1 << attempt),onRetry: (ex, delay) => _logger.LogWarning(ex, "重试读取节点 {NodeId}", nodeId)).ExecuteAsync(async () =>{var session = await EnsureSessionAsync();_logger.LogDebug("📡 读取节点 {NodeId}", nodeId);var node    = new ReadValueId { NodeId = new NodeId(nodeId), AttributeId = Attributes.Value };var results = new DataValueCollection();await session.Read(null, 0, TimestampsToReturn.Both, new[] { node }, out results, out _);return results.FirstOrDefault()?.Value?.ToString() ?? "";});}
}

🔒 OPC UA 会话重连流程

调用 ReadNodeAsync(nodeId)
Policy 重试入口
确保获取 Session:EnsureSessionAsync
Session 已连接?
直接返回同一 Session
创建 ApplicationConfiguration
SelectEndpoint & ConfiguredEndpoint
Session.Create 建立会话
返回新会话
执行 Read 操作
返回节点值 或 抛出异常

五、数据采集作业:异常隔离 + 告警上报 🚨

public class OpcUaCollectorWorker : BackgroundWorkerBase
{private readonly IOpcUaService _opcUa;private readonly IDistributedCache<MyDeviceCacheItem> _cache;private readonly IMyDeviceRepository _repository;private readonly IDistributedEventBus _eventBus;private readonly IOptions<OpcUaOptions> _options;private readonly ILogger<OpcUaCollectorWorker> _logger;public override float DelayFactor => 1; // 可配置执行间隔public OpcUaCollectorWorker(IOpcUaService opcUa,IDistributedCache<MyDeviceCacheItem> cache,IMyDeviceRepository repository,IDistributedEventBus eventBus,IOptions<OpcUaOptions> options,ILogger<OpcUaCollectorWorker> logger){_opcUa      = opcUa;_cache      = cache;_repository = repository;_eventBus   = eventBus;_options    = options;_logger     = logger;}[UnitOfWork]public override async Task ExecuteAsync(CancellationToken stoppingToken){var failedNodes = new List<string>();var sw          = Stopwatch.StartNew();foreach (var nodeId in _options.Value.NodeIds){try{var value = await _opcUa.ReadNodeAsync(nodeId);await _repository.InsertOrUpdateAsync(new MyDeviceData(nodeId, value),existing => existing.Update(value));await _cache.SetAsync(nodeId,new MyDeviceCacheItem(value),new DistributedCacheEntryOptions {AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(_options.Value.CacheDurationSeconds)});_logger.LogInformation("📥 节点 {NodeId} 数据已更新", nodeId);}catch (Exception ex){_logger.LogError(ex, "❌ 读取节点 {NodeId} 失败", nodeId);failedNodes.Add(nodeId);}}sw.Stop();_logger.LogInformation("🔄 本次采集用时 {Elapsed} ms", sw.ElapsedMilliseconds);if (failedNodes.Count > 0){await _eventBus.PublishAsync(new NodeReadFailedEvent(failedNodes));_logger.LogWarning("⚠️ 发布读取失败告警,节点:{Nodes}", string.Join(',', failedNodes));}}
}

📥 数据采集 & 缓存流程

Worker 启动 ExecuteAsync
遍历 OpcUaOptions.NodeIds
调用 ReadNodeAsync(nodeId)
读取成功?
InsertOrUpdate 到数据库
SetAsync 到 Redis 缓存
记录失败节点到 failedNodes
继续下一个 nodeId
循环结束?
failedNodes 非空?
Publish NodeReadFailedEvent
完成,结束本次作业

六、模块注册补全 🎛️

[DependsOn(typeof(AbpEntityFrameworkCoreModule),typeof(AbpDistributedCacheModule),typeof(AbpBackgroundWorkersModule),typeof(AbpAutofacModule))]
public class OpcUaModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){// 1️⃣ 配置绑定与校验(见 OnApplicationInitialization)context.Services.Configure<OpcUaOptions>(context.Services.GetConfiguration().GetSection("OpcUa"));// 2️⃣ 核心服务注册context.Services.AddSingleton<IOpcUaService, OpcUaService>();// 3️⃣ EF Core & 仓储context.Services.AddAbpDbContext<MyDbContext>(opts =>{opts.AddDefaultRepositories(includeAllEntities: true);});// 4️⃣ Background Workercontext.Services.AddBackgroundWorker<OpcUaCollectorWorker>();// 5️⃣ 健康检查context.Services.AddHealthChecks().AddCheck<OpcUaHealthCheck>("opcua").AddNpgSql("YourPostgreConnection").AddRedis("localhost");// 6️⃣ OpenTelemetry 跟踪context.Services.AddOpenTelemetryTracing(builder =>{builder.AddAspNetCoreInstrumentation().AddHttpClientInstrumentation().AddEntityFrameworkCoreInstrumentation().AddSource("OpcUaService").AddJaegerExporter();});}
}

OpcUaHealthCheck 示例

public class OpcUaHealthCheck : IHealthCheck
{private readonly IOpcUaService _opcUa;public OpcUaHealthCheck(IOpcUaService opcUa) => _opcUa = opcUa;public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken token = default){try{await _opcUa.EnsureSessionAsync();return HealthCheckResult.Healthy("OPC UA session is healthy");}catch (Exception ex){return HealthCheckResult.Unhealthy("OPC UA session failed", ex);}}
}

七、证书 & Kubernetes 部署 ☸️

1. 生成并信任证书(Linux)

openssl genrsa -out client.key 2048
openssl req -new -x509 -key client.key -out client.crt -days 365 \-subj "/CN=OpcAbpIntegration"
mkdir -p /etc/opcua/certs/trusted
cp client.crt /etc/opcua/certs/trusted/

📦 证书生成与挂载流程

Kubernetes 部署
本地生成证书
Pod spec 中挂载 volume
创建 Secret opcua-certs
容器启动时可读 /etc/opcua/certs
openssl req -new -x509 client.crt
openssl genrsa client.key
mkdir /etc/opcua/certs/trusted
cp client.crt 到 trusted 目录

2. Kubernetes Secret 示例

apiVersion: v1
kind: Secret
metadata:name: opcua-certsnamespace: your-ns
stringData:client.crt: |-----BEGIN CERTIFICATE-----...base64...-----END CERTIFICATE-----

3. Pod 挂载

volumeMounts:
- name: opcua-certsmountPath: /etc/opcua/certs
volumes:
- name: opcua-certssecret:secretName: opcua-certs

4. Liveness/Readiness Probes

readinessProbe:httpGet:path: /health/readyport: 5000initialDelaySeconds: 10periodSeconds: 30livenessProbe:httpGet:path: /health/liveport: 5000initialDelaySeconds: 30periodSeconds: 60

☸️ K8s 探针配置流程

容器启动
InitContainer 挂载证书
主容器启动 ABP 应用
应用暴露 /health/ready 与 /health/live
K8s ReadinessProbe 调用 /health/ready
K8s LivenessProbe 调用 /health/live
Ready?
开始接收流量
持续探测
Alive?
重启 Pod
继续运行

八、日志采集与可观测性 🔍

  • 推荐安装 NuGet 包:
    • OpenTelemetry.Extensions.Hosting
    • OpenTelemetry.Instrumentation.Http, AspNetCore, EntityFrameworkCore
  • 日志平台:SeqELKJaeger
  • ABP 自带日志面板可实时查看采集结果

九、结语 🎉

此版本已实现企业级「高可用、可复现、可维护」规范,覆盖从 证书配置作业调度缓存优化健康检查可观测 的全链路实践。

📦 推荐 将此框架部署于 IoT EdgeKubernetes,并结合 CI/CD自动化证书脚本,打造工业物联网的实时采集+可视化体系!


相关文章:

  • FlashInfer - SparseAttention(稀疏注意力)只计算部分有意义的注意力连接,而非全部 token 对
  • 文件(文件夹时间戳修改)最后修改时间变更
  • python打卡day25@浙大疏锦行
  • promise的说明
  • Minimum MPDU Start Spacing in A-MPDU
  • Spring Cloud:构建云原生微服务架构的最佳工具和实践
  • WhaleTunnel 信创数据库适配能力全景图:打通国产数据生态的最后一公里
  • 【Linux】shell内置命令fg,bg和jobs
  • 缺乏自动化测试,如何提高测试效率
  • 剖析提示词工程中的递归提示
  • Dockerfile实战:从零构建自定义CentOS镜像
  • UOS专业版上通过源码安装 Python 3.13 并保留系统默认版本
  • 关于并发编程AQS的学习
  • Python 之 Flask 入门学习
  • 计算机图形学之几何(Geometry)
  • Spring 事件监听机制的使用
  • Spring 中的 @Configuration @Bean注解
  • UE5 像素推流
  • 在UI 原型设计中,交互规则有哪些核心要素?
  • 数值积分知识
  • 俄方代表团抵达土耳其,俄乌直接谈判有望于当地时间上午重启
  • 92岁上海交大退休教师捐赠百万元给学校,其父也曾设奖学金
  • 2025年中国网络文明大会将于6月10日在安徽合肥举办
  • 足球少年郎7月试锋芒,明日之星冠军杯构建顶级青少年赛事
  • 吉林:消纳绿电,“氢”装上阵
  • 国羽用冠军开启奥运周期,林丹:希望洛杉矶奥运取得更好成绩