精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

萬字圖解 Java 并發框架:Fork/Join、CountDownLatch、Semaphore、CyclicBarrier

開發 前端
在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。并 且該算法會消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。

Fork/Join 框架

Chaya:碼哥,什么是 Fork/Join 框架?

Fork/Join 是 Java 7 引入的并行計算框架,核心思想是 **"分而治之"**。它通過以下特性解決復雜計算問題:

  • 自動任務拆分:將大任務遞歸拆分為子任務
  • 工作竊取算法(Work-Stealing):最大化線程利用率
  • 輕量級線程管理:基于 ForkJoinPool 的優化線程池

Fork 就是把一個大任務切分為若干子任務并行的執行,Join 就是合并這些子任務的執行結果,最后得到這個大任務的結 果。

比如計算 1+2+…+10000,可以分割成 10 個子任務,每個子任務分別對 1000 個數進行求和, 最終匯總這 10 個子任務的結果。

Fork/Join 的運行流程圖如下:

圖片圖片

工作竊取算法


Chaya:“碼哥。有任務要拆分,那必然會出現分配不均勻的情況?要如何實現負載均衡呢?”

這個問題問得好,Chaya 小姐姐。

我們設計一個工作竊取算法(Work-Stealing)來解決這個問題。每個工作線程維護一個雙端隊列(Deque):

  • 頭部:執行自己拆分出的任務(LIFO)
  • 尾部:竊取其他線程的任務(FIFO)

圖片圖片

工作竊取算法的優點:充分利用線程進行并行計算,減少了線程間的競爭。

工作竊取算法的缺點:在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。并 且該算法會消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。

任務拆分流程

圖片圖片

使用場景

場景 1:大規模數據處理(并行排序)

需求:對 10 億條數據排序,要求內存可控且充分利用多核性能。

代碼實現

public class ParallelMergeSort extends RecursiveAction {
    privatefinalint[] array;
    privatefinalint start;
    privatefinalint end;
    privatestaticfinalint THRESHOLD = 1_000_000; // 拆分閾值

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            Arrays.sort(array, start, end);  // 小任務直接排序
            return;
        }

        int mid = (start + end) >>> 1;
        invokeAll(
            new ParallelMergeSort(array, start, mid),
            new ParallelMergeSort(array, mid, end)
        );

        merge(array, start, mid, end);  // 合并結果
    }

    // 生產級優化:復用臨時數組減少內存分配
    private void merge(int[] array, int start, int mid, int end) {
        int[] temp = ThreadLocalRandom.current().ints().toArray();
        // ... 合并邏輯 ...
    }
}

// 使用方式
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
int[] data = loadHugeData();
pool.invoke(new ParallelMergeSort(data, 0, data.length));

性能優化點

  1. 合理設置 THRESHOLD(通過壓測確定最佳值)
  2. 避免在遞歸中頻繁創建臨時數組
  3. 使用 ThreadLocalRandom 保證線程安全

場景 2:金融計算(蒙特卡洛模擬)

需求:快速計算期權定價,要求高精度且低延遲。

代碼實現

public class MonteCarloTask extends RecursiveTask<Double> {
    privatefinalint iterations;
    privatestaticfinalint THRESHOLD = 10_000;

    @Override
    protected Double compute() {
        if (iterations <= THRESHOLD) {
            return calculateSync(); // 同步計算
        }

        MonteCarloTask left = new MonteCarloTask(iterations / 2);
        MonteCarloTask right = new MonteCarloTask(iterations / 2);
        left.fork();

        double rightResult = right.compute();
        double leftResult = left.join(); // 注意順序:先計算再join

        return (leftResult + rightResult) / 2;
    }

    private double calculateSync() {
        double sum = 0;
        for (int i = 0; i < iterations; i++) {
            sum += randomSimulation();
        }
        return sum / iterations;
    }
}

// 生產級調用(指定超時)
ForkJoinPool pool = new ForkJoinPool(4);
MonteCarloTask task = new MonteCarloTask(1_000_000);
pool.submit(task);

try {
    double result = task.get(5, TimeUnit.SECONDS); // 嚴格超時控制
} catch (TimeoutException e) {
    task.cancel(true);
    // 降級策略...
}

ForkJoinPool 生產級配置

自定義線程工廠

public class NamedForkJoinThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    privatefinal String namePrefix;
    privatefinal AtomicInteger counter = new AtomicInteger(1);

    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) {};
        thread.setName(namePrefix + "-" + counter.getAndIncrement());
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(false); // 生產環境必須為非守護線程
        return thread;
    }
}

與其他并發框架對比

圖片圖片

Fork/Join 適用場景

  1. 遞歸可分治的問題(排序、遍歷、數學計算)
  2. 嚴格低延遲要求的計算任務
  3. 需要自動負載均衡的大規模數據處理

CountDownLatch

CountDownLatch 是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執行完后再執行。

例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有框架服務之后執行。

假如有這樣一個需求:處理 10 萬條數據,分片并行處理,全部完成后觸發匯總操作。

public class BatchProcessor {
    privatestaticfinalint BATCH_SIZE = 1000;
    privatefinal ExecutorService executor = Executors.newFixedThreadPool(8);

