如何優(yōu)雅地中斷 Java 線程
我們通過并發(fā)編程提升了系統的吞吐量,同時我們也希望并發(fā)運行的線程能夠及時停止并做好資源歸納,所以筆者就借此文來談談Java并發(fā)編程中線程中斷的藝術。

一、詳解Java中斷的哲學
1. 何時觸發(fā)中斷阻塞
按照操作系統對于線程的任務調度管理來說,當觸發(fā)以下幾種情況時線程就會阻塞而處于BLOCKED、WAITING、TIMED_WAITING幾種狀態(tài): 第一種是線程執(zhí)行IO請求,在等待IO資源返回,觸發(fā)阻塞,此時線程就處于BLOCKED狀態(tài),例如服務端server執(zhí)行serverSocket.accept()等待就緒的客戶端接入:

第二種則是等待條件為真期間,線程因此掛起等待notify通知或者通過sleep休眠,進而處于WAITING或者TIMED_WAITING:
new Thread(() -> ThreadUtil.sleep(3600), "t-0").start();因為并發(fā)互斥原因,線程需要等待其它線程釋放監(jiān)視鎖而進入BLOCKED阻塞態(tài):

2. Java是如何響應中斷的
在操作系統中,線程的中斷方式一般分為以下兩種:
- 搶占式:當線程需要中斷時,直接強制讓線程立刻停止手里的任務
- 協作式:當線程需要中斷時,通過標識告知線程需要被中斷,線程輪詢檢查時看到這個標識就會直接中斷
而Java線程則是采用協作式中斷,即調用interrupt時其底層僅僅是將線程設置為可中斷狀態(tài),等到線程主動檢查到線程標識被設置為中斷時,則觸發(fā)InterruptedException:

對應的我們以Linux為例給出JDK底層關于線程中斷函數interrupt的實現,即位于os_linux.cpp的interrupt方法,可以看到其底層本質上就是定位到java線程對應的os線程并將其中斷標識設置為true:
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
//設置os線程中斷標識為true
osthread->set_interrupted(true);
//......
}
//......
}同時我們也給出處于sleep休眠狀態(tài)的線程響應中斷的源碼,同樣是以Linux為例的線程封裝類os_linux.cpp下的sleep函數,可以看到進行休眠時其底層在明確知曉線程可被中斷的時候,就會在for循環(huán)中知曉休眠并定期檢查可中斷狀態(tài):
int os::sleep(Thread* thread, jlong millis, bool interruptible) {
//......
if (interruptible) {
jlong prevtime = javaTimeNanos();
for (;;) {
//循環(huán)中感知到中斷直接返回OS_INTRPT標識
if (os::is_interrupted(thread, true)) {
return OS_INTRPT;
}
}
} else {
//......
}
}3. 線程中斷的守則
通常來說我們對線程中斷響應度越高,就越容易處理并優(yōu)雅的完成兜底動作,一般來說,在處理線程中斷時一般會出現如下兩種情況:
- 當前代碼層面對象實例不具備處理該中斷異常
- 處于線程內部的run方法感知到中斷無法向上拋出
針對情況1,本質上就是權責上的轉移,如果當前業(yè)務層面不具備處理此類異常的能力,那么就將異常向上層拋出傳遞給上層使用者:
public void sleep(int seconds) throws InterruptedException {
TimeUnit.SECONDS.sleep(seconds);
}而情況2則相對麻煩一些,如果類似于Runnable 這種無法向上拋出的內置接口類的實現,我們則可以主動去捕獲中斷中斷異常,并將在完成必要的資源清理工作后,將當前線程打斷從而讓高層棧幀感知到這個異常中斷:
class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//執(zhí)行當前線程資源清理
//打斷當前線程引發(fā)更高層線程響應此中斷
Thread.currentThread().interrupt();
}
}
}二、Java線程中斷處理的一些實踐
1. 基于標識取消任務
我們先來說說基于自定義標識的方式中斷線程,即非java內置方法層面的協作式標識來停止線程,通過任務運行時輪詢檢測,一旦線程輪詢檢查看到中斷標識設置為true,則直接結束運行:

對應的我們給出自定義協作式中斷的實現,整體思路為:
- 采用cancelled標識中斷狀態(tài),并用volatile保證可見性
- 內置初始化一個執(zhí)行線程thread
- 對外暴露start方法,執(zhí)行線程啟動
- 對外暴露cancel方法修改線程中斷狀態(tài)
- run方法執(zhí)行業(yè)務邏輯,并定期輪詢檢查中斷標識,一旦標識被設置為true則退出循環(huán),結束線程
public class Task implements Runnable {
/**
* 使用volatile修飾保證標識修改可見
*/
privatevolatileboolean cancelled = false;
privatefinal Thread thread = new Thread(this);
public void start() {
thread.start();
}
/**
* 停止時,通過cancel請求取消
*/
public void cancel() {
cancelled = true;
}
@Override
public void run() {
//取消標識檢測,如果取消則直接結束循環(huán)
while (!cancelled) {
System.out.println("running");
ThreadUtil.sleep(1000);
}
System.out.println("task cancelled");
}
}對應的我們也給出這種方式的使用示例,可以看到我們的測試代碼會在5s后調用task暴露的任務取消方法完成線程中斷:
//線程啟動運行5s
Task task = new Task();
task.start();
//休眠5s后將task任務對應線程中斷
new Thread(()->{
ThreadUtil.sleep(5000);
task.cancel();
}).start();而執(zhí)行的輸出結果如下,是符合我們預期的:
running
running
running
running
running
running
task cancelled當然這種做法也存在一定的弊端,即帶有阻塞性質的操作,任務可能出現永遠無法檢查取消標志,例如我們的線程在循環(huán)往阻塞阻塞隊列blockingQueue的put添加元素,一旦隊列空間達到容器上界,當前線程就會阻塞即無法執(zhí)行到循環(huán)分支上:

對應我們也給出這段錯誤的樣例,即阻塞隊列添加操作后阻塞而走不到循環(huán)判斷:
private final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100);
@Override
public void run() {
//取消標識檢測,如果取消則直接結束循環(huán)
while (!cancelled) {
System.out.println("running");
try {
queue.put(RandomUtil.randomInt(10));
} catch (InterruptedException e) {
//......
}
}
System.out.println("task cancelled");
}測試代碼還是和上一小節(jié)一致,不多贅述,測試輸出結果如下,即第二次添加操作時發(fā)現隊列已滿而阻塞從而無法打斷:
running
running2. 如何處理阻塞式的中斷
參考java內置方法Thread.sleep(1000);或者wait()方法,其底層都會針對外部中斷操作做檢測,一旦感知到中斷就會提前返回,即執(zhí)行如下步驟:
- 清除中斷狀態(tài)
- 拋出InterruptedException讓被中斷線程感知異常
所以對于阻塞式中斷的的正確方式為:
通過合理的時機發(fā)出中斷請求,讓線程在下一個合適時候處理中斷
所以對于上述阻塞隊列操作來說,可以按照如下方式進行線程優(yōu)雅中斷:
- 對外暴露interrupt方法打斷當前線程,確保阻塞隊列插入阻塞時依然可以利用內置方法完成線程打斷
- 線程感知中斷時不可直接拋出異常,而是利用異常捕獲將資源處理清楚,再次執(zhí)行中斷循環(huán)監(jiān)測,正常退出線程邏輯
對應我們給出改造后的代碼,可以看到我們將cancel改為調用線程的中斷方法將線程中斷,同時在感知到中斷異常時會將執(zhí)行中斷后的兜底邏輯:
/**
* 停止時,通過cancel請求取消
*/
public void cancel() {
thread.interrupt();
}
@Override
public void run() {
//取消標識檢測,如果取消則直接結束循環(huán)
while (!Thread.currentThread().isInterrupted()) {
System.out.println("運行中......");
Integer element = RandomUtil.randomInt(10);
try {
queue.put(element);
} catch (InterruptedException e) {
//處理中斷
}
}
}3. 合理的中斷策略
筆者在上面的文章中對于拋出的異常給出了一段todo的伏筆,這里我們就來說說線程面對中斷異常后響應的哲學。一般來說,線程級或者服務級的中斷策略為:
- 盡快的退出
- 必要時完成手頭任務的清理
這也就是為什么java中各種并發(fā)包的類庫對于中斷的任務僅僅是拋出InterruptedException而不是直接處理掉中斷 ,例如ArrayBlockingQueue的put方法:
//將任務中的中斷InterruptedException 丟給調用棧的上層代碼執(zhí)行
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//上可打斷的鎖
lock.lockInterruptibly();
try {
//......
} finally {
//1. 釋放鎖
lock.unlock();
}
}而中斷策略的響應,正確的做法應該是讓執(zhí)行該任務的線程進行按照如下原則進行處理:
- 如果不具備處理的能力,則將異常向上傳遞
- 如果無法傳遞異常則顯示拋出中斷讓上層的調用棧感知。
以我們Task生產者代碼為例,我們將提交給線程即哪些繼承runnable或者lambda表達式統稱為任務,一般來說持有這些任務的線程不一定是這些任務的執(zhí)行者,它們僅擁有對于任務狀態(tài)管理的一些權限,例如一個主線程main方法調用thread-0異步執(zhí)行阻塞隊列存取操作:

