PublishSubject、ReplaySubject、BehaviorSubject、AsyncSubject的区别
python容易编辑,因此用pyrx代替rxjava3做演示会比较快捷。
pyrx安装命令: pip install rx
一、Subject
(相当于 RxJava 的 PublishSubject
)
PublishSubject
PublishSubject 将对观察者发送订阅后产生的元素,而在订阅前发出的元素将不会发送给观察者。如果你希望观察者接收到所有的元素,你可以通过使用 Observable
的 create
方法来创建 Observable
,或者使用 ReplaySubject。
如果源 Observable
因为产生了一个 error
事件而中止, PublishSubject 就不会发出任何元素,而是将这个 error
事件发送出来。
特性:
- 只发送订阅后产生的事件,不保留历史值。
- 新订阅者只能收到订阅后发射的元素。
适用场景:
实时数据流(如用户输入、网络事件)。
示例代码
from rx.subject import Subjectsubject = Subject()# 订阅1在事件发射前订阅
subject.subscribe(on_next=lambda value: print("订阅1:", value),on_error=lambda error: print("错误:", error),on_completed=lambda: print("完成")
)subject.on_next("🐶") # 订阅1收到: 🐶# 订阅2在事件发射后订阅
subject.subscribe(on_next=lambda value: print("订阅2:", value),on_error=lambda error: print("错误:", error),on_completed=lambda: print("完成")
)subject.on_next("🐱") # 订阅1收到: 🐱,订阅2收到: 🐱
二、ReplaySubject
ReplaySubject
ReplaySubject 将对观察者发送全部的元素,无论观察者是何时进行订阅的。
这里存在多个版本的 ReplaySubject,有的只会将最新的 n 个元素发送给观察者,有的只会将限制时间段内最新的元素发送给观察者。
如果把 ReplaySubject 当作观察者来使用,注意不要在多个线程调用 onNext
, onError
或 onCompleted
。这样会导致无序调用,将造成意想不到的结果。
特性:
- 缓存所有发射过的事件,新订阅者会收到全部历史事件。
- 可通过
buffer_size
参数限制缓存数量。
适用场景:
需要回放历史数据的场景(如配置变更、初始化数据)。
示例代码
from rx.subject import ReplaySubjectsubject = ReplaySubject(buffer_size=2) # 只缓存最近2个事件subject.on_next("🐶")
subject.on_next("🐱")
subject.on_next("🐭")# 订阅时会收到缓存的最后2个事件: 🐱, 🐭
subject.subscribe(on_next=lambda value: print("订阅1:", value))subject.on_next("🐹") # 订阅1收到: 🐹
三、BehaviorSubject
BehaviorSubject
当观察者对 BehaviorSubject 进行订阅时,它会将源 Observable
中最新的元素发送出来(如果不存在最新的元素,就发出默认元素)。然后将随后产生的元素发送出来。
如果源 Observable
因为产生了一个 error
事件而中止, BehaviorSubject 就不会发出任何元素,而是将这个 error
事件发送出来。
特性:
- 缓存最后一个发射的事件,新订阅者会立即收到该值。
- 创建时必须提供初始值。
适用场景:
状态管理(如用户登录状态、系统配置)。
示例代码
from rx.subject import BehaviorSubjectsubject = BehaviorSubject("初始值")subject.on_next("🐶")# 订阅时会收到最后一个值: 🐶
subject.subscribe(on_next=lambda value: print("订阅1:", value))subject.on_next("🐱") # 订阅1收到: 🐱
四、AsyncSubject
AsyncSubject
AsyncSubject 将在源 Observable
产生完成事件后,发出最后一个元素(仅仅只有最后一个元素),如果源 Observable
没有发出任何元素,只有一个完成事件。那 AsyncSubject 也只有一个完成事件。
它会对随后的观察者发出最终元素。如果源 Observable
因为产生了一个 error
事件而中止, AsyncSubject 就不会发出任何元素,而是将这个 error
事件发送出来。
特性:
- 只发射最后一个事件,且仅在
on_completed()
后发射。 - 如果未调用
on_completed()
,订阅者不会收到任何值。
适用场景:
只关心最终结果的场景(如计算完成后的结果)。
示例代码
from rx.subject import AsyncSubjectsubject = AsyncSubject()subject.subscribe(on_next=lambda value: print("订阅1:", value),on_error=lambda error: print("错误:", error),on_completed=lambda: print("完成")
)subject.on_next("🐶")
subject.on_next("🐱")
subject.on_completed() # 订阅1收到: 🐱(最后一个值)并触发完成
五、对比表格
Subject 类型 | 历史值处理 | 新订阅者行为 | 触发条件 |
---|---|---|---|
Subject | 不保留历史值 | 只接收订阅后的事件 | 无特殊条件 |
ReplaySubject | 缓存所有或部分历史值 | 接收全部缓存的历史事件 | 无特殊条件 |
BehaviorSubject | 缓存最后一个值 | 立即接收最后一个值 | 无特殊条件 |
AsyncSubject | 缓存最后一个值 | 仅在 on_completed() 后接收 | 必须调用 on_completed() |
六、注意事项
-
内存管理:
ReplaySubject
和BehaviorSubject
会持有历史值,需注意避免内存泄漏。 -
线程安全:
RxPY 的Subject
默认非线程安全,多线程环境下需自行处理同步。 -
生命周期管理:
使用dispose()
方法释放资源,避免不必要的事件处理。
rxjava3具体实例:
在引入rxjava3之前的写法:通过监听器,实现register、unregister,代码逻辑臃肿、结构复杂、过一段时间之后自己写的代码都看起来很费劲。
引入rxjava3之后,activity、fragment、service之间解除了强耦合,代码嵌套深度降低、逻辑交叉点减少,代码清爽很多。
rx是响应式编程框架的集大成者,相当于应用内部的轻量级的ASMQ(高级消息队列),前端是ui和逻辑分离的特点,需要大量的数据双向多层传递。 用rx可以从出发点直达终点,数据不需要层层传递,比如说原来的传递路径是6层,你修改一次数据类,你就需要修改6个地方的代码,用rx只需要修改前后紧挨着的2个数据管道之间的代码。