    public void process(List<Data> allData) {
        int total = allData.size();
        CountDownLatch latch = new CountDownLatch(total / BATCH_SIZE);

        for (int i = 0; i < total; i += BATCH_SIZE) {
            List<Data> batch = allData.subList(i, Math.min(i+BATCH_SIZE, total));
            executor.submit(() -> {
                try {
                    processBatch(batch);
                } finally {
                    latch.countDown(); // 確保計數減少
                }
            });
        }

        try {
            if (!latch.await(5, TimeUnit.MINUTES)) {
                thrownew TimeoutException("Batch processing timeout");
            }
            generateSummaryReport();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            shutdownNow();
        }
    }

    private void processBatch(List<Data> batch) { /* ... */ }
}

圖片

CountDownLatch 的構造函數接收一個 int 類型的參數作為計數器,如果你想等待 N 個點完 成,這里就傳入 N。

當我們調用 CountDownLatch 的 countDown 方法時,N 就會減 1,CountDownLatch 的 await 方法 會阻塞當前線程,直到 N 變成零。

用在多個線程時,只需要把這個 CountDownLatch 的引用傳遞到線程里即可。

如果有某個線程處理得比較慢,我們不可能讓主線程一直等待,所以可以使 用另外一個帶指定時間的 await 方法——await(long time,TimeUnit unit),這個方法等待特定時 間后,就會不再阻塞當前線程。

實現原理

CountDownLatch 的核心實現原理是基于 AQSAQS 全稱 AbstractQueuedSynchronizer,是 java.util.concurrent 中提供的一種高效且可擴展的同步機制;

它是一種提供了原子式管理同步狀態、阻塞和喚醒線程功能以及隊列模型的簡單框架。

除了 CountDownLatch 工具類,JDK 當中的 SemaphoreReentrantLock 等工具類都是基于 AQS 來實現的。下面我們用 CountDownLatch 來分析一下 AQS 的實現。

圖片圖片

CountDownLatch 的源碼實現,發現其實它的代碼實現非常簡單,算上注釋也才 300+ 行代碼,如果去掉注釋的話代碼不到 100 行,大部分方法實現都是調用的 Sync 這個靜態內部類的實現,而 Sync 就是繼承自 AbstractQueuedSynchronizer。

CountDownLatch 的 UML 類圖如下:

圖片圖片

核心代碼如下。

private staticfinalclass Sync extends AbstractQueuedSynchronizer {
    Sync(int count) { setState(count); } // 初始化計數器

    // 嘗試獲取共享鎖:當 state=0 時返回 1(成功),否則返回 -1(失敗)
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 嘗試釋放共享鎖:CAS 遞減 state,直到變為 0
    protected boolean tryReleaseShared(int releases) {
        for (;;) { // 自旋保證原子性
            int c = getState();
            if (c == 0) returnfalse; // 已經釋放完畢
            int nextc = c - 1;
            if (compareAndSetState(c, nextc)) // CAS 更新
                return nextc == 0; // 返回是否觸發喚醒
        }
    }
}

Sync 重寫了 AQS 中的 tryAcquireShared 和 tryReleaseShared 兩個方法。

當調用 CountDownLatch 的 awit() 方法時,會調用內部類 Sync 的 acquireSharedInterruptibly() 方法,在這個方法中會調用 tryAcquireShared 方法,這個方法就是 Sync 重寫的 AQS 中的方法;

調用 countDown() 方法原理基本類似。

await() 方法實現

在調用 await() 方法時,會直接調用 AQS 類的 acquireSharedInterruptibly 方法,在 acquireSharedInterruptibly 方法內部會繼續調用 Sync 實現類中的 tryAcquireShared 方法,在 tryAcquireShared 方法中判斷 state 變量值是否為 0

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1); // 進入 AQS 核心邏輯
}

// AQS 中的實現
public final void acquireSharedInterruptibly(int arg) {
    if (Thread.interrupted()) throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) // 檢查 state 是否為 0
        doAcquireSharedInterruptibly(arg); // 進入阻塞隊列
}

doAcquireSharedInterruptibly 關鍵步驟。

圖片圖片

關鍵點

  1. 節點入隊:通過 addWaiter 方法將線程封裝為 SHARED 模式節點加入隊列尾部
  2. 自旋檢查:循環判斷前驅節點是否是頭節點(公平性保證)
  3. 阻塞控制:調用 LockSupport.park() 掛起線程,響應中斷。

countDown() 方法

當執行 CountDownLatch 的 countDown() 方法,將計數器減一,也就是將 state 值減一,當減到 0 的時候,等待隊列中的線程被釋放。是調用 AQS 的 releaseShared() 方法來實現的。

public void countDown() {
    sync.releaseShared(1); // 觸發釋放操作
}

// AQS 中的實現
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // CAS 遞減 state
        doReleaseShared(); // 喚醒后續節點
        return true;
    }
    return false;
}

CyclicBarrier

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。

它要做的事情是,讓一 組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會 開門,所有被屏障攔截的線程才會繼續運行。

現實生活中我們經常會遇到這樣的情景,在進行某個活動前需要等待人全部都齊了才開始。

例如吃飯時要等全家人都上座了才動筷子,旅游時要等全部人都到齊了才出發,比賽時要等運動員都上場后才開始。

CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一個柵欄,因為它的柵欄(Barrier)可以重復使用(Cyclic)。

圖片圖片

