使用多个observables创建可观察?
我有一堆观察对象,我在一个方法中运行/订阅(大约9个确切的),它本身返回一个可观察值。为了这个问题的目的,我把它缩小到2个观察值。这是我创建的方法,它返回一个包含其他可观察对象的Observable。使用多个observables创建可观察?
public static Observable<MyCustomObject> runAll(Service networkService) {
return Observable.create(subscriber -> {
networkService.getOne().subscribe(response -> {
Request request = response.raw().request();
MyCustomObject case = new MyCustomObject(request);
subscriber.onNext(case);
}, exception -> {
throw new OnErrorNotImplementedException(exception);
});
networkService.getTwo().subscribe(response -> {
Request request = response.raw().request();
MyCustomObject case = new MyCustomObject(request);
subscriber.onNext(case);
}, exception -> {
throw new OnErrorNotImplementedException(exception);
});
subscriber.onComplete();
});
}
我然后使用可观察到的是所返回......
runAll(networkService)
.subscribeOn(Schedulers.io())
.subscribe(case -> {
//do stuff
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//handle error
}
});
我不知道如果我正确地创建一个可观察的。我基本上用Observable替换了我之前在这里的一个监听器/接口,但是我知道使用Observable.create()是不容易的。我应该以另一种方式去做吗?做我在做什么有什么不对吗?
编辑:getOne()和getTwo()是网络调用,所以他们返回Observables。
编辑2:目前有这样的
public static Observable<MyCustomObject> quickRun(Service networkService) {
return Observable.concat(
networkService.getOne().map(response -> {
Request request = response.raw().request();
MyCustomObject case = new MyCustomObject(request);
return case;
}),
networkService.getTwo().map(response -> {
Request request = response.raw().request();
MyCustomObject case = new MyCustomObject(request);
return case;
})
);
}
是的,你应该做它用不同的方式。
每当你将Observable视为像常规的异步回调那样,这是一种你没有做出反应的气味。
正如你所说Observable.create()
是不平凡的方式,创造Observable的一些陷阱,也就是说,你可能使用旧版本的RxJava 1,与更新的版本(1.3+我认为),并与RxJava2,创建基于发射器并且更安全。你可以阅读here关于Create和发射器方法的缺陷。 (作为RxJava2的附注,有另一种方式extending Observable
)。
所有这些方法都是在异步回调世界与反应世界之间架起一道桥梁,并用Observable
包装任何类型的异步操作。至于你的情况,因为你手头已经有Observables,所以你需要将它们组合成单一的流 - Observable。 Rx有很多运营商为此目的,根据你的例子Observable.merge
似乎是合适的运营商,你所有Observable
将异步运行(注意,你将需要适用于他们每个人的IO调度器),并且每个Observable
将发射它在合并流上的结果,当所有的Observable将完成时 - onCompleted将在合并Observable
上调用,这在您的示例中是错误的,因为它在刚启动所有任务之后才被调用。
Observable.merge(
networkService.getOne()
.subscribeOn(Schedulers.io()),
networkService.getTwo()
.subscribeOn(Schedulers.io())
)
.map(response -> {
Request request = response.raw().request();
return new MyCustomObject(request);
})
.subscribe(customObject -> {
//do stuff
}, throwable -> {
//handle error
}
);
是否所有9个'Observable's返回相同的类型?他们互相依赖,还是可以独立运行? –
可能有一个observable返回一个不同的类型,但大多数情况下我可以使它工作(我可以将两个结果都包装在另一个类型或其他类型中),并且每个都可以独立运行。 – EGHDK