Android开源库——RxJava和RxAndroid
RxJava和RxAndroid是什么?
RxJava是基于JVM的响应式扩展,用于编写异步代码
RxAndroid是关于Android的RxJava绑定
RxJava和RxAndroid使用
依赖
implementation 'io.reactivex.rxjava3:rxjava:3.1.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
 
使用过程
如下模拟在子线程中进行耗时操作,并将结果返回到主线程中处理
- Flowable:将要进行的操作
 - subscribeOn():操作要运行的线程
 - observeOn() :处理结果要运行的线程
 - subscribe():处理结果
 
public class MainActivity extends AppCompatActivity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Flowable.fromCallable(() -> {
                    Thread.sleep(3000);
                    return "Done";
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(System.out::println, Throwable::printStackTrace);
    }
}
 
内存泄漏问题
若Activity退出后,线程仍未执行完,会导致内存泄漏,需要在onDestroy()将任务取消
public class MainActivity extends AppCompatActivity {
    CompositeDisposable mCompositeDisposable  = new CompositeDisposable();
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Disposable disposable = Flowable.fromCallable(() -> {
                    Thread.sleep(3000);
                    return "Done";
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(System.out::println, Throwable::printStackTrace);
        mCompositeDisposable.add(disposable);
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        mCompositeDisposable.dispose();
    }
}
 
RxJava源码解析
Publisher
Publisher用于发布数据,Subscriber通过subscribe()订阅数据
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
 
Subscriber
Subscriber接收Publisher发布的数据
- onSubscribe():subscribe()回调函数,回调前会创建Subscription用于控制数据发布和停止
 - onNext():当Subscription调用request()时会调用onNext()发布数据
 - onError():处理接收到的错误
 - onComplete():处理完成的情况
 
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
 
Subscription
Subscription表示Publisher和Subscriber的对应关系
- request():向Publisher请求数据
 - cancel():让Publisher停止发布数据
 
public interface Subscription {
    public void request(long n);
    public void cancel();
}
 
Scheduler
createWorker()用于创建Worker ,具体的调度工作由Worker的schedule()完成
public abstract class Scheduler {
	public abstract Worker createWorker();
    public abstract static class Worker implements Disposable {
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
        
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
   }
}
 
source传递过程
fromCallable()创建FlowableFromCallable,传递callable
public abstract class Flowable<@NonNull T> implements Publisher<T> {
    public static <@NonNull T> Flowable<T> fromCallable(@NonNull Callable<? extends T> callable) {
        return RxJavaPlugins.onAssembly(new FlowableFromCallable<>(callable));
    }
}
 
subscribeOn()创建FlowableSubscribeOn,传递this(即FlowableFromCallable)作为source
public abstract class Flowable<@NonNull T> implements Publisher<T> {
    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return subscribeOn(scheduler, !(this instanceof FlowableCreate));
    }
    
    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<>(this, scheduler, requestOn));
    }
}
 
observeOn()创建FlowableObserveOn,传递this(即FlowableSubscribeOn)作为source
public abstract class Flowable<@NonNull T> implements Publisher<T> {
    public final Flowable<T> observeOn(@NonNull Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Flowable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new FlowableObserveOn<>(this, scheduler, delayError, bufferSize));
    }
}
 
即依次将自身当作Flowable,作为参数source传递给下一个Flowable
subscribe()流程
subscribe()最终调用具体Flowable的subscribeActual()
public abstract class Flowable<@NonNull T> implements Publisher<T> {
	......
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION);
    }
    
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
            @NonNull Action onComplete) {
		.....
        LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
        subscribe(ls);
        return ls;
    }
    
    public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
        try {
            Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
            ......
            subscribeActual(flowableSubscriber);
        }......
    }
    protected abstract void subscribeActual(@NonNull Subscriber<? super T> subscriber);
}
 
调用过程和传递过程是相反的,先调用FlowableObserveOn的subscribeActual()
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
	......
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        Worker worker = scheduler.createWorker();
        if (s instanceof ConditionalSubscriber) {
            .....
        } else {
            source.subscribe(new ObserveOnSubscriber<>(s, worker, delayError, prefetch));
        }
    }
}
 