CyclicBarrier 是 Java 并發包中的可重用同步屏障,其特性包括:

  • 多階段協同:支持多次 await() 的同步點
  • 柵欄動作(Barrier Action):當所有線程抵達屏障時觸發
  • 自動重置:每次所有線程通過屏障后自動復位
  • 中斷處理:可響應線程中斷并傳播異常

如何使用

構造方法

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)

解析:

  • parties:傳入一個計數器值,用來配置可以阻塞多少個線程的。
  • 第二個構造方法有一個 Runnable 參數,這個對象可以在計數器值減到 0 后,發起一次調用。

例如:下面代碼就會在計數器減到 0 后,打印出"回環屏障退出"。

CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("回環屏障退出"));

場景 :多階段分布式計算匯總

需求:實時計算商品庫存分布(本地倉 + 區域倉 + 全國倉),需三階段統計數據匯總。

public class InventoryComputeService {
// 構建 3 線程等待 CyclicBarrier
    privatefinalint PARTIES = 3;
    privatefinal CyclicBarrier barrier = new CyclicBarrier(PARTIES, this::mergeData);
    // 保存最后的結果
    privatevolatile Map<String, Integer> result = new ConcurrentHashMap<>();

    public void compute() {
       // 異步執行 3 個任務,執行完成調用 barrier.await();,當所有任務完成后會執行 mergeData
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        tasks.add(computeLocalStock());
        tasks.add(computeRegionalStock());
        tasks.add(computeNationalStock());

        // 本次3 個任務任何一個計算出現異常的話,重置 barrier
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
                        .exceptionally(ex -> {
                            barrier.reset();  // 異常處理
                            returnnull;
                        });
    }

    private CompletableFuture<Void> computeLocalStock() {
      // 異步線程
        return CompletableFuture.runAsync(() -> {
            try {
                // 模擬計算耗時
                result.put("local", calculate(Region.LOCAL));
                // 執行完成,調用 await
                barrier.await(5, TimeUnit.SECONDS); // 超時控制
            } catch (Exception e) {
                handleException(e);
            }
        });
    }

    // computeRegionalStock/computeNationalStock 同理...

    private void mergeData() {
        lock.lock();
        try {
            System.out.println("各區域最終庫存合并結果: " + result);
        } finally {
            lock.unlock();
            result.clear(); // 清空狀態為下次計算準備
        }
    }
}

代碼核心解釋

構造方法創建等待三個線程執行完成的 CyclicBarrier,CyclicBarrier 與 CountDownLatch 最大的區別是 CountDownLatch 一次性的,CyclicBarrier 是可循環利用的。

private final CyclicBarrier barrier = new CyclicBarrier(PARTIES, this::mergeData);

當三個線程都執行完成,會調用 mergeData 方法統計結果。

圖片圖片

實現原理

核心數據結構

public class CyclicBarrier {
    privatefinal ReentrantLock lock = new ReentrantLock();
    privatefinal Condition trip = lock.newCondition();
    privatefinalint parties;    // 需要同步的線程數
    privatefinal Runnable barrierCommand; // 柵欄動作
    private Generation generation = new Generation(); // 當前代

    privatestaticclass Generation {
        boolean broken = false;   // 柵欄是否破裂
    }

    // 掛起線程數計數器(每次循環遞減)
    privateint count;
}
CyclicBarrier 狀態流轉

圖片圖片

await 實現

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        thrownew Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
     // 回環屏障使用完畢或重置后,都會生成一個新的generation,這個對象可以用來讓線程退出回環屏障
final Generation g = generation;

        // 每個進入的線程,都使計數器減1,當計數器歸零后進入下面的if判斷
        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
             // 如果實例化時傳入了Runnable對象,則在這里調用它的run()方法
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 里面做了喚醒所有等待線程的操作,線程是在下面的自旋中掛起的
                nextGeneration();
                return0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        for (;;) {
         // 此處省略的線程被interrupt的try catch
         // 根據是否傳入等待時間來判斷調用哪一個方法
         if (!timed) {
          // condition的await()方法,這里會暫時釋放鎖
             trip.await();
         } elseif (nanos > 0L) {
       nanos = trip.awaitNanos(nanos);
   }

   // 計數器歸零后,線程退出自旋
   if (g != generation) {
    return index;
   }
        }
    } finally {
        lock.unlock();
    }
}

上面代碼中的trip就是一個 Condition 對象,是CyclicBarrier的一個成員變量。總結一下 doWait()方法,其實做的事情還是比較簡單的。

線程進入 doWait(), 先搶占到鎖 lock 鎖對象,并執行計數器遞減 1 的操作。 遞減后的計數器值不為 0,則將自己掛起在 Condition 隊列中。

遞減后的計數器值為 0,則調用 signalAll()喚醒所有在條件隊列中的線程,并創建新的 generation 對象,讓線程可以退出回環屏障。

核心方法流程圖如下。

圖片圖片

Semaphore

Semaphore,它是一個信號量,主要作用是用來控制并發中同一個時刻執行的線程數量,可以用來做限流器,或者流程控制器。

在創建的時候會指定好它有多少個信號量,比如 Semaphre semaphore = new Semaphore(2),就只有 2 個信號量。

