前言

上篇文章讲解了 RxJava2.0的最基本使用,在本节中主要看下RxJava的线程控制。

1. 概述


RxJava原理图.png

2. 下边演示上游和下游在同一个线程


正常情况,上游和下游在同一个线程,也就是说上游在那个线程发送事件,下游就在哪个线程接收事件,下边通过示例代码演示:
    /**
     * 在主线程中创建一个上游 Observable 发送事件,则上游就在主线程中发送事件
     * 在主线程中创建一个下游 Observer 接收事件,则下游就在主线程中接收事件
     */
    public static void demo1(){
        // 创建一个上游:Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("TAG" , "Observable thread is : " + Thread.currentThread().getName()) ;
                Log.e("TAG" , "emit 1") ;

                emitter.onNext(1);
            }
        }) ;

        // 创建一个下游:Observer
        Consumer<Integer> consumer = new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("TAG" , "Observer thread is : " + Thread.currentThread().getName()) ;
                Log.e("TAG" , "next : " + integer) ;
            }
        } ;

        // 建立连接
        observable.subscribe(consumer);
    }

运行结果如下:

cn.novate.rxjava2 E/TAG: Observable thread is : main
cn.novate.rxjava2 E/TAG: emit 1
cn.novate.rxjava2 E/TAG: Observer thread is : main
cn.novate.rxjava2 E/TAG: next : 1

以上验证了,上游、下游在同一个线程工作;

但是这样肯定是不能满足我们的需求,我们更多的是需要在子线程中做耗时操作,然后切回到主线程中进行UI更新,如下图所示


thread.png

上图黄色表示子线程,深蓝色表示主线程
要达到在子线程中做耗时操作,然后切回主线程进行UI更新,只需要让上游在子线程中发送事件,然后把下游切回到主线程中接收事件就ok,示例代码如下:

/**
     * 让上游在子线程中发送事件,然后把下游切回到主线程接收事件
     */
    public static void demo2(){
        // 创建一个上游:Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("TAG" , "Observable thread is " + Thread.currentThread().getName()) ;
                Log.e("TAG" , "emit 1") ;
                emitter.onNext(1);
            }
        }) ;

        // 创建一个下游:Observer
        Consumer<Integer> consumer = new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("TAG" , "Observer thread is " + Thread.currentThread().getName()) ;
                Log.e("TAG" , "next :" + integer) ;
            }
        } ;

        // 建立连接
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer);
    }

运行结果如下

cn.novate.rxjava2 E/TAG: Observable thread is RxNewThreadScheduler-1
cn.novate.rxjava2 E/TAG: emit 1
cn.novate.rxjava2 E/TAG: Observer thread is main
cn.novate.rxjava2 E/TAG: next :1

subscribeOn()指的是上游发送事件的线程,observeOn()指的是下游接收事件的线程;

注意:

1>:多次调用subscribeOn()方法,只有第一次有效,其余的被忽略;
2>:多次调用observeOn()方法,都是可以的,也就是说每调用一次observeOn(),下游线程就会切换一次

 observable.subscribeOn(Schedulers.newThread()) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.observeOn(Schedulers.io()) 
.subscribe(consumer);

在RxJava中,已经内置很多线程供我们选择,比如:
1>:Schedulers.newThread():常规的线程;
2>:Schedulers.io():io操作的线程,用于联网请求、读写文件等操作;
3>:Schedulers.computation():CPU计算密集型操作,用于大量计算操作;
4>:AndroidSchedulers.mainThread():主线程;
在RxJava内部使用线程池维护这些线程,效率比较高;

3. 实践


在我们开发过程中,一般都是把耗时操作在子线程中做,比如读写文件、读写数据库、联网请求等操作,下边做一个登录示例来演示,如何把线程切换到子线程中进行耗时操作,然后又是如何把线程切换到主线程中进行更新UI:

    /**
     * 具体示例:
     *     通过演示登录成功与失败的功能来演示:
     *          如何把线程切换到子线程中让其执行耗时操作,然后再次切换到主线程中更新UI
     */
    public static void demo3(final Context context){
        Api api = RetrofitProvider.get().create(Api.class) ;
        api.login(new LoginRequest())
                .subscribeOn(Schedulers.io())     // 切换到io线程(子线程)中进行联网请求
                .observeOn(AndroidSchedulers.mainThread())  // 在耗时操作进行完之后切换到主线程中处理请求结果,来更新UI
                .subscribe(new Observer<LoginResponse>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(LoginResponse value) {

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("TAG" , "登录失败") ;
                    }

                    @Override
                    public void onComplete() {
                        Log.e("TAG" , "登录成功") ;
                    }
                });
    }

注意:上篇文章我们讲到了Disposable,说如果调用 Disposable.dispose()方法,会切断水管,让下游接收不到事件。既然接收不到事件,那么就不能更新UI,因此可以在这个Activity中保存 Disposable,在Activity退出时,调用Disposable.dispose()方法来切断即可;

如果有多个 Disposable,RxJava内置了一个 CompositeDisposable容器,每次得到一个 Disposable,就调用 CompositeDisposable.add()方法把它添加到容器中,在Activity退出时候,直接调用 CompositeDisposable.clear()方法,直接切断所有水管即可;

以上就是教程二的全部内容。

本站以现代、古代情诗为主,情诗网创办于2013年,以原创爱情诗歌、经典情诗、现代情诗、古代情诗、英文情诗、情诗绝句为主并收集古诗、古诗词、诗歌大全、诗词名句的文学门户。方便您下次继续阅读;可以放在浏览器的收藏夹中(快捷键Ctrl+D);或者看到喜欢或者有趣的诗词可以通过分享按钮给你的好友分享;情诗网是目前最全情诗大全网站之一。并欢迎广大诗歌爱好者阅览投稿!喜欢本站的话请大家把本站告诉给你朋友哦!地址是 www.qingshiwang.com !