上面的source就是上一层传递下来的FlowableSubscribeOn,即调用到FlowableSubscribeOn的subscribeActual()
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
	......
    @Override
    public void subscribeActual(final Subscriber<? super T> s) {
        Scheduler.Worker w = scheduler.createWorker();
        final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
        .....
        w.schedule(sos);
    }
    
    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
    implements FlowableSubscriber<T>, Subscription, Runnable {
		......
        @Override
        public void run() {
            lazySet(Thread.currentThread());
            Publisher<T> src = source;
            source = null;
            src.subscribe(this);
        }
 
schedule()最终会调用run()方法,lazySet()切换线程,上面的source就是上一层传递下来的FlowableFromCallable,即将到FlowableFromCallable的subscribeActual()放到指定线程中运行
public final class FlowableFromCallable<T> extends Flowable<T> implements Supplier<T> {
    final Callable<? extends T> callable;
    ......
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        DeferredScalarSubscription<T> deferred = new DeferredScalarSubscription<>(s);
        s.onSubscribe(deferred);
        T t;
        try {
            t = Objects.requireNonNull(callable.call(), "The callable returned a null value");
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            if (deferred.isCancelled()) {
                RxJavaPlugins.onError(ex);
            } else {
                s.onError(ex);
            }
            return;
        }
        deferred.complete(t);
    }
    ......
}
 
上面若出错回调onError(),否则调用downstream的onNext()传递结果
public class DeferredScalarSubscription<@NonNull T> extends BasicIntQueueSubscription<T> {
    public final void complete(T v) {
        int state = get();
        for (;;) {
            ......
            if (state == HAS_REQUEST_NO_VALUE) {
                lazySet(HAS_REQUEST_HAS_VALUE);
                Subscriber<? super T> a = downstream;
                a.onNext(v);
                if (get() != CANCELLED) {
                    a.onComplete();
                }
                return;
            }
            value = v;
            ......
        }
    }
}
 
onNext()过程
调用FlowableSubscribeOn.SubscribeOnSubscriber的onNext(),调用downstream的onNext()传递结果
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
    implements FlowableSubscriber<T>, Subscription, Runnable {
    
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }
    }
}
 
调用FlowableObserveOn.BaseObserveOnSubscriber的onNext()、trySchedule(),schedule()最终会调用run()方法,根据sourceMode判断是同步还是异步
- FlowableObserveOn.ObserveOnSubscriber的runSync()和runAsync()都调用downstream的onNext()传递结果
 
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    .....
    abstract static class BaseObserveOnSubscriber<T>
    extends BasicIntQueueSubscription<T>
    implements FlowableSubscriber<T>, Runnable {
        @Override
        public final void onNext(T t) {
            ......
            trySchedule();
        }'
        
        final void trySchedule() {
            ......
            worker.schedule(this);
        }
        
        @Override
        public final void run() {
            if (outputFused) {
                runBackfused();
            } else if (sourceMode == SYNC) {
                runSync();
            } else {
                runAsync();
            }
        }
    }
    
    static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
    implements FlowableSubscriber<T> {
        void runSync() {
			......
            final Subscriber<? super T> a = downstream;
			......
            for (;;) {
				......
                while (e != r) {
                    ......
                    a.onNext(v);
					......
                }......
            }
        }
    	.....
        @Override
        void runAsync() {
            final Subscriber<? super T> a = downstream;
            for (;;) {
            	......
                while (e != r) {
                    .....
                    a.onNext(v);
                    .....
                }
                .....
            }
        }
    }
}
 
调用LambdaSubscriber的onNext(),通过传入的Consumer消费掉最终的结果,即通过System.out::println打印出来
public final class LambdaSubscriber<T> extends AtomicReference<Subscription>
        implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {
	.....
    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().cancel();
                onError(e);
            }
        }
    }
}
 
onSubscribe()和request()流程
FlowableFromCallable回调下一层的onSubscribe(),其将Subscription存到upstream
public final class FlowableFromCallable<T> extends Flowable<T> implements Supplier<T> {
    final Callable<? extends T> callable;
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        DeferredScalarSubscription<T> deferred = new DeferredScalarSubscription<>(s);
        s.onSubscribe(deferred);
		......
    }
}
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
    implements FlowableSubscriber<T>, Subscription, Runnable {
        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.setOnce(this.upstream, s)) {
                ......
            }
        }
     }
}
 
