精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

并發編程之ForkJoin框架原理分析

開發 前端
Java7 又提供了的一個用于并行執行的任務的框架 Fork/Join ,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。在介紹Fork/Join 框架之前我們先了解幾個概念:CPU密集型、IO密集型,再逐步深入去認識Fork/Join 框架。

[[358064]]

 前言

前面我們介紹了線程池框架(ExecutorService)的兩個具體實現:

  • ThreadPoolExecutor 默認線程池
  • ScheduledThreadPoolExecutor定時線程池

線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到多個任務上。Java7 又提供了的一個用于并行執行的任務的框架 Fork/Join ,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。在介紹Fork/Join 框架之前我們先了解幾個概念:CPU密集型、IO密集型,再逐步深入去認識Fork/Join 框架。

任務性質類型

CPU密集型(CPU bound)

CPU密集型也叫計算密集型,指的是系統的硬盤、內存性能相對于CPU要好很好多,此時,系統運作大部分的狀況是 CPU Loading 100%,CPU要讀/寫 I/O(硬盤/內存),I/O在很短的時間就可以完成,而CPU還有許多運算要處理,CPU Loading很高。

在多重程序系統中,大部分時間用來做計算、邏輯判斷等CPU動作的程序稱之 CPU bound。例如一個計算圓周率至小數點一千位以下的程序,在執行的過程當中絕大部分時間在用三角函數和開根號的計算,便是屬于CPU bound的程序。

CPU bound的程序一般而言CPU占用率相當高。這可能是因為任務本身不太需要訪問I/O設備,也可能是因為程序是多線程實現因此屏蔽了等待I/O的時間。

  • 線程數一般設置為:線程數 = CPU核數 + 1(現代CPU支持超線程)

IO密集型(I/O bound)

I/O密集型指的是系統的CPU性能相對硬盤、內存要好很多,此時,系統運作,大部分的狀況是 CPU 在等 I/O(硬盤/內存)的讀/寫操作,此時 CPU Loading 并不高。

I/O bound的程序一般在達到性能極限時,CPU占用率仍然較低。這可能是因為任務本身需要大量I/O操作,而 pipeline 做的不是很好,沒有充分利用處理器能力。

  • 線程數一般設置為:線程數 = ((線程等待時間 + 線程CPU時間) / 線程CPU時間) * CPU數目

CPU密集型 VS I/O密集型

我們可以把任務分為計算密集型和I/O密集型。

計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等于CPU的核心數。

計算密集型任務由于主要消耗CPU資源,因此,代碼運行效率至關重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對于計算密集型任務,最好用C語言編寫。

第二種任務的類型是I/O密集型,涉及到網絡、磁盤I/O的任務都是I/O密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待I/O操作完成(因為I/O的速度遠遠低于CPU和內存的速度)。對于I/O密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是I/O密集型任務,比如Web應用。

I/O密集型任務執行期間,99%的時間都花在I/O上,花在CPU上的時間很少,因此,用運行速度極快的C語言替換用Python這樣運行速度極低的腳本語言,完全無法提升運行效率。對于I/O密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。

什么是 Fork/Join 框架?

Fork/Join 框架是 Java7 提供了的一個用于并行執行的任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。

Fork 就是把一個大任務切分為若干個子任務并行的執行,Join 就是合并這些子任務的執行結果,最后得到這個大任務的結果。比如計算 1+2+......+10000,可以分割成10個子任務,每個子任務對1000個數進行求和,最終匯總這10個子任務的結果。如下圖所示:


Fork/Join的特性:

  1. ForkJoinPool 不是為了替代 ExecutorService,而是它的補充,在某些應用場景下性能比 ExecutorService 更好。(見 Java Tip: When to use ForkJoinPool vs ExecutorService )
  2. ForkJoinPool 主要用于實現“分而治之”的算法,特別是分治之后遞歸調用的函數,例如 quick sort 等;
  3. ForkJoinPool 最適合的是計算密集型的任務,如果存在 I/O、線程間同步、sleep() 等會造成線程長時間阻塞的情況時,最好配合 MangedBlocker。

關于“分而治之”的算法,可以查看《分治、回溯的實現和特性》

工作竊取算法

工作竊取(work-stealing)算法 是指某個線程從其他隊列里竊取任務來執行。

我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。

但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。


工作竊取算法的優點是充分利用線程進行并行計算,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。并且消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。


  1. ForkJoinPool 的每個工作線程都維護著一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(ForkJoinTask)。
  2. 每個工作線程在運行中產生新的任務(通常是因為調用了 fork())時,會放入工作隊列的隊尾,并且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
  3. 每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自于剛剛提交到 pool 的任務,或是來自于其他工作線程的工作隊列),竊取的任務位于其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務時,使用的是 FIFO 方式。
  4. 在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,并等待其完成。
  5. 在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。

Fork/Join的使用

使用場景示例

