搞懂 io_uring,優化 I/O 不費勁
在計算機系統中,I/O 操作的性能一直是影響系統整體性能的關鍵因素。無論是文件讀寫、網絡通信還是數據庫訪問,高效的 I/O 處理都至關重要。對于 Linux 系統而言,I/O 模型的發展經歷了多個階段,從早期的阻塞式 I/O 到如今強大的 io_uring,每一次變革都為開發者帶來了更高效、更靈活的 I/O 處理方式。
它作為 Linux 內核異步 I/O 領域的革新者,旨在打破傳統異步 I/O 模型的性能束縛,為開發者提供更高效、更強大的 I/O 處理能力。那么,io_uring 究竟有何獨特之處?它是如何實現高性能異步 I/O 的?接下來,讓我們一起深入探究 io_uring 的奧秘。
一、傳統 I/O 模型的痛點
在深入了解 io_uring 之前,讓我們先回顧一下傳統 I/O 模型,剖析它們在應對高并發、高性能需求時所面臨的挑戰。
1.1 阻塞式 I/O
阻塞式 I/O 是最基礎、最直觀的 I/O 模型。在這種模型下,當應用程序執行 I/O 操作(如 read 或 write)時,進程會被阻塞,直到 I/O 操作完成。例如,當從文件中讀取數據時,如果數據尚未準備好,進程就會一直等待,期間無法執行其他任務。就好比你去餐廳點餐,然后一直在餐桌旁等待食物上桌,在等待的過程中什么也做不了。
以一個簡單的文件讀取操作為例,假設我們有如下代碼:
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#define BUFFER_SIZE 1024
int main() {
int fd = open("example.txt", O_RDONLY);
if (fd == -1) {
perror("open");
return 1;
}
char buffer[BUFFER_SIZE];
ssize_t bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read == -1) {
perror("read");
close(fd);
return 1;
}
printf("Read %zd bytes: %.*s\n", bytes_read, (int)bytes_read, buffer);
close(fd);
return 0;
}在這段代碼中,read函數會阻塞進程,直到數據從文件中讀取到緩沖區。如果文件很大或者讀取過程中出現延遲,進程將長時間處于阻塞狀態,無法處理其他任務。
在高并發的 Web 服務器場景中,如果使用阻塞式 I/O,每一個客戶端連接都需要一個獨立的線程來處理。當并發連接數增多時,線程資源將被大量消耗,系統性能會急劇下降。因為每個線程在等待 I/O 操作完成時,都會占用一定的系統資源(如棧空間、寄存器等),而線程的創建和銷毀也會帶來額外的開銷。
1.2 非阻塞式 I/O
為了解決阻塞式 I/O 的問題,非阻塞式 I/O 應運而生。在非阻塞式 I/O 模型中,當應用程序執行 I/O 操作時,如果數據尚未準備好,系統不會阻塞進程,而是立即返回一個錯誤(如 EWOULDBLOCK 或 EAGAIN)。應用程序可以繼續執行其他任務,然后通過輪詢的方式再次嘗試 I/O 操作,直到數據準備好。這就像你在餐廳點餐時,服務員告訴你需要等待一段時間,你可以先去做其他事情,然后時不時回來詢問食物是否準備好了。
在 Linux 中,可以通過fcntl函數將文件描述符設置為非阻塞模式,示例代碼如下:
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#define BUFFER_SIZE 1024
int main() {
int fd = open("example.txt", O_RDONLY | O_NONBLOCK);
if (fd == -1) {
perror("open");
return 1;
}
char buffer[BUFFER_SIZE];
while (1) {
ssize_t bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 數據未準備好,繼續執行其他任務或再次輪詢
usleep(1000); // 稍微等待一下再輪詢
continue;
} else {
perror("read");
close(fd);
return 1;
}
}
break;
}
printf("Read %zd bytes: %.*s\n", bytes_read, (int)bytes_read, buffer);
close(fd);
return 0;
}非阻塞式I/O提高了系統的并發處理能力,進程在等待 I/O操作的過程中可以執行其他任務。但是,頻繁的輪詢會消耗大量的 CPU 資源,增加了系統的開銷。而且非阻塞式 I/O 的編程復雜度較高,需要處理更多的錯誤和狀態判斷。例如,在上述代碼中,需要不斷地檢查read函數的返回值和errno來判斷 I/O操作的狀態 。
1.3 I/O 多路復用
I/O 多路復用是在非阻塞式 I/O 的基礎上進一步發展而來的,它允許一個進程同時監視多個 I/O 描述符(如文件描述符、套接字等),當其中任何一個描述符就緒(即有數據可讀或可寫)時,進程就可以對其進行處理。常見的 I/O 多路復用技術有 select、poll 和 epoll。
以 select 為例,應用程序通過調用 select 函數,將需要監視的 I/O 描述符集合傳遞給內核,內核會監視這些描述符的狀態,當有描述符就緒時,select 函數返回,應用程序再對就緒的描述符進行 I/O 操作。這就好比你在餐廳同時點了多道菜,你只需要等待服務員一次性通知你哪些菜已經準備好了,然后去取相應的菜,而不需要每道菜都單獨詢問。
#include <stdio.h>
#include <sys/select.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#define BUFFER_SIZE 1024
#define FD_SETSIZE 1024
int main() {
int fd1 = open("file1.txt", O_RDONLY | O_NONBLOCK);
int fd2 = open("file2.txt", O_RDONLY | O_NONBLOCK);
if (fd1 == -1 || fd2 == -1) {
perror("open");
return 1;
}
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(fd1, &read_fds);
FD_SET(fd2, &read_fds);
int max_fd = (fd1 > fd2)? fd1 : fd2;
int ret = select(max_fd + 1, &read_fds, NULL, NULL, NULL);
if (ret == -1) {
perror("select");
close(fd1);
close(fd2);
return 1;
} else if (ret > 0) {
char buffer[BUFFER_SIZE];
if (FD_ISSET(fd1, &read_fds)) {
ssize_t bytes_read = read(fd1, buffer, BUFFER_SIZE);
if (bytes_read == -1) {
perror("read fd1");
} else {
printf("Read from fd1: %.*s\n", (int)bytes_read, buffer);
}
}
if (FD_ISSET(fd2, &read_fds)) {
ssize_t bytes_read = read(fd2, buffer, BUFFER_SIZE);
if (bytes_read == -1) {
perror("read fd2");
} else {
printf("Read from fd2: %.*s\n", (int)bytes_read, buffer);
}
}
}
close(fd1);
close(fd2);
return 0;
}在上述代碼中,通過select函數同時監視fd1和fd2兩個文件描述符,當其中任何一個有數據可讀時,select函數返回,然后分別檢查FD_ISSET來判斷是哪個描述符就緒并進行讀取操作。
而 epoll 是 Linux 2.6 內核引入的高性能 I/O 多路復用機制,它通過三個系統調用來實現:epoll_create創建一個 epoll 實例,epoll_ctl添加 / 刪除文件描述符到 epoll 實例中,epoll_wait等待 I/O 事件。epoll 使用事件驅動機制,僅返回已就緒的文件描述符,避免了 select 和 poll 的線性遍歷開銷。例如在 Nginx 中,就大量使用了 epoll 來處理高并發的網絡連接,使得 Nginx 能夠高效地應對成千上萬的并發請求 。
盡管如此,在高并發場景下,大量的事件處理也可能成為 epoll 的性能瓶頸。當需要處理的事件數量非常大時,epoll_wait的返回和事件處理過程可能會產生一定的延遲,影響系統的整體性能 。
二、io_uring 閃亮登場
2.1 io_uring 是什么?
io_uring 是 Linux 內核提供的高性能異步 I/O 框架,于 2019 年在 Linux 5.1 版本中首次引入,由 Jens Axboe 開發。它旨在解決傳統異步 I/O 模型(如 epoll 或 POSIX AIO)在大規模 I/O 操作中的效率問題,是 Linux 異步 I/O 領域的一次重大革新。
在 io_uring 出現之前,傳統 I/O 模型在高并發場景下存在諸多性能瓶頸,如系統調用開銷大、數據拷貝次數多、異步處理能力有限等。而 io_uring 通過創新的設計,實現了低時延、低開銷、異步、高吞吐的 I/O 操作,為開發者提供了更強大的 I/O 處理能力。
io_uring 的核心概念主要包括提交隊列(SQ)、完成隊列(CQ)、提交隊列項(SQE)和完成隊列項(CQE):
- 提交隊列(SQ,Submission Queue):用于存放用戶空間提交的 I/O 請求。用戶將 I/O 請求填充到 SQ 中,并通知內核有新的請求需要處理。它是一個環形隊列,用戶通過操作隊列的 tail 指針來寫入新的請求 。
- 完成隊列(CQ,Completion Queue):用于存放已經完成的 I/O 請求結果。內核在處理完 I/O 請求后,會將結果填充到 CQ 中,并通知用戶空間有請求已完成。同樣是環形隊列,用戶通過操作隊列的 head 指針來讀取完成的結果 。
- 提交隊列項(SQE,Submission Queue Entry):表示一個具體的 I/O 請求,包含了操作類型(如 READ、WRITE、ACCEPT 等)、文件描述符、緩沖區地址、偏移量、數據長度等信息。用戶通過填充 SQE 來描述 I/O 請求,并將其放入 SQ 隊列 。例如:
struct io_uring_sqe {
__u8 opcode; // 操作類型,如 READ, WRITE, ACCEPT…
__u8 flags;
__u16 ioprio;
__s32 fd;
__u64 offset;
__u64 addr; // 用戶緩沖區地址
__u32 len;
__u64 user_data; // 用戶自定義數據(回調、標識等)
};完成隊列項(CQE,Completion Queue Entry):表示一個 I/O 請求的完成結果,包含返回值(成功時為字節數,失敗時為 -errno)、用戶自定義數據等信息。用戶從 CQ 隊列中讀取 CQE 來獲取 I/O 請求的執行結果 。其數據結構如下:
struct io_uring_cqe {
__u64 user_data; // 與 SQE 中設置的一致
__s32 res; // 返回值:成功時為字節數,失敗時為 -errno
__u32 flags;
};SQ 和 CQ 通過內存映射(mmap)的方式映射到用戶空間,使得用戶態和內核態可以直接訪問,避免了頻繁的系統調用和數據拷貝。用戶是 SQ 的生產者,內核是消費者;內核是 CQ 的生產者,用戶是消費者 。
2.2 設計目標與特點
統一網絡和磁盤異步 I/O:在 io_uring 之前,Linux 的網絡 I/O 和磁盤 I/O 使用不同的機制,這給開發者帶來了很大的不便。io_uring 的設計目標之一就是統一網絡和磁盤異步 I/O,使得開發者可以使用統一的接口來處理不同類型的 I/O 操作。這就像一個萬能的工具,無論你是處理網絡數據的傳輸,還是磁盤文件的讀寫,都可以使用 io_uring 這個工具,而不需要在不同的工具之間切換。
提供統一完善的異步 API:它提供了一套統一且完善的異步 API,簡化了異步 I/O 編程。在傳統的 I/O 模型中,開發者可能需要使用多個不同的函數和系統調用來實現異步 I/O,而且這些接口可能并不統一,容易出錯。io_uring 將這些復雜的操作封裝成了簡單易用的 API,開發者只需要調用這些 API,就可以輕松地實現異步 I/O 操作,降低了編程的難度和出錯的概率。
支持異步、輪詢、無鎖、零拷貝:io_uring 支持異步操作,應用程序在發起 I/O 請求后不必等待操作完成,可以繼續執行其他任務,提高了系統的并發處理能力;它還支持輪詢模式,不依賴硬件的中斷,通過調用 IORING_ENTER_GETEVENTS 不斷輪詢收割完成事件,減少了中斷開銷;同時,io_uring 采用了無鎖設計,避免了鎖競爭帶來的性能損耗;在數據傳輸過程中,io_uring 支持零拷貝技術,減少了數據在用戶空間和內核空間之間的拷貝次數,提高了數據傳輸的效率。例如,在一個文件傳輸的場景中,使用 io_uring 可以大大減少數據拷貝的時間,提高文件傳輸的速度。
2.3 io_uring設計思路
(1)解決“系統調用開銷大”的問題?
針對這個問題,考慮是否每次都需要系統調用。如果能將多次系統調用中的邏輯放到有限次數中來,就能將消耗降為常數時間復雜度。
(2)解決“拷貝開銷大”的問題?
之所以在提交和完成事件中存在大量的內存拷貝,是因為應用程序和內核之間的通信需要拷貝數據,所以為了避免這個問題,需要重新考量應用與內核間的通信方式。我們發現,兩者通信,不是必須要拷貝,通過現有技術,可以讓應用與內核共享內存。
要實現核外與內核的零拷貝,最佳方式就是實現一塊內存映射區域,兩者共享一段內存,核外往這段內存寫數據,然后通知內核使用這段內存數據,或者內核填寫這段數據,核外使用這部分數據。因此,需要一對共享的ring buffer用于應用程序和內核之間的通信。
- 一塊用于核外傳遞數據給內核,一塊是內核傳遞數據給核外,一方只讀,一方只寫。
- 提交隊列SQ(submission queue)中,應用是IO提交的生產者,內核是消費者。
- 完成隊列CQ(completion queue)中,內核是IO完成的生產者,應用是消費者。
- 內核控制SQ ring的head和CQ ring的tail,應用程序控制SQ ring的tail和CQ ring的head
(3)解決“API不友好”的問題?
問題在于需要多個系統調用才能完成,考慮是否可以把多個系統調用合而為一。有時候,將多個類似的函數合并并通過參數區分不同的行為是更好的選擇,而有時候可能需要將復雜的函數分解為更簡單的部分來進行重構。
如果發現函數中的某一部分代碼可以獨立出來成為一個單獨的函數,可以先進行這樣的提煉,然后再考慮是否需要進一步使用參數化方法重構。
三、io_uring的實現原理
io_uring實現異步I/O的方式其實是一個生產者-消費者模型:
- 用戶進程生產I/O請求,放入提交隊列(Submission Queue,簡稱SQ)。
- 內核消費SQ中的I/O請求,完成后將結果放入完成隊列(Completion Queue,簡稱CQ)。
- 用戶進程從CQ中收割I/O結果。
SQ和CQ是內核初始化io_uring實例的時候創建的。為了減少系統調用和減少用戶進程與內核之間的數據拷貝,io_uring使用mmap的方式讓用戶進程和內核共享SQ和CQ的內存空間。
另外,由于先提交的I/O請求不一定先完成,SQ保存的其實是一個數組索引(數據類型 uint32),真正的SQE(Submission Queue Entry)保存在一個獨立的數組(SQ Array)。所以要提交一個I/O請求,得先在SQ Array中找到一個空閑的SQE,設置好之后,將其數組索引放到SQ中。
用戶進程、內核、SQ、CQ和SQ Array之間的基本關系如下:
圖片
3.1 核心組件解析
提交隊列(SQ)與提交隊列項(SQE):提交隊列(Submission Queue,簡稱 SQ)是 io_uring 中用于存儲 I/O 請求的隊列,它是一個環形緩沖區,位于用戶態和內核態共享的內存區域。每個 I/O 請求在提交隊列中都以提交隊列項(Submission Queue Entry,簡稱 SQE)的形式存在。SQE 是一個結構體,它存儲了 I/O 請求的詳細信息,包括操作類型(如讀、寫、異步連接等)、目標文件描述符、緩沖區地址、操作長度、偏移量等關鍵信息。
例如,在進行文件讀取操作時,SQE 會記錄要讀取的文件描述符、讀取數據的緩沖區地址、讀取的字節數以及文件中的偏移量等信息。應用程序通過填充 SQE 結構體,并將其添加到 SQ 中,來向內核提交 I/O 請求。由于 SQ 是環形緩沖區,當隊列滿時,新的請求會覆蓋舊的請求,從而保證 I/O 請求的持續提交。
完成隊列(CQ)與完成隊列項(CQE):完成隊列(Completion Queue,簡稱 CQ)同樣是一個環形緩沖區,用于存儲 I/O 請求的完成結果。當內核完成一個 I/O 操作后,會將操作的結果封裝成一個完成隊列項(Completion Queue Entry,簡稱 CQE),并將其放入 CQ 中。CQE 結構體包含了 I/O 操作的返回值、狀態碼、用戶自定義數據等信息。
通過這些信息,應用程序可以判斷 I/O 操作是否成功,并獲取操作的相關結果。比如,在文件讀取操作完成后,CQE 中的返回值會表示實際讀取的字節數,狀態碼則用于指示操作是否成功,若操作失敗,狀態碼會包含具體的錯誤信息。應用程序可以通過輪詢 CQ 或者等待特定的事件通知,來獲取完成的 I/O 請求結果,從而進行后續的處理。
SQ Ring 與 CQ Ring:SQ Ring 和 CQ Ring 分別是提交隊列和完成隊列的環形緩沖區結構。它們包含了隊列本身(即 SQ 和 CQ)、頭部索引(head)、尾部索引(tail)以及隊列大小等關鍵信息。頭部索引(head)指向隊列中第一個待處理的元素,而尾部索引(tail)則指向隊列中下一個空閑的位置。當應用程序向 SQ 提交 I/O 請求時,它會將請求信息填充到 tail 指向的 SQE 中,然后將 tail 指針遞增,指向下一個空閑位置。
內核在處理 I/O 請求時,會從 head 指向的 SQE 中獲取請求信息,處理完成后,將結果放入 CQ 中。同樣,CQ Ring 通過 head 和 tail 指針來管理完成隊列,內核將完成的 I/O 結果放入 tail 指向的 CQE 中,并遞增 tail 指針,應用程序則從 head 指向的 CQE 中獲取結果。這種環形緩沖區結構以及基于 head 和 tail 指針的操作方式,實現了用戶態和內核態之間高效的數據交換,減少了鎖的使用和上下文切換的開銷,從而大大提高了 I/O 操作的效率。
3.2 系統調用詳解
io_uring的實現僅僅使用了三個syscall:io_uring_setup, io_uring_enter和io_uring_register。這幾個系統調用接口都在io_uring.c文件中:
⑴io_uring_setup是用于初始化 io_uring 環境的系統調用。在使用 io_uring 進行異步 I/O 操作之前,首先需要調用 io_uring_setup 來創建一個 io_uring 實例。它接受兩個參數,第一個參數是期望的提交隊列(SQ)的大小,即隊列中可以容納的 I/O 請求數量;第二個參數是一個指向 io_uring_params 結構體的指針,該結構體用于返回 io_uring 實例的相關參數,如實際分配的 SQ 和完成隊列(CQ)的大小、隊列的偏移量等信息。
在調用 io_uring_setup 時,內核會為 io_uring 實例分配所需的內存空間,包括 SQ、CQ 以及相關的控制結構。同時,內核還會創建一些內部數據結構,用于管理和調度 I/O 請求。如果初始化成功,io_uring_setup 會返回一個文件描述符,這個文件描述符用于標識創建的 io_uring 實例,后續的 io_uring 系統調用(如 io_uring_enter、io_uring_register)將通過這個文件描述符來操作該 io_uring 實例。若初始化失敗,函數將返回一個負數,表示相應的錯誤代碼。
io_uring_setup():
SYSCALL_DEFINE2(io_uring_setup, u32, entries,
struct io_uring_params __user *, params)
{
return io_uring_setup(entries, params);
}- 功能:用于初始化和配置 io_uring 。
- 應用用途:在使用 io_uring 之前,首先需要調用此接口初始化一個 io_uring 環,并設置其參數。
⑵io_uring_enter是用于提交和等待 I/O 操作的系統調用。它的主要作用是將應用程序準備好的 I/O 請求提交給內核,并可以選擇等待這些操作完成。io_uring_enter 接受多個參數,其中包括 io_uring_setup 返回的文件描述符,用于指定要操作的 io_uring 實例;to_submit 參數表示要提交的 I/O 請求的數量,即從提交隊列(SQ)中取出并提交給內核的 SQE 的數量;min_complete 參數指定了內核在返回之前必須等待完成的 I/O 操作的最小數量;flags 參數則用于控制 io_uring_enter 的行為,例如可以設置是否等待 I/O 操作完成、是否獲取完成的 I/O 事件等。當調用 io_uring_enter 時,如果 to_submit 參數大于 0,內核會從 SQ 中取出相應數量的 SQE,并將這些 I/O 請求提交到內核中進行處理。
同時,如果設置了等待 I/O 操作完成的標志,內核會阻塞等待,直到至少有 min_complete 個 I/O 操作完成,然后將這些完成的操作結果放入完成隊列(CQ)中。應用程序可以通過檢查 CQ 來獲取這些完成的 I/O 請求的結果。通過 io_uring_enter,應用程序可以靈活地控制 I/O 請求的提交和等待策略,提高 I/O 操作的效率和靈活性。
io_uring_enter():
SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
u32, min_complete, u32, flags, const void __user *, argp,
size_t, argsz)- 功能:用于提交和處理異步 I/O 操作。
- 應用用途:在向 io_uring 環中提交 I/O 操作后,通過調用此接口觸發內核處理這些操作,并獲取完成的操作結果。
⑶io_uring_register用于注冊文件描述符或事件文件描述符到 io_uring 實例中,以便在后續的 I/O 操作中使用。它接受四個參數,第一個參數是 io_uring_setup 返回的文件描述符,用于指定要注冊到的 io_uring 實例;第二個參數 opcode 表示注冊的類型,例如可以是 IORING_REGISTER_FILES(注冊文件描述符集合)、IORING_REGISTER_BUFFERS(注冊內存緩沖區)、IORING_REGISTER_EVENTFD(注冊 eventfd 用于通知完成事件)等;
第三個參數 arg 是一個指針,根據 opcode 的類型不同,它指向不同的內容,如注冊文件描述符時,arg 指向一個包含文件描述符的數組;注冊緩沖區時,arg 指向一個描述緩沖區的結構體數組;第四個參數 nr_args 表示 arg 所指向的數組的長度。通過 io_uring_register 注冊文件描述符或緩沖區等資源后,內核在處理 I/O 請求時,可以直接訪問這些預先注冊的資源,而無需每次都重新設置相關信息,從而提高了 I/O 操作的效率。例如,在進行大量文件讀寫操作時,預先注冊文件描述符可以避免每次提交 I/O 請求時都進行文件描述符的查找和驗證,減少了系統開銷,提升了 I/O 性能。
io_uring_register():
SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
void __user *, arg, unsigned int, nr_args)- 功能:用于注冊文件描述符、緩沖區、事件文件描述符等資源到 io_uring 環中。
- 應用用途:在進行 I/O 操作之前,需要將相關的資源注冊到 io_uring 環中,以便進行后續的異步 I/O 操作。
3.3 io_uring 的工作流程
io_uring 的工作流程涉及用戶態和內核態的交互,具體如下:
(1)初始化:用戶空間程序通過io_uring_setup系統調用創建 io_uring 實例,并設置相關參數,如隊列的大小等。這個過程會在內核中創建 io_uring 相關的數據結構,包括 SQ 和 CQ,并通過 mmap 將 SQ 和 CQ 映射到用戶空間,使得用戶態和內核態可以共享這兩個隊列 。例如:
#include <liburing.h>
struct io_uring ring;
struct io_uring_params params;
int ret = io_uring_queue_init_params(ENTRIES, &ring, ?ms);
if (ret < 0) {
// 初始化失敗處理
}(2)準備 I/O 請求:用戶空間程序準備 I/O 請求,通過io_uring_get_sqe獲取一個 SQE,然后使用io_uring_prep_XXX系列函數(如io_uring_prep_read、io_uring_prep_write等)填充 SQE,指定操作類型、文件描述符、緩沖區、偏移地址等信息 。例如:
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (!sqe) {
// 獲取SQE失敗處理
}
io_uring_prep_read(sqe, fd, buffer, size, offset);(3)提交 I/O 請求:將填充好的 SQE 寫入 SQ 隊列,并更新 SQ 隊列的 tail 指針。用戶可以通過io_uring_submit函數提交請求,該函數會觸發io_uring_enter系統調用(若未啟用 SQ 輪詢),通知內核有新的 I/O 請求需要處理 。代碼示例:
int ret = io_uring_submit(&ring);
if (ret < 0) {
// 提交請求失敗處理
}(4)內核處理:內核通過檢查 SQ 隊列,發現有新的請求后,從 SQ 隊列中取出 SQE 并進行處理。處理過程中可能涉及磁盤操作、網絡通信等實際的 I/O 操作 。
(5)完成通知:當 I/O 請求完成后,內核將結果填充到 CQ 隊列中,并更新 CQ 隊列的 tail 指針,通知用戶空間有請求已完成 。
(6)用戶空間處理完成請求:用戶空間程序通過io_uring_wait_cqe或io_uring_peek_cqe等函數檢查 CQ 隊列,發現有請求完成后,從 CQ 隊列中取出 CQE 并進行后續處理,獲取返回值和用戶自定義數據等。處理完成后,通過io_uring_cqe_seen函數標記 CQE 已處理,以便內核可以重用該位置 。例如:
struct io_uring_cqe *cqe;
ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
// 等待CQE失敗處理
}
if (cqe->res >= 0) {
// 處理成功的結果
} else {
// 處理失敗的結果
}
io_uring_cqe_seen(&ring, cqe);(7)重復操作:用戶空間程序可以重復上述步驟,繼續提交和處理更多的 I/O 請求 。
通過這樣的工作流程,io_uring 實現了高效的異步 I/O 操作,減少了系統調用開銷和上下文切換,提高了 I/O 性能 。
四、io_uring 與其他 I/O 模型的對比
4.1 io_uring與阻塞 I/O 對比
阻塞 I/O 在進行 I/O 操作時,線程會被阻塞,直到操作完成。例如在讀取文件時,若數據未準備好,線程就會一直等待,期間無法執行其他任務,這就像你排隊買奶茶,必須等買到奶茶才能去做別的事。而 io_uring 是異步 I/O 模型,提交 I/O 請求后,線程不會阻塞,可以立即返回去執行其他任務,內核會在 I/O 操作完成后通過完成隊列通知應用程序,就好比你點了奶茶后可以先去附近逛逛,等奶茶做好了店員會通知你。
在系統資源利用率方面,阻塞 I/O 在高并發場景下,由于每個 I/O 操作都會阻塞線程,大量線程被阻塞,導致線程上下文切換頻繁,系統資源被大量消耗。而 io_uring 采用異步方式,一個線程可以同時處理多個 I/O 請求,大大提高了系統資源的利用率,減少了線程上下文切換的開銷。比如在一個高并發的文件服務器中,使用阻塞 I/O 時,每個文件讀取請求都可能阻塞一個線程,當并發請求數增多時,線程資源會被迅速耗盡,而使用 io_uring,一個線程可以同時處理多個文件讀取請求,系統可以輕松應對大量并發請求 。
4.2 io_uring與非阻塞 I/O 對比
非阻塞 I/O 在數據未準備好時,會立即返回一個錯誤(如 EWOULDBLOCK 或 EAGAIN),應用程序需要通過輪詢的方式不斷檢查 I/O 操作是否完成。這就像你在等快遞,每隔一段時間就去快遞站看看快遞到了沒有。而 io_uring 通過提交隊列和完成隊列實現異步 I/O,應用程序提交 I/O 請求后,無需輪詢,內核會自動處理并在完成后通知應用程序,就像快遞到了會直接給你打電話通知。
從 CPU 資源利用角度來看,非阻塞 I/O 的頻繁輪詢會消耗大量的 CPU 資源,因為每次輪詢都需要 CPU 進行計算和判斷。而 io_uring 避免了這種無效的 CPU 消耗,內核在后臺處理 I/O 操作,只有在 I/O 完成時才會通知應用程序,使得 CPU 可以更高效地處理其他任務。在編程復雜度上,非阻塞 I/O 需要處理復雜的輪詢邏輯和錯誤處理,而 io_uring 提供了更簡潔的 API,開發者只需要關注 I/O 請求的提交和結果的獲取,降低了編程難度。例如在編寫一個網絡爬蟲程序時,使用非阻塞 I/O 需要不斷地檢查網絡連接是否可讀可寫,處理各種錯誤情況,而使用 io_uring 可以更簡單地提交網絡請求,等待結果返回 。
4.3 io_uring與 epoll 對比
從系統調用次數來看,epoll 雖然是一種高效的 I/O 多路復用機制,但每次 I/O 操作仍需要多次系統調用,如epoll_wait獲取就緒事件后,還需要調用read/write等函數進行數據傳輸,這會帶來一定的系統調用開銷。而 io_uring 通過提交隊列和完成隊列,用戶可以一次性提交多個 I/O 請求,內核處理完成后將結果放入完成隊列,大大減少了系統調用次數。例如在處理大量網絡連接時,epoll 需要頻繁調用epoll_wait和read/write,而 io_uring 可以一次性提交多個網絡請求,等待結果統一處理 。
在異步處理能力方面,epoll 本質上還是同步非阻塞的,當epoll_wait返回后,應用程序仍然需要主動調用read/write等函數來進行數據拷貝操作,這在高并發場景下會限制系統的性能提升。而 io_uring 實現了真正的異步 I/O,I/O 操作由內核異步處理,數據拷貝也由內核完成,應用程序只需在 I/O 完成后處理結果,提高了系統的異步處理能力和響應速度。
在功能支持上,io_uring 支持更多的異步系統調用,如accept、connect、fsync、open、sendmsg、recvmsg等,不僅適用于文件 I/O,還廣泛應用于網絡 I/O 等多種場景,相比之下,epoll 的功能相對單一,主要用于 I/O 多路復用 。
綜上所述,io_uring 在高并發場景下,無論是與阻塞 I/O、非阻塞 I/O 還是 epoll 相比,都具有顯著的性能優勢和功能優勢,為開發者提供了更高效、更強大的 I/O 處理方式 。
五、io_uring 的應用場景
5.1 高性能網絡服務
在高性能網絡服務領域,io_uring 展現出了強大的性能優勢,為應對海量并發請求提供了高效的解決方案。以 Nginx 的 io_uring 模塊為例,在傳統的 Nginx 架構中,使用 epoll 進行 I/O 多路復用處理網絡請求。當面臨大量并發網絡請求時,epoll 雖然能夠高效地處理事件通知,但在頻繁的系統調用和數據拷貝過程中,仍然會產生一定的性能開銷。
而引入 io_uring 模塊后,Nginx 的性能得到了顯著提升。io_uring 通過提交隊列和完成隊列,實現了用戶態與內核態之間的高效通信,減少了系統調用次數。在處理大量并發連接時,用戶可以一次性將多個網絡請求提交到提交隊列中,內核在后臺異步處理這些請求,并將結果放入完成隊列。Nginx 只需從完成隊列中獲取已完成的請求結果,無需像傳統方式那樣頻繁地進行系統調用和輪詢,大大提高了處理效率。
以一個實際的 Web 服務器場景來說,假設一個熱門的電商網站在促銷活動期間,瞬間涌入了數十萬的并發訪問請求。在使用 io_uring 之前,Nginx 服務器可能會因為頻繁的系統調用和上下文切換,導致響應延遲增加,部分用戶甚至會遇到頁面加載緩慢或超時的問題。而啟用 io_uring 模塊后,Nginx 能夠更快速地處理這些并發請求,大大降低了響應延遲,用戶能夠更流暢地瀏覽商品、下單支付,有效提升了用戶體驗和業務轉化率 。
io_uring 處理網絡請求的簡化代碼示例:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <liburing.h>
#define PORT 8080
#define BUFFER_SIZE 1024
#define QUEUE_DEPTH 1024
// 存儲請求上下文
struct request {
int fd;
struct sockaddr_in client_addr;
socklen_t client_len;
char buffer[BUFFER_SIZE];
};
int main() {
int server_fd;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
// 創建服務器套接字
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
// 設置套接字選項
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 綁定套接字到端口
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 開始監聽
if (listen(server_fd, 3) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
// 初始化io_uring
struct io_uring ring;
if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
perror("io_uring_queue_init");
exit(EXIT_FAILURE);
}
// 分配請求上下文
struct request *reqs = malloc(sizeof(struct request) * QUEUE_DEPTH);
if (!reqs) {
perror("malloc failed");
exit(EXIT_FAILURE);
}
// 準備接受連接的請求
struct request *accept_req = &reqs[0];
accept_req->fd = server_fd;
accept_req->client_len = sizeof(accept_req->client_addr);
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
io_uring_prep_accept(sqe, server_fd,
(struct sockaddr *)&accept_req->client_addr,
&accept_req->client_len, 0);
io_uring_sqe_set_data(sqe, accept_req);
io_uring_submit(&ring);
printf("Server listening on port %d...\n", PORT);
while (1) {
struct io_uring_cqe *cqe;
// 等待完成事件
int ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
perror("io_uring_wait_cqe");
break;
}
struct request *req = io_uring_cqe_get_data(cqe);
int res = cqe->res;
// 處理完成事件
if (req->fd == server_fd) {
// 新連接建立
if (res < 0) {
fprintf(stderr, "Accept error: %d\n", res);
} else {
int client_fd = res;
printf("New connection: %d\n", client_fd);
// 準備讀取請求
struct request *read_req = &reqs[client_fd % QUEUE_DEPTH];
read_req->fd = client_fd;
struct io_uring_sqe *read_sqe = io_uring_get_sqe(&ring);
io_uring_prep_recv(read_sqe, client_fd, read_req->buffer,
BUFFER_SIZE, 0);
io_uring_sqe_set_data(read_sqe, read_req);
// 再次提交接受請求
struct request *new_accept_req = &reqs[(client_fd + 1) % QUEUE_DEPTH];
new_accept_req->fd = server_fd;
new_accept_req->client_len = sizeof(new_accept_req->client_addr);
struct io_uring_sqe *accept_sqe = io_uring_get_sqe(&ring);
io_uring_prep_accept(accept_sqe, server_fd,
(struct sockaddr *)&new_accept_req->client_addr,
&new_accept_req->client_len, 0);
io_uring_sqe_set_data(accept_sqe, new_accept_req);
io_uring_submit(&ring);
}
} else if (res > 0) {
// 讀取到數據,準備響應
printf("Received %d bytes from %d: %s\n", res, req->fd, req->buffer);
// 準備響應內容
const char *response = "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World!";
struct io_uring_sqe *write_sqe = io_uring_get_sqe(&ring);
io_uring_prep_send(write_sqe, req->fd, response, strlen(response), 0);
io_uring_sqe_set_data(write_sqe, req);
io_uring_submit(&ring);
} else {
// 關閉連接
printf("Closing connection: %d\n", req->fd);
close(req->fd);
}
io_uring_cqe_seen(&ring, cqe);
}
// 清理資源
free(reqs);
io_uring_queue_exit(&ring);
close(server_fd);
return 0;
}- 首先創建一個服務器套接字并進行初始化配置
- 初始化 io_uring 環境,設置隊列深度
- 提交一個接受連接的請求到提交隊列 (SQ)
- 進入主循環,等待完成隊列 (CQ) 中的事件
- 當有新連接到來時,處理連接并提交讀取請求
- 當讀取到數據后,準備響應并提交寫入請求
- 完成所有操作后關閉連接
這種模式特別適合處理類似電商促銷期間的高并發場景,能夠更高效地利用系統資源,降低響應延遲;要編譯運行此代碼,需要系統支持 io_uring 并安裝相應的庫(通常是 liburing)。
5.2 數據庫系統
在數據庫系統中,I/O 性能是影響數據庫整體性能的關鍵因素。無論是數據的讀取、寫入還是日志操作,都涉及大量的 I/O 操作。傳統的 I/O 模型在處理這些操作時,由于系統調用開銷大、數據拷貝次數多等問題,難以滿足數據庫對高性能 I/O 的需求。
io_uring 的出現為數據庫系統帶來了新的轉機。以 Ceph 分布式存儲系統為例,Ceph 在使用 io_uring 進行優化后,性能得到了顯著提升。在讀寫操作中,io_uring 允許 Ceph 一次性提交多個 I/O 請求,內核異步處理這些請求并將結果返回,減少了 I/O 操作的等待時間,提高了系統的吞吐量。在實際測試中,開啟 io_uring 優化后,Ceph 的吞吐(iops)提升了 20% - 30%,同時延遲降低了 20 - 30% 。
對于數據庫系統中的事務處理,io_uring 也發揮著重要作用。在事務的提交和回滾過程中,需要進行大量的日志寫入和數據更新操作。io_uring 的高效異步 I/O 能力,使得這些操作能夠快速完成,減少了事務的執行時間,提高了數據庫的并發處理能力。比如在一個銀行核心交易系統中,每秒可能會處理成千上萬的交易事務,使用 io_uring 能夠確保這些事務快速、穩定地執行,保障了金融業務的高效運轉 。io_uring 優化數據庫 I/O 操作的代碼示例:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <liburing.h>
#include <errno.h>
#define QUEUE_DEPTH 256
#define BLOCK_SIZE 4096
#define LOG_ENTRY_SIZE 512
#define MAX_TRANSACTIONS 100
// 數據庫操作類型
typedef enum {
OP_READ,
OP_WRITE,
OP_LOG
} OpType;
// 數據庫操作請求
struct db_request {
OpType type; // 操作類型
int fd; // 文件描述符
off_t offset; // 操作偏移量
size_t size; // 數據大小
char *buffer; // 數據緩沖區
char *log_entry; // 日志條目
int transaction_id; // 事務ID
};
// 初始化io_uring
int init_io_uring(struct io_uring *ring) {
int ret = io_uring_queue_init(QUEUE_DEPTH, ring, 0);
if (ret < 0) {
fprintf(stderr, "io_uring初始化失敗: %s\n", strerror(-ret));
return -1;
}
return 0;
}
// 提交數據庫讀請求
void submit_read_request(struct io_uring *ring, struct db_request *req) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
fprintf(stderr, "無法獲取SQE\n");
return;
}
io_uring_prep_read(sqe, req->fd, req->buffer, req->size, req->offset);
io_uring_sqe_set_data(sqe, req);
}
// 提交數據庫寫請求
void submit_write_request(struct io_uring *ring, struct db_request *req) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
fprintf(stderr, "無法獲取SQE\n");
return;
}
io_uring_prep_write(sqe, req->fd, req->buffer, req->size, req->offset);
io_uring_sqe_set_data(sqe, req);
}
// 提交日志寫入請求
void submit_log_request(struct io_uring *ring, struct db_request *req) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
fprintf(stderr, "無法獲取SQE\n");
return;
}
// 日志通常追加到文件末尾
io_uring_prep_write(sqe, req->fd, req->log_entry, LOG_ENTRY_SIZE, -1);
io_uring_sqe_set_data(sqe, req);
}
// 處理完成的I/O請求
void handle_completion(struct io_uring *ring, struct io_uring_cqe *cqe) {
struct db_request *req = io_uring_cqe_get_data(cqe);
int res = cqe->res;
if (res < 0) {
fprintf(stderr, "操作失敗 (類型: %d, 事務: %d): %s\n",
req->type, req->transaction_id, strerror(-res));
} else {
switch (req->type) {
case OP_READ:
printf("讀取完成 - 事務: %d, 大小: %d bytes\n",
req->transaction_id, res);
break;
case OP_WRITE:
printf("寫入完成 - 事務: %d, 大小: %d bytes\n",
req->transaction_id, res);
break;
case OP_LOG:
printf("日志完成 - 事務: %d, 大小: %d bytes\n",
req->transaction_id, res);
break;
}
}
// 釋放緩沖區
if (req->buffer) free(req->buffer);
if (req->log_entry) free(req->log_entry);
free(req);
}
// 處理事務 - 包含數據讀寫和日志記錄
void process_transaction(struct io_uring *ring, int db_fd, int log_fd, int tx_id) {
// 分配事務所需的請求結構
struct db_request *read_req = malloc(sizeof(struct db_request));
struct db_request *write_req = malloc(sizeof(struct db_request));
struct db_request *log_req = malloc(sizeof(struct db_request));
// 初始化讀請求
read_req->type = OP_READ;
read_req->fd = db_fd;
read_req->offset = (tx_id % 100) * BLOCK_SIZE; // 模擬不同數據塊
read_req->size = BLOCK_SIZE;
read_req->buffer = malloc(BLOCK_SIZE);
read_req->log_entry = NULL;
read_req->transaction_id = tx_id;
// 初始化寫請求
write_req->type = OP_WRITE;
write_req->fd = db_fd;
write_req->offset = (tx_id % 100) * BLOCK_SIZE;
write_req->size = BLOCK_SIZE;
write_req->buffer = malloc(BLOCK_SIZE);
snprintf(write_req->buffer, BLOCK_SIZE, "事務 %d 的數據", tx_id);
write_req->log_entry = NULL;
write_req->transaction_id = tx_id;
// 初始化日志請求
log_req->type = OP_LOG;
log_req->fd = log_fd;
log_req->offset = 0; // 會被忽略,使用追加模式
log_req->size = LOG_ENTRY_SIZE;
log_req->buffer = NULL;
log_req->log_entry = malloc(LOG_ENTRY_SIZE);
snprintf(log_req->log_entry, LOG_ENTRY_SIZE,
"事務 %d 已提交 - 操作日志", tx_id);
log_req->transaction_id = tx_id;
// 提交請求到io_uring
submit_read_request(ring, read_req);
submit_write_request(ring, write_req);
submit_log_request(ring, log_req);
}
int main() {
struct io_uring ring;
int db_fd, log_fd;
int i;
// 初始化io_uring
if (init_io_uring(&ring) < 0) {
return 1;
}
// 打開數據庫文件和日志文件
db_fd = open("database.dat", O_RDWR | O_CREAT, 0644);
log_fd = open("transaction.log", O_WRONLY | O_CREAT | O_APPEND, 0644);
if (db_fd < 0 || log_fd < 0) {
perror("文件打開失敗");
return 1;
}
printf("開始處理 %d 個事務...\n", MAX_TRANSACTIONS);
// 提交一批事務處理請求
for (i = 0; i < MAX_TRANSACTIONS; i++) {
process_transaction(&ring, db_fd, log_fd, i);
// 每提交QUEUE_DEPTH個請求就提交一次
if ((i + 1) % QUEUE_DEPTH == 0 || i == MAX_TRANSACTIONS - 1) {
io_uring_submit(&ring);
}
}
// 處理所有完成的請求
for (i = 0; i < MAX_TRANSACTIONS * 3; i++) { // 每個事務有3個操作
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
fprintf(stderr, "等待完成事件失敗: %s\n", strerror(-ret));
break;
}
handle_completion(&ring, cqe);
io_uring_cqe_seen(&ring, cqe);
}
printf("所有事務處理完成\n");
// 清理資源
close(db_fd);
close(log_fd);
io_uring_queue_exit(&ring);
return 0;
}編譯此程序需要系統支持 io_uring 并安裝 liburing 庫,編譯命令通常為:gcc db_io_uring.c -o db_io_uring -luring。
5.3 大規模文件處理
在處理大規模文件讀寫時,io_uring 相較于傳統 I/O 模型具有明顯的優勢。以文件傳輸場景為例,假設我們需要將一個大型數據中心的海量數據文件傳輸到另一個存儲節點,傳統的 I/O 模型在傳輸過程中,由于頻繁的系統調用和數據拷貝,會導致傳輸速度緩慢,耗費大量的時間和系統資源。
而 io_uring 通過零拷貝技術和高效的異步 I/O 機制,大大提高了文件傳輸速度。它允許用戶預先將內存緩沖區注冊到內核,在文件傳輸過程中,數據可以直接從用戶緩沖區發送到目標存儲節點,避免了內核與用戶空間之間的數據拷貝,減少了數據傳輸的時間開銷。同時,io_uring 支持批量提交 I/O 請求,能夠同時處理多個文件的讀寫操作,進一步提高了文件處理的效率。
在一個大型媒體公司的文件存儲和分發系統中,每天都需要處理大量的視頻、音頻文件。使用io_uring后,文件的上傳、下載和轉碼等操作速度大幅提升,能夠更快地滿足用戶對媒體內容的訪問需求,提升了公司的業務競爭力 。io_uring 實現高效文件傳輸的代碼示例:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <liburing.h>
#include <errno.h>
#include <sys/stat.h>
#define QUEUE_DEPTH 128
#define BLOCK_SIZE 1024*1024 // 1MB塊大小,適合大文件傳輸
#define MAX_FILES 10 // 同時處理的最大文件數量
// 傳輸請求結構
struct transfer_request {
int src_fd; // 源文件描述符
int dest_fd; // 目標文件描述符
off_t offset; // 當前傳輸偏移量
size_t remaining; // 剩余傳輸大小
char *buffer; // 數據緩沖區
struct iovec iov; // 用于零拷貝的iovec結構
char filename[256]; // 文件名,用于日志
};
// 初始化io_uring
int init_uring(struct io_uring *ring) {
int ret = io_uring_queue_init(QUEUE_DEPTH, ring, IORING_SETUP_IOPOLL);
if (ret < 0) {
fprintf(stderr, "io_uring初始化失敗: %s\n", strerror(-ret));
return -1;
}
return 0;
}
// 獲取文件大小
off_t get_file_size(int fd) {
struct stat st;
if (fstat(fd, &st) < 0) {
perror("獲取文件大小失敗");
return -1;
}
return st.st_size;
}
// 提交讀請求
void submit_read_request(struct io_uring *ring, struct transfer_request *req) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
fprintf(stderr, "無法獲取SQE\n");
return;
}
size_t read_size = req->remaining < BLOCK_SIZE ? req->remaining : BLOCK_SIZE;
// 準備讀取請求
io_uring_prep_readv(sqe, req->src_fd, &req->iov, 1, req->offset);
req->iov.iov_len = read_size;
io_uring_sqe_set_data(sqe, req);
}
// 提交寫請求
void submit_write_request(struct io_uring *ring, struct transfer_request *req) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
fprintf(stderr, "無法獲取SQE\n");
return;
}
// 準備寫入請求,使用從讀取操作得到的數據
io_uring_prep_writev(sqe, req->dest_fd, &req->iov, 1, req->offset);
io_uring_sqe_set_data(sqe, req);
}
// 處理完成的請求
void handle_completion(struct io_uring *ring, struct io_uring_cqe *cqe) {
struct transfer_request *req = io_uring_cqe_get_data(cqe);
int res = cqe->res;
if (res < 0) {
fprintf(stderr, "文件 %s 操作失敗: %s\n", req->filename, strerror(-res));
return;
} else if (res == 0) {
// 讀取完成(EOF)
printf("文件 %s 傳輸完成\n", req->filename);
close(req->src_fd);
close(req->dest_fd);
free(req->buffer);
free(req);
return;
}
// 根據當前偏移量判斷是讀完成還是寫完成
if (req->offset == 0 || req->remaining + res == req->iov.iov_len) {
// 讀操作完成,提交寫操作
submit_write_request(ring, req);
} else {
// 寫操作完成,更新偏移量并繼續讀取
req->offset += res;
req->remaining -= res;
if (req->remaining > 0) {
submit_read_request(ring, req);
} else {
printf("文件 %s 傳輸完成\n", req->filename);
close(req->src_fd);
close(req->dest_fd);
free(req->buffer);
free(req);
}
}
}
// 初始化文件傳輸請求
int init_transfer_request(const char *src_path, const char *dest_path,
struct io_uring *ring) {
// 打開源文件和目標文件
int src_fd = open(src_path, O_RDONLY);
int dest_fd = open(dest_path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (src_fd < 0 || dest_fd < 0) {
perror("文件打開失敗");
if (src_fd >= 0) close(src_fd);
if (dest_fd >= 0) close(dest_fd);
return -1;
}
// 獲取文件大小
off_t file_size = get_file_size(src_fd);
if (file_size < 0) {
close(src_fd);
close(dest_fd);
return -1;
}
// 分配請求結構和緩沖區
struct transfer_request *req = malloc(sizeof(struct transfer_request));
if (!req) {
perror("內存分配失敗");
close(src_fd);
close(dest_fd);
return -1;
}
req->buffer = malloc(BLOCK_SIZE);
if (!req->buffer) {
perror("緩沖區分配失敗");
free(req);
close(src_fd);
close(dest_fd);
return -1;
}
// 初始化請求結構
strncpy(req->filename, src_path, sizeof(req->filename)-1);
req->src_fd = src_fd;
req->dest_fd = dest_fd;
req->offset = 0;
req->remaining = file_size;
req->iov.iov_base = req->buffer;
req->iov.iov_len = BLOCK_SIZE;
// 提交初始讀請求
submit_read_request(ring, req);
printf("開始傳輸文件: %s (大小: %ld bytes)\n", src_path, file_size);
return 0;
}
int main(int argc, char *argv[]) {
if (argc < 3 || argc % 2 != 1) {
fprintf(stderr, "用法: %s <源文件1> <目標文件1> [<源文件2> <目標文件2> ...]\n", argv[0]);
return 1;
}
int num_files = (argc - 1) / 2;
if (num_files > MAX_FILES) {
fprintf(stderr, "最大支持同時傳輸 %d 個文件\n", MAX_FILES);
return 1;
}
struct io_uring ring;
if (init_uring(&ring) < 0) {
return 1;
}
// 初始化所有文件傳輸請求
for (int i = 0; i < num_files; i++) {
const char *src = argv[1 + i*2];
const char *dest = argv[2 + i*2];
if (init_transfer_request(src, dest, &ring) < 0) {
fprintf(stderr, "初始化文件傳輸失敗: %s -> %s\n", src, dest);
}
}
// 提交所有請求
io_uring_submit(&ring);
// 處理所有完成的I/O操作
int completed = 0;
while (completed < num_files) {
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
fprintf(stderr, "等待完成事件失敗: %s\n", strerror(-ret));
break;
}
handle_completion(&ring, cqe);
io_uring_cqe_seen(&ring, cqe);
completed++;
}
printf("所有文件傳輸操作已處理\n");
// 清理資源
io_uring_queue_exit(&ring);
return 0;
}與傳統文件傳輸方式相比,這種實現特別適合媒體公司的大文件處理場景,能夠顯著提升視頻、音頻等大型文件的傳輸效率,減少系統資源占用。編譯時需要鏈接 liburing 庫:
gcc file_transfer_uring.c -o file_transfer_uring -luring運行方式:
./file_transfer_uring 源文件1 目標文件1 源文件2 目標文件2 ...六、io_uring 的代碼實踐
6.1 環境準備
使用 io_uring 需要 Linux 內核版本 5.1 及以上。你可以通過以下命令檢查當前系統的內核版本:
uname -r如果內核版本低于 5.1,你需要升級內核。升級內核的方法因 Linux 發行版而異,以 Ubuntu 為例,可以通過官方源進行內核升級:
sudo apt update
sudo apt install linux-image-generic對于 CentOS,可以參考官方文檔或相關社區教程進行內核升級 。
安裝 liburing 庫,它提供了用戶空間與 io_uring 交互的接口。可以從官方倉庫獲取源碼進行編譯安裝,步驟如下:
(1)安裝依賴庫
在 Debian/Ubuntu 系統中:
sudo apt install build-essential libssl-dev在 RHEL/CentOS 系統中:
sudo yum groupinstall "Development Tools"
sudo yum install openssl-devel(2)下載 liburing 源碼
git clone https://git.kernel.dk/liburing
cd liburing(3)配置與編譯
./configure --cc=gcc --cxx=g++
make -j$(nproc)(4)安裝到系統路徑
sudo make install安裝完成后,動態庫默認位于/usr/local/lib,頭文件位于/usr/local/include/liburing 。
6.2 簡單示例代碼解析
下面是一個使用 io_uring 進行文件讀取的簡單 C 語言示例代碼:
#include <liburing.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define QUEUE_DEPTH 1
#define BUFFER_SIZE 4096
int main() {
struct io_uring ring;
struct io_uring_cqe *cqe;
struct io_uring_sqe *sqe;
int ret, fd;
char buffer[BUFFER_SIZE];
// 打開文件
fd = open("testfile.txt", O_RDONLY);
if (fd < 0) {
perror("open");
return 1;
}
// 初始化io_uring實例
ret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
if (ret < 0) {
perror("io_uring_queue_init");
return 1;
}
// 獲取一個提交隊列條目
sqe = io_uring_get_sqe(&ring);
if (!sqe) {
fprintf(stderr, "io_uring_get_sqe failed\n");
return 1;
}
// 準備讀取請求
io_uring_prep_read(sqe, fd, buffer, BUFFER_SIZE, 0);
// 提交請求
ret = io_uring_submit(&ring);
if (ret < 0) {
perror("io_uring_submit");
return 1;
}
// 等待請求完成
ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
perror("io_uring_wait_cqe");
return 1;
}
// 檢查請求結果
if (cqe->res < 0) {
fprintf(stderr, "I/O error: %s\n", strerror(-cqe->res));
return 1;
}
// 輸出讀取的數據
write(STDOUT_FILENO, buffer, cqe->res);
// 釋放完成隊列條目
io_uring_cqe_seen(&ring, cqe);
// 清理
io_uring_queue_exit(&ring);
close(fd);
return 0;
}①打開文件:
fd = open("testfile.txt", O_RDONLY);使用open函數打開名為testfile.txt的文件,以只讀模式打開。如果打開失敗,perror函數會輸出錯誤信息并返回 1。
②初始化 io_uring 實例:
ret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);調用io_uring_queue_init函數初始化 io_uring 實例,QUEUE_DEPTH指定了隊列的深度,這里設置為 1,表示最多可以同時處理 1 個 I/O 請求。如果初始化失敗,perror函數會輸出錯誤信息并返回 1 。
③獲取提交隊列條目:
sqe = io_uring_get_sqe(&ring);使用io_uring_get_sqe函數從提交隊列中獲取一個空的提交隊列條目(SQE),用于描述 I/O 請求。如果獲取失敗,輸出錯誤信息并返回 1 。
④準備讀取請求:
io_uring_prep_read(sqe, fd, buffer, BUFFER_SIZE, 0);通過io_uring_prep_read函數填充 SQE,準備一個讀取請求。參數依次為 SQE 指針、文件描述符fd、緩沖區buffer、讀取長度BUFFER_SIZE和偏移量 0 。
⑤提交請求:
ret = io_uring_submit(&ring);調用io_uring_submit函數將提交隊列中的請求提交給內核處理。如果提交失敗,perror函數會輸出錯誤信息并返回 1 。
⑥等待請求完成:
ret = io_uring_wait_cqe(&ring, &cqe);使用io_uring_wait_cqe函數阻塞等待,直到有 I/O 請求完成,完成的結果會存儲在cqe中。如果等待失敗,perror函數會輸出錯誤信息并返回 1 。
⑦檢查請求結果:
if (cqe->res < 0) {
fprintf(stderr, "I/O error: %s\n", strerror(-cqe->res));
return 1;
}檢查完成隊列條目(CQE)的結果cqe->res,如果小于 0 表示 I/O 操作失敗,通過strerror函數獲取錯誤信息并輸出,然后返回 1 。
⑧輸出讀取的數據:
write(STDOUT_FILENO, buffer, cqe->res);如果 I/O 操作成功,使用write函數將讀取到的數據輸出到標準輸出。
⑨釋放完成隊列條目:
io_uring_cqe_seen(&ring, cqe);調用io_uring_cqe_seen函數標記 CQE 已處理,以便內核可以重用該位置。
⑩清理:
io_uring_queue_exit(&ring);
close(fd);最后,調用io_uring_queue_exit函數清理 io_uring 實例,關閉文件描述符 。
6.2 常見問題與解決方法
在使用 io_uring 進行實踐時,初學者可能會遇到以下一些問題:
- 初始化失敗:在調用io_uring_queue_init時可能會失敗,常見原因包括內核版本不支持、系統資源不足等。解決方法是首先確保內核版本符合要求,然后檢查系統資源(如內存、文件描述符限制等)。可以通過ulimit -n查看當前用戶的文件描述符限制,若不夠可以通過修改/etc/security/limits.conf文件來增加限制 。
- I/O 請求提交錯誤:調用io_uring_submit時返回錯誤,可能是因為提交隊列已滿、SQE 填充不正確等。可以檢查提交隊列的深度設置是否合理,以及 SQE 的各個字段是否正確填充,比如文件描述符是否有效、緩沖區地址是否正確等 。
- 獲取完成隊列條目失敗:使用io_uring_wait_cqe或io_uring_peek_cqe獲取 CQE 時失敗,可能是由于內核處理 I/O 請求出錯、信號干擾等。可以檢查內核日志(如/var/log/syslog)查看是否有相關錯誤信息,同時注意在多線程環境中處理信號時,要確保信號處理函數不會干擾 io_uring 的正常工作 。
- 內存鎖定限制問題:在使用 io_uring 時,可能會遇到java.lang.RuntimeException: failed to create io_uring ring fd Cannot allocate memory的異常,這通常是由于內存鎖定限制(memlock)不足導致的。解決方法是檢查當前 memlock 限制(使用ulimit -l命令),并通過修改系統配置文件(如/etc/security/limits.conf)或使用命令行工具(如ulimit -l unlimited)來增加 memlock 限制,然后重新啟動應用程序并確認 memlock 限制已成功增加 。



























