当前位置: 首页 > news >正文

Kotlin中RxJava用法

RxJava 是一个基于观察者模式的响应式编程库,广泛用于处理异步事件流。Kotlin 与 RxJava 结合使用可以简化异步编程和事件处理。以下是一些常见的 RxJava 用法示例。
配置文件中引用:

 implementation("io.reactivex.rxjava3:rxjava:3.0.2");
    implementation("io.reactivex.rxjava3:rxandroid:3.0.2") // 如果你在 Android 项目中使用

各种用法代码如下:

fun main() {
    //1.各种创建
     Observable.create<Int> { emitter ->
            for (i in 1..10) {
                emitter.onNext(i)
            }
            emitter.onComplete()

    }.subscribe(
         { item -> println(item) },
          { error -> println(error) },
         { println("complete") }
    )//打印:1 2 3 4 5 6 7 8 9 10 complete 共11行

    //2.just创建
    val observable = Observable.just(1, 2, 3, 4, 5)
    observable.subscribe { println(it) } //打印:1 2 3 4 5 共5行
    //3.fromArray创建
    val observable2 = Observable.fromArray(1, 2, 3, 4, 5)
    observable2.subscribe { println(it) } //打印:1 2 3 4 5 共5行
    //4.range创建
    val observable3 = Observable.range(1, 5)
    observable3.subscribe { println(it) } //打印:1 2 3 4 5 共5行
    //5.interval创建
    val observable4 = Observable.interval(1, TimeUnit.SECONDS)
    observable4.subscribe { println(it) } //每隔1秒打印一次数字
    //6.timer创建
    val observable5 = Observable.timer(1, TimeUnit.SECONDS)
    observable5.subscribe { println(it) } //1秒后打印数字0
    //7.error创建
    val observable6 = Observable.error<Throwable>(RuntimeException("error"))
    observable6.subscribe ({item->println(item)},{e -> println(e)} ) //打印:java.lang.RuntimeException: error
    //8.fromIterable创建
    val list = listOf(1, 2, 3, 4, 5)
    val observable7 = Observable.fromIterable(list)
    observable7.subscribe { println(it) } //打印:1 2 3 4 5 共5行

  //  二.操作符
    //1.map 将发射的数据进行转换。
    val observable8 = Observable.just(1, 2, 3, 4, 5)
    observable8.map { it * 2 }.subscribe { println(it) } //打印:2 4 6 8 10 共5行
    //2.flatMap  将发射的数据进行转换,并且将转换后的数据合并后发射。
    val observable9 = Observable.just(1, 2, 3, 4, 5)
    observable9.flatMap { Observable.just(it * 2) }.subscribe { println(it) } //打印:2 4 6 8 10 共5行
    //3.concatMap 将发射的数据进行转换,并且将转换后的数据合并后发射,但是和flatMap不同的是,concatMap会按照发射的顺序来发射数据。
    val observable10 = Observable.just(1, 2, 3, 4, 5)
    observable10.concatMap { Observable.just(it * 2) }.subscribe { println(it) } //打印:2 4 6 8 10 共5行
    //4.concat 将多个Observable发射的数据按照顺序合并后发射。
    val observable11 = Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
    observable11.subscribe { println(it) } //打印:1 2 3 4 5 6 共6行
    //5.merge 将多个Observable发射的数据合并后发射。
    val observable12 = Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
    observable12.subscribe { println(it) } //打印:1 2 3 4 5 6 共6行
    //6.zip 将多个Observable发射的数据按照顺序合并后发射,并且合并后的数据数量为发射的数据数量最少的Observable。
    val observable13 = Observable.zip(Observable.just(1, 2, 3), Observable.just(4, 5, 6,7)){ a, b -> a + b }
    observable13.subscribe { println(it) } //打印:5 7 9 共3行
    //7.filter 过滤发射的数据,只有满足条件的数据才会被发射。
    val observable14 = Observable.just(1, 2, 3, 4, 5)
    observable14.filter { it % 2 == 0 }.subscribe { println(it) } //打印:2 4 共2行
    //8.take 只发射前n个数据。
    val observable15 = Observable.just(1, 2, 3, 4, 5)
    observable15.take(3).subscribe { println(it) } //打印:1 2 3 共3行
    //9.skip 跳过前n个数据。
    val observable16 = Observable.just(1, 2, 3, 4, 5)
    observable16.skip(3).subscribe { println(it) } //打印:4 5 共2行
    //10.takeWhile 只发射满足条件的数据。
    val observable17 = Observable.just(1, 2, 3, 4, 5)
    observable17.takeWhile { it < 4 }.subscribe { println(it) } //打印:1 2 3 共3行
    //11.skipWhile 跳过满足条件的数据。
    val observable18 = Observable.just(1, 2, 3, 4, 5)
    observable18.skipWhile { it < 4 }.subscribe { println(it) } //打印:4 5 共2行
    //12.takeUntil 只发射不满足条件的数据。
    val observable19 = Observable.just(1, 2, 3, 4, 5)
    observable19.takeUntil { it == 4 }.subscribe { println(it) } //打印:1 2 3 共3行
    //13.takeLast 只发射最后n个数据。
    val observable21 = Observable.just(1, 2, 3, 4, 5)
    observable21.takeLast(3).subscribe { println(it) } //打印:3 4 5 共3行
    //14.distinct 去重,只发射第一次出现的数据。
    val observable22 = Observable.just(1, 2, 3, 4, 5, 1, 2, 3)
    observable22.distinct().subscribe { println(it) } //打印:1 2 3 4 5 共5行
    // 三.线程调度
    //1.subscribeOn 指定Observable执行的线程。
    val observable23 = Observable.just(1, 2, 3, 4, 5)
    observable23.subscribeOn(Schedulers.io()).subscribe { println(it) } //打印:1 2 3 4 5 共5行
    //2.observeOn 指定Observer执行的线程。
    val observable24 = Observable.just(1, 2, 3, 4, 5)
    observable24.observeOn(Schedulers.computation()).subscribe { println(it) } //打印:1 2 3 4 5 共5行

    // 四.错误处理
    Observable.error<Throwable>(RuntimeException("Error occurred"))
        .subscribe(
            { println("OnNext: $it") },
            { println("OnError: ${it.message}") },
            { println("OnComplete") }
        ) //打印:OnError: Error occurred



}

