精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Java8異步編程之CompletableFuture源碼解讀

原創(chuàng)
開(kāi)發(fā) 后端
一說(shuō)到異步任務(wù),很多人上來(lái)咔咔新建個(gè)線程池。為了防止線程數(shù)量肆虐,一般還會(huì)考慮使用單例模式創(chuàng)建線程池,具體使用方法請(qǐng)看下文。

【51CTO.com原創(chuàng)稿件】

一、引言

一說(shuō)到異步任務(wù),很多人上來(lái)咔咔新建個(gè)線程池。為了防止線程數(shù)量肆虐,一般還會(huì)考慮使用單例模式創(chuàng)建線程池,具體使用方法大都如下面的代碼所示: 

  1. @Test  
  2. publicvoiddemo1() throwsExecutionException, InterruptedException {  
  3. ExecutorServiceexecutorService=Executors.newFixedThreadPool(5);  
  4. Future<Object>future1=executorService.submit(newCallable<Object>() {  
  5. @Override  
  6. publicObjectcall() throwsException {  
  7. returnThread.currentThread().getName();
      }    
  8.  
  9. }); 
  10.  
  11. System.out.println(future1.get()); 
  12.  
  13. executorService.execute(newRunnable() { 
  14.  
  15. @Overridepublicvoidrun() { 
  16.  
  17. System.out.println(Thread.currentThread().getName());        
  18.  
  19. }   
  20.  
  21. }); 
  22.  
  23.  

經(jīng)常使用 JavaScript 的同學(xué)相信對(duì)于異步回調(diào)的用法相當(dāng)熟悉了,畢竟 JavaScript 擁有“回調(diào)地獄”的美譽(yù)。

我們大 Java 又開(kāi)啟了新一輪模仿之旅。

java.util.concurrent 包新增了 CompletableFuture 類(lèi)可以實(shí)現(xiàn)類(lèi)似 JavaScript 的連續(xù)回調(diào)。

二、兩種基本用法

先來(lái)看下 CompletableFuture 的兩種基本⽤法,代碼如下: 

  1. @Test 
  2.  
  3. public void index1() throws ExecutionException, InterruptedException { 
  4.  
  5.   CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()); 
  6.  
  7.    CompletableFuture completableFuture2 = CompletableFuture.runAsync(() -> Thread.currentThread().getName()); 
  8.  
  9.    System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); 
  10.  
  11.  

打印輸出: 

  1. ForkJoinPool.commonPool-worker-1 
  2. null 

初看代碼,第一反應(yīng)是代碼簡(jiǎn)潔。直接調(diào)用 CompletableFuture 類(lèi)的靜態(tài)方法,提交任務(wù)方法就完事了。但是,隨之而來(lái)的疑問(wèn)就是,異步任務(wù)執(zhí)行的背后是一套什么邏輯呢?是一對(duì)一使用newThread()還是依賴(lài)線程池去執(zhí)行的呢。

三、探索線程池原理

翻閱 CompletableFuture 類(lèi)的源碼,我們找到答案。關(guān)鍵代碼如下:

  1. private static final boolean useCommonPool = 
  2.  (ForkJoinPool.getCommonPoolParallelism() > 1); 
  3. /** 
  4. * Default executor -- ForkJoinPool.commonPool() unless it cannot 
  5. * support parallelism. 
  6. */ 
  7. private static final Executor asyncPool = useCommonPool ? 
  8.  ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 

