当前位置: 首页 > news >正文

RxJava 用法封装举例

RxJava 工具类封装,增加了更多实用功能,如 重试机制、缓存支持、全局错误处理、线程池配置 等,同时代码更加简洁和易于扩展。

  1. RxJava 工具类封装:
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class RxJavaUtils {

    private static final int RETRY_COUNT = 3; // 默认重试次数
    private static final int RETRY_DELAY = 2; // 默认重试延迟(秒)

    // 线程切换:IO 线程执行,主线程观察
    public static <T> void async(Observable<T> observable, Observer<T> observer) {
        observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer);
    }

    // 线程切换:IO 线程执行,主线程观察(简化版)
    public static <T> void async(Observable<T> observable, Consumer<T> onNext, Consumer<Throwable> onError) {
        observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(onNext, onError);
    }

    // 绑定生命周期,防止内存泄漏
    public static <T> void bindLifecycle(Observable<T> observable, CompositeDisposable compositeDisposable, Observer<T> observer) {
        Disposable disposable = observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new Observer<T>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        compositeDisposable.add(d);
                    }

                    @Override
                    public void onNext(T t) {
                        observer.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        observer.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        observer.onComplete();
                    }
                });
    }

    // 错误处理:统一处理网络请求错误
    public static void handleError(Throwable throwable) {
        if (throwable instanceof IOException) {
            // 网络错误
            System.out.println("Network error: " + throwable.getMessage());
        } else {
            // 其他错误
            System.out.println("Unknown error: " + throwable.getMessage());
        }
    }

    // 支持重试机制
    public static <T> Observable<T> withRetry(Observable<T> observable) {
        return observable.retryWhen(errors -> errors
                .zipWith(Observable.range(1, RETRY_COUNT + 1), (throwable, retryCount) -> {
                    if (retryCount > RETRY_COUNT) {
                        throw new RuntimeException("Retry limit exceeded", throwable);
                    }
                    return retryCount;
                })
                .flatMap(retryCount -> Observable.timer(RETRY_DELAY, TimeUnit.SECONDS)));
    }

    // 支持缓存
    public static <T> Observable<T> withCache(Observable<T> observable, T cacheData) {
        return observable.onErrorResumeNext(throwable -> {
            if (cacheData != null) {
                return Observable.just(cacheData);
            } else {
                return Observable.error(throwable);
            }
        });
    }

    // 自定义线程池
    public static <T> Observable<T> withCustomScheduler(Observable<T> observable) {
        return observable.subscribeOn(Schedulers.from(ThreadPoolUtils.getExecutor()))
                .observeOn(AndroidSchedulers.mainThread());
    }
}
  1. 线程池工具类 (ThreadPoolUtils)
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ThreadPoolUtils {

    private static final int CORE_POOL_SIZE = 4; // 核心线程数
    private static final int MAX_POOL_SIZE = 8; // 最大线程数
    private static final long KEEP_ALIVE_TIME = 60L; // 线程空闲时间(秒)

    private static ThreadPoolExecutor executor;

    static {
        executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(CORE_POOL_SIZE);
        executor.setMaximumPoolSize(MAX_POOL_SIZE);
        executor.setKeepAliveTime(KEEP_ALIVE_TIME, TimeUnit.SECONDS);
    }

    // 获取线程池
    public static Executor getExecutor() {
        return executor;
    }
}
  1. 使用示例
    3.1 基本使用
// 模拟网络请求
Observable<String> observable = Observable.just("Hello, RxJava!");

// 使用工具类简化线程切换
RxJavaUtils.async(observable, new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        // 订阅时调用
    }

    @Override
    public void onNext(String s) {
        // 处理结果
        System.out.println(s);
    }

    @Override
    public void onError(Throwable e) {
        // 处理错误
        RxJavaUtils.handleError(e);
    }

    @Override
    public void onComplete() {
        // 完成时调用
    }
});

3.2 支持重试机制

// 模拟网络请求(可能失败)
Observable<String> observable = Observable.error(new IOException("Network error"));

