当前位置: 首页 > news >正文

【Java】RxJava解析

一,概述

RxJava 是 ReactiveX(响应式扩展)在 Java 语言上的实现,它是一种基于异步数据流的编程模式。响应式编程的核心理念是以数据为中心,强调数据的流动与变化。通过 RxJava,开发者可以基于观察者模式实现异步编程和事件驱动操作,从而快速响应外部事件。

响应式编程可以将数据流视为一系列按时间排序的事件流,这些流可以被观测、过滤、操作或组合,RxJava 提供了一组丰富的工具和操作符来支持这种编程范式。

此框架采用了观察者模式、责任链模式、装饰器模式、策略模式,实现数据流操作。本文,笔者以几个核心例子说明此框架用途及其实现原理。

以下是笔者浅析的类图,仅供参考

二,实例

1,Observable.create

1,通过create方法传入一个ObservableOnSubscribe接口的实现,返回一个Observable<T>实例。

重写ObservableOnSubscribe#subscribe方法,此方法即触发订阅的源头。

2,当返回的Observable开始subscribe时,通过封装Observer的ObservableEmiter对象,回调onNext或onError或onComplete方法,实现数据流订阅

3,callback此流,

2,Observable.just

从1可知,create方法即数据源创建的核心,此框架封装的just,fromIterable等方法均是对create的封装,

3,filter、map等操作符

类同Stream,filter等操作费同样使用责任链模式实现,详见三。

4,interval

interval返回一个定时Observable,subscribe在一个守护线程中执行,

1,设置1s一次定时触发

2,解除主线程的挂起

3,挂起主线程

输出如下

5,线程切换

通过subScribeOn和observeOn设定Scheduler,可以指定subscribe和observer所执行线程,

三,原理

几乎Rxjava所有操作,均是实现Observable,在子类中实现特定操作,属于策略模式的典型,以下策略参考

1,Observab.create

采用ObserableCreate策略

传入ObservableOnSubscribe接口实现类,被封装至ObservableCreate中,此为装饰器模式的典型实现,重点看下ObservableCreate

1,继承Observable,而Observable即本框架的第一门面API,其内部封装了很多工厂方法,

Observable实现ObservableSource接口,重写subscribe方法,

创建模版方法subscribeActual并在subscribe调用,因此ObservableCreate开始订阅。

回到ObservarCreate#subscribeActual

以上可知,CreateEmiter是对Observer的封装,当此被观察者已经disposed时,则不会触发onNext。

2,Observable.just

采用ObserableFromArray策略

just是对array的封装,跟进,

这里返回ObservableFromArray,是对Observable的子类,

这里就将传入的array遍历迭代,不赘述

3,filter、map等操作符

采用ObservableFilter等操作符策略

以filter和map为例,filter是Observable方法,会将自己this传入到下一个Observable,如下,

1,基础AbstractObserableWithUpstream,表示责任链

2,上一个Observable作为source传入,

3,保存过滤操作predicate

跟进subscribeActual

通过filter操作,在onNext会吊钟判断是否触发下一个Observable的onNext,downstream即下一个Observer,

同理,看下map

从以上可知,Rxjava是通过将每个item依次发送给各个Observable,最后在subscribe接收到结果,触发onNext回调,而非stream那样先在每个操作费完成操作,再将数据流向下一个操作。

4,interval

Schedulers默认传入computation

策略实现是ObserableInterval,

封装了一个IntervalObserver,作为is传入schedulePeriodicallyDirect方法,IntervalObserver实现了Runnable接口,跟进run方法

只要没有disposed,触发下游onNext,count自增一次,默认从0开始自增,

因此,定时调度逻辑落在了Scheduler.schedulePeriodicallyDirect方法,此处就不继续跟进了,感选取的读者自行了解。

5,线程切换

重点看下subscribeOn和observeOn方法

将上一个Stream的Observable作为source参数传入,其重写了subscribeActual方法,稍许不一样,

保存传入的scheduler,并且通过schedule开启一个线程,SubscribeTask中通过source在指定线程中再次触发subscribe,这个source就是ObservableSubscribeOn上游的一个Observable。

因此,以上即完成subscribe切换线程逻辑,那么对于下游的Observable呢?很简单,直接调用即可,

以上,上游的observer在指定线程中执行到subscribeOn时,直接通过onNext调用到下游downstream.onNext即可,其它callback类似。

接下来看下observeOn,即观察者回调线程设定。

ObserverableOnbserveOn封装了一层observer,并且使用上游Observable触发subscribe,封装的Observer保存了一个Scheduler.Work,即指定的线程环境,

看下onNext回调实现,

将上游传递的值t保存进队列,供切线程后从队列中获取值,

随后调用schedule方法切线程

ObserveOnObserver实现了Runnable接口,看下实现,

跟进drainNormal

1,通过队列获取到上游保存的Value

2,拿到下游downstream,

3,从队列中取值,并触发下游Observer#onNext等回调,

于是乎,读者思考下以下问题,当map在observerOn方法后执行,那么map在哪个线程环境呢?

从上述可知,已经切换至observer的线程环境了,

因此,onberveOn一般放在subscribe前面调用。

相关文章:

  • 学习STC51单片机28(芯片为STC89C52RCRC)
  • 深入浅出玩转物联网时间同步:基于BC260Y的NTP实验与嵌入式仿真教学革命
  • [原创](现代Delphi 12指南):[macOS 64bit App开发]: TTask创建多线程, 更简单, 更快捷.
  • 5.Declare_Query_Checking.ipynb
  • 以光量子为例,详解量子获取方式
  • 【Redis】笔记|第9节|Redis Stack扩展功能
  • 《图解技术体系》How Redis Architecture Evolves?
  • k8s业务程序联调工具-KtConnect
  • 【安全攻防与漏洞】​​量子计算对HTTPS的威胁:后量子密码学进展
  • nvidia系列教程-Usb otg模式修改为host模式
  • proteus8安装教程
  • 操作docker容器
  • 如何利用Facebook优化TikTok的跨境商品推广效果
  • 在Facebook平台有效结合TikTok跨境营销的方法
  • MVCC机制:Undo Log版本链与ReadView机制
  • 微服务网关SpringCloudGateway+SaToken鉴权
  • 10. vue pinia 和react redux、jotai对比
  • OCR助力保险业建设
  • YOLO目标检测模型交互式UI设计与实现
  • Spring Boot论文翻译防丢失 From船长cap
  • 设计师去哪个网站找工作/百度公司高管排名
  • 淄博桓台网站建设公司/站长之家综合查询工具
  • 安徽网站建/泰州百度seo公司
  • 百度公司做网站可靠吗/seo扣费系统源码
  • 徐州市鼓楼区建设局网站/百度论坛首页官网
  • 网站投放广告多少钱/bt种子磁力搜索引擎