可以看到 CompletableFuture 類(lèi)默認(rèn)使⽤的是 ForkJoinPool.commonPool() ⽅法返回的線程池。當(dāng) 然啦,前提是 ForkJoinPool 線程池的數(shù)量⼤于 1 。否則,則使⽤ CompletableFuture 類(lèi)⾃定義的 ThreadPerTaskExecutor 線程池。 ThreadPerTaskExecutor 線程池的實(shí)現(xiàn)邏輯⾮常簡(jiǎn)單,⼀⾏代碼簡(jiǎn)單實(shí)現(xiàn)了 Executor 接⼝,內(nèi)部執(zhí)⾏ 邏輯是⼀條任務(wù)對(duì)應(yīng)⼀條線程。代碼如下:

  1. /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 
  2. static final class ThreadPerTaskExecutor implements Executor { 
  3.  public void execute(Runnable r) { new Thread(r).start(); } 

四、兩種異步接⼝

之前我們使⽤線程池執(zhí)⾏異步任務(wù)時(shí),當(dāng)不需要任務(wù)執(zhí)⾏完畢后返回結(jié)果的,我們都是實(shí)現(xiàn) Runnable 接⼝。⽽當(dāng)需要實(shí)現(xiàn)返回值時(shí),我們使⽤的則是 Callable 接⼝。 同理,使⽤ CompletableFuture 類(lèi)的靜態(tài)⽅法執(zhí)⾏異步任務(wù)時(shí),不需要返回結(jié)果的也是實(shí)現(xiàn) Runnable 接⼝。⽽當(dāng)需要實(shí)現(xiàn)返回值時(shí),我們使⽤的則是 Supplier 接⼝。其實(shí),Callable 接⼝和 Supplier 接⼝ 并沒(méi)有什么區(qū)別。 接下來(lái),我們來(lái)分析⼀下 CompletableFuture 是如何實(shí)現(xiàn)異步任務(wù)執(zhí)⾏的。

runAsync

CompletableFuture 執(zhí)⾏⽆返回值任務(wù)的是 runAsync() ⽅法。該⽅法的關(guān)鍵執(zhí)⾏代碼如下:

  1. static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { 
  2.  if (f == null) throw new NullPointerException(); 
  3.  CompletableFuture<Void> d = new CompletableFuture<Void>(); 
  4.  e.execute(new AsyncRun(d, f)); 
  5.  return d; 

可以看到,該⽅法將 Runnable 實(shí)例作為參數(shù)封裝⾄ AsyncRun 類(lèi)。實(shí)際上, AsyncRun 類(lèi)是對(duì) Runnable 接⼝的進(jìn)⼀步封裝。實(shí)際上,AsyncRun 類(lèi)也是實(shí)現(xiàn)了 Runnable 接⼝。觀察下⽅ AsyncRun 類(lèi)的源碼,可以看到 AsyncRun 類(lèi)的 run() ⽅法中調(diào)⽤了 Runnable 參數(shù)的 run() ⽅法。

  1. public void run() { 
  2.  CompletableFuture<Void> d; Runnable f; 
  3.  if ((d = dep) != null && (f = fn) != null) { 
  4.  dep = null; fn = null
  5.  if (d.result == null) { 
  6.  try { 
  7.  f.run(); 
  8.  d.completeNull(); 
  9.  } catch (Throwable ex) { 
  10.  d.completeThrowable(ex); 
  11.  } 
  12.  } 
  13.  d.postComplete(); 
  14.  } 

當(dāng)提交的任務(wù)執(zhí)⾏完畢后,即 f.run() ⽅法執(zhí)⾏完畢。調(diào)⽤ d.completeNull() ⽅法設(shè)置任務(wù)執(zhí)⾏結(jié) 果為空。代碼如下:

  1. /** The encoding of the null value. */ 
  2. static final AltResult NIL = new AltResult(null); 
  3. /** Completes with the null value, unless already completed. */ 
  4. final boolean completeNull() { 
  5.  return UNSAFE.compareAndSwapObject(this, RESULT, null
  6.  NIL); 

可以看到,對(duì)于任務(wù)返回值為 null 的執(zhí)⾏結(jié)果,被封裝為 new AltResult(null) 對(duì)象。⽽且,還是 調(diào)⽤的 CAS 本地⽅法實(shí)現(xiàn)了原⼦操作。 為什么需要對(duì) null 值進(jìn)⾏單獨(dú)封裝呢?觀察 get() ⽅法的源碼:

  1. public T get() throws InterruptedException, ExecutionException { 
  2.  Object r; 
  3.  return reportGet((r = result) == null ? waitingGet(true) : r); 

原來(lái)原因是便于使⽤ null 值區(qū)分異步任務(wù)是否執(zhí)⾏完畢。 如果你對(duì) CAS 不太了解的話,可以查閱 compareAndSwapObject ⽅法的四個(gè)參數(shù)的含義。該⽅法的參 數(shù) RESULT 是什么呢?查看代碼如下:

  1. RESULT = u.objectFieldOffset(k.getDeclaredField("result")); 

原來(lái),RESULT 是獲取 CompletableFuture 對(duì)象中 result 字段的偏移地址。這個(gè) result 字段⼜是啥 呢?就是任務(wù)執(zhí)⾏完畢后的結(jié)果值。代碼如下:

  1. // Either the result or boxed AltResult 
  2. volatile Object result;  

supplyAsync

CompletableFuture 執(zhí)⾏有返回值任務(wù)的是 supplyAsync() ⽅法。該⽅法的關(guān)鍵執(zhí)⾏代碼如下:

  1. static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 
  2.  Supplier<U> f) { 
  3.  if (f == null) throw new NullPointerException(); 
  4.  CompletableFuture<U> d = new CompletableFuture<U>(); 
  5.  e.execute(new AsyncSupply<U>(d, f)); 
  6.  return d; 

與 AsyncRun 類(lèi)對(duì) Runnable 接⼝的封裝相同的是,AsyncSupply 類(lèi)也是對(duì) Runnable 接⼝的 run() ⽅ 法進(jìn)⾏了⼀層封裝。代碼如下:

  1. public void run() { 
  2.  CompletableFuture<T> d; Supplier<T> f; 
  3.  if ((d = dep) != null && (f = fn) != null) { 
  4.  dep = null; fn = null
  5.  if (d.result == null) { 
  6.  try { 
  7.  d.completeValue(f.get()); 
  8.  } catch (Throwable ex) { 
  9.  d.completeThrowable(ex); 
  10.  } 
  11.  } 
  12.  d.postComplete(); 
  13.  } 

當(dāng)異步任務(wù)執(zhí)⾏完畢后,返回結(jié)果會(huì)經(jīng) d.completeValue() ⽅法進(jìn)⾏封裝。與 d.completeNull() ⽅ 法不同的是,該⽅法具有⼀個(gè)參數(shù)。代碼如下:

  1. /** Completes with a non-exceptional result, unless already completed. */ 
  2. final boolean completeValue(T t) { 
  3.  return UNSAFE.compareAndSwapObject(this, RESULT, null
  4.  (t == null) ? NIL : t); 

⽆論是類(lèi) AsyncRun 還是類(lèi) AsyncSupply ,run() ⽅法都會(huì)在執(zhí)⾏結(jié)束之際調(diào)⽤ CompletableFuture 對(duì)象的 postComplete() ⽅法。顧名思義,該⽅法將通知后續(xù)回調(diào)函數(shù)的執(zhí)⾏。

五、探究回調(diào)函數(shù)原理

前⾯我們提到了 CompletableFuture 具有連續(xù)回調(diào)的特性。舉個(gè)例⼦:

  1. @Test 
  2. public void demo2() throws ExecutionException, InterruptedException { 
  3.  CompletableFuture<ArrayList> completableFuture = 
  4. CompletableFuture.supplyAsync(() -> { 
  5.  System.out.println(Thread.currentThread().getName()); 
  6.  return new ArrayList(); 
  7.  }) 
  8.  .whenCompleteAsync((list, throwable) -> { 
  9.  System.out.println(Thread.currentThread().getName()); 
  10.  list.add(1); 
  11.  }) 
  12.  .whenCompleteAsync((list, throwable) -> { 
  13.  System.out.println(Thread.currentThread().getName()); 
  14.  list.add(2); 
  15.  }) 
  16.  .whenCompleteAsync((list, throwable) -> { 
  17. System.out.println(Thread.currentThread().getName()); 
  18.  list.add(3); 
  19.  }); 
  20.  System.out.println(completableFuture.get()); 

打印輸出:

  1. ForkJoinPool.commonPool-worker-1 
  2. ForkJoinPool.commonPool-worker-1 
  3. ForkJoinPool.commonPool-worker-1 
  4. ForkJoinPool.commonPool-worker-1 
  5. [1, 2, 3] 

上⾯的測(cè)試⽅法中,通過(guò) supplyAsync ⽅法提交異步任務(wù),當(dāng)異步任務(wù)運(yùn)⾏結(jié)束,對(duì)結(jié)果值添加三個(gè)回 調(diào)函數(shù)進(jìn)⼀步處理。 觀察打印輸出,可以初步得出如下結(jié)論:

  1. 異步任務(wù)與回調(diào)函數(shù)均運(yùn)⾏在同⼀個(gè)線程中。
  2. 回調(diào)函數(shù)的調(diào)⽤順序與添加回調(diào)函數(shù)的順序⼀致。

那么問(wèn)題來(lái)了,CompletableFuture 內(nèi)部是如何處理連續(xù)回調(diào)函數(shù)的呢?

AsyncSupply

當(dāng)我們提交異步任務(wù)時(shí),等價(jià)于向線程池提交 AsyncSupply 對(duì)象或者 AsyncRun 對(duì)象。觀察這兩個(gè)類(lèi) 的唯⼀構(gòu)造⽅法都是相同的,代碼如下:

  1. AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { 
  2.  this.dep = dep; this.fn = fn; 

這就將 AsyncSupply 異步任務(wù)與返回給⽤戶(hù)的 CompletableFuture 對(duì)象進(jìn)⾏綁定,⽤于在執(zhí)⾏結(jié)束后 回填結(jié)果到 CompletableFuture 對(duì)象,以及通知后續(xù)回調(diào)函數(shù)的運(yùn)⾏。

Completion

回調(diào)函數(shù)均是 Completion 類(lèi)的⼦類(lèi),抽取 Completion 類(lèi)與⼦類(lèi)的關(guān)鍵代碼:

  1. Completion next
  2. CompletableFuture<V> dep; 
  3. CompletableFuture<T> src; 
  4. Function fn; 

Completion 類(lèi)含有 next 字段,很明顯是⼀個(gè)鏈表。 Completion 的⼦類(lèi)含有兩個(gè) CompletableFuture 類(lèi)型的參數(shù),dep 是新建的、⽤于下⼀步的 CompletableFuture 對(duì)象,src 則是引⽤它的 CompletableFuture 對(duì)象。

當(dāng) Completion 執(zhí)⾏完回調(diào)⽅法后,⼀般會(huì)返回 dep 對(duì)象,⽤于迭代遍歷。

CompletableFuture

觀察源碼,CompletableFuture 主要包含下⾯兩個(gè)參數(shù):

  1. volatile Object result; //結(jié)果 
  2. volatile Completion stack; //回調(diào)⽅法棧 

Completion 類(lèi)型封裝了回調(diào)⽅法,但為什么要起名為 stack (棧)呢? 因?yàn)?CompletableFuture 借助 Completion 的鏈表結(jié)構(gòu)實(shí)現(xiàn)了棧。每當(dāng)調(diào)⽤ CompletableFuture 對(duì) 象的 whenCompleteAsync() 或其它回調(diào)⽅法時(shí),都會(huì)新建⼀個(gè) Completion 對(duì)象,并壓到棧頂。代碼 如下:

  1. final boolean tryPushStack(Completion c) { 
  2.  Completion h = stack; 
  3.  lazySetNext(c, h); 
  4.  return UNSAFE.compareAndSwapObject(this, STACK, h, c); 

postComplete

回顧上⾯兩種異步任務(wù)類(lèi)的實(shí)現(xiàn),當(dāng)異步任務(wù)執(zhí)⾏完畢之后,都會(huì)調(diào)⽤ postComplete() ⽅法通知回調(diào) ⽅法的執(zhí)⾏。代碼如下:

  1. final void postComplete() { 
  2.  CompletableFuture<?> f = this; Completion h; 
  3.  while ((h = f.stack) != null || 
  4.  (f != this && (h = (f = this).stack) != null)) { 
  5.  CompletableFuture<?> d; Completion t; 
  6.  if (f.casStack(h, t = h.next)) { 
  7.  if (t != null) { 
  8.  if (f != this) { 
  9.  pushStack(h); 
  10.  continue
  11.  } 
  12.  h.next = null; // detach 
  13.  } 
  14.  f = (d = h.tryFire(NESTED)) == null ? this : d; 
  15.  } 
  16.  } 

這段代碼是本⽂的核⼼部分,⼤致邏輯如下:

當(dāng)異步任務(wù)執(zhí)⾏結(jié)束后,CompletableFuture 會(huì)查看⾃身是否含有回調(diào)⽅法棧,如果含有,會(huì)通過(guò) casStack() ⽅法拿出棧頂元素 h ,此時(shí)的棧頂是原來(lái)?xiàng)5牡?#12038;位元素 t。如果 t 等于 null,那么直接 執(zhí)⾏回調(diào)⽅法 h,并返回下⼀個(gè) CompletableFuture 對(duì)象。然后⼀直迭代這個(gè)過(guò)程。 簡(jiǎn)化上述思路,我更想稱(chēng)其為通過(guò) Completion 對(duì)象實(shí)現(xiàn)橋接的 CompletableFuture 鏈表,流程圖如 下:

上⾯的過(guò)程是屬于正常情況下的,也就是⼀個(gè) CompletableFuture 對(duì)象只提交⼀個(gè)回調(diào)⽅法的情況。 如果我們使⽤同⼀個(gè) CompletableFuture 對(duì)象連續(xù)調(diào)⽤多次回調(diào)⽅法,那么就會(huì)形成 Completion 棧。

你以為 Completion 棧內(nèi)元素會(huì)依次調(diào)⽤,不會(huì)的。從代碼中來(lái)看,當(dāng)回調(diào)⽅法 t 不等于 null,有兩種 情況:

情況 1:如果當(dāng)前迭代到的 CompletableFuture 對(duì)象是 this (也就是 CompletableFuture 鏈表頭), 會(huì)令 h.next = null ,因?yàn)?h.next 也就是 t 通過(guò) CAS 的⽅式壓到了 this 對(duì)象的 stack 棧頂。

情況 2:如果當(dāng)前迭代到的 CompletableFuture 對(duì)象 f 不是 this (不是鏈表頭)的話,會(huì)將回調(diào)函數(shù) h 壓⼊ this (鏈表頭)的 stack 中。然后從鏈表頭再次迭代遍歷。這樣下去,對(duì)象 f 中的回調(diào)⽅法棧假設(shè) 為 3-2-1,從 f 的棧頂推出再壓⼊ this 的棧頂,順序就變?yōu)榱?1-2-3。這時(shí)候,情況就變成了第 1 種。

這樣,當(dāng)回調(diào)⽅法 t = h.next 等于 null 或者 f 等于 this 時(shí),都會(huì)對(duì)棧頂?shù)幕卣{(diào)⽅法進(jìn)⾏調(diào)⽤。

簡(jiǎn)單來(lái)說(shuō),就是將擁有多個(gè)回調(diào)⽅法的 CompletableFuture 對(duì)象的多余的回調(diào)⽅法移到到 this 對(duì)象的 棧內(nèi)。

回調(diào)⽅法執(zhí)⾏結(jié)束要么返回下⼀個(gè) CompletableFuture 對(duì)象,要么返回 null 然后⼿動(dòng)設(shè)置為 f = this, 再次從頭遍歷。

Async

回調(diào)函數(shù)的執(zhí)⾏其實(shí)分為兩種,區(qū)別在于帶不帶 Async 后綴。例如:

  1. @Test 
  2. public void demo3() throws ExecutionException, InterruptedException { 
  3.  CompletableFuture<ArrayList> completableFuture = 
  4. CompletableFuture.supplyAsync(() -> { 
  5.  System.out.println(Thread.currentThread().getName()); 
  6.  return new ArrayList(); 
  7.  }) 
  8.  .whenComplete((arrayList, throwable) -> { 
  9.  System.out.println(Thread.currentThread().getName()); 
  10.  arrayList.add(1); 
  11.  }).whenCompleteAsync((arrayList, throwable) -> { 
  12.  System.out.println(Thread.currentThread().getName()); 
  13.  arrayList.add(2); 
  14.  }); 
  15.  System.out.println(completableFuture.get()); 

打印輸出:

  1. ForkJoinPool.commonPool-worker-1 
  2. main 
  3. ForkJoinPool.commonPool-worker-1 
  4. [1, 2] 

whenComplete() 和 whenCompleteAsync() ⽅法的區(qū)別在于是否在⽴即執(zhí)⾏。源碼如下:

  1. private CompletableFuture<T> uniWhenCompleteStage( 
  2.  Executor e, BiConsumer<? super T, ? super Throwable> f) { 
  3.  if (f == null) throw new NullPointerException(); 
  4.  CompletableFuture<T> d = new CompletableFuture<T>(); 
  5.  if (e != null || !d.uniWhenComplete(this, f, null)) { 
  6.  UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); 
  7.  push(c); 
  8.  c.tryFire(SYNC); 
  9.  } 
  10.  return d; 

兩個(gè)⽅法都是調(diào)⽤的 uniWhenCompleteStage() ,區(qū)別在于參數(shù) Executor e 是否為 null。從⽽控制是 否調(diào)⽤ d.uniWhenComplete() ⽅法,該⽅法會(huì)判斷 result 是否為 null,從⽽嘗試是否⽴即執(zhí)⾏該回調(diào) ⽅法。若是 supplyAsync() ⽅法提交的異步任務(wù)耗時(shí)相對(duì)⻓⼀些,那么就不建議使⽤ whenComplete() ⽅法了。此時(shí)由 whenComplete() 和 whenCompleteAsync() ⽅法提交的異步任務(wù)都會(huì)由線程池執(zhí)⾏。

本章小結(jié)

通過(guò)本章節(jié)的源碼分析,我們明白了 Completion 之所以將自身設(shè)置為鏈表結(jié)構(gòu),是因?yàn)?CompletableFuture 需要借助 Completion 的鏈表結(jié)構(gòu)實(shí)現(xiàn)棧。也明白了同一個(gè) CompletableFuture 對(duì)象如果多次調(diào)用回調(diào)方法時(shí)執(zhí)行順序會(huì)與調(diào)用的順序不符合。換言之,一個(gè) CompletableFuture 對(duì)象只調(diào)用一個(gè)回調(diào)方法才是 CompletableFuture 設(shè)計(jì)的初衷,我們?cè)诰幊讨幸部梢岳眠@一特性來(lái)保證回調(diào)方法的調(diào)用順序。

因篇幅有限,本文并沒(méi)有分析更多的 CompletableFuture 源碼,感興趣的小伙伴可以自行查看。

六、用法集錦

異常處理

方法:

  1. public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn) 

示例:

  1. @Test 
  2. public void index2() throws ExecutionException, InterruptedException { 
  3.    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2 / 0) 
  4.           .exceptionally((e) -> { 
  5.                System.out.println(e.getMessage()); 
  6.                return 0; 
  7.           }); 
  8.    System.out.println(completableFuture.get()); 

輸出:

  1. java.lang.ArithmeticException: / by zero 

任務(wù)完成后對(duì)結(jié)果的處理

方法:

  1. public CompletableFuture<T>   whenComplete(BiConsumer<? super T,? super Throwable> action
  2. public CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable> action
  3. public CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) 

示例:

  1. @Test 
  2. public void index3() throws ExecutionException, InterruptedException { 
  3.     CompletableFuture<HashMap> completableFuture = CompletableFuture.supplyAsync(() -> new HashMap()) 
  4.             .whenComplete((map, throwable) -> { 
  5.                 map.put("key1""value1"); 
  6.             }); 
  7.     System.out.println(completableFuture.get()); 

輸出:

  1. {key=value} 

任務(wù)完成后對(duì)結(jié)果的轉(zhuǎn)換

方法:

  1. public <U> CompletableFuture<U>   thenApply(Function<? super T,? extends U> fn) 
  2. public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn) 
  3. public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 

示例:

  1. @Test 
  2. public void index4() throws ExecutionException, InterruptedException { 
  3.     CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2) 
  4.             .thenApply((r) -> r + 1); 
  5.     System.out.println(completableFuture.get()); 

輸出:

任務(wù)完成后對(duì)結(jié)果的消費(fèi)

方法:

  1. public CompletableFuture<Void>    thenAccept(Consumer<? super T> action
  2. public CompletableFuture<Void>   thenAcceptAsync(Consumer<? super T> action
  3. public CompletableFuture<Void>   thenAcceptAsync(Consumer<? super T> action, Executor executor) 

示例:

  1. @Test 
  2. public void index5() throws ExecutionException, InterruptedException { 
  3.     CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 2) 
  4.             .thenAccept(System.out::println); 
  5.     System.out.println(completableFuture.get()); 

輸出:

  1. null 

任務(wù)的組合(需等待上一個(gè)任務(wù)完成)

方法:

  1. public <U> CompletableFuture<U>   thenCompose(Function<? super T,? extends CompletionStage<U>> fn) 
  2. public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) 
  3. public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor) 

示例:

  1. @Test 
  2. public void index6() throws ExecutionException, InterruptedException { 
  3.     CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2) 
  4.             .thenCompose(integer -> CompletableFuture.supplyAsync(() -> integer + 1)); 
  5.     System.out.println(completableFuture.get()); 

輸出:

任務(wù)的組合(不需等待上一步完成)

方法:

  1. public <U,V> CompletableFuture<V>   thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 
  2. public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 
  3. public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)  

示例:

  1. @Test 
  2. public void index7() throws ExecutionException, InterruptedException { 
  3.     CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2) 
  4.             .thenCombine(CompletableFuture.supplyAsync(() -> 1), (x, y) -> x + y); 
  5.     System.out.println(completableFuture.get()); 

輸出:

消費(fèi)最先執(zhí)行完畢的其中一個(gè)任務(wù),不返回結(jié)果

方法:

  1. public CompletableFuture<Void>  acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action
  2. public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action
  3. public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) 

示例:

  1. @Test 
  2. public void index8() throws ExecutionException, InterruptedException { 
  3.     CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> { 
  4.         try { 
  5.             Thread.sleep(100); 
  6.         } catch (InterruptedException e) { 
  7.             e.printStackTrace(); 
  8.         } 
  9.         return 2; 
  10.     }) 
  11.             .acceptEither(CompletableFuture.supplyAsync(() -> 1), System.out::println); 
  12.     System.out.println(completableFuture.get()); 

輸出:

  1. null 

消費(fèi)最先執(zhí)行完畢的其中一個(gè)任務(wù),并返回結(jié)果

方法:

  1. public <U> CompletableFuture<U>     applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) 
  2. public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) 
  3. public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor) 

