当前位置: 首页 > news >正文

【Android RxJava】Observal与Subject深入理解

文章目录

  • 一. 基本定义
    • 1. Observal(冷流)
    • 2. Subject(热流)
    • 3. Subject分类
      • 3.1 PublishSubject:只发送订阅后接收到的数据
      • 3.2 BehaviorSubject:发送订阅时最近的一个数据或默认值
      • 3.3 ReplaySubject:发送所有历史数据
      • 3.4 AsyncSubject:只在 onComplete 后发送最后一个数据
    • 4. 冷流和热流
      • 4.1 冷流
      • 4.2 热流
      • 4.3 冷流转换为热流
  • 二. 业务场景应用的探索
    • 1. APP与服务端建立单一持久的连接
    • 2. APP 用户登录、注册、验证码

一. 基本定义

1. Observal(冷流)

  • Observable 是 RxJava 中最基本的响应式流数据源。
  • 它是只读的,只能被观察,不能主动发射数据
  • 它遵循观察者模式(Observer Pattern),只能由数据源向观察者(Observer)单向推送数据
Observable<String> observable = Observable.just("Hello", "World");observable.subscribe(item -> System.out.println("Observer 1: " + item));
observable.subscribe(item -> System.out.println("Observer 2: " + item));

2. Subject(热流)

  • Subject 是一个特殊的对象,它既是 Observable 又是 Observer
  • 可以主动发射数据(作为 Observer 接收数据),也可以被其他观察者订阅(作为 Observable 发送数据)。可以在运行时主动调用 onNext()、onError()、onComplete()
  • 可以实现多播(Multicast),即一个数据流被多个观察者共享。
PublishSubject<String> subject = PublishSubject.create();subject.subscribe(item -> System.out.println("Observer 1: " + item));
subject.onNext("Hello");
subject.onNext("World");subject.subscribe(item -> System.out.println("Observer 2: " + item));
subject.onNext("RxJava");

3. Subject分类

3.1 PublishSubject:只发送订阅后接收到的数据

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始Observable的所有数据都被分发,你需要这样做:或者使用Create创建那个Observable以便手动给它引入"冷"Observable的行为(当所有观察者都已经订阅时才开始发射数据),或者改用ReplaySubject。

 private void testPublishSubject(){PublishSubject<String> publishSubject = PublishSubject.create();Subscriber subscriber = new Subscriber<String>() {@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println("onError:"+e.getMessage());}@Overridepublic void onNext(String s) {System.out.println("onNext:"+s);}};publishSubject.onNext("1");publishSubject.onNext("2");publishSubject.onNext("3");publishSubject.subscribe(subscriber);publishSubject.onNext("4");publishSubject.onCompleted();}//输出的结果是:
onNext:4
onCompleted//这就是丢失了1,2,3这三个数据

3.2 BehaviorSubject:发送订阅时最近的一个数据或默认值

当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。然而,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

 private void testBehaviorSubject(){BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");Subscriber subscriber = new Subscriber<String>() {@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println("onError:"+e.getMessage());}@Overridepublic void onNext(String s) {System.out.println("onNext:"+s);}};behaviorSubject.onNext("1");behaviorSubject.onNext("2");behaviorSubject.onNext("3");behaviorSubject.subscribe(subscriber);behaviorSubject.onNext("4");behaviorSubject.onNext("5");behaviorSubject.onCompleted();}// 输出的结果是:
onNext:3
onNext:4
onNext:5
onCompleted

3.3 ReplaySubject:发送所有历史数据

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。
如果你把ReplaySubject当作一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。

 private void testReplaySubject(){ReplaySubject<String> replaySubject = ReplaySubject.create();Subscriber subscriber = new Subscriber<String>() {@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println("onError:"+e.getMessage());}@Overridepublic void onNext(String s) {System.out.println("onNext:"+s);}};replaySubject.onNext("1");replaySubject.onNext("2");replaySubject.onNext("3");replaySubject.subscribe(subscriber);replaySubject.onNext("4");replaySubject.onCompleted();}//输出的结果是:
onNext:1
onNext:2
onNext:3
onNext:4
onCompleted
// 没有丢失数据
// 假如第一行代码改成         ReplaySubject<String> replaySubject = ReplaySubject.createWithSize(2);
// 输出结果是:(会丢失数据1)
onNext:2
onNext:3
onNext:4
onCompleted

3.4 AsyncSubject:只在 onComplete 后发送最后一个数据

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,AsyncSubject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知

  • 场景1:AsyncSubject当做Observer
 private void testAsyncSubject() {AsyncSubject<String> asyncSubject = AsyncSubject.create();Observable.just("1", "2", "3").subscribe(asyncSubject);asyncSubject.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println("onError:" + e.getMessage());}@Overridepublic void onNext(String s) {System.out.println("onNext:" + s);}});}//输出
