Java中的受控ThreadPoolExecutor弹簧

Java中的受控ThreadPoolExecutor弹簧

问题描述:

我需要在我的Spring应用程序中创建一个全局ThreadPoolTask​​Executor,它将负责在我的应用程序中运行多线程任务。Java中的受控ThreadPoolExecutor弹簧

但是,对于每个请求,我想限制从该全局ThreadPool使用的线程数。我应该如何确保根据请求执行此限制?

例如,

我创建一个全局线程池,最大池大小为50个线程。但我想限制每个请求的线程数量来说5个线程。但是这5个线程只能从配置文件中定义的全局线程池中可用的50个线程中分配。

创建任务执行程序的配置类。

@Configuration 
public class ThreadPoolConfiguration { 

    @Value("${threadpool.corepoolsize}") 
    int corePoolSize; 

    @Value("${threadpool.maxpoolsize}") 
    int maxPoolSize; 

    @Bean 
    public ThreadPoolTaskExecutor taskExecutor() { 
     ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); 
     pool.setCorePoolSize(corePoolSize); 
     pool.setMaxPoolSize(maxPoolSize); 
     pool.setWaitForTasksToCompleteOnShutdown(true); 
     return pool; 
    } 
} 

Controller类

@RestController 
public class WebController { 

    @Autowired 
    ThreadPoolTaskExecutor threadPool; 

    @RequestMapping("/process") 
    public String process(){ 

     String msg = ""; 
     List<Future<String>> futureList = new ArrayList<>(); 
     for(int threadNumber = 0; threadNumber < 5; threadNumber ++){ 
      CallableWorker callableTask = new CallableWorker(String.valueOf(threadNumber)); 
      Future<String> result = threadPool.submit(callableTask); 
      futureList.add(result); 
     } 

     for(Future<String> future: futureList){ 
      try { 
       msg += future.get() + "#####"; 
      } catch (Exception e){} 
     } 

     return msg; 
    } 
} 

免责声明:这只是示例代码,我从一个blog post了。

我该如何实现这样的设计?我没有看到任何可以创建的子线程池。我也不想为每个请求实例化一个线程池,因为这会是灾难性的。

有什么建议吗?

+0

当一个请求有6个任务但仅限于5个线程时,你想要什么样的行为?在排队完成第6个任务之前,请等到前5个任务之一完成? –

+0

是的,完全一样。但是,如果另一个请求带有5个任务,请将其与全局线程池分开放置5个线程。 而且,这两个请求被提交后,从全局池10个线程被占用和1个任务仍处于等待(从第一个请求) – Amriteya

解决此问题的一种方法可能是创建可调用方法来处理元素列表而不是单个元素。例如,如果要删除请求中的x项目,则可以创建x/5元素列表,并将此列表传递给可调用函数。这样,通过代码,您可以确保每个请求最多只能使用5个线程。你必须小心处理异常情况。 (例如,您可以返回elementID的Map来产生枚举,其中的结果可能是成功的,可重试的excepiton或不可重试的异常。)

此方法可能因您尝试实现的目标而异。

+0

这是两种不同的解释,我从你的回答理解: 所以,如果我得到20个任务在第一个请求中,我将它分成4个项目的列表。 [1-5,6-10,11-15,16-20]并提交1-5作为一个工作(线程)。其他15人会发生什么? 或者我将4组任务提交到4个线程?首先,如果我有超过5 * 5个任务(比如30),我该如何处理?其次,在我的用例中,我希望每个线程都有一个任务。所以当我向一个线程提交一个组时,我必须再次进行并行化,这会导致不必要的开销,并且会违反我的第一个条件,即一个请求一次只能有5个线程。 – Amriteya

+1

如果您有20个请求,您将创建5个每个4个元素的列表,并在该方法中连续处理这4个元素。如果您有50个请求,您将创建5个每个10个元素的列表并连续处理10个元素。你在这里做的是确保你将任务分解成可以分解成5个可以并行运行的小任务。 –

+0

该方法听起来不错。我看到的唯一问题是阻止一个线程编排列表迭代。这将是一项沉重的任务,因为它会提交4个工作等待他们完成,然后再提交4个工作等等。 如果我可以使用ThreadPoolTask​​Executor,我可以提交队列中的所有作业并检索该线程。我正在寻找解决方案,我可以在队列中提交工作。 – Amriteya

我会这样做的方式是创建一个能够节制任务的类。每个请求都会创建一个油门并将其所有任务提交给油门;油门会将前5个(说)任务提交给Executor并将其他人放在列表中。由于提交的任务已完成,可以提交其他任务。

要确定何时完成任务,节流阀可以定期轮询提交的任务,检查期货的isDone()方法以查看它们是否完成,或者它可以阻止一个Future的get()方法,直到它为止完成并检查另一个未决的期货,或者如果你想变得复杂一点,请求线程可以等待()节流阀,任务可以设置为在完成时通知()节流阀,这样请求线程会然后醒来并检查完成的任务。