核心功能是控制同時訪問特定資源的線程數量,具有以下特性:

  • 許可管理:通過 acquire()/release() 操作許可數量
  • 公平性選擇:支持公平/非公平兩種模式
  • 可中斷:支持帶超時的許可獲取
  • 動態調整:運行時修改許可數量

這個信號量可以比作是車道,每一個時刻每條車道只能允許一輛汽車通過,你可以理解為高速收費站上的收費口,每個收費口任意一時刻只能允許一輛汽車通行。

畫個圖來講解一下:

圖片圖片

如何使用

接口限流(突發流量控制)。

public class ApiRateLimiter {
    // 生產級配置:許可數 = QPS閾值 * 響應時間(秒)
    privatestaticfinal Semaphore SEMAPHORE = new Semaphore(500);
    privatestaticfinal Timer METRIC_TIMER = new Timer(true);

    static {
        // 監控線程:每10秒打印許可使用率
        METRIC_TIMER.schedule(new TimerTask() {
            public void run() {
                double usage = (SEMAPHORE.availablePermits() / 500.0) * 100;
                log.info("API許可使用率: {0}%", 100 - usage);
            }
        }, 10_000, 10_000);
    }

    public Response handleRequest(Request request) {
        if (!SEMAPHORE.tryAcquire(50, TimeUnit.MILLISECONDS)) { // 非阻塞獲取
            thrownew BizException(429, "請求過于頻繁");
        }

        try {
            return doBusinessLogic(request); // 核心業務邏輯
        } finally {
            SEMAPHORE.release(); // 確保釋放許可
        }
    }
}

生產級要點

  1. 使用 tryAcquire 替代 acquire 避免線程阻塞
  2. 通過 finally 保證許可釋放
  3. 集成監控上報(Prometheus + Grafana).

實現原理

Semaphore 有兩種模式,公平模式和非公平模式,分別對應兩個內部類為 FairSyncNonfairSync,這兩個子類繼承了 Sync,都是基于之前講解過的 AQS 來實現的。

核心數據結構

畫個圖來說明一下內部的結構如下:

圖片圖片

Semaphore 的公平模式依賴于 FairSync 公平同步器來實現,非公平模式依賴于 NonfairSync 非公平同步器來實現。

其中 FairSyncNonfairSync 繼承自 Sync,而 Sync 又繼承自 AQS,這些同步器的底層都是依賴于 AQS 提供的機制來實現的。

public class Semaphore implements java.io.Serializable {
    privatefinal Sync sync;

    abstractstaticclass Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }
        final int getPermits() { return getState(); }
        // 非公平嘗試獲取許可
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        // 釋放許可
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) thrownew Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    returntrue;
            }
        }
    }

    // 公平模式實現
    staticfinalclass FairSync extends Sync {
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors()) // 檢查是否有等待線程
                    return -1;
                // ...與非公平模式相同
            }
        }
    }
}

所以掌握 AQS 很重要啊家人們,AQS 是模板方法模式的經典運用。

這里的 Semaphore 實現的思路跟我們之前講過的 ReentrantLock 非常的相似,包括內部類的結構都是一樣的,也是有公平和非公平兩種模式。

只是不同的是 Semaphore 是共享鎖,支持多個線程同時操作;然而 ReentrantLock 是互斥鎖,同一個時刻只允許一個線程操作。

公平模式 acquire

公平模式,Semaphore.acquire 方法源碼直接是調用 FairSync 的 acquireSharedInterruptibly,也就是進入了 AQS 的 acquireSharedInterruptibly 的模板方法里面了。

java.util.concurrent.Semaphore#acquire()源碼如下。

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

跳入 AQS 的 acquireSharedInterruptibly 方法。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted() ||
        // Semaphore.FairSync 子類實現 tryAcquireShared
        (tryAcquireShared(arg) < 0 &&
         acquire(null, arg, true, true, false, 0L) < 0))
        throw new InterruptedException();
}

這個方法定義了一個模板流程:

  1. 先調用子類的 tryAcquireShared 方法獲取共享鎖,也就是獲取信號量。
  2. 如果獲取信號量成功,即返回值大于等于 0,則直接返回。
  3. 如果獲取失敗,返回值小于 0,則調用 AQS 的 doAcquireSharedInterruptibly 方法,進入 AQS 的等待隊列里面,等待別人釋放資源之后它再去獲取。

這里我們畫個圖理解一下:

圖片圖片

Semaphore.FairSync 子類實現 tryAcquireShared

protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 這里作為公平模式,首先判斷一下AQS等待隊列里面
        // 有沒有人在等待獲取信號量,如果有人排隊了,自己就不去獲取了
        if (hasQueuedPredecessors())
            return -1;
        // 獲取剩余的信號量資源
        int available = getState();
        // 剩余資源減去我需要的資源,是否小于0
        // 如果小于0則說明資源不夠了
        // 如果大于等于0,說明資源是足夠我使用的
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

上面的源碼就是獲取信號量的核心流程了:

  1. 首先判斷一下 AQS 等待隊列里面是否有人在排隊,如果是,則自己不嘗試獲取資源了,乖乖的去排隊
  2. 如果沒有人在排隊,獲取一下當前剩余的信號量 available,然后減去自己需要的信號量 acquires,得到減去后的結果 remaining
  3. 如果 remaining 小于 0,直接返回 remaining,說明資源不夠,獲取失敗了,這個時候就會進入 AQS 等待隊列等待。
  4. 如果 remaining 大于等于 0,則執行 CAS 操作 compareAndSetState 競爭資源,如果成功了,說明自己獲取信號量成功,如果失敗了同樣進入 AQS 等待隊列。

