沒(méi)看過(guò)ReentrantLock源碼,別說(shuō)精通Java并發(fā)編程
引言
高手程序員與新手程序員一個(gè)簡(jiǎn)單的判斷標(biāo)準(zhǔn),就是有沒(méi)有使用過(guò)CountDownLatch,在互聯(lián)網(wǎng)公司工作超過(guò)3年的程序員基本上應(yīng)該都用過(guò)。CountDownLatch中文名稱叫做閉鎖,也叫計(jì)數(shù)鎖,不過(guò)不是用來(lái)加鎖的,而是通過(guò)計(jì)數(shù)實(shí)現(xiàn)條件等待的功能。CountDownLatch的使用場(chǎng)景有兩個(gè):
- 當(dāng)前線程等待其他線程都執(zhí)行完成之后,再執(zhí)行。
- 所有線程滿足條件后,再一起執(zhí)行。
使用示例
CountDownLatch常用的方法就兩個(gè),countDown()方法用來(lái)將計(jì)數(shù)器減一,await()方法會(huì)阻塞當(dāng)前線程,直到計(jì)數(shù)器值等于0。
場(chǎng)景1:
先看一下第一種場(chǎng)景,也是最常用的場(chǎng)景:
- 當(dāng)前線程等待其他線程都執(zhí)行完成之后,再執(zhí)行。
在工作中什么時(shí)候會(huì)遇到這種場(chǎng)景呢?比如當(dāng)前線程需要查詢3個(gè)數(shù)據(jù)庫(kù),并且把查詢結(jié)果匯總返回給前端。查詢3個(gè)數(shù)據(jù)庫(kù)的邏輯,可以分別使用3個(gè)線程加快查詢速度。但是怎么判斷3個(gè)線程都執(zhí)行結(jié)束了呢?這時(shí)候就可以使用CountDownLatch了。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 一燈架構(gòu)
* @apiNote CountDownLatch測(cè)試類(場(chǎng)景1)
**/
public class CountDownLatchTest1 {
public static void main(String[] args) throws InterruptedException {
// 1. 創(chuàng)建一個(gè)線程池,用來(lái)執(zhí)行3個(gè)查詢?nèi)蝿?wù)
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 2. 創(chuàng)建一個(gè)計(jì)數(shù)鎖,數(shù)量是3
CountDownLatch countDownLatch = new CountDownLatch(3);
// 3. 啟動(dòng)3個(gè)查詢?nèi)蝿?wù)
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
try {
// 4. 睡眠1秒,模擬任務(wù)執(zhí)行過(guò)程
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 執(zhí)行完成");
// 5. 任務(wù)執(zhí)行完成,計(jì)數(shù)器減一
countDownLatch.countDown();
} catch (InterruptedException e) {
}
});
}
// 6. 等待所有任務(wù)執(zhí)行完成
countDownLatch.await();
System.out.println("所有任務(wù)執(zhí)行完成。");
// 7. 關(guān)閉線程池
executorService.shutdown();
}
}輸出結(jié)果:
pool-1-thread-2 執(zhí)行完成 pool-1-thread-1 執(zhí)行完成 pool-1-thread-3 執(zhí)行完成 所有任務(wù)執(zhí)行完成。
需要注意的是,這里創(chuàng)建CountDownLatch計(jì)數(shù)器的時(shí)候,指定的數(shù)量是3,因?yàn)橛?個(gè)任務(wù)。在3個(gè)任務(wù)沒(méi)有執(zhí)行完成之前,await()方法會(huì)一直阻塞,直到3個(gè)任務(wù)都執(zhí)行完成。
場(chǎng)景2
再看一下第二種場(chǎng)景,有些情況用的也比較多:
- 所有線程滿足條件后,再一起執(zhí)行。
什么情況下會(huì)遇到這種場(chǎng)景呢?比如系統(tǒng)中多個(gè)任務(wù)線程存在先后依賴關(guān)系,必須等待其他線程啟動(dòng)完成后,才能一起執(zhí)行。
/**
* @author 一燈架構(gòu)
* @apiNote CountDownLatch測(cè)試類(場(chǎng)景2)
**/
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
// 1. 創(chuàng)建一個(gè)線程池,用來(lái)執(zhí)行3個(gè)任務(wù)
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 2. 創(chuàng)建一個(gè)計(jì)數(shù)鎖,數(shù)量是1
CountDownLatch countDownLatch = new CountDownLatch(1);
// 3. 啟動(dòng)3個(gè)任務(wù)
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 啟動(dòng)完成");
// 4. 等待其他任務(wù)啟動(dòng)完成
countDownLatch.await();
// 5. 睡眠1秒,模擬任務(wù)執(zhí)行過(guò)程
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 執(zhí)行完成");
} catch (InterruptedException e) {
}
});
}
// 6. 所有任務(wù)啟動(dòng)完成,計(jì)數(shù)器減一
countDownLatch.countDown();
System.out.println("所有任務(wù)啟動(dòng)完成,開始執(zhí)行。");
// 7. 關(guān)閉線程池
executorService.shutdown();
}
}輸出結(jié)果:
pool-1-thread-1 啟動(dòng)完成 pool-1-thread-2 啟動(dòng)完成 pool-1-thread-3 啟動(dòng)完成 所有任務(wù)啟動(dòng)完成,開始執(zhí)行。 pool-1-thread-1 執(zhí)行完成 pool-1-thread-3 執(zhí)行完成 pool-1-thread-2 執(zhí)行完成
需要注意的是,與場(chǎng)景1不同,這里創(chuàng)建CountDownLatch計(jì)數(shù)器的時(shí)候,指定的數(shù)量是1,因?yàn)?個(gè)任務(wù)需要滿足同一個(gè)條件,就是都啟動(dòng)完成,也就是只需要調(diào)用一次countDown()方法。 看完了CountDownLatch的使用方式,再看一下CountDownLatch的源碼實(shí)現(xiàn)。
類屬性
public class CountDownLatch {
// 只有一個(gè)Sync同步變量
private final Sync sync;
// Sync繼承自AQS,主要邏輯都在這里面
private static final class Sync extends AbstractQueuedSynchronizer {
// 只有這一個(gè)構(gòu)造方法,需要指定計(jì)數(shù)器數(shù)值
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}跟ReentrantLock一樣,CountDownLatch也沒(méi)有直接繼承AQS,也是采用組合的方式,使用Sync同步變量實(shí)現(xiàn)計(jì)數(shù)的功能,而Sync同步變量才是真正繼承AQS的。
countDown方法源碼
public void countDown() {
// 底層調(diào)用父類AQS中的releaseShared()方法
sync.releaseShared(1);
}countDown()方法里面調(diào)用的是父類AQS中的releaseShared()方法,而releaseShared()方法又在調(diào)用子類Sync中tryReleaseShared()方法。
/**
* 父類AQS
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final boolean releaseShared(int arg) {
// tryReleaseShared()由子類實(shí)現(xiàn)
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 定義抽象方法,由子類實(shí)現(xiàn)
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
}/**
* 子類Sync
*/
private static final class Sync extends AbstractQueuedSynchronizer {
// 實(shí)現(xiàn)父類AQS中的tryReleaseShared()方法
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) {
return false;
}
int nextc = c-1;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
}
}而Sync同步類中tryReleaseShared()方法邏輯也很簡(jiǎn)單,就是把同步狀態(tài)state值減一。
await源碼
await()方法底層也是調(diào)用父類中acquireSharedInterruptibly()方法,而父類AQS又需要調(diào)用子類Sync中的具體實(shí)現(xiàn)。
public void await() throws InterruptedException {
// 底層調(diào)用父類AQS中的releaseShared()方法
sync.acquireSharedInterruptibly(1);
}/**
* 父類AQS
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// tryAcquireShared()由子類實(shí)現(xiàn)
if (tryAcquireShared(arg) < 0) {
doAcquireSharedInterruptibly(arg);
}
}
// 定義抽象方法,由子類實(shí)現(xiàn)
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
}子類Sync只需要實(shí)現(xiàn)tryAcquireShared()方法即可,而tryAcquireShared()方法的作用就是判斷鎖是否已經(jīng)完全釋放,即同步狀態(tài)state=0。
/**
* 子類Sync
*/
private static final class Sync extends AbstractQueuedSynchronizer {
// 實(shí)現(xiàn)父類AQS中的tryAcquireShared()方法
@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
}總結(jié)
看完了CountDownLatch的所有源碼,是不是覺(jué)得CountDownLatch邏輯很簡(jiǎn)單。
因?yàn)榧渔i流程的編排工作已經(jīng)在父類AQS中實(shí)現(xiàn),子類只需要實(shí)現(xiàn)具體的加鎖邏輯即可,也就是實(shí)現(xiàn)tryReleaseShared()方法和tryAcquireShared()方法。而加鎖邏輯也很簡(jiǎn)單,也就是修改同步狀態(tài)state的值即可。想要詳細(xì)了解父類AQS的流程,可以翻看前幾篇文章。
下篇文章再一塊學(xué)習(xí)一下共享鎖Semaphore的源碼實(shí)現(xiàn)。































