当前位置: 首页 > news >正文

Orleans流系统时序图

Orleans流系统完整时序图

时序图概述

这个时序图展示了Orleans流系统中从生产者发送消息到消费者接收消息的完整流程,包括订阅管理、消息持久化、PubSub系统和消息投递等关键环节。

时序图

生产者GrainStreamImpl<T>PersistentStreamProducer队列适配器消息队列PersistentStreamPullingAgentPubSubRendezvousGrainStreamConsumerExtension消费者Grain阶段1: 消费者订阅SubscribeAsync()生成subscriptionId(GUID)RegisterConsumer(subscriptionId, streamId, consumerGrainId, filterData)存储消费者订阅状态AddSubscriber(subscriptionId, streamId, consumerGrainId, filterData)创建StreamConsumerData并缓存DoHandshakeWithConsumer()GetSequenceToken()返回序列令牌创建消息游标CursorRunConsumerCursor()开始投递阶段2: 生产者注册OnNextAsync(message)OnNextAsync(message)RegisterProducer(streamId, producerGrainId)存储生产者状态返回现有消费者列表QueueMessageAsync(message)持久化消息到队列阶段3: 消息拉取和投递GetQueueMessagesAsync()返回消息批次AddToCache()缓存消息遍历所有消费者DeliverItem(subscriptionId, message)查找对应的观察者OnNextAsync(message, token)处理消息逻辑Task完成(背压控制)投递完成loop[为每个消费者投递消息]阶段4: 背压控制OnNextAsync(nextMessage)OnNextAsync(nextMessage)QueueMessageAsync(nextMessage)长时间处理消息Task未完成暂停拉取新消息队列满,等待Task等待完成阻塞等待消费者Task快速完成继续拉取新消息消息成功入队Task立即完成立即返回alt[消费者处理慢(背压生效)][消费者处理正常]阶段5: 取消订阅UnsubscribeAsync(handle)UnregisterConsumer(subscriptionId, streamId)移除消费者订阅状态RemoveSubscriber(subscriptionId, streamId)移除StreamConsumerDataRemoveObserver(subscriptionId)生产者GrainStreamImpl<T>PersistentStreamProducer队列适配器消息队列PersistentStreamPullingAgentPubSubRendezvousGrainStreamConsumerExtension消费者Grain

关键组件说明

1. 生产者端组件

  • Producer Grain: 业务逻辑生产者
  • StreamImpl: 流接口实现,提供OnNextAsync等API
  • PersistentStreamProducer: 持久化流生产者,处理消息发送

2. 消息队列组件

  • QueueAdapter: 队列适配器,抽象不同消息队列实现
  • Message Queue: 实际的消息队列(如Azure Service Bus、RabbitMQ等)

3. 消费者端组件

  • PersistentStreamPullingAgent: 消息拉取代理,负责从队列拉取消息
  • StreamConsumerExtension: 消费者扩展,管理本地订阅和消息投递
  • Consumer Grain: 业务逻辑消费者

4. PubSub系统

  • PubSubRendezvousGrain: 发布-订阅汇聚点Grain,管理订阅关系

核心流程说明

订阅流程 (阶段1)

  1. 消费者调用SubscribeAsync()订阅流
  2. 生成唯一的subscriptionId
  3. 在PubSub系统中注册消费者
  4. 拉取代理创建消费者数据并建立消息游标

生产者注册 (阶段2)

  1. 生产者发送消息时自动注册到PubSub
  2. 消息通过队列适配器持久化到消息队列

消息投递 (阶段3)

  1. 拉取代理从队列拉取消息并缓存
  2. 为每个活跃消费者投递消息
  3. 调用消费者Grain的OnNextAsync方法

背压控制 (阶段4)

  1. 通过Task完成机制实现自然背压
  2. 消费者处理慢时,生产者会被阻塞等待
  3. 确保系统不会因消息积压而崩溃

取消订阅 (阶段5)

  1. 消费者调用UnsubscribeAsync()取消订阅
  2. 清理PubSub系统中的订阅状态
  3. 停止向该消费者投递消息

设计优势

  1. 完全解耦: 生产者和消费者互不知晓对方存在
  2. 可靠性: 消息持久化确保不丢失
  3. 可扩展性: 支持动态添加/移除消费者
  4. 背压控制: 自然的消息流控制机制
  5. 分布式友好: 基于虚拟Actor模型,位置透明
http://www.dtcms.com/a/435173.html

相关文章:

  • 专业网站建设价格分析企业建设网站好吗
  • 活动日志系统集成指南
  • 弹幕网站是怎么做的软件开发工程师级别
  • 贵阳市建设局信息管理网站中国建设会计学网站
  • 函数简单传入参数的汇编分析
  • 怎样做已有网站的编辑维护大沥九江网站制作
  • 自己网站首页如何设置网站托管维护方案
  • 安卓基础组件024-底部导航栏
  • 【ROS2学习笔记】话题通信篇:话题通信项目实践——系统状态监测与可视化工具
  • 苏州门户网站平台推广员
  • ICT 数字测试原理 4 --电源监控电路(PMC)
  • 网站内页要不要加上关键词和描述广告型网站怎么做的
  • 深圳市建设局工程交易中心网站贵州 网站建设
  • 【C/C++】 函数形参—指针传递
  • 门限签名与多方安全计算(MPC)
  • 东莞公司高端网站建设高大上网站
  • 基于websocket的多用户网页五子棋(一)
  • PCA 主成分分析:数据世界的 “旅行清单整理师”—— 从 30 维杂乱到 2 维清晰的诗意降维
  • wordpress兼容mipseo加盟代理
  • 台州网站排名优化费用网站建设设计风格描述
  • 利用 ZoneABC 免费域名 零成本接入 Cloudflare 企业版 CDN
  • 企业网站建设费用属于什么科目dw软件网站建设教程
  • Streamlit:基础入门——零基础搭建第一个 Web 应用
  • Netty粘包和半包问题产生的原因和解决方案
  • 【小沐学GIS】基于C++绘制地形DEM(OpenGL、Terrain、TIFF、hgt)第十二期
  • 怎么搭建本地网站外贸营销工具
  • MySQL常用命令全攻略
  • 郑州市网站和公众号建设长沙公积金网站怎么做异动
  • 平面设计有什么网站wordpress 汽车模板下载
  • 珠宝首饰网站开发郑州微盟网站建设公司