我們畫一下公平模式 FairSync 的 tryAcquireShared 流程圖,以及整個公平模式的 acquire 方法的流程圖:

圖片圖片

公平模式 release

看完獲取,我們緊接著來看下釋放,這里 Semaphore 的 release 方法直接調用 Sync 的 releaseShared 方法:

public void release() {
      sync.releaseShared(1);
  }

繼續來分析 releaseShared 方法,進入到 AQS 的 releaseShard 釋放資源的模板方法:

public final boolean releaseShared(int arg) {
    // 1. 調用子類的tryReleaseShared釋放資源
    if (tryReleaseShared(arg)) {
        // 釋放資源成功,調用doReleaseShared喚醒等待隊列中等待資源的線程
        doReleaseShared();
        return true;
    }
    return false;
}

這里的模板流程有:

  1. 調用子類的 tryReleaseShared 去釋放資源,即釋放信號量
  2. 如果釋放成功了,則調用 doReleaseShared 喚醒 AQS 中等待資源的線程,將資源傳播下去,如果釋放失敗,即返回小于等于 0,則直接返回。
  3. 所以,這里除了 AQS 的核心模板流程之外,具體釋放邏輯就是 Sync 的 tryReleaseShared 方法的源碼了,我們繼續來查看:
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        // 這里就是將釋放的信號量資源加回去而已
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 嘗試CAS設置資源,成功直接返回,失敗則進入下一循環重試
        if (compareAndSetState(current, next))
            return true;
    }
}

釋放資源的流程圖如下:

圖片圖片

Exchanger

Exchanger(交換者)是一個用于線程間協作的工具類。Exchanger 用于進行線程間的數據交 換。

它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過 exchange 方法交換數據,如果第一個線程先執行 exchange()方法,它會一直等待第二個線程也 執行 exchange 方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產 出來的數據傳遞給對方。

圖片圖片

使用場景

這個玩意的使用場景很少很少……大家對她有個了解即可,大可不必深入。

因為存在很多局限性。

  1. 僅限兩個線程
  • 超過兩個線程使用同一 Exchanger 會導致未定義行為。
  • 替代方案:使用 CyclicBarrier 或 Phaser 實現多線程同步。
  1. 阻塞風險
  • 若一方線程未到達同步點,另一線程會永久阻塞。
  • 解決方案:使用帶超時的 exchange(V x, long timeout, TimeUnit unit)。
  1. 性能瓶頸
  • 頻繁交換大數據對象會導致內存和 CPU 開銷。
  • 優化建議:交換輕量級對象(如引用或標識符),而非完整數據。
  1. 不適用于分布式系統
  • Exchanger 僅限單 JVM 內的線程通信。
  • 替代方案:消息隊列(如 Kafka)或 RPC 框架(如 gRPC)。

Exchanger在多種并發編程場景中都非常有用。例如,在遺傳算法中,可以使用Exchanger來實現個體之間的信息交換;在管道設計中,可以使用Exchanger來傳遞數據塊或任務;在游戲中,可以使用Exchanger來實現玩家之間的物品交易等。

如下代碼,用 Exchanger 實現兩個線程將交換彼此持有的字符串數據:

import java.util.concurrent.Exchanger;

