精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

線程池ThreadPoolExecutor源碼深度解析

開發 前端
線程池是Java并發編程的核心組件,通過復用線程減少資源開銷,提升系統吞吐量。其核心設計包括線程復用機制 、任務隊列和拒絕策略 ,通過ThreadPoolExecutor的參數(核心線程數、最大線程數、隊列容量等)實現靈活的資源控制。線程池的生命周期由RUNNING、SHUTDOWN等狀態管理,確保任務有序執行或終止。

一、引言

二、為什么使用線程池?

三、JDK線程池的架構設計

    1. JUC并發包下的Executor框架的uml類圖

    2. ThreadPoolExecutor的參數解析

    3. 運行機制詳解

    4. 線程池的生命周期

四、JDK內置線程池的問題

五、線程池中的問題與最佳實踐

    1. invokeAll 超時機制無效?

    2. submit()的異常消失了?

    3. 異常處理實踐

    4. 拒絕策略實踐

    5. 池隔離實踐

六、總結

一、引 言

為什么進行源碼角度的深度解析?

大家在項目中到處都在使用線程池做一些性能接口層次的優化,原先串行的多個遠程調用,因為rt過高,通過線程池批量異步優化,從而降低rt。還有像RocketMQ中broker啟動時,同時通過ScheduledThreadPoolExecutor線程池執行其他組件的定時任務,每隔一段時間處理相關的任務。線程池廣泛的應用在外面各種實際開發場景中,我們很多同學可能在項目里只是簡單的copy了一些前人的代碼參數并不知道其中的含義,從而導致生產級別的bug。所以本篇文章,旨在幫助還不熟悉或者想要熟悉線程池的同學,分享我自己在學習線程池源碼上的一些內容來更簡單、快速的掌握線程池。

二、為什么使用線程池?

并發編程中,對于常見的操作系統,線程都是執行任務的基本單元,如果每次執行任務時都創建新的線程,任務執行完畢又進行銷毀,會出現以下的問題:

  • 資源開銷:比如在Linux系統中,頻繁的創建和銷毀線程,一個是頻繁的進行一個系統調用,另外是一些內存和CPU資源調度的占用。雖然有一些寫時復制的策略防止lwp的創建時的內存占用,但是實際寫入還是會申請系統內存的,何況一些頁表等本身就有內存占用。
  • 性能瓶頸:線程的創建需要系統調用,如果只是簡單的計算任務,可能耗時還沒創建的rt高,這里反而降低了系統的吞吐量。
  • 缺乏資源管理:無限制的創建線程會導致內存溢出,java.lang.OutOfMemoryError: unable to create native thread,這里主要因為Java的線程其實Linux中是lwp線程,需要通過JNI進行系統調用創建,每個線程默認需要1MB的??臻g,很容易導致無休止的創建線程導致內存溢出,另外就是頻繁的系統調用,導致的上下文切換,占用了過多的CPU,反而起到了相反的作用。
  • 功能受限:手動管理線程難以實現更高級的功能,如定時任務、周期任務、任務管理、并發任務數的控制等。

通過上面的問題,我們其實可以清晰的感知到這些問題都是歸攏到資源沒有得到合理的分配和控制導致的,線程池出現的核心宗旨其實就是對資源的合理分配和控制。除了線程池,其實更多的也接觸過數據庫連接池、netty的對象池等池化技術,這些池化思想其實都是為了更好的降低資源的消耗以及更好的進行資源管理。

三、JDK線程池的架構設計

JUC并發包下的Executor框架的uml類圖

圖片圖片

  • Executor:任務執行的頂層接口,主要是分離任務提交與執行邏輯,支持同步/異步執行,遵循Java內存模型的 happen-before規則。
  • ExecutorService:繼承Executor接口,提供了更完善的生命周期管理能力,通過Future對象提供任務取消、狀態查詢、結果獲取能力實現了任務監控。
  • AbstractExecutorService:常見的設計模式為了簡化線程池的開發,通常通過父類進行一些基礎的默認實現讓子類繼承。
  • ScheduledExecutorService:ExecutorService的擴展接口,支持延遲執行和周期性任務調度。
  • ThreadPoolExecutor:是ExecutorService接口最核心和最常用的實現類,它提供了高度可配置的線程池,允許我們精細控制線程池的各種行為。
  • ScheduledThreadPoolExecutor:是ScheduledExecutorService接口的實現類,它繼承自ThreadPoolExecutor,專門用于處理定時和周期任務。
  • Executors:一個靜態工廠模式的工具類,提供了一系列靜態方法來創建各種常見配置的線程池,newFixedThreadPool(), newCachedThreadPool(),等,簡化了創建線程池的使用但是會帶來一些問題,很多開發規范里都不建議大家直接使用。JDK內置的線程池如果我們不熟悉里面的參數很有可能導致出乎自己意料的結果,池大小設置、阻塞隊列選擇等等都是有考究的,這一點后續會進行一些詳細說明。生產環境中建議謹慎使用或直接使用ThreadPoolExecutor構造函數自定義。

ThreadPoolExecutor的參數解析

圖片圖片

  • corePoolSize 核心線程數:

線程池中還未退出的alive的核心線程數量。

雖然線程處于空閑狀態(其實是阻塞在阻塞隊列中),除非顯示設置了allowCoreThreadTimeOut=true,否則這些線程不會從自己的run方法中退出被回收。

添加新任務時,如果當前工作線程小于coreSize,此時即使存在空閑的core線程,線程池也會通過addWorker方法創建一個新的線程。

  • maximumPoolSize 最大線程數:

線程池可以創建的最大線程數。

如果是有界隊列,當隊列滿時,仍然有任務進來,此時線程池會創建小于最大線程數的線程來完成任務,空閑。

如果是無界隊列,那么永遠不會出現第二點的情況,除了內存異常,否則會一直保持核心線程數,多余的任務會一直往隊列中加入。

  • keepAliveTime 線程空閑存活時間

線程數超過corePoolSize后創建的線程我們理解為非核心線程,對于這類線程,他的回收機制在于我們設置的keepAliveTime,線程會限期阻塞在隊列中獲取任務,如果超時未獲取就會進行清理并退出。

另外如果設置allowCoreThreadTimeOut=true,所謂的核心線程在空閑時間達到keepAliveTime時也會被回收。

  • unit 時間單位

keepAliveTime參數的時間單位,TimeUnit中時分秒等。

  • workQueue 任務隊列

阻塞隊列,核心線程數滿時,新加入的任務,會先添加到阻塞隊列中等待線程獲取任務并執行。

常用的BlockingQueue實現有:

1)ArrayBlockingQueue:數組實現的先進先出原則的有界阻塞隊列,構造方法必須指定容量。

2)LinkedBlockingQueue:鏈表實現的阻塞隊列,構造傳入容量則有界,未傳則是無界隊列,此時設置的最大線程數其實就不會有作用了。

3)SynchronousQueue:一個不存儲元素的阻塞隊列。每個put操作必須等待一個take操作,反之亦然。它相當于一個傳遞通道,非常適合傳遞性需求,吞吐量高,但要求maximumPoolSize足夠大。

