前言

自己在学习RxJava2.0时,参考了大神的博客,然后在这里做一个笔记为了方便自己以后复习和查看,同时也给需要的小伙伴借鉴,RxJava2.0自己也仿照大神那样,做一个系列的文章,当然肯定没有大神写的那么好、那么完整、那么详细,自己只是做一个小的总结。

RxJava2.0 - 文章一
RxJava2.0 - 文章二
RxJava2.0文章三 - Map和FlatMap操作符的用法
RxJava2.0文章四 - Zip操作符的用法
RxJava2.0文章五 - Backpressure操作符
RxJava2.0文章六 - 解决上游、下游发射事件速度不平衡问题
RxJava2.0 - 文章七
RxJava2.0 - 文章八

1. RxJava基本工作原理


下边通过两根水管代替观察者与被观察者来解释它两个之间的关系,从事件流角度来说明其原理。


RxJava原理图.png

上边水管叫上游,是事件产生的水管;
下边水管叫下游,是事件接收的水管;
把上游和下游通过一定的方式连接,使得上游每产生一个事件,下游都能收到事件;

对应关系如下:
上游对应Observable;
下游对应Observer;
连接是subscribe();

2. 示例代码如下


2.1: 最基本使用
/**
     * 最基本使用
     */
    public static void demo1(){
        // 创建一个上游 Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }) ;

        // 创建一个下游 Observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG" , "subscribe") ;
            }

            @Override
            public void onNext(Integer value) {
                Log.e("TAG" , "" + value) ;
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG" , "error") ;
            }

            @Override
            public void onComplete() {
                Log.e("TAG" , "complete") ;
            }
        } ;

        // 建立连接
        observable.subscribe(observer);
    }

运行结果如下:

04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: subscribe
04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: 1
04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: 2
04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: 3
04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: complete

注意:只有把上游和下游连接起来,上游才会开始发送事件,即就是只有调用了subscribe()方法之后,上游才会开始发送事件。

2.2: 链式调用把上边代码连接起来
public static void demo2(){
        ObservableAll.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG" , "subscribe") ;
            }

            @Override
            public void onNext(Integer value) {
                Log.e("TAG" , ""+value) ;
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG" , "error") ;
            }

            @Override
            public void onComplete() {
                Log.e("TAG" , "complete") ;
            }
        });

        // 
    }

运行结果和demo1()方法一样;

3. 解释ObservableEmitter和Disposable


3.1: ObservableEmitter

Emitter是发射器的意思,用于发射emitter.onNext()、emitter.onError()、emitter.onCompleted()这3个类型的事件,且只能发送这3种事件,不能随意发射,必须满足以下规则:
1>:上游可以发送无限个onNext(),下游也可以接受无限个onNext();
2>:上游发送一个 onCompleted()之后,上游 onCompleted()之后的事件还可以继续发送,而下游一旦 接收到 onCompleted(),就不会继续接收事件;
3>:上游发送一个 onError()之后,上游 onError()之后的事件还可以继续发送,而下游一旦接收到 onError(),就不会继续接收事件;
4>:上游可以不发送 onCompleted()、onError();

3.2: Disposable

Disposable是一次性用品,用完即可丢弃。
对于上边管道问题,可以把Disposable理解为上游和下游的一个开关,一旦调用 Disposable.dispose()方法就会把两个管道切断,导致下游接收不到事件;

注意:调用Disposable.dispose()方法,只会导致下游接收不到事件,但是上游还是可以继续发送事件的

下边写一个示例代码,用于测试,在 调用 Disposable.dispose()方法之后,上游可以继续发送事件,但是下游接收不到事件:

/**
     *  让上游依次发送1、2、3、complete、4,在下游接收到第二个事件后,调用 dispose()切断水管
     */
    public static void demo3(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                Log.e("TAG" , "emit 1") ;
                emitter.onNext(1);
                Log.e("TAG" , "emit 2") ;
                emitter.onNext(2);
                Log.e("TAG" , "emit 3") ;
                emitter.onNext(3);

                emitter.onComplete();

                Log.e("TAG" , "emit 4") ;
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable mDisposable ;
            private int i ;
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG" , "subscribe") ;
                mDisposable = d ;
            }

            @Override
            public void onNext(Integer value) {
                Log.e("TAG" , "next" + value) ;

                i++ ;
                if (i == 2){
                    Log.e("TAG" , "dispose") ;
                    mDisposable.dispose();
                    Log.e("TAG" , "isDisposed:" + mDisposable.isDisposed()) ;
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG" , "error") ;
            }

            @Override
            public void onComplete() {
                Log.e("TAG" , "complete") ;
            }
        });

    }

运行结果如下:

04-26 10:39:25.721 13802-13802/cn.novate.rxjava2 E/TAG: subscribe
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: emit 1
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: next1
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: emit 2
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: next2
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: dispose
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: isDisposed:true
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: emit 3
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: complete
04-26 10:39:25.723 13802-13802/cn.novate.rxjava2 E/TAG: emit 4
右上边运行结果可知:

1>:最先调用 onSubscribe()方法;
2>:在 下游的 onNext() 方法接收到 2事件时候,然后调用 Disposable.dispose()方法切断水管,可以看到上游仍然发送了 3、complete、4,这几个事件;
3>:并且上游并没有因为 调用 onComplete()方法而停止发送事件;

4. subscribe()有多个重载方法


public final Disposable subscribe() {}
public final void subscribe(Observer<? super T> observer) {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}

1>:第一个是不带任何参数的,表示下游不关心任何事件,上游尽管发送数据;
2>:第二个是带有 Observer参数的上边已经使用过了;
3>:第三个是带有一个 Consumer参数的,表示下游只关心onNext()事件,其他事件不关心,所以说如果只需要onNext()事件,下边用示例代码说明:

/**
     * 带有一个参数的 Consumer参数的,表示只关心 onNext()事件,其余事件不关心
     */
    public static void demo4(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("TAG" , "emit 1") ;
                emitter.onNext(1);
                Log.e("TAG" , "emit 2") ;
                emitter.onNext(2);
                Log.e("TAG" , "emit 3") ;
                emitter.onNext(3);

                Log.e("TAG" , "complete") ;
                emitter.onComplete();

                Log.e("TAG" , "emit 4") ;
                emitter.onNext(4);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("TAG" , "next " + integer) ;
            }
        }) ;
    }

运行结果如下:

04-26 11:06:46.858 4915-4915/cn.novate.rxjava2 E/TAG: emit 1
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: next 1
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: emit 2
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: next 2
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: emit 3
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: next 3
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: complete
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: emit 4

下边带有2个参数、3个参数、4个参数原理是一样的,就不解释了,自己可以写示例代码来验证。

以上就是教程一的全部内容。

本站以现代、古代情诗为主,情诗网创办于2013年,以原创爱情诗歌、经典情诗、现代情诗、古代情诗、英文情诗、情诗绝句为主并收集古诗、古诗词、诗歌大全、诗词名句的文学门户。方便您下次继续阅读;可以放在浏览器的收藏夹中(快捷键Ctrl+D);或者看到喜欢或者有趣的诗词可以通过分享按钮给你的好友分享;情诗网是目前最全情诗大全网站之一。并欢迎广大诗歌爱好者阅览投稿!喜欢本站的话请大家把本站告诉给你朋友哦!地址是 www.qingshiwang.com !