rxjava开关可观察,如果第二个可观察的开始发射项目
我有一些我并行执行的observable集合,如localObservable
和networkObservable
。如果networkObservable
开始发射物品(从此时起,我只需要这些物品),然后丢弃由localObservable
发出的物品(也许localObservable
尚未开始)。rxjava开关可观察,如果第二个可观察的开始发射项目
Observable<Integer> localObservable =
Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());
你可以做这样的事情:
Observable<Long> networkObservable =
Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.share();
Observable<Long> localObservable =
Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.takeUntil(networkObservable);
Observable.merge(networkObservable, localObservable)
.subscribe(System.out::println);
这将输出:
0 // localObservable
1 // localObservable
0 // networkObservable from here on
1
2
...
takeUntil
会让localObservable
停止和取消时,从networkObservable
第一发射发生,所以合并Observable
将从localObservable
发出只要networkObservable
未启动ed,当它发生时,它将停止从localObservable
发射并切换为仅从networkObservable
发射。
有由运营商这样的一个简单的解决办法:AMB
只要看看的System.out的输出。
文档:http://reactivex.io/documentation/operators/amb.html
基本上你订阅观察到两者在同一时间和任何可观察到的第一发射获得通过。其他观察对象将取消订阅。
@Test
public void ambTest() throws Exception {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> network = Observable.timer(1000, TimeUnit.MILLISECONDS, testScheduler)
.concatMap(aLong -> Observable.just(1, 2, 3))
.doOnSubscribe(disposable -> System.out.println("connect network"))
.doOnDispose(() -> System.out.println("dispose network"));
Observable<Integer> local = Observable.timer(500, TimeUnit.MILLISECONDS, testScheduler)
.concatMap(aLong -> Observable.just(4, 5, 6))
.doOnSubscribe(disposable -> System.out.println("connect local"))
.doOnDispose(() -> System.out.println("dispose local"));
Observable<Integer> integerObservable = Observable.ambArray(network, local);
TestObserver<Integer> test = integerObservable.test();
testScheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);
test.assertValues(4, 5, 6);
testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
test.assertValues(4, 5, 6);
}
这取决于需求,如果你想要从当网络可观测数据尚未发射时,局域可观测量,则局域网将只选择具有第一个发射的本地可观测数据,并且永远不会切换到网络可观测数据 – yosriz
它的工作。感谢你的回答。 – xymelon
如果'networkObservable'发出错误,我想继续使用'localObservable',我该怎么做? – xymelon
你可以使用mergeDelayError(在这种情况下,你会在localObservable发射一些东西之后得到onError),或者只是用onErrorREsumeNext或类似的方法捕获networkObservable的所有错误 – yosriz