4)PriorityBlockingQueue:二叉堆實現的優先級阻塞隊列,構造時可自行調整排序行為(小頂堆或大頂堆)。

5)DelayQueue:支持延時的無界阻塞隊列,主要用于周期性的任務,我們可以直接通過它來實現一些簡單的延遲任務需求,復雜的周期性任務建議使用ScheduledThreadPoolExecutor。

  • threadFactory 線程工廠

用于創建新線程的工廠。通過自定義ThreadFactory,我們可以為線程池中的線程設置更有意義的名稱、設置守護線程狀態、設置線程優先級、指定UncaughtExceptionHandler等。

Executors.defaultThreadFactory()是默認實現。

  • handler 拒絕策略

ThreadPoolExecutor.AbortPolicy:默認的拒絕策略,簡單粗暴,當execute中添加woker失敗時,直接在當前線程拋出異常。

JDK內置了四種拒絕策略:

1)ThreadPoolExecutor.AbortPolicy:默認的拒絕策略,簡單粗暴,當execute中添加woker失敗時,直接在當前線程拋出異常。

2)ThreadPoolExecutor.CallerRunsPolicy:提交任務的線程,直接執行任務。變相的背壓機制,可以降低任務往線程中加入。

3)ThreadPoolExecutor.DiscardPolicy:直接丟棄被拒絕的任務,不做任何通知,需容忍數據丟失。

4)ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列中最舊的任務,然后重試提交當前任務,需容忍數據丟失。

5)實現RejectedExecutionHandler接口自定義拒絕策略,在實際生產應用中推薦使用,可以做一些打印觀察日志的操作,告警、兜底的相關處理等。

運行機制詳解

新任務通過execute()方法提交給ThreadPoolExecutor時,其處理流程如下:

判斷核心線程數:如果當前運行的線程數小于corePoolSize,則創建新線程(即使有空閑的核心線程)來執行任務。

嘗試入隊:如果當前運行的線程數大于或等于corePoolSize,則嘗試將任務添加到workQueue中。

  • 如果workQueue.offer()成功(隊列未滿),任務入隊等待執行。

嘗試創建非核心線程:如果workQueue.offer()失?。犃幸褲M):

  • 判斷當前運行的線程數是否小于maximumPoolSize;
  • 如果是,則創建新的非核心線程來執行任務。

執行拒絕策略:

如果當前運行的線程數也達到了maximumPoolSize(即核心線程和非核心線程都已用盡,且隊列也滿了),則執行RejectedExecutionHandler所定義的拒絕策略。

參考網絡中的經典執行圖:

圖片圖片

這個圖能很好的表明運行原理,但是忽略了很多細節,比如所謂的緩沖執行是在什么條件下去走的呢?直接執行又是什么邏輯下執行呢?最后的任務拒絕又是怎么回事?帶著這些疑問點,我們直接來進行一個源碼級別的分析:

execute核心流程的源碼分析

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //線程池狀態 高3位表示線程狀態 低29位代表線程數量
    int c = ctl.get();
    //判斷當前線程池線程數量是否小于核心線程數
    if (workerCountOf(c) < corePoolSize) {
        //作為核心線程數進行線程的創建,并且創建成功線程會將command的任務執行--》對應圖上的直接執行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //創建核心線程失敗或者當前線程數量超過核心線程數
    //當前線程池是否還在運行狀態,嘗試將任務添加到阻塞隊列 --》對應圖上的緩沖執行
    //BlockingQueue隊列的頂級抽象定義了offer不是進行阻塞添加而是立即返回,添加失敗直接返回false,區別于put
    if (isRunning(c) && workQueue.offer(command)) {
        //重新獲取線程池標志位
        int recheck = ctl.get();
        //如果線程此時不在運行狀態中,那么將任務刪除
        if (! isRunning(recheck) && remove(command))
            //刪除任務成功,走拒絕策略拒絕掉當前任務
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //如果線程池中的工作線程都沒有的時候,這里需要創建一個線程去執行添加到隊列中的任務
            //防止因為并發的原因工作線程都被終止掉了,此時任務在阻塞隊列里等著,缺沒有工作線程
            addWorker(null, false);
    }
    //到這里那就是添加隊列失敗,或者線程池狀態異常,但是這里仍然嘗試進行創建一個worker
    //如果創建失敗,也是走拒絕策略拒絕當前任務
    else if (!addWorker(command, false))
        reject(command);
}

接下來我們仔細看看addWorker這個方法具體是在做什么:

//核心邏輯其實就是在無限循環創建一個worker,創建失敗直接返回,創建成功,則將worker執行
// 因為worker有thread的成員變量,最終添加worker成功,會啟動線程的start方法
//start方法最終會回調到外層的runWorker方法,改方法會不停的從阻塞隊列里以阻塞的take方式
//獲取任務,除非達到能被終止的條件,此時當前線程會終止
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //不停的重試添加worker的計數,只有添加成功的才會進行后續的worker啟動
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //重試期間,如果其他線程導致線程池狀態不一致了。重新回到第一個循環進行check判斷
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //這里加鎖一個是workers.add時需要加鎖,另外是防止其他線程已經在嘗試修改線程池狀態了
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //將worker的引用添加到workers的hashSet中
                    workers.add(w);
                    int s = workers.size();
                    //更新線程池此時最大的線程數
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //如果添加成功,就啟動worker中的線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //這里添加失敗的話,需要把線程池的count數進行--,并且要把worker引用從hashSer中移除
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

線程池的生命周期

在介紹運行機制原理的源碼分析時,其實是有提到線程池狀態這個概念的。介紹這個狀態其實也是讓大家更方便的去管理線程池,比如我們關閉線程池時,怎么去優雅的關閉,使用不同的方法可能會有不同的效果,我們需要根據自己的業務場景去酌情分析、權衡使用。

//線程池的狀態和計數采用一個Integer變量設置的
//這里之所以用一個變量來儲存狀態和數量,其實很有講究的,因為我們在上面的運行原理上可以看到
//源碼中有大量的進行狀態以及數量的判斷,如果分開采用變量的記錄的話,在維護二者一致性方面
//可能就需要加鎖的維護成本了,而且計算中都是位移運算也是非常高效的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程池的大小由ctl低29位表示,現成狀態由ctl高3位表示
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


// 線程池的狀態通過簡單的位移就能計算出來,狀態只能從低到高流轉,不能逆向
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;


// 這里是獲取線程狀態以及獲取線程數量的簡單高效的位移方法
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

接下來結合源碼詳細介紹下線程池的5種狀態以及分別有什么不同的表現行為?

先說下結論:


RUNNING     這個就是線程池運行中狀態,我們可以添加任務也可以處理阻塞隊列任務


SHUTDOWN   不能添加新的任務,但是會將阻塞隊列中任務執行完畢


STOP       不能添加新的任務,執行中的線程也會被打斷,也不會處理阻塞隊列的任務


TIDYING    所有線程都被終止,并且workCount=0時會被置為的狀態


TERMINATED   調用完鉤子方法terminated()被置為的狀態

shutdown狀態源碼分析:

//線程池關閉
 public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //循環cas設置線程池狀態,直到成功或狀態已經state>=SHUTDOWN
        advanceRunState(SHUTDOWN);
        //這個是真正得出結論的地方
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}


private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}


//打斷空閑的線程,如何判斷線程是否空閑還是運行?
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //worker的線程沒有被打斷過,并且能獲取到worker的aqs獨占鎖
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    //打斷當前線程,如果線程在阻塞隊列中阻塞,此時會被中斷
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

STOP狀態分析

//循環cas修改線程池狀態為stop。打斷所有線程,取出阻塞隊列的所有任務
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //檢查線程的權限
        checkShutdownAccess();
        //將狀態case為stop
        advanceRunState(STOP);
        //打斷所有worker不管是不是正在執行任務
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}


//這里獲取鎖之后。打斷了所有的線程
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

TIDYING、TERMINATED 狀態分析

//這個方法在每個線程退出時都會進行調用,如果是運行中、或者狀態大于等于TIDYING或者shutdown但是隊列不為空都
//直接返回,如果不滿足以上條件,并且線程數不為0的話,打斷一個空閑線程
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }


        //此時到這里,狀態要么為STOP。要么是shutdown并且隊列為空了
        // 獲取一個鎖,嘗試cas修改狀態為TIDYING
        //調用terminated()的鉤子方法,
        //修改線程池為終態TERMINATED,并且喚醒阻塞在termination隊列上的線程
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

四、JDK內置線程池的問題

java.util.concurrent.Executors工廠類提供了一些靜態方法,方便我們快速創建幾種預設配置的線程池:

  • Executors.newFixedThreadPool(int nThreads):

創建一個固定大小的線程池。corePoolSize和maximumPoolSize都等于nThreads。

keepAliveTime為0L(因為線程數不會超過corePoolSize,所以此參數無效,除非allowCoreThreadTimeOut為true)。

使用無界的LinkedBlockingQueue作為工作隊列。

問題:由于使用無界隊列,當任務提交速度遠大于處理速度時,隊列會持續增長,可能導致內存溢出(OOM)。此時maximumPoolSize參數實際上是無效的,線程數永遠不會超過nThreads。

  • Executors.newSingleThreadExecutor():

創建一個只有一個工作線程的線程池。corePoolSize和maximumPoolSize都為1。

同樣使用無界的LinkedBlockingQueue。

保證所有任務按照提交順序(FIFO)執行。

問題:與newFixedThreadPool類似,無界隊列可能導致OOM。

  • Executors.newCachedThreadPool():

創建一個可緩存的線程池。

corePoolSize為0。

maximumPoolSize為Integer.MAX_VALUE (幾乎是無界的)。

keepAliveTime為60秒。

使用SynchronousQueue作為工作隊列。這種隊列不存儲元素,任務提交后必須有空閑線程立即接收,否則會創建新線程(如果未達到maximumPoolSize)。

問題:如果任務提交速度過快,會創建大量線程(理論上可達Integer.MAX_VALUE個),可能耗盡系統資源,導致OOM以及頻繁的上下文切換。

  • Executors.newSingleThreadScheduledExecutor()、Executors.newScheduledThreadPool(int corePoolSize):

創建用于調度任務的線程池。

內部使用ScheduledThreadPoolExecutor實現,其任務隊列是DelayedWorkQueue (一種特殊的PriorityQueue)。

newSingleThreadScheduledExecutor的corePoolSize為1,maximumPoolSize為Integer.MAX_VALUE(但由于隊列是DelayedWorkQueue,通常不會無限增長線程,除非有大量同時到期的任務且處理不過來)。

newScheduledThreadPool可以指定corePoolSize。

問題:雖然DelayedWorkQueue本身是無界的,但ScheduledThreadPoolExecutor在任務執行邏輯上與普通ThreadPoolExecutor有所不同。主要風險仍然是如果corePoolSize設置不當,且大量任務同時到期并執行緩慢,可能導致任務積壓。

某一線互聯網Java開發手冊某一線互聯網Java開發手冊


五、線程池中的問題與最佳實踐

invokeAll 超時機制無效?

ExecutorService.invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)方法會提交一組Callable任務,并等待所有任務完成,或者直到超時。如果超時發生,它會嘗試取消(中斷)所有尚未完成的任務,然后返回一個List<Future>。

失效場景分析:

  • 任務不響應中斷(最常見):任務內部捕獲 InterruptedException 后靜默處理,或執行不檢查中斷狀態的阻塞操作(如循環計算):
Callable<String> task = () -> {
    while (true) {
        //缺少此檢查將導致超時失效
        if (Thread.interrupted()) break; 
        // 耗時計算...
    }
    return "done";
};
  • 使用非響應中斷的API:任務調用了不響應 interrupt() 的第三方庫或JNI代碼(如某些IO操作)
Callable<Integer> task = () -> {
    Files.copy(in, path); // 某些NIO操作不響應中斷
    return 1;
};
  • 任務依賴外部資源阻塞:任務因外部資源(如數據庫連接、網絡請求)阻塞且未設置超時。
Callable<Result> task = () -> {
    //未設查詢超時時間
    return jdbcTemplate.query("SELECT * FROM large_table"); 
};
  • 線程池配置缺陷:核心線程數過大或隊列無界,導致 invokeAll 超時前任務無法全部啟動,任務堆積在隊列,invokeAll 超時后仍有大量任務未執行。
new ThreadPoolExecutor(
    100, 100, // 核心線程數過大
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>() // 無界隊列
);

invokeAll超時失效demo:

import java.util.*;
import java.util.concurrent.*;


public class InvokeAllTimeoutDemo {
    
    // 模擬耗時任務(可配置是否響應中斷)
    static class Task implements Callable<String> {
        private final int id;
        private final long durationMs;
        private final boolean respectInterrupt;
        
        Task(int id, long durationMs, boolean respectInterrupt) {
            this.id = id;
            this.durationMs = durationMs;
            this.respectInterrupt = respectInterrupt;
        }
        
        @Override
        public String call() throws Exception {
            System.out.printf("Task %d started%n", id);
            long start = System.currentTimeMillis();


            // 模擬工作(檢查中斷狀態)
            while (System.currentTimeMillis() - start < durationMs) {
                if (respectInterrupt && Thread.interrupted()) {
                    throw new InterruptedException("Task " + id + " interrupted");
                }
                // 不響應中斷的任務會繼續執行
            }


            System.out.printf("Task %d completed%n", id);
            return "Result-" + id;
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        List<Callable<String>> tasks = Arrays.asList(
            new Task(1, 2000, true),   // 2秒,響應中斷
            new Task(2, 10000, false)  // 10秒,不響應中斷
        );
        
        System.out.println("Invoking with 3s timeout...");
        try {
            //設置3秒超時
            List<Future<String>> futures = executor.invokeAll(tasks, 3, TimeUnit.SECONDS);


            for (Future<String> f : futures) {
                // 明確處理取消狀態
                if (f.isCancelled()) {
                    System.out.println("Task was cancelled");  
                } else {
                    try {
                        System.out.println("Result: " + f.get(100, TimeUnit.MILLISECONDS));
                    } catch (TimeoutException | ExecutionException e) {
                        System.out.println("Task failed: " + e.getCause());
                    }
                }
            }
        } finally {
            executor.shutdownNow();
            System.out.println("Executor shutdown");
        }
    }
}

當我們使用invokeAll(tasks, timeout) 提交多個任務時,如果出現某個任務對中斷不響應或者響應不及時,那我們即使設置了超時時間,不響應中斷的任務2仍在后臺運行(即使調用了 shutdownNow())

submit()的異常消失了?

使用ExecutorService.submit()提交任務時,任務執行過程中如果拋出未捕獲的異常(無論是受檢異常還是運行時異常),這個異常會被Future的包裝類如FutureTask重寫的run()方法捕獲并封裝在返回的Future包裝對象的成員變量中。

