跳至主要內容
  • Hostloc 空間訪問刷分
  • 售賣場
  • 廣告位
  • 賣站?

4563博客

全新的繁體中文 WordPress 網站
  • 首頁
  • 为什么 CompletableFuture 的 thenApplyAsync 没有新起一个线程?
未分類
27 8 月 2020

为什么 CompletableFuture 的 thenApplyAsync 没有新起一个线程?

为什么 CompletableFuture 的 thenApplyAsync 没有新起一个线程?

資深大佬 : amiwrong123 18

    public static void test() {         CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {             String supplyAsyncResult = " "+Thread.currentThread().getName()+" Hello world! ";             try {                 Thread.sleep(1000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println(supplyAsyncResult);             return supplyAsyncResult;         }).thenApplyAsync(r -> {  //添加后续任务             String thenApplyResult = Thread.currentThread().getName()+r + " thenApply! ";             System.out.println(thenApplyResult);             return thenApplyResult;         });          try {             System.out.println(completableFuture.get() + " finish!");         } catch (InterruptedException | ExecutionException e) {             e.printStackTrace();         }     } 

打印:

 ForkJoinPool.commonPool-worker-9 Hello world!  ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world!  thenApply!  ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world!  thenApply!  finish! 
        public void run() {             CompletableFuture<T> d; Supplier<T> f;             if ((d = dep) != null && (f = fn) != null) {                 dep = null; fn = null;  //只是为了防止内存泄漏,方便 GC                 if (d.result == null) {                     try {                         d.completeValue(f.get());  //执行 task                     } catch (Throwable ex) {       //执行 task 期间抛出了异常                         d.completeThrowable(ex);                     }                 }                 d.postComplete();             }         } 

从源码上来看,supplyAsync 新起了一个线程,等到线程执行完 task,开始执行 d.postComplete(),即开始执行后续 task,然后 postComplete 会执行后续 task 的 completion 对象的 tryFire 方法。

    static final class UniApply<T,V> extends UniCompletion<T,V> {         Function<? super T,? extends V> fn;         UniApply(Executor executor, CompletableFuture<V> dep,                  CompletableFuture<T> src,                  Function<? super T,? extends V> fn) {             super(executor, dep, src); this.fn = fn;         }         final CompletableFuture<V> tryFire(int mode) {             CompletableFuture<V> d; CompletableFuture<T> a;             if ((d = dep) == null ||                 !d.uniApply(a = src, fn, mode > 0 ? null : this))//这里会发现前一个 stage 执行完毕,但提供了线程池                 return null;             dep = null; src = null; fn = null;             return d.postFire(a, mode);         }     } 
    final <S> boolean uniApply(CompletableFuture<S> a,                                Function<? super S,? extends T> f,                                UniApply<S,T> c) {         Object r; Throwable x;         if (a == null || (r = a.result) == null || f == null)             return false;         tryComplete: if (result == null) {             if (r instanceof AltResult) {                 if ((x = ((AltResult)r).ex) != null) {                     completeThrowable(x, r);                     break tryComplete;                 }                 r = null;             }             try {                 if (c != null && !c.claim())//会执行到这里,然后发现 claim 返回 false                     return false;                 @SuppressWarnings("unchecked") S s = (S) r;                 completeValue(f.apply(s));             } catch (Throwable ex) {                 completeThrowable(ex);             }         }         return true;     } 
        final boolean claim() {             Executor e = executor;             if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {                 if (e == null)                     return true;                 executor = null; // disable                 e.execute(this); //会执行到这里,然后把 this completion 对象提交给线程池执行,当前线程即将返回             }             return false;         } 

我的问题在于,当 worker-9 线程执行完第一个 task 之后,它把第二个 task 提交给了 executor (e.execute(this)),然后线程就返回了(从 claim 函数一层一层返回,直到返回 postComplete )。那为什么第二个 task 从打印结果来看,还是同一个 worker-9 线程来执行的?

还是说,只是因为我的例子比较简单,所以 executor 没有分配一个新的线程出来,其他情况下,thenApplyAsync 里面在执行e.execute(this)时,还是有可能新起一个线程的吗?

大佬有話說 (8)

  • 資深大佬 : passerbytiny

    supplyAsync 和 thenApplyAsync 虽然都是异步调用,但它们两个之间是串行的,为什么就不能在一个线程(执行器)中被执行。

  • 主 資深大佬 : amiwrong123

    @passerbytiny
    没说不可以,它们之间肯定是串行的,但不一定是同一个线程吧。从源码上可见,supplyAsync 的线程并不是直接执行下一个 task 的,因为它 e.execute(this)之后就马上返回了。

  • 資深大佬 : zyoo

    async 的语义是不一定同一个线程,所以这个只能说是巧合了,你可以多试几把?

  • 主 資深大佬 : amiwrong123

    @zyoo
    多试几次也一样。我怀疑这跟 ForkJoinPool.commonPool()的线程调度有关系,但我现在还没来得及看它的原理呢。。

  • 資深大佬 : passerbytiny

    @amiwrong123 异步任务都是将任务提交给执行器去执行的,而不是从线程池取出一个线程用来执行任务。选择哪个线程是由执行器自行决定的,任务的提交者很难也不该对线程选择产生影响。

    从高层次上看,thenApplyAsync 是在 applyAsync 完成之后执行的,所以最优选择就是两者使用同一个线程(线程唤醒也是有成本的)。从源码看,你要主要看的应该是 Executor 的源码。

  • 主 資深大佬 : amiwrong123

    @passerbytiny
    好吧,大概理解了。主要之前我以为我这个例子,applyAsync 和 thenApplyAsync 的执行线程肯定是同一个线程,但从源码上看发现 前一个线程只是提交任务给 Executor 而已。

    所以,applyAsync 和 thenApplyAsync 的执行线程不一定是同一个呗。只是这个例子里,线程池是这样调度的。

  • 資深大佬 : tangzekk

    木宝厉害哦

  • 資深大佬 : RedBeanIce

    @amiwrong123 #5
    @passerbytiny #6

    JDK8
    求问一下,我也是看到这里将任务提交到 Executor
    e.execute(new AsyncRun(d, f));
    那么下一步应该看 forkjoinpool ?因为默认是他。

文章導覽

上一篇文章
下一篇文章

AD

其他操作

  • 登入
  • 訂閱網站內容的資訊提供
  • 訂閱留言的資訊提供
  • WordPress.org 台灣繁體中文

51la

4563博客

全新的繁體中文 WordPress 網站
返回頂端
本站採用 WordPress 建置 | 佈景主題採用 GretaThemes 所設計的 Memory
4563博客
  • Hostloc 空間訪問刷分
  • 售賣場
  • 廣告位
  • 賣站?
在這裡新增小工具