并行Rx Java地图函数
问题描述:
RxJava和Reactive Programming的新手可以这么说。并行Rx Java地图函数
我试图将两个函数并行映射为单个Observable管道的一部分,但似乎不能这样工作。这是我的代码。
Observable.fromCallable(thatReturnsNumberOne())
.observeOn(newThread())
.map(doubleIt())
.observeOn(newThread())
.map(doubleIt())
.subscribe(testSubscriber);
我想2 doubleIt()调用在同一时间产生。但总是看起来,一旦第一个doubleIt()完成,只有第二个开始。即阻塞/顺序。
我错过了什么?
答
我假设thatReturnsNumberOne()
只返回一个值。返回的值按顺序传递给每个运算符。通过使用observeOn(newThread())
,当值达到链中的那个点时,您只能更改为新线程。
如果你想要做的计算并行,你必须使用多个观测:
Observable.fromCallable(thatReturnsNumberOne())
.flatMap(number -> Observable.fromCallable(doubleIt(number)).subscribeOn(newThread())
.combineLatest(Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()),
doubles -> doubles[0] + doubles[1]))
.subscribe(testSubscriber);