如何在RxJava中的另一个observable内部封装observable?
问题描述:
我想包装改造API调用的另一种方法,我可以另外显示/隐藏装载机,检查网络等。由于我的API返回观察到的,我最终的办法是下面:如何在RxJava中的另一个observable内部封装observable?
private <T> Observable<T> request(final Observable<T> apiCall, final ViewManager viewManager) {
return Observable.create(new Action1<Emitter<T>>() {
@Override
public void call(final Emitter<T> emitter) {
if (!NetworkUtils.isConnected(context)) {
emitter.onError(new ConnectException("network not connected"));
return;
}
viewManager.showLoader();
apiCall.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
viewManager.hideLoader();
emitter.onCompleted();
}
@Override
public void onError(Throwable e) {
viewManager.hideLoader();
emitter.onError(e);
}
@Override
public void onNext(T response) {
emitter.onNext(response);
}
});
}
}, Emitter.BackpressureMode.BUFFER);
}
这是一个处理问题的标准方式?你如何在另一个observable中包含observable?谁能指导?
答
反应式扩展的惯用方式是使用组合,这是RX的强大功能之一。
首先让我们使用运营商定义所需的行为,你要的是这样的:
apiCall
.observeOn(AndroidSchedulers.mainThread())
.startWith(Observable.defer(() -> {
if (!NetworkUtils.isConnected(context)) {
return Observable.error(new ConnectException("network not connected"));
} else {
return Observable.empty();
}
}))
.doOnSubscribe(() -> viewManager.showLoader())
.doOnCompleted(() -> viewManager.hideLoader())
.doOnError(throwable -> viewManager.hideLoader());
现在,它构成任何网络apiCall可观的,你可以使用compose()
运营商和封装这种逻辑到Transformer
为:
class CustomTransformer<T> implements Observable.Transformer<T, T> {
private final ViewManager viewManager;
private final Context context;
CustomTransformer(ViewManager viewManager, Context context) {
this.viewManager = viewManager;
this.context = context;
}
@Override
public Observable<T> call(Observable<T> apiCall) {
return apiCall
.observeOn(AndroidSchedulers.mainThread())
.startWith(Observable.defer(() -> {
if (!NetworkUtils.isConnected(context)) {
return Observable.error(new ConnectException("network not connected"));
} else {
return Observable.empty();
}
}))
.doOnSubscribe(() -> viewManager.showLoader())
.doOnCompleted(() -> viewManager.hideLoader())
.doOnError(throwable -> viewManager.hideLoader());
;
}
}
,那么你可以与任何网络Observable
构成它
someRetrofitQuery
.compose(new CustomTransformer<>(viewManager, context))
...
.subscribe();
感谢您的回答。但是如果我不使用compose(),并且只是在演示者中订阅这个观察者。我只需要一个额外的ViewManager参数。我不明白你为什么需要撰写 – agile1
你也可以用我建议的内容替换你的方法内容(并且具有相同的签名),这将应用相同的效果,正如我所说的,使用撰写是反应方法中更习惯的方式,因为它不是用包装它的方法'打破'链,你可以在这里阅读http://blog.danlew.net/2015/03/02/dont-break-the-chain/ – yosriz
或者你可以使用Kotlin将该函数作为Observable的扩展函数。 –