【Java】RxJava解析
一,概述
RxJava 是 ReactiveX(响应式扩展)在 Java 语言上的实现,它是一种基于异步数据流的编程模式。响应式编程的核心理念是以数据为中心,强调数据的流动与变化。通过 RxJava,开发者可以基于观察者模式实现异步编程和事件驱动操作,从而快速响应外部事件。
响应式编程可以将数据流视为一系列按时间排序的事件流,这些流可以被观测、过滤、操作或组合,RxJava 提供了一组丰富的工具和操作符来支持这种编程范式。
此框架采用了观察者模式、责任链模式、装饰器模式、策略模式,实现数据流操作。本文,笔者以几个核心例子说明此框架用途及其实现原理。
以下是笔者浅析的类图,仅供参考
二,实例
1,Observable.create
1,通过create方法传入一个ObservableOnSubscribe接口的实现,返回一个Observable<T>实例。
重写ObservableOnSubscribe#subscribe方法,此方法即触发订阅的源头。
2,当返回的Observable开始subscribe时,通过封装Observer的ObservableEmiter对象,回调onNext或onError或onComplete方法,实现数据流订阅
3,callback此流,
2,Observable.just
从1可知,create方法即数据源创建的核心,此框架封装的just,fromIterable等方法均是对create的封装,
3,filter、map等操作符
类同Stream,filter等操作费同样使用责任链模式实现,详见三。
4,interval
interval返回一个定时Observable,subscribe在一个守护线程中执行,
1,设置1s一次定时触发
2,解除主线程的挂起
3,挂起主线程
输出如下
5,线程切换
通过subScribeOn和observeOn设定Scheduler,可以指定subscribe和observer所执行线程,
三,原理
几乎Rxjava所有操作,均是实现Observable,在子类中实现特定操作,属于策略模式的典型,以下策略参考
1,Observab.create
采用ObserableCreate策略
传入ObservableOnSubscribe接口实现类,被封装至ObservableCreate中,此为装饰器模式的典型实现,重点看下ObservableCreate,
1,继承Observable,而Observable即本框架的第一门面API,其内部封装了很多工厂方法,
Observable实现ObservableSource接口,重写subscribe方法,
创建模版方法subscribeActual并在subscribe调用,因此ObservableCreate开始订阅。
回到ObservarCreate#subscribeActual
以上可知,CreateEmiter是对Observer的封装,当此被观察者已经disposed时,则不会触发onNext。
2,Observable.just
采用ObserableFromArray策略
just是对array的封装,跟进,
这里返回ObservableFromArray,是对Observable的子类,
这里就将传入的array遍历迭代,不赘述
3,filter、map等操作符
采用ObservableFilter等操作符策略
以filter和map为例,filter是Observable方法,会将自己this传入到下一个Observable,如下,
1,基础AbstractObserableWithUpstream,表示责任链
2,上一个Observable作为source传入,
3,保存过滤操作predicate
跟进subscribeActual
通过filter操作,在onNext会吊钟判断是否触发下一个Observable的onNext,downstream即下一个Observer,
同理,看下map
从以上可知,Rxjava是通过将每个item依次发送给各个Observable,最后在subscribe接收到结果,触发onNext回调,而非stream那样先在每个操作费完成操作,再将数据流向下一个操作。
4,interval
Schedulers默认传入computation
策略实现是ObserableInterval,
封装了一个IntervalObserver,作为is传入schedulePeriodicallyDirect方法,IntervalObserver实现了Runnable接口,跟进run方法
只要没有disposed,触发下游onNext,count自增一次,默认从0开始自增,
因此,定时调度逻辑落在了Scheduler.schedulePeriodicallyDirect方法,此处就不继续跟进了,感选取的读者自行了解。
5,线程切换
重点看下subscribeOn和observeOn方法
将上一个Stream的Observable作为source参数传入,其重写了subscribeActual方法,稍许不一样,
保存传入的scheduler,并且通过schedule开启一个线程,SubscribeTask中通过source在指定线程中再次触发subscribe,这个source就是ObservableSubscribeOn上游的一个Observable。
因此,以上即完成subscribe切换线程逻辑,那么对于下游的Observable呢?很简单,直接调用即可,
以上,上游的observer在指定线程中执行到subscribeOn时,直接通过onNext调用到下游downstream.onNext即可,其它callback类似。
接下来看下observeOn,即观察者回调线程设定。
ObserverableOnbserveOn封装了一层observer,并且使用上游Observable触发subscribe,封装的Observer保存了一个Scheduler.Work,即指定的线程环境,
看下onNext回调实现,
将上游传递的值t保存进队列,供切线程后从队列中获取值,
随后调用schedule方法切线程
ObserveOnObserver实现了Runnable接口,看下实现,
跟进drainNormal
1,通过队列获取到上游保存的Value
2,拿到下游downstream,
3,从队列中取值,并触发下游Observer#onNext等回调,
于是乎,读者思考下以下问题,当map在observerOn方法后执行,那么map在哪个线程环境呢?
从上述可知,已经切换至observer的线程环境了,
因此,onberveOn一般放在subscribe前面调用。