LatchUtils:簡化Java異步任務(wù)同步的利器
在Java應(yīng)用開發(fā)中,為了提升系統(tǒng)性能和響應(yīng)速度,我們經(jīng)常需要將一些耗時(shí)操作(如調(diào)用外部API、查詢數(shù)據(jù)庫、復(fù)雜計(jì)算等)進(jìn)行異步并行處理。當(dāng)主流程需要等待所有這些并行任務(wù)執(zhí)行完畢后再繼續(xù)時(shí),我們通常會(huì)用到 ExecutorService、 CountDownLatch 等并發(fā)工具。
然而,直接使用這些原生工具,往往意味著需要編寫一些重復(fù)的、模式化的“膠水代碼”,這不僅增加了代碼量,也讓核心業(yè)務(wù)邏輯顯得不夠清晰。
為了解決這個(gè)問題,我封裝了一個(gè)名為 LatchUtils 的輕量級(jí)工具類。它能夠以一種極其簡潔的方式來組織和管理這一類異步任務(wù)。
詳細(xì)代碼
其代碼如下,后面會(huì)有使用說明和示例以及和傳統(tǒng)實(shí)現(xiàn)代碼的對(duì)比。
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
public class LatchUtils {
private static final ThreadLocal<List<TaskInfo>> THREADLOCAL = ThreadLocal.withInitial(LinkedList::new);
public static void submitTask(Executor executor, Runnable runnable) {
THREADLOCAL.get().add(new TaskInfo(executor, runnable));
}
private static List<TaskInfo> popTask() {
List<TaskInfo> taskInfos = THREADLOCAL.get();
THREADLOCAL.remove();
return taskInfos;
}
public static boolean waitFor(long timeout, TimeUnit timeUnit) {
List<TaskInfo> taskInfos = popTask();
if (taskInfos.isEmpty()) {
return true;
}
CountDownLatch latch = new CountDownLatch(taskInfos.size());
for (TaskInfo taskInfo : taskInfos) {
Executor executor = taskInfo.executor;
Runnable runnable = taskInfo.runnable;
executor.execute(() -> {
try {
runnable.run();
} finally {
latch.countDown();
}
});
}
boolean await = false;
try {
await = latch.await(timeout, timeUnit);
} catch (Exception ignored) {
}
return await;
}
private static final class TaskInfo {
private final Executor executor;
private final Runnable runnable;
public TaskInfo(Executor executor, Runnable runnable) {
this.executor = executor;
this.runnable = runnable;
}
}
}核心思想
LatchUtils 的設(shè)計(jì)哲學(xué)是:多次提交,一次等待。
? 任務(wù)注冊(cè): 在主流程代碼中,可以先通過 LatchUtils.submitTask() 提交Runnable任務(wù)和其對(duì)應(yīng)的Executor(該線程池用來執(zhí)行這個(gè)Runnable)。
? 執(zhí)行并等待: 當(dāng)并行任務(wù)都提交完畢后,你只需調(diào)用一次 LatchUtils.waitFor()。關(guān)注工眾號(hào):碼猿技術(shù)專欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊(cè)!該方法會(huì)立即觸發(fā)所有已注冊(cè)任務(wù)的執(zhí)行,并阻塞等待所有任務(wù)執(zhí)行完成或超時(shí)。
API 概覽
這個(gè)工具類對(duì)外暴露的接口極其簡單,只有兩個(gè)核心靜態(tài)方法:
submitTask()
public static void submitTask(Executor executor, Runnable runnable)功能: 提交一個(gè)異步任務(wù)。
參數(shù):
? executor:java.util.concurrent.Executor - 指定執(zhí)行此任務(wù)的線程池。
? runnable:java.lang.Runnable - 需要異步執(zhí)行的具體業(yè)務(wù)邏輯。
waitFor()
public static boolean waitFor(long timeout, TimeUnit timeUnit)功能: 觸發(fā)所有已提交任務(wù)的執(zhí)行,并同步等待它們?nèi)客瓿伞?/span>
參數(shù):
? timeout:long - 最長等待時(shí)間。
? timeUnit:java.util.concurrent.TimeUnit - 等待時(shí)間單位。
返回值:
? true: 如果所有任務(wù)在指定時(shí)間內(nèi)成功完成。
? false: 如果等待超時(shí)。
注意: 該方法在執(zhí)行后會(huì)自動(dòng)清理當(dāng)前線程提交的任務(wù)列表,因此可以重復(fù)使用。
實(shí)戰(zhàn)示例
讓我們來看一個(gè)典型的應(yīng)用場景:一個(gè)聚合服務(wù)需要同時(shí)調(diào)用用戶服務(wù)、訂單服務(wù)和商品服務(wù),拿到所有結(jié)果后再進(jìn)行下一步處理。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
// 1. 準(zhǔn)備一個(gè)線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("主流程開始,準(zhǔn)備分發(fā)異步任務(wù)...");
// 2. 提交多個(gè)異步任務(wù)
// 任務(wù)一:獲取用戶信息
LatchUtils.submitTask(executorService, () -> {
try {
System.out.println("開始獲取用戶信息...");
Thread.sleep(1000); // 模擬耗時(shí)
System.out.println("獲取用戶信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 任務(wù)二:獲取訂單信息
LatchUtils.submitTask(executorService, () -> {
try {
System.out.println("開始獲取訂單信息...");
Thread.sleep(1500); // 模擬耗時(shí)
System.out.println("獲取訂單信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 任務(wù)三:獲取商品信息
LatchUtils.submitTask(executorService, () -> {
try {
System.out.println("開始獲取商品信息...");
Thread.sleep(500); // 模擬耗時(shí)
System.out.println("獲取商品信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("所有異步任務(wù)已提交,主線程開始等待...");
// 3. 等待所有任務(wù)完成,最長等待5秒
boolean allTasksCompleted = LatchUtils.waitFor(5, TimeUnit.SECONDS);
// 4. 根據(jù)等待結(jié)果繼續(xù)主流程
if (allTasksCompleted) {
System.out.println("所有異步任務(wù)執(zhí)行成功,主流程繼續(xù)...");
} else {
System.err.println("有任務(wù)執(zhí)行超時(shí),主流程中斷!");
}
// 5. 關(guān)閉線程池
executorService.shutdown();
}
}輸出結(jié)果:
主流程開始,準(zhǔn)備分發(fā)異步任務(wù)...
所有異步任務(wù)已提交,主線程開始等待...
開始獲取商品信息...
開始獲取用戶信息...
開始獲取訂單信息...
獲取商品信息成功!
獲取用戶信息成功!
獲取訂單信息成功!
所有異步任務(wù)執(zhí)行成功,主流程繼續(xù)...從這個(gè)例子中可以看到,業(yè)務(wù)代碼變得非常清晰。我們只需要關(guān)注“提交任務(wù)”和“等待結(jié)果”這兩個(gè)動(dòng)作,而無需關(guān)心 CountDownLatch 的初始化、countDown() 的調(diào)用以及異常處理等細(xì)節(jié)。
對(duì)比:如果不使用 LatchUtils
為了更好地理解 LatchUtils 帶來的價(jià)值,讓我們看看要實(shí)現(xiàn)與上面完全相同的功能,用傳統(tǒng)的Java并發(fā)API需要如何編寫代碼。通常有兩種主流方式:使用 CountDownLatch 或使用 CompletableFuture。
方式一:直接使用 CountDownLatch
這是最經(jīng)典的方式,開發(fā)者需要手動(dòng)管理 CountDownLatch 的生命周期。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ManualCountDownLatchExample {
public static void main(String[] args) {
// 1. 準(zhǔn)備一個(gè)線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 2. 手動(dòng)初始化 CountDownLatch,數(shù)量為任務(wù)數(shù)
CountDownLatch latch = new CountDownLatch(3);
System.out.println("主流程開始,準(zhǔn)備分發(fā)異步任務(wù)...");
// 3. 提交任務(wù),并在每個(gè)任務(wù)的 finally 塊中手動(dòng)調(diào)用 latch.countDown()
// 任務(wù)一:獲取用戶信息
executorService.execute(() -> {
try {
System.out.println("開始獲取用戶信息...");
Thread.sleep(1000);
System.out.println("獲取用戶信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 手動(dòng)減一
}
});
// 任務(wù)二:獲取訂單信息
executorService.execute(() -> {
try {
System.out.println("開始獲取訂單信息...");
Thread.sleep(1500);
System.out.println("獲取訂單信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 手動(dòng)減一
}
});
// 任務(wù)三:獲取商品信息
executorService.execute(() -> {
try {
System.out.println("開始獲取商品信息...");
Thread.sleep(500);
System.out.println("獲取商品信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 手動(dòng)減一
}
});
System.out.println("所有異步任務(wù)已提交,主線程開始等待...");
// 4. 手動(dòng)調(diào)用 latch.await() 進(jìn)行等待
boolean allTasksCompleted = false;
try {
allTasksCompleted = latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// 需要處理中斷異常
Thread.currentThread().interrupt();
System.err.println("主線程在等待時(shí)被中斷!");
}
// 5. 根據(jù)等待結(jié)果繼續(xù)主流程
if (allTasksCompleted) {
System.out.println("所有異步任務(wù)執(zhí)行成功,主流程繼續(xù)...");
} else {
System.err.println("有任務(wù)執(zhí)行超時(shí),主流程中斷!");
}
// 6. 關(guān)閉線程池
executorService.shutdown();
}
}方式二:使用 CompletableFuture
使用 CompletableFuture 實(shí)現(xiàn),其代碼如下
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CompletableFutureExample {
public static void main(String[] args) {
// 1. 準(zhǔn)備一個(gè)線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("主流程開始,準(zhǔn)備分發(fā)異步任務(wù)...");
// 2. 創(chuàng)建 CompletableFuture 任務(wù)
CompletableFuture<Void> userFuture = CompletableFuture.runAsync(() -> {
try {
System.out.println("開始獲取用戶信息...");
Thread.sleep(1000);
System.out.println("獲取用戶信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executorService);
CompletableFuture<Void> orderFuture = CompletableFuture.runAsync(() -> {
try {
System.out.println("開始獲取訂單信息...");
Thread.sleep(1500);
System.out.println("獲取訂單信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executorService);
CompletableFuture<Void> productFuture = CompletableFuture.runAsync(() -> {
try {
System.out.println("開始獲取商品信息...");
Thread.sleep(500);
System.out.println("獲取商品信息成功!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executorService);
System.out.println("所有異步任務(wù)已提交,主線程開始等待...");
// 3. 使用 CompletableFuture.allOf 將所有任務(wù)組合起來
CompletableFuture<Void> allFutures = CompletableFuture.allOf(userFuture, orderFuture, productFuture);
// 4. 等待組合后的 Future 完成
try {
allFutures.get(5, TimeUnit.SECONDS);
System.out.println("所有異步任務(wù)執(zhí)行成功,主流程繼續(xù)...");
} catch (Exception e) {
// 需要處理多種異常,如 InterruptedException, ExecutionException, TimeoutException
System.err.println("任務(wù)執(zhí)行超時(shí)或出錯(cuò),主流程中斷! " + e.getMessage());
}
// 5. 關(guān)閉線程池
executorService.shutdown();
}
}對(duì)比分析
特性 | LatchUtils | 手動(dòng)CountDownLatch | CompletableFuture.allOf |
代碼簡潔性 | 極高 。業(yè)務(wù)邏輯和并發(fā)控制分離,核心代碼清晰。 | 中等 。需要在每個(gè)任務(wù)中嵌入latch.countDown(),分散了關(guān)注點(diǎn)。 | 較高 。鏈?zhǔn)秸{(diào)用風(fēng)格,但需要?jiǎng)?chuàng)建多個(gè)Future對(duì)象。 |
狀態(tài)管理 | 自動(dòng) 。工具類內(nèi)部自動(dòng)管理CountDownLatch。 | 手動(dòng) 。需要自己創(chuàng)建、維護(hù)和傳遞CountDownLatch實(shí)例。 | 自動(dòng) 。由CompletableFuture框架管理任務(wù)狀態(tài)。 |
錯(cuò)誤處理 | 簡化 。waitFor內(nèi)部處理InterruptedException,僅返回布爾值。 | 復(fù)雜 。需要顯式地在finally中countDown(),并為主線程的await()處理InterruptedException。 | 復(fù)雜 。get()方法會(huì)拋出多種受檢異常,需要統(tǒng)一處理。 |
關(guān)注點(diǎn)分離 | 優(yōu)秀 。開發(fā)者只需關(guān)注“提交”和“等待”兩個(gè)動(dòng)作。 | 一般 。并發(fā)控制邏輯(countDown())侵入到了業(yè)務(wù)Runnable中。 | 良好 。任務(wù)的定義和組合是分開的,但仍需處理組合后的Future。 |
易用性 | 非常簡單 。幾乎沒有學(xué)習(xí)成本。 | 需要理解CountDownLatch 。容易忘記countDown()或錯(cuò)誤處理。 | 需要理解CompletableFuture 。API較為豐富,有一定學(xué)習(xí)曲線。 |
結(jié)論很明顯:
對(duì)于“分發(fā)一組并行任務(wù),然后等待它們?nèi)客瓿伞边@一特定但常見的模式,LatchUtils 通過適度的封裝,極大地簡化了開發(fā)者的工作。它隱藏了并發(fā)控制的復(fù)雜性,讓業(yè)務(wù)代碼回歸其本質(zhì),從而提高了代碼的可讀性和可維護(hù)性。






























