一套萬能的異步處理方案(典藏版)
兄弟們,今天咱們來聊個能讓你代碼從 “卡成 PPT” 變 “飛一般絲滑” 的話題 —— 異步處理。
我敢打賭,你肯定遇到過這種情況:寫了個接口,邏輯里又要查數(shù)據(jù)庫、又要調(diào)第三方接口、還要算一堆數(shù)據(jù),跑起來那叫一個慢,用戶在前端戳半天沒反應,老板在后面盯著問 “怎么回事啊,是不是服務器該換了?”,結(jié)果你排查半天發(fā)現(xiàn),不是服務器不行,是代碼里全是 “同步等待” 的坑 —— 查完 A 等 A,查完 B 等 B,CPU 閑著沒事干也不幫你多跑點活,這不純純浪費資源嘛!
所以今天這篇,我不整那些 “異步非阻塞 IO 原理”“線程模型深度剖析” 的虛頭巴腦,就給你一套能直接抄作業(yè)、覆蓋 80% 業(yè)務場景的異步處理方案,從基礎(chǔ)到實戰(zhàn),從坑點到避坑,保證你看完就能用,用了就見效。
一、先搞懂:為啥咱們非得用異步?
在講方案之前,先掰扯清楚 “異步” 到底是個啥,以及為啥它能救你的慢接口。
你可以把代碼執(zhí)行想象成去奶茶店買喝的:
- 同步就是你點完單,站在柜臺前一動不動等,不管店員做沒做你的,你都得等著,期間啥也干不了 —— 這就是咱們平時寫的doA(); doB(); doC();,A 沒干完,B 絕對不開始,哪怕 A 是在等數(shù)據(jù)庫返回(店員做奶茶),CPU(你)也只能閑著。
- 異步就是你點完單拿個取餐碼,然后找個位置刷手機(干別的活),等店員喊你(任務完成),你再過去拿 —— 對應到代碼里,就是發(fā)起一個任務后,不用等著它結(jié)束,繼續(xù)執(zhí)行別的代碼,等任務有結(jié)果了再回來處理。
那異步能解決啥問題?舉個真實場景:
比如你寫個 “訂單詳情接口”,要做三件事:
- 查訂單基本信息(DB,100ms)
- 查訂單對應的商品列表(DB,150ms)
- 查用戶的收貨地址(調(diào)用用戶服務接口,200ms)
如果用同步,總耗時就是 100+150+200=450ms,這還沒算其他邏輯,接口響應時間輕松破 500ms,用戶體驗直接拉胯。
但如果用異步,這三件事可以同時跑,總耗時差不多就是最長的那個 200ms,直接把響應時間砍半,CPU 也能充分利用起來 —— 這就是異步的魔力。
不過先別急著興奮,異步雖好,但早年 Java 里的異步方案,坑可不少。
二、那些年我們踩過的異步 “坑”
最早咱們想搞異步,第一反應就是new Thread(),對吧?比如這樣:
// 同步代碼
public void syncOrderDetail() {
Order order = orderDao.getById(1L); // 100ms
List<Goods> goodsList = goodsDao.getByOrderId(1L); // 150ms
Address address = userService.getAddress(1L); // 200ms
// 組裝數(shù)據(jù)返回
}
// 想搞異步,就new Thread
public void asyncBadExample() {
Order[] order = new Order[1]; // 用數(shù)組存,因為匿名內(nèi)部類要final
List<Goods>[] goodsList = new List[1];
Address[] address = new Address[1];
// 查訂單
new Thread(() -> order[0] = orderDao.getById(1L)).start();
// 查商品
new Thread(() -> goodsList[0] = goodsDao.getByOrderId(1L)).start();
// 查地址
new Thread(() -> address[0] = userService.getAddress(1L)).start();
// 等結(jié)果
while (order[0] == null || goodsList[0] == null || address[0] == null) {
// 空循環(huán)等,CPU直接干燒
}
// 組裝數(shù)據(jù)
}你瞅瞅這代碼,問題一大堆:
- 線程管不?。好看萎惒骄?new 一個線程,要是接口并發(fā)高,瞬間幾百上千個線程,JVM 直接 OOM 給你看 —— 就像奶茶店來了 100 個客人,每個客人都讓店員單獨開個機器做奶茶,機器根本不夠用。
- 拿結(jié)果太費勁:用數(shù)組存結(jié)果,還得空循環(huán)等,這叫 “忙等”,CPU 利用率直接拉滿,其他活都沒法干了 —— 相當于你刷手機的時候,每隔 1 秒就去問店員 “我的奶茶好了嗎”,店員煩,你也沒法專心刷手機。
- 沒異常處理:要是查商品的時候數(shù)據(jù)庫崩了,這個線程直接拋異常死了,外面還不知道,一直在空循環(huán)等,直接超時 —— 就像店員做奶茶的時候機器壞了,沒告訴你,你還傻乎乎等半天。
后來 Java 5 出了Future和ExecutorService,算是解決了一部分問題。ExecutorService就是線程池,能幫你管理線程(相當于奶茶店固定幾個機器,客人多了排隊,不瞎開機器);Future能幫你拿結(jié)果,還能判斷任務是否完成。
比如這樣改:
// 先搞個線程池
private ExecutorService executor = Executors.newFixedThreadPool(3);
public void asyncWithFuture() throws ExecutionException, InterruptedException {
// 提交任務,返回Future
Future<Order> orderFuture = executor.submit(() -> orderDao.getById(1L));
Future<List<Goods>> goodsFuture = executor.submit(() -> goodsDao.getByOrderId(1L));
Future<Address> addressFuture = executor.submit(() -> userService.getAddress(1L));
// 拿結(jié)果(這里會阻塞,直到任務完成)
Order order = orderFuture.get();
List<Goods> goodsList = goodsFuture.get();
Address address = addressFuture.get();
// 組裝數(shù)據(jù)
}比new Thread()強多了,線程有池管著,也不用空循環(huán)了,但還是有坑:
- get () 方法會阻塞:雖然三個任務是同時跑的,但orderFuture.get()會等到訂單查完才執(zhí)行下一句,要是訂單查了 100ms,商品查了 150ms,那goodsFuture.get()其實還要等 50ms—— 相當于你先問 “我的奶茶好了嗎”,等拿到奶茶,再問 “我的薯條好了嗎”,但其實薯條早好了,你白等了。
- 沒法鏈式調(diào)用:如果查完訂單,需要用訂單里的用戶 ID 查用戶信息,再用用戶信息查會員等級,用Future就得嵌套,代碼越寫越丑,跟回調(diào)地獄似的。
- 異常處理麻煩:得用 try-catch 包著get(),要是三個任務都可能拋異常,代碼里全是 try-catch,看著就頭疼。
直到 Java 8 出了CompletableFuture,才算把這些坑都填上了 —— 這玩意兒就是咱們今天這套 “萬能方案” 的核心,相當于給異步處理裝了個 “自動擋”,好用到飛起。
三、核心武器:CompletableFuture 詳解(干貨密集區(qū))
CompletableFuture本質(zhì)上是Future的增強版,它解決了Future的阻塞、沒法鏈式調(diào)用、異常難處理的問題,還支持多個異步任務的組合,簡直是為實戰(zhàn)而生。
咱們先從基礎(chǔ)用法開始,再講進階技巧,最后結(jié)合場景落地。
3.1 基礎(chǔ):怎么用 CompletableFuture 發(fā)起異步任務?
CompletableFuture提供了幾個靜態(tài)方法來發(fā)起異步任務,最常用的是這兩個:
- supplyAsync(Supplier<U> supplier):適合有返回值的任務,比如查數(shù)據(jù)庫、調(diào)接口。
- runAsync(Runnable runnable):適合沒返回值的任務,比如寫日志、發(fā)通知。
這倆方法都可以傳一個Executor(線程池),如果不傳,就用默認的ForkJoinPool.commonPool()—— 但強烈不建議用默認線程池,因為默認線程池的線程數(shù)是 CPU 核心數(shù),要是任務是 IO 密集型(比如調(diào)接口、查數(shù)據(jù)庫),線程會經(jīng)常等 IO,導致任務堆積,后面再說為啥。
先寫個基礎(chǔ)例子,還是之前的訂單詳情場景:
// 自定義線程池(后面詳細講怎么配)
private ExecutorService orderExecutor = new ThreadPoolExecutor(
4, // 核心線程數(shù)
8, // 最大線程數(shù)
60L, // 空閑線程存活時間
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // 隊列
new ThreadFactoryBuilder().setNameFormat("order-async-%d").build(), // 線程名
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
public void asyncWithCompletableFuture() {
// 1. 異步查訂單
CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("線程" + Thread.currentThread().getName() + ":查訂單");
return orderDao.getById(1L);
}, orderExecutor);
// 2. 異步查商品(不用等訂單,直接跑)
CompletableFuture<List<Goods>> goodsFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("線程" + Thread.currentThread().getName() + ":查商品");
return goodsDao.getByOrderId(1L);
}, orderExecutor);
// 3. 異步查地址(需要訂單里的userId,所以等訂單查完再跑)
CompletableFuture<Address> addressFuture = orderFuture.thenCompose(order -> {
// thenCompose:用前一個任務的結(jié)果,發(fā)起新的異步任務
return CompletableFuture.supplyAsync(() -> {
System.out.println("線程" + Thread.currentThread().getName() + ":查地址,userId=" + order.getUserId());
return userService.getAddress(order.getUserId());
}, orderExecutor);
});
// 4. 等所有任務完成,組裝結(jié)果(不阻塞主線程,用whenComplete處理結(jié)果)
CompletableFuture.allOf(orderFuture, goodsFuture, addressFuture)
.whenComplete((v, e) -> {
// v是allOf的返回值,因為allOf沒有返回值,所以v是null
// e是異常,如果三個任務中有一個拋異常,e就是那個異常
if (e != null) {
System.out.println("異步任務出錯了:" + e.getMessage());
return;
}
// 拿結(jié)果(這里get()不會阻塞,因為allOf已經(jīng)確保任務完成了)
try {
Order order = orderFuture.get();
List<Goods> goodsList = goodsFuture.get();
Address address = addressFuture.get();
// 組裝數(shù)據(jù)
OrderDetailVO vo = new OrderDetailVO(order, goodsList, address);
System.out.println("組裝完成:" + vo);
} catch (Exception ex) {
// 這里一般不會拋異常,因為前面allOf已經(jīng)處理了
ex.printStackTrace();
}
});
// 主線程繼續(xù)干別的,不用等上面的任務完成
System.out.println("主線程:異步任務已發(fā)起,我先溜了~");
}這段代碼里有幾個關(guān)鍵點,咱們掰開揉碎了說:
- thenCompose 方法:它的作用是 “用前一個異步任務的結(jié)果,發(fā)起一個新的異步任務”,而且返回的是CompletableFuture,這樣就能鏈式調(diào)用,避免嵌套。比如查地址需要訂單里的 userId,所以得等訂單查完,用thenCompose就很優(yōu)雅,要是用Future,就得寫成:
// Future的嵌套寫法,丑哭
Future<Order> orderFuture = executor.submit(() -> orderDao.getById(1L));
Order order = orderFuture.get();
Future<Address> addressFuture = executor.submit(() -> userService.getAddress(order.getUserId()));- allOf 方法:它的作用是 “等待所有異步任務完成”,返回的是一個CompletableFuture<Void>(沒有返回值)。適合多個任務都完成后再做后續(xù)處理的場景,比如訂單詳情里的三個任務都完成了,才能組裝 VO。
還有個類似的方法叫anyOf,是 “只要有一個任務完成就觸發(fā)”,適合比如 “查商品信息,先查本地緩存,再查遠程服務,哪個快用哪個” 的場景。
- whenComplete 方法:它是 “任務完成后觸發(fā)的回調(diào)”,不管任務是成功還是失敗,都會執(zhí)行。里面的v是任務的返回值(allOf 的話 v 是 null),e是異常,如果任務成功,e 是 null;如果失敗,e 就是拋出的異常。
這里要注意:whenComplete不會阻塞主線程,所以主線程會先打印 “我先溜了~”,等異步任務都完成了,再執(zhí)行回調(diào)里的組裝邏輯 —— 這才是真正的異步非阻塞。
3.2 進階:CompletableFuture 的鏈式調(diào)用和異常處理
CompletableFuture的鏈式調(diào)用是它的核心優(yōu)勢,除了前面的thenCompose,還有幾個常用的方法,咱們用一個場景串起來:
比如 “查完訂單后,計算訂單的優(yōu)惠金額,然后根據(jù)優(yōu)惠金額和商品列表,計算最終實付金額”:
// 鏈式調(diào)用示例
CompletableFuture<BigDecimal> payAmountFuture = CompletableFuture.supplyAsync(() -> {
// 1. 查訂單
System.out.println("查訂單");
return orderDao.getById(1L);
}, orderExecutor)
.thenApply(order -> {
// 2. 計算優(yōu)惠金額(用訂單的金額和用戶等級,同步操作)
System.out.println("計算優(yōu)惠金額,訂單金額:" + order.getAmount());
BigDecimal discount = calculateDiscount(order); // 同步方法,不用開新線程
order.setDiscount(discount);
return order;
})
.thenCombine(CompletableFuture.supplyAsync(() -> {
// 3. 查商品列表(異步,和計算優(yōu)惠并行)
System.out.println("查商品列表");
return goodsDao.getByOrderId(1L);
}, orderExecutor), (order, goodsList) -> {
// 4. 結(jié)合訂單(帶優(yōu)惠)和商品列表,計算實付金額
System.out.println("計算實付金額");
BigDecimal totalGoodsAmount = goodsList.stream()
.map(Goods::getPrice)
.reduce(BigDecimal.ZERO, BigDecimal::add);
// 實付 = 商品總金額 - 優(yōu)惠
return totalGoodsAmount.subtract(order.getDiscount());
})
.exceptionally(ex -> {
// 5. 異常處理:如果前面任何一步出錯,返回默認值0
System.out.println("計算實付金額出錯:" + ex.getMessage());
return BigDecimal.ZERO;
});
// 拿結(jié)果(如果想阻塞等結(jié)果,可以用get(),也可以用join(),join()不用拋異常)
BigDecimal payAmount = payAmountFuture.join();
System.out.println("最終實付金額:" + payAmount);這里又多了幾個關(guān)鍵方法,咱們一個個說:
- thenApply:用前一個任務的結(jié)果做同步處理,返回新的結(jié)果。比如計算優(yōu)惠金額是同步方法,不需要開新線程,就用thenApply—— 它會復用前一個任務的線程,或者在主線程執(zhí)行(如果前一個任務已經(jīng)完成)。
- thenCombine:結(jié)合兩個異步任務的結(jié)果,做處理后返回新結(jié)果。比如查商品列表是異步的,和計算優(yōu)惠可以并行,等兩個都完成了,再計算實付金額 —— 這樣比 “等計算優(yōu)惠完了再查商品” 快多了。
- exceptionally:捕獲前面所有步驟的異常,返回一個默認值。比如前面任何一步出錯(查訂單失敗、查商品失敗、計算優(yōu)惠失?。?,都會走到這里,返回 0 元,避免整個流程崩潰。
除了exceptionally,還有個handle方法也能處理異常,它和exceptionally的區(qū)別是:handle不管成功還是失敗都會執(zhí)行,而exceptionally只在失敗時執(zhí)行。比如:
.handle((result, ex) -> {
if (ex != null) {
System.out.println("出錯了:" + ex.getMessage());
return BigDecimal.ZERO;
}
return result;
});3.3 關(guān)鍵:超時控制(避免異步任務 “卡死人”)
異步任務最怕啥?最怕它一直不完成,比如調(diào)第三方接口,對方服務掛了,你的任務就一直阻塞在那,線程被占著不放,最后線程池滿了,整個服務都卡了。
所以超時控制是異步處理的必選項,CompletableFuture提供了兩個方法來做超時:
- orTimeout(long timeout, TimeUnit unit):超時后拋出TimeoutException。
- completeOnTimeout(U value, long timeout, TimeUnit unit):超時后返回一個默認值,不拋異常。
比如查地址的時候,設置 200ms 超時,超時了就返回默認地址:
CompletableFuture<Address> addressFuture = CompletableFuture.supplyAsync(() -> {
// 模擬調(diào)用第三方接口超時
try {
Thread.sleep(300); // 睡300ms,超過200ms的超時時間
} catch (InterruptedException e) {
e.printStackTrace();
}
return userService.getAddress(1L);
}, orderExecutor)
// 超時控制:200ms超時,返回默認地址
.completeOnTimeout(new Address("默認地址:未知"), 200, TimeUnit.MILLISECONDS);
// 或者超時拋異常
// .orTimeout(200, TimeUnit.MILLISECONDS);這里要注意:超時后,原來的異步任務其實還在跑(比如那個睡 300ms 的線程),只是CompletableFuture會忽略它的結(jié)果,直接返回默認值或拋異常。如果原來的任務是寫數(shù)據(jù)庫、調(diào)支付接口這種 “有狀態(tài)” 的操作,得自己處理 “任務超時但實際還在執(zhí)行” 的問題,比如用分布式鎖、冪等設計。
四、異步的 “基石”:線程池配置(90% 的人都配錯了)
講完CompletableFuture,咱們得聊聊它的 “搭檔”—— 線程池。線程池是異步處理的基石,配置得不好,異步不僅沒效果,還會出大問題。
很多人用線程池,要么直接用Executors.newFixedThreadPool(10),要么用默認的ForkJoinPool,這都是坑。咱們先搞懂線程池的核心參數(shù),再講怎么根據(jù)業(yè)務場景配置。
4.1 線程池的 7 個核心參數(shù)(必懂)
線程池的核心類是ThreadPoolExecutor,它的構(gòu)造方法有 7 個參數(shù),每個參數(shù)都影響線程池的行為,咱們用 “奶茶店” 的例子一個個解釋:
new ThreadPoolExecutor(
int corePoolSize, // 核心線程數(shù)
int maximumPoolSize, // 最大線程數(shù)
long keepAliveTime, // 空閑線程存活時間
TimeUnit unit, // 存活時間單位
BlockingQueue<Runnable> workQueue, // 任務隊列
ThreadFactory threadFactory, // 線程工廠
RejectedExecutionHandler handler // 拒絕策略
);- corePoolSize(核心線程數(shù)):奶茶店的 “常駐店員”,不管有沒有訂單,這些店員都在店里(線程不會被銷毀)。比如核心線程數(shù)設 4,就是不管忙不忙,都有 4 個店員在。
- maximumPoolSize(最大線程數(shù)):奶茶店的 “最多能有多少店員”,包括常駐店員和臨時店員。比如最大線程數(shù)設 8,就是忙的時候最多再招 4 個臨時店員,總共 8 個。
- keepAliveTime(空閑線程存活時間):臨時店員沒事干的時候,能在店里待多久。比如設 60 秒,就是臨時店員閑了 60 秒還沒活干,就讓他走了(線程被銷毀)。
- unit(時間單位):keepAliveTime 的單位,比如秒、毫秒。
- workQueue(任務隊列):訂單太多,店員忙不過來的時候,訂單排隊的地方。比如設 100,就是最多能排 100 個訂單,超過了就拒絕。
- threadFactory(線程工廠):用來創(chuàng)建線程的工廠,主要是給線程起個名字,方便排查問題。比如給線程起名 “order-async-1”“order-async-2”,后面查日志的時候就知道哪個線程出的問題。
- handler(拒絕策略):訂單太多,店員忙不過來,隊列也排滿了,該怎么處理新訂單。有四種默認策略:
- CallerRunsPolicy:讓發(fā)起訂單的人自己處理(比如讓顧客自己做奶茶),也就是讓調(diào)用線程執(zhí)行任務,這樣能減緩請求速度,避免任務丟失。
- AbortPolicy:直接拋異常(RejectedExecutionException),默認策略,容易導致業(yè)務報錯。
- DiscardPolicy:直接扔掉新訂單,不拋異常,適合不重要的任務(比如日志)。
- DiscardOldestPolicy:扔掉隊列里最老的訂單,再把新訂單加進去,適合任務有時效性的場景(比如實時統(tǒng)計)。
4.2 怎么配置?看業(yè)務場景!
線程池的配置沒有 “萬能值”,但有兩個核心場景:CPU 密集型和 IO 密集型,配置思路完全不同。
場景 1:CPU 密集型任務(比如復雜計算、數(shù)據(jù)處理)
CPU 密集型任務的特點是:線程一直在用 CPU 算,很少等 IO(比如查數(shù)據(jù)庫、調(diào)接口)。比如計算訂單的優(yōu)惠金額、處理 Excel 數(shù)據(jù)。
這種場景下,線程數(shù)太多反而會導致 CPU 頻繁切換線程,效率下降。所以核心線程數(shù)和最大線程數(shù)建議設為:CPU 核心數(shù) + 1。
比如你的服務器是 4 核 CPU,就設為 5。為什么加 1?因為如果某個線程偶爾等 IO(比如讀本地文件),多出來的那個線程能利用 CPU 的空閑時間,提高利用率。
怎么獲取 CPU 核心數(shù)?用Runtime.getRuntime().availableProcessors()。
場景 2:IO 密集型任務(比如查數(shù)據(jù)庫、調(diào)第三方接口、發(fā) MQ)
IO 密集型任務的特點是:線程大部分時間都在等 IO 完成(比如等數(shù)據(jù)庫返回、等第三方接口響應),CPU 大部分時間是空閑的。比如查訂單、查商品、調(diào)用支付接口。
這種場景下,線程數(shù)可以多設一點,讓 CPU 能充分利用起來。建議核心線程數(shù)設為:CPU 核心數(shù) * 2,最大線程數(shù)可以設為 CPU 核心數(shù) * 4,或者根據(jù)實際并發(fā)調(diào)整。
比如 4 核 CPU,核心線程數(shù)設 8,最大線程數(shù)設 16。
場景 3:混合任務(既有 CPU 密集又有 IO 密集)
如果一個線程里既有 IO 操作又有 CPU 計算,建議把任務拆成兩個:IO 密集的任務和 CPU 密集的任務,分別用兩個線程池處理。比如查訂單(IO 密集)用一個線程池,計算優(yōu)惠(CPU 密集)用另一個線程池,這樣能提高效率。
4.3 實戰(zhàn)配置示例(直接抄)
咱們以 “訂單服務” 為例,訂單服務里的異步任務大多是 IO 密集型(查 DB、調(diào)接口),所以線程池配置如下:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
publicclass ThreadPoolConfig {
// CPU核心數(shù)
privatestaticfinalint CPU_CORES = Runtime.getRuntime().availableProcessors();
// 訂單相關(guān)異步任務線程池(IO密集型)
@Bean
public Executor orderAsyncExecutor() {
// 線程工廠:給線程起名字
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("order-async-%d") // 線程名:order-async-1, order-async-2...
.setDaemon(false) // 非守護線程(守護線程會隨著主線程退出而退出,這里要避免)
.build();
// 線程池參數(shù)
int corePoolSize = CPU_CORES * 2; // 核心線程數(shù):CPU*2
int maximumPoolSize = CPU_CORES * 4; // 最大線程數(shù):CPU*4
long keepAliveTime = 60L; // 空閑線程存活時間:60秒
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100); // 任務隊列:100個
RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor.CallerRunsPolicy(); // 拒絕策略:調(diào)用者執(zhí)行
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
rejectedHandler
);
// 允許核心線程超時銷毀(默認核心線程不會超時,這里設為true,空閑時可以銷毀,節(jié)省資源)
executor.allowCoreThreadTimeOut(true);
return executor;
}
// 計算相關(guān)異步任務線程池(CPU密集型,比如計算優(yōu)惠、統(tǒng)計數(shù)據(jù))
@Bean
public Executor calculateAsyncExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("calculate-async-%d")
.build();
int corePoolSize = CPU_CORES + 1; // 核心線程數(shù):CPU+1
int maximumPoolSize = CPU_CORES + 1; // 最大線程數(shù)和核心線程數(shù)一致,不需要臨時線程
long keepAliveTime = 60L;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(50);
RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
rejectedHandler
);
return executor;
}
}這里有兩個關(guān)鍵點要注意:
- 給線程起名字:用ThreadFactoryBuilder給線程起有意義的名字,比如 “order-async-% d”,后面查日志的時候,看到線程名就知道是哪個線程池的任務,排查問題效率翻倍。
- 拒絕策略選 CallerRunsPolicy:對于業(yè)務任務(比如訂單處理),不建議用 AbortPolicy(拋異常)或 DiscardPolicy(扔任務),因為會導致業(yè)務失敗。CallerRunsPolicy 讓調(diào)用線程自己執(zhí)行任務,雖然會讓主線程(比如 Tomcat 的線程)變慢,但能避免任務丟失,是比較安全的選擇。
五、實戰(zhàn)場景:一套方案覆蓋 80% 業(yè)務
前面講了CompletableFuture和線程池,現(xiàn)在咱們結(jié)合實際業(yè)務場景,把這套方案落地。以 “電商下單流程” 為例,看看異步怎么用。
5.1 下單流程分析
一個典型的電商下單流程包括:
- 參數(shù)校驗(必須同步,因為要即時返回錯誤)
- 庫存扣減(必須同步,因為要保證庫存不超賣)
- 創(chuàng)建訂單(必須同步,因為要返回訂單號給用戶)
- 記錄下單日志(異步,不影響下單主流程)
- 更新用戶積分(異步,積分晚一點更新沒關(guān)系)
- 發(fā)送下單成功通知(異步,通知晚一點沒關(guān)系)
- 同步商品銷量(異步,銷量統(tǒng)計可以延遲)
其中 1-3 是核心主流程,必須同步執(zhí)行,4-7 是非核心流程,可以異步執(zhí)行,這樣能減少下單接口的響應時間。
5.2 異步方案落地代碼
咱們用 Spring Boot 來寫這個下單接口,結(jié)合CompletableFuture和自定義線程池:
@RestController
@RequestMapping("/order")
publicclassOrderController {
@Autowired
private OrderService orderService;
@Autowired
private Executor orderAsyncExecutor; // 注入前面配置的訂單異步線程池
/**
* 下單接口
*/
@PostMapping("/create")
public Result<OrderVO> createOrder(@RequestBody OrderCreateDTO dto) {
// 1. 參數(shù)校驗(同步)
validateParams(dto);
// 2. 核心流程:創(chuàng)建訂單(同步,包括庫存扣減、創(chuàng)建訂單記錄)
Order order = orderService.createOrder(dto);
// 3. 非核心流程:異步執(zhí)行
asyncProcessAfterCreate(order);
// 4. 返回結(jié)果(核心流程完成就返回,不用等異步流程)
OrderVO vo = convertToVO(order);
return Result.success(vo);
}
/**
* 參數(shù)校驗(同步)
*/
private void validateParams(OrderCreateDTO dto) {
if (dto.getUserId() == null) {
thrownew BusinessException("用戶ID不能為空");
}
if (CollectionUtils.isEmpty(dto.getGoodsList())) {
thrownew BusinessException("商品列表不能為空");
}
// 其他校驗...
}
/**
* 下單后異步處理(非核心流程)
*/
private void asyncProcessAfterCreate(Order order) {
// 3.1 異步記錄日志
CompletableFuture.runAsync(() -> {
orderService.recordOrderLog(order.getId(), "訂單創(chuàng)建成功");
}, orderAsyncExecutor)
.exceptionally(ex -> {
// 日志記錄失敗不影響主流程,只打日志
log.error("記錄訂單日志失敗,orderId={}", order.getId(), ex);
returnnull;
});
// 3.2 異步更新用戶積分(需要訂單金額,所以用supplyAsync)
CompletableFuture.supplyAsync(() -> {
// 計算積分:訂單金額1元=1積分
int points = order.getAmount().intValue();
return orderService.updateUserPoints(order.getUserId(), points);
}, orderAsyncExecutor)
.exceptionally(ex -> {
log.error("更新用戶積分失敗,userId={}, orderId={}", order.getUserId(), order.getId(), ex);
// 積分更新失敗可以重試,這里用定時任務重試,或者發(fā)MQ重試
sendRetryUpdatePointsMQ(order.getUserId(), order.getAmount().intValue());
returnnull;
});
// 3.3 異步發(fā)送通知(需要用戶手機號,所以先查用戶信息)
CompletableFuture.supplyAsync(() -> {
// 查用戶信息(IO密集型)
return userService.getUserById(order.getUserId());
}, orderAsyncExecutor)
.thenCompose(user -> {
// 用用戶手機號發(fā)送短信通知(異步)
return CompletableFuture.runAsync(() -> {
notifyService.sendSms(user.getPhone(), "您的訂單" + order.getId() + "已創(chuàng)建成功");
}, orderAsyncExecutor);
})
.exceptionally(ex -> {
log.error("發(fā)送下單通知失敗,orderId={}", order.getId(), ex);
returnnull;
});
// 3.4 異步同步商品銷量
CompletableFuture.runAsync(() -> {
for (OrderGoods goods : order.getOrderGoodsList()) {
goodsService.syncSalesCount(goods.getGoodsId(), goods.getQuantity());
}
}, orderAsyncExecutor)
.exceptionally(ex -> {
log.error("同步商品銷量失敗,orderId={}", order.getId(), ex);
returnnull;
});
}
/**
* 轉(zhuǎn)換VO
*/
private OrderVO convertToVO(Order order) {
OrderVO vo = new OrderVO();
vo.setOrderId(order.getId());
vo.setUserId(order.getUserId());
vo.setAmount(order.getAmount());
vo.setStatus(order.getStatus());
vo.setCreateTime(order.getCreateTime());
return vo;
}
/**
* 發(fā)送積分更新重試MQ
*/
private void sendRetryUpdatePointsMQ(Long userId, int points) {
RetryUpdatePointsMQ mq = new RetryUpdatePointsMQ();
mq.setUserId(userId);
mq.setPoints(points);
mq.setRetryCount(0); // 重試次數(shù)
mq.setNextRetryTime(System.currentTimeMillis() + 5 * 60 * 1000); // 5分鐘后重試
rabbitTemplate.convertAndSend("retry-update-points-exchange", "retry.update.points", mq);
}
}5.3 方案亮點和避坑點
亮點:
- 核心流程和非核心流程分離:核心流程(校驗、扣庫存、創(chuàng)建訂單)同步執(zhí)行,保證即時性和數(shù)據(jù)一致性;非核心流程(日志、積分、通知、銷量)異步執(zhí)行,減少接口響應時間。
- 異常處理不影響主流程:每個異步任務都用exceptionally處理異常,即使某個非核心流程失?。ū热缤ㄖl(fā)送失敗),也不會導致下單接口報錯,用戶能正常下單。
- 失敗重試機制:比如積分更新失敗,發(fā)送 MQ 消息,用定時任務或 MQ 的重試機制(比如 RabbitMQ 的死信隊列)進行重試,保證數(shù)據(jù)最終一致性。
避坑點:
- 異步任務不要用默認線程池:前面已經(jīng)講過,默認線程池線程數(shù)少,容易堆積任務,這里用自定義的orderAsyncExecutor,參數(shù)適配 IO 密集型任務。
- 異步任務不要依賴主線程的變量:比如下單接口里的dto對象,在異步任務里不要直接用,因為主線程可能已經(jīng)把dto回收了,導致數(shù)據(jù)不一致。應該用order對象里的字段(比如order.getUserId()),而不是dto.getUserId()。
- 異步任務要考慮冪等性:比如同步商品銷量,萬一異步任務執(zhí)行了兩次,會導致銷量統(tǒng)計錯誤。所以goodsService.syncSalesCount方法要做冪等處理,比如用 “訂單 ID + 商品 ID” 作為唯一鍵,避免重復統(tǒng)計。
六、進階:分布式場景下的異步處理
前面講的都是 “本地異步”,也就是在同一個服務里的異步任務。如果遇到 “跨服務的異步任務”,比如:
- 訂單服務創(chuàng)建訂單后,需要通知庫存服務、支付服務、物流服務。
- 用戶服務更新用戶信息后,需要同步到搜索服務、推薦服務。
這時候光靠CompletableFuture就不夠了,因為跨服務的任務沒法用本地線程池執(zhí)行,而且如果服務掛了,本地異步任務會丟失。
這時候就需要分布式異步方案,核心是用 “消息隊列(MQ)” 來實現(xiàn),比如 RabbitMQ、Kafka、RocketMQ。
6.1 分布式異步方案設計
以 “訂單創(chuàng)建后通知多服務” 為例,方案如下:
- 訂單服務:創(chuàng)建訂單成功后,發(fā)送一條 “訂單創(chuàng)建成功” 的 MQ 消息(比如發(fā)送到 RabbitMQ 的order.created隊列)。
- 其他服務:
- 庫存服務監(jiān)聽order.created隊列,收到消息后更新庫存狀態(tài)。
- 支付服務監(jiān)聽order.created隊列,收到消息后創(chuàng)建支付單。
- 物流服務監(jiān)聽order.created隊列,收到消息后創(chuàng)建物流單。
- 重試和死信隊列:如果某個服務處理消息失?。ū热缥锪鞣諘簳r不可用),MQ 會自動重試,重試幾次還失敗的話,把消息放到死信隊列,后續(xù)人工處理。
6.2 代碼示例(用 RabbitMQ)
訂單服務發(fā)送 MQ 消息:
@Service
publicclass OrderServiceImpl implements OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public Order createOrder(OrderCreateDTO dto) {
// 1. 扣庫存
stockService.deductStock(dto.getGoodsList());
// 2. 創(chuàng)建訂單
Order order = orderMapper.insert(buildOrder(dto));
// 3. 發(fā)送MQ消息(分布式異步)
sendOrderCreatedMQ(order);
return order;
}
/**
* 發(fā)送訂單創(chuàng)建成功的MQ消息
*/
private void sendOrderCreatedMQ(Order order) {
OrderCreatedMQ mq = new OrderCreatedMQ();
mq.setOrderId(order.getId());
mq.setUserId(order.getUserId());
mq.setAmount(order.getAmount());
mq.setCreateTime(new Date());
try {
// 發(fā)送消息到order.created隊列
rabbitTemplate.convertAndSend(
"order-exchange", // 交換機
"order.created", // 路由鍵
mq,
message -> {
// 設置消息過期時間:30分鐘(超過30分鐘沒處理,就放到死信隊列)
message.getMessageProperties().setExpiration("1800000");
// 設置消息唯一ID,用于冪等處理
message.getMessageProperties().setMessageId(order.getId().toString());
return message;
}
);
log.info("發(fā)送訂單創(chuàng)建MQ消息成功,orderId={}", order.getId());
} catch (Exception e) {
log.error("發(fā)送訂單創(chuàng)建MQ消息失敗,orderId={}", order.getId(), e);
// 消息發(fā)送失敗,可以記錄到本地表,用定時任務重試
mqRetryService.saveRetryRecord("order.created", mq);
}
}
}物流服務監(jiān)聽 MQ 消息:
@Service
publicclass LogisticsConsumer {
@Autowired
private LogisticsService logisticsService;
/**
* 監(jiān)聽訂單創(chuàng)建消息,創(chuàng)建物流單
*/
@RabbitListener(
queues = "order.created.logistics.queue", // 物流服務的隊列
containerFactory = "rabbitListenerContainerFactory"
)
public void handleOrderCreated(Message message, @Payload OrderCreatedMQ mq) {
String messageId = message.getMessageProperties().getMessageId();
log.info("收到訂單創(chuàng)建消息,準備創(chuàng)建物流單,messageId={}, orderId={}", messageId, mq.getOrderId());
try {
// 1. 冪等處理:先查有沒有處理過這個消息
if (logisticsService.existsByOrderId(mq.getOrderId())) {
log.info("物流單已存在,跳過處理,orderId={}", mq.getOrderId());
return;
}
// 2. 創(chuàng)建物流單
logisticsService.createLogisticsOrder(mq);
// 3. 手動確認消息(如果用的是手動確認模式)
Channel channel = message.getMessageProperties().getChannel();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("處理訂單創(chuàng)建消息失敗,messageId={}, orderId={}", messageId, mq.getOrderId(), e);
// 處理失敗,手動拒絕消息,讓MQ重試(重試次數(shù)由MQ配置)
try {
Channel channel = message.getMessageProperties().getChannel();
// requeue=false:不重新放回原隊列,讓MQ放到死信隊列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException ex) {
log.error("拒絕消息失敗,messageId={}", messageId, ex);
}
}
}
}6.3 分布式異步的注意點
- 消息冪等性:必須保證同一個消息不會被重復處理,比如用 “訂單 ID” 作為唯一鍵,處理前先查有沒有處理過。
- 消息可靠性:確保消息不會丟失,比如:
- 生產(chǎn)者發(fā)送消息后,MQ 要返回確認(ack)。
- 消費者處理完消息后,手動確認(ack)。
- 消息發(fā)送失敗時,記錄到本地重試表,用定時任務重試。
- 消息過期時間:設置消息過期時間,避免無效消息一直占用 MQ 資源,比如超過 30 分鐘沒處理的訂單消息,就放到死信隊列。
七、總結(jié):這套 “萬能” 方案的核心是什么?
看到這里,你可能會問:“你說這是‘萬能’方案,真的能覆蓋所有場景嗎?”
其實沒有絕對的 “萬能”,但這套方案能覆蓋 80% 以上的 Java 異步場景,它的核心是 “分層設計”:
- 本地異步層:用CompletableFuture做本地異步任務的編排,解決 “多任務并行”“鏈式調(diào)用”“異常處理”“超時控制” 的問題,配合自定義線程池,保證線程資源可控。
- 分布式異步層:用 MQ 做跨服務的異步通信,解決 “服務解耦”“消息可靠傳遞”“失敗重試” 的問題,配合冪等設計和死信隊列,保證數(shù)據(jù)最終一致性。
- 基礎(chǔ)保障層:包括線程池合理配置、異步任務冪等處理、異常日志記錄、失敗重試機制,這些是異步方案能穩(wěn)定運行的基礎(chǔ)。
最后給你幾個實戰(zhàn)建議:
- 不要過度異步:只有非核心流程才用異步,核心流程(比如下單、支付)必須同步,保證數(shù)據(jù)一致性和即時性。
- 優(yōu)先用本地異步:能在一個服務里解決的,就不用分布式異步(MQ),因為 MQ 會增加系統(tǒng)復雜度。
- 監(jiān)控異步任務:給線程池加監(jiān)控,比如用 Spring Boot Actuator 監(jiān)控線程池的活躍線程數(shù)、隊列長度、拒絕次數(shù);給 MQ 加監(jiān)控,監(jiān)控消息發(fā)送成功率、消費成功率、死信隊列長度。
- 多做壓測:異步方案上線前,一定要做壓測,看看線程池會不會滿、MQ 會不會堆積,確保在高并發(fā)下能穩(wěn)定運行。
掌握了這套方案,下次再遇到 “接口太慢”“線程不夠用”“跨服務異步” 的問題,你就能游刃有余地解決,再也不用被老板催著優(yōu)化性能了~




























