I wanted to try out CompletableFuture, so I wrote a main method:
public class test {
    public static void main(String[] args) {
        PoolConfig poolConfig = new PoolConfig();
        test1 test1 = new test1();
        ThreadPoolTaskExecutor executor = poolConfig.demosExecutor();
        // Run the three tasks on the custom executor
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(test1::testAsync, executor);
        CompletableFuture<Integer> stringCompletableFuture1 = CompletableFuture.supplyAsync(test1::testAsync1, executor);
        CompletableFuture<Boolean> stringCompletableFuture2 = CompletableFuture.supplyAsync(test1::testAsync2, executor);
        LocalDateTime l = LocalDateTime.now();
        System.out.println(l + ":阻塞"); // "阻塞" = "blocking"
        // join() blocks until each future completes; results are keyed by their class name
        Map<String, ? extends Serializable> map = Stream.of(stringCompletableFuture, stringCompletableFuture1, stringCompletableFuture2)
                .map(CompletableFuture::join)
                .collect(Collectors.toMap(m -> {
                    String str = m.getClass().getSimpleName();
                    return str.substring(0, 1).toLowerCase() + str.substring(1);
                }, m1 -> m1));
        LocalDateTime l1 = LocalDateTime.now();
        Duration between = Duration.between(l, l1);
        System.out.println(stringCompletableFuture2.isDone());
        System.out.println(between.getSeconds());
        System.out.println(System.currentTimeMillis() + ":阻塞结束"); // "阻塞结束" = "blocking finished"
    }
}
And here are the methods I want to run asynchronously:
@Slf4j
public class test1 {
    public String testAsync() {
        try {
            log.error("============1000ms============");
            Thread.sleep(1000);
            log.error("============1000ms end============");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "等待了1000ms"; // "waited 1000 ms"
    }

    public Integer testAsync1() {
        try {
            log.error("============5000ms============");
            Thread.sleep(5000);
            log.error("============5000ms end============");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 5000;
    }

    public boolean testAsync2() {
        try {
            log.error("============10000ms============");
            Thread.sleep(10000);
            log.error("============10000ms end============");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return true;
    }
}
The output was:
13:14:59.530 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
13:14:59.532 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
13:14:59.535 [demos-1] ERROR com.demos.auth.controller.test1 - ============1000ms============
13:14:59.539 [demos-2] ERROR com.demos.auth.controller.test1 - ============5000ms============
13:14:59.540 [demos-3] ERROR com.demos.auth.controller.test1 - ============10000ms============
2021-02-27T13:14:59.547019100:阻塞
13:15:00.542 [demos-1] ERROR com.demos.auth.controller.test1 - ============1000ms end============
13:15:04.550 [demos-2] ERROR com.demos.auth.controller.test1 - ============5000ms end============
13:15:09.541 [demos-3] ERROR com.demos.auth.controller.test1 - ============10000ms end============
true
9
1614402909541:阻塞结束
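As you can see, I am using a ThreadPoolTaskExecutor thread pool here, and the process does not exit on its own after the last line is printed. However, when I removed the executor argument and used the default thread pool (i.e. ForkJoinPool), the program terminated automatically: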
public class test {
    public static void main(String[] args) {
        PoolConfig poolConfig = new PoolConfig();
        test1 test1 = new test1();
        ThreadPoolTaskExecutor executor = poolConfig.demosExecutor();
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(test1::testAsync);
        CompletableFuture<Integer> stringCompletableFuture1 = CompletableFuture.supplyAsync(test1::testAsync1);
        CompletableFuture<Boolean> stringCompletableFuture2 = CompletableFuture.supplyAsync(test1::testAsync2);
        LocalDateTime l = LocalDateTime.now();
        System.out.println(l + ":阻塞");
        Map<String, ? extends Serializable> map = Stream.of(stringCompletableFuture, stringCompletableFuture1, stringCompletableFuture2)
                .map(CompletableFuture::join)
                .collect(Collectors.toMap(m -> {
                    String str = m.getClass().getSimpleName();
                    return str.substring(0, 1).toLowerCase() + str.substring(1);
                }, m1 -> m1));
        LocalDateTime l1 = LocalDateTime.now();
        Duration between = Duration.between(l, l1);
        System.out.println(stringCompletableFuture2.isDone());
        System.out.println(between.getSeconds());
        System.out.println(System.currentTimeMillis() + ":阻塞结束");
    }
}
Program output:
13:40:54.785 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
13:40:54.792 [ForkJoinPool.commonPool-worker-19] ERROR com.demos.auth.controller.test1 - ============1000ms============
13:40:54.796 [ForkJoinPool.commonPool-worker-5] ERROR com.demos.auth.controller.test1 - ============5000ms============
13:40:54.798 [ForkJoinPool.commonPool-worker-23] ERROR com.demos.auth.controller.test1 - ============10000ms============
2021-02-27T13:40:54.804219300:阻塞
13:40:55.811 [ForkJoinPool.commonPool-worker-19] ERROR com.demos.auth.controller.test1 - ============1000ms end============
13:40:59.803 [ForkJoinPool.commonPool-worker-5] ERROR com.demos.auth.controller.test1 - ============5000ms end============
13:41:04.806 [ForkJoinPool.commonPool-worker-23] ERROR com.demos.auth.controller.test1 - ============10000ms end============
true
10
1614404464806:阻塞结束
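Process finished with exit code 0
As you can see, this time the program has exited.
So why do these two setups produce different results?
Additional information, the thread pool configuration: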
@Configuration
public class PoolConfig {
    @Bean(name = "demosExecutor")
    public ThreadPoolTaskExecutor demosExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core pool size
        executor.setCorePoolSize(12);
        // Maximum pool size
        executor.setMaxPoolSize(50);
        // Queue capacity
        executor.setQueueCapacity(100);
        // Keep-alive time for idle threads (seconds)
        executor.setKeepAliveSeconds(10);
        // Thread name prefix
        executor.setThreadNamePrefix("demos-");
        // Rejection policy
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // On shutdown, wait for all tasks to finish before closing the pool
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}
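### My understanding

The two pools are meant for different use cases. `ThreadPoolTaskExecutor` is a real, long-lived thread pool (it wraps a `java.util.concurrent.ThreadPoolExecutor`) whose threads are reused over and over. When the number of worker threads does not exceed the core size and core-thread timeout is not allowed, an idle worker fetches its next task with `workQueue.take()`, and `take()` blocks the thread until a task shows up in the queue; see `ThreadPoolExecutor#getTask()` for the details. Since those workers are non-daemon threads by default, they keep the JVM alive after `main` returns. If you want to close the pool yourself, call `shutdown` or `shutdownNow`. `ForkJoinPool` has none of these concepts, only a parallelism level. Its work is known up front and it just has to finish it; tasks may keep splitting according to their logic, much like recursion, but they never leave that overall job, and once the job is done there is nothing left to wait for. Its worker threads are also daemon threads, so they do not stop the JVM from exiting, and naturally no `shutdown` is needed.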
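
To make the difference easier to see, here is a minimal sketch using only the JDK. It is an illustration under assumptions, not the code from the question: `DaemonCheck`, `runsOnDaemonThread` and `shutdownAndWait` are hypothetical names, and a plain `ThreadPoolExecutor` stands in for the Spring `ThreadPoolTaskExecutor`. `ForkJoinPool.commonPool()` is passed explicitly so the result does not depend on which default executor `CompletableFuture` picks on a given machine.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DaemonCheck {

    // Runs a tiny task on the given executor and reports whether
    // the thread that executed it is a daemon thread.
    public static boolean runsOnDaemonThread(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().isDaemon(), executor)
                .join();
    }

    // Shuts the pool down and waits up to 5 seconds for its workers to exit.
    public static boolean shutdownAndWait(ExecutorService pool) {
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for the Spring executor: 2 core threads, no core-thread timeout.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

        System.out.println("commonPool worker is daemon: "
                + runsOnDaemonThread(ForkJoinPool.commonPool()));
        System.out.println("ThreadPoolExecutor worker is daemon: "
                + runsOnDaemonThread(pool));

        // Without this call the idle core thread stays parked in take()
        // and, being non-daemon, keeps the JVM alive after main returns.
        System.out.println("pool terminated: " + shutdownAndWait(pool));
    }
}
```

On a standard JDK I would expect the first line to report `true` and the second `false`. For the Spring executor in the question, the options I know of are calling `executor.shutdown()` at the end of `main`, or configuring the pool with `setDaemon(true)` or `setAllowCoreThreadTimeOut(true)`; which one fits depends on whether the pool should outlive `main`.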