当前位置: 首页 > news >正文

安卓进阶——RxJava

 ✅作者简介:大家好,我是 Meteors., 向往着更加简洁高效的代码写法与编程方式,持续分享Java技术内容。
🍎个人主页:Meteors.的博客
💞当前专栏:知识分享
✨特色专栏:知识分享
🥭本文内容:安卓进阶——RxJava
📚 ** ps **  :阅读文章如果有问题或者疑惑,欢迎在评论区提问或指出。


目录

一、介绍

二、RxJava 基础概念

(一)核心角色

1.Observable(被观察者)

2. Observer(观察者)

3.Subscription(订阅关系)

4. Subject(主题)

(二)订阅流程

1. 创建被观察者

2. 创建观察者

3. 进行订阅

三、核心操作符详解

(一)创建型操作符

(二)转换型操作符

(三)过滤型操作符

(四)组合型操作符

四、 线程调度

五、实际应用场景

(一)网络请求链式调用

(二)事件防抖与限流

(三)生命周期管理

(四)响应式 UI 交互

(五)数据缓存与同步

六、错误处理

(一)基础错误捕获

(二)重试机制

(三)全局错误处理

七、高级特性

(一)背压(Backpressure)与 Flowable

(二)Subject 的应用

八、性能优化与注意事项

(一)避免内存泄漏​

(二)合理选择操作符​

(三)调试技巧​

九、总结


一、介绍

RxJava是Netflix公司基于Java虚拟机开发的响应式编程框架,隶属于ReactiveX开源项目,旨在简化异步编程与事件驱动程序的开发流程。该框架通过扩展观察者模式,支持以可观察对象组合处理数据流与事件序列,并提供声明式操作符实现逻辑编排。

其核心设计通过封装线程管理、同步机制及非阻塞I/O等底层细节,降低多线程并发编程复杂度,使开发者能够专注于业务逻辑实现。框架采用函数式风格提供丰富的流处理操作符,可灵活构建异步任务链并处理背压等场景。


二、RxJava 基础概念

(一)核心角色

1.Observable(被观察者)

  • 作用:数据的发射源,负责产生并发送事件(数据)流。

  • 特点:可以发出多个数据项(onNext),也可以在发生错误时终止(onError),或者正常结束时调用(onComplete)。

  • 常见创建方式

    • Observable.create():手动控制数据发射,最灵活但复杂。

    • Observable.just(T...):快速创建发射固定数量数据的 Observable。

    • Observable.fromIterable()Observable.fromArray():从集合或数组创建。

    • Observable.interval()Observable.timer():用于定时任务。

  • 示例

    Observable<String> observable = Observable.just("A", "B", "C");

2. Observer(观察者)

  • 作用:接收 Observable 发出的数据,并对其进行处理(如展示、存储等)。

  • 包含的回调方法

    • onSubscribe(Disposable d):订阅建立时调用,可用来保存 Disposable 以便后续取消订阅。

    • onNext(T t):每次接收到一个数据项时调用。

    • onError(Throwable e):发生错误时调用,之后不会再收到数据。

    • onComplete():数据流正常结束时调用,之后也不会再有数据。

  • 示例

    Observer<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// 可在此保存 d,用于取消订阅}@Overridepublic void onNext(String s) {System.out.println("接收到数据: " + s);}@Overridepublic void onError(Throwable e) {System.err.println("发生错误: " + e.getMessage());}@Overridepublic void onComplete() {System.out.println("数据流结束");}
    };

3.Subscription(订阅关系)

  • 作用:表示 Observable 和 Observer 之间的订阅关系,是两者之间的桥梁。

  • 在 RxJava 1.x 中叫做 Subscription,而在 RxJava 2.x 及以后,这个角色主要由 Disposable​ 承担。

  • 主要功能:

    • 用于取消订阅(dispose()),防止内存泄漏或继续接收不需要的事件。

    • 管理生命周期,控制何时停止接收数据。

  • 示例

    Disposable disposable = observable.subscribe(observer);
    // 取消订阅
    disposable.dispose();

4. Subject(主题)

  • 作用既是 Observable(可发射数据)又是 Observer(可接收数据),起到桥梁或代理的作用。

  • 常见实现类:

    • PublishSubject:只发送订阅后接收到的数据。

    • BehaviorSubject:发送订阅前最后一个数据及之后的数据。

    • ReplaySubject:发送所有历史数据给新订阅者。

    • AsyncSubject:只在 onComplete 时发送最后一个数据。

  • 用途举例:多播(一个数据源多个观察者)、事件总线等场景。

  • 示例

PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(data -> System.out.println("观察者1: " + data));
subject.onNext("Hello");
subject.subscribe(data -> System.out.println("观察者2: " + data));
subject.onNext("World");

(二)订阅流程

1. 创建被观察者

使用工厂方法创建一个能发射数据的对象,比如:

Observable<String> observable = Observable.just("数据1", "数据2");

也可以使用更灵活的 create()方法:

Observable.create(emitter -> {emitter.onNext("A");emitter.onNext("B");emitter.onComplete();
});

2. 创建观察者

定义好对数据的处理逻辑,包括接收数据、处理错误和完成通知。

3. 进行订阅

通过调用 observable.subscribe(observer)将二者关联起来,从而触发数据流的流动。

observable.subscribe(data -> System.out.println("接收到: " + data),     // onNexterror -> System.err.println("出错: " + error),   // onError() -> System.out.println("完成")                 // onComplete
);

三、核心操作符详解

(一)创建型操作符

  • just():直接传入固定值发射(如 just(1,2,3))。

  • fromIterable()/fromArray():从集合或数组发射数据。

  • create():自定义数据发射逻辑(需手动调用 onNext()/onError()/onComplete())。

  • defer():延迟创建 Observable,直到订阅时才生成数据(避免缓存旧数据)。

  • interval():定时发射递增整数(如每隔 1s 发射 0,1,2...)。

  • timer():延迟指定时间后发射单个数据(如 2s 后发射 0)。


(二)转换型操作符

  • map():一对一数据转换(如将字符串转为用户对象)。

  • flatMap():一对多转换,结果扁平化为单个流(如用户 ID 转用户详情列表)。

  • concatMap():保证顺序的 flatMap(前一个请求完成后才处理下一个)。

  • switchMap():只保留最新请求,取消旧请求(如搜索框输入时只取最后一次查询)。

  • scan():累积计算(如累加数据流中的数值)。


(三)过滤型操作符

  • filter():按条件过滤数据(如只保留偶数)。

  • take(n):仅接收前 n 个数据(如只取前 3 次点击事件)。

  • skip(n):跳过前 n 个数据(如跳过初始加载的 2 条旧数据)。

  • distinct():去重(如避免重复的用户 ID)。

  • debounce(time, unit):防抖(如搜索框输入后延迟 300ms 再请求)。


(四)组合型操作符

  • merge():合并多个流(无序,谁先发射谁先处理)。

  • zip():按顺序配对多个流的数据(如用户信息 + 头像 URL 配对)。

  • concat():顺序合并多个流(前一个流完成后才处理下一个)。

  • amb():取最先发射数据的流(如多个请求中取最快返回的)。


四、 线程调度

(一)常用调度器

  • Schedulers.io():IO 密集型操作(如网络请求、文件读写)。

  • Schedulers.computation():CPU 密集型计算(如数据计算、图像处理)。

  • AndroidSchedulers.mainThread():安卓主线程(更新 UI)。

  • Schedulers.newThread():创建新线程(不推荐,易导致线程混乱)。


(二)调度控制

  • subscribeOn():指定 Observable 发射数据的线程(仅影响上游)。

  • observeOn():指定 Observer 处理数据的线程(影响下游所有操作)。

  • 示例:IO 线程请求,主线程更新 UI。

    api.getUser().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())

五、实际应用场景

(一)网络请求链式调用

  • 串行请求:用户登录 → 获取个人信息 → 加载偏好设置(flatMap串联)。

    api.login(username, password).flatMap(loginResponse -> api.getUserInfo(loginResponse.token)).flatMap(userInfo -> api.getPreferences(userInfo.userId)).subscribe(preferences -> {// 处理最终偏好设置}, throwable -> {// 处理错误});
  • 并行请求:同时获取用户信息和好友列表(zip或 merge组合)。

    Observable<User> userObs = api.getUser(userId);
    Observable<List<Friend>> friendObs = api.getFriends(userId);Observable.zip(userObs, friendObs, (user, friends) -> {return new UserProfile(user, friends);
    }).subscribe(profile -> {// 同时拿到用户和好友信息
    });

(二)事件防抖与限流

  • 搜索框输入:debounce(300, TimeUnit.MILLISECONDS)避免频繁请求。

    RxTextView.textChanges(searchEditText).debounce(300, TimeUnit.MILLISECONDS).switchMap(text -> api.search(text.toString())).subscribe(results -> showResults(results));
  • 按钮点击:throttleFirst(1, TimeUnit.SECONDS)限制 1 秒内只响应一次点击。

    RxView.clicks(button).throttleFirst(1, TimeUnit.SECONDS).subscribe(aVoid -> performAction());

(三)生命周期管理

  • 结合 RxLifecycle或 AutoDispose自动取消订阅(避免 Activity/Fragment 销毁后内存泄漏)。

    CompositeDisposable compositeDisposable = new CompositeDisposable();compositeDisposable.add(api.getData().subscribe(data -> updateUI(data))
    );// 在 onDestroy() 中
    compositeDisposable.clear(); // 或 dispose()

    示例:compositeDisposable.add(observable.subscribe(...))手动管理订阅。

    api.getData().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).`as`(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this))).subscribe { data -> updateUI(data) }

