安卓进阶——RxJava
✅作者简介:大家好,我是 Meteors., 向往着更加简洁高效的代码写法与编程方式,持续分享Java技术内容。
🍎个人主页:Meteors.的博客
💞当前专栏:知识分享
✨特色专栏:知识分享
🥭本文内容:安卓进阶——RxJava
📚 ** ps ** :阅读文章如果有问题或者疑惑,欢迎在评论区提问或指出。
目录
一、介绍
二、RxJava 基础概念
(一)核心角色
1.Observable(被观察者)
2. Observer(观察者)
3.Subscription(订阅关系)
4. Subject(主题)
(二)订阅流程
1. 创建被观察者
2. 创建观察者
3. 进行订阅
三、核心操作符详解
(一)创建型操作符
(二)转换型操作符
(三)过滤型操作符
(四)组合型操作符
四、 线程调度
五、实际应用场景
(一)网络请求链式调用
(二)事件防抖与限流
(三)生命周期管理
(四)响应式 UI 交互
(五)数据缓存与同步
六、错误处理
(一)基础错误捕获
(二)重试机制
(三)全局错误处理
七、高级特性
(一)背压(Backpressure)与 Flowable
(二)Subject 的应用
八、性能优化与注意事项
(一)避免内存泄漏
(二)合理选择操作符
(三)调试技巧
九、总结
一、介绍
RxJava是Netflix公司基于Java虚拟机开发的响应式编程框架,隶属于ReactiveX开源项目,旨在简化异步编程与事件驱动程序的开发流程。该框架通过扩展观察者模式,支持以可观察对象组合处理数据流与事件序列,并提供声明式操作符实现逻辑编排。
其核心设计通过封装线程管理、同步机制及非阻塞I/O等底层细节,降低多线程并发编程复杂度,使开发者能够专注于业务逻辑实现。框架采用函数式风格提供丰富的流处理操作符,可灵活构建异步任务链并处理背压等场景。
二、RxJava 基础概念
(一)核心角色
1.Observable(被观察者)
作用:数据的发射源,负责产生并发送事件(数据)流。
特点:可以发出多个数据项(onNext),也可以在发生错误时终止(onError),或者正常结束时调用(onComplete)。
常见创建方式:
Observable.create():手动控制数据发射,最灵活但复杂。
Observable.just(T...):快速创建发射固定数量数据的 Observable。
Observable.fromIterable()/Observable.fromArray():从集合或数组创建。
Observable.interval()/Observable.timer():用于定时任务。示例
Observable<String> observable = Observable.just("A", "B", "C");2. Observer(观察者)
作用:接收 Observable 发出的数据,并对其进行处理(如展示、存储等)。
包含的回调方法:
onSubscribe(Disposable d):订阅建立时调用,可用来保存 Disposable 以便后续取消订阅。
onNext(T t):每次接收到一个数据项时调用。
onError(Throwable e):发生错误时调用,之后不会再收到数据。
onComplete():数据流正常结束时调用,之后也不会再有数据。示例
Observer<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// 可在此保存 d,用于取消订阅}@Overridepublic void onNext(String s) {System.out.println("接收到数据: " + s);}@Overridepublic void onError(Throwable e) {System.err.println("发生错误: " + e.getMessage());}@Overridepublic void onComplete() {System.out.println("数据流结束");} };3.Subscription(订阅关系)
作用:表示 Observable 和 Observer 之间的订阅关系,是两者之间的桥梁。
在 RxJava 1.x 中叫做 Subscription,而在 RxJava 2.x 及以后,这个角色主要由 Disposable 承担。
主要功能:
用于取消订阅(dispose()),防止内存泄漏或继续接收不需要的事件。
管理生命周期,控制何时停止接收数据。
示例
Disposable disposable = observable.subscribe(observer); // 取消订阅 disposable.dispose();4. Subject(主题)
作用:既是 Observable(可发射数据)又是 Observer(可接收数据),起到桥梁或代理的作用。
常见实现类:
PublishSubject:只发送订阅后接收到的数据。
BehaviorSubject:发送订阅前最后一个数据及之后的数据。
ReplaySubject:发送所有历史数据给新订阅者。
AsyncSubject:只在 onComplete 时发送最后一个数据。
用途举例:多播(一个数据源多个观察者)、事件总线等场景。
示例
PublishSubject<String> subject = PublishSubject.create(); subject.subscribe(data -> System.out.println("观察者1: " + data)); subject.onNext("Hello"); subject.subscribe(data -> System.out.println("观察者2: " + data)); subject.onNext("World");
(二)订阅流程
1. 创建被观察者
使用工厂方法创建一个能发射数据的对象,比如:
Observable<String> observable = Observable.just("数据1", "数据2");也可以使用更灵活的
create()方法:Observable.create(emitter -> {emitter.onNext("A");emitter.onNext("B");emitter.onComplete(); });2. 创建观察者
定义好对数据的处理逻辑,包括接收数据、处理错误和完成通知。
3. 进行订阅
通过调用
observable.subscribe(observer)将二者关联起来,从而触发数据流的流动。observable.subscribe(data -> System.out.println("接收到: " + data), // onNexterror -> System.err.println("出错: " + error), // onError() -> System.out.println("完成") // onComplete );
三、核心操作符详解
(一)创建型操作符
just():直接传入固定值发射(如just(1,2,3))。
fromIterable()/fromArray():从集合或数组发射数据。
create():自定义数据发射逻辑(需手动调用onNext()/onError()/onComplete())。
defer():延迟创建 Observable,直到订阅时才生成数据(避免缓存旧数据)。
interval():定时发射递增整数(如每隔 1s 发射 0,1,2...)。
timer():延迟指定时间后发射单个数据(如 2s 后发射 0)。
(二)转换型操作符
map():一对一数据转换(如将字符串转为用户对象)。
flatMap():一对多转换,结果扁平化为单个流(如用户 ID 转用户详情列表)。
concatMap():保证顺序的flatMap(前一个请求完成后才处理下一个)。
switchMap():只保留最新请求,取消旧请求(如搜索框输入时只取最后一次查询)。
scan():累积计算(如累加数据流中的数值)。
(三)过滤型操作符
filter():按条件过滤数据(如只保留偶数)。
take(n):仅接收前 n 个数据(如只取前 3 次点击事件)。
skip(n):跳过前 n 个数据(如跳过初始加载的 2 条旧数据)。
distinct():去重(如避免重复的用户 ID)。
debounce(time, unit):防抖(如搜索框输入后延迟 300ms 再请求)。
(四)组合型操作符
merge():合并多个流(无序,谁先发射谁先处理)。
zip():按顺序配对多个流的数据(如用户信息 + 头像 URL 配对)。
concat():顺序合并多个流(前一个流完成后才处理下一个)。
amb():取最先发射数据的流(如多个请求中取最快返回的)。
四、 线程调度
(一)常用调度器
Schedulers.io():IO 密集型操作(如网络请求、文件读写)。
Schedulers.computation():CPU 密集型计算(如数据计算、图像处理)。
AndroidSchedulers.mainThread():安卓主线程(更新 UI)。
Schedulers.newThread():创建新线程(不推荐,易导致线程混乱)。
(二)调度控制
subscribeOn():指定 Observable 发射数据的线程(仅影响上游)。
observeOn():指定 Observer 处理数据的线程(影响下游所有操作)。示例:IO 线程请求,主线程更新 UI。
api.getUser().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
五、实际应用场景
(一)网络请求链式调用
串行请求:用户登录 → 获取个人信息 → 加载偏好设置(
flatMap串联)。api.login(username, password).flatMap(loginResponse -> api.getUserInfo(loginResponse.token)).flatMap(userInfo -> api.getPreferences(userInfo.userId)).subscribe(preferences -> {// 处理最终偏好设置}, throwable -> {// 处理错误});并行请求:同时获取用户信息和好友列表(
zip或merge组合)。Observable<User> userObs = api.getUser(userId); Observable<List<Friend>> friendObs = api.getFriends(userId);Observable.zip(userObs, friendObs, (user, friends) -> {return new UserProfile(user, friends); }).subscribe(profile -> {// 同时拿到用户和好友信息 });
(二)事件防抖与限流
搜索框输入:
debounce(300, TimeUnit.MILLISECONDS)避免频繁请求。RxTextView.textChanges(searchEditText).debounce(300, TimeUnit.MILLISECONDS).switchMap(text -> api.search(text.toString())).subscribe(results -> showResults(results));按钮点击:
throttleFirst(1, TimeUnit.SECONDS)限制 1 秒内只响应一次点击。RxView.clicks(button).throttleFirst(1, TimeUnit.SECONDS).subscribe(aVoid -> performAction());
(三)生命周期管理
结合
RxLifecycle或AutoDispose自动取消订阅(避免 Activity/Fragment 销毁后内存泄漏)。CompositeDisposable compositeDisposable = new CompositeDisposable();compositeDisposable.add(api.getData().subscribe(data -> updateUI(data)) );// 在 onDestroy() 中 compositeDisposable.clear(); // 或 dispose()示例:
compositeDisposable.add(observable.subscribe(...))手动管理订阅。api.getData().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).`as`(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this))).subscribe { data -> updateUI(data) }
(四)响应式 UI 交互
监听控件状态:
RxView.clicks(button)转换为可观察的点击事件流。RxView.clicks(button).subscribe(aVoid -> doSomething());输入框内容监听:
RxTextView.textChanges(editText)监听文本变化。RxTextView.textChanges(editText).skipInitialValue() // 忽略初始空值.debounce(500, TimeUnit.MILLISECONDS).subscribe(charSequence -> {// 处理输入内容});
(五)数据缓存与同步
- 内存缓存 + 网络请求:优先读取缓存,缓存不存在时请求网络(
concat优先本地,switchIfEmpty切网络)。Observable<Data> cacheObs = Observable.just(getFromCache()); Observable<Data> networkObs = api.fetchData().doOnNext(this::saveToCache);cacheObs.concatWith(networkObs).firstElement() // 只取第一个发射的数据.subscribe(data -> updateUI(data));或者更清晰的写法
Observable.concat(getFromCacheAsync(), fetchFromNetworkAsync()).firstElement().subscribe(data -> updateUI(data));
六、错误处理
(一)基础错误捕获
onErrorReturn():错误时返回默认值(如网络失败返回空列表)。
onErrorResumeNext():错误时切换备用 Observable(如主接口失败调用备用接口)。
(二)重试机制
retry(n):失败后重试 n 次(如网络请求失败重试 2 次)。
retryWhen():自定义重试逻辑(如根据错误类型决定是否重试)。
(三)全局错误处理
RxJavaPlugins.setErrorHandler():设置全局错误处理器(捕获未被处理的异常)。
七、高级特性
(一)背压(Backpressure)与 Flowable
解决生产者(Observable)发射速度远快于消费者(Observer)的问题。
使用
Flowable替代Observable,配合BackpressureStrategy(如BUFFER缓存、DROP丢弃)。
(二)Subject 的应用
PublishSubject:仅向后续观察者发射订阅后的数据(如实时通知事件)。
BehaviorSubject:向观察者发射最近一次的数据(如保存用户登录状态)。
ReplaySubject:缓存所有数据,新观察者接收全部历史数据(如日志回放)。
八、性能优化与注意事项
(一)避免内存泄漏
及时取消订阅:通过
CompositeDisposable管理所有订阅,在onDestroy()中调用clear()。避免长生命周期 Observable 持有短生命周期对象(如 Activity)。
(二)合理选择操作符
避免嵌套过深的
flatMap(影响可读性和性能)。优先使用无副作用的纯函数操作符(如
map而非修改外部变量)。
(三)调试技巧
使用
doOnNext()/doOnError()打印日志(跟踪数据流状态)。开启 RxJava 调试模式:
RxJavaPlugins.setInitIoSchedulerHandler(scheduler -> Schedulers.trampoline())(避免异步导致调试困难)。
九、总结
RxJava 核心优势:异步操作的链式调用、逻辑扁平化、强大的操作符组合能力。
关键注意点:线程调度、生命周期管理、错误处理、背压问题。