  • 不顯示調用Future.get(),該異常我們就無法感知,好像沒有發生過一樣。線程池的工作線程本身通常會有一個默認的未捕獲異常處理器,可能會打印堆棧到控制臺,但你的主業務邏輯不會知道。
  • 顯示調用Future.get(),拋出聲明式的ExecutionException,其cause屬性才是原始的任務異常。
  • 如果調用Future.get(long timeout, TimeUnit unit)超時,向外拋出聲明式的TimeoutException。此時任務可能仍在后臺執行,可能錯過了內部的異常。

submit()異常消失demo:

public class ThreadPoolExceptionDemo {
    
    public static void main(String[] args) {
        // 創建單線程線程池(便于觀察異常)
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        // 場景1:Callable拋出異常(通過Future.get()捕獲)
        Future<String> future1 = executor.submit(() -> {
            System.out.println("[Callable] 開始執行");
            Thread.sleep(100);
            throw new RuntimeException("Callable故意拋出的異常");
        });
        
        try {
            System.out.println("Callable結果: " + future1.get());
        } catch (ExecutionException e) {
            System.err.println("捕獲到Callable異常: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 場景2:Runnable拋出異常(同樣通過Future.get()捕獲)
        Future<?> future2 = executor.submit(() -> {
            System.out.println("[Runnable] 開始執行");
            throw new IllegalArgumentException("Runnable故意拋出的異常");
        });
        
        try {
            future2.get(); // Runnable成功時返回null
        } catch (ExecutionException e) {
            System.err.println("捕獲到Runnable異常: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 場景3:未處理的任務異常(需設置異常處理器)
        executor.submit(() -> {
            System.out.println("[未捕獲的任務] 開始執行");
            throw new IllegalStateException("這個異常會被默認處理器處理");
        });
        
        executor.shutdown();
    }
}

異常處理實踐

圖片圖片

  • Callable/Runnable catch處理異常:

不要捕獲Throwable或Exception然后靜默處理(只打日志)。如果確實需要捕獲,請考慮是否應該重新拋出(包裝成業務允許的受檢異?;蜻\行時異常)。

禁止靜默處理 InterruptedException:

1)在JDK的JUC底層源碼中,我們可以看到很多聲明了InterruptedException的方法,基本上都是對這類方法catch異常,要么繼續往外拋出,或者處理完相關資源后,重置中斷狀態,絕對不要靜默處理。

2)如果方法沒有聲明InterruptedException如Runnable.run(),在catch InterruptedException后最好調用Thread.currentThread().interrupt()來恢復中斷標記。

正確處理中斷:callable在耗時的loop任務處理中,如果出現了中斷異常,因為Java代碼中中斷只是一種協作方式,其并沒真的終止線程,所以一般都是需要我們進行一個中斷標志的傳遞,如線程池中的shutdownNow()就依賴次機制處理。

  • submit()執行的任務,謹慎處理Future:

使用帶過期時間的future.get(long timeOut)獲取結果,并要對該方法進行try cache防止其他異常拋出。

多個任務并行處理時,如果有下個請求依賴上個請求,務必使用get()讓主線程等待這一結果執行完成后,流轉到下一個異步任務。

  • 實現線程Thread的UncaughtExceptionHandler屬性,在自定義的TheadFactory中通過set方法賦值:execute()方法執行時,對于沒有捕獲的異常使用線程組的兜底統一處理機制。
//自定義當前線程組創建線程的統一異常處理,類似于controller的統一異常處理機制
ThreadFactory myThreadFactory = new ThreadFactory() {
    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    
    private final String threadNamePrefix = "myThreadFactory-";
    
    @Override
    public Thread newThread(Runnable r) {
        Thread t =  new Thread(r,threadNamePrefix + atomicInteger.getAndIncrement());
        t.setUncaughtExceptionHandler((thread, throwable) -> {
        //異常的統一處理,日志打印、兜底處理、監控、資源釋放等
        System.err.println("線程[" + thread.getName() + "]異常: " + throwable);});
        return t;
    }
};


//構造方法時使用自定義的線程工廠
ExecutorService executor = new ThreadPoolExecutor(
    corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    threadFactory,
    handler
);
  • 使用自定義線程池時建議重寫鉤子方法afterExecute(Runnable r, Throwable t):這個hook方法是用來解決當前任務線程發生的異常,默認是空實現,我們可以重寫他,比如進行兜底的線程繼續執行,打印日志記錄,以及同步失敗使用兜底異步處理等等方式。還要注意釋放應用中的資源,比如文件鎖的占用等,最好手動釋放掉,避免底層操作系統線程對這類資源釋放失敗導致長期占用,最后只能重啟Java進程的尷尬地步。
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
       //需要特別注意任務是否為submit提交,如果是execute提交的任務,那這里很直接的知道任務是否發生異常以及后續去怎么處理
       if(r instanceof Future){
           if(((Future<?>) r).isDone() || ((Future<?>) r).isCancelled()){
               //繼續使用主線程完成任務,一般不建議,最好使用兜底方式:例如異步發消息,由后續的消費組統一處理異常的任務
           }
       }else if( t != null){
            //execute異常處理
       }
    }
}


//FutureTask 把run方法進行了重寫,并且catch住了異常,所以說afterExecute的t 如果是submit提交的方式
//那么t基本上就是null
public void run() {
    //....
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
     //...
}

afterExecute可以借鑒的示例:

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.slf4j.*;


public class RobustThreadPool extends ThreadPoolExecutor {
    private static final Logger logger = LoggerFactory.getLogger(RobustThreadPool.class);
    private final AtomicLong failureCounter = new AtomicLong();
    private final RetryPolicy retryPolicy; // 重試策略
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    
    public RobustThreadPool(int corePoolSize, int maxPoolSize, 
                          BlockingQueue<Runnable> workQueue,
                          RetryPolicy retryPolicy) {
        super(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, workQueue);
        this.retryPolicy = retryPolicy;
    }
    
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        logger.debug("開始執行任務: {}", r);
    }
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 1. 異常分類處理
         if(r instanceof Future){
           if(((Future<?>) r).isDone()){
             //錯誤記錄以及異常處理
            failureCounter.incrementAndGet();
            handleFailure(r, t, costTime);
           }
       }else if( t != null){
            //execute異常處理
            failureCounter.incrementAndGet();
            handleFailure(r, t, costTime);
       }
        // 2. 資源清理
        cleanThreadLocals();
    }
    
    private void handleFailure(Runnable r, Throwable t) {
        // 1. 異常類型識別
        if (t instanceof OutOfMemoryError) {
            logger.error("JVM內存不足,終止任務: {}", t.getMessage());
            System.exit(1); // 嚴重錯誤直接終止
        } 
        // 2. 可重試異常處理
        else if (isRetryable(t)) {
            int retryCount = retryPolicy.getCurrentRetryCount(r);
            if (retryCount < retryPolicy.getMaxRetries()) {
                logger.warn("任務第{}次失敗,準備重試...", 
                    retryCount + 1, t);
                retryPolicy.retry(r, this);
            } else {
                logger.error("任務超過最大重試次數({}),轉入死信隊列", 
                    retryPolicy.getMaxRetries(), t);
                DeadLetterQueue.add(r, t);
            }
        } 
        // 3. 不可重試異常
        else {
            logger.error("不可恢復任務失敗", t);
            Metrics.recordFailure(t.getClass()); // 上報監控
        }
    }
    
    private boolean isRetryable(Throwable t) {
        return t instanceof IOException || 
               t instanceof TimeoutException ||
               (t.getCause() != null && isRetryable(t.getCause()));
    }
    
    private void cleanThreadLocals() {
        // 清理可能的內存泄漏
        try {
            ThreadLocal<?>[] holders = { /* 其他ThreadLocal */};
            for (ThreadLocal<?> holder : holders) {
                holder.remove();
            }
        } catch (Exception e) {
            logger.warn("清理ThreadLocal失敗", e);
        }
    }
    
    // 重試策略嵌套類
    public static class RetryPolicy {
        private final int maxRetries;
        private final long retryDelayMs;
        private final Map<Runnable, AtomicInteger> retryMap = new ConcurrentHashMap<>();
        
        public RetryPolicy(int maxRetries, long retryDelayMs) {
            this.maxRetries = maxRetries;
            this.retryDelayMs = retryDelayMs;
        }
        
        public void retry(Runnable task, Executor executor) {
            retryMap.computeIfAbsent(task, k -> new AtomicInteger()).incrementAndGet();
            if (retryDelayMs > 0) {
                executor.execute(() -> {
                    try {
                        Thread.sleep(retryDelayMs);
                    } catch (InterruptedException ignored) {}
                    executor.execute(task);
                });
            } else {
                executor.execute(task);
            }
        }
        
        public int getCurrentRetryCount(Runnable task) {
            return retryMap.getOrDefault(task, new AtomicInteger()).get();
        }
        
        public int getMaxRetries() {
            return maxRetries;
        }
    }
}