FlowableSubscribeOn回调下一层的onSubscribe(),其回调下一层的onSubscribe()和上一层的request()请求数据
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
	......
    @Override
    public void subscribeActual(final Subscriber<? super T> s) {
        Scheduler.Worker w = scheduler.createWorker();
        final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
        s.onSubscribe(sos);
    }   
}
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    
    static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
    implements FlowableSubscriber<T> {
        ......
        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.validate(this.upstream, s)) {
                this.upstream = s;
				......
                queue = new SpscArrayQueue<>(prefetch);
                downstream.onSubscribe(this);
                s.request(prefetch);
            }
        }
    }
}
 
LambdaSubscriber利用FlowableInternalHelper.RequestMax的accept()调用上一层的request(),从schedule()获取数据
public final class LambdaSubscriber<T> extends AtomicReference<Subscription>
        implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {
     @Override
    public void onSubscribe(Subscription s) {
        if (SubscriptionHelper.setOnce(this, s)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                ......
            }
        }
    }
 }
 
public final class FlowableInternalHelper {
	    public enum RequestMax implements Consumer<Subscription> {
        INSTANCE;
        @Override
        public void accept(Subscription t) {
            t.request(Long.MAX_VALUE);
        }
    }
}
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    abstract static class BaseObserveOnSubscriber<T>
    extends BasicIntQueueSubscription<T>
    implements FlowableSubscriber<T>, Runnable {
    
        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(requested, n);
                trySchedule();
            }
        }
        
        final void trySchedule() {
            ......
            worker.schedule(this);
        }
	}
}
 
FlowableSubscribeOn.SubscribeOnSubscriber的request()、requestUpstream()判断当前线程,若未切换线程调用schedule()切换线程调用上一层的request()
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
    implements FlowableSubscriber<T>, Subscription, Runnable {
    
        @Override
        public void request(final long n) {
            if (SubscriptionHelper.validate(n)) {
                Subscription s = this.upstream.get();
                if (s != null) {
                    requestUpstream(n, s);
                } else {
                     ......
                    }
                }
            }
        }
        void requestUpstream(final long n, final Subscription s) {
            if (nonScheduledRequests || Thread.currentThread() == get()) {
                s.request(n);
            } else {
                worker.schedule(new Request(s, n));
            }
        }
        
        static final class Request implements Runnable {
            ......
            @Override
            public void run() {
                upstream.request(n);
            }
        }
	}
}
 
DeferredScalarSubscription接收到请求后,将值传给downstream的onNext()
public class DeferredScalarSubscription<@NonNull T> extends BasicIntQueueSubscription<T> {
    @Override
    public final void request(long n) {
        if (SubscriptionHelper.validate(n)) {
            for (;;) {
                int state = get();
                ......
                if (state == NO_REQUEST_HAS_VALUE) {
                    if (compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
                        T v = value;
                        if (v != null) {
                            value = null;
                            Subscriber<? super T> a = downstream;
                            a.onNext(v);
                            if (get() != CANCELLED) {
                                a.onComplete();
                            }
                        }
                    }
                    return;
                }
                ......
            }
        }
    }
}
 
Schedulers.io()调度过程
Schedulers.io() = Schedulers.IO = IOTask() = IoHolder.DEFAULT = IoScheduler()
public final class Schedulers {
	static final Scheduler IO;
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    static {
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
    }
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    static final class IOTask implements Supplier<Scheduler> {
        @Override
        public Scheduler get() {
            return IoHolder.DEFAULT;
        }
    }
}
 
FlowableSubscribeOn的subscribeActual()通过IoScheduler创建Worker并调用schedule()
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
	......
    @Override
    public void subscribeActual(final Subscriber<? super T> s) {
        Scheduler.Worker w = scheduler.createWorker();
        final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
       	.....
        w.schedule(sos);
    }
}
 
调用IoScheduler的createWorker()会返回EventLoopWorker
public final class IoScheduler extends Scheduler {
	@NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
}
 