所以從任務的維度來說,執(zhí)行任務的線程應該小心的保存中斷的狀態(tài),即在面對中斷時,它們不應該對中斷進行任何的干預,而是讓擁有線程的代碼段做出正確的響應,即讓thread-0感知到中斷異常然后將狀態(tài)狀態(tài)還原向上傳遞:

對應的我們也給出阻塞存儲元素的優(yōu)雅中斷處理:
- 輪詢檢測本地中斷標識,若未中斷執(zhí)行插入
- 感知中斷,捕獲異常并打印未能處理的元素
- 完成資源兜底,主動打斷線程將狀態(tài)向上傳遞
//外部線程打斷的方法
public void cancel() {
thread.interrupt();
}
@Override
public void run() {
try {
//取消標識檢測,如果取消則直接結束循環(huán)
while (!Thread.currentThread().isInterrupted()) {
System.out.println("運行中......");
Integer element = RandomUtil.randomInt(10);
try {
queue.put(element);
} catch (InterruptedException e) {
Console.log("線程中斷,未處理資源:{}", element);
Thread.currentThread().interrupt();
}
}
} finally {
if (thread.isInterrupted()) {
System.out.println("任務已取消");
}
}
}對應的我們也給出輸出結果,可以看到阻塞的隊列在被打斷后完成必要的資源兜底,就會將中斷狀態(tài)向上傳遞:
運行中......
運行中......
線程中斷,未處理資源:6
任務已取消4. 時刻保留中斷的狀態(tài)
需要注意的是,執(zhí)行者僅僅傳遞中斷還是不行的,更重要的一點是:
在必要時刻,保存中斷的狀態(tài),并返回前恢復狀態(tài),而不是捕獲到isInterrupted,避免陷入無限循環(huán)的漩渦。
很多情況下當前任務不具備處理中斷的能力,例如Runnable收到中斷的請求不可拋出異常交由上層調用棧處理,那么就在收到中斷請求,按照如下步驟執(zhí)行:
- 基于本地標識保留中斷狀態(tài)
- 完成必要的收尾工作
- 在返回前打斷該線程恢復中斷狀態(tài)
例如線程0循環(huán)獲取阻塞隊列元素,在因為沒有元素而阻塞時,線程1打斷該線程,已按照時刻保留中斷的狀態(tài)守則,線程0則應該按照如下步驟執(zhí)行:
- 收到中斷,利用本地變量保留中斷狀態(tài)
- 繼續(xù)循環(huán)等待元素獲取
- 獲取到元素并返回,在返回前將當前線程打斷,讓外部感知

對應的我們給出消費者循環(huán)獲取元素并處理狀態(tài)的代碼:
privatestaticfinal BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
public static String getNextElement() {
boolean interrupted = false;
try {
while (true) {
try {
//1. 等待結果返回而阻塞
return blockingQueue.take();
} catch (InterruptedException e) {
//2.收到異常中斷,小心保存中斷狀態(tài),繼續(xù)阻塞等待元素返回
interrupted = true;
Console.log("消費者線程中斷,待完成本次資源獲取后執(zhí)行中斷");
}
}
} finally {
//3. 返回前基于中斷標識將中斷狀態(tài)向上傳遞
if (interrupted) {
Console.log("消費者線程已中斷");
Thread.currentThread().interrupt();
}
}
}對應的我們也給出測試代碼,即在消費者阻塞后將其打斷,并投遞元素,讓其完成優(yōu)雅中斷:
public static void main(String[] args) {
//消費者線程
Thread thread = new Thread(() -> {
String nextElement = getNextElement();
Console.log("消費者線程獲取結果:{}", nextElement);
});
thread.start();
Console.log("消費者線程啟動");
//休眠5s后將task任務對應線程中斷
new Thread(() -> {
//休眠5s后將task任務對應線程中斷
ThreadUtil.sleep(5000);
thread.interrupt();
//休眠5s后再投遞元素
ThreadUtil.sleep(5000);
try {
String element = RandomUtil.randomString(10);
Console.log("生產者線程投遞:{}", element);
blockingQueue.put(element);
} catch (InterruptedException e) {
//......
}
}).start();
}輸出結果如下,可以看到消費者在收到中斷后明確保留中斷狀態(tài),并完成資源處理的工作后執(zhí)行中斷:
消費者線程啟動
消費者線程中斷,待完成本次資源獲取后執(zhí)行中斷
生產者線程投遞:7gmnj1rqpj
消費者線程已中斷
消費者線程獲取結果:7gmnj1rqpj5. 超時任務取消的最優(yōu)解
如果我們現在需要實現這樣以一個函數,該函數會接受外部傳入一個異步任務并提交到我們的線程池異步執(zhí)行,并具備如下要求:
- 要求在給定時間完成執(zhí)行
- 任務執(zhí)行完成后,要知曉是超時取消,還是正常執(zhí)行完成返回,即任務正確執(zhí)行則返回true,反之返回false
- 任務執(zhí)行過程中可被中斷

