问答

ThreadPoolTaskExecutor和ForkJoinPool的区别?

作者:admin 2021-04-21 我要评论

我想测试下CompletableFuture的功能,然后写了一个main方法: public class test { public static void main(String[] args) { PoolConfig poolConfig = new PoolC...

在说正事之前,我要推荐一个福利:你还在原价购买阿里云、腾讯云、华为云服务器吗?那太亏啦!来这里,新购、升级、续费都打折,能够为您省60%的钱呢!2核4G企业级云服务器低至69元/年,点击进去看看吧>>>)

我想测试下CompletableFuture的功能,然后写了一个main方法:

public class test {
 public static void main(String[] args) {
 PoolConfig poolConfig = new PoolConfig();
 test1 test1 = new test1();
 ThreadPoolTaskExecutor executor = poolConfig.demosExecutor();
 CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(test1::testAsync,executor);
 CompletableFuture<Integer> stringCompletableFuture1 = CompletableFuture.supplyAsync(test1::testAsync1,executor);
 CompletableFuture<Boolean> stringCompletableFuture2 = CompletableFuture.supplyAsync(test1::testAsync2,executor);
 LocalDateTime l = LocalDateTime.now();
 System.out.println(l + ":阻塞");
 Map<String, ? extends Serializable> map = Stream.of(stringCompletableFuture, stringCompletableFuture1, stringCompletableFuture2)
 .map(CompletableFuture::join)
 .collect(Collectors.toMap(m ->{
 String str = m.getClass().getSimpleName();
 return str.substring(0, 1).toLowerCase() + str.substring(1);
 } , m1 -> m1));
 LocalDateTime l1 = LocalDateTime.now();
 Duration between = Duration.between(l, l1);
 System.out.println(stringCompletableFuture2.isDone());
 System.out.println(between.getSeconds());
 System.out.println(System.currentTimeMillis() + ":阻塞结束");
 }
}

然后是我的需要异步的方法:

@Slf4j
public class test1 {
 public String testAsync(){
 try {
 log.error("============1000ms============");
 Thread.sleep(1000);
 log.error("============1000ms end============");
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 return "等待了1000ms";
 }
 public Integer testAsync1(){
 try {
 log.error("============5000ms============");
 Thread.sleep(5000);
 log.error("============5000ms end============");
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 return 5000;
 }
 public boolean testAsync2(){
 try {
 log.error("============10000ms============");
 Thread.sleep(10000);
 log.error("============10000ms end============");
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 return true;
 }
}

然后执行结果为:

13:14:59.530 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
13:14:59.532 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
13:14:59.535 [demos-1] ERROR com.demos.auth.controller.test1 - ============1000ms============
13:14:59.539 [demos-2] ERROR com.demos.auth.controller.test1 - ============5000ms============
13:14:59.540 [demos-3] ERROR com.demos.auth.controller.test1 - ============10000ms============
2021-02-27T13:14:59.547019100:阻塞
13:15:00.542 [demos-1] ERROR com.demos.auth.controller.test1 - ============1000ms end============
13:15:04.550 [demos-2] ERROR com.demos.auth.controller.test1 - ============5000ms end============
13:15:09.541 [demos-3] ERROR com.demos.auth.controller.test1 - ============10000ms end============
true
9
1614402909541:阻塞结束

可以看出这里我使用的是ThreadPoolTaskExecutor线程池,然后当我去掉了executor,使用默认的线程池(既ForkJoinPool)却可以自动结束程序:

public class test {
 public static void main(String[] args) {
 PoolConfig poolConfig = new PoolConfig();
 test1 test1 = new test1();
 ThreadPoolTaskExecutor executor = poolConfig.demosExecutor();
 CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(test1::testAsync);
 CompletableFuture<Integer> stringCompletableFuture1 = CompletableFuture.supplyAsync(test1::testAsync1);
 CompletableFuture<Boolean> stringCompletableFuture2 = CompletableFuture.supplyAsync(test1::testAsync2);
 LocalDateTime l = LocalDateTime.now();
 System.out.println(l + ":阻塞");
 Map<String, ? extends Serializable> map = Stream.of(stringCompletableFuture, stringCompletableFuture1, stringCompletableFuture2)
 .map(CompletableFuture::join)
 .collect(Collectors.toMap(m ->{
 String str = m.getClass().getSimpleName();
 return str.substring(0, 1).toLowerCase() + str.substring(1);
 } , m1 -> m1));
 LocalDateTime l1 = LocalDateTime.now();
 Duration between = Duration.between(l, l1);
 System.out.println(stringCompletableFuture2.isDone());
 System.out.println(between.getSeconds());
 System.out.println(System.currentTimeMillis() + ":阻塞结束");
 }
}

程序执行结果:

13:40:54.785 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
13:40:54.792 [ForkJoinPool.commonPool-worker-19] ERROR com.demos.auth.controller.test1 - ============1000ms============
13:40:54.796 [ForkJoinPool.commonPool-worker-5] ERROR com.demos.auth.controller.test1 - ============5000ms============
13:40:54.798 [ForkJoinPool.commonPool-worker-23] ERROR com.demos.auth.controller.test1 - ============10000ms============
2021-02-27T13:40:54.804219300:阻塞
13:40:55.811 [ForkJoinPool.commonPool-worker-19] ERROR com.demos.auth.controller.test1 - ============1000ms end============
13:40:59.803 [ForkJoinPool.commonPool-worker-5] ERROR com.demos.auth.controller.test1 - ============5000ms end============
13:41:04.806 [ForkJoinPool.commonPool-worker-23] ERROR com.demos.auth.controller.test1 - ============10000ms end============
true
10
1614404464806:阻塞结束

进程已结束,退出代码0

这里可以看出来程序已经结束了.
那么为什么会造成这两种不同的结果呢?


补充线程池配置:

@Configuration
public class PoolConfig {
 @Bean(name = "demosExecutor")
 public ThreadPoolTaskExecutor demosExecutor() {
 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 // 设置核心线程数
 executor.setCorePoolSize(12);
 // 设置最大线程数
 executor.setMaxPoolSize(50);
 // 设置队列容量
 executor.setQueueCapacity(100);
 // 设置线程活跃时间(秒)
 executor.setKeepAliveSeconds(10);
 // 设置默认线程名称
 executor.setThreadNamePrefix("demos-");
 // 设置拒绝策略
 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
 // 等待所有任务结束后再关闭线程池
 executor.setWaitForTasksToCompleteOnShutdown(true);
 executor.setAwaitTerminationSeconds(60);
 executor.initialize();
 return executor;
 }
}
###

个人理解:这两种线程池的使用场景不同,ThreadPoolTaskExecutor 是真正的线程池,可以高度重复使用的,如果线程数量没有超过核心数量或者允许核心线程超时,workQueue会采用take()的方式,而take的方式会一直阻塞线程,知道队列中有任务为止。具体可以参考ThreadPoolTaskExecutor#getTask(),如果你想主动关闭线程池,可以使用shutdown 或者shutdownNow。而ForkJoinPool就没有这些概念,只有并行度,在一开始就知道的任务的存在,完成任务就行了,虽然任务可以根据逻辑一直拆分,就像递归一样,但跳不出总框架,完成任务就完事了,自然不需要shutdown

版权声明:本文转载自网络,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本站转载出于传播更多优秀技术知识之目的,如有侵权请联系QQ/微信:153890879删除

相关文章
  • ThreadPoolTaskExecutor和ForkJoinPool

    ThreadPoolTaskExecutor和ForkJoinPool

  • beautify = require("js-beautify").

    beautify = require("js-beautify").

  • vscdoe有没有什么插件可以吧内链样式提

    vscdoe有没有什么插件可以吧内链样式提

  • react native global全局属性获取不到

    react native global全局属性获取不到

腾讯云代理商
海外云服务器