RxJava。加入来自两个Observable/Flowable流的所有项目
问题描述:
我正在加载一些项目以在单个网页上显示。与此同时,我正在加载物品的价格。简化它看起来像这样:RxJava。加入来自两个Observable/Flowable流的所有项目
Observable<Integer> ids = itemIdsToShow(); // COLD Observable
Observable<Item> items = ids.flatMap(id -> loadingItem(id));
Observable<Price> prices = ids.flatMap(id -> loadingPrice(id));
更新:“物品”和“价格”都没有排序。
现在我想将它们结合在一起。
Observable<Long> wait = Observable.interval(1000, TimeUnit.SECONDS);
Observable<Pair<Item, Price>> pairs = items.join(prices, (ii)->wait, (pp)->wait, Pair::of);
Observable<Item2> items2 = pairs.filter(p->p.a.id == p.b.id).map(p->new Item2(p.a, p.b));
它有效,但Observable“wait”看起来很奇怪。或者,我可以使用任何从未完成的Observable。其实我需要一个可观察的,完成“项目”和“价格”的完成。
下一个建议不起作用。
Observable<Object> wait = items.mergeWith(prices).takeLast(1);
法“加入”订阅这个新创建的再观察的,从而启动了新一代的ID序列的(一切从头开始)。
我很确定,有一个干净而好看的方式来完成全连接而不创建自定义实现。
答
Zip operator完全是这样。为了您的例子中,你想最终是这样的:
Observable.zip(Item, Price, Pair<Item, Price>)(items, prices,
BiFunction { item, price ->
Pair(item, price)
})
你可以看到对此进行了详细解释here一个例子。
编辑:根据更新的问题,他们没有订购,因为flatMap不保留秩序。取而代之的是结帐concatMap哪(direct link to the marble diagram)。
只有在两个可观察对象以相同顺序生成数据时,它才有效。它不是这种情况: - ( – 30thh
结帐concatMap而不是flatMap –
取回部分在问题中非常简化,原来它包含一个包含多路复用和解复用的长链观测值,如果我将所有“flatMap”调用替换为“ concatMap“,它会使代码变得不可读,并且会失去并发执行的好处。||||||这个问题不是关于读取,而是关于连接。 – 30thh