SpringBoot + ResponseBodyEmitter 實時異步流式推送,優雅!
ChatGPT 的火爆,讓流式輸出技術迅速走進大眾視野。在那段時間里,許多熱愛鉆研技術的小伙伴紛紛開始學習和實踐 SSE 異步處理。
我當時也寫過相關文章,今天,咱們換一種更為簡便的方式來實現流式輸出,那就是 ResponseBodyEmitter。
其實,ResponseBodyEmitter 并非新技術,早在 Spring Framework 4.2 版本就已被引入。直到最近,我們在開發一個滾動日志輸出功能時,才深入了解到它的強大之處。
ResponseBodyEmitter 的作用
相較于 SSE 技術,ResponseBodyEmitter 更加簡單易用。它主要用于處理異步的 HTTP 響應,其核心優勢在于 允許逐步將數據發送到客戶端,而非一次性發送所有內容。這一特性使得它在需要長時間處理或進行流式傳輸的場景中表現出色。需要注意的是,ResponseBodyEmitter 本質上是一個接口。
使用場景
- 長輪詢:服務器在有數據時會立即響應客戶端請求,若暫無數據,則保持連接開放,等待數據到來。
- **服務器推送事件 (SSE)**:服務器能夠持續不斷地向客戶端推送各類事件,實現實時交互。
- 流式傳輸:可逐步發送大量數據,像文件下載或者實時數據流傳輸等場景都適用。
- 異步處理:在處理耗時任務時,能逐步返回處理結果,避免客戶端長時間等待,提升用戶體驗。
業務場景舉例
在實際業務中,ResponseBodyEmitter 有著廣泛的應用,比如進度條的實時更新、實時聊天功能、股票價格的實時更新、系統日志的流式輸出以及 AI 的流式響應等。
實時日志流實戰
接下來,我們通過一個簡單的實時日志流功能,來深入了解 ResponseBodyEmitter 的使用。假設我們有一個應用程序,需要實時查看服務器的日志,以便快速定位和解決問題。
創建控制器
首先,我們在 Spring Boot 應用中創建一個控制器,借助 ResponseBodyEmitter 實現實時日志流。
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
@RestController
@RequestMapping("/api/log")
publicclass LogController {
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseBodyEmitter streamLogs() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// 開啟異步線程處理數據并發送
new Thread(() -> {
try {
while (true) {
String logEntry = getLatestLogEntry();
if (logEntry != null) {
emitter.send(logEntry);
}
// 每秒檢查一次日志更新
Thread.sleep(1000);
}
} catch (Exception e) {
// 出現異常時結束響應并傳遞錯誤信息
emitter.completeWithError(e);
}
}).start();
return emitter;
}
private String getLatestLogEntry() {
// 模擬從日志文件中獲取最新日志條目
return"2025-02-12 12:00:00 - INFO: User logged in successfully.";
}
}運行效果
當我們啟動這個應用程序,并訪問 /api/log/stream 路徑時,就能看到一個實時更新的日志流。服務器會每秒向客戶端推送一條新的日志條目,客戶端會將其顯示在頁面上,效果如下:
運行效果
ResponseBodyEmitter 的核心方法
- send(Object data):向客戶端發送數據,該方法可以多次調用,實現數據的逐步發送。
- complete():用于結束響應流,表示數據已經全部發送完畢。
- onTimeout(Runnable callback):設置超時回調函數,當連接超時時,會執行該回調。
- onCompletion(Runnable callback):設置完成回調函數,當數據發送完成后,會執行該回調。
ResponseBodyEmitter 工作原理
異步數據生成與推送
在傳統的 HTTP 請求 - 響應模式中,服務器通常需要等待整個響應數據生成完成后,才會將其一次性發送給客戶端。關注公眾號:碼猿技術專欄,回復關鍵詞:1111 獲取阿里內部Java性能調優手冊!而 ResponseBodyEmitter 打破了這種模式,它允許服務端在任務執行過程中異步地生成響應數據。
當有部分數據準備好時,就可以立即調用 send() 方法將這些數據推送給客戶端,而無需等待整個任務完成。這就好比一場接力賽,每完成一段賽程(生成一部分數據),就馬上將接力棒(數據)傳遞給客戶端,大大提高了數據傳輸的實時性。
分塊傳輸機制
ResponseBodyEmitter 采用了 HTTP 的分塊編碼(Chunked Encoding)方式來傳輸數據。在傳統的 HTTP 響應中,通常需要在響應頭中明確指定 Content-Length,表示整個響應數據的長度。但在分塊傳輸中,服務器不會提前設置 Content-Length,而是將數據分成多個獨立的塊,每個塊都有自己的長度標識。
客戶端在接收到數據塊后,可以立即對其進行處理,而不必等待整個響應數據接收完畢。這種方式使得數據可以邊生成邊傳輸,減少了客戶端的等待時間,提高了用戶體驗。
連接生命周期管理
為了確保資源的合理使用,ResponseBodyEmitter 提供了對連接生命周期的有效管理。當所有數據都發送完畢后,需要調用 complete() 方法來明確告知客戶端響應結束,關閉連接。如果在數據傳輸過程中出現異常,可以調用 completeWithError() 方法,結束響應并向客戶端傳遞錯誤信息。
這樣可以避免連接長時間保持開放,造成資源浪費。
注意事項
- 客戶端支持:雖然大多數瀏覽器和 HTTP 客戶端庫都支持分塊傳輸,但某些老舊的客戶端可能存在兼容性問題。
- 超時設置:為避免長連接長時間占用資源,可以為 ResponseBodyEmitter 設置超時時間,示例代碼如下:
emitter.onTimeout(() -> emitter.complete());- 線程安全:ResponseBodyEmitter 的 send() 方法是線程安全的,但在使用時需要注意控制任務線程的生命周期,避免出現資源泄漏。
- 連接關閉:務必確保在任務結束時調用 complete() 或 completeWithError() 方法,否則可能導致連接無法正常關閉,造成資源浪費。
與 Streaming 和 SSE 的對比
- Streaming:直接通過 OutputStream 向客戶端寫入數據,靈活性較高,但需要手動處理流的關閉,增加了開發的復雜度。
- Server-Sent Events (SSE):基于 text/event-stream 協議,適用于服務端事件推送場景,但要求客戶端支持 SSE 協議。
- ResponseBodyEmitter:通用性更強,適用于任何支持 HTTP 的客戶端,并且易于與 Spring 框架集成,是一種更為便捷的流式傳輸解決方案。
在處理類似 AI 這種響應式的流式輸出場景時,相較于 SSE,ResponseBodyEmitter 作為 Spring 提供的輕量級流式傳輸解決方案,在 HTTP 協議兼容性方面表現更優。
小結
ResponseBodyEmitter 是 Spring 框架提供的輕量級流式傳輸解決方案,它能夠顯著提升高并發和實時性場景下的用戶體驗。通過 ResponseBodyEmitter,我們可以輕松實現服務器向客戶端的實時數據推送。
無論是進度條的實時更新、實時聊天、股票價格的實時監控還是系統日志的流式輸出,ResponseBodyEmitter 都能幫助我們構建更加動態和互動的應用程序。

































