网站设计科技有限公司百度关键词搜索量排名
RxJava 工具类封装,增加了更多实用功能,如 重试机制、缓存支持、全局错误处理、线程池配置 等,同时代码更加简洁和易于扩展。
- RxJava 工具类封装:
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers;import java.io.IOException;
import java.util.concurrent.TimeUnit;public class RxJavaUtils {private static final int RETRY_COUNT = 3; // 默认重试次数private static final int RETRY_DELAY = 2; // 默认重试延迟(秒)// 线程切换:IO 线程执行,主线程观察public static <T> void async(Observable<T> observable, Observer<T> observer) {observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer);}// 线程切换:IO 线程执行,主线程观察(简化版)public static <T> void async(Observable<T> observable, Consumer<T> onNext, Consumer<Throwable> onError) {observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError);}// 绑定生命周期,防止内存泄漏public static <T> void bindLifecycle(Observable<T> observable, CompositeDisposable compositeDisposable, Observer<T> observer) {Disposable disposable = observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new Observer<T>() {@Overridepublic void onSubscribe(Disposable d) {compositeDisposable.add(d);}@Overridepublic void onNext(T t) {observer.onNext(t);}@Overridepublic void onError(Throwable e) {observer.onError(e);}@Overridepublic void onComplete() {observer.onComplete();}});}// 错误处理:统一处理网络请求错误public static void handleError(Throwable throwable) {if (throwable instanceof IOException) {// 网络错误System.out.println("Network error: " + throwable.getMessage());} else {// 其他错误System.out.println("Unknown error: " + throwable.getMessage());}}// 支持重试机制public static <T> Observable<T> withRetry(Observable<T> observable) {return observable.retryWhen(errors -> errors.zipWith(Observable.range(1, RETRY_COUNT + 1), (throwable, retryCount) -> {if (retryCount > RETRY_COUNT) {throw new RuntimeException("Retry limit exceeded", throwable);}return retryCount;}).flatMap(retryCount -> Observable.timer(RETRY_DELAY, TimeUnit.SECONDS)));}// 支持缓存public static <T> Observable<T> withCache(Observable<T> observable, T cacheData) {return observable.onErrorResumeNext(throwable -> {if (cacheData != null) {return Observable.just(cacheData);} else {return Observable.error(throwable);}});}// 自定义线程池public static <T> Observable<T> withCustomScheduler(Observable<T> observable) {return observable.subscribeOn(Schedulers.from(ThreadPoolUtils.getExecutor())).observeOn(AndroidSchedulers.mainThread());}
}
- 线程池工具类 (ThreadPoolUtils)
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;public class ThreadPoolUtils {private static final int CORE_POOL_SIZE = 4; // 核心线程数private static final int MAX_POOL_SIZE = 8; // 最大线程数private static final long KEEP_ALIVE_TIME = 60L; // 线程空闲时间(秒)private static ThreadPoolExecutor executor;static {executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(CORE_POOL_SIZE);executor.setMaximumPoolSize(MAX_POOL_SIZE);executor.setKeepAliveTime(KEEP_ALIVE_TIME, TimeUnit.SECONDS);}// 获取线程池public static Executor getExecutor() {return executor;}
}
- 使用示例
3.1 基本使用
// 模拟网络请求
Observable<String> observable = Observable.just("Hello, RxJava!");// 使用工具类简化线程切换
RxJavaUtils.async(observable, new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// 订阅时调用}@Overridepublic void onNext(String s) {// 处理结果System.out.println(s);}@Overridepublic void onError(Throwable e) {// 处理错误RxJavaUtils.handleError(e);}@Overridepublic void onComplete() {// 完成时调用}
});
3.2 支持重试机制
// 模拟网络请求(可能失败)
Observable<String> observable = Observable.error(new IOException("Network error"));// 使用重试机制
RxJavaUtils.async(RxJavaUtils.withRetry(observable),s -> System.out.println(s), // onNextthrowable -> RxJavaUtils.handleError(throwable) // onError
);
3.3 支持缓存
// 模拟网络请求(可能失败)
Observable<String> observable = Observable.error(new IOException("Network error"));// 使用缓存
RxJavaUtils.async(RxJavaUtils.withCache(observable, "Cached data"),s -> System.out.println(s), // onNextthrowable -> RxJavaUtils.handleError(throwable) // onError
);
3.4 自定义线程池
// 模拟网络请求
Observable<String> observable = Observable.just("Hello, RxJava!");// 使用自定义线程池
RxJavaUtils.async(RxJavaUtils.withCustomScheduler(observable),s -> System.out.println(s), // onNextthrowable -> RxJavaUtils.handleError(throwable) // onError
);
- 功能扩展
4.1 支持 Retrofit 网络请求
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava3.RxJava3CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;public class RetrofitUtils {private static Retrofit retrofit;// 初始化 Retrofitpublic static Retrofit getRetrofit(String baseUrl) {if (retrofit == null) {retrofit = new Retrofit.Builder().baseUrl(baseUrl).addConverterFactory(GsonConverterFactory.create()).addCallAdapterFactory(RxJava3CallAdapterFactory.create()).build();}return retrofit;}
}
4.2 结合 Retrofit 使用
// 定义 API 接口
public interface ApiService {@GET("user/{id}")Observable<User> getUser(@Path("id") int id);
}// 使用 Retrofit 和 RxJava 工具类
Retrofit retrofit = RetrofitUtils.getRetrofit("https://api.example.com/");
ApiService apiService = retrofit.create(ApiService.class);RxJavaUtils.async(RxJavaUtils.withRetry(apiService.getUser(1)),user -> System.out.println(user.getName()), // onNextthrowable -> RxJavaUtils.handleError(throwable) // onError
);
- 总结
优化后的 RxJava 工具类具有以下特点:
线程切换:简化异步操作。
重试机制:支持网络请求失败后自动重试。
缓存支持:在网络请求失败时返回缓存数据。
自定义线程池:灵活配置线程池。
生命周期绑定:防止内存泄漏。
全局错误处理:统一处理错误逻辑。
通过封装这些功能,可以显著提升代码的可读性、可维护性和健壮性。建议将工具类封装到一个独立的模块中,方便在项目中复用。