示例:

  1. @Test 
  2. public void index9() throws ExecutionException, InterruptedException { 
  3.     CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { 
  4.         try { 
  5.             Thread.sleep(100); 
  6.         } catch (InterruptedException e) { 
  7.             e.printStackTrace(); 
  8.         } 
  9.         return 2; 
  10.     }) 
  11.             .applyToEither(CompletableFuture.supplyAsync(() -> 1), x -> x + 10); 
  12.     System.out.println(completableFuture.get()); 

輸出:

  1. 11 

等待所有任務(wù)完成

方法:

  1. public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 

示例:

  1. @Test 
  2. public void index10() throws ExecutionException, InterruptedException { 
  3.     CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { 
  4.         try { 
  5.             Thread.sleep(2000); 
  6.         } catch (InterruptedException e) { 
  7.             e.printStackTrace(); 
  8.         } 
  9.         return 1; 
  10.     }); 
  11.     CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2); 
  12.     CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2); 
  13.     System.out.println("waiting all task finish.."); 
  14.     System.out.println(completableFuture.get()); 
  15.     System.out.println("all task finish"); 

輸出:

  1. waiting all task finish.. 
  2. null 
  3. all task finish 

返回最先完成的任務(wù)結(jié)果

方法:

  1. public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 

示例:

  1. @Test 
  2. public void index11() throws ExecutionException, InterruptedException { 
  3.     CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { 
  4.         try { 
  5.             Thread.sleep(100); 
  6.         } catch (InterruptedException e) { 
  7.             e.printStackTrace(); 
  8.         } 
  9.         return 1; 
  10.     }); 
  11.     CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2); 
  12.     CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(completableFuture1, completableFuture2); 
  13.     System.out.println(completableFuture.get()); 

