RxJava2 源码分析(四)
目的
本文主要分析操作符的实现原理,关于操作符的使用不做讲解,想了解的可以参考Github上的例子项目以及官方文档。
从一个例子开始
这里我们以最常用的 map 为例。首先我们写一个demo:
Demo
这个demo演示了如何将一个int转成string。Kotlin 赛高!!!
虽然看起来没有什么屌用,但是如果int值是一个图片的资源id,我们要将这个id转成一个Bitmap,那么也可以用map操作法,再配上前几篇文章说的线程切换,岂不美哉!~
Observable.just(1).map {
// map 的作用主要是做一个变化,这里是将发射的 int 值变成 string
"$it-covert"
}.subscribe {
// 打印接收到的 string
System.out.println(it)
}
这个为了简单我用到了 just 操作符,其实和 create 差不多,如果你强迫症想搞清楚,可以自己戳戳源码,前面的文章如果你真的看懂了,戳进去几分钟就知道它的原理啦。
这里我们直接从 map 方法开始分析:
Observable
Observable
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
按住 ctrl 键,点击鼠标左键:
ObservableMap
类结构
它也是继承了 AbstractObservableWithUpstream,这个是我们的老相好了,就不介绍了
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {...}
下面看构造函数:
构造函数
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
// source 不多说
super(source);
// 这里就是我们做变换的函数
this.function = function;
}
接下来就是看 subscribeActual
方法了,看,只要掌握了套路,分析起源码来,还是比较轻松的。
subscribeActual 方法
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
这个方法很直接,不做作,直接将 t 与 functiaon 全部扔进 MapObserver 里面,交给 MapObserver 去处理。下面来分析分析 MapObserver 这个类。
ObservableMap.MapObserver
类结构
MapObserver 继承了 BasicFuseableObserver
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {...}
BasicFuseableObserver
书里说过,信息太多与信息太少,都会对理解力造成阻碍。
这个类的蛋疼之处就在于,里面的代码不多,很简单,但是又没简单到你可以一下子就能明白这个类的作用。所以如果我把这个类的代码贴出来,作用也不大,我说不出一个道道来。
这里我说一下我自己对这个类的理解:
Base class for a fuseable intermediate observer.
上面的一串英文是类的注释,翻译过来就是说 一个用于可融合的中间观察者的基类,反正我还是没太明白。
既然它是一个中间观察者,我们就看看它作为一个“中介”,搞了一些啥事情。
BasicFuseableObserver#onSubscribe
在这个方法里面,它作为第三者,插入到了原来的 parent 与 observer 之间
@Override
public final void onSubscribe(Disposable s) {
// 该方法用于判断 this.s 为 null,并且 s 不为 null
if (DisposableHelper.validate(this.s, s)) {
// 这行代码很重要
this.s = s;
// QueueDisposable 是一个接口,后面会讲
if (s instanceof QueueDisposable) {
this.qs = (QueueDisposable<T>)s;
}
// 这里有两个钩子
if (beforeDownstream()) {
// 调用了 actual 的 onSubscribe
// 这个方法我们在之前分析 subscribeActual 方法的时候,都是跳过的
// 下面我们会分析这个方法的作用
actual.onSubscribe(this);
afterDownstream();
}
}
}
由于在之前的文章中,我们忽略了 onSubscribe,而这个类用到了这个方法,所以现在我们拿 ObservableCreate 来分析一下 onSubscribe 这个方法的作用。
ObservableCreate
ObservableCreate#subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
...
}
其实从这两行代码就可以看出,onSubscribe 的参数就是我们的老父亲 parent。
之前我们说过,CreateEmitter 就是将 observer 包装了一下,即 CreateEmitter 持有 observer 。
现在看 onSubscribe 方法就是反过来,即 observer “参数持有” CreateEmitter 。
让我们再次回到 BasicFuseableObserver 类中的 onSubscribe 方法里面:
BasicFuseableObserver
BasicFuseableObserver#onSubscribe
// 这行代码很重要
// s 是原来的 parent,它把这个对象保存起来
this.s = s;
// actual 是我们 demo 中创建的匿名内部类(虽然真正的情况是将 consumer 包成了 observer)
// 将 this 作为 parent 传递进去
// 所以,这样他就成了一个中间观察者
actual.onSubscribe(this);
上面的代码,你可以联想一下链表的插入操作。
前面的文章里面,onSubscribe 的调用都是放在 subscribeActual 中的,这里为啥要放到 Observer 类里面呢?我猜想是因为操作符会导致数据源的不稳定(同步,异步数据),因为 BasicFuseableObserver 还实现了 QueueDisposable 这个接口,这个接口类注释比较多,可以详细看看。
那么,分析到了这里,我们就将 MapObserver 当作一个普通的 Observer 的包装类来看待吧。反正这里我们还用不到里面的 poll 方法。
ObservableMap.MapObserver
我们直接看 onNext 方法:
onNext
这里该方法由 MapObserver 的包装类调用,我们了解了套路,更不需要从头跟踪源码,就知道它由谁调用。只要知道它会被调用,在具体的代码中会被谁调用,分析源码的时候管他调用者是谁呢。
@Override
public void onNext(T t) {
if (done) {
return;
}
// 在这个 demo 中,sourceMode 一直为 NONE
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
// 这里就执行变化了,注意变换结果不能返回null
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 将变化的结果返回
actual.onNext(v);
}
嗯,到这里,demo的流程就分析完了,还是很简单的,主要是例子简单。
其实,该类还有一个 poll 方法,应该会在某些情况下调用,这里没有用到,里面的水应该还有点深。
其他的操作符,这里就不讲了。