90%的人(包括我)都以為會用ThreadPoolExecutor了,看了這十張圖再說吧!
在阿里巴巴手冊中有一條建議:
【強(qiáng)制】線程池不允許使用 Executors 去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風(fēng)險。
如果經(jīng)常基于Executors提供的工廠方法創(chuàng)建線程池,很容易忽略線程池內(nèi)部的實現(xiàn)。特別是拒絕策略,因使用Executors創(chuàng)建線程池時不會傳入這個參數(shù),直接采用默認(rèn)值,所以常常被忽略。
下面我們就來了解一下線程池相關(guān)的實現(xiàn)原理、API以及實例。
線程池的作用
在實踐應(yīng)用中創(chuàng)建線程池主要是為了:
- 減少資源開銷:減少每次創(chuàng)建、銷毀線程的開銷;
- 提高響應(yīng)速度:請求到來時,線程已創(chuàng)建好,可直接執(zhí)行,提高響應(yīng)速度;
- 提高線程的可管理性:線程是稀缺資源,需根據(jù)情況加以限制,確保系統(tǒng)穩(wěn)定運行;
ThreadPoolExecutor
ThreadPoolExecutor可以實現(xiàn)線程池的創(chuàng)建。ThreadPoolExecutor相關(guān)類圖如下:
類圖
從類圖可以看出,ThreadPoolExecutor最終實現(xiàn)了Executor接口,是線程池創(chuàng)建的真正實現(xiàn)者。
Executor兩級調(diào)度模型
Executor模型
在HotSpot虛擬機(jī)中,Java中的線程將會被一一映射為操作系統(tǒng)的線程。在Java虛擬機(jī)層面,用戶將多個任務(wù)提交給Executor框架,Executor負(fù)責(zé)分配線程執(zhí)行它們;在操作系統(tǒng)層面,操作系統(tǒng)再將這些線程分配給處理器執(zhí)行。
ThreadPoolExecutor的三個角色
任務(wù)
ThreadPoolExecutor接受兩種類型的任務(wù):Callable和Runnable。
- Callable:該類任務(wù)有返回結(jié)果,可以拋出異常。通過submit方法提交,返回Future對象。通過get獲取執(zhí)行結(jié)果。
- Runnable:該類任務(wù)只執(zhí)行,無法獲取返回結(jié)果,在執(zhí)行過程中無法拋異常。通過execute或submit方法提交。
任務(wù)執(zhí)行器
Executor框架最核心的接口是Executor,它表示任務(wù)的執(zhí)行器。
通過上面類圖可以看出,Executor的子接口為ExecutorService。再往底層有兩大實現(xiàn)類:ThreadPoolExecutor和ScheduledThreadPoolExecutor(集成自ThreadPoolExecutor)。
執(zhí)行結(jié)果
Future接口表示異步的執(zhí)行結(jié)果,它的實現(xiàn)類為FutureTask。
三個角色之間的處理邏輯圖如下:
FutureTask邏輯
線程池處理流程
線程池處理流程
一個線程從被提交(submit)到執(zhí)行共經(jīng)歷以下流程:
- 線程池判斷核心線程池里是的線程是否都在執(zhí)行任務(wù),如果不是,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果核心線程池里的線程都在執(zhí)行任務(wù),則進(jìn)入下一個流程;
- 線程池判斷工作隊列是否已滿。如果工作隊列沒有滿,則將新提交的任務(wù)儲存在這個工作隊列里。如果工作隊列滿了,則進(jìn)入下一個流程;
- 線程池判斷其內(nèi)部線程是否都處于工作狀態(tài)。如果沒有,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果已滿了,則交給飽和策略來處理這個任務(wù)。
線程池在執(zhí)行execute方法時,主要有以下四種情況:
線程池執(zhí)行excute方法
- 如果當(dāng)前運行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(需要獲得全局鎖);
- 如果運行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue;
- 如果無法將任務(wù)加入BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務(wù)(需要獲得全局鎖);
- 如果創(chuàng)建新線程將使當(dāng)前運行的線程超出maxiumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。
線程池采取上述的流程進(jìn)行設(shè)計是為了減少獲取全局鎖的次數(shù)。在線程池完成預(yù)熱(當(dāng)前運行的線程數(shù)大于或等于corePoolSize)之后,幾乎所有的excute方法調(diào)用都執(zhí)行步驟二。
線程的狀態(tài)流轉(zhuǎn)
順便再回顧一下線程的狀態(tài)的轉(zhuǎn)換,在JDK中Thread類中提供了一個枚舉類,例舉了線程的各個狀態(tài):
- public enum State {
- NEW,
- RUNNABLE,
- BLOCKED,
- WAITING,
- TIMED_WAITING,
- TERMINATED;
- }
一共定義了6個枚舉值,其實代表的是5種類型的線程狀態(tài):
- NEW:新建;
- RUNNABLE:運行狀態(tài);
- BLOCKED:阻塞狀態(tài);
- WAITING:等待狀態(tài),WAITING和TIMED_WAITING可以歸為一類,都屬于等待狀態(tài),只是后者可以設(shè)置等待時間,即等待多久;
- TERMINATED:終止?fàn)顟B(tài);
線程關(guān)系轉(zhuǎn)換圖:
線程狀態(tài)轉(zhuǎn)換
當(dāng)new Thread()說明這個線程處于NEW(新建狀態(tài));調(diào)用Thread.start()方法表示這個線程處于RUNNABLE(運行狀態(tài));但是RUNNABLE狀態(tài)中又包含了兩種狀態(tài):READY(就緒狀態(tài))和RUNNING(運行中)。調(diào)用start()方法,線程不一定獲得了CPU時間片,這時就處于READY,等待CPU時間片,當(dāng)獲得了CPU時間片,就處于RUNNING狀態(tài)。
在運行中調(diào)用synchronized同步的代碼塊,沒有獲取到鎖,這時會處于BLOCKED(阻塞狀態(tài)),當(dāng)重新獲取到鎖時,又會變?yōu)镽UNNING狀態(tài)。在代碼執(zhí)行的過程中可能會碰到Object.wait()等一些等待方法,線程的狀態(tài)又會轉(zhuǎn)變?yōu)閃AITING(等待狀態(tài)),等待被喚醒,當(dāng)調(diào)用了Object.notifyAll()喚醒了之后線程執(zhí)行完就會變?yōu)門ERMINATED(終止?fàn)顟B(tài))。
線程池的狀態(tài)
線程池中狀態(tài)通過2個二進(jìn)制位(bit)來表示線程池的5個狀態(tài):RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED:
- RUNNING:線程池正常工作的狀態(tài),在 RUNNING 狀態(tài)下線程池接受新的任務(wù)并處理任務(wù)隊列中的任務(wù);
- SHUTDOWN:調(diào)用shutdown()方法會進(jìn)入 SHUTDOWN 狀態(tài)。在 SHUTDOWN 狀態(tài)下,線程池不接受新的任務(wù),但是會繼續(xù)執(zhí)行任務(wù)隊列中已有的任務(wù);
- STOP:調(diào)用shutdownNow()會進(jìn)入 STOP 狀態(tài)。在 STOP 狀態(tài)下線程池既不接受新的任務(wù),也不處理已經(jīng)在隊列中的任務(wù)。對于還在執(zhí)行任務(wù)的工作線程,線程池會發(fā)起中斷請求來中斷正在執(zhí)行的任務(wù),同時會清空任務(wù)隊列中還未被執(zhí)行的任務(wù);
- TIDYING:當(dāng)線程池中的所有執(zhí)行任務(wù)的工作線程都已經(jīng)終止,并且工作線程集合為空的時候,進(jìn)入 TIDYING 狀態(tài);
- TERMINATED:當(dāng)線程池執(zhí)行完terminated()鉤子方法以后,線程池進(jìn)入終態(tài) TERMINATED;
ThreadPoolExecutor API
ThreadPoolExecutor創(chuàng)建線程池API:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
參數(shù)解釋:
- corePoolSize :線程池常駐核心線程數(shù)。創(chuàng)建線程池時,線程池中并沒有任何線程,當(dāng)有任務(wù)來時才去創(chuàng)建線程,執(zhí)行任務(wù)。提交一個任務(wù),創(chuàng)建一個線程,直到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小,則不再創(chuàng)建。當(dāng)創(chuàng)建的線程數(shù)等于corePoolSize 時,會加入設(shè)置的阻塞隊列。
- maximumPoolSize :線程池允許創(chuàng)建的最大線程數(shù)。當(dāng)隊列滿時,會創(chuàng)建線程執(zhí)行任務(wù)直到線程池中的數(shù)量等于maximumPoolSize。
- keepAliveTime :當(dāng)線程數(shù)大于核心時,此為終止前多余的空閑線程等待新任務(wù)的最長時間。
- unit :keepAliveTime的時間單位,可選項:天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微妙)。
- workQueue :用來儲存等待執(zhí)行任務(wù)的隊列。
- threadFactory :線程工廠,用來生產(chǎn)一組相同任務(wù)的線程。主要用于設(shè)置生成的線程名詞前綴、是否為守護(hù)線程以及優(yōu)先級等。設(shè)置有意義的名稱前綴有利于在進(jìn)行虛擬機(jī)分析時,知道線程是由哪個線程工廠創(chuàng)建的。
- handler :執(zhí)行拒絕策略對象。當(dāng)達(dá)到任務(wù)緩存上限時(即超過workQueue參數(shù)能存儲的任務(wù)數(shù)),執(zhí)行拒接策略。也就是當(dāng)任務(wù)處理不過來的時候,線程池開始執(zhí)行拒絕策略。JDK 1.5提供了四種飽和策略:
- AbortPolicy:默認(rèn),直接拋異常;
- 只用調(diào)用者所在的線程執(zhí)行任務(wù),重試添加當(dāng)前的任務(wù),它會自動重復(fù)調(diào)用execute()方法;
- DiscardOldestPolicy:丟棄任務(wù)隊列中最久的任務(wù);
- DiscardPolicy:丟棄當(dāng)前任務(wù);
適當(dāng)?shù)淖枞犃?/strong>
當(dāng)創(chuàng)建的線程數(shù)等于corePoolSize,會將任務(wù)加入阻塞隊列(BlockingQueue),維護(hù)著等待執(zhí)行的Runnable對象。
阻塞隊列通常有如下類型:
- ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。可以限定隊列的長度,接收到任務(wù)時,如果沒有達(dá)到corePoolSize的值,則新建線程(核心線程)執(zhí)行任務(wù),如果達(dá)到了,則入隊等候,如果隊列已滿,則新建線程(非核心線程)執(zhí)行任務(wù),又如果總線程數(shù)到了maximumPoolSize,并且隊列也滿了,則發(fā)生錯誤。
- LinkedBlockingQueue :一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列。這個隊列在接收到任務(wù)時,如果當(dāng)前線程數(shù)小于核心線程數(shù),則新建線程(核心線程)處理任務(wù);如果當(dāng)前線程數(shù)等于核心線程數(shù),則進(jìn)入隊列等待。由于這個隊列沒有最大值限制,即所有超過核心線程數(shù)的任務(wù)都將被添加到隊列中,這也就導(dǎo)致了maximumPoolSize的設(shè)定失效,因為總線程數(shù)永遠(yuǎn)不會超過corePoolSize。
- PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列。
- DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。隊列內(nèi)元素必須實現(xiàn)Delayed接口,這就意味著傳入的任務(wù)必須先實現(xiàn)Delayed接口。這個隊列在接收到任務(wù)時,首先先入隊,只有達(dá)到了指定的延時時間,才會執(zhí)行任務(wù)。
- SynchronousQueue:一個不存儲元素的阻塞隊列。這個隊列在接收到任務(wù)時,會直接提交給線程處理,而不保留它,如果所有線程都在工作就新建一個線程來處理這個任務(wù)。所以為了保證不出現(xiàn)【線程數(shù)達(dá)到了maximumPoolSize而不能新建線程】的錯誤,使用這個類型隊列時,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大。
- LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。
明確的拒絕策略
當(dāng)任務(wù)處理不過來時,線程池開始執(zhí)行拒絕策略。
支持的拒絕策略:
- ThreadPoolExecutor.AbortPolicy: 丟棄任務(wù)并拋出RejectedExecutionException異常。(默認(rèn))
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)。(重復(fù)此過程)
- ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)。
線程池關(guān)閉
- shutdown:將線程池狀態(tài)置為SHUTDOWN,并不會立即停止。停止接收外部submit的任務(wù),內(nèi)部正在跑的任務(wù)和隊列里等待的任務(wù),會執(zhí)行完后,才真正停止。
- shutdownNow:將線程池狀態(tài)置為STOP。企圖立即停止,事實上不一定,跟shutdown()一樣,先停止接收外部提交的任務(wù),忽略隊列里等待的任務(wù),嘗試將正在跑的任務(wù)interrupt中斷(如果線程未處于sleep、wait、condition、定時鎖狀態(tài),interrupt無法中斷當(dāng)前線程),返回未執(zhí)行的任務(wù)列表。
- awaitTermination(long timeOut, TimeUnit unit)當(dāng)前線程阻塞,直到等所有已提交的任務(wù)(包括正在跑的和隊列中等待的)執(zhí)行完或者等超時時間到或者線程被中斷,拋出InterruptedException,然后返回true(shutdown請求后所有任務(wù)執(zhí)行完畢)或false(已超時)。
Executors
Executors是一個幫助類,提供了創(chuàng)建幾種預(yù)配置線程池實例的方法:newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool等。
如果查看源碼就會發(fā)現(xiàn),Executors本質(zhì)上就是實現(xiàn)了幾類默認(rèn)的ThreadPoolExecutor。而阿里巴巴開發(fā)手冊,不建議采用Executors默認(rèn)的,讓使用者直接通過ThreadPoolExecutor來創(chuàng)建。
Executors.newSingleThreadExecutor()
創(chuàng)建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。
- new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
該類型線程池的結(jié)構(gòu)圖:
newSingleThreadExecutor
該線程池的特點:
- 只會創(chuàng)建一條工作線程處理任務(wù);
- 采用的阻塞隊列為LinkedBlockingQueue;
Executors.newFixedThreadPool()
創(chuàng)建固定大小的線程池。每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達(dá)到線程池的最大大小。線程池的大小一旦達(dá)到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補(bǔ)充一個新線程。
- new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
該類型線程池的結(jié)構(gòu)圖:
newFixedThreadPool
該線程池的特點:
- 固定大小;
- corePoolSize和maximunPoolSize都為用戶設(shè)定的線程數(shù)量nThreads;
- keepAliveTime為0,意味著一旦有多余的空閑線程,就會被立即停止掉;但這里keepAliveTime無效;
- 阻塞隊列采用了LinkedBlockingQueue,一個無界隊列;
- 由于阻塞隊列是一個無界隊列,因此永遠(yuǎn)不可能拒絕任務(wù);
- 由于采用了無界隊列,實際線程數(shù)量將永遠(yuǎn)維持在nThreads,因此maximumPoolSize和keepAliveTime將無效。
Executors.newCachedThreadPool()
創(chuàng)建一個可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時,此線程池又可以智能的添加新線程來處理任務(wù)。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。
- new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
該類型線程池的結(jié)構(gòu)圖:
newCachedThreadPool
- 該線程池的特點:
- 可以無限擴(kuò)大;
- 比較適合處理執(zhí)行時間比較小的任務(wù);
- corePoolSize為0,maximumPoolSize為無限大,意味著線程數(shù)量可以無限大;
- keepAliveTime為60S,意味著線程空閑時間超過60s就會被殺死;
- 采用SynchronousQueue裝等待的任務(wù),這個阻塞隊列沒有存儲空間,這意味著只要有請求到來,就必須要找到一條工作線程處理它,如果當(dāng)前沒有空閑的線程,那么就會再創(chuàng)建一條新的線程。
Executors.newScheduledThreadPool()
創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。
- new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
該線程池類圖:
newScheduledThreadPool
該線程池的特點:
- 接收SchduledFutureTask類型的任務(wù),有兩種提交任務(wù)的方式:scheduledAtFixedRate和scheduledWithFixedDelay。SchduledFutureTask接收的參數(shù):
- time:任務(wù)開始的時間
- sequenceNumber:任務(wù)的序號
- period:任務(wù)執(zhí)行的時間間隔
- 采用DelayQueue存儲等待的任務(wù);
- DelayQueue內(nèi)部封裝了一個PriorityQueue,它會根據(jù)time的先后時間排序,若time相同則根據(jù)sequenceNumber排序;
- DelayQueue也是一個無界隊列;
- 工作線程執(zhí)行時,工作線程會從DelayQueue取已經(jīng)到期的任務(wù)去執(zhí)行;執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時間,再次放回DelayQueue;
Executors.newWorkStealingPool()
JDK8引入,創(chuàng)建持有足夠線程的線程池支持給定的并行度,并通過使用多個隊列減少競爭。
- public static ExecutorService newWorkStealingPool() {
- return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
- ForkJoinPool.defaultForkJoinWorkerThreadFactory,
- null, true);
- }
Executors方法的弊端
1)newFixedThreadPool 和 newSingleThreadExecutor:允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,從而導(dǎo)致 OOM。2)newCachedThreadPool 和 newScheduledThreadPool:允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會創(chuàng)建大量的線程,從而導(dǎo)致 OOM。
合理配置線程池大小
合理配置線程池,需要先分析任務(wù)特性,可以從以下角度來進(jìn)行分析:
- 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
- 任務(wù)的優(yōu)先級:高,中和低。
- 任務(wù)的執(zhí)行時間:長,中和短。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
另外,還需要查看系統(tǒng)的內(nèi)核數(shù):
- Runtime.getRuntime().availableProcessors());
根據(jù)任務(wù)所需要的CPU和IO資源可以分為:
- CPU密集型任務(wù): 主要是執(zhí)行計算任務(wù),響應(yīng)時間很快,CPU一直在運行。一般公式:線程數(shù) = CPU核數(shù) + 1。只有在真正的多核CPU上才能得到加速,優(yōu)點是不存在線程切換開銷,提高了CPU的利用率并減少了線程切換的效能損耗。
- IO密集型任務(wù):主要是進(jìn)行IO操作,CPU并不是一直在執(zhí)行任務(wù),IO操作(CPU空閑狀態(tài))的時間較長,應(yīng)配置盡可能多的線程,其中的線程在IO操作時,其他線程可以繼續(xù)利用CPU,從而提高CPU的利用率。一般公式:線程數(shù) = CPU核數(shù) * 2。
使用實例
任務(wù)實現(xiàn)類:
- /**
- * 任務(wù)實現(xiàn)線程
- * @author sec
- * @version 1.0
- * @date 2021/10/30
- **/
- public class MyThread implements Runnable{
- private final Integer number;
- public MyThread(int number){
- this.number = number;
- }
- public Integer getNumber() {
- return number;
- }
- @Override
- public void run() {
- try {
- // 業(yè)務(wù)處理
- TimeUnit.SECONDS.sleep(1);
- System.out.println("Hello! ThreadPoolExecutor - " + getNumber());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
自定義阻塞提交的ThreadLocalExcutor:
- /**
- * 自定義阻塞提交的ThreadPoolExecutor
- * @author sec
- * @version 1.0
- * @date 2021/10/30
- **/
- public class CustomBlockThreadPoolExecutor {
- private ThreadPoolExecutor pool = null;
- /**
- * 線程池初始化方法
- */
- public void init() {
- // 核心線程池大小
- int poolSize = 2;
- // 最大線程池大小
- int maxPoolSize = 4;
- // 線程池中超過corePoolSize數(shù)目的空閑線程最大存活時間:30+單位TimeUnit
- long keepAliveTime = 30L;
- // ArrayBlockingQueue<Runnable> 阻塞隊列容量30
- int arrayBlockingQueueSize = 30;
- pool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime,
- TimeUnit.SECONDS, new ArrayBlockingQueue<>(arrayBlockingQueueSize), new CustomThreadFactory(),
- new CustomRejectedExecutionHandler());
- }
- /**
- * 關(guān)閉線程池方法
- */
- public void destroy() {
- if (pool != null) {
- pool.shutdownNow();
- }
- }
- public ExecutorService getCustomThreadPoolExecutor() {
- return this.pool;
- }
- /**
- * 自定義線程工廠類,
- * 生成的線程名詞前綴、是否為守護(hù)線程以及優(yōu)先級等
- */
- private static class CustomThreadFactory implements ThreadFactory {
- private final AtomicInteger count = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- String threadName = CustomBlockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
- t.setName(threadName);
- return t;
- }
- }
- /**
- * 自定義拒絕策略對象
- */
- private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- // 核心改造點,將blockingqueue的offer改成put阻塞提交
- try {
- executor.getQueue().put(r);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 當(dāng)提交任務(wù)被拒絕時,進(jìn)入拒絕機(jī)制,實現(xiàn)拒絕方法,把任務(wù)重新用阻塞提交方法put提交,實現(xiàn)阻塞提交任務(wù)功能,防止隊列過大,OOM
- */
- public static void main(String[] args) {
- CustomBlockThreadPoolExecutor executor = new CustomBlockThreadPoolExecutor();
- // 初始化
- executor.init();
- ExecutorService pool = executor.getCustomThreadPoolExecutor();
- for (int i = 1; i < 51; i++) {
- MyThread myThread = new MyThread(i);
- System.out.println("提交第" + i + "個任務(wù)");
- pool.execute(myThread);
- }
- pool.shutdown();
- try {
- // 阻塞,超時時間到或者線程被中斷
- if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
- // 立即關(guān)閉
- executor.destroy();
- }
- } catch (InterruptedException e) {
- executor.destroy();
- }
- }
- }
小結(jié)
看似簡單的線程池創(chuàng)建,其中卻蘊(yùn)含著各類知識,融合貫通,根據(jù)具體場景采用具體的參數(shù)進(jìn)行設(shè)置才能夠達(dá)到最優(yōu)的效果。
總結(jié)一下就是:
- 用ThreadPoolExecutor自定義線程池,要看線程的用途。如果任務(wù)量不大,可以用無界隊列,如果任務(wù)量非常大,要用有界隊列,防止OOM;
- 如果任務(wù)量很大,且要求每個任務(wù)都處理成功,要對提交的任務(wù)進(jìn)行阻塞提交,重寫拒絕機(jī)制,改為阻塞提交。保證不拋棄一個任務(wù);
- 最大線程數(shù)一般設(shè)為2N+1最好,N是CPU核數(shù);
- 核心線程數(shù),要根據(jù)任務(wù)是CPU密集型,還是IO密集型。同時,如果任務(wù)是一天跑一次,設(shè)置為0合適,因為跑完就停掉了;
- 如果要獲取任務(wù)執(zhí)行結(jié)果,用CompletionService,但是注意,獲取任務(wù)的結(jié)果要重新開一個線程獲取,如果在主線程獲取,就要等任務(wù)都提交后才獲取,就會阻塞大量任務(wù)結(jié)果,隊列過大OOM,所以最好異步開個線程獲取結(jié)果。
參考文章:
[1]https://www.jianshu.com/p/94852bd1a283
[2]https://blog.csdn.net/jek123456/article/details/90601351
[3]https://blog.csdn.net/z_s_z2016/article/details/81674893
[4]https://zhuanlan.zhihu.com/p/33264000
[5]https://www.cnblogs.com/semi-sub/p/13021786.html






































