Akka广播:得到第一个回复并丢弃其他

Akka广播:得到第一个回复并丢弃其他

问题描述:

我有一个CalculationSupervisor演员与工作人员池。
每次我需要做一些计算CalculationSupervisor广播CalculationRequest给使用路由器的工人。Akka广播:得到第一个回复并丢弃其他

我需要得到最快的计算结果并忽略其他结果。

CalculationSupervisor如下所示:

public class CalculationSupervisor extends AbstractActor { 

    private Router router = new Router(new RoundRobinRoutingLogic()); 

    public static Props props() { 
     return Props.create(CalculationSupervisor.class, CalculationSupervisor::new); 
    } 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
       .match(RegisterWorker.class, registration -> { 
        final String workerName = registration.name(); 
        final ActorRef worker = 
         context().actorOf(Worker.props(workerName), workerName); 
        router = router.addRoutee(worker); 
       }) 
       .match(CalculationRequest.class, (request) -> { 
        router.route(new Broadcast(request), self()); 
       }) 
       .match(CalculationResult.class, (result) -> { 
        // process only the first (the fastest) result 
       }) 
       .build(); 
    } 
} 

什么是实现丢弃到来的第一个(也是最快的)结果进来之后消息的逻辑的最佳模式?

如果您的主管正在处理多个请求,一个简单的方法是保留一个Map,其中包含一对请求ID和一个请求收到的响应数。只有在处理时,该特定请求ID的答复数为零时,监督员才检查该映射并处理答复。此外,为了防止地图无限增长,监事删除从地图的条目,结果的数量等于池的大小:

public class CalculationSupervisor extends AbstractActor { 
    ... 
    private int poolSize = 0; 
    private Map<Long, Integer> numReplies = new HashMap<>(); 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
      .match(RegisterWorker.class, registration -> { 
       ... 
       poolSize = poolSize + 1; 
      }) 
      .match(CalculationRequest.class, request -> { 
       numReplies.putIfAbsent(request.getId(), 0); 
       router.route(new Broadcast(request), self()); 
      }) 
      .match(CalculationResult.class, result -> { 
       Long requestId = result.getRequestId(); 
       if (numReplies.contains(requestId)) { 
        int num = numReplies.get(requestId); 
        if (num == 0) { 
         // process only the first (the fastest) result 
         ... 
         numReplies.put(requestId, 1); 
        } else { 
         if (num + 1 == poolSize) 
          numReplies.remove(requestId); 
         else 
          numReplies.put(requestId, num + 1); 
        } 
       } 
      }) 
      .build(); 
    } 
} 

有两个假设与上面的方法:

  1. 请求ID在CalculationRequestCalculationResult类中均可用(在此示例中,ID是Long;使用任何适当的参数)。
  2. 在发送请求之前,路由向主管注册。

更简单的解决方案是不使用路由器,在这种情况下,CalculationSupervisor不需要为同一请求协调多个结果。由于每个请求都丢弃了最早的结果,因此首先使用路由器是没有意义的。

+0

CalculationSupervisor中有很多请求,我想将它们全部处理。在你的方法中,如果我为任何传入的CalculationRequest获得了CalculationResult,我将停止处理其他请求的结果。您的解决方案可以使用一些存储计算请求标识符及其状态的Map进行扩展。这是我原来的想法,但我想找到更优雅的解决方案:) –