Flux的组合结果来自Mono

问题描述:

我开始使用Project反应堆,并且我很少挣扎的一个地方是我如何将来自Mono的物体与Flux结合在一起。这是我的使用情况:Flux的组合结果来自Mono

public interface GroupRepository { 
     Mono<GroupModel> getGroup(Long groupId); 
} 

public interface UserRepository { 
     Flux<User> getUsers(Set<Long> userIds); 
} 

Mono<GroupModel> groupMono = getGroup(groupId); 
Flux<User> userFlux = getUsers(Set<Long> users); 
//run above instrtuction in parallel and associate user to group. 

现在我想实现的是:

如何从UserFlux结合响应和相关联的用户与组,像group.addUsers(userfromFlux)。

有人可以帮助如何结合来自userFlux和groupMono的结果。我想我使用类似Zip的东西,但是它能够从源代码进行一对一映射。在我的情况下,我需要做1到N映射。在这里,我有一个组,但需要添加到该组的多个用户。它是返回Mono<List<Users>然后用ZIP运营商与单声道,并提供一个组合子这里
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)提到一个好主意?

我认为Flux.combineLatest静态方法可以帮助你:因为你的Mono只发出1个元素,该元素将始终与来自Flux的每个传入值组合。

Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]), 
        groupMono, userFlux); 
+0

但最后我用'Flux.zip(groupMono,userMono,双功能)',因为在我的情况下,模型组内底层的数据结构是HashSet的(持有其是组成员的用户),它不是线程安全的以反应性的方式添加用户流量组中的用户。因此,我正在使用BiFunction来以单独的方式填充组中的用户。感谢您的帮助,非常感谢您的帮助! – Coder

为其他人添加一个答案,我用,Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers)。我使用了这种方法,而不是像@Simon所建议的那样,因为持有用户的底层组是一个HashSet,并且以被动方式添加用户不会是线程安全的。但是如果你有一个线程安全的数据结构,我会使用@Simon的建议。