輸出:

作者簡(jiǎn)介:

薛勤,公眾號(hào)“代碼藝術(shù)”的作者,就職于阿里巴巴,熱衷于探索計(jì)算機(jī)世界的底層原理,個(gè)人在 Github@Ystcode 上擁有多個(gè)開(kāi)源項(xiàng)目。

【51CTO原創(chuàng)稿件,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文作者和出處為51CTO.com】

 

責(zé)任編輯:龐桂玉 來(lái)源: 51CTO
相關(guān)推薦

2023-07-19 08:03:05

Future異步JDK

2021-02-21 14:35:29

Java 8異步編程

2024-04-18 08:20:27

Java 8編程工具

2021-06-06 16:56:49

異步編程Completable

2022-07-08 14:14:04

并發(fā)編程異步編程

2025-02-06 16:51:30

2024-08-06 09:43:54

Java 8工具編程

2024-12-26 12:59:39

2023-04-13 07:33:31

Java 8編程工具

2011-11-10 10:23:56

Jscex

2015-06-16 11:06:42

JavaCompletable

2017-12-21 15:48:11

JavaCompletable

2012-03-01 20:32:29

iOS

2016-10-21 11:04:07

JavaScript異步編程原理解析

2024-10-14 08:29:14

異步編程任務(wù)

