为什么 CompletableFuture 的 thenApplyAsync 没有新起一个线程?
public static void test() { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { String supplyAsyncResult = " "+Thread.currentThread().getName()+" Hello world! "; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(supplyAsyncResult); return supplyAsyncResult; }).thenApplyAsync(r -> { //添加后续任务 String thenApplyResult = Thread.currentThread().getName()+r + " thenApply! "; System.out.println(thenApplyResult); return thenApplyResult; }); try { System.out.println(completableFuture.get() + " finish!"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
打印:
ForkJoinPool.commonPool-worker-9 Hello world! ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world! thenApply! ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world! thenApply! finish!
public void run() { CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; //只是为了防止内存泄漏,方便 GC if (d.result == null) { try { d.completeValue(f.get()); //执行 task } catch (Throwable ex) { //执行 task 期间抛出了异常 d.completeThrowable(ex); } } d.postComplete(); } }
从源码上来看,supplyAsync 新起了一个线程,等到线程执行完 task,开始执行 d.postComplete(),即开始执行后续 task,然后 postComplete 会执行后续 task 的 completion 对象的 tryFire 方法。
static final class UniApply<T,V> extends UniCompletion<T,V> { Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this))//这里会发现前一个 stage 执行完毕,但提供了线程池 return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } }
final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c) { Object r; Throwable x; if (a == null || (r = a.result) == null || f == null) return false; tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } try { if (c != null && !c.claim())//会执行到这里,然后发现 claim 返回 false return false; @SuppressWarnings("unchecked") S s = (S) r; completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); } } return true; }
final boolean claim() { Executor e = executor; if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { if (e == null) return true; executor = null; // disable e.execute(this); //会执行到这里,然后把 this completion 对象提交给线程池执行,当前线程即将返回 } return false; }
我的问题在于,当 worker-9 线程执行完第一个 task 之后,它把第二个 task 提交给了 executor (e.execute(this)),然后线程就返回了(从 claim 函数一层一层返回,直到返回 postComplete )。那为什么第二个 task 从打印结果来看,还是同一个 worker-9 线程来执行的?
还是说,只是因为我的例子比较简单,所以 executor 没有分配一个新的线程出来,其他情况下,thenApplyAsync 里面在执行e.execute(this)时,还是有可能新起一个线程的吗?