onNext:3
onCompleted
  • 场景2:如果Observable因为错误发了终止
 private void testAsyncSubject() {AsyncSubject<String> asyncSubject = AsyncSubject.create();Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("1");subscriber.onError(new Exception("asyncSubjectError"));subscriber.onNext("2");subscriber.onNext("3");subscriber.onCompleted();}}).subscribe(asyncSubject);asyncSubject.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println("onError:" + e.getMessage());}@Overridepublic void onNext(String s) {System.out.println("onNext:" + s);}});}// 输出onError:asyncSubjectError
  • 场景3:AsyncSubject当做Observable
 private void testAsyncSubject(){AsyncSubject<String> asyncSubject =AsyncSubject.create();asyncSubject.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {System.out.println("onCompleted");}@Overridepublic void onError(Throwable e) {System.out.println("onError:"+e.getMessage());}@Overridepublic void onNext(String s) {System.out.println("onNext:"+s);}});asyncSubject.onNext("1");asyncSubject.onNext("2");asyncSubject.onNext("3");asyncSubject.onCompleted();}// 输出的结果是:
onNext:3
onCompleted

4. 冷流和热流

一句话理解:Hot Observable 意思是如果他开始传输数据,你不主动喊停(dispose()/cancel()),那么他就不会停,一直发射数据,即使他已经没有Subscriber了。而Cold Observable则是subscribe时才会发射数据。

4.1 冷流

  • 冷流会在每个观察者订阅时重新开始发射数据
  • 数据流是独立的,每个观察者都会收到完整的数据序列
  • 适用于一次性任务,如网络请求、读取文件等。
Observable<Long> coldObservable = Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS).map(i -> i);coldObservable.subscribe(i -> System.out.println("Observer 1: " + i));
Thread.sleep(2000);
coldObservable.subscribe(i -> System.out.println("Observer 2: " + i));Thread.sleep(5000);输出:
Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 2: 0
Observer 2: 1
Observer 2: 2
Observer 2: 3
Observer 2: 4

4.2 热流

  • 热流在整个生命周期中只发射一次数据流。
  • 所有订阅者共享同一个数据流。
  • 如果观察者订阅得晚,就不会收到之前的数据(除非使用 ReplaySubject 等缓存机制)。
ConnectableObservable<Long> hotObservable = Observable.interval(1, TimeUnit.SECONDS).take(5).publish();hotObservable.connect();hotObservable.subscribe(i -> System.out.println("Observer 1: " + i));
Thread.sleep(2000);
hotObservable.subscribe(i -> System.out.println("Observer 2: " + i));Thread.sleep(4000);
// 输出:
Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 2: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
(Observer2延迟2s因此01输出没打印出来)

在这里插入图片描述

4.3 冷流转换为热流

**publish():**适用于需要手动控制热流生命周期的场景。
需要手动调用 connect() 才会开始发射数据。

Observable<Long> cold = Observable.interval(1, TimeUnit.SECONDS).take(5);
ConnectableObservable<Long> hot = cold.publish();hot.subscribe(i -> System.out.println("Observer 1: " + i));
hot.subscribe(i -> System.out.println("Observer 2: " + i));hot.connect(); // 手动触发数据流Thread.sleep(3000);

二. 业务场景应用的探索

1. APP与服务端建立单一持久的连接

建议采用Subject,更准确地说,使用 PublishSubject 或 BehaviorSubject 是因为它们的“热流”特性,能够很好地支持异步通信、事件广播、状态共享等需求
使用 Subject 可以让多个观察者共享同一个连接通道,避免重复建立连接。

2. APP 用户登录、注册、验证码

OBSERVABLE 是一次性的“请求-响应”模型;每次订阅都应重新发起网络请求(冷流特性

http://www.dtcms.com/a/485963.html

相关文章:

  • 基于Rokid CXR-S SDK的智能AR翻译助手技术拆解与实现指南
  • 【uniapp】微信小程序修改按钮样式
  • Lombok使用指南(中)
  • Threejs入门学习笔记
  • 机器学习模型评估指标AUC详解:从理论到实践
  • 凡科建站小程序网站设计的一般流程
  • Linux C/C++ 学习日记(24)UDP协议的介绍:广播、多播的实现
  • OpenHarmony内核基础:LiteOS-M内核与POSIX/CMSIS接口
  • C语言实现Modbus TCP/IP协议客户端-服务器
  • ORACLE 19C ADG环境 如何快速删除1.8TB的分区表?有哪些注意事项?
  • 重庆黔江做防溺水的网站少儿编程十大培训机构
  • 浅谈中兴电子商务网站建设html考试界面设计
  • 工业三防平板背后的条码与RFID采集技术
  • pytorch框架GPU适配npu
  • 【散列函数】哈希函数简介
  • 学英语音标作用,能听出声音拼音组成,记忆效率提高
  • 学习日记day
  • Python爬虫数据可视化:深度分析贝壳成交价格趋势与分布
  • C++中的父继子承(2)多继承菱形继承问题,多继承指针偏移,继承组合分析+高质量习题扫尾继承多态
  • 做公司网站别人能看到吗6网站源码传到服务器上后怎么做
  • php多语言网站开发网站界面设计图片
  • 树形结构渲染 + 选择(Vue3 + ElementPlus)
  • Redis技术应用
  • hot100练习-8
  • 手机网站设置在哪里找房产信息平台
  • 算法入门:专题二---滑动窗口(长度最小的子数组)更新中
  • 2025年存储市场报告深度解读
  • HTTP 413 状态码详解与前端处理,请求体过大
  • 大数据背景下时序数据库选型指南:国产开源技术的突破与实践
  • asp网站优化云南网站制作需求