RxJava。加入来自两个Observable/Flowable流的所有项目

RxJava。加入来自两个Observable/Flowable流的所有项目

问题描述:

我正在加载一些项目以在单个网页上显示。与此同时,我正在加载物品的价格。简化它看起来像这样:RxJava。加入来自两个Observable/Flowable流的所有项目

Observable<Integer> ids = itemIdsToShow(); // COLD Observable 
Observable<Item> items = ids.flatMap(id -> loadingItem(id)); 
Observable<Price> prices = ids.flatMap(id -> loadingPrice(id)); 

更新:“物品”和“价格”都没有排序。

现在我想将它们结合在一起。

Observable<Long> wait = Observable.interval(1000, TimeUnit.SECONDS); 
Observable<Pair<Item, Price>> pairs = items.join(prices, (ii)->wait, (pp)->wait, Pair::of); 

Observable<Item2> items2 = pairs.filter(p->p.a.id == p.b.id).map(p->new Item2(p.a, p.b)); 

它有效,但Observable“wait”看起来很奇怪。或者,我可以使用任何从未完成的Observable。其实我需要一个可观察的,完成“项目”和“价格”的完成。

下一个建议不起作用。

Observable<Object> wait = items.mergeWith(prices).takeLast(1); 

法“加入”订阅这个新创建的再观察的,从而启动了新一代的ID序列的(一切从头开始)。

我很确定,有一个干净而好看的方式来完成全连接而不创建自定义实现。

Zip operator完全是这样。为了您的例子中,你想最终是这样的:

Observable.zip(Item, Price, Pair<Item, Price>)(items, prices, 
BiFunction { item, price -> 
    Pair(item, price) 
}) 

你可以看到对此进行了详细解释here一个例子。

编辑:根据更新的问题,他们没有订购,因为flatMap不保留秩序。取而代之的是结帐concatMap哪(direct link to the marble diagram)。

+0

只有在两个可观察对象以相同顺序生成数据时,它才有效。它不是这种情况: - ( – 30thh

+0

结帐concatMap而不是flatMap –

+0

取回部分在问题中非常简化,原来它包含一个包含多路复用和解复用的长链观测值,如果我将所有“flatMap”调用替换为“ concatMap“,它会使代码变得不可读,并且会失去并发执行的好处。||||||这个问题不是关于读取,而是关于连接。 – 30thh