Orleans流系统完整时序图
时序图概述
这个时序图展示了Orleans流系统中从生产者发送消息到消费者接收消息的完整流程,包括订阅管理、消息持久化、PubSub系统和消息投递等关键环节。
时序图
关键组件说明
1. 生产者端组件
- Producer Grain: 业务逻辑生产者
- StreamImpl: 流接口实现,提供OnNextAsync等API
- PersistentStreamProducer: 持久化流生产者,处理消息发送
2. 消息队列组件
- QueueAdapter: 队列适配器,抽象不同消息队列实现
- Message Queue: 实际的消息队列(如Azure Service Bus、RabbitMQ等)
3. 消费者端组件
- PersistentStreamPullingAgent: 消息拉取代理,负责从队列拉取消息
- StreamConsumerExtension: 消费者扩展,管理本地订阅和消息投递
- Consumer Grain: 业务逻辑消费者
4. PubSub系统
- PubSubRendezvousGrain: 发布-订阅汇聚点Grain,管理订阅关系
核心流程说明
订阅流程 (阶段1)
- 消费者调用
SubscribeAsync()
订阅流 - 生成唯一的
subscriptionId
- 在PubSub系统中注册消费者
- 拉取代理创建消费者数据并建立消息游标
生产者注册 (阶段2)
- 生产者发送消息时自动注册到PubSub
- 消息通过队列适配器持久化到消息队列
消息投递 (阶段3)
- 拉取代理从队列拉取消息并缓存
- 为每个活跃消费者投递消息
- 调用消费者Grain的
OnNextAsync
方法
背压控制 (阶段4)
- 通过Task完成机制实现自然背压
- 消费者处理慢时,生产者会被阻塞等待
- 确保系统不会因消息积压而崩溃
取消订阅 (阶段5)
- 消费者调用
UnsubscribeAsync()
取消订阅 - 清理PubSub系统中的订阅状态
- 停止向该消费者投递消息
设计优势
- 完全解耦: 生产者和消费者互不知晓对方存在
- 可靠性: 消息持久化确保不丢失
- 可扩展性: 支持动态添加/移除消费者
- 背压控制: 自然的消息流控制机制
- 分布式友好: 基于虚拟Actor模型,位置透明