定義fork/join任務,如下示例,隨機生成2000w條數據在數組當中,然后求和_

  1. package com.niuh.forkjoin.recursivetask; 
  2.  
  3. import java.util.concurrent.RecursiveTask; 
  4.  
  5. /** 
  6.  * RecursiveTask 并行計算,同步有返回值 
  7.  * ForkJoin框架處理的任務基本都能使用遞歸處理,比如求斐波那契數列等,但遞歸算法的缺陷是: 
  8.  * 一只會只用單線程處理, 
  9.  * 二是遞歸次數過多時會導致堆棧溢出; 
  10.  * ForkJoin解決了這兩個問題,使用多線程并發處理,充分利用計算資源來提高效率,同時避免堆棧溢出發生。 
  11.  * 當然像求斐波那契數列這種小問題直接使用線性算法搞定可能更簡單,實際應用中完全沒必要使用ForkJoin框架, 
  12.  * 所以ForkJoin是核彈,是用來對付大家伙的,比如超大數組排序。 
  13.  * 最佳應用場景:多核、多內存、可以分割計算再合并的計算密集型任務 
  14.  */ 
  15. class LongSum extends RecursiveTask<Long> { 
  16.     //任務拆分的最小閥值 
  17.     static final int SEQUENTIAL_THRESHOLD = 1000; 
  18.     static final long NPS = (1000L * 1000 * 1000); 
  19.     static final boolean extraWork = true; // change to add more than just a sum 
  20.  
  21.  
  22.     int low; 
  23.     int high; 
  24.     int[] array; 
  25.  
  26.     LongSum(int[] arr, int lo, int hi) { 
  27.         array = arr; 
  28.         low = lo; 
  29.         high = hi; 
  30.     } 
  31.  
  32.     /** 
  33.      * fork()方法:將任務放入隊列并安排異步執行,一個任務應該只調用一次fork()函數,除非已經執行完畢并重新初始化。 
  34.      * tryUnfork()方法:嘗試把任務從隊列中拿出單獨處理,但不一定成功。 
  35.      * join()方法:等待計算完成并返回計算結果。 
  36.      * isCompletedAbnormally()方法:用于判斷任務計算是否發生異常。 
  37.      */ 
  38.     protected Long compute() { 
  39.  
  40.         if (high - low <= SEQUENTIAL_THRESHOLD) { 
  41.             long sum = 0; 
  42.             for (int i = low; i < high; ++i) { 
  43.                 sum += array[i]; 
  44.             } 
  45.             return sum
  46.  
  47.         } else { 
  48.             int mid = low + (high - low) / 2; 
  49.             LongSum left = new LongSum(array, low, mid); 
  50.             LongSum right = new LongSum(array, mid, high); 
  51.             left.fork(); 
  52.             right.fork(); 
  53.             long rightAns = right.join(); 
  54.             long leftAns = left.join(); 
  55.             return leftAns + rightAns; 
  56.         } 
  57.     } 

 執行fork/join任務

  1. package com.niuh.forkjoin.recursivetask; 
  2.  
  3. import com.niuh.forkjoin.utils.Utils; 
  4.  
  5. import java.util.concurrent.ForkJoinPool; 
  6. import java.util.concurrent.ForkJoinTask; 
  7.  
  8. public class LongSumMain { 
  9.     //獲取邏輯處理器數量 
  10.     static final int NCPU = Runtime.getRuntime().availableProcessors(); 
  11.     /** 
  12.      * for time conversion 
  13.      */ 
  14.     static final long NPS = (1000L * 1000 * 1000); 
  15.  
  16.     static long calcSum; 
  17.  
  18.     static final boolean reportSteals = true
  19.  
  20.     public static void main(String[] args) throws Exception { 
  21.         int[] array = Utils.buildRandomIntArray(2000000); 
  22.         System.out.println("cpu-num:" + NCPU); 
  23.         //單線程下計算數組數據總和 
  24.         long start = System.currentTimeMillis(); 
  25.         calcSum = seqSum(array); 
  26.         System.out.println("seq sum=" + calcSum); 
  27.         System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start)); 
  28.  
  29.         start = System.currentTimeMillis(); 
  30.         //采用fork/join方式將數組求和任務進行拆分執行,最后合并結果 
  31.         LongSum ls = new LongSum(array, 0, array.length); 
  32.         ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的線程數 
  33.         ForkJoinTask<Long> task = fjp.submit(ls); 
  34.  
  35.         System.out.println("forkjoin sum=" + task.get()); 
  36.         System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start)); 
  37.         if (task.isCompletedAbnormally()) { 
  38.             System.out.println(task.getException()); 
  39.         } 
  40.  
  41.         fjp.shutdown(); 
  42.  
  43.     } 
  44.  
  45.  
  46.     static long seqSum(int[] array) { 
  47.         long sum = 0; 
  48.         for (int i = 0; i < array.length; ++i) { 
  49.             sum += array[i]; 
  50.         } 
  51.         return sum
  52.     } 

 Fork/Join框架原理

Fork/Join 其實就是指由ForkJoinPool作為線程池、ForkJoinTask(通常實現其三個抽象子類)為任務、ForkJoinWorkerThread作為執行任務的具體線程實體這三者構成的任務調度機制。


ForkJoinWorkerThread

ForkJoinWorkerThread 直接繼承了Thread,但是僅僅是為了增加一些額外的功能,并沒有對線程的調度執行做任何更改。