调用IoScheduler.EventLoopWorker的schedule()最终调用ThreadWorker的父类NewThreadWorker的scheduleActual()
public final class IoScheduler extends Scheduler {
    static final class EventLoopWorker extends Scheduler.Worker implements Runnable {
        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        	......
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    static final class ThreadWorker extends NewThreadWorker {
		......
    }
}
 
调用scheduleActual()将Runnable封装成ScheduledRunnable,通过ScheduledThreadPoolExecutor的submit()或schedule()提交
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
    volatile boolean disposed;
    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
		......
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            ......
        }
        return sr;
    }
}
public final class SchedulerPoolFactory {
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
        exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
        return exec;
    }
}
 
线程池会调用FlowableSubscribeOn.SubscribeOnSubscriber的run()方法,SubscribeOnSubscriber继承了AtomicReference<Thread>,lazySet()切换线程调用上一层source的subscribe()
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
    implements FlowableSubscriber<T>, Subscription, Runnable {
    
        @Override
        public void run() {
            lazySet(Thread.currentThread());
            Publisher<T> src = source;
            source = null;
            src.subscribe(this);
        }
    }
}
 
AndroidSchedulers.mainThread()调度过程
AndroidSchedulers.mainThread() = AndroidSchedulers.MAIN_THREAD = MainHolder.DEFAULT = HandlerScheduler(),通过主线程Looper创建handler
public final class AndroidSchedulers {
    private static final class MainHolder {
        static final Scheduler DEFAULT = internalFrom(Looper.getMainLooper(), true);
    }
    private static final Scheduler MAIN_THREAD =
        RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
	}
	
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    private static Scheduler internalFrom(Looper looper, boolean async) {
        ......
        return new HandlerScheduler(new Handler(looper), async);
    }
}
 
FlowableObserveOn的subscribeActual()通过IoScheduler创建Worker,在onNext()的trySchedule()调用schedule()
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        Worker worker = scheduler.createWorker();
        if (s instanceof ConditionalSubscriber) {
            ......
        } else {
            source.subscribe(new ObserveOnSubscriber<>(s, worker, delayError, prefetch));
        }
    }
	abstract static class BaseObserveOnSubscriber<T>
    extends BasicIntQueueSubscription<T>
    implements FlowableSubscriber<T>, Runnable {
        @Override
        public final void onNext(T t) {
            ......
            trySchedule();
        }
        
        final void trySchedule() {
            ......
            worker.schedule(this);
        }
    }
}
 
调用HandlerScheduler的createWorker()返回HandlerWorker()
final class HandlerScheduler extends Scheduler {
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }
}
 
调用HandlerScheduler.HandlerWorker的schedule(),将Runnable封装成ScheduledRunnable,调用主线程handler的sendMessageDelayed()
final class HandlerScheduler extends Scheduler {
    private static final class HandlerWorker extends Worker {
    
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
           	......
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; 
            if (async) {
                message.setAsynchronous(true);
            }
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            ......
            return scheduled;
        }
    }
}
 
最终主线程会调用FlowableObserveOn.BaseObserveOnSubscriber的run(),根据sourceMode判断是同步还是异步
- FlowableObserveOn.ObserveOnSubscriber的runSync()和runAsync()都调用downstream的onNext()传递结果
 
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    .....
    abstract static class BaseObserveOnSubscriber<T>
    extends BasicIntQueueSubscription<T>
    implements FlowableSubscriber<T>, Runnable {
		......
        @Override
        public final void run() {
            if (outputFused) {
                runBackfused();
            } else if (sourceMode == SYNC) {
                runSync();
            } else {
                runAsync();
            }
        }
    }
    
    static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
    implements FlowableSubscriber<T> {
        void runSync() {
			......
            final Subscriber<? super T> a = downstream;
			......
            for (;;) {
				......
                while (e != r) {
                    ......
                    a.onNext(v);
					......
                }......
            }
        }
    	.....
        @Override
        void runAsync() {
            final Subscriber<? super T> a = downstream;
            for (;;) {
            	......
                while (e != r) {
                    .....
                    a.onNext(v);
                    .....
                }
                .....
            }
        }
    }
}
                