首页
登录 | 注册

RxJava1.x中的subscribeOn,observeOn到底做了些什么

注:文中的OnSubscribe1,OnSubscribe2,Observable1等等命名是通过出现的时序来命名的,越大说明越晚出现

我们先来举个例子吧:

Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(it: Subscriber<in String>?) {
                Timber.i("OnSubscribe call:  ${Thread.currentThread().name}")
                it?.onNext("onNextString")
            }
        })
        //Schedulers.newThread()其实是NewThreadScheduler,具体就不分析了,看源码很容易找到
                .subscribeOn(Schedulers.newThread())
                //LooperScheduler
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Action1<Any?> {
                    override fun call(t: Any?) {
                        Timber.i(" onNext  ${Thread.currentThread().name}   ${t?.toString()?: "null"}")
                    }
                })
                    

结果如下:

OnSubscribe call:  RxNewThreadScheduler-2
onNext  main  onNextString

总结:

  • 对于subscribeOn,,每个subscribeOn都会引用上个onSubscribe,它只是改变了第一个subscribeOn上面的OnSubscribe的线程,subscribeOn方法会新建一个Observable2,同时OnSubscribe2赋值为OperatorSubscribeOn(Observable,Scheduler),OperatorSubscribeOn里面的Scheduler切换线程,让Observable执行subscribe
  • 对于observeOn,新建一个Observable3对象,同时OnSubscribe3赋值为OnSubscribeLift,OnSubscribeLift处理subscribe时候,先让operator封装subscribe成ObserveOnSubscriber,再让上一个OnSubscribe去处理ObserveOnSubscriber,关键一点是observeOn切换线程是在ObserveOnSubscriber的onNext,onError等方法切换的

下面我们来详细的看看整个流程,从上面的例子来看显然OnSubscribe 的call方法执行在子线程中,而subscriber接受到消息是主线程执行的。现在我们来一步步分析其原理,create方法前面就已经分析过了,我们直接看subscribeOn:

Observable.java:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
    //我们写的demo是用create方法,create是直接new Observable的,那显然不是ScalarSynchronousObservable
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        //这里又采用了create方法,说明这里又新建了一个Observable对象
        //暂时我们把create新建立的对象为ob2,前面的Observable为ob1,ob2的OnSubscribe2为OperatorSubscribeOn
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }
OperatorSubscribeOn.java:

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
    //NewThreadScheduler
        this.scheduler = scheduler;
        //这个source为ob1
        this.source = source;
    }

来看看这部分的流程图:
RxJava1.x中的subscribeOn,observeOn到底做了些什么

接着我们看看observeOn方法:

Observable.java:

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, RxRingBuffer.SIZE);
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
        return observeOn(scheduler, false, bufferSize);
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //同理,this不是ScalarSynchronousObservable
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        //这里又有个create方法,把它作为ob3,其属性onSubscribe3为OnSubscribeLift对象
        //参数onSubscribe是ob2的onSubscribe2
        return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
 
 
 OnSubscribeLift.java:
 
    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        // ob2的onSubscribe,也就是OperatorSubscribeOn类对象
        this.parent = parent;
        //OperatorObserveOn
        this.operator = operator;
    } 

RxJava1.x中的subscribeOn,observeOn到底做了些什么

最后我们来看看Observable.subscribe(Subscriber)这个方法,前面我们就知道这个方法其实是让当前的Observable的onSubscribe执行call方法,并且把Subscriber当做参数,当执行到subscribe,当前的Observable已经是ob3了,其onSubscribe也已经是onSubscribe3(OnSubscribeLift)对象,我们来看看OnSubscribeLift的call方法:

OnSubscribeLift.java:

    public void call(Subscriber<? super R> o) {
        try {
            //RxJavaHooks.onObservableLift(operator)这个的结果也就是operator
            //operator在上面也说过是OperatorObserveOn对象
            //st其实是个ObserveOnSubscriber,通过下面的代码分析可以知道
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // 这里有个地方,是封装后的st的onStart方法
                st.onStart();
                //这个parent是ob2的onSubscribe,也就是OperatorSubscribeOn类对象
                //这点就很关键了,重新封装好的Subscriber又传给了上层ob2的onSubscribe
                parent.call(st);
            } catch (Throwable e) {
                ....
                st.onError(e);
            }
        } catch (Throwable e) {
            ......
            o.onError(e);
        }
    }
    

来看看OperatorObserveOn的call方法,它对初始的Subscriber进行了封装:

OperatorObserveOn.java:

    /**
     * @param scheduler the scheduler to use
     * @param delayError delay errors until all normal events are emitted in the other thread?
     * @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
     */
    public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //这个是AndroidSchedulers.mainThread()---LooperScheduler
        this.scheduler = scheduler;
        //false
        this.delayError = delayError;
        this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        ....
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            //封装成ObserveOnSubscriber
            return parent;
        ....
    }

接着我们看ob2的onSubscribe2,也就是OperatorSubscribeOn类对象的call方法,ObserveOnSubscriber作为参数传进去的

OperatorSubscribeOn.java(onSubscribe2):

    //subscriber应该是subscriber2,即ObserveOnSubscriber
    public void call(final Subscriber<? super T> subscriber) {
    //在OperatorSubscribeOn构造函数我们可以知道scheduler就是一开始Schedulers.newThread-》NewThreadScheduler,这个inner是NewThreadWorker
        final Worker inner = scheduler.createWorker();
        //subscriber就是ObserveOnSubscriber类对象
        subscriber.add(inner);
        //
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                  .....
                //这个source在构造函数中初始化的,是ob1的引用,
                //里面具体的实现其实就是ob1的onSubscribe1去调用call方法,s作为参数
                source.unsafeSubscribe(s);
            }
        });
    }

