Flux的组合结果来自Mono
我开始使用Project反应堆,并且我很少挣扎的一个地方是我如何将来自Mono的物体与Flux结合在一起。这是我的使用情况:Flux的组合结果来自Mono
public interface GroupRepository {
Mono<GroupModel> getGroup(Long groupId);
}
public interface UserRepository {
Flux<User> getUsers(Set<Long> userIds);
}
Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.
现在我想实现的是:
如何从UserFlux结合响应和相关联的用户与组,像group.addUsers(userfromFlux)。
有人可以帮助如何结合来自userFlux和groupMono的结果。我想我使用类似Zip的东西,但是它能够从源代码进行一对一映射。在我的情况下,我需要做1到N映射。在这里,我有一个组,但需要添加到该组的多个用户。它是返回Mono<List<Users>
然后用ZIP运营商与单声道,并提供一个组合子这里public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)
提到一个好主意?
我认为Flux.combineLatest
静态方法可以帮助你:因为你的Mono
只发出1个元素,该元素将始终与来自Flux
的每个传入值组合。
Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
groupMono, userFlux);
为其他人添加一个答案,我用,Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers)
。我使用了这种方法,而不是像@Simon所建议的那样,因为持有用户的底层组是一个HashSet
,并且以被动方式添加用户不会是线程安全的。但是如果你有一个线程安全的数据结构,我会使用@Simon的建议。
但最后我用'Flux.zip(groupMono,userMono,双功能)',因为在我的情况下,模型组内底层的数据结构是HashSet的(持有其是组成员的用户),它不是线程安全的以反应性的方式添加用户流量组中的用户。因此,我正在使用BiFunction来以单独的方式填充组中的用户。感谢您的帮助,非常感谢您的帮助! – Coder