RxJava 用法封装举例
RxJava 工具类封装,增加了更多实用功能,如 重试机制、缓存支持、全局错误处理、线程池配置 等,同时代码更加简洁和易于扩展。
- RxJava 工具类封装:
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class RxJavaUtils {
private static final int RETRY_COUNT = 3; // 默认重试次数
private static final int RETRY_DELAY = 2; // 默认重试延迟(秒)
// 线程切换:IO 线程执行,主线程观察
public static <T> void async(Observable<T> observable, Observer<T> observer) {
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
// 线程切换:IO 线程执行,主线程观察(简化版)
public static <T> void async(Observable<T> observable, Consumer<T> onNext, Consumer<Throwable> onError) {
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNext, onError);
}
// 绑定生命周期,防止内存泄漏
public static <T> void bindLifecycle(Observable<T> observable, CompositeDisposable compositeDisposable, Observer<T> observer) {
Disposable disposable = observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
compositeDisposable.add(d);
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
});
}
// 错误处理:统一处理网络请求错误
public static void handleError(Throwable throwable) {
if (throwable instanceof IOException) {
// 网络错误
System.out.println("Network error: " + throwable.getMessage());
} else {
// 其他错误
System.out.println("Unknown error: " + throwable.getMessage());
}
}
// 支持重试机制
public static <T> Observable<T> withRetry(Observable<T> observable) {
return observable.retryWhen(errors -> errors
.zipWith(Observable.range(1, RETRY_COUNT + 1), (throwable, retryCount) -> {
if (retryCount > RETRY_COUNT) {
throw new RuntimeException("Retry limit exceeded", throwable);
}
return retryCount;
})
.flatMap(retryCount -> Observable.timer(RETRY_DELAY, TimeUnit.SECONDS)));
}
// 支持缓存
public static <T> Observable<T> withCache(Observable<T> observable, T cacheData) {
return observable.onErrorResumeNext(throwable -> {
if (cacheData != null) {
return Observable.just(cacheData);
} else {
return Observable.error(throwable);
}
});
}
// 自定义线程池
public static <T> Observable<T> withCustomScheduler(Observable<T> observable) {
return observable.subscribeOn(Schedulers.from(ThreadPoolUtils.getExecutor()))
.observeOn(AndroidSchedulers.mainThread());
}
}
- 线程池工具类 (ThreadPoolUtils)
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class ThreadPoolUtils {
private static final int CORE_POOL_SIZE = 4; // 核心线程数
private static final int MAX_POOL_SIZE = 8; // 最大线程数
private static final long KEEP_ALIVE_TIME = 60L; // 线程空闲时间(秒)
private static ThreadPoolExecutor executor;
static {
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(CORE_POOL_SIZE);
executor.setMaximumPoolSize(MAX_POOL_SIZE);
executor.setKeepAliveTime(KEEP_ALIVE_TIME, TimeUnit.SECONDS);
}
// 获取线程池
public static Executor getExecutor() {
return executor;
}
}
- 使用示例
3.1 基本使用
// 模拟网络请求
Observable<String> observable = Observable.just("Hello, RxJava!");
// 使用工具类简化线程切换
RxJavaUtils.async(observable, new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 订阅时调用
}
@Override
public void onNext(String s) {
// 处理结果
System.out.println(s);
}
@Override
public void onError(Throwable e) {
// 处理错误
RxJavaUtils.handleError(e);
}
@Override
public void onComplete() {
// 完成时调用
}
});
3.2 支持重试机制
// 模拟网络请求(可能失败)
Observable<String> observable = Observable.error(new IOException("Network error"));
// 使用重试机制
RxJavaUtils.async(RxJavaUtils.withRetry(observable),
s -> System.out.println(s), // onNext
throwable -> RxJavaUtils.handleError(throwable) // onError
);
3.3 支持缓存
// 模拟网络请求(可能失败)
Observable<String> observable = Observable.error(new IOException("Network error"));
// 使用缓存
RxJavaUtils.async(RxJavaUtils.withCache(observable, "Cached data"),
s -> System.out.println(s), // onNext
throwable -> RxJavaUtils.handleError(throwable) // onError
);
3.4 自定义线程池
// 模拟网络请求
Observable<String> observable = Observable.just("Hello, RxJava!");
// 使用自定义线程池
RxJavaUtils.async(RxJavaUtils.withCustomScheduler(observable),
s -> System.out.println(s), // onNext
throwable -> RxJavaUtils.handleError(throwable) // onError
);
- 功能扩展
4.1 支持 Retrofit 网络请求
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava3.RxJava3CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;
public class RetrofitUtils {
private static Retrofit retrofit;
// 初始化 Retrofit
public static Retrofit getRetrofit(String baseUrl) {
if (retrofit == null) {
retrofit = new Retrofit.Builder()
.baseUrl(baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava3CallAdapterFactory.create())
.build();
}
return retrofit;
}
}
4.2 结合 Retrofit 使用
// 定义 API 接口
public interface ApiService {
@GET("user/{id}")
Observable<User> getUser(@Path("id") int id);
}
// 使用 Retrofit 和 RxJava 工具类
Retrofit retrofit = RetrofitUtils.getRetrofit("https://api.example.com/");
ApiService apiService = retrofit.create(ApiService.class);
RxJavaUtils.async(RxJavaUtils.withRetry(apiService.getUser(1)),
user -> System.out.println(user.getName()), // onNext
throwable -> RxJavaUtils.handleError(throwable) // onError
);
- 总结
优化后的 RxJava 工具类具有以下特点:
线程切换:简化异步操作。
重试机制:支持网络请求失败后自动重试。
缓存支持:在网络请求失败时返回缓存数据。
自定义线程池:灵活配置线程池。
生命周期绑定:防止内存泄漏。
全局错误处理:统一处理错误逻辑。
通过封装这些功能,可以显著提升代码的可读性、可维护性和健壮性。建议将工具类封装到一个独立的模块中,方便在项目中复用。