publicclass ExchangerExample {

    public static void main(String[] args) {
        // 創建一個Exchanger對象
        Exchanger<String> exchanger = new Exchanger<>();

        // 創建一個線程,它將使用"Hello"與另一個線程交換數據
        Thread producer = new Thread(() -> {
            try {
                String producedData = "Hello";
                String consumerData = exchanger.exchange(producedData);
                System.out.println("生產者線程交換后得到的數據: " + consumerData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "生產者線程");

        // 創建一個線程,它將使用"World"與另一個線程交換數據
        Thread consumer = new Thread(() -> {
            try {
                String consumerData = "World";
                String producedData = exchanger.exchange(consumerData);
                System.out.println("消費者線程交換后得到的數據: " + producedData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "消費者線程");

        // 啟動線程
        producer.start();
        consumer.start();
    }
}

代碼中我們創建了一個 Exchanger 對象,并且定義了兩個線程:一個生產者線程和一個消費者線程。

生產者線程持有一個字符串 "Hello",而消費者線程持有一個字符串 "World"。兩個線程都通過調用 exchanger.exchange() 方法來等待交換數據。

當兩個線程都到達交換點時(即都調用了 exchange() 方法),Exchanger 會確保它們安全地交換數據。交換完成后,每個線程都會得到對方原本持有的數據,并打印出來。

實現原理

分別從數據結構和 exchange 方法來實現流程來學習實現原理。

核心數據結構

Participant 線程本地存儲

public class Exchanger<V> {
    // 每個線程持有一個Node
    private final Participant participant;

    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }
}

圖片圖片

關鍵作用

  • 每個線程通過 Participant 持有獨立的 Node 對象
  • 避免多線程競爭同一存儲位置
  • 底層使用 ThreadLocal 實現線程隔離

Node 交換節點設計

@sun.misc.Contended // 防止偽共享
static final class Node {
    int index;              // Arena下標
    int bound;              // 最近記錄的前導邊界
    int collides;           // CAS失敗計數
    int hash;               // 偽隨機自旋
    Object item;            // 攜帶的數據
    volatile Object match;  // 交換的數據
    volatile Thread parked; // 掛起的線程
}

內存布局優化

  • 使用 @Contended 注解填充緩存行(64 字節)
  • 確保不同線程訪問的字段不在同一緩存行
  • 示例內存布局:
| 64字節緩存行 | Node.item | ...填充... |
| 64字節緩存行 | Node.match | ...填充... |

每個線程的 Node 有一個 match 屬性用于存儲待交換的數據。

exchange 方法執行流程

主流程源碼(精簡版)

public V exchange(V x) throws InterruptedException {
    Object v;
    Node[] a;
    Node q = participant.get();

    // Arena模式(
    if ((a = arena) != null ||
        (q = slotExchange(q, x, false, 0L)) == null)
        return (V)v;
    // ...省略超時處理
}

private final Object slotExchange(Node q, Object x, boolean timed, long nanos) {
    // 核心交換邏輯(
    for (;;) {
        if (slot != null) { // 存在等待節點
            Node node = (Node)slot;
            if (U.compareAndSwapObject(this, SLOT, node, null)) {
                Object v = node.item;
                node.match = x; // 數據交換(
                Thread t = node.parked;
                if (t != null)
                    U.unpark(t); // 喚醒對方線程
                return v;
            }
        } elseif (U.compareAndSwapObject(this, SLOT, null, q)) {
            // 掛起當前線程
            return timed ? awaitNanos(q, nanos) : await(q);
        }
    }
}

圖片

關鍵步驟解釋

  1. CAS 設置槽位U.compareAndSwapObject(this, SLOT, null, q)
  2. 數據交換:直接修改對方節點的 match 字段
  3. 喚醒機制:通過 Unsafe.unpark() 解除線程阻塞
責任編輯:武曉燕 來源: 碼哥跳動
相關推薦

2014-03-14 10:34:28

JavaJava并發

2017-08-07 20:50:27

JavaForkJoin

2017-08-04 11:41:53

Javathreadpool框架

2024-04-02 09:40:39

多線程Java原子性

2020-09-21 10:50:24

Java多線程代碼

2019-09-16 09:23:34

高并發編程CountDownLaCyclicBarri

2021-02-26 05:17:38

計算機網絡二進制

2023-10-31 12:58:00

TypeScriptJavaScript

2021-03-16 08:21:29

Spark系統并行

2023-02-28 07:40:09

編譯器Java線程安全

2021-06-07 15:49:51

AI 數據人工智能

2021-11-11 09:27:02

技術RedisMySQL

2021-10-18 11:58:56

負載均衡虛擬機

2024-07-19 08:34:18

2024-08-13 15:07:20

2023-01-06 08:15:58

StreamAPI接口

2022-09-06 08:02:40

死鎖順序鎖輪詢鎖

2023-03-30 08:28:57

explain關鍵字MySQL

2024-11-13 15:09:57

Java線程開發

2024-04-29 09:06:46

線程初始化源碼
點贊
收藏

51CTO技術棧公眾號

av网站在线免费看推荐| 国产精品xxxxxx| 国产精品香蕉| 色综合咪咪久久| 宅男噜噜99国产精品观看免费| 国产福利第一视频| 亚洲在线免费| 欧美另类高清videos| 日韩av在线看免费观看| 国产精品一区二区三区www| 精品欧美一区二区三区| 最近看过的日韩成人| 五月婷中文字幕| 韩国理伦片一区二区三区在线播放| 韩国视频理论视频久久| av资源在线免费观看| 日韩美女精品| 日韩欧美在线不卡| 中文字幕天天干| 精品三级久久| 一区二区三区欧美在线观看| 视频一区视频二区视频三区高| 动漫av一区二区三区| 久热成人在线视频| 国产成人精品电影| 日本三级免费看| 在线中文一区| 俺去了亚洲欧美日韩| 一二三不卡视频| 波多野结衣欧美| 欧美绝品在线观看成人午夜影视| www.亚洲天堂网| 51漫画成人app入口| 亚洲美女免费在线| 亚洲欧洲精品一区| 国产高清美女一级毛片久久| www成人在线观看| 国产一区二区三区无遮挡| 99在线精品视频免费观看20| 蜜臀99久久精品久久久久久软件| 国产成一区二区| 天天干天天干天天操| 欧美xxx在线观看| y97精品国产97久久久久久| www亚洲色图| 精品国产乱码| 亚洲色图国产精品| 久久久久久国产精品无码| 亚洲都市激情| 亚洲片在线资源| 欧美 日韩 成人| 欧洲毛片在线视频免费观看| 亚洲午夜久久久影院| 国产精品毛片一区二区| 精品国产网站| 最近2019中文字幕在线高清| 国产精品久久久久久成人| 日韩精品影视| 久久久国产精品x99av| 欧美成人黄色网| 国产一区日韩欧美| 久久深夜福利免费观看| 农村妇女精品一区二区| 欧美全黄视频| 97成人在线视频| 怡红院av久久久久久久| 美女在线视频一区| 成人中文字幕在线观看| 亚洲精品综合网| 99久久国产综合色|国产精品| 蜜桃视频成人| av片在线免费观看| 亚洲欧美激情小说另类| www.好吊操| 丝袜诱惑一区二区| 欧美日韩精品一区二区三区蜜桃 | 首页欧美精品中文字幕| 国产精品91久久久| 国产精品玖玖玖| 成人国产精品免费观看动漫| 久久人人爽爽人人爽人人片av| 国产高清美女一级毛片久久| 亚洲欧美日韩电影| 激情综合在线观看| 四虎成人精品一区二区免费网站| 日韩欧美美女一区二区三区| 最新中文字幕视频| 911精品美国片911久久久| 欧美激情视频播放| 国产又粗又猛又黄视频| 国产成人欧美日韩在线电影| 日本一区高清在线视频| 怡红院在线播放| 一本到一区二区三区| 91蝌蚪视频在线| 午夜欧洲一区| 久久91亚洲精品中文字幕奶水| 国产精品va无码一区二区三区| 久久99精品久久久久| 国产一区二区不卡视频| 日本在线观看| 欧美性69xxxx肥| 精产国品一区二区三区| 欧美限制电影| 69久久夜色精品国产7777| 国产又粗又猛又黄又爽| 久久综合色之久久综合| 精品久久久无码人妻字幂| 日韩一级二级| 精品在线观看国产| 亚洲一区二区91| 黄页视频在线91| 日韩wuma| 在线能看的av网址| 亚洲国产精品一区二区久| 日本二区三区视频| 石原莉奈一区二区三区在线观看| 国产高清精品一区| 成人免费网址| 欧美日韩亚洲综合在线 欧美亚洲特黄一级 | 精品久久一区二区三区| www中文在线| 天堂av在线一区| 久久99精品久久久久久青青日本| 国产盗摄在线观看| 欧美日韩国产天堂| 欧美丰满老妇熟乱xxxxyyy| 99精品视频免费观看| 国产经典一区二区三区| 岛国成人毛片| 欧美夫妻性生活| 激情高潮到大叫狂喷水| 美日韩一区二区| 亚洲福利av| 123成人网| 中文字幕精品一区二区精品| 日本熟妇一区二区三区| 91网址在线看| 国产精品97在线| 日韩在线影视| 国产福利视频一区二区| 国产专区在线播放| 在线精品视频一区二区三四 | 亚洲色图一区二区三区| 午夜一区二区视频| 国产精品国内免费一区二区三区| 国产精品爽爽爽爽爽爽在线观看| 国产三级电影在线| 欧美午夜精品理论片a级按摩| www..com.cn蕾丝视频在线观看免费版| 久久aⅴ国产紧身牛仔裤| 久久成人资源| 欧美极度另类| 日韩亚洲欧美成人| 国产男女无套免费网站| 一区二区三区精品在线| 精品无码人妻少妇久久久久久| 亚洲激情影院| 欧美日韩亚洲综合一区二区三区激情在线| 三级在线看中文字幕完整版| 亚洲精品永久免费| 国产精品51麻豆cm传媒| 亚洲视频每日更新| 久久免费精品国产| 亚洲一区自拍| 亚洲欧洲免费无码| 午夜视频一区二区在线观看| 久久久亚洲欧洲日产国码aⅴ| 色噜噜在线播放| 色婷婷精品久久二区二区蜜臀av | 色就是色亚洲色图| 在线免费观看一区| 中文字幕人妻一区二| 丰满亚洲少妇av| 老头吃奶性行交视频| 五月久久久综合一区二区小说| 亚洲综合精品一区二区| 日韩伦理福利| 精品国内产的精品视频在线观看| 精品人妻伦一区二区三区久久| 黄色精品在线看| 日本伦理一区二区三区| 成人高清av在线| 亚洲精品www.| 99pao成人国产永久免费视频| 日产精品久久久一区二区| 久久久精品区| 日本免费一区二区三区视频观看| 黄网站免费在线播放| 亚洲精品国产suv| 亚洲无码精品国产| 精品久久久国产| 大地资源高清在线视频观看| jlzzjlzz国产精品久久| 亚洲一区日韩精品| 国产欧美欧美| 精品一区二区三区毛片| 九九亚洲精品| 操人视频欧美| 97精品资源在线观看| 97在线视频免费| 久久精品视频免费看| 亚洲欧美日韩国产中文| 亚洲成人一二三区| 欧美色电影在线| 国产精品久久久久久99| 又紧又大又爽精品一区二区| 超薄肉色丝袜一二三| 成人av网址在线观看| 伊人网在线综合| 久久不射2019中文字幕| 日本中文字幕亚洲| 中文av一区| 亚洲在线欧美| 精品香蕉视频| 玛丽玛丽电影原版免费观看1977 | 日本中文字幕网| 亚洲欧美日韩国产中文在线| 女人黄色一级片| 久久久精品国产免大香伊| 插我舔内射18免费视频| 国产精品一区久久久久| 中文字幕第一页在线视频| 日韩av一级片| 久久久久国产精品熟女影院| 国产深夜精品| 播放灌醉水嫩大学生国内精品| 国产综合自拍| 国产精品久久国产| 午夜亚洲福利| 菠萝蜜视频在线观看入口| 久久久久免费av| 亚洲在线观看一区| 91欧美在线| 一级二级三级欧美| 欧美电影一二区| 亚洲精品在线免费看| 欧美色女视频| 亚洲精品中文字幕乱码三区不卡 | www成人在线观看| 无码人妻aⅴ一区二区三区| 91丝袜高跟美女视频| 美女又爽又黄免费| 99精品久久只有精品| 亚洲一区二区三区四区五区六区| 成人av网站免费| 这里只有精品在线观看视频| 成人av第一页| 艳妇乳肉亭妇荡乳av| 91在线视频在线| 粉嫩av蜜桃av蜜臀av| 久久久精品日韩欧美| 精品一区二区三区蜜桃在线| 国产精品麻豆一区二区| 天天色影综合网| 亚洲精品国产高清久久伦理二区| 强行糟蹋人妻hd中文| 亚洲大片在线观看| 青青青国产在线| 在线观看国产精品网站| 亚洲一区二区三区网站| 日韩午夜在线观看| 成人午夜精品福利免费| 亚洲精品视频在线观看视频| 国产精品久久久久久久龚玥菲| 一区二区三区日韩在线| 含羞草www国产在线视频| 欧美激情视频网址| 一区二区三区四区日本视频| 国产精品欧美激情在线播放| 国产精品一区二区三区av| 激情伦成人综合小说| jiujiure精品视频播放| 久久精品在线免费视频| 一区二区三区精品视频在线观看| 黄色三级视频片| 国产激情精品久久久第一区二区 | 亚洲免费av一区| 成人综合婷婷国产精品久久蜜臀| 成人手机在线免费视频| 国产精品成人一区二区艾草 | 欧美一区二区三区观看| 亚洲乱码精品一二三四区日韩在线| 国产无遮挡又黄又爽又色| 在线观看视频一区| av手机免费看| 亚洲性线免费观看视频成熟| 18+激情视频在线| 日本久久精品视频| 亚洲一区二区电影| 日韩高清国产精品| 狠狠干综合网| xx欧美撒尿嘘撒尿xx| 成人av中文字幕| 成人免费视频入口| 欧美视频在线观看免费网址| 97人妻精品一区二区三区| 国产偷亚洲偷欧美偷精品| 成人黄色网址| 国产精品国产福利国产秒拍| 国产成人精品福利| 一区二区三区免费看| 国产免费成人| 在线观看免费视频国产| 中文字幕一区二区三区不卡在线| 91美女免费看| 精品少妇一区二区三区免费观看| 亚洲精品承认| 国产99在线|中文| 国产精品chinese在线观看| 99精品视频网站| 日韩国产精品久久久久久亚洲| 无码人妻精品一区二区三区99不卡| 中文字幕视频一区| 影音先锋黄色网址| 亚洲日本欧美中文幕| 狠狠操一区二区三区| 97操在线视频| 91精品秘密在线观看| 欧美成人福利在线观看| 久久久亚洲精品一区二区三区 | 国产伦精品一区| 欧美黄色一区| 亚洲制服在线观看| 国产精品久久久久久户外露出| 无码视频在线观看| 日韩精品久久久久| 成人观看网址| 国产一区二区自拍| 日韩午夜精品| www.男人天堂| 黄色精品一区二区| 深夜影院在线观看| 45www国产精品网站| 美女av一区| 黄色一级在线视频| 99re热这里只有精品视频| 色婷婷在线观看视频| 亚洲国模精品私拍| 国产拍在线视频| 久精品国产欧美| 久久精品伊人| 免费看污片网站| 欧美亚洲国产怡红院影院| 成年人视频免费在线观看| 国产精品亚洲片夜色在线| 久久人体视频| 男插女视频网站| 亚洲国产成人av| 免费资源在线观看| 国产精品va在线播放| 欧美视频网址| 国产成人美女视频| 一区二区三区在线免费观看| 亚洲精品视频网| 97免费在线视频| 久久99国产精品视频| 制服丝袜综合网| 亚洲精品视频自拍| 日本wwwxxxx| 国产高清视频一区三区| 婷婷综合亚洲| 亚洲av熟女高潮一区二区| 欧美日韩一区二区精品| 成人性爱视频在线观看| 91久久久精品| 精品91在线| 中国女人特级毛片| 91精品一区二区三区在线观看| 欧美性猛片xxxxx免费中国| 狠狠色噜噜狠狠色综合久| 丝袜诱惑制服诱惑色一区在线观看 | 亚洲理论电影片| 成人免费在线观看视频网站| 亚洲欧洲综合另类在线| 日本精品999| 国产精品成人免费电影| 亚洲h色精品| 亚洲综合自拍网| 欧美日韩1区2区| 国产伦子伦对白在线播放观看| 先锋影音网一区| 成人综合在线视频| 国产精品露脸视频| 久久久久免费精品国产| 成人6969www免费视频| 国产综合内射日韩久| 日本道精品一区二区三区| a视频在线播放| 日韩av在线电影观看| 成人一区二区三区视频在线观看| 99成人精品视频| 高清欧美性猛交xxxx黑人猛交| 不卡在线一区二区| xxxwww国产| 91精品国产美女浴室洗澡无遮挡| 免费福利视频一区二区三区| 日韩成人午夜影院| 国产偷v国产偷v亚洲高清| 丰满熟女一区二区三区|