如何在 NestJS 中使用 Node.js 流高效處理大文件流式傳輸與數據處理
本文將學習如何在 NestJS 服務器上高效、可靠地處理大文件,結合 Node.js 流、S3 存儲桶以及 CSV 轉 JSON 的實際示例。閱讀本文后,你將不再擔心因大文件導致服務器崩潰的問題。
前置要求
為充分理解本文內容,你需要具備以下基礎知識:
- HTTP 下載與上傳的基本原理
- 使用 Multer 處理文件上傳的經驗
- AWS S3 SDK 的基本使用
- NestJS 架構的基本理解
項目初始化
首先,創建一個 NestJS 項目:
nest new stream-app
cd stream-app接著,生成所需的模塊與控制器文件:
nest g module files \
&& nest g controller files \
&& nest g service files \
&& nest g controller files/csv \
&& nest g service files/csv \
&& nest g controller files/s3 \
&& nest g service files/s3安裝項目所需依賴:
npm install multer csv-parser mime-types @aws-sdk/client-s3 @nestjs/config
npm install -D @types/multer @types/mime-types其中:
multer:用于處理文件上傳csv-parser:用于將 CSV 轉換為 JSONmime-types:用于設置正確的文件 Content-Type@aws-sdk/client-s3:用于上傳文件至 S3 兼容存儲(如 DigitalOcean Spaces)@nestjs/config:用于讀取環境變量
隨后,在 app.module.ts 中導入 ConfigModule:
import { Module } from"@nestjs/common";
import { AppController } from"./app.controller";
import { AppService } from"./app.service";
import { FilesModule } from"./files/files.module";
import { ConfigModule } from"@nestjs/config";
@Module({
imports: [ConfigModule.forRoot({ isGlobal: true }), FilesModule],
controllers: [AppController],
providers: [AppService],
})
exportclassAppModule {}最后,在項目根目錄下創建名為 storage 的文件夾,并放入一個至少 100MB 的大文件,例如:
stream-app/storage/large-report.pdfNestJS 中的基礎流式傳輸
向用戶發送大文件的錯誤方式是使用 readFileSync()。該方法會將整個文件加載到內存中,一次性發送,對大文件或高并發應用極不實用。
// 錯誤示例 —— 切勿使用
@Get('download-bad')
getFileBad(@Res() res: Response) {
const filePath = join(process.cwd(), 'storage', 'large-report.pdf');
const fileBuffer = readFileSync(filePath); // 將整個文件加載進內存
res.setHeader('Content-Type', 'application/pdf');
res.setHeader('Content-Disposition', 'attachment; filename="report.pdf"');
return res.send(fileBuffer); // 一次性發送全部緩沖數據
}幸運的是,Node.js 提供了流(Stream)機制,可以高效、漸進、非阻塞地處理數據。通過 createReadStream(),文件將以 64KB 的默認塊大小逐步讀取。
更新 files.controller.ts:
import {
Controller,
Get,
Query,
Res,
HttpException,
HttpStatus,
Post,
UploadedFile,
UseInterceptors,
ConsoleLogger,
} from"@nestjs/common";
import { Response } from"express";
import { extname, join } from"path";
import { createReadStream, statSync } from"fs";
import { StreamableFile } from"@nestjs/common";
import * as mime from"mime-types";
import { FilesService } from"./files.service";
import { FileInterceptor } from"@nestjs/platform-express";
import { diskStorage } from"multer";
@Controller("files")
exportclassFilesController {
constructor(private readonly filesService: FilesService) {}
@Get("download")
getFile(@Res({ passthrough: true }) res: Response) {
const filePath = join(process.cwd(), "storage", "large-report.pdf");
const fileStream = createReadStream(filePath);
res.set({
"Content-Type": "application/pdf",
"Content-Disposition": 'attachment; filename="report.pdf"',
});
returnnewStreamableFile(fileStream);
}
}在上述代碼中,@Res({ passthrough: true }) 告訴 NestJS 允許我們自定義響應頭,同時仍由框架負責發送響應數據,無需手動調用 res.send()。
我們設置的響應頭包括:
Content-Type:告知瀏覽器文件類型Content-Disposition:告知瀏覽器文件名及應觸發下載
StreamableFile(fileStream) 將原始流包裝成 NestJS 可識別的響應對象,適用于 Express 和 Fastify。若需切換至 Fastify,僅需修改 main.ts 并安裝適配器即可。
優化文件下載功能
上述示例雖可運行,但在生產環境中還需增強錯誤處理、輸入驗證、正確設置響應頭及復用邏輯。
更新 files.service.ts:
import {
Injectable,
StreamableFile,
NotFoundException,
BadRequestException,
} from"@nestjs/common";
import { join } from"path";
import { createReadStream, existsSync } from"fs";
import { ReadStream } from"fs";
@Injectable()
exportclassFilesService {
getFileStream(fileName: string): { stream: ReadStream; path: string } {
try {
// 基礎文件名驗證
if (!fileName || typeof fileName !== "string") {
thrownewBadRequestException("無效的文件名");
}
// 防止目錄遍歷攻擊
if (
fileName.includes("..") ||
fileName.includes("/") ||
fileName.includes("\\")
) {
thrownewBadRequestException(
"無效文件名:包含路徑遍歷字符"
);
}
const filePath = join(process.cwd(), "storage", fileName);
if (!existsSync(filePath)) {
thrownewNotFoundException(`文件 '${fileName}' 未找到`);
}
const stream = createReadStream(filePath);
return { stream, path: filePath };
} catch (error) {
if (
error instanceofNotFoundException ||
error instanceofBadRequestException
) {
throw error;
}
thrownewBadRequestException(
`獲取文件流失敗 ${fileName}: ${error.message}`
);
}
}
}上述代碼中:
- 驗證文件名非空且為字符串,防止崩潰
- 攔截路徑遍歷攻擊,確保僅能訪問
storage目錄內的文件 - 使用 NestJS 異常機制進行統一錯誤處理
existsSync()用于檢查指定路徑是否存在,存在返回true,否則返回false。
隨后更新 files.controller.ts,添加以下端點:
@Get('improved-download')
downloadFile(@Query('name') name: string, @Res({ passthrough: true }) res: Response) {
if (!name) {
thrownewHttpException('文件名是必需的', HttpStatus.BAD_REQUEST);
}
const { stream, path } = this.filesService.getFileStream(name);
const fileSize = statSync(path).size;
const fileExtension = extname(path);
const contentType = mime.lookup(fileExtension) || 'application/octet-stream';
res.set({
'Content-Type': contentType,
'Content-Disposition': `attachment; filename="${name}"`,
'Content-Length': fileSize.toString(),
'Cache-Control': 'no-cache, no-store, must-revalidate',
});
returnnewStreamableFile(stream);
}在該端點中:
- 使用查詢參數
name動態選擇文件 - 調用
getFileStream(name)獲取流與路徑 - 使用
statSync()獲取文件大小,用于瀏覽器顯示下載進度 - 通過
mime-types庫自動映射文件擴展名到正確的 MIME 類型(如application/pdf) - 設置
Cache-Control防止瀏覽器緩存過期文件
下載文件時,瀏覽器可能緩存響應,導致用戶獲取舊版本。通過設置
Cache-Control可避免此問題。
上傳大文件
接下來,我們將學習如何通過流式方式上傳文件至磁盤和 S3 存儲桶。
上傳至磁盤
在 FilesController 中添加以下上傳路由:
@Post('upload')
@UseInterceptors(
FileInterceptor('file', {
storage: diskStorage({
destination: './uploads',
filename: (req, file, callback) => {
const uniqueName = Date.now() + extname(file.originalname);
callback(null, uniqueName);
},
}),
limits: {
fileSize: 500 * 1024 * 1024, // 500MB
},
}),
)
handleUpload(@UploadedFile() file: Express.Multer.File) {
return {
message: '文件上傳成功',
filename: file.filename,
size: file.size,
};
}@UseInterceptors 是 NestJS 的裝飾器,用于為路由綁定攔截器。此處使用 FileInterceptor,它是對 Multer 的封裝,自動從請求中提取文件并解析。
diskStorage 將文件分塊寫入磁盤,而非加載至內存。filename 函數用于生成唯一文件名。
通過 @UploadedFile() 裝飾器,可獲取文件對象,包含 filename、originalname、mimetype、size、path 等信息。由于使用了 diskStorage,file.buffer 將為 undefined。
上傳至 S3
本例中,我們將先通過 diskStorage 上傳至本地,再將文件流直接推送至 S3 存儲桶。
本例使用 DigitalOcean Spaces,其完全兼容 S3 協議,使用相同的 AWS SDK,僅需替換端點與 CDN 地址。
更新 s3.service.ts:
import { Injectable } from"@nestjs/common";
import { ConfigService } from"@nestjs/config";
import { S3Client, PutObjectCommand } from"@aws-sdk/client-s3";
import { Readable } from"stream";
import * as path from"path";
@Injectable()
exportclassS3Service {
privates3: S3Client;
privatereadonlybucketName: string;
privatereadonlyendpoint: string;
privatereadonlyregion: string;
privatereadonlycdnUrl: string;
constructor(private readonly configService: ConfigService) {
this.bucketName = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_BUCKET_NAME"
);
this.endpoint = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_ENDPOINT"
);
this.region = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_REGION"
);
this.cdnUrl = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_CDN_URL"
);
const accessKeyId = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_ACCESS_KEY_ID"
);
const secretAccessKey = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_SECRET_KEY"
);
this.s3 = newS3Client({
endpoint: this.endpoint,
forcePathStyle: false,
region: this.region,
credentials: {
accessKeyId,
secretAccessKey,
},
});
}
asyncuploadImageStream(payload: {
location: string;
file: {
stream: Readable;
filename: string;
mimetype: string;
size: number;
};
}): Promise<{ path: string; key: string }> {
const { location, file } = payload;
const uid = Date.now().toString();
const extension = path.extname(file.filename);
const key = `${location}/${uid}${extension}`;
const command = newPutObjectCommand({
Bucket: this.bucketName,
Key: key,
Body: file.stream,
ContentLength: file.size,
});
try {
awaitthis.s3.send(command);
return {
path: `${this.cdnUrl}/${key}`,
key,
};
} catch (error) {
console.error("上傳文件流失敗:", error);
thrownewError("文件上傳失敗");
}
}
}在 uploadImageStream() 方法中:
- 生成唯一文件鍵(key)
- 使用 AWS SDK v3 創建上傳命令,將可讀流作為
Body - 在
try-catch中執行上傳并返回路徑與鍵
隨后更新 s3.controller.ts:
import {
Controller,
Post,
UploadedFile,
UseInterceptors,
BadRequestException,
} from"@nestjs/common";
import { FileInterceptor } from"@nestjs/platform-express";
import { diskStorage } from"multer";
import * as fs from"fs";
import * as path from"path";
import { S3Service } from"./s3.service";
@Controller("s3")
exportclassS3Controller {
constructor(private readonly s3Service: S3Service) {}
@Post("upload")
@UseInterceptors(
FileInterceptor("file", {
storage: diskStorage({
destination: "./uploads",
filename: (req, file, cb) => {
cb(null, `${Date.now()}-${file.originalname}`);
},
}),
limits: { fileSize: 200 * 1024 * 1024 },
})
)
asyncuploadToS3(@UploadedFile() file: Express.Multer.File) {
if (!file) {
thrownewBadRequestException("未上傳文件");
}
const location = "uploads";
const filePath = file.path;
const readStream = fs.createReadStream(filePath);
const { size } = fs.statSync(filePath);
try {
const uploadResult = awaitthis.s3Service.uploadImageStream({
location,
file: {
stream: readStream,
filename: file.originalname,
mimetype: file.mimetype,
size,
},
});
return {
message: "文件已上傳至 S3",
...uploadResult,
};
} catch (error) {
thrownewError(`文件上傳失敗: ${error.message}`);
} finally {
// 清理臨時文件
if (file.path && fs.existsSync(file.path)) {
fs.unlinkSync(file.path);
}
}
}
}在 uploadToS3 中:
- 將文件流與元數據傳遞給
uploadImageStream() - 成功后返回 S3 路徑與鍵
- 最終通過
fs.unlinkSync()刪除本地臨時文件
處理大文件:CSV 轉 JSON 示例
更新 csv.service.ts:
import { Injectable, BadRequestException } from"@nestjs/common";
import * as csv from"csv-parser";
import { Readable } from"stream";
exportinterfaceCsvRow {
[key: string]: string;
}
exportinterfaceCsvProcessingResult {
totalRows: number;
data: CsvRow[];
}
@Injectable()
exportclassCsvService {
asyncprocessCsvStream(fileStream: Readable): Promise<CsvProcessingResult> {
returnnewPromise((resolve, reject) => {
constresults: CsvRow[] = [];
// 創建 CSV 解析流
const csvStream = csv();
// 錯誤處理
csvStream.on("error", (error) => {
reject(newBadRequestException(`CSV 解析失敗: ${error.message}`));
});
// 處理完成
csvStream.on("end", () => {
resolve({
totalRows: results.length,
data: results,
});
});
// 流式處理
fileStream.pipe(csvStream).on("data", (data: CsvRow) => {
results.push(data);
// 對于超大文件,建議替換為數據庫寫入邏輯:
// this.databaseService.insertRow(data);
// 或批量累積后批量插入以提升性能
});
});
}
}在 processCsvStream() 中:
- 創建 Promise 處理異步流
- 使用
csv-parser創建轉換流,將 CSV 數據逐行轉為 JSON 對象 - 通過
fileStream.pipe(csvStream)將原始數據輸入解析器 - 每解析一行,觸發
data事件,將結果存入數組 - 出錯時拒絕 Promise,完成時返回結果
更新 csv.controller.ts:
import {
Controller,
Post,
UploadedFile,
UseInterceptors,
BadRequestException,
} from"@nestjs/common";
import { FileInterceptor } from"@nestjs/platform-express";
import { diskStorage } from"multer";
import * as fs from"fs";
import { CsvService } from"./csv.service";
@Controller("csv")
exportclassCsvController {
constructor(private readonly csvService: CsvService) {}
@Post("upload")
@UseInterceptors(
FileInterceptor("file", {
storage: diskStorage({
destination: "./uploads",
filename: (req, file, cb) => {
cb(null, `${Date.now()}-${file.originalname}`);
},
}),
limits: { fileSize: 50 * 1024 * 1024 }, // 50MB 限制
})
)
asynchandleCsvUpload(@UploadedFile() file: Express.Multer.File) {
if (!file) {
thrownewBadRequestException("未上傳文件");
}
// 創建文件讀取流(真正流式處理)
const fileStream = fs.createReadStream(file.path);
try {
// 使用服務流式處理 CSV
const result = awaitthis.csvService.processCsvStream(fileStream);
return {
message: "CSV 處理成功",
filename: file.originalname,
...result,
};
} catch (error) {
thrownewBadRequestException(`CSV 處理失敗: ${error.message}`);
} finally {
// 清理臨時文件
if (file.path && fs.existsSync(file.path)) {
fs.unlinkSync(file.path);
}
}
}
}最后,確認 files.module.ts 中的控制器與提供者配置正確:
import { Module } from"@nestjs/common";
import { FilesController } from"./files.controller";
import { FilesService } from"./files.service";
import { CsvController } from"./csv/csv.controller";
import { S3Controller } from"./s3/s3.controller";
import { S3Service } from"./s3/s3.service";
import { CsvService } from"./csv/csv.service";
@Module({
controllers: [FilesController, CsvController, S3Controller],
providers: [FilesService, S3Service, CsvService],
})
exportclassFilesModule {}總結
本文詳細介紹了在 NestJS 中如何通過 Node.js 流實現:
- 大文件下載
- 文件上傳至磁盤與 S3
- CSV 文件流式轉換為 JSON
你已掌握正確的實踐方式、常見陷阱及其原因。下一步可考慮:
- 將 CSV 解析結果直接寫入數據庫
- 為 S3 上傳添加重試機制
原文鏈接:https://www.telerik.com/blogs/how-stream-large-files-handle-data-efficiently-nodejs-streams-nestjs作者:Christian Nwamba



























