Akka广播:得到第一个回复并丢弃其他
问题描述:
我有一个CalculationSupervisor
演员与工作人员池。
每次我需要做一些计算CalculationSupervisor
广播CalculationRequest
给使用路由器的工人。Akka广播:得到第一个回复并丢弃其他
我需要得到最快的计算结果并忽略其他结果。
CalculationSupervisor
如下所示:
public class CalculationSupervisor extends AbstractActor {
private Router router = new Router(new RoundRobinRoutingLogic());
public static Props props() {
return Props.create(CalculationSupervisor.class, CalculationSupervisor::new);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(RegisterWorker.class, registration -> {
final String workerName = registration.name();
final ActorRef worker =
context().actorOf(Worker.props(workerName), workerName);
router = router.addRoutee(worker);
})
.match(CalculationRequest.class, (request) -> {
router.route(new Broadcast(request), self());
})
.match(CalculationResult.class, (result) -> {
// process only the first (the fastest) result
})
.build();
}
}
什么是实现丢弃到来的第一个(也是最快的)结果进来之后消息的逻辑的最佳模式?
答
如果您的主管正在处理多个请求,一个简单的方法是保留一个Map
,其中包含一对请求ID和一个请求收到的响应数。只有在处理时,该特定请求ID的答复数为零时,监督员才检查该映射并处理答复。此外,为了防止地图无限增长,监事删除从地图的条目,结果的数量等于池的大小:
public class CalculationSupervisor extends AbstractActor {
...
private int poolSize = 0;
private Map<Long, Integer> numReplies = new HashMap<>();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(RegisterWorker.class, registration -> {
...
poolSize = poolSize + 1;
})
.match(CalculationRequest.class, request -> {
numReplies.putIfAbsent(request.getId(), 0);
router.route(new Broadcast(request), self());
})
.match(CalculationResult.class, result -> {
Long requestId = result.getRequestId();
if (numReplies.contains(requestId)) {
int num = numReplies.get(requestId);
if (num == 0) {
// process only the first (the fastest) result
...
numReplies.put(requestId, 1);
} else {
if (num + 1 == poolSize)
numReplies.remove(requestId);
else
numReplies.put(requestId, num + 1);
}
}
})
.build();
}
}
有两个假设与上面的方法:
- 请求ID在
CalculationRequest
和CalculationResult
类中均可用(在此示例中,ID是Long
;使用任何适当的参数)。 - 在发送请求之前,路由向主管注册。
更简单的解决方案是不使用路由器,在这种情况下,CalculationSupervisor
不需要为同一请求协调多个结果。由于每个请求都丢弃了最早的结果,因此首先使用路由器是没有意义的。
CalculationSupervisor中有很多请求,我想将它们全部处理。在你的方法中,如果我为任何传入的CalculationRequest获得了CalculationResult,我将停止处理其他请求的结果。您的解决方案可以使用一些存储计算请求标识符及其状态的Map进行扩展。这是我原来的想法,但我想找到更优雅的解决方案:) –