RxJava实用总结

前一段时间终于下定决心将RxJava使用到项目中,现总结一下自己使用RxJava的场景和心得。

线程和Schedulers

首先要明确一点:虽然RxJava是异步的。但是,除非你自己手动切换线程,否则在整个链式调用中,RxJava会一直运行在当前线程中。

RxJava的调度器(Schedulers)为实际生产提供了5中解决方案,如果需要大量IO操作,可以选择Schedulers.io()调度器,如果需要进行密集计算,可以选择Schedulers.computation(),如果只是想开一个线程做些事,可以选择Schedulers.newThread(),借助于RxAndroid,我们还可以使用AndroidSchedulers.mainThread()这个调度器将线程切换回主线程,选择了调度器后,使用一些方法可以将线程切换至调度器对应的线程中。

subscribeOn()observeOn()

我们熟知的subscribeOn(Schedulers.io())observeOn(AndroidSchedulers.mainThread())就是用来将线程切换至指定调度器的线程中的,刚开始看到这两个操作符的时候很容易被混淆,因为它们名称相近,功能相似。毛主席曾教导我们:实践是检验真理的唯一标准!因此,我们可以写代码做实验来看看区别。

首先,我们可以运行如下的代码(注意是Java程序):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
String[] str = {"A", "B", "C"};
Observable.from(str)
.observeOn(Schedulers.io())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
System.out.println(Thread.currentThread().getName() + " map : " + s);
return s + " Hi";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(Thread.currentThread().getName() + " call : " + s);
}
});
}

多运行几次你会发现,有时候输出的数据不完整,甚至没有数据!将observeOn(Schedulers.io())替换为subscribeOn(Schedulers.io())后效果更明显 !这是因为,当调用observeOn()或者subscribeOn()后,代码运行在子线程,这之后,子线程还没来得及调用map()subscribe(),主线程就执行完了,因此,你很有可能是看不到运行结果的。

这种情况在真是生产环境中是不太可能出现的,因为几乎所有项目主线程的生命周期不可能这么短。想要在上面的例子中看到显示结果也很简单,调用sleep()方法,让主线程“睡”几秒就行。然后,我们可以看到输出结果如下:

1
2
3
4
5
6
RxIoScheduler-2  map : A
RxIoScheduler-2 call : A Hi
RxIoScheduler-2 map : B
RxIoScheduler-2 call : B Hi
RxIoScheduler-2 map : C
RxIoScheduler-2 call : C Hi

然后我们将.observeOn(Schedulers.io())放置在.map()之后,即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
String[] str = {"A", "B", "C"};
Observable.from(str)
.map(new Func1<String, String>() {
@Override
public String call(String s) {
System.out.println(Thread.currentThread().getName() + " map : " + s);
return s + " Hi";
}
})
.observeOn(Schedulers.io())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(Thread.currentThread().getName() + " call : " + s);
}
});
sleep(3000);

输出的结果如下:

1
2
3
4
5
6
main  map : A
main map : B
main map : C
RxIoScheduler-2 call : A Hi
RxIoScheduler-2 call : B Hi
RxIoScheduler-2 call : C Hi

多加几个操作符,改变.observeOn(Schedulers.io())的位置,你会得出这样的结论:当事件传递到.observeOn()后,Observable所处的线程才会发生改变。

现在把.observeOn(Schedulers.io())替换为.subscribeOn(Schedulers.io()),无论把它放在哪,你会发现运行的结果都是一样的:

1
2
3
4
5
6
RxIoScheduler-2  map : A
RxIoScheduler-2 call : A Hi
RxIoScheduler-2 map : B
RxIoScheduler-2 call : B Hi
RxIoScheduler-2 map : C
RxIoScheduler-2 call : C Hi

我们会发现,subscribeOn()从事件发出的开端就造成影响。当你调用subscribeOn()时,被观察者,也就是数据源,就已切换至subscribeOn()所调用的线程中,那么整个流程就处于该线程中。

有了上面两个结论,那么当observeOn()subscribeOn()同时存在的时候,就比较好理解了。当observeOn()subscribeOn()同时存在时,数据源和操作流程会先处于subscribeOn()所在的线程,直到调用了observeOn()后,线程会切换至observeOn()所在的线程。

如果同时存在多个observeOn()和多个subscribeOn()会怎么样?多个observeOn()好理解,反正什么时候调用到了observeOn(),什么时候就切换线程。而多个subscribeOn()则不同,它们不会被覆盖,只有第一个subscribeOn()线程有效。

Worker