2011-11-16 13:22:38

Jscex

2011-11-17 16:14:25

Jscex

2011-11-11 13:38:39

Jscex

2015-09-30 09:34:09

java8字母序列

2011-07-21 10:17:53

java
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

91在线在线观看| 午夜成人免费视频| 欧美日韩一区二区区别是什么| www.久久久久久久久久| 欧美韩日亚洲| 不卡在线观看av| 国产成人精品免费视频| 精品无码久久久久成人漫画| 欧美大奶一区二区| 欧美日本一道本在线视频| 国产黄色激情视频| av色图一区| 成人黄页在线观看| 国产精品永久在线| 日本三级中文字幕| 欧美成人激情| 日韩精品免费在线视频| 99精品视频国产| 在线观看欧美日韩电影| 日av在线播放| 凹凸成人精品亚洲精品密奴| 欧美大片一区二区| 色多多视频在线播放| 黄色的视频在线观看| 中文字幕av不卡| 精品一区二区三区国产| 精品黑人一区二区三区国语馆| 日日夜夜一区二区| 97免费视频在线| 青青青在线免费观看| 国产精品一在线观看| 精品对白一区国产伦| 国产又粗又长又爽又黄的视频| free欧美| 欧美性生交xxxxxdddd| 国产成人永久免费视频| 黄色网页在线免费看| 免费av网站大全久久| 日韩av免费在线看| 天天操天天摸天天干| 黄色综合网站| 欧美国产日韩一区二区在线观看 | 精品无码久久久久| 忘忧草精品久久久久久久高清| 偷拍与自拍一区| 综合一区中文字幕| 日本在线免费| 国产精品成人免费精品自在线观看| 欧美伦理一区二区| 爽爽视频在线观看| 成人免费高清视频在线观看| 147欧美人体大胆444| 9.1国产丝袜在线观看 | 一级特黄aaa| 日本三级午夜理伦三级三| 精品伊人久久久| 亚洲第一区在线观看| 男人女人拔萝卜视频| 久久久久久久久久久久电影| 91精品国产91久久久久久一区二区 | 青青草观看免费视频在线| 99久久综合99久久综合网站| 国产精品国产精品国产专区蜜臀ah | 秋霞成人午夜伦在线观看| 国产成人在线亚洲欧美| 免费黄色一级大片| 麻豆精品新av中文字幕| 国产美女久久久| 国产精品久久久久久久久毛片 | 欧美日韩亚洲综合一区二区三区激情在线| 神马久久久久久久久久| 91女人视频在线观看| 免费看国产精品一二区视频| 国产二区在线播放| 18成人在线观看| 国产精品一二三在线观看| 人人超在线公开视频| 亚洲高清久久久| 黄色片视频在线播放| 97精品国产99久久久久久免费| 欧美日韩国产小视频在线观看| 国内自拍第二页| 超碰在线成人| 亚洲视频999| 小早川怜子一区二区的演员表| 亚洲欧美综合| 亚洲成人999| 中文字幕在线免费看线人 | 成人国产在线观看| 欧美精品在线一区| 米奇777四色精品人人爽| 亚洲综合视频在线| 亚洲v欧美v另类v综合v日韩v| 亚洲精品国产suv一区| 不卡一区中文字幕| 亚洲亚洲精品三区日韩精品在线视频| 69xxx在线| 狠狠做深爱婷婷久久综合一区| 在线黄色免费观看| 成人看片爽爽爽| 中文字幕不卡在线视频极品| 久久一区二区三| 青青草原综合久久大伊人精品优势| 97在线中文字幕| 三级无遮挡在线观看| 亚洲人成亚洲人成在线观看图片 | 久草综合在线视频| 美女国产一区| 成人激情av| 福利片在线看| 亚洲大片精品永久免费| 91女神在线观看| 日韩激情片免费| 成人欧美在线观看| 午夜小视频免费| 国产精品成人一区二区三区夜夜夜| 韩国无码av片在线观看网站| 欧美被狂躁喷白浆精品| 一二三区视频在线观看| 岛国av在线网站| 欧美日韩国产一级片| 久久久午夜精品福利内容| 91亚洲国产高清| 热门国产精品亚洲第一区在线| 精品国产亚洲AV| 国产精品全国免费观看高清| 久久国产亚洲精品无码| 9999久久久久| 欧美尺度大的性做爰视频| 国产精品无码粉嫩小泬| 91麻豆免费观看| 久久精品国产sm调教网站演员| 亚洲精品一区av| 在线观看视频亚洲| 欧美一区二区三区不卡视频| www.欧美日韩国产在线| 欧美这里只有精品| 精品国产三级| xxx欧美精品| 一区二区三区精| 中文字幕国产一区二区| 久久久久久久少妇| 精品在线99| 日韩免费精品视频| 免费在线高清av| 色94色欧美sute亚洲线路一久| 日本69式三人交| 中国女人久久久| 精品国产乱码久久久久| 国产一二在线播放| 亚洲黄页网在线观看| 国产精品suv一区二区| 成人国产电影网| 成人性生活视频免费看| 阿v天堂2017| 极品国产人妖chinesets亚洲人妖 激情亚洲另类图片区小说区 | 国产亚洲一区二区在线观看| 久久久久久久久久久久久久国产| 羞羞色国产精品网站| 国产激情久久久久| 91青青在线视频| 91精品国产综合久久精品图片| 欧美第一页在线观看| 国产999精品久久久久久| 亚洲精品久久久久久久蜜桃臀| 另类ts人妖一区二区三区| 2021久久精品国产99国产精品| 青青草免费在线| 欧美日韩一级二级| 黄色一级片中国| 播五月开心婷婷综合| wwwxxx黄色片| 亚洲成人tv| 国产伦精品一区二区三区视频孕妇 | 天天久久综合网| 亚洲福利免费| 日韩免费毛片| 青草伊人久久| 国产91ⅴ在线精品免费观看| 番号集在线观看| 5566中文字幕一区二区电影 | www在线播放| 日韩午夜av一区| 天天综合天天干| 亚洲欧洲精品一区二区三区 | 国产精品115| 国产成人97精品免费看片| 亚洲精品承认| 亚洲第一精品福利| 中文字幕在线日亚洲9| 亚洲精品国产第一综合99久久 | 4438全国成人免费| eeuss影院在线观看| 日韩精品一区二区三区在线播放| 欧美h在线观看| 亚洲老司机在线| 亚洲一区二区三区蜜桃| 国产一区在线精品| 777米奇影视第四色| 你懂的视频一区二区| 蜜桃av色综合| 韩国三级大全久久网站| 欧洲亚洲免费视频| 亚洲丝袜精品| 国产香蕉97碰碰久久人人| 亚洲va欧美va| 欧美日韩国产一级片| 久久精品国产成人av| 亚洲免费观看高清完整版在线| 免费看污片网站| 成人激情视频网站| 亚洲一区二区图片| 日韩一区二区三区在线看| 18性欧美xxxⅹ性满足| av小次郎在线| 最近2019中文字幕第三页视频| 人妻精品一区一区三区蜜桃91| 欧美日韩国产在线播放网站| 亚洲不卡在线视频| 婷婷久久综合九色国产成人 | 成人精品久久av网站| 亚洲最新无码中文字幕久久| 欧美激情在线视频二区| 久久国产精品一区| 尤物99国产成人精品视频| 亚洲欧美日本在线观看| 欧美mv和日韩mv的网站| 97在线公开视频| 欧美丝袜第三区| 国产午夜无码视频在线观看| 欧美日韩在线视频观看| 日产精品久久久久| 亚洲国产综合色| 欧洲猛交xxxx乱大交3| 亚洲同性同志一二三专区| 五月激情四射婷婷| 国产欧美一区二区在线| 国产精品20p| 国产午夜精品在线观看| 精品人妻互换一区二区三区| 99精品在线免费| 中文字幕在线永久| 99国产精品久久久久久久久久| 精品少妇人妻av一区二区三区| 亚洲激情偷拍| 亚洲乱码日产精品bd在线观看| 久久久久99精品成人片我成大片| 国产99精品国产| 女教师高潮黄又色视频| 国产综合色视频| 欧美激情第四页| 国产成人欧美日韩在线电影| 亚洲精品一二三四| 成人美女视频在线看| 欧美日韩人妻精品一区在线| 丁香激情综合国产| 女性生殖扒开酷刑vk| 成人黄色av电影| 97人妻天天摸天天爽天天| 久久一区二区三区国产精品| 亚洲成人黄色av| 国产精品久久二区二区| 国产极品美女在线| 亚洲午夜免费电影| 久久夜靖品2区| 欧美日韩视频在线| 日韩精选在线观看| 欧美日韩国产精品自在自线| 国产精品一区二区三区在线免费观看 | 国产成人亚洲综合无码| 激情自拍一区| 国产熟女高潮视频| 免费一级片91| 精品无码av一区二区三区不卡| hitomi一区二区三区精品| 亚洲国产欧美视频| 国产精品午夜春色av| 亚洲xxxx3d动漫| 亚洲成a人片在线观看中文| 天天干天天操天天爱| 欧美日韩在线播放三区四区| 国产口爆吞精一区二区| 亚洲成人网久久久| jizz亚洲| 久久久亚洲影院| 国产三级自拍视频| 久久成人免费网站| 中文字幕在线观看91| 91女神在线视频| 美国黄色片视频| 婷婷久久综合九色综合伊人色| 中文字幕人妻精品一区| 欧美成人在线直播| 成人影视在线播放| 久久久欧美一区二区| 欧美暴力调教| 国产精品久久久久av福利动漫| 精品国产成人| 国产一区二区四区| 麻豆精品一区二区| 亚洲永久无码7777kkk| 国产精品久久久久久亚洲伦| 日本免费一二三区| 欧美乱妇20p| 日本五码在线| 久久久久女教师免费一区| 国产激情久久| 久久综合给合久久狠狠色| 久久久久蜜桃| 热久久精品免费视频| 成人免费av网站| 国精产品一区一区二区三区mba| 精品国产乱码久久久久久天美| 国产精品国产av| 亚洲日韩第一页| 免费一二一二在线视频| 91在线视频一区| 成人同人动漫免费观看| 精品久久久久久久久久中文字幕| 九九久久精品视频| av电影网站在线观看| 天涯成人国产亚洲精品一区av| 国产精品视频久久久久久| 中文字幕欧美国内| 日本精品不卡| 欧美精品123| 在线视频精品| 日本免费福利视频| 18网站在线观看| 午夜免费日韩视频| 国产精品超碰| 黄色一级片国产| 韩国精品一区二区| 黄色录像一级片| 欧美老肥妇做.爰bbww| 在线日本视频| 国产精品亚洲美女av网站| 欧美一区二区三区高清视频| 一本久道中文无码字幕av| 久久人人爽人人爽| 天码人妻一区二区三区在线看| 亚洲精品动漫100p| 国产在线美女| 美乳视频一区二区| 久久天堂成人| 国产黄色录像视频| 精品视频1区2区| 麻豆系列在线观看| 亚洲bt天天射| 欧美激情一级片一区二区| 亚洲 自拍 另类 欧美 丝袜| 一区二区理论电影在线观看| 国产手机av在线| 久久99久久亚洲国产| 波多野结衣在线一区二区| 97超碰在线人人| 91亚洲男人天堂| 免费视频久久久| 亚洲国产1区| 日日碰狠狠添天天爽超碰97| www.亚洲人| 日本a级c片免费看三区| 中文字幕亚洲综合| 国产精品亚洲欧美日韩一区在线| 国风产精品一区二区| 成人av在线看| 亚洲国产av一区二区三区| 在线看欧美日韩| 蜜桃精品一区二区三区| 国产资源在线免费观看| 91视频免费看| 亚洲特级黄色片| 色综合久久88| 亚洲最好看的视频| 老司机午夜性大片| 亚洲香肠在线观看| 每日更新在线观看av| 国产在线精品一区二区夜色| 成人黄色免费网址| 91精品在线免费| 国产精品原创| 亚洲高清视频在线观看| 成人一二三区视频| 无码无套少妇毛多18pxxxx| xxxxx成人.com| 清纯唯美亚洲经典中文字幕| 五月婷婷丁香色| 亚洲一区二区视频| 电影av在线| 国产精品久久久久免费| 日本v片在线高清不卡在线观看| 欧美日韩人妻精品一区二区三区| 亚洲精品www久久久| 四虎影视精品永久在线观看| 131美女爱做视频| 亚洲视频免费在线观看| 天堂av在线播放| 7777精品久久久大香线蕉小说| 久久国产精品久久w女人spa| 2019男人天堂|