異常處理小結:要特別注意使用future.get()方法時,我們一定要注意設置超時時間,防止主線程無限期的阻塞避免邊緣的業務查詢影響了主業務造成得不償失的效果,另外我們需要注意一個點就是submit()方法的提交任務時,afterExecute(Runnable r, Throwable t)中的t恒為null,如果是execute方法提交的任務,那么就是直接獲取的任務執行的異常,對于submit提交的任務異常其被封裝到了Futrure 包裝對象中,一般需要我們再次判斷任務時執行完畢還是異常或被取消了,如果發生了異常,Future.get()會拋出封裝的ExecutionException異常,當然還可能是取消異常以及中斷異常。invokeAll和invokeAny我們需要對返回的Future結果檢查可能拋出的異常,對于callable 前面一再強調了要對InterruptedException不要靜默處理,因為線程的中斷標記只是一個協作方式,他并沒有停止當前線程的運行,我們需要根據自身的場景對發生的中斷進行快速響應以及傳遞中斷標志。

拒絕策略實踐

先帶大家回顧一下策略是如何觸發執行的流程:

//添加任務,當不滿足條件時會執行拒絕方法reject
public void execute(Runnable command) {
   //...
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}


//這里就是拒絕的入口。handler是有構造方法傳入
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}


public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //....
    //指定拒絕策略
    this.handler = handler;
}

AbortPolicy:默認的拒絕策略,簡單粗暴,當execute中添加woker失敗時,直接在當前線程拋出異常。

public static class AbortPolicy implements RejectedExecutionHandler {


     //直接拋出RejectedExecutionException
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

優點:快速失敗,立即暴露系統過載問題、避免任務靜默丟失,便于監控系統捕獲

缺點:需要調用方顯式處理異常,增加代碼復雜度,可能中斷主業務流程

適用場景:適用于那些對任務丟失非常敏感,配合熔斷機制使用的快速失敗場景

CallerRunsPolicy:提交任務的線程,直接執行任務

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    //直接在提交任務的線程中執行任務
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

優點:任務都會被執行,不會丟任務,并且由于主線程執行任務,天然的流量控制,避免了大量的任務進入線程池。

缺點:調用線程可能被阻塞,導致上游服務雪崩。不適合高并發場景(可能拖垮整個調用鏈)。

適用場景:適用于處理能力不高,并且資源過載能夠平滑過渡,同時不丟失任務的場景。如:低并發、高可靠性的后臺任務(如日志歸檔)、允許同步執行的批處理系統。

DiscardPolicy:直接丟棄被拒絕的任務,不做任何通知。

public static class DiscardPolicy implements RejectedExecutionHandler {
   //空實現
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

優點:實現簡單,無額外性能開銷。避免異常傳播影響主流程

缺點:數據靜默丟失,可能會掩蓋系統容量問題

適用場景:邊緣業務的監控上報數據,統計類的uv、pv統計任務

DiscardOldestPolicy:丟棄隊列中最舊的任務,然后重試提交當前任務。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    //丟棄隊列中最舊的任務,重試當前任務
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

優點:優先保證新任務執行,避免隊列堆積導致內存溢出。

缺點:可能丟失關鍵舊任務、任務執行順序無法保證。

適用場景:適用于可容忍部分數據丟失,并且實時性要求高于歷史數據的場景,比如:行情推送。

通過上線的介紹,我們可以看到JDK內置策略基本上只使用于簡單處理的場景,在生產實踐中一般推薦我們自定義拒絕策略,進行相關的業務處理。

1. 自定義RejectedExecutionHandler:

/**
 * 帶監控統計的拒絕策略處理器
 */
public class MetricsRejectedExecutionHandler implements RejectedExecutionHandler {
    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsRejectedExecutionHandler.class);


    // 統計被拒絕的任務數量
    private final AtomicLong rejectedCount = new AtomicLong(0);


    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 1. 采集線程池關鍵指標
        int poolSize = executor.getPoolSize();
        int activeThreads = executor.getActiveCount();
        int corePoolSize = executor.getCorePoolSize();
        int maxPoolSize = executor.getMaximumPoolSize();
        int queueSize = executor.getQueue().size();
        long completedTasks = executor.getCompletedTaskCount();


        // 2. 遞增拒絕計數器
        long totalRejected = rejectedCount.incrementAndGet();