Schedulers只是一个调度器,它不具有任何线程切换的能力,而完成这一项任务的其实是Scheduler的静态内部类Worker。刚才我们介绍的subscribeOn()subscribeOn()操作符,内部也是调用了Worker来执行真正的线程转换。因此,我们也可以直接使用Worker来做线程切换的操作。同时,由于Worker实现了Subscription接口,所以,当任务结束时,需要调用unsubscribe()方法。

比如,当我们需要处理文件IO时,可能会需要创建一个单独的线程:

1
2
3
4
5
6
7
8
9
Scheduler.Worker ioWorker = Schedulers.io().createWorker();
ioWorker.schedule(new Action0() {
@Override
public void call() {
handleFiles();
}
});
// some time later...
ioWorker.unsubscribe();

又或者,当我们需要切换回主线程使用Glide下载图片时,借助RxAndroid,我们可以这么写:

1
2
3
4
5
6
7
8
9
Scheduler.Worker imageWorker = AndroidSchedulers.mainThread().createWorker();
imageWorker.schedule(new Action0() {
@Override
public void call() {
Glide.with(context).load(url).into(imageView);
}
});
// some time later...
imageWorker.unsubscribe();

除了上面演示的常规线程操作外,Worker还提供了延迟调度和轮询操作的功能。

比如, 你希望2秒后自动执行文件操作,你可以这么写:

1
2
3
4
5
6
7
8
9
Scheduler.Worker ioWorker = Schedulers.io().createWorker();
ioWorker.schedule(new Action0() {
@Override
public void call() {
handleFiles();
}
}, 2, TimeUnit.SECONDS);
// some time later...
ioWorker.unsubscribe();

网络请求的时候会有轮询的需求,可以使用someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.create(new Observable.OnSubscribe<String>() {  
@Override
public void call(final Subscriber<? super String> observer) {

Schedulers.newThread().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
observer.onNext(doNetworkCallAndGetStringResult());
}
}, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {

}
})

配合Retrofit进行网络下载

这方面的文章很多了,推荐阅读RxJava与Retrofit结合的最佳实践,这里就不做说明了。

多数据源的处理

处理本地数据和网络数据,一般有两种情况:顺序加载和同时加载:

  • 顺序加载:先从本地文件中获取,如果本地文件存在,则直接显示本地文件,不用从服务器下载;如果本地文件没有,再从服务器下载。
  • 同时加载:同时获取本地文件和从服务器下载最新文件,优先显示本地文件,当服务器数据下载完毕,则更新界面的数据并同时将最新的数据进行存储操作。如果服务器数据先于本地数据返回,则只显示服务器上的数据。

这两种情况都需要两个Observable对象作为数据源,一个是本地文件的Observable对象,另一个是网络下载的Observable对象。

对于第一种情况,我们考虑一个案例,比如新闻类APP需要一个功能:用户点击新闻标题跳转到新闻详情页时,先检查本地是否存有新闻内容,如果有,显示本地存储的新闻内容,如果没有,则需要从网络加载内容。

那么本地数据源可以用下面的代码获得。

1
2
3
4
5
6
7
8
9
10
11
12
13
public Observable<NewsDetails> getNewsDetailFromDb(final String newsId) {
final NewsDetails newsDetails = diskRepository.getNewsDetail(newsId);
return Observable.create(new Observable.OnSubscribe<NewsDetails>() {
@Override
public void call(Subscriber<? super NewsDetails> subscriber) {
if (newsDetails != null) {
subscriber.onNext(newsDetails);
} else {
subscriber.onCompleted();
}
}
});
}

而网络下载新闻数据后,一般还需要保存到本地数据库中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Observable<NewsDetails> getNewsDetailsFromNet(final String newsId) {
return netWorkRepository.getNewsDetails(newsId)
.doOnNext(new Action1<NewsDetails>() {
@Override
public void call(NewsDetails newsDetails) {
diskRepository.saveNewsDetail(newsDetails);
}
}).onErrorReturn(new Func1<Throwable, NewsDetails>() {
@Override
public NewsDetails call(Throwable throwable) {
return null;
}
});
}

有了这两个数据源,可以使用concat()first()操作符来做处理。

1
2
3
4
5
6
Observable<NewsDetails> getNewsDetail = Observable.concat(
domainService.getNewsDetailFromDb(newsId),
domainService.getNewsDetailsFromNet(newsId))
.first()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

这样,就实现了我们第一种情况的功能。

对于第二种情况,可以使用mergeDelayError()操作符将两个操作合并在一起进行(为什么不用merge()操作符?因为我希望能统一处理异常,而不是在发生异常时(比如网络连接超时)中断整个操作)。由于不能确定谁先完成,这个时候可以借助timestamp(),在网络请求的时候加一个时间戳来进行判断。