所以對于該需求,要做到如下幾點:
- 可以感知任務執(zhí)行完成并返回true
- 可以感知任務執(zhí)行超時,并返回false
- 任務可中斷,直接拋出讓上層代碼解決
對應我們給出如下代碼,可以看到我們采用submit獲取異步任務的Future對象,利用Future實現帶有時限的阻塞獲取,一旦超時則直接拋出超時異常,并在函數返回前的finally語句塊調用cancel取消任務,需要注意的是這個cancel方法并不會一味的取消任務:
- 如果任務已完成,cancel就會返回false
- 如果任務因為超時等原因調用cancel,那么任務則還是活躍的,調用cancel可以取消并直接返回true
所以基于cancel這個特點,我們直接取反,即可實現正確執(zhí)行返回true,超時返回false:
private staticfinal ExecutorService executor = ThreadUtil.newExecutor(10);
public static boolean get(Runnable r, int timeout) throws InterruptedException {
Future<?> future = executor.submit(r);
try {
future.get(timeout, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Console.error("任務執(zhí)行超時");
} catch (ExecutionException e) {
thrownew RuntimeException(e);
} finally {
/*
1. 設置為true,如果任務是運行中,則取消任務,如果已經取消,則沒有任務效果
2. 如果任務已經完成,則返回false,反之返回true
*/
return !future.cancel(true);
}
}對應的我們也給出測試代碼:
public static void main(String[] args) {
try {
boolean isTimeOut = get(() -> {
ThreadUtil.sleep(5000);
Console.log("任務執(zhí)行完成");
}, 1);
Console.log("任務是否正常執(zhí)行:{}", isTimeOut);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}輸出結果如下,可以看到任務執(zhí)行超時后直接打斷休眠,任務執(zhí)行到已完成的輸出,然后執(zhí)行超時取消,如果取反得到false:
任務執(zhí)行超時
任務執(zhí)行完成
任務是否正常執(zhí)行:false6. 處理系統層面阻塞IO
有時候阻塞并非來自阻塞式并發(fā)包的調用,而是例如硬件層面文件IO或者網絡層面的socket IO,這種API涉及內核態(tài)調用,通過interrupt我們也只能修改中斷表示,無法直接將其中斷。
所以我們只能通過間接的手段干預其資源關閉來做到中斷,無論是socket還是文件IO,本質上都是針對系統或者網卡IO數據的阻塞讀取,所以我們可以直接通過關閉文件IO流或者socket套接字來間接打斷其資源讀取。

對應的我們以文件IO為例給出代碼示例,可以看到我們通過繼承thread重寫其中斷方法,當我們需要打斷系統資源時,直接關閉其流通道讓工作線程感知到這一點,然后通過原生interrupt修改中斷狀態(tài):
public class IOThread extends Thread {
privatefinal BufferedReader utf8Reader;
public IOThread(String path) {
utf8Reader = FileUtil.getUtf8Reader(path);
}
@Override
public synchronized void start() {
while (true) {
try {
String line = utf8Reader.readLine();
Console.log(line);
} catch (IOException e) {
//保存IO上下文狀態(tài)
thrownew RuntimeException(e);
}
}
}
@Override
public void interrupt() {
try {
//強制關閉IO讓其感知中斷
utf8Reader.close();
} catch (IOException e) {
thrownew RuntimeException(e);
} finally {
super.interrupt();
}
}
}對應的我們也給出使用示例,這段代碼會在中斷線程調用interrupt關閉流通道直接直接將IOUtil 線程打斷:
IOThread ioThread = new IOThread("F:\\test.txt");
Thread thread = new Thread(() -> {
ThreadUtil.sleep(10_000);
ioThread.interrupt();
});
thread.start();
ioThread.start();