        // 3. 輸出警告日志(包含完整指標)
        logger.warn("""
             任務被拒絕執行!線程池狀態:
            |- 活躍線程數/當前線程數: {}/{}
            |- 核心/最大線程數: {}/{}
            |- 隊列大小: {}
            |- 已完成任務數: {}
            |- 歷史拒絕總數: {}
            |- 被拒絕任務: {}
            """, 
            activeThreads, poolSize,
            corePoolSize, maxPoolSize,
            queueSize,
            completedTasks,
            totalRejected,
            r.getClass().getName());


        // 4. 可選:降級處理(如存入數據庫等待重試)
        // fallbackToDatabase(r);


        // 5. 拋出RejectedExecutionException(保持默認行為)
        throw new RejectedExecutionException("Task " + r.toString() + " rejected");
    }
    
    // 獲取累計拒絕次數(用于監控)
    public long getRejectedCount() {
        return rejectedCount.get();
    }
}
  • 記錄日志并告警:所有的異常處理中,最常見簡單的方式無外乎,先記錄個日志,然后有告警系統的進行相關的某書、某信以及短信等的告警信息推送,方便開發人員以及運維人員的及時發現問題并介入處理。
  • 兜底處理機制:一般常見的就是通過異步的方式提交到MQ,然后統一進行兜底處理。
  • 帶超時和重試的拒絕:可以嘗試等待一小段時間,或者重試幾次提交,如果仍然失敗,再執行最終的拒絕邏輯(如告警、持久化或拋異常)。
  • 動態調整策略:根據系統的負載或任務類型,動態的執行兜底策略機制,就如前面寫的源碼示例方式。

2. 根據自身業務場景選擇合適的拒絕策略:

  • 核心業務,不容丟失:如果任務非常重要,不能丟失,可以考慮:

CallerRunsPolicy:調用線程承擔任務執行壓力,是否可支撐;

自定義策略:嘗試持久化到MQ或DB,然后由專門的消費組補償任務處理;

AbortPolicy:如果希望系統快速失敗并由上層進行重試或熔斷。

  • 非核心業務,可容忍部分丟失:

DiscardOldestPolicy:新任務更重要時,如行情推送;

DiscardPolicy:邊緣業務場景,比如一些pv統計等,丟失了無所謂;

及時的進行監控查看,了解任務的丟失情況。

3. 結合線程池參數綜合考慮:

  • 拒絕策略的選擇也與線程池的隊列類型(有界/無界)、隊列容量、maximumPoolSize等參數密切相關。
  • 如果使用無界隊列LinkedBlockingQueue的無參構造,只有機器內存不夠時才會進行拒絕策略,不過這種極端場景已經不是影響線程池本身,內存不夠可能導致Java進程被操作系統直接kill可能。
  • 如果使用有界隊列,需要權衡隊列的大小,核心場景甚至可以動態追蹤阻塞隊列大小,以及動態調整隊列大小來保證核心業務的正常流轉。
  1. 充分測試和監控:無論選擇哪種策略,都必須在壓測環境中充分測試其行為,并在線上環境建立完善的監控體系,監控線程池的各項指標(活躍線程數、隊列長度、任務完成數、任務拒絕數等)。當拒絕發生時,應有相應的告警通知。

拒絕策略小結:

策略的選擇跟我們大多數的系統設計哲學是保持一致的,都是在應對不同的場景中,做出一定的trade off。最好的策略需要根據業務場景、系統容忍度、資源等方面的綜合考量,一個黃金的實踐原則:拒絕事件做好監控告警、根據業務SLA定義策略,是否可丟失,快速失敗等,定期的進行壓力測試,驗證策略的有效性。

池隔離實踐

核心思想:根據任務的資源類型 、優先級和業務特性 ,劃分多個獨立的線程池,避免不同性質的任務相互干擾。

1. 隔離維度:

  • 資源類型:CPU密集型 vs  I/O密集型任務
  • 執行時長:短時任務(毫秒級) vs 長時任務(分鐘級)
  • 實時性要求:高實時性 vs 可延遲(最終一致即可)
  • 業務重要性:支付交易(高優先級) vs 日志清理(低優先級)
  • 是否依賴外部資源:例如,訪問特定數據庫、調用特定第三方API的任務可以歸為一類。

2. 不同業務場景線程池獨立使用:在不同的業務場景下,為自己的特定業務,創建獨立的線程池。

  • 線程命名:通過ThreadFactory為每個線程池及其線程設置有意義的名稱,例如netty-io-compress-pool-%d,excel-export-pool-%d, 主要方便區別不同的業務場景以及問題排查。
  • 參數調優:不同的業務場景設置不同的參數。

corePoolSize, maximumPoolSize:CPU密集型的計算任務可以設置小點減少上下文的切換,I/O密集型可以較大,在io阻塞等待期間,多去處理其他任務。

阻塞隊列blockQueue:選擇合適的隊列類型,以及設置合理的隊列大小。

RejectedExecutionHandler:有內置的四種的策略以及自定義策略選擇,一般建議做好日志、監控以及兜底的處理。

3. 自定義Executor避免線程池共用

// 創建CPU密集型任務線程池(線程數=CPU核心數)
ExecutorService cpuIntensiveExecutor = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(), // 核心線程數=CPU核心數
    Runtime.getRuntime().availableProcessors(), // 最大線程數=CPU核心數
    30L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(500),
    new ThreadFactoryBuilder()
        .setNameFormat("cpu-pool-%d")
        .setPriority(Thread.MAX_PRIORITY) // 提高優先級
        .build(),
    new ThreadPoolExecutor.AbortPolicy() // 直接拒絕
);


// 使用示例
CompletableFuture.supplyAsync(() -> {
    // 矩陣計算等CPU密集型任務
    double[][] result = matrixMultiply(largeMatrixA, largeMatrixB);
    return result;
}, cpuIntensiveExecutor)
.thenAccept(result -> {
    System.out.println("計算結果維度: " + result.length + "x" + result[0].length);
});

線程池隔離小結:

專池專用的本質是通過物理隔離實現:

  • 資源保障 :關鍵業務獨占線程資源
  • 故障隔離 :避免級聯雪崩
  • 性能優化 :針對任務類型最大化吞吐量

最終呈現的效果是像專業廚房的分區(切配區/炒菜區/面點區)一樣,讓每個線程池專注處理同類任務,提升整體效率和可靠性。

六、總結

線程池是Java并發編程的核心組件,通過復用線程減少資源開銷,提升系統吞吐量。其核心設計包括線程復用機制 、任務隊列和拒絕策略 ,通過ThreadPoolExecutor的參數(核心線程數、最大線程數、隊列容量等)實現靈活的資源控制。線程池的生命周期由RUNNING、SHUTDOWN等狀態管理,確保任務有序執行或終止。

內置線程池(如Executors.newCachedThreadPool)雖便捷,但存在內存溢出或無界隊列堆積的風險,需謹慎選擇。invokeAll的超時失效和submit提交任務的異常消失是常見陷阱需通過正確處理中斷和檢查Future.get()規避。

