前一段时间终于下定决心将RxJava使用到项目中,现总结一下自己使用RxJava的场景和心得。
线程和Schedulers
首先要明确一点:虽然RxJava是异步的。但是,除非你自己手动切换线程,否则在整个链式调用中,RxJava会一直运行在当前线程中。
RxJava的调度器(Schedulers)为实际生产提供了5中解决方案,如果需要大量IO操作,可以选择Schedulers.io()调度器,如果需要进行密集计算,可以选择Schedulers.computation(),如果只是想开一个线程做些事,可以选择Schedulers.newThread(),借助于RxAndroid,我们还可以使用AndroidSchedulers.mainThread()这个调度器将线程切换回主线程,选择了调度器后,使用一些方法可以将线程切换至调度器对应的线程中。
subscribeOn()和observeOn()
我们熟知的subscribeOn(Schedulers.io())和observeOn(AndroidSchedulers.mainThread())就是用来将线程切换至指定调度器的线程中的,刚开始看到这两个操作符的时候很容易被混淆,因为它们名称相近,功能相似。毛主席曾教导我们:实践是检验真理的唯一标准!因此,我们可以写代码做实验来看看区别。
首先,我们可以运行如下的代码(注意是Java程序):
1 | public static void main(String[] args) { |
多运行几次你会发现,有时候输出的数据不完整,甚至没有数据!将observeOn(Schedulers.io())替换为subscribeOn(Schedulers.io())后效果更明显 !这是因为,当调用observeOn()或者subscribeOn()后,代码运行在子线程,这之后,子线程还没来得及调用map()和subscribe(),主线程就执行完了,因此,你很有可能是看不到运行结果的。
这种情况在真是生产环境中是不太可能出现的,因为几乎所有项目主线程的生命周期不可能这么短。想要在上面的例子中看到显示结果也很简单,调用sleep()方法,让主线程“睡”几秒就行。然后,我们可以看到输出结果如下:
1 | RxIoScheduler-2 map : A |
然后我们将.observeOn(Schedulers.io())放置在.map()之后,即:
1 | String[] str = {"A", "B", "C"}; |
输出的结果如下:
1 | main map : A |
多加几个操作符,改变.observeOn(Schedulers.io())的位置,你会得出这样的结论:当事件传递到.observeOn()后,Observable所处的线程才会发生改变。
现在把.observeOn(Schedulers.io())替换为.subscribeOn(Schedulers.io()),无论把它放在哪,你会发现运行的结果都是一样的:
1 | RxIoScheduler-2 map : A |
我们会发现,subscribeOn()从事件发出的开端就造成影响。当你调用subscribeOn()时,被观察者,也就是数据源,就已切换至subscribeOn()所调用的线程中,那么整个流程就处于该线程中。
有了上面两个结论,那么当observeOn()和subscribeOn()同时存在的时候,就比较好理解了。当observeOn()和subscribeOn()同时存在时,数据源和操作流程会先处于subscribeOn()所在的线程,直到调用了observeOn()后,线程会切换至observeOn()所在的线程。
如果同时存在多个observeOn()和多个subscribeOn()会怎么样?多个observeOn()好理解,反正什么时候调用到了observeOn(),什么时候就切换线程。而多个subscribeOn()则不同,它们不会被覆盖,只有第一个subscribeOn()线程有效。
Worker
Schedulers只是一个调度器,它不具有任何线程切换的能力,而完成这一项任务的其实是Scheduler的静态内部类Worker。刚才我们介绍的subscribeOn()和subscribeOn()操作符,内部也是调用了Worker来执行真正的线程转换。因此,我们也可以直接使用Worker来做线程切换的操作。同时,由于Worker实现了Subscription接口,所以,当任务结束时,需要调用unsubscribe()方法。
比如,当我们需要处理文件IO时,可能会需要创建一个单独的线程:
1 | Scheduler.Worker ioWorker = Schedulers.io().createWorker(); |
又或者,当我们需要切换回主线程使用Glide下载图片时,借助RxAndroid,我们可以这么写:
1 | Scheduler.Worker imageWorker = AndroidSchedulers.mainThread().createWorker(); |
除了上面演示的常规线程操作外,Worker还提供了延迟调度和轮询操作的功能。
比如, 你希望2秒后自动执行文件操作,你可以这么写:
1 | Scheduler.Worker ioWorker = Schedulers.io().createWorker(); |
网络请求的时候会有轮询的需求,可以使用someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
1 | Observable.create(new Observable.OnSubscribe<String>() { |
配合Retrofit进行网络下载
这方面的文章很多了,推荐阅读RxJava与Retrofit结合的最佳实践,这里就不做说明了。
多数据源的处理
处理本地数据和网络数据,一般有两种情况:顺序加载和同时加载:
- 顺序加载:先从本地文件中获取,如果本地文件存在,则直接显示本地文件,不用从服务器下载;如果本地文件没有,再从服务器下载。
- 同时加载:同时获取本地文件和从服务器下载最新文件,优先显示本地文件,当服务器数据下载完毕,则更新界面的数据并同时将最新的数据进行存储操作。如果服务器数据先于本地数据返回,则只显示服务器上的数据。
这两种情况都需要两个Observable对象作为数据源,一个是本地文件的Observable对象,另一个是网络下载的Observable对象。
对于第一种情况,我们考虑一个案例,比如新闻类APP需要一个功能:用户点击新闻标题跳转到新闻详情页时,先检查本地是否存有新闻内容,如果有,显示本地存储的新闻内容,如果没有,则需要从网络加载内容。
那么本地数据源可以用下面的代码获得。
1 | public Observable<NewsDetails> getNewsDetailFromDb(final String newsId) { |
而网络下载新闻数据后,一般还需要保存到本地数据库中:
1 | public Observable<NewsDetails> getNewsDetailsFromNet(final String newsId) { |
有了这两个数据源,可以使用concat()和first()操作符来做处理。
1 | Observable<NewsDetails> getNewsDetail = Observable.concat( |
这样,就实现了我们第一种情况的功能。
对于第二种情况,可以使用mergeDelayError()操作符将两个操作合并在一起进行(为什么不用merge()操作符?因为我希望能统一处理异常,而不是在发生异常时(比如网络连接超时)中断整个操作)。由于不能确定谁先完成,这个时候可以借助timestamp(),在网络请求的时候加一个时间戳来进行判断。
比如使用Gank.io的API请求妹子图实现第二种情况,那么整个流程就应该是:先同时从本地数据和网络请求中获取数据,然后处理异常,判断网络请求和本地加载谁先完成,对此进行过滤操作,最后将数据返回,显示到界面上。
那么我们就可以用下面的方法来同时获取本地数据和网络数据:
1 | private Observable<Timestamped<MeiziModel>> getMergedMeizi() { |
然后整个获取图片的处理流程就可以写成下面的形式:
1 | public Observable<Timestamped<MeiziModel>> getMdeizi(ITimestampedView timestampedView) { |
最后就可以获取经过判断处理后的妹子图片:
1 | Observable<Timestamped<MeiziModel>> recentMeizi = mService.getMdeizi(mITimestampedView) |
关于这种情况,我已经写了一个完整Demo上传至Github: RxJavaDemo
当然,我会在文末再次提醒你我写了这个Demo的😄。
flatmap()
有人第一次看到flatmap()是懵逼的(其实是我哈)。因为它的功能和map()差不多的,都是将一个对象转为另一个对象,那么flatmap()和map()有什么联系和区别呢,以及,什么时候需要使用flatmap()呢?
很多时候,只需要看一下源码就知道答案:
1 | public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) { |
这已经非常明显了,flatmap()的内部实现原理大概就是将数据转换后,合并成一个Observable对象(merge()返回的是一个Observable对象),然后再发射出去。
同时有一点需要注意,合并之后的Observable对象发射出去的数据,并不一定会按照数据源进来的顺序发射,我们可以看下面一个例子:
1 | Observable.range(1, 10) |
数据源是1到10的升序数列,我们对每一个数据的延迟时间做了处理,打印出来的是10到1的降序数列。如果你一定要要求转换后的Observable对象发射的数据顺序和数据源的顺序是一样的话,可以考虑使用concatMap()操作符。
由于flatmap()返回的是一个Observable对象,所以将数据源转化后合并后,通过调用empty()、just()以及Observable.error()可以发射空数据、一个数据、多个数据或者一个异常。
说到异常,在RxJava里面大部分操作符,一旦遇到异常情况,会中断整个流程并立即调用onError(),同时之前的正确的数据也会付之东流。解决这个问题的方法之一就是,延迟错误信息的发送,当所有正常数据发射完成后,再发射错误信息。flatmap()和mergeDelayError()可以做到,并且,利用flatmap()中返回的Observable.error(),我们可以自定义错误信息。比如:
1 | Observable.range(1, 10) |
另外,flatmap()还非常适合嵌套异步操作,特别是在网络请求中经常使用到。简单的例子如下:
1 | netWorkRepository.getLatestNews() |
写了这么多,现在可以回答刚开始提出的问题:多对一的转换,自定义异常或者需要异步嵌套的时候,需要使用flatmap()。
最后
这里只是总结了我使用RxJava的几大场景,具体到某些操作符的使用,比如timer()、compose()、throttleFirst()等,这里就不再详述了。大家可以看看下面两个链接的内容来了解:
RxJava wiki
ReactiveX/RxJava文档中文版
最后,我写了一个RxJavaDemo,重点是实现多渠道数据加载的第二种方式,大家有兴趣可以看看。
地址:RxJavaDemo