(四)响应式 UI 交互

  • 监听控件状态:RxView.clicks(button)转换为可观察的点击事件流。

    RxView.clicks(button).subscribe(aVoid -> doSomething());
  • 输入框内容监听:RxTextView.textChanges(editText)监听文本变化。

    RxTextView.textChanges(editText).skipInitialValue() // 忽略初始空值.debounce(500, TimeUnit.MILLISECONDS).subscribe(charSequence -> {// 处理输入内容});

(五)数据缓存与同步

  • 内存缓存 + 网络请求:优先读取缓存,缓存不存在时请求网络(concat优先本地,switchIfEmpty切网络)。
    Observable<Data> cacheObs = Observable.just(getFromCache());
    Observable<Data> networkObs = api.fetchData().doOnNext(this::saveToCache);cacheObs.concatWith(networkObs).firstElement() // 只取第一个发射的数据.subscribe(data -> updateUI(data));

    或者更清晰的写法

    Observable.concat(getFromCacheAsync(), fetchFromNetworkAsync()).firstElement().subscribe(data -> updateUI(data));

六、错误处理

(一)基础错误捕获

  • onErrorReturn():错误时返回默认值(如网络失败返回空列表)。

  • onErrorResumeNext():错误时切换备用 Observable(如主接口失败调用备用接口)。


(二)重试机制

  • retry(n):失败后重试 n 次(如网络请求失败重试 2 次)。

  • retryWhen():自定义重试逻辑(如根据错误类型决定是否重试)。


(三)全局错误处理

  • RxJavaPlugins.setErrorHandler():设置全局错误处理器(捕获未被处理的异常)。

七、高级特性

(一)背压(Backpressure)与 Flowable

  • 解决生产者(Observable)发射速度远快于消费者(Observer)的问题。

  • 使用 Flowable替代 Observable,配合 BackpressureStrategy(如 BUFFER缓存、DROP丢弃)。


(二)Subject 的应用

  • PublishSubject:仅向后续观察者发射订阅后的数据(如实时通知事件)。

  • BehaviorSubject:向观察者发射最近一次的数据(如保存用户登录状态)。

  • ReplaySubject:缓存所有数据,新观察者接收全部历史数据(如日志回放)。


八、性能优化与注意事项

(一)避免内存泄漏

  • 及时取消订阅:通过 CompositeDisposable管理所有订阅,在 onDestroy()中调用 clear()

  • 避免长生命周期 Observable 持有短生命周期对象(如 Activity)。


(二)合理选择操作符

  • 避免嵌套过深的 flatMap(影响可读性和性能)。

  • 优先使用无副作用的纯函数操作符(如 map而非修改外部变量)。


(三)调试技巧

  • 使用 doOnNext()/doOnError()打印日志(跟踪数据流状态)。

  • 开启 RxJava 调试模式:RxJavaPlugins.setInitIoSchedulerHandler(scheduler -> Schedulers.trampoline())(避免异步导致调试困难)。


九、总结

  • RxJava 核心优势:异步操作的链式调用、逻辑扁平化、强大的操作符组合能力。

  • 关键注意点:线程调度、生命周期管理、错误处理、背压问题。

http://www.dtcms.com/a/602207.html

相关文章:

  • mobilenet v4 导出onnx onnx推理
  • 网站服务器如何维护网络广告案例
  • 奢侈品网站建设中车网站建设的优缺点
  • C/C++ Linux网络编程2 - Socket编程与简单UDP服务器客户端
  • 工业场景漏油硬件检测方法及原理
  • 工业设计就业网站在线优化工具
  • 瑞美吉泮Rimegepant说明书深度解析:用法用量,真实世界疗效
  • 做电影网站主机放哪比较好网站页面设计服务
  • 每日两题day41
  • 网站设计与开发期末考试题建筑模拟器2022下载
  • 访问日志查询功能
  • vite创建vue2项目
  • 【MATLAB例程】二维平面的TOA定位,几何精度因子GDOP和克拉美罗下界CRLB计算与输出
  • 怎么创一个网站赚钱免费入驻的外贸平台
  • 云边云科技SD-WAN解决方案 — 构建安全、高效、智能的云网基石
  • 20251112给荣品RD-RK3588开发板跑Rockchip的原厂Android13系统时适配AP6275P模块的BT蓝牙部分【使用原厂的DTS】
  • MyBatis 专题深度细化解析
  • a做爰视频免费观费网站asp网站如何迁移
  • 网站推广平台wordpress怎么加属性
  • 文创做的好的网站推荐微信公众号属于网站建设
  • 1. Cockpit 管理服务器;2. Linux 软件包管理
  • 【剑斩OFFER】算法的暴力美学——山脉数组的蜂顶索引
  • 关键词挖掘工具有哪些兰州seo优化
  • LeetCode 热题 100——哈希——最长连续序列
  • c语言反编译软件|详细解析c语言反编译工具的使用及其重要性
  • 模板网站更改青海制作网站的公司
  • 牛客:栈的压入、弹出序列
  • 深入解析UDP服务器核心开发机制
  • 阜阳做网站的公司网站开发前端跟后端的区别
  • MongoDB知识点与技巧总结