ForkJoinWorkerThread 是被ForkJoinPool管理的工作線程,在創建出來之后都被設置成為了守護線程,由它來執行ForkJoinTasks。該類主要為了維護創建線程實例時通過ForkJoinPool為其創建的任務隊列,與其他兩個線程池整個線程池只有一個任務隊列不同,ForkJoinPool管理的所有工作線程都擁有自己的工作隊列,為了實現任務竊取機制,該隊列被設計成一個雙端隊列,而ForkJoinWorkerThread的首要任務就是執行自己的這個雙端任務隊列中的任務,其次是竊取其他線程的工作隊列,以下是其代碼片段:

  1. public class ForkJoinWorkerThread extends Thread { 
  2.  // 這個線程工作的ForkJoinPool池 
  3.     final ForkJoinPool pool;     
  4.     // 這個線程擁有的工作竊取機制的工作隊列 
  5.     final ForkJoinPool.WorkQueue workQueue;  
  6.  
  7.     //創建在給定ForkJoinPool池中執行的ForkJoinWorkerThread。 
  8.     protected ForkJoinWorkerThread(ForkJoinPool pool) { 
  9.         // Use a placeholder until a useful name can be set in registerWorker 
  10.         super("aForkJoinWorkerThread"); 
  11.         this.pool = pool; 
  12.         //向ForkJoinPool執行池注冊當前工作線程,ForkJoinPool為其分配一個工作隊列 
  13.         this.workQueue = pool.registerWorker(this);  
  14.     } 
  15.  
  16.     //該工作線程的執行內容就是執行工作隊列中的任務 
  17.     public void run() { 
  18.         if (workQueue.array == null) { // only run once 
  19.             Throwable exception = null
  20.             try { 
  21.                 onStart(); 
  22.                 pool.runWorker(workQueue); //執行工作隊列中的任務 
  23.             } catch (Throwable ex) { 
  24.                 exception = ex; //記錄異常 
  25.             } finally { 
  26.                 try { 
  27.                     onTermination(exception); 
  28.                 } catch (Throwable ex) { 
  29.                     if (exception == null
  30.                         exception = ex; 
  31.                 } finally { 
  32.                     pool.deregisterWorker(this, exception); //撤銷工作 
  33.                 } 
  34.             } 
  35.         } 
  36.     } 
  37.  
  38.     ..... 

 ForkJoinTask

ForkJoinTask :與FutureTask一樣, ForkJoinTask也是Future的子類,不過它是一個抽象類。


ForkJoinTask :我們要使用 ForkJoin 框架,必須首先創建一個 ForkJoin 任務。它提供在任務中執行 fork() 和 join() 操作的機制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,Fork/Join框架提供類以下幾個子類:

  • RecursiveAction:用于沒有返回結果的任務。(比如寫數據到磁盤,然后就退出。一個 RecursiveAvtion 可以把直接的工作分割成更小的幾塊,這樣它們可以由獨立的線程或者 CPU 執行。我們可以通過繼承來實現一個 RecusiveAction)
  • RescursiveTask:用于有返回結果的任務。(可以將自己的工作分割為若干更小任務,并將這些子任務的執行合并到一個集體結果??梢杂袔讉€水平的分割和合并)
  • CountedCompleter :在任務完成執行后會觸發執行一個自定義的鉤子函數。

常量介紹

ForkJoinTask 有一個int類型的status字段:

  • 其高16位存儲任務執行狀態例如NORMAL、CANCELLED或EXCEPTIONAL
  • 低16位預留用于用戶自定義的標記。

任務未完成之前status大于等于0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL這幾個小于0的值,這幾個值也是按大小順序的:0(初始狀態) > NORMAL > CANCELLED > EXCEPTIONAL.

  1. public abstract class ForkJoinTask<V> implements Future<V>, Serializable { 
  2.  
  3.     /** 該任務的執行狀態 */ 
  4.     volatile int status; // accessed directly by pool and workers 
  5.     static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits 
  6.     static final int NORMAL      = 0xf0000000;  // must be negative 
  7.     static final int CANCELLED   = 0xc0000000;  // must be < NORMAL 
  8.     static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED 
  9.     static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16 
  10.     static final int SMASK       = 0x0000ffff;  // short bits for tags 
  11.  
  12.     // 異常哈希表 
  13.  
  14.     //被任務拋出的異常數組,為了報告給調用者。因為異常很少見,所以我們不直接將它們保存在task對象中,而是使用弱引用數組。注意,取消異常不會出現在數組,而是記錄在statue字段中 
  15.     //注意這些都是 static 類屬性,所有的ForkJoinTask共用的。 
  16.     private static final ExceptionNode[] exceptionTable;        //異常哈希鏈表數組 
  17.     private static final ReentrantLock exceptionTableLock; 
  18.     private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之后,相應的異常節點對象的引用隊列 
  19.  
  20.     /** 
  21.     * 固定容量的exceptionTable. 
  22.     */ 
  23.     private static final int EXCEPTION_MAP_CAPACITY = 32; 
  24.  
  25.  
  26.     //異常數組的鍵值對節點。 
  27.     //該哈希鏈表數組使用線程id進行比較,該數組具有固定的容量,因為它只維護任務異常足夠長,以便參與者訪問它們,所以在持續的時間內不應該變得非常大。但是,由于我們不知道最后一個joiner何時完成,我們必須使用弱引用并刪除它們。我們對每個操作都這樣做(因此完全鎖定)。此外,任何ForkJoinPool池中的一些線程在其池變為isQuiescent時都會調用helpExpungeStaleExceptions 
  28.     static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { 
  29.         final Throwable ex; 
  30.         ExceptionNode next
  31.         final long thrower;  // 拋出異常的線程id 
  32.         final int hashCode;  // 在弱引用消失之前存儲hashCode 
  33.         ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) { 
  34.             super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之后,會將該節點加入隊列exceptionTableRefQueue 
  35.             this.ex = ex; 
  36.             this.next = next
  37.             this.thrower = Thread.currentThread().getId(); 
  38.             this.hashCode = System.identityHashCode(task); 
  39.         } 
  40.     } 
  41.  
  42.     ................. 

 除了status記錄任務的執行狀態之外,其他字段主要是為了對任務執行的異常的處理,ForkJoinTask采用了哈希數組 + 鏈表的數據結構(JDK8以前的HashMap實現方法)存放所有(因為這些字段是static)的ForkJoinTask任務的執行異常。

fork 方法(安排任務異步執行)

fork() 做的工作只有一件事,既是把任務推入當前工作線程的工作隊列里(安排任務異步執行)??梢詤⒖匆韵碌脑创a:

  1. public final ForkJoinTask<V> fork() { 
  2.     Thread t; 
  3.     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 
  4.         ((ForkJoinWorkerThread)t).workQueue.push(this); 
  5.     else 
  6.         ForkJoinPool.common.externalPush(this); 
  7.     return this; 

 該方法其實就是將任務通過push方法加入到當前工作線程的工作隊列或者提交隊列(外部非ForkJoinWorkerThread線程通過submit、execute方法提交的任務),等待被線程池調度執行,這是一個非阻塞的立即返回方法。

  • 這里需要知道,ForkJoinPool線程池通過哈希數組+雙端隊列的方式將所有的工作線程擁有的任務隊列和從外部提交的任務分別映射到哈希數組的不同槽位上。

join 方法(等待執行結果)

join() 的工作則復雜得多,也是 join() 可以使得線程免于被阻塞的原因——不像同名的 Thread.join()。

  1. 檢查調用 join() 的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當前線程,等待任務完成。如果是,則不阻塞。
  2. 查看任務的完成狀態,如果已經完成,直接返回結果。
  3. 如果任務尚未完成,但處于自己的工作隊列內,則完成它。
  4. 如果任務已經被其他的工作線程偷走,則竊取這個小偷的工作隊列內的任務(以 FIFO 方式),執行,以期幫助它早日完成 join 的任務。
  5. 如果偷走任務的小偷也已經把自己的任務全部做完,正在等待需要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。
  6. 遞歸地執行第5步。

將上述流程畫成序列圖的話就是這個樣子:

 由于文章篇幅有限,源碼分析請查看文章末尾的“了解更多”

小結

通常ForkJoinTask只適用于非循環依賴的純函數的計算或孤立對象的操作,否則,執行可能會遇到某種形式的死鎖,因為任務循環地等待彼此。但是,這個框架支持其他方法和技術(例如使用Phaser、helpQuiesce和complete),這些方法和技術可用于構造解決這種依賴任務的ForkJoinTask子類,為了支持這些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標記一個short類型的值,并使用getForkJoinTaskTag進行檢查。ForkJoinTask實現沒有將這些受保護的方法或標記用于任何目的,但是它們可以用于構造專門的子類,由此可以使用提供的方法來避免重新訪問已經處理過的節點/任務。

ForkJoinTask應該執行相對較少的計算,并且應該避免不確定的循環。大任務應該被分解成更小的子任務,通常通過遞歸分解。如果任務太大,那么并行性就不能提高吞吐量。如果太小,那么內存和內部任務維護開銷可能會超過處理開銷。

ForkJoinTask是可序列化的,這使它們能夠在諸如遠程執行框架之類的擴展中使用。只在執行之前或之后序列化任務才是明智的,而不是在執行期間。

ForkJoinPool

ForkJoinPool:ForkJoinTask 需要通過 ForkJoinPool 來執行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。


常量介紹

ForkJoinPool 與 內部類 WorkQueue 共享的一些常量

  1. // Constants shared across ForkJoinPool and WorkQueue 
  2.  
  3. // 限定參數 
  4. static final int SMASK = 0xffff;        //  低位掩碼,也是最大索引位 
  5. static final int MAX_CAP = 0x7fff;      //  工作線程最大容量 
  6. static final int EVENMASK = 0xfffe;     //  偶數低位掩碼 
  7. static final int SQMASK = 0x007e;       //  workQueues 數組最多64個槽位 
  8.  
  9. // ctl 子域和 WorkQueue.scanState 的掩碼和標志位 
  10. static final int SCANNING = 1;          // 標記是否正在運行任務 
  11. static final int INACTIVE = 1 << 31;    // 失活狀態  負數 
  12. static final int SS_SEQ = 1 << 16;      // 版本戳,防止ABA問題 
  13.  
  14. // ForkJoinPool.config 和 WorkQueue.config 的配置信息標記 
  15. static final int MODE_MASK = 0xffff << 16;  // 模式掩碼 
  16. static final int LIFO_QUEUE = 0;    // LIFO隊列 
  17. static final int FIFO_QUEUE = 1 << 16;  // FIFO隊列 
  18. static final int SHARED_QUEUE = 1 << 31;    // 共享模式隊列,負數 ForkJoinPool 中的相關常量和實例字段: 

 ForkJoinPool 中的相關常量和實例字段

  1. // 低位和高位掩碼 
  2. private static final long SP_MASK = 0xffffffffL; 
  3. private static final long UC_MASK = ~SP_MASK; 
  4.  
  5. // 活躍線程數 
  6. private static final int AC_SHIFT = 48; 
  7. private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數增量 
  8. private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數掩碼 
  9.  
  10. // 工作線程數 
  11. private static final int TC_SHIFT = 32; 
  12. private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數增量 
  13. private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼 
  14. private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // 創建工作線程標志 
  15.  
  16. // 池狀態 
  17. private static final int RSLOCK = 1; 
  18. private static final int RSIGNAL = 1 << 1; 
  19. private static final int STARTED = 1 << 2; 
  20. private static final int STOP = 1 << 29; 
  21. private static final int TERMINATED = 1 << 30; 
  22. private static final int SHUTDOWN = 1 << 31; 
  23.  
  24. // 實例字段 
  25. volatile long ctl;                   // 主控制參數 
  26. volatile int runState;               // 運行狀態鎖 
  27. final int config;                    // 并行度|模式 
  28. int indexSeed;                       // 用于生成工作線程索引 
  29. volatile WorkQueue[] workQueues;     // 主對象注冊信息,workQueue 
  30. final ForkJoinWorkerThreadFactory factory;// 線程工廠 
  31. final UncaughtExceptionHandler ueh;  // 每個工作線程的異常信息 
  32. final String workerNamePrefix;       // 用于創建工作線程的名稱 
  33. volatile AtomicLong stealCounter;    // 偷取任務總數,也可作為同步監視器 
  34.  
  35. /** 靜態初始化字段 */ 
  36. //線程工廠 
  37. public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; 
  38. //啟動或殺死線程的方法調用者的權限 
  39. private static final RuntimePermission modifyThreadPermission; 
  40. // 公共靜態pool 
  41. static final ForkJoinPool common; 
  42. //并行度,對應內部common池 
  43. static final int commonParallelism; 
  44. //備用線程數,在tryCompensate中使用 
  45. private static int commonMaxSpares; 
  46. //創建workerNamePrefix(工作線程名稱前綴)時的序號 
  47. private static int poolNumberSequence; 
  48. //線程阻塞等待新的任務的超時值(以納秒為單位),默認2秒 
  49. private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec 
  50. //空閑超時時間,防止timer未命中 
  51. private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms 
  52. //默認備用線程數 
  53. private static final int DEFAULT_COMMON_MAX_SPARES = 256; 
  54. //阻塞前自旋的次數,用在在awaitRunStateLock和awaitWork中 
  55. private static final int SPINS  = 0; 
  56. //indexSeed的增量 
  57. private static final int SEED_INCREMENT = 0x9e3779b9; 

 ForkJoinPool 的內部狀態都是通過一個64位的 long 型 變量ctl來存儲,它由四個16位的子域組成:

  • AC: 正在運行工作線程數減去目標并行度,高16位
  • TC: 總工作線程數減去目標并行度,中高16位
  • SS: 棧頂等待線程的版本計數和狀態,中低16位
  • ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位

ForkJoinPool.WorkQueue 中的相關屬性:

  1. //初始隊列容量,2的冪 
  2. static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 
  3. //最大隊列容量 
  4. static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M 
  5.  
  6. // 實例字段 
  7. volatile int scanState;    // Woker狀態, <0: inactive; odd:scanning 
  8. int stackPred;             // 記錄前一個棧頂的ctl 
  9. int nsteals;               // 偷取任務數 
  10. int hint;                  // 記錄偷取者索引,初始為隨機索引 
  11. int config;                // 池索引和模式 
  12. volatile int qlock;        // 1: locked, < 0: terminate; else 0 
  13. volatile int base;         // 下一個poll操作的索引(棧底/隊列頭) 
  14. int top;                   // 一個push操作的索引(棧頂/隊列尾) 
  15. ForkJoinTask<?>[] array;   // 任務數組 
  16. final ForkJoinPool pool;   // the containing pool (may be null
  17. final ForkJoinWorkerThread owner; // 當前工作隊列的工作線程,共享模式下為null 
  18. volatile Thread parker;    // 調用park阻塞期間為owner,其他情況為null 
  19. volatile ForkJoinTask<?> currentJoin;  // 記錄被join過來的任務 
  20. volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊列偷取過來的任務 

 內部數據結構

ForkJoinPool采用了哈希數組 + 雙端隊列的方式存放任務,但這里的任務分為兩類:

  • 一類是通過execute、submit 提交的外部任務
  • 另一類是ForkJoinWorkerThread工作線程通過fork/join分解出來的工作任務

ForkJoinPool并沒有把這兩種任務混在一個任務隊列中,對于外部任務,會利用Thread內部的隨機probe值映射到哈希數組的偶數槽位中的提交隊列中,這種提交隊列是一種數組實現的雙端隊列稱之為Submission Queue,專門存放外部提交的任務。

對于ForkJoinWorkerThread工作線程,每一個工作線程都分配了一個工作隊列,這也是一個雙端隊列,稱之為Work Queue,這種隊列都會被映射到哈希數組的奇數槽位,每一個工作線程fork/join分解的任務都會被添加到自己擁有的那個工作隊列中。

在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是我們所說的哈希數組,其元素就是內部類WorkQueue實現的基于數組的雙端隊列。該哈希數組的長度為2的冪,并且支持擴容。如下就是該哈希數組的示意結構圖:

如圖,提交隊列位于哈希數組workQueue的奇數索引槽位,工作線程的工作隊列位于偶數槽位。

  • 默認情況下,asyncMode為false時:因此工作線程把工作隊列當著棧一樣使用(后進先出),將分解的子任務推入工作隊列的top端,取任務的時候也從top端取(凡是雙端隊列都會有兩個分別指向隊列兩端的指針,這里就是圖上畫出的base和top);而當某些工作線程的任務為空的時候,就會從其他隊列(不限于workQueue,也會是提交隊列)竊取(steal)任務,如圖示擁有workQueue2的工作線程從workQueue1中竊取了一個任務,竊取任務的時候采用的是先進先出FIFO的策略(即從base端竊取任務),這樣不但可以避免在取任務的時候與擁有其隊列的工作線程發生沖突,從而減小競爭,還可以輔助其完成比較大的任務。
  • asyncMode為true的話,擁有該工作隊列的工作線程將按照先進先出的策略從base端取任務,這一般只用于不需要返回結果的任務,或者事件消息傳遞框架。

ForkJoinPool構造函數

其完整構造方法如下

  1. private ForkJoinPool(int parallelism, 
  2.                      ForkJoinWorkerThreadFactory factory, 
  3.                      UncaughtExceptionHandler handler, 
  4.                      int mode, 
  5.                      String workerNamePrefix) { 
  6.     this.workerNamePrefix = workerNamePrefix; 
  7.     this.factory = factory; 
  8.     this.ueh = handler; 
  9.     this.config = (parallelism & SMASK) | mode; 
  10.     long np = (long)(-parallelism); // offset ctl counts 
  11.     this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 

 重要參數解釋

  1. parallelism:并行度( the parallelism level),默認情況下跟我們機器的cpu個數保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我們機器運行時可用的CPU個數。
  2. factory:創建新線程的工廠( the factory for creating new threads)。默認情況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
  3. handler:線程異常情況下的處理器(Thread.UncaughtExceptionHandler handler),該處理器在線程執行任務時由于某些無法預料到的錯誤而導致任務線程中斷時進行一些處理,默認情況為null。
  4. asyncMode:這個參數要注意,在ForkJoinPool中,每一個工作線程都有一個獨立的任務隊列
  • asyncMode表示工作線程內的任務隊列是采用何種方式進行調度,可以是先進先出FIFO,也可以是后進先出LIFO。如果為true,則線程池中的工作線程則使用先進先出方式進行任務調度,默認情況下是false。

ForkJoinPool.submit 方法

  1. public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 
  2.     if (task == null
  3.         throw new NullPointerException(); 
  4.     //提交到工作隊列 
  5.     externalPush(task); 
  6.     return task; 

ForkJoinPool 自身擁有工作隊列,這些工作隊列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務,而這些工作隊列被稱為 submitting queue 。 submit() 和 fork() 其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象,因此當其中的任務被一個工作線程成功竊取時,就意味著提交的任務真正開始進入執行階段。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2020-11-30 16:01:03

Semaphore

2020-12-09 08:21:47

編程Exchanger工具

2020-12-04 19:28:53

CountDownLaPhaserCyclicBarri

2020-12-03 11:15:21

CyclicBarri

2020-12-08 08:53:53

編程ThreadPoolE線程池

2017-09-19 14:53:37

Java并發編程并發代碼設計

2022-11-09 09:01:08

并發編程線程池

2020-12-10 07:00:38

編程線程池定時任務

2012-03-09 10:44:11

Java

2020-12-11 07:32:45

編程ThreadLocalJava

2020-11-13 08:42:24

Synchronize

2022-04-13 08:23:31

Golang并發

2017-01-10 13:39:57

Python線程池進程池

2020-12-07 09:40:19

Future&Futu編程Java

2019-11-07 09:20:29

Java線程操作系統

2021-03-10 15:59:39

JavaSynchronize并發編程

2016-10-21 11:04:07

JavaScript異步編程原理解析

2020-07-06 08:03:32

Java悲觀鎖樂觀鎖

2025-03-20 06:48:55

性能優化JDK

2020-11-16 08:11:32

ReentrantLo
點贊
收藏

51CTO技術棧公眾號

www.avtt| 91牛牛免费视频| 一区二区不卡免费视频| 久久久成人av毛片免费观看| 亚洲国产高清在线| 亚洲最大成人网色| 欧美性猛交bbbbb精品| 日韩欧美在线中字| 亚洲激情视频在线播放| 国产一伦一伦一伦| 成人免费图片免费观看| 91偷拍与自偷拍精品| 国产精品视频免费在线| 国产精品第九页| 欧美激情电影| 亚洲黄色av网站| 亚洲激情在线看| 92国产精品| 伊人夜夜躁av伊人久久| 日本不卡二区高清三区| 性一交一乱一精一晶| 日韩精品福利网| 欧美成年人在线观看| 国产传媒国产传媒| 欧洲亚洲成人| 精品国产一区二区三区av性色| 青青草av网站| 欧美亚洲日本精品| 亚洲一二三四在线观看| 伊人久久99| aaa在线观看| 久久你懂得1024| 国产在线精品一区二区中文| 国产精品污视频| 七七婷婷婷婷精品国产| 5566日本婷婷色中文字幕97| 日本一二三区不卡| 亚洲欧美一区在线| 美女精品久久久| 成人精品一二三区| 成人3d动漫在线观看| 亚洲美女视频网站| 欧洲一级黄色片| 9l视频自拍蝌蚪9l视频成人| 日韩一级免费一区| theporn国产精品| 欧美高清免费| 精品视频在线免费观看| 成人亚洲视频在线观看| 日韩电影免费观看高清完整版| 天天综合天天做天天综合| 99久久国产综合精品五月天喷水| av免费在线观看网址| 亚洲女子a中天字幕| 这里只有精品66| 午夜免费视频在线国产| 国产精品人人做人人爽人人添| 日产国产精品精品a∨| 黄色av网站在线看| 国产网红主播福利一区二区| 日韩精品福利视频| 成年人在线观看视频| 欧美国产精品中文字幕| 亚洲高清精品中出| 成人免费看片'免费看| 精品福利视频导航大全| 秋霞影视一区二区三区| 国产高清久久| 日韩在线观看免费全集电视剧网站| 久久久精品成人| 婷婷六月综合| 久精品免费视频| 国产一级在线观看视频| 在线视频精品| 国产精品嫩草影院久久久| 曰批又黄又爽免费视频| 黄页视频在线91| 成人在线免费观看一区| 天天射天天色天天干| 99re在线精品| 亚洲春色在线| 天堂av在线电影| 欧美日韩国产中文字幕| 国产精品亚洲二区在线观看| 激情小说亚洲| 欧美成人猛片aaaaaaa| 大尺度做爰床戏呻吟舒畅| 欧美猛男男男激情videos| 日韩中文视频免费在线观看| 欧美成人国产精品高潮| 亚洲精品少妇| 国产精品无av码在线观看| 国产精品欧美激情在线| 欧美特大特白屁股xxxx| 欧美日韩国产在线一区| 欧美乱妇高清无乱码| 国产一级精品视频| 韩国精品久久久| 麻豆av一区二区| 久久77777| 欧美午夜久久久| 亚洲国产综合av| 免费成人结看片| 欧美成在线观看| 波多野结衣理论片| 国产成人av影院| 日本一区高清在线视频| 男女羞羞视频在线观看| 在线观看视频一区| av av在线| 91精品综合久久久久久久久久久 | 一本之道在线视频| 午夜精品影视国产一区在线麻豆| 久久精品青青大伊人av| 成人午夜视频在线播放| 国产麻豆精品一区二区| 日韩精品久久久免费观看| 国产经典三级在线| 欧美一区二区三区视频免费播放| 日本一级免费视频| 亚洲九九精品| 成人免费视频观看视频| 日韩免费网站| 欧美在线free| av在线网站观看| 在线成人亚洲| 91视频99| 色www永久免费视频首页在线| 欧美无砖砖区免费| 精品成人av一区二区三区| 亚洲午夜极品| 91免费在线观看网站| 黄色免费在线观看| 欧美人牲a欧美精品| 最近中文字幕免费视频| 国产亚洲一区在线| 懂色av一区二区夜夜嗨| 国产噜噜噜噜久久久久久久久| 五月婷婷狠狠干| 亚洲国产中文字幕| 年下总裁被打光屁股sp| 欧美日韩国产免费观看| 92看片淫黄大片欧美看国产片| 最新国产在线观看| 欧美亚洲一区二区在线观看| 老熟妇一区二区| 日韩精品亚洲一区二区三区免费| 久久精品国产美女| 日产福利视频在线观看| 亚洲精品美女在线观看| 日韩美女黄色片| 99久久婷婷国产综合精品| 免费av手机在线观看| 黄色成人美女网站| 7777精品久久久久久| 欧美精品在欧美一区二区| 国产精品无码在线播放| 国产精品久久久久婷婷二区次| 国产aaaaa毛片| 色综合五月天| 91久久久在线| 手机在线免费av| 精品99久久久久久| 97免费在线观看视频| 久久综合中文字幕| 亚洲黄色a v| 亚洲h色精品| 91入口在线观看| 欧美日韩在线观看首页| 国产亚洲视频在线| 国产日韩在线观看一区| 亚洲一级在线观看| 精品无码一区二区三区| 美洲天堂一区二卡三卡四卡视频| 亚洲第一页在线视频| 伊人www22综合色| 欧美在线欧美在线| 午夜老司机在线观看| 欧美一区二区三区人| 在线看成人av| 国产欧美一区二区精品仙草咪| 天堂视频免费看| 精品av久久久久电影| 日本欧美色综合网站免费| 久久精品黄色| 高清欧美性猛交xxxx| 国产免费a∨片在线观看不卡| 在线电影欧美成精品| 日本视频www| 国产精品乱码一区二三区小蝌蚪| 色偷偷中文字幕| 国产欧美亚洲一区| 精品久久免费观看| 日韩精品导航| 91午夜在线播放| 丝袜诱惑一区二区| 久久精品国产免费观看| 亚洲三区在线观看无套内射| 欧美欧美欧美欧美| 日韩三级av在线| 亚洲色图一区二区三区| 免费中文字幕av| 国产真实精品久久二三区| 国产视频一视频二| 亚洲视频电影在线| 日韩精品久久久免费观看| 国产成人一二| 成人av色在线观看| 欧美成人免费电影| 欧美激情视频网站| 日本在线视频网| 亚洲另类激情图| 韩国av免费在线| 欧美嫩在线观看| 日韩人妻精品中文字幕| 亚洲自拍偷拍欧美| 日本女人性生活视频| 久久综合九色综合97_久久久| 青青草原播放器| 美日韩一区二区| 玩弄japan白嫩少妇hd| 亚洲另类黄色| 国产精品久久久影院| 欧美激情理论| 五月天亚洲综合情| 久久最新网址| 欧美二区在线| 人人网欧美视频| 国产免费一区二区| 日韩免费精品| 亚洲a一级视频| 在线观看欧美| 成人观看高清在线观看免费| 国产精品4hu.www| 国产精品成人播放| gay欧美网站| 日本aⅴ大伊香蕉精品视频| 182在线播放| 久久久伊人日本| av最新在线| 午夜精品久久久久久99热| 视频在线这里都是精品| 插插插亚洲综合网| dj大片免费在线观看| 另类视频在线观看| 国产黄大片在线观看画质优化| www.欧美免费| 精品美女在线观看视频在线观看| 中文字幕视频一区二区在线有码| 成人午夜电影在线观看| 国产午夜精品全部视频播放| 国产鲁鲁视频在线观看免费| 中文字幕久精品免费视频| av电影在线播放高清免费观看| 色综合亚洲精品激情狠狠| 午夜在线视频播放| 久99久在线视频| 超碰97免费在线| 2019中文字幕免费视频| 偷拍视频一区二区三区| 国产精品高精视频免费| 九七电影院97理论片久久tvb| 国产日韩欧美在线看| 成人51免费| 国产aⅴ精品一区二区三区黄| 免费福利视频一区| 欧美三级电影在线播放| 日韩专区精品| 国产黄色激情视频| 国产精品外国| 中文字幕国内自拍| 国产河南妇女毛片精品久久久| 日本泡妞xxxx免费视频软件| www.激情成人| 精品无码在线观看| 亚洲欧美国产77777| 日韩黄色在线视频| 日本精品一区二区三区四区的功能| 中文字幕av影视| 日韩欧美激情四射| 天堂av在线免费观看| 伊人伊人伊人久久| 色图在线观看| 日本高清视频一区| 电影一区中文字幕| 国产伦精品一区二区三区视频免费| 猫咪成人在线观看| 夜夜春亚洲嫩草影视日日摸夜夜添夜| 欧美1级日本1级| 黄色片一级视频| 国产精品自拍av| aa一级黄色片| 一区二区三区欧美日韩| 亚洲视频 欧美视频| 欧美一区二区三区四区五区| 亚洲 欧美 激情 另类| 正在播放亚洲1区| 99爱在线视频| 成人黄色短视频在线观看| 亲子伦视频一区二区三区| 中文字幕一区二区三区乱码| 亚洲一区黄色| 在线观看视频在线观看| 久久久久久久久久久黄色| 乱h高h女3p含苞待放| 一本色道综合亚洲| 亚洲精品第五页| 在线日韩日本国产亚洲| 草草在线视频| 成人欧美一区二区三区在线观看| 欧美精品羞羞答答| 国产精品网站免费| 国产九色精品成人porny| 三年中国中文观看免费播放| 午夜国产精品一区| 精品国产av 无码一区二区三区| 亚洲丝袜av一区| 成人免费网站观看| 91中文字精品一区二区| 久久影院100000精品| 97视频在线免费播放| 成人动漫在线一区| 久久r这里只有精品| 欧美亚州韩日在线看免费版国语版| 手机看片1024日韩| 欧美乱大交xxxxx| 国产美女亚洲精品7777| 亚洲色图自拍| 强制捆绑调教一区二区| 久久久久久国产精品无码| 午夜精品一区在线观看| 国 产 黄 色 大 片| 欧美大片在线看| 免费观看亚洲天堂| 天堂av免费看| 国产一区福利在线| 97在线观看免费高| 69久久夜色精品国产69蝌蚪网| eeuss影院www在线观看| 国产精品久久99久久| 日本成人小视频| 鲁一鲁一鲁一鲁一av| 国产精品欧美经典| 在线观看免费高清视频| 伊人久久男人天堂| 高清在线一区| 日韩欧美视频第二区| 日本女人一区二区三区| 亚洲熟女少妇一区二区| 欧美区在线观看| 国产激情视频在线| 成人免费在线一区二区三区| 今天的高清视频免费播放成人| 91精品国产高清91久久久久久 | 视频在线99re| 免费成人av在线| 大地资源高清在线视频观看| 欧美日韩二区三区| 自由的xxxx在线视频| 国产精品一区二区三区在线观| 亚洲三级国产| 亚洲一区二区三区蜜桃| 欧美三日本三级三级在线播放| 午夜不卡视频| 波多野结衣一区二区三区在线观看| 国内自拍视频一区二区三区| 在线免费观看a级片| 欧美性生交大片免费| h视频在线播放| 亚洲综合中文字幕在线| 在线观看视频免费一区二区三区| 给我看免费高清在线观看| 欧美主播一区二区三区| 国产黄a三级三级三级av在线看 | 亚洲人一二三区| 黄色三级网站在线观看| 国产aⅴ夜夜欢一区二区三区| 欧美残忍xxxx极端| 久久性爱视频网站| 在线日韩一区二区| 51xtv成人影院| 久久精品ww人人做人人爽| 免费观看在线综合色| 国产一级片免费| 国产亚洲精品日韩| 婷婷视频一区二区三区| 精品久久久久av| 一区二区三区在线观看动漫| 偷拍自拍在线| 成人午夜在线视频一区| 一本色道精品久久一区二区三区 | 日韩欧美视频在线播放| 美女网站视频在线观看| 欧美亚洲综合久久| 成年人国产在线观看| 亚洲欧美日韩精品在线| 成人美女视频在线观看| 久久这里只有精品9| 欧美极品少妇xxxxⅹ免费视频| japanese国产精品|