如何使用Java 8的Fork/Join框架并行化循环

如何使用Java 8的Fork/Join框架并行化循环

问题描述:

如何使用Java 8的Fork/Join框架并行化循环。随便我没有使用多线程。我在SO中读了很多问题。现在我无法在Java 8中实现并行处理列表。任何人都可以帮助我?如何使用Java 8的Fork/Join框架并行化循环

我曾尝试过类似于this link的东西。

routes.stream().parallel().forEach(this::doSomething); 

情景就像基于路由列表名单,我需要devide任务并执行我需要一个像foreach循环的insted的我想基于阵列尺寸的并行执行。

我的问题是在处理updateSchedules服务时,它花费了太多时间。这就是我想在这里实现线程概念的原因。 scheduleService.updateSchedules(originId, destinationId,req.getJourneyDate());

for (Availabilities ar : routes) { 
    try { 
    log.info("Starting for bus" + ar); 
    Bus bus = new Bus(); 

    // Get schedule list 
    BitlaSchedules schedule = scheduleRepo 
     .findByOriginIdAndDestinationIdAndScheduleIdAndTravelIdAndRouteId(originId, 
     destinationId, ar.getScheduleId(), ar.getTravelId(), ar.getRouteId()); 

    if (schedule == null) { 
     scheduleService.updateSchedules(originId, destinationId,req.getJourneyDate()); 
     schedule = scheduleRepo 
     .findByOriginIdAndDestinationIdAndScheduleIdAndTravelIdAndRouteId(originId, 
      destinationId, ar.getScheduleId(), ar.getTravelId(), ar.getRouteId()); 
    } 
    } catch(Exception e) { 
    log.error(e.getMessage()); 
    } 
} 
+0

我想用一个执行器会更合适。你是如何得出结论的?Fork/Join是你需要的吗? – Fildor

+0

请编辑您的问题,以表明您尝试了什么(不仅仅是“某些事情来自...”)以及您遇到了什么具体问题。如果这是一个编译器错误,请复制/粘贴错误。如果是运行时错误,请执行相同的操作。如果这是意外的行为,请描述它。 – slim

+0

请参阅:http://stackoverflow.com/help/mcve – slim

正如@fdreger已经表示,它只会帮你CPU密集型任务。 因此,在做出任何假设之前,为了获得更好的表现,为了获得更好的表现,请为自己做个忙。大多数情况下,瓶颈与IO相关。

我会给你一个很简单的例子,你可以如何在Java使用并行流。

public class Test { 

    public static void main(String[] args) { 
     // some dummy data 
     List<Integer> list = new ArrayList<>(); 
     for (int i = 0; i < 20; ++i) list.add(i); 

     // to simulate some CPU intensive work 
     Random random = new SecureRandom(); 

     List<String> result = list.parallelStream().map(i -> { 

      // simulate work load 
      int millis = 0; 
      try { 
       millis = random.nextInt(1000); 
       Thread.sleep(millis); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } 

      // return any desired result 
      return "Done something with " + i + " in thread " + Thread.currentThread().getName() + " took " + millis + "ms"; 
     }).collect(Collectors.toList()); // collect joins - will return once all the workers are done 

     // print the result 
     result.forEach(System.out::println);  
    } 
} 

大概的基本错误是,你试图做到这一点。

不要!该fork/join框架是工程的非常具体的一条 - 它解决了一个非常特殊的领域: - 解决CPU密集型问题; - 可以在不共享资源的情况下进行拆分(即无需同步或锁定)。

你的代码似乎使用外部服务: - 如果服务使用任何类型的数据库,那么你的问题不是CPU密集型的;即使不是,那么 - 因为有一个明显的update,那么就有一个需要同步的共享的可变状态(尤其是因为我们似乎有多个作者)。

这意味着您使用并行流不会获得任何

只需使用一个标准的执行有一个线程池,并提交您的项目,如任务。

+0

@Sitansu任何最新的Java书籍都会解释执行者,标准的Java教程或Google搜索到的数百个页面也是如此。 – slim