最佳實踐包括:

  • 異常處理:通過afterExecute來對發生的異常進行兜底處理,任務細粒度的try catch或UncaughtExceptionH捕獲異常處理防止線程崩潰退出;
  • 拒絕策略:根據業務選擇拒絕策略或自定義降級邏輯,生產級應用建議盡量自定義處理;
  • 線程隔離 :按任務類型(CPU/I/O)或優先級劃分線程池,避免資源競爭。

合理使用線程池能顯著提升性能,但需結合業務場景精細調參,確保穩定性和可維護性,希望這篇文章能給大家帶來一些生產實踐上的指導,減少一些因為不熟悉線程池相關原理生產誤用導致的一些問題。

責任編輯:武曉燕 來源: 得物技術
相關推薦

2015-10-10 09:39:42

Java線程池源碼解析

2020-12-08 08:53:53

編程ThreadPoolE線程池

2021-05-26 11:30:24

Java線程池代碼

2022-12-16 08:31:37

調度線程池源碼

2020-12-10 08:24:40

線程池線程方法

2013-06-08 10:11:31

Java線程池架構

2021-09-11 07:32:15

Java線程線程池

2020-07-08 12:05:55

Java線程池策略

2013-05-28 13:57:12

MariaDB

2022-03-22 09:20:57

應用線程池技術

2011-06-22 15:50:45

QT 線程

2020-11-25 11:33:47

Java線程技術

2022-03-21 07:40:08

線程池Executors方式

2016-12-15 09:44:31

框架Caffe源碼

2013-03-05 09:16:33

MySQLInnodb

2022-12-07 08:02:43

Spring流程IOC

2025-09-26 02:00:55

JDKCPU內存

2011-08-19 17:36:42

iPhone操作隊列Java

2023-10-12 08:47:50

線程池參數含義

2018-10-31 15:54:47

Java線程池源碼
點贊
收藏

51CTO技術棧公眾號