// 使用重试机制
RxJavaUtils.async(RxJavaUtils.withRetry(observable),
        s -> System.out.println(s), // onNext
        throwable -> RxJavaUtils.handleError(throwable) // onError
);

3.3 支持缓存

// 模拟网络请求(可能失败)
Observable<String> observable = Observable.error(new IOException("Network error"));

// 使用缓存
RxJavaUtils.async(RxJavaUtils.withCache(observable, "Cached data"),
        s -> System.out.println(s), // onNext
        throwable -> RxJavaUtils.handleError(throwable) // onError
);

3.4 自定义线程池

// 模拟网络请求
Observable<String> observable = Observable.just("Hello, RxJava!");

// 使用自定义线程池
RxJavaUtils.async(RxJavaUtils.withCustomScheduler(observable),
        s -> System.out.println(s), // onNext
        throwable -> RxJavaUtils.handleError(throwable) // onError
);
  1. 功能扩展
    4.1 支持 Retrofit 网络请求
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava3.RxJava3CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;

public class RetrofitUtils {

    private static Retrofit retrofit;

    // 初始化 Retrofit
    public static Retrofit getRetrofit(String baseUrl) {
        if (retrofit == null) {
            retrofit = new Retrofit.Builder()
                    .baseUrl(baseUrl)
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
                    .build();
        }
        return retrofit;
    }
}

4.2 结合 Retrofit 使用

// 定义 API 接口
public interface ApiService {
    @GET("user/{id}")
    Observable<User> getUser(@Path("id") int id);
}

// 使用 Retrofit 和 RxJava 工具类
Retrofit retrofit = RetrofitUtils.getRetrofit("https://api.example.com/");
ApiService apiService = retrofit.create(ApiService.class);

RxJavaUtils.async(RxJavaUtils.withRetry(apiService.getUser(1)),
        user -> System.out.println(user.getName()), // onNext
        throwable -> RxJavaUtils.handleError(throwable) // onError
);
  1. 总结
    优化后的 RxJava 工具类具有以下特点:

线程切换:简化异步操作。

重试机制:支持网络请求失败后自动重试。

缓存支持:在网络请求失败时返回缓存数据。

自定义线程池:灵活配置线程池。

生命周期绑定:防止内存泄漏。

全局错误处理:统一处理错误逻辑。

通过封装这些功能,可以显著提升代码的可读性、可维护性和健壮性。建议将工具类封装到一个独立的模块中,方便在项目中复用。

相关文章:

  • 初中文凭怎么成人大专-一种简单省心的方式
  • Gauss数据库omm用户无法连接处理
  • 写作思维魔方
  • 下载PyCharm 2024.3.4 (Community Edition)来开发测试python
  • 多线程或多进程或多协程部署flask服务
  • 网络安全等级保护2.0 vs GDPR vs NIST 2.0:全方位对比解析
  • linux0.11源码分析第四弹——操作系统的框架代码
  • 类和对象—多态—案例2—制作饮品
  • 笔记:如何使用XAML Styler以及在不同的开发环境中使用一致
  • 第7章 wireshark(网络安全防御实战--蓝军武器库)
  • 阿里云 DataWorks面试题集锦及参考答案
  • Visual Studio 2022新建c语言项目的详细步骤
  • 文献学习: 单细胞+肿瘤转移研究的发文思路解析:如何构建核心基因特征,揭示关键调控网络?
  • VB6 调用 JS 函数时数据传输json格式或a=1b=s2字符串
  • 十倍烈火刀刀爆?伪随机分布(PRD)算法详解与C++实现
  • 洛谷P1091
  • 记录排查服务器CPU负载过高
  • 【自学笔记】OpenStack基础知识点总览-持续更新
  • nvidia驱动升级-ubuntu 1804
  • 系统架构设计师—数据库基础篇—关系代数运算
  • wordpress如何修改字体大小/郑州seo服务技术
  • 网站开发项目背景/微商软文范例大全100
  • 域名服务网站建设科技公司/怎么在百度发布免费广告
  • 有哪些中文域名网站/竞价排名营销
  • 网站拥有者查询/关键词优化哪个好
  • 都兰县建设局交通局网站/互联网营销师是哪个部门发证