项目中- 多线程异步 Future 使用
一. 项目中架构要求:
外围客户端系统 - 交易中心微服务(场景微服务) - 交易集市微服务/能力中心 - ESB - 后方系统(理财/基金等)。
交易中心需要同时调用交易集市十几个组件/接口。由于通讯时间太长和接口请求太多,考虑使用多线程。
考虑使用非阻塞的多线程类 Future。Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。
二. 项目使用是银行内部代码,不便于展示,当时的案例demo如下 ,亲测。
2.1 配置类。
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync class ThreadPoolConfig { // @Bean 指定类标识,默认为 类的首字母小写 @Bean("taskExecutor") public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(5); // 设置最大线程数 executor.setMaxPoolSize(10); // 设置队列容量 executor.setQueueCapacity(20); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(60); // 设置默认线程名称 executor.setThreadNamePrefix("mynah886-test-async-threads-"); // 设置拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); return executor; } }
2.2 模拟出入参数类
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class TaskInfo { private String taskId; private String taskName; }
2.3 模拟实现逻辑
2.3.1 控制层类
import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @Slf4j @RestController @RequestMapping("/async") public class AsyncController { private Logger logger = LoggerFactory.getLogger(AsyncController.class); @Autowired private TaskLogService taskLogService; @GetMapping(value = "/task" ) public String taskExecute(){ TaskInfo taskInfo1 = new TaskInfo("task1", "taskName001"); TaskInfo taskInfo2 = new TaskInfo("task2", "taskName001"); long startTime = System.currentTimeMillis(); Future<TaskInfo> future1 = null; Future<TaskInfo> future2 = null; try { future1 = taskLogService.insertTaskLog(taskInfo1); future2 = taskLogService.updateTaskLog(taskInfo2); // 异步线程池执行 等待执行完成 isDone() 进行判断 /*while (true) { if (future1.isDone() && future2.isDone()) { System.out.println("异步任务一、二已完成"); break; } }*/ } catch (Exception e) { log.debug("执行异步任务异常 {}" + e.getMessage()); } /* V get(long timeout, TimeUnit unit) 设置取结果超时时间 */ TaskInfo result1 = null; TaskInfo result2 = null; try { result1 = future1.get(3, TimeUnit.SECONDS); result2 = future2.get(); log.debug("任务一result1 == " + result1 + " 任务二result2 == " + result2); } catch (InterruptedException e) { log.debug("InterruptedException: {}", e.getMessage()); } catch (ExecutionException e) { log.debug("ExecutionException: {}", e.getMessage()); } catch (TimeoutException e) { log.debug("接口get取值超时: {}", e.getMessage()); } long endTime = System.currentTimeMillis(); log.debug("异步任务总耗时: " + (endTime - startTime)); return result1 + " --- " + result2; } }
2.3.2 接口类
import java.util.concurrent.Future; public interface TaskLogService { Future<TaskInfo> insertTaskLog(TaskInfo taskInfo) throws InterruptedException; Future<TaskInfo> updateTaskLog(TaskInfo taskInfo) throws InterruptedException; }
2.3.3 实现类
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.Future; @Service public class TaskLogServiceImpl implements TaskLogService { /** * 需要进行多线程任务的方法 * 1. 方法注解 @Async("taskExecutor") ,括号指定 线程池 名称 * 2. 方法返回值类型为 AsyncResult<T> * 注意: 方法返回值类型由 Future 进行包装,即 Future<T>, 返回对象为 AsyncResult<T> * @param taskInfo * @return Future<String> * @throws InterruptedException */ @Override @Async("taskExecutor") public Future<TaskInfo> insertTaskLog(TaskInfo taskInfo) throws InterruptedException { System.out.println("1---------currentThread: " + Thread.currentThread() ); System.out.println("任务一 Thread Sleep 2s Start, " + taskInfo.getTaskId()); Thread.sleep(2000); taskInfo.setTaskId("001-testId"); taskInfo.setTaskName("001-teatName"); System.out.println("任务一 Thread Sleep 2s End, " + taskInfo.getTaskId()); return new AsyncResult<>( taskInfo ); } @Override @Async("taskExecutor") public Future<TaskInfo> updateTaskLog(TaskInfo taskInfo) throws InterruptedException { System.out.println("2---------currentThread: " + Thread.currentThread() ); System.out.println("任务二 Thread Sleep 5s Start, " + taskInfo.getTaskId()); Thread.sleep(5000); taskInfo.setTaskId("002-testId"); taskInfo.setTaskName("002-teatName"); System.out.println("任务二 Thread Sleep 5s End, " + taskInfo.getTaskId()); return new AsyncResult<>( taskInfo ); } }
2.4 测试结果