我们来看NewThreadWorker.schedule方法:

NewThreadWorker.java:

    @Override
    public Subscription schedule(final Action0 action) {
        return schedule(action, 0, null);
    }

    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        if (isUnsubscribed) {
            return Subscriptions.unsubscribed();
        }
        return scheduleActual(action, delayTime, unit);
    }

     public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        //返回的就是action
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        //ScheduledAction继承Runnable
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            //executor是ScheduledThreadPoolExecutor,在NewThreadWorker构造函数中就初始化了
            //这里其实也就执行了ScheduledAction的run方法
            //也就是action.call,这里就切换了线程,OnSubscribe1的call方法是执行在这个线程池中的
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

到这里整个流程好像就结束了,总觉得有点怪怪的,好像observeOn切换线程没起作用,仔细想想,传入到OnSubscribe2的是ObserveOnSubscriber里面具体方法还没看,像onNext,onCompleted…都是执行它的,那我们来看看ObserveOnSubscriber.onNext方法把:

ObserveOnSubscriber.java:

        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }
        
        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
            //recursiveScheduler在构造函数中初始化的,就是LooperScheduler.createWorker
                recursiveScheduler.schedule(this);
            }
        }
        
LooperScheduler&HandlerWorker.java:

        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }
        
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (unsubscribed) {
                return Subscriptions.unsubscribed();
            }

            action = hook.onSchedule(action);
            //ScheduledAction是Runnable
            ScheduledAction scheduledAction = new ScheduledAction(action, handler);

            Message message = Message.obtain(handler, scheduledAction);
            message.obj = this; // Used as token for unsubscription operation.
            //这个消息会被ScheduledAction的run方法接受到
            handler.sendMessageDelayed(message, unit.toMillis(delayTime));

            if (unsubscribed) {
                handler.removeCallbacks(scheduledAction);
                return Subscriptions.unsubscribed();
            }

            return scheduledAction;
        }
 ScheduledAction.java:
  public void run() {
    //这个action就是ObserveOnSubscriber,ObserveOnSubscriber继承了Action0
       action.call();
      }
      
ObserveOnSubscriber.java:


// only execute this from schedule()
//这个call方法已经改变线程了
    public void call() {
        .....
        //child就是作为参数传进来的Subscriber
        final Subscriber<? super T> localChild = this.child;
        localChild.onNext(localOn.getValue(v));
        .....
    }

通过LooperScheduler.schedule的具体实现可以看出,也就是通过Handler进行线程切换的,即ObserveOnSubscriber.onNext工作在主线程

RxJava1.x中的subscribeOn,observeOn到底做了些什么

那么现在就有个问题,如果多次调用subscribeOn,observeOn比如说连续我两次调用subscribeOn或者连续两次调用observeOn,能不能切换线程:

Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(it: Subscriber<in String>?) {
                Timber.i("OnSubscribe call:  ${Thread.currentThread().name}")
                it?.onNext("onNextString")
            }
        })
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Action1<Any?> {
                    override fun call(t: Any?) {
                        Timber.i(" onNext  ${Thread.currentThread().name}   ${t?.toString()?: "null"}")
                    }
                })
                    

结果:

OnSubscribe call:  RxNewThreadScheduler-1
onNext  RxNewThreadScheduler-1 onNextString

这里例子表明后面subscribeOn并没有改变执行的线程,我们来回顾下subscribeOn:

Observable.java:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        //我们知道,传入的subscribe还是会被OperatorSubscribeOn的call处理一次的
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

OperatorSubscribeOn.java:

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        //这里是切换线程
        inner.schedule(new Action0() {
                ,.....
                source.unsafeSubscribe(s);
            }
        });
    }

什么意思呢,不管第一个subscribeOn后面执行的是在哪个线程,都是会经过第一个subscribeOn处理,然后切换线程。

接着我们来看看多个observeOn处理的情况:

Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(it: Subscriber<in String>?) {
                Timber.i("OnSubscribe call:  ${Thread.currentThread().name}")
                it?.onNext("onNextString")
            }
        })
                .observeOn(Schedulers.newThread())
                .map{
                    Timber.i("map : ${Thread.currentThread().name}")
                }
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Action1<Any?> {
                    override fun call(t: Any?) {
                        Timber.i(" onNext  ${Thread.currentThread().name}   ${t?.toString()?: "null"}")
                    }
                })
                    

结果:

OnSubscribe call:  main
map: RxNewThreadScheduler-1
onNext  main onNextString

我们在上面的分析中可以知道,observeOn方法它改变线程的关键点在于封装的ObserveOnSubscriber里面具体的onNext等方法。
我们来回顾下ObserveOnSubscriber的onNext的代码:

    public void call() {
        .....
        //child就是作为参数传进来的Subscriber
        final Subscriber<? super T> localChild = this.child;
        localChild.onNext(localOn.getValue(v));
        .....
    }
    

根据上面那个例子来看,这里的loacalChild应该就是onSubscriberMap这个实例对象了,通过上文的map分析知道,onNext方法是交给了Func的call方法执行,然后再交给前一个onSubscriber处理,这前一个onSubscriber又是ObserveOnSubscriber对象,又切换线程。这里的意思就是第一个observeOn改变的事map中func的线程,第二个改变的是最后一个subscribe的线程。



2020 jeepxie.net webmaster#jeepxie.net
10 q. 0.009 s.
京ICP备10005923号