五.处理生命周期

class MyActivity : AppCompatActivity() {

    private val compositeDisposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        val disposable = Observable.just(1, 2, 3)
            .subscribe { println(it) }

        compositeDisposable.add(disposable)
    }

    override fun onDestroy() {
        super.onDestroy()
        compositeDisposable.clear() // 取消所有订阅
    }
}

相关文章:

  • SQL 中为什么参数多了not in 比 in 慢多了,怎么优化
  • JavaScript系列05-现代JavaScript新特性
  • .NET10 - 预览版1新功能体验(一)
  • Generalized Sparse Additive Model with Unknown Link Function
  • vue全局注册组件
  • Y3学习打卡
  • 【3-3】springcloud
  • 【每日学点HarmonyOS Next知识】网络请求回调toast问题、Popup问题、禁止弹窗返回、navigation折叠屏不显示返回键、响应式布局
  • Deepseek:物理神经网络PINN入门教程
  • element-push el-date-picker日期时间选择器,禁用可选中的时间 精确到分钟
  • OpenCV计算摄影学(11)色调映射算法类cv::TonemapDrago
  • 【量化策略】网格交易策略
  • 本地安装git
  • Sass基础
  • Django框架下html文件无法格式化的解决方案
  • 初识Qt · Qt的基本认识和基本项目代码解释
  • Firefox缩小标签页高度以及自定义调整
  • PDF文本转曲线轮廓 ​PDF转图片、提取文本和图片
  • 高性能采集服务上线回顾
  • Leetcode 209 长度最小的子数组
  • 新城市志|上海再攻坚,营商环境没有最好只有更好
  • 游客称在网红雪山勒多曼因峰需救援被开价2.8万,康定文旅:封闭整改
  • 央行:中国政府债务扩张仍有可持续性
  • 绿城房地产集团:近半年累计花费20.6亿元购买旗下债券
  • 金融监管总局:近五年民企贷款投放年平均增速比各项贷款平均增速高出1.1个百分点
  • 抗战回望21︱《“良民”日记》:一个“良民”在沦陷区的见闻与感受