国产一区二区免费在线观看| 国产精品第6页| 成人福利小视频| 一本久久青青| 国产午夜久久| 欧美日韩情趣电影| 久久视频在线观看中文字幕| 男的操女的网站| 欧美一级二级视频| eeuss影院一区二区三区| 中文字幕精品一区久久久久| 欧美成人高潮一二区在线看| 国产成人精品av在线观| 日韩理论电影院| 欧美性69xxxx肥| 懂色av一区二区三区在线播放| 国产又粗又猛又爽又黄的视频小说| а√天堂8资源在线| 久久久久97| 国产精品二三区| 国产精品高清在线| 玖玖爱在线观看| 人人草在线视频| 成人性视频免费网站| 美女视频黄免费的亚洲男人天堂| 九色porny91| 国产三级在线免费观看| 国产精品久久久久久模特| 在线视频日韩精品| 荫蒂被男人添免费视频| 青春草在线免费视频| 国产尤物一区二区在线| 欧美成人免费播放| 亚洲AV无码国产成人久久| 制服丝袜专区在线| 91亚洲精品乱码久久久久久蜜桃| 97免费视频在线| 国产精品久久久久久在线观看| 女人黄色免费在线观看| 国产欧美日韩在线视频| 国产精品免费久久久久久| 在线免费观看视频| 成人看片网站| 国产拍欧美日韩视频二区 | 91超碰在线播放| 成人一区二区三区视频在线观看| 成人免费淫片视频软件| 欧美日韩午夜视频| 深夜福利一区二区三区| 亚洲国产精品久久人人爱蜜臀| 国产欧美日韩一区| 国内精品久久久久久久久久久| 免费观看成人av| 久久久国产精品视频| 欧美熟妇精品一区二区| 草草在线视频| 五月天婷婷综合| 日本一区二区三区免费观看| 国产精品久久久久久久免费 | 亚洲人成在线电影| 色乱码一区二区三区在线| 免费黄网站在线播放| 国产精品一区专区| 91精品国产电影| 中文字幕在线观看二区| 欧洲大片精品免费永久看nba| 欧美日韩亚洲高清一区二区| 热久久最新网址| 婷婷在线免费观看| 91精品婷婷色在线观看| 亚洲精品在线观看视频| 91蝌蚪视频在线观看| 成人香蕉视频| 亚洲欧美另类久久久精品2019| 国产精品一区视频网站| 一区二区乱子伦在线播放| 日韩激情一二三区| 欧美黄色三级网站| 免费看91的网站| 日韩成人精品一区| 久久人人爽亚洲精品天堂| 日本少妇xxx| 欧美日韩激情电影| 欧美日韩久久一区| 天堂av手机在线| 欧美动物xxx| 亚洲国产欧美另类丝袜| 99热自拍偷拍| 韩国成人免费视频| 国产精品久久久久一区二区三区 | 欧美精品久久久久a| 成人黄色免费网址| 国产精品x8x8一区二区| 欧美日韩国产在线观看| 一级日本黄色片| 免费一级欧美在线观看视频| 第一福利永久视频精品| 佐佐木明希av| 欧美精品hd| 亚洲一区免费在线观看| 免费观看黄色的网站| 国产精品一二三区视频| 亚洲视频一二三区| 图片区小说区区亚洲五月| 午夜影院免费视频| 国产女同互慰高潮91漫画| 懂色av一区二区三区四区五区| 国产乱码在线| 欧美性感一区二区三区| 中文字幕乱码人妻综合二区三区| xxx在线免费观看| 色呦呦日韩精品| 国产偷人视频免费| 国产精区一区二区| 制服丝袜在线91| 中文字幕一区久久| 亚洲视频资源| 制服丝袜中文字幕亚洲| 久久无码人妻精品一区二区三区| 激情小说一区| 按摩亚洲人久久| 蜜桃视频最新网址| 亚洲免费综合| 日本精品视频在线播放| 老熟妇仑乱一区二区av| 久久视频一区| 国产精品久在线观看| 黄色一级大片在线免费看国产一| 国产不卡视频在线播放| av免费精品一区二区三区| 国产高清第一页| 国产欧美精品一区二区三区四区| 免费av手机在线观看| 国产精品一区三区在线观看| 中日韩美女免费视频网站在线观看| 日韩av在线播| 在线亚洲自拍| 日本久久久a级免费| 可以免费观看的毛片| 亚洲你懂的在线视频| 亚洲精品www.| 亚洲啊v在线免费视频| 日韩在线视频网站| 国产亚洲精品久久久久久打不开| 99精品热视频只有精品10| 五月天中文字幕一区二区| 久久九九全国免费精品观看| 青青国产在线观看| 99pao成人国产永久免费视频| 91精品免费看| 在线免费看黄网站| 亚洲欧美日本韩国| 日韩av卡一卡二| 五月婷婷久久综合| 日本wwwwwww| 精品中文视频| 亚洲精品ady| 久久视频精品在线观看| 国产精品久久777777毛茸茸| 国产亚洲自拍偷拍| 51漫画成人app入口| 亚洲成人网在线观看| 久久久视频6r| 丝袜美腿成人在线| 品久久久久久久久久96高清| 日韩电影av| 一区二区在线视频播放| 青青草在线观看视频| 欧美一级播放| 免费看成人av| 亚洲wwwww| 色天使久久综合网天天| 制服下的诱惑暮生| 自拍偷拍欧美专区| 欧美一级在线播放| 在线观看国产成人| 波波电影院一区二区三区| 污视频在线免费观看一区二区三区 | 国产成人精品123区免费视频| 亚洲丝袜一区在线| 一区二区的视频| 成人精品国产一区二区4080| 日韩中文字幕在线视频观看| 欧美日韩播放| 久久男人的天堂| 91精东传媒理伦片在线观看| 亚洲三级在线观看| 你懂的在线观看网站| 麻豆91精品| 国产精品免费一区二区三区在线观看| 高清电影在线免费观看| 日韩精品中文字幕视频在线| 国产成人久久久久| 欧美孕妇孕交| 久久天堂av综合合色蜜桃网| 99久久久无码国产精品性色戒| 激情五月综合婷婷| 午夜剧场成人观在线视频免费观看 | 久久久久久久欧美精品| 这里只有精品66| 成人性生活av| 久久久99久久精品女同性| 丰满人妻熟女aⅴ一区| 色婷婷一区二区| 九九视频免费观看| 久久色.com| 亚洲综合伊人久久| 亚洲欧美日韩国产综合精品二区 | 91亚洲精华国产精华| eeuss影院在线观看| 在线免费观看一区| 亚洲国产第一区| 亚洲激情综合| 国产乱人伦精品一区二区| 日韩免费福利视频| 欧美俄罗斯乱妇| 成年人在线观看| 欧美精品一区二区久久久| 在线观看中文字幕网站| 污片在线观看一区二区| 成人在线观看小视频| 久久欧美一区二区| 亚洲性图第一页| 久久国产欧美日韩精品| 亚洲一区二区免费视频软件合集| 一根才成人网| 欧美福利在线观看| 国产成人天天5g影院在线观看| 亚洲国产精品美女| 99产精品成人啪免费网站| 亚洲天堂成人网| 国产成人一区二区在线观看| 91亚洲国产成人精品一区二三 | 欧美日韩国产综合草草| 国产成人在线视频观看| 2020日本不卡一区二区视频| 亚洲妇女无套内射精| 日本不卡视频在线观看| 影音先锋欧美在线| 国产精品欧美在线观看| 国产精品偷伦免费视频观看的| 欧洲不卡视频| 国产一区二区免费| 91国内精品视频| 精品视频资源站| 岛国av中文字幕| 欧美日韩在线视频观看| 国内免费精品视频| 久久精品一级爱片| 在线视频一二区| 极品少妇xxxx精品少妇| 国内精品视频一区二区三区| 国产精品免费不| 欧美一区二区三区在线播放| 亚洲精品aaaaa| 欧美日韩成人一区二区三区| 亚洲午夜剧场| 91日本在线视频| 午夜日韩影院| 国产精华一区二区三区| jizz性欧美2| 国产精品日韩在线一区| 巨胸喷奶水www久久久免费动漫| 国产精品草莓在线免费观看| 亚洲a∨精品一区二区三区导航| 国产精品久久久久久久久久99 | 欧美亚洲一区二区在线| 波多野结衣爱爱| 一区二区三区国产豹纹内裤在线| 无码国产69精品久久久久同性| 久久亚洲二区三区| 国产av自拍一区| 中文在线免费一区三区高中清不卡| 无码人妻一区二区三区一| 国产suv一区二区三区88区| 国产女主播在线播放| 91丨九色丨蝌蚪丨老版| 五月婷婷欧美激情| 99久久精品国产一区二区三区 | 国产精品拍天天在线| 久久精品一区二区三区四区五区 | 国产精品一区二区欧美黑人喷潮水| 草草视频在线一区二区| 久久精品国产综合精品| 日韩国产欧美| 免费的一级黄色片| 99久久99久久精品国产片桃花| 久久久99爱| 青青草成人影院| 国产伦精品一区二区三| 亚洲激情播播| 日韩中文在线字幕| 欧美一区=区| 欧美xxxxxbbbbb| wwww国产精品欧美| 91嫩草|国产丨精品入口| 精品久久久久久电影| 一级全黄少妇性色生活片| 亚洲精品一区二区在线观看| 成人在线免费电影| 久久久噜噜噜久久中文字免| 播放一区二区| 国产精品对白一区二区三区| 精品免费一区二区| 欧美亚州在线观看| 欧美+日本+国产+在线a∨观看| 一区二区不卡在线观看| 亚洲精品极品| www.色.com| 欧美激情一区在线| 福利一区二区三区四区| 欧美精品一二三四| 97精品人妻一区二区三区在线| 亚洲福利在线看| 黄色免费网站在线| 久久亚洲一区二区三区四区五区高| 黄视频免费在线看| **亚洲第一综合导航网站| 国产精品3区| 欧美一区二区影视| 日韩视频不卡| 1314成人网| 日本一区二区三区四区在线视频 | 欧美亚洲国产一卡| 日韩中文字幕免费观看| 日韩欧美在线综合网| 性猛交xxxx乱大交孕妇印度| 在线观看日韩视频| 在线男人天堂| 精品国产免费久久久久久尖叫 | 日本亚洲色大成网站www久久| 欧美日韩免费不卡视频一区二区三区 | 日韩影片中文字幕| 精品视频在线观看| 亚洲大黄网站| 美女av免费在线观看| 三级不卡在线观看| 亚洲国产精品成人综合久久久| 伊人夜夜躁av伊人久久| 久久精品视频久久| 都市激情亚洲色图| 好吊视频一二三区| 欧美巨乳美女视频| 国产剧情av在线播放| 91在线看网站| 欧美久久视频| 国产精品免费观看久久| youjizz久久| 日本一区二区不卡在线| 精品久久久久久久人人人人传媒| 天堂在线一二区| 中文字幕亚洲在线| 97欧美成人| 国产成人精品福利一区二区三区| 午夜日韩av| 久久久久久久久久久久国产精品| 亚洲国产精品影院| 天堂成人在线观看| 欧美一区二区三区免费视| 美女久久久久| 亚洲36d大奶网| 1区2区3区国产精品| 日韩精品在线免费视频| 精品亚洲精品福利线在观看| 亚洲午夜天堂| 亚洲欧美久久234| 国产精品久久国产愉拍| 国产又爽又黄无码无遮挡在线观看| 色综合色综合色综合| a中文在线播放| 亚洲精品免费在线视频| 精品久久91| 中文字幕av不卡在线| 亚洲精品欧美综合四区| 色婷婷综合视频| 国产成人精品视| 欧美电影在线观看免费| 久久视频免费在线| 不卡电影一区二区三区| 青青视频在线免费观看| 精品不卡在线视频| 欧美a级在线观看| 先锋影音亚洲资源| 国产99精品视频| 波多野结衣一本一道| 久久99精品久久久久久青青91| 日韩美女毛片| 秋霞无码一区二区| 欧美激情中文字幕| www.色呦呦| 日本精品va在线观看| 精品嫩草影院| 亚洲黄色av网址| 久久一夜天堂av一区二区三区| 中文无码av一区二区三区| 亚洲国产精品小视频| 国产亚洲精彩久久| 99re6这里有精品热视频| 久久夜色精品一区| 一级全黄裸体免费视频|