比如使用Gank.io的API请求妹子图实现第二种情况,那么整个流程就应该是:先同时从本地数据和网络请求中获取数据,然后处理异常,判断网络请求和本地加载谁先完成,对此进行过滤操作,最后将数据返回,显示到界面上。

那么我们就可以用下面的方法来同时获取本地数据和网络数据:

1
2
3
4
5
6
7
8
9
10
11
private Observable<Timestamped<MeiziModel>> getMergedMeizi() {
return Observable.mergeDelayError(
mDiskRepository.getMeiziFromDB().subscribeOn(Schedulers.io()),
mNetRepository.getMeiziFromNet().timestamp().doOnNext(new Action1<Timestamped<MeiziModel>>() {
@Override
public void call(Timestamped<MeiziModel> result) {
mDiskRepository.saveMeizi(result);
}
}).subscribeOn(Schedulers.io())
);
}

然后整个获取图片的处理流程就可以写成下面的形式:

1
2
3
4
5
6
7
8
9
10
11
 public Observable<Timestamped<MeiziModel>> getMdeizi(ITimestampedView timestampedView) {
return getMergedMeizi()
.onErrorReturn(new Func1<Throwable, Timestamped<MeiziModel>>() {
@Override
public Timestamped<MeiziModel> call(Throwable throwable) {
// 不处理任何错误信息
return null;
}
})
.filter(getRecentMeiziFilter(timestampedView));
}

最后就可以获取经过判断处理后的妹子图片:

1
2
3
4
Observable<Timestamped<MeiziModel>> recentMeizi = mService.getMdeizi(mITimestampedView)
.observeOn(AndroidSchedulers.mainThread());

meiziSubscription = recentMeizi.subscribe(fetchMeiziOnNext, fetchMeiziOnError, fetchMeiziOnComplete);

关于这种情况,我已经写了一个完整Demo上传至Github: RxJavaDemo

当然,我会在文末再次提醒你我写了这个Demo的😄。

flatmap()

有人第一次看到flatmap()是懵逼的(其实是我哈)。因为它的功能和map()差不多的,都是将一个对象转为另一个对象,那么flatmap()map()有什么联系和区别呢,以及,什么时候需要使用flatmap()呢?

很多时候,只需要看一下源码就知道答案:

1
2
3
4
5
6
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}

这已经非常明显了,flatmap()的内部实现原理大概就是将数据转换后,合并成一个Observable对象(merge()返回的是一个Observable对象),然后再发射出去。

同时有一点需要注意,合并之后的Observable对象发射出去的数据,并不一定会按照数据源进来的顺序发射,我们可以看下面一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.range(1, 10)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer).delay(11 - integer, TimeUnit.SECONDS);
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});

数据源是1到10的升序数列,我们对每一个数据的延迟时间做了处理,打印出来的是10到1的降序数列。如果你一定要要求转换后的Observable对象发射的数据顺序和数据源的顺序是一样的话,可以考虑使用concatMap()操作符。

由于flatmap()返回的是一个Observable对象,所以将数据源转化后合并后,通过调用empty()just()以及Observable.error()可以发射空数据、一个数据、多个数据或者一个异常。

说到异常,在RxJava里面大部分操作符,一旦遇到异常情况,会中断整个流程并立即调用onError(),同时之前的正确的数据也会付之东流。解决这个问题的方法之一就是,延迟错误信息的发送,当所有正常数据发射完成后,再发射错误信息。flatmap()mergeDelayError()可以做到,并且,利用flatmap()中返回的Observable.error(),我们可以自定义错误信息。比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.range(1, 10)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
if (integer < 5) {
return Observable.just(integer * integer);
}
return Observable.<Integer>error(new IOException("数据太大了!"));
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});

另外,flatmap()还非常适合嵌套异步操作,特别是在网络请求中经常使用到。简单的例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
netWorkRepository.getLatestNews()
.flatMap(new Func1<LatestNews, Observable<?>>() {
@Override
public Observable<String> call(LatestNews latestNews) {
return netWorkRepository.getNewsDetails(latestNews.getID());
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});

写了这么多,现在可以回答刚开始提出的问题:多对一的转换,自定义异常或者需要异步嵌套的时候,需要使用flatmap()

最后

这里只是总结了我使用RxJava的几大场景,具体到某些操作符的使用,比如timer()compose()throttleFirst()等,这里就不再详述了。大家可以看看下面两个链接的内容来了解:

RxJava wiki
ReactiveX/RxJava文档中文版

最后,我写了一个RxJavaDemo,重点是实现多渠道数据加载的第二种方式,大家有兴趣可以看看。

地址:RxJavaDemo

如果觉得文章对你有帮助,请我喝杯可乐吧