精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

C++協程項目實戰:協程函數與線程切換

開發 前端
協程宛如一顆璀璨新星,照亮了高并發編程的新路徑。協程,這一輕量級的并發編程模型,可在單線程內實現多任務的高效協作。它就像一位訓練有素的舞者,在舞臺上優雅地暫停、恢復,巧妙地避開資源爭奪,使得程序在處理 I/O 密集型任務時,效率得到質的飛躍。

在 C/C++ 編程的廣袤天地中,高并發一直是開發者們追求的圣杯。傳統的多線程、多進程編程模式,在帶來強大并發能力的同時,也伴隨著高昂的資源開銷與復雜的同步問題。想象一下,一個龐大的服務器程序,需要同時處理成千上萬的客戶端請求,若采用傳統方式,光是線程的頻繁切換與資源競爭,就可能讓程序陷入性能瓶頸的泥沼。

此時,協程宛如一顆璀璨新星,照亮了高并發編程的新路徑。協程,這一輕量級的并發編程模型,可在單線程內實現多任務的高效協作。它就像一位訓練有素的舞者,在舞臺上優雅地暫停、恢復,巧妙地避開資源爭奪,使得程序在處理 I/O 密集型任務時,效率得到質的飛躍。

在接下來的內容中,讓我們一起從 0 到 1 吃透 C/C++ 協程。我們將深入剖析協程的底層原理,手把手教你如何在代碼中巧妙運用協程,解鎖高并發編程的新姿勢,讓你的程序性能更上一層樓。

一、協程(Coroutine)簡介

協程,又稱微線程,纖程。英文名Coroutine。

協程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua)中得到廣泛應用。

子程序,或者稱為函數,在所有語言中都是層級調用,比如A調用B,B在執行過程中又調用了C,C執行完畢返回,B執行完畢返回,最后是A執行完畢。所以子程序調用是通過棧實現的,一個線程就是執行一個子程序。

子程序調用總是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不同,協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接著執行(注意,在一個子程序中中斷,去執行其他子程序,不是函數調用,有點類似CPU的中斷)。

比如子程序A、B:def A():

print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'

假設由協程執行,在執行A的過程中,可以隨時中斷,去執行B,B也可能在執行過程中中斷再去執行A,結果可能是:

1
2
x
y
3
z

但是在A中是沒有調用B的,所以協程的調用比函數調用理解起來要難一些。

看起來A、B的執行有點像多線程,但協程的特點在于是一個線程執行,那和多線程比,協程有何優勢?

最大的優勢就是協程極高的執行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。

第二大優勢就是不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

因為協程是一個線程執行,那怎么利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。

Python對協程的支持還非常有限,用在generator中的yield可以一定程度上實現協程。雖然支持不完全,但已經可以發揮相當大的威力了。

來看例子:

傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。

如果改用協程,生產者生產消息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高:import time

def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
c.next()
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)

執行結果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK=

注意到consumer函數是一個generator(生成器),把一個consumer傳入produce后:

  • 首先調用c.next()啟動生成器;
  • 然后,一旦生產了東西,通過c.send(n)切換到consumer執行;
  • consumer通過yield拿到消息,處理,又通過yield把結果傳回;
  • produce拿到consumer處理的結果,繼續生產下一條消息;
  • produce決定不生產了,通過c.close()關閉consumer,整個過程結束。

整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。

二、C/C++ 協程

c++作為一個相對古老的語言,曾經是步履蹣跚,直到c++11才奮起直追,但是對新技術的整體演進,其實c++仍然是保守的?,F在c++20的標準雖然已經實現了協程,但目前能比較好支持c++20的編譯器幾乎都和整體的環境不太兼容。換句話說,還需要繼續等待整個c++的迭代版本,可能到了c++23,整體的環境就會跟上去,協程才會真正的飛入程序員的“尋常百姓家”。

正如前面提到的,協程一般來說是不需要鎖的,但是如果協程的底層操作是跨越線程動態操作,仍然是需要鎖的存在的。這也是為什么要求盡量把協和的調度放到一個線程中去的原因。

首先需要聲明的是,這里不打算花時間來介紹什么是協程,以及協程和線程有什么不同。如果對此有任何疑問,可以自行 google。與 Python 不同,C/C++ 語言本身是不能天然支持協程的?,F有的 C++ 協程庫均基于兩種方案:利用匯編代碼控制協程上下文的切換,以及利用操作系統提供的 API 來實現協程上下文切換。

典型的例如:

  1. libco,Boost.context:基于匯編代碼的上下文切換
  2. phxrpc:基于 ucontext/Boost.context 的上下文切換
  3. libmill:基于 setjump/longjump 的協程切換

一般而言,基于匯編的上下文切換要比采用系統調用的切換更加高效,這也是為什么 phxrpc 在使用 Boost.context 時要比使用 ucontext 性能更好的原因。關于 phxrpc 和 libmill 具體的協程實現方式,以后有時間再詳細介紹。

2.1協程和線程之間區別

在了解了協程的基本概念之后,很多人可能會將它與線程混淆,畢竟它們都和程序的并發執行有關。那么,協程和線程到底有什么區別呢?接下來,我們就來深入探討一下。

(1)線程:操作系統的寵兒

線程,是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。打個比方,如果把進程看作是一個工廠,那么線程就是工廠里的工人,每個工人都可以獨立執行任務,但又共享著工廠的資源。

線程的創建、調度和切換都由操作系統內核負責。當我們在程序中創建一個線程時,操作系統會為它分配一系列的資源,包括獨立的棧空間、程序計數器(PC)等。線程的調度采用的是搶占式調度策略,也就是說,操作系統會根據一定的算法,在適當的時候剝奪當前正在執行的線程的 CPU 使用權,將其放入就緒隊列,然后從就緒隊列中選擇一個新的線程來執行。這種調度方式可以保證多個線程能夠公平地競爭 CPU 資源,實現并發執行。

(2)協程:輕量級的后起之秀

協程則是一種用戶態的輕量級線程,它的調度完全由用戶控制,而不是操作系統。這就好比一個小組里的成員,他們可以自行決定任務的執行順序和時間,不需要外部的強制干預。

在 C/C++ 中,協程通常通過函數庫或者語言特性來實現。創建協程時,系統只會為其分配少量的資源,比如一個很小的??臻g,用于保存協程的執行狀態和局部變量。協程的調度是協作式的,也就是說,只有當一個協程主動讓出執行權時,其他協程才有機會執行。比如,當一個協程遇到 I/O 操作、調用特定的掛起函數或者執行時間過長時,它可以主動暫停自己的執行,將執行權交給其他協程。

(3)二者之間的區別

調度方式:線程由操作系統內核調度,采用搶占式調度策略;而協程由用戶控制調度,采用協作式調度策略。這就導致了線程的調度是被動的,而協程的調度是主動的。

上下文切換:線程的上下文切換涉及到用戶態和內核態的切換,需要保存和恢復寄存器、棧指針等大量的狀態信息,開銷較大;而協程的上下文切換只在用戶態進行,只需要保存和恢復少量的寄存器和棧信息,開銷非常小,甚至可以忽略不計。

資源占用:線程的創建和銷毀需要操作系統分配和回收大量的資源,每個線程都有自己獨立的棧空間,通常??臻g較大,所以線程占用的資源較多;而協程的創建和銷毀開銷小,占用的資源也很少,一個線程中可以創建成百上千個協程。

適用場景:線程適用于需要充分利用多核 CPU 資源、處理計算密集型任務的場景;而協程適用于 I/O 密集型任務,比如網絡請求、文件讀寫等,因為在這些場景中,大量的時間都花費在等待 I/O 操作完成上,協程可以在等待時主動讓出執行權,提高程序的整體效率 。

為了更直觀地感受線程和協程的區別,我們來看下面這個表格:

比較項

線程

協程

調度者

操作系統內核

用戶程序

上下文切換開銷

資源占用

適用場景

計算密集型任務

I/O 密集型任務

2.2協程的原理

既然協程如此厲害,那么它實現的原理到底是什么呢?協程最重要的應用方式就是把線程在內核上的開銷轉到了應用層的開銷,避開或者屏蔽(對應用者)線程操作的難度。那多線程操作的復雜性在哪兒呢?線程切換的隨機性和線程Context的跟隨,出入棧的保存和恢復,相關數據的鎖和讀寫控制。這才是多線程的復雜性,如果再加異步引起的數據的非連續性和事件的非必然性操作,就更加增強了多線程遇到問題的判別和斷點的準確。

好,既然是這樣,那么上框架,封裝不就得了。

協程和線程一樣,同樣需要做好兩個重點:第一個是協程的調度;第二是上下文的切換。而這兩點在OS的相關書籍中的介紹海了去了,這里就不再贅述,原理基本都是一樣的。

如果以協程的關系來區分,協程也可以劃分為對稱和非對稱協程兩種。協程間是平等關系的,就是對稱的;反之為非對稱的。名字越起越多,但事兒還是那么兩下子,大家自己體會即可。

只要能保證上面所說的對上下文數據的安全性保證又能夠實現協程在具體線程上的操作(某一個線程上執行的所有協程是串行的),那么鎖的操作,從理論上講是不需要的(但實際開發中,因為協程的應用還是少,所以還需要具體的問題具體分析)。協程的動作集中在應用層,而把復雜的內核調度的線程屏蔽在下層框架上(或者以后會不會出現OS進行封裝),從而大幅的降低了編程的難度,但卻擁有了線程快速異步調用的效果。

2.3協程實現機制

協程的實現有以下幾種機制:

①基于匯編的實現:這個對匯編編程得要求有兩下子,這個網上也有不少例子,就不再這里搬門弄斧了。

②基于switch-case來實現:這個其實更像是一個C語言的技巧,利用不同的狀態Case來達到目的,或者說大家認知中的對編程語言的一種內卷使用,網上有一個開源的項目:

https://github.com/georgeredinger/protothreads

③基于操作系統提供的接口:Linux的ucontext,Windows的Fiber

Fiber可能很多人都不熟悉,這其實就是微軟原來提供的纖程,有興趣的可以去網上查找一下,有幾年這個概念炒得還是比較火的。ucontext是Linux上的一種操作,這兩個都可以當作是一種類似特殊的應用存在。游戲界的大佬云風(《游戲之旅:我的編程感悟》作者)的coroutine就是類似于這種。興趣是編程的動力,大家如果對這些有興趣可以看看這本書,雖然其中很多的東西都比較老了,但是整體的思想還是非常有借鑒的。

④基于接口 setjmp 和 longjmp同時使用 static local 的變量來保存協程內部的數據

這兩個函數是C語言的一個非常有意思的應用,一般寫C好長時間的人,都沒接觸過這兩個API函數,這個函數的定義是:

int setjmp(jmp_buf envbuf);
void longjmp(jmp_buf envbuf, int val);

它們兩個的作用,前者是用來將棧楨(上下文)保存在jmp_buf這個數據結構中,然后可以通過后者 longjmp在指定的位置恢復出來。這就類似于使用goto語句跳轉到任意的地方,然后再把相關的數據恢復出來??匆幌聜€《C專家編程》中的例子:

#include <stdio.h>
#include <setjmp.h>

jmp_buf buf;

banana()
{
    printf("in banana() \n");
    longjmp(buf,1);
    printf("you'll never see this,because i longjmp'd");
}

main()
{
    if(setjmp(buf))
        printf("back in main\n");
    else
    {
        printf("first time through\n");
        banana();
    }
}

看完了上述的幾種方法,其實網上還有幾種實現的方式,但都是比較刻板,有興趣的可以搜索一下,這里就不提供鏈接了。

協程的實現,按理說還是OS搞定最好,其實是框架底層,但C/C++的復雜性,以及不同的平臺和不同編譯器、庫之間的長期差異,導致這方面能做好的可能性真心是覺得不會太大。

三、協程核心原理機制

3.1libco協程的創建和切換

在介紹 coroutine 的創建之前,我們先來熟悉一下 libco 中用來表示一個 coroutine 的數據結構,即定義在 co_routine_inner.h 中的 stCoRoutine_t:

struct stCoRoutine_t
{
stCoRoutineEnv_t *env; // 協程運行環境
pfn_co_routine_t pfn; // 協程執行的邏輯函數
void *arg; // 函數參數
coctx_t ctx; // 保存協程的下文環境
...
char cEnableSysHook; // 是否運行系統 hook,即非侵入式邏輯
char cIsShareStack; // 是否在共享棧模式
void *pvEnv;
stStackMem_t* stack_mem; // 協程運行時的??臻g
char* stack_sp; // 用來保存協程運行時的??臻g
unsigned int save_size;
char* save_buffer;
};

我們暫時只需要了解表示協程的最簡單的幾個參數,例如協程運行環境,協程的上下文環境,協程運行的函數以及運行時??臻g。后面的 stack_sp,save_size 和 save_buffer 與 libco 共享棧模式相關,有關共享棧的內容我們后續再說。

3.2協程的執行流程

為了更直觀地理解協程的執行流程,我們來看一個簡單的 C++ 代碼示例:

#include <iostream>
#include <coroutine>

struct Task {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;
    handle_type coro;

    Task(handle_type h) : coro(h) {}

    ~Task() {
        if (coro) coro.destroy();
    }

    bool resume() {
        if (!coro.done()) {
            coro.resume();
            return true;
        }
        return false;
    }

    struct promise_type {
        Task get_return_object() {
            return Task{handle_type::from_promise(*this)};
        }

        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }

        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };

    // 協程函數
    static Task simple_coroutine() {
        std::cout << "Coroutine started" << std::endl;
        // 暫停協程
        co_await std::suspend_always{};
        std::cout << "Coroutine resumed" << std::endl;
    }
};

int main() {
    // 創建協程
    Task t = Task::simple_coroutine();
    std::cout << "Main function" << std::endl;
    // 恢復協程執行
    t.resume();
    return 0;
}

在這個示例中,我們定義了一個Task結構體來表示一個協程任務。simple_coroutine函數是一個協程函數,它內部使用了co_await關鍵字來暫停協程的執行。

當main函數中調用Task::simple_coroutine()時,協程開始創建,但并不會立即執行,而是返回一個Task對象。此時,協程處于掛起狀態。

接著,main函數繼續執行,輸出Main function。然后調用t.resume(),協程從上次暫停的地方(即co_await處)恢復執行,輸出Coroutine resumed。

從這個示例中,我們可以清晰地看到協程的執行流程:創建協程時,協程函數并不會立即執行完,而是可以通過co_await暫停執行,將執行權交回給調用者;當調用者調用resume方法時,協程又可以從暫停的地方恢復執行 。在協程暫停時,其內部的局部變量等狀態信息都會被保存下來,以便恢復執行時能夠繼續之前的操作。

3.3實現方式面面觀

在 C/C++ 中,實現協程主要有以下幾種常見方式:

⑴利用匯編代碼控制上下文切換

這是一種比較底層的實現方式。通過匯編代碼,我們可以直接操作 CPU 寄存器和棧,實現協程上下文的保存和恢復。例如,在 x86 架構下,我們需要保存和恢復rsp(棧指針)、rbp(?;分羔槪?、rbx、r12 - r15(數據寄存器)以及rip(程序運行的下一個指令地址)等寄存器的值。因為協程的切換本質上就是上下文的切換,通過精確控制這些寄存器,我們能夠實現協程在暫停和恢復時的狀態一致性。

這種方式的優點是性能極高,因為直接操作硬件資源,避免了操作系統 API 調用的開銷。然而,它的缺點也很明顯,代碼編寫難度大,需要對匯編語言和 CPU 架構有深入的了解,而且可移植性差,不同的 CPU 架構可能需要編寫不同的匯編代碼 。

⑵使用操作系統提供的 API

一些操作系統提供了用于上下文切換的 API,比如 Unix 系統中的ucontext和 Windows 系統中的fiber。以ucontext為例,它提供了getcontext、setcontext、makecontext和swapcontext等函數來管理上下文。getcontext用于獲取當前上下文,setcontext用于設置上下文,makecontext用于創建一個新的上下文,swapcontext則用于交換兩個上下文。

使用這些 API,我們可以相對容易地實現協程的上下文切換。這種方式的優點是實現相對簡單,不需要深入了解匯編語言,而且具有較好的可移植性,只要操作系統支持相應的 API。但是,由于涉及到系統調用,性能相對較低,因為系統調用會帶來一定的開銷,包括用戶態和內核態的切換等。

⑶利用 C 語言的setjmp和longjmp函數

setjmp函數用于保存當前的調用環境,包括寄存器的值和棧指針等,它會返回一個整數值。longjmp函數則用于恢復之前由setjmp保存的調用環境,并跳轉到setjmp調用的位置繼續執行。通過這兩個函數的配合,我們可以實現協程的暫停和恢復。例如,在協程需要暫停時,調用setjmp保存當前環境,然后在需要恢復時,調用longjmp恢復環境。

這種方式的優點是代碼實現相對簡潔,不需要復雜的匯編知識。但它也有局限性,它要求函數里面使用static local變量來保存協程內部的數據,因為setjmp和longjmp并不會自動保存和恢復局部變量,而且這種方式在處理復雜的函數調用和嵌套時可能會出現問題 。

四、協程的實現與原理剖析

4.1協程的起源

問題:協程存在的原因?協程能夠解決哪些問題?

在我們現在CS,BS開發模式下,服務器的吞吐量是一個很重要的參數。其實吞吐量是IO處理時間加上業務處理。為了簡單起見,比如,客戶端與服務器之間是長連接的,客戶端定期給服務器發送心跳包數據??蛻舳税l送一次心跳包到服務器,服務器更新該新客戶端狀態的。心跳包發送的過程,業務處理時長等于IO讀?。≧ECV系統調用)加上業務處理(更新客戶狀態)。吞吐量等于1s業務處理次數。

業務處理(更新客戶端狀態)時間,業務不一樣的,處理時間不一樣,我們就不做討論。

那如何提升recv的性能。若只有一個客戶端,recv的性能也沒有必要提升,也不能提升。若在有百萬計的客戶端長連接的情況,我們該如何提升。以Linux為例,在這里需要介紹一個“網紅”就是epoll。服務器使用epoll管理百萬計的客戶端長連接,代碼框架如下:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);

            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {
            handle(sockfd);
        }
    }
}

對于響應式服務器,所有的客戶端的操作驅動都是來源于這個大循環。來源于epoll_wait的反饋結果。

對于服務器處理百萬計的IO。Handle(sockfd)實現方式有兩種。

第一種,handle(sockfd)函數內部對sockfd進行讀寫動作。代碼如下

int handle(int sockfd) {
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}

handle的io操作(send,recv)與epoll_wait是在同一個處理流程里面的。這就是IO同步操作。

優點:

  • sockfd管理方便。
  • 操作邏輯清晰。

缺點:

  • 服務器程序依賴epoll_wait的循環響應速度慢。
  • 程序性能差

第二種,handle(sockfd)函數內部將sockfd的操作,push到線程池中,代碼如下:

int thread_cb(int sockfd) {
    // 此函數是在線程池創建的線程中運行。
    // 與handle不在一個線程上下文中運行
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}

int handle(int sockfd) {
    //此函數在主線程 main_thread 中運行
    //在此處之前,確保線程池已經啟動。
    push_thread(sockfd, thread_cb); //將sockfd放到其他線程中運行。
}

Handle函數是將sockfd處理方式放到另一個已經其他的線程中運行,如此做法,將io操作(recv,send)與epoll_wait 不在一個處理流程里面,使得io操作(recv,send)與epoll_wait實現解耦。這就叫做IO異步操作。

優點:

  • 子模塊好規劃。
  • 程序性能高。

缺點:

正因為子模塊好規劃,使得模塊之間的sockfd的管理異常麻煩。每一個子線程都需要管理好sockfd,避免在IO操作的時候,sockfd出現關閉或其他異常。

上文有提到IO同步操作,程序響應慢,IO異步操作,程序響應快。

下面來對比一下IO同步操作與IO異步操作。

代碼如下:

https://github.com/wangbojing/c1000k_test/blob/master/server_mulport_epoll.c

在這份代碼的486行,#if 1, 打開的時候,為IO異步操作。關閉的時候,為IO同步操作。

接下來把我測試接入量的結果粘貼出來。

  • IO異步操作,每1000個連接接入的服務器響應時間(900ms左右)。
  • IO同步操作,每1000個連接接入的服務器響應時間(6500ms左右)。
  • IO異步操作與IO同步操作

對比項

  • IO同步操作
  • IO異步操作

Sockfd管理

  • 管理方便
  • 多個線程共同管理

代碼邏輯

  • 程序整體邏輯清晰
  • 子模塊邏輯清晰

程序性能

  • 響應時間長,性能差
  • 響應時間短,性能好

有沒有一種方式,有異步性能,同步的代碼邏輯。來方便編程人員對IO操作的組件呢?有,采用一種輕量級的協程來實現。在每次send或者recv之前進行切換,再由調度器來處理epoll_wait的流程。

就是采用了基于這樣的思考,寫了NtyCo,實現了一個IO異步操作與協程結合的組件。

4.2協程的案例

問題:協程如何使用?與線程使用有何區別?

在做網絡IO編程的時候,有一個非常理想的情況,就是每次accept返回的時候,就為新來的客戶端分配一個線程,這樣一個客戶端對應一個線程。就不會有多個線程共用一個sockfd。每請求每線程的方式,并且代碼邏輯非常易讀。但是這只是理想,線程創建代價,調度代價就呵呵了。

先來看一下每請求每線程的代碼如下:

while(1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);

    pthread_t thread_id;
    pthread_create(&thread_id, NULL, client_cb, &clientfd);

}

這樣的做法,寫完放到生產環境下面,如果你的老板不打死你,你來找我。我來幫你老板,為民除害。

如果我們有協程,我們就可以這樣實現。參考代碼如下:

https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

while (1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);

    nty_coroutine *read_co;
    nty_coroutine_create(&read_co, server_reader, &cli_fd);
}

這樣的代碼是完全可以放在生成環境下面的。如果你的老板要打死你,你來找我,我幫你把你老板打死,為民除害。

線程的API思維來使用協程,函數調用的性能來測試協程。

NtyCo封裝出來了若干接口,一類是協程本身的,二類是posix的異步封裝

協程API:while

  • 協程創建
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
  • 協程調度器的運行
void nty_schedule_run(void)

POSIX異步封裝API:

int nty_socket(int domain, int type, int protocol)
int nty_accept(int fd, struct sockaddr *addr, socklen_t *len)
int nty_recv(int fd, void *buf, int length)
int nty_send(int fd, const void *buf, int length)
int nty_close(int fd)

4.3協程的實現之工作流程

問題:協程內部是如何工作呢?

先來看一下協程服務器案例的代碼, 代碼參考:

https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

分別討論三個協程的比較晦澀的工作流程。第一個協程的創建;第二個IO異步操作;第三個協程子過程回調

(1)創建協程

當我們需要異步調用的時候,我們會創建一個協程。比如accept返回一個新的sockfd,創建一個客戶端處理的子過程。再比如需要監聽多個端口的時候,創建一個server的子過程,這樣多個端口同時工作的,是符合微服務的架構的。

創建協程的時候,進行了如何的工作?

創建API如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
  • 參數1:nty_coroutine **new_co,需要傳入空的協程的對象,這個對象是由內部創建的,并且在函數返回的時候,會返回一個內部創建的協程對象。
  • 參數2:proc_coroutine func,協程的子過程。當協程被調度的時候,就會執行該函數。
  • 參數3:void *arg,需要傳入到新協程中的參數。

協程不存在親屬關系,都是一致的調度關系,接受調度器的調度。調用create API就會創建一個新協程,新協程就會加入到調度器的就緒隊列中。

創建的協程具體步驟會在《協程的實現之原語操作》來描述。

(2)實現IO異步操作

大部分的朋友會關心IO異步操作如何實現,在send與recv調用的時候,如何實現異步操作的。

先來看一下一段代碼:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);

            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {

            epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
            recv(sockfd, buffer, length, 0);

            //parser_proto(buffer, length);

            send(sockfd, buffer, length, 0);
            epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL);
        }
    }
}

在進行IO操作(recv,send)之前,先執行了 epoll_ctl的del操作,將相應的sockfd從epfd中刪除掉,在執行完IO操作(recv,send)再進行epoll_ctl的add的動作。這段代碼看起來似乎好像沒有什么作用。

如果是在多個上下文中,這樣的做法就很有意義了。能夠保證sockfd只在一個上下文中能夠操作IO的。不會出現在多個上下文同時對一個IO進行操作的。協程的IO異步操作正式是采用此模式進行的。

把單一協程的工作與調度器的工作的劃分清楚,先引入兩個原語操作 resume,yield會在《協程的實現之原語操作》來講解協程所有原語操作的實現,yield就是讓出運行,resume就是恢復運行。

調度器與協程的上下文切換如下圖所示:

圖片圖片

在協程的上下文IO異步操作(nty_recv,nty_send)函數,步驟如下:

  • 將sockfd 添加到epoll管理中。
  • 進行上下文環境切換,由協程上下文yield到調度器的上下文。
  • 調度器獲取下一個協程上下文。Resume新的協程

IO異步操作的上下文切換的時序圖如下:

圖片圖片

(3)回調協程的子過程

在create協程后,何時回調子過程?何種方式回調子過程?

首先來回顧一下x86_64寄存器的相關知識。匯編與寄存器相關知識還會在《協程的實現之切換》繼續深入探討的。x86_64 的寄存器有16個64位寄存器,分別是:

%rax, %rbx,%rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12, %r13, %r14, %r15。
  • %rax 作為函數返回值使用的。
  • %rsp 棧指針寄存器,指向棧頂
  • %rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數參數,依次對應第1參數,第2參數。。。
  • %rbx, %rbp, %r12, %r13, %r14, %r15 用作數據存儲,遵循調用者使用規則,換句話說,就是隨便用。調用子函數之前要備份它,以防它被修改
  • %r10, %r11 用作數據存儲,就是使用前要先保存原值

以NtyCo的實現為例,來分析這個過程。CPU有一個非常重要的寄存器叫做EIP,用來存儲CPU運行下一條指令的地址。我們可以把回調函數的地址存儲到EIP中,將相應的參數存儲到相應的參數寄存器中。實現子過程調用的邏輯代碼如下:

void _exec(nty_coroutine *co) {
    co->func(co->arg); //子過程的回調函數
}

void nty_coroutine_init(nty_coroutine *co) {
    //ctx 就是協程的上下文
    co->ctx.edi = (void*)co; //設置參數
    co->ctx.eip = (void*)_exec; //設置回調函數入口
    //當實現上下文切換的時候,就會執行入口函數_exec , _exec 調用子過程func
}

4.4協程的實現之原語操作

問題:協程的內部原語操作有哪些?分別如何實現的?

協程的核心原語操作:create, resume, yield。協程的原語操作有create怎么沒有exit?以NtyCo為例,協程一旦創建就不能有用戶自己銷毀,必須得以子過程執行結束,就會自動銷毀協程的上下文數據。

以_exec執行入口函數返回而銷毀協程的上下文與相關信息。co->func(co->arg) 是子過程,若用戶需要長久運行協程,就必須要在func函數里面寫入循環等操作。所以NtyCo里面沒有實現exit的原語操作。

create:創建一個協程。

  • 調度器是否存在,不存在也創建。調度器作為全局的單例。將調度器的實例存儲在線程的私有空間pthread_setspecific。
  • 分配一個coroutine的內存空間,分別設置coroutine的數據項,??臻g,棧大小,初始狀態,創建時間,子過程回調函數,子過程的調用參數。
  • 將新分配協程添加到就緒隊列 ready_queue中

實現代碼如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {

    assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
    nty_schedule *sched = nty_coroutine_get_sched();

    if (sched == NULL) {
        nty_schedule_create(0);

        sched = nty_coroutine_get_sched();
        if (sched == NULL) {
            printf("Failed to create schedulern");
            return -1;
        }
    }

    nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
    if (co == NULL) {
        printf("Failed to allocate memory for new coroutinen");
        return -2;
    }

    //
    int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
    if (ret) {
        printf("Failed to allocate stack for new coroutinen");
        free(co);
        return -3;
    }

    co->sched = sched;
    co->stack_size = sched->stack_size;
    co->status = BIT(NTY_COROUTINE_STATUS_NEW); //
    co->id = sched->spawned_coroutines ++;
co->func = func;

    co->fd = -1;
co->events = 0;

    co->arg = arg;
    co->birth = nty_coroutine_usec_now();
    *new_co = co;

    TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);

    return 0;
}

yield:讓出CPU。

void nty_coroutine_yield(nty_coroutine *co)

參數:當前運行的協程實例

調用后該函數不會立即返回,而是切換到最近執行resume的上下文。該函數返回是在執行resume的時候,會有調度器統一選擇resume的,然后再次調用yield的。resume與yield是兩個可逆過程的原子操作。

resume:恢復協程的運行權

int nty_coroutine_resume(nty_coroutine *co)

參數:需要恢復運行的協程實例

調用后該函數也不會立即返回,而是切換到運行協程實例的yield的位置。返回是在等協程相應事務處理完成后,主動yield會返回到resume的地方。

4.5協程的實現之切換

問題:協程的上下文如何切換?切換代碼如何實現?

首先來回顧一下x86_64寄存器的相關知識。x86_64 的寄存器有16個64位寄存器,分別是:

%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,%r13, %r14, %r15。
  • %rax 作為函數返回值使用的。
  • %rsp 棧指針寄存器,指向棧頂
  • %rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數參數,依次對應第1參數,第2參數。。。
  • %rbx, %rbp, %r12, %r13, %r14, %r15 用作數據存儲,遵循調用者使用規則,換句話說,就是隨便用。調用子函數之前要備份它,以防它被修改
  • %r10, %r11 用作數據存儲,就是使用前要先保存原值。

上下文切換,就是將CPU的寄存器暫時保存,再將即將運行的協程的上下文寄存器,分別mov到相對應的寄存器上。此時上下文完成切換。如下圖所示:

切換_switch函數定義:

int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
  • 參數1:即將運行協程的上下文,寄存器列表
  • 參數2:正在運行協程的上下文,寄存器列表

我們nty_cpu_ctx結構體的定義,為了兼容x86,結構體項命令采用的是x86的寄存器名字命名。

typedef struct _nty_cpu_ctx {
void *esp; //
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;

_switch返回后,執行即將運行協程的上下文,是實現上下文的切換;

_switch的實現代碼:

0: __asm__ (
1: "    .text                                  n"
2: "       .p2align 4,,15                                   n"
3: ".globl _switch                                          n"
4: ".globl __switch                                         n"
5: "_switch:                                                n"
6: "__switch:                                               n"
7: "       movq %rsp, 0(%rsi)      # save stack_pointer     n"
8: "       movq %rbp, 8(%rsi)      # save frame_pointer     n"
9: "       movq (%rsp), %rax       # save insn_pointer      n"
10: "       movq %rax, 16(%rsi)                              n"
11: "       movq %rbx, 24(%rsi)     # save rbx,r12-r15       n"
12: "       movq %r12, 32(%rsi)                              n"
13: "       movq %r13, 40(%rsi)                              n"
14: "       movq %r14, 48(%rsi)                              n"
15: "       movq %r15, 56(%rsi)                              n"
16: "       movq 56(%rdi), %r15                              n"
17: "       movq 48(%rdi), %r14                              n"
18: "       movq 40(%rdi), %r13     # restore rbx,r12-r15    n"
19: "       movq 32(%rdi), %r12                              n"
20: "       movq 24(%rdi), %rbx                              n"
21: "       movq 8(%rdi), %rbp      # restore frame_pointer  n"
22: "       movq 0(%rdi), %rsp      # restore stack_pointer  n"
23: "       movq 16(%rdi), %rax     # restore insn_pointer   n"
24: "       movq %rax, (%rsp)                                n"
25: "       ret                                              n"
26: );

按照x86_64的寄存器定義,%rdi保存第一個參數的值,即new_ctx的值,%rsi保存第二個參數的值,即保存cur_ctx的值。X86_64每個寄存器是64bit,8byte。

  1. Movq %rsp, 0(%rsi) 保存在棧指針到cur_ctx實例的rsp項
  2. Movq %rbp, 8(%rsi)
  3. Movq (%rsp), %rax #將棧頂地址里面的值存儲到rax寄存器中。Ret后出棧,執行棧頂
  4. Movq %rbp, 8(%rsi) #后續的指令都是用來保存CPU的寄存器到new_ctx的每一項中
  5. Movq 8(%rdi), %rbp #將new_ctx的值
  6. Movq 16(%rdi), %rax #將指令指針rip的值存儲到rax中
  7. Movq %rax, (%rsp) # 將存儲的rip值的rax寄存器賦值給棧指針的地址的值。
  8. Ret # 出棧,回到棧指針,執行rip指向的指令。

上下文環境的切換完成。

4.6協程的實現之定義

問題:協程如何定義? 調度器如何定義?

先來一道設計題:設計一個協程的運行體R與運行體調度器S的結構體

  • 1. 運行體R:包含運行狀態{就緒,睡眠,等待},運行體回調函數,回調參數,棧指針,棧大小,當前運行體
  • 2. 調度器S:包含執行集合{就緒,睡眠,等待}

這道設計題拆分兩個個問題,一個運行體如何高效地在多種狀態集合更換。調度器與運行體的功能界限。

(1)運行體如何高效地在多種狀態集合更換

新創建的協程,創建完成后,加入到就緒集合,等待調度器的調度;協程在運行完成后,進行IO操作,此時IO并未準備好,進入等待狀態集合;IO準備就緒,協程開始運行,后續進行sleep操作,此時進入到睡眠狀態集合。

  • 就緒(ready),睡眠(sleep),等待(wait)集合該采用如何數據結構來存儲?
  • 就緒(ready)集合并不沒有設置優先級的選型,所有在協程優先級一致,所以可以使用隊列來存儲就緒的協程,簡稱為就緒隊列(ready_queue)。
  • 睡眠(sleep)集合需要按照睡眠時長進行排序,采用紅黑樹來存儲,簡稱睡眠樹(sleep_tree)紅黑樹在工程實用為<key, value>, key為睡眠時長,value為對應的協程結點。
  • 等待(wait)集合,其功能是在等待IO準備就緒,等待IO也是有時長的,所以等待(wait)集合采用紅黑樹的來存儲,簡稱等待樹(wait_tree),此處借鑒nginx的設計。

Coroutine就是協程的相應屬性,status表示協程的運行狀態。sleep與wait兩顆紅黑樹,ready使用的隊列,比如某協程調用sleep函數,加入睡眠樹(sleep_tree),status |= S即可。比如某協程在等待樹(wait_tree)中,而IO準備就緒放入ready隊列中,只需要移出等待樹(wait_tree),狀態更改status &= ~W即可。有一個前提條件就是不管何種運行狀態的協程,都在就緒隊列中,只是同時包含有其他的運行狀態。

(2)調度器與協程的功能界限

每一協程都需要使用的而且可能會不同屬性的,就是協程屬性。每一協程都需要的而且數據一致的,就是調度器的屬性。比如棧大小的數值,每個協程都一樣的后不做更改可以作為調度器的屬性,如果每個協程大小不一致,則可以作為協程的屬性。

用來管理所有協程的屬性,作為調度器的屬性。比如epoll用來管理每一個協程對應的IO,是需要作為調度器屬性。

按照前面幾章的描述,定義一個協程結構體需要多少域,我們描述了每一個協程有自己的上下文環境,需要保存CPU的寄存器ctx;需要有子過程的回調函數func;需要有子過程回調函數的參數 arg;需要定義自己的棧空間 stack;需要有自己??臻g的大小 stack_size;需要定義協程的創建時間 birth;需要定義協程當前的運行狀態 status;需要定當前運行狀態的結點(ready_next, wait_node, sleep_node);需要定義協程id;需要定義調度器的全局對象 sched。

協程的核心結構體如下:

typedef struct _nty_coroutine {

    nty_cpu_ctx ctx;
    proc_coroutine func;
    void *arg;
    size_t stack_size;

    nty_coroutine_status status;
    nty_schedule *sched;

    uint64_t birth;
    uint64_t id;

    void *stack;

    RB_ENTRY(_nty_coroutine) sleep_node;
    RB_ENTRY(_nty_coroutine) wait_node;

    TAILQ_ENTRY(_nty_coroutine) ready_next;
    TAILQ_ENTRY(_nty_coroutine) defer_next;

} nty_coroutine;

調度器是管理所有協程運行的組件,協程與調度器的運行關系。

調度器的屬性,需要有保存CPU的寄存器上下文 ctx,可以從協程運行狀態yield到調度器運行的。從協程到調度器用yield,從調度器到協程用resume以下為協程的定義。

typedef struct _nty_coroutine_queue nty_coroutine_queue;

typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;

typedef struct _nty_schedule {
    uint64_t birth;
nty_cpu_ctx ctx;

    struct _nty_coroutine *curr_thread;
    int page_size;

    int poller_fd;
    int eventfd;
    struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
    int nevents;

    int num_new_events;

    nty_coroutine_queue ready;
    nty_coroutine_rbtree_sleep sleeping;
    nty_coroutine_rbtree_wait waiting;

} nty_schedule;

4.7協程的實現之調度器

問題:協程如何被調度?

調度器的實現,有兩種方案,一種是生產者消費者模式,另一種多狀態運行。

(1)生產者消費者模式

邏輯代碼如下:

while (1) {

        //遍歷睡眠集合,將滿足條件的加入到ready
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            TAILQ_ADD(&sched->ready, expired);
        }

        //遍歷等待集合,將滿足添加的加入到ready
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            TAILQ_ADD(&sched->ready, wait);
        }

        // 使用resume回復ready的協程運行權
        while (!TAILQ_EMPTY(&sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }

(2)多狀態運行

實現邏輯代碼如下:

while (1) {

        //遍歷睡眠集合,使用resume恢復expired的協程運行權
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            resume(expired);
        }

        //遍歷等待集合,使用resume恢復wait的協程運行權
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            resume(wait);
        }

        // 使用resume恢復ready的協程運行權
        while (!TAILQ_EMPTY(sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }

4.8協程性能測試

測試環境:4臺VMWare 虛擬機

  • 1臺服務器 6G內存,4核CPU
  • 3臺客戶端 2G內存,2核CPU

操作系統:ubuntu 14.04

  • 服務器端測試代碼:https://github.com/wangbojing/NtyCo
  • 客戶端測試代碼:https://github.com/wangbojing/c1000k_test/blob/master/client_mutlport_epoll.c
  • 按照每一個連接啟動一個協程來測試。每一個協程棧空間 4096byte
  • 6G內存 –> 測試協程數量100W無異常。并且能夠正常收發數據。

五、協程創建和運行

由于多個協程運行于一個線程內部的,因此當創建線程中的第一個協程時,需要初始化該協程所在的環境 stCoRoutineEnv_t,這個環境是線程用來管理協程的,通過該環境,線程可以得知當前一共創建了多少個協程,當前正在運行哪一個協程,當前應當如何調度協程:

struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ]; // 記錄當前創建的協程
int iCallStackSize; // 記錄當前一共創建了多少個協程
stCoEpoll_t *pEpoll; // 該線程的協程調度器
// 在使用共享棧模式拷貝棧內存時記錄相應的 coroutine
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};

上述代碼表明 libco 允許一個線程內最多創建 128 個協程,其中 pCallStack[iCallStackSize-1] 也就是棧頂的協程表示當前正在運行的協程。當調用函數 co_create 時,首先檢查當前線程中的 coroutine env 結構是否創建。

這里 libco 對于每個線程內的 stCoRoutineEnv_t 并沒有使用 thread-local 的方式(例如gcc 內置的 __thread,phxrpc采用這種方式)來管理,而是預先定義了一個大的數組,并通過對應的 PID 來獲取其協程環境。

static stCoRoutineEnv_t* g_arrCoEnvPerThread[204800]
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return g_arrCoEnvPerThread[ GetPid() ];
}

初始化 stCoRoutineEnv_t 時主要完成以下幾步:

為 stCoRoutineEnv_t 申請空間并且進行初始化,設置協程調度器 pEpoll。

創建一個空的 coroutine,初始化其上下文環境( 有關 coctx 在后文詳細介紹 ),將其加入到該線程的協程環境中進行管理,并且設置其為 main coroutine。這個 main coroutine 用來運行該線程主邏輯。

當初始化完成協程環境之后,調用函數 co_create_env 來創建具體的協程,該函數初始化一個協程結構 stCoRoutine_t,設置該結構中的各項字段,例如運行的函數 pfn,運行時的棧地址等等。需要說明的就是,如果使用了非共享棧模式,則需要為該協程單獨申請棧空間,否則從共享棧中申請空間。棧空間表示如下:

struct stStackMem_t
{
stCoRoutine_t* occupy_co; // 使用該棧的協程
int stack_size; // 棧大小
char* stack_bp; // 棧的指針,棧從高地址向低地址增長
char* stack_buffer; // 棧底
};

使用 co_create 創建完一個協程之后,將調用 co_resume 來將該協程激活運行:

void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
// 獲取當前正在運行的協程的結構
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart )
{
// 為將要運行的 co 布置上下文環境
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co; // 設置co為運行的線程
co_swap( lpCurrRoutine, co );
}

函數 co_swap 的作用類似于 Unix 提供的函數 swapcontext:將當前正在運行的 coroutine 的上下文以及狀態保存到結構 lpCurrRoutine 中,并且將 co 設置成為要運行的協程,從而實現協程的切換。co_swap 具體完成三項工作:

記錄當前協程 curr 的運行棧的棧頂指針,通過 char c; curr_stack_sp=&c 實現,當下次切換回 curr時,可以從該棧頂指針指向的位置繼續,執行完 curr 后可以順利釋放該棧。

處理共享棧相關的操作,并且調用函數 coctx_swap 來完成上下文環境的切換。注意執行完 coctx_swap之后,執行流程將跳到新的 coroutine 也就是 pending_co 中運行,后續的代碼需要等下次切換回 curr 時才會執行。

當下次切換回 curr 時,處理共享棧相關的操作。

對應于 co_resume 函數,協程主動讓出執行權則調用 co_yield 函數。co_yield 函數調用了 co_yield_env,將當前協程與當前線程中記錄的其他協程進行切換:

void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
}

前面我們已經提到過,pCallStack 棧頂所指向的即為當前正在運行的協程所對應的結構,因此該函數將 curr 取出來,并將當前正運行的協程上下文保存到該結構上,并切換到協程 last 上執行。接下來我們以 32-bit 的系統為例來分析 libco 是如何實現協程運行環境的切換的。

六、協程上下文的創建和切換

libco 使用結構 struct coctx_t 來表示一個協程的上下文環境:

struct coctx_t
{

if defined(__i386__)
void *regs[ 8 ];

else
void *regs[ 14 ];

endif
size_t ss_size;
char *ss_sp;
};

圖片圖片

結合上圖,我們需要知道關鍵的幾點:

函數調用棧是調用者和被調用者共同負責布置的。Caller 將其參數從右向左反向壓棧,再將調用后的返回地址壓棧,然后將執行流程交給 Callee。

典型的編譯器會將 Callee 函數匯編成為以 push %ebp; move %ebp, %esp; sub $esp N; 這種形式開頭的匯編代碼。這幾句代碼主要目的是為了方便 Callee 利用 ebp 來訪問調用者提供的參數以及自身的局部變量(如下圖)。

當調用過程完成清除了局部變量以后,會執行 pop %ebp; ret,這樣指令會跳轉到 RA 也就是返回地址上面執行。這一點也是實現協程切換的關鍵:我們只需要將指定協程的函數指針地址保存到 RA 中,當調用完 coctx_swap 之后,會自動跳轉到該協程的函數起始地址開始運行。

了解了這些,我們就來看一下協程上下文環境的初始化函數 coctx_make:

int coctx_make( coctx_t ctx, coctx_pfn_t pfn, const void s, const void *s1 )
{
char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);
sp = (char*)((unsigned long)sp & -16L);
coctx_param_t param = (coctx_param_t)sp ;
param->s1 = s;
param->s2 = s1;
memset(ctx->regs, 0, sizeof(ctx->regs));
ctx->regs[ kESP ] = (char)(sp) - sizeof(void);
ctx->regs[ kEIP ] = (char*)pfn;
return 0;
}

這段代碼應該比較好理解,首先為函數 coctx_pfn_t 預留 2 個參數的??臻g并對其到 16 字節,之后將實參設置到預留的棧上空間中。最后在 ctx 結構中填入相應的,其中記錄 reg[kEIP] 返回地址為函數指針 pfn,記錄 reg[kESP] 為獲得的棧頂指針 sp 減去一個指針長度,這個減去的空間是為返回地址 RA 預留的。當調用 coctx_swap 時,reg[kEIP] 會被放到返回地址 RA 的位置,待 coctx_swap 執行結束,自然會跳轉到函數 pfn 處執行。

coctx_swap(ctx1, ctx2) 在 coctx_swap.S 中實現。這里可以看到,該函數并沒有使用 push %ebp; move %ebp, %esp; sub $esp N; 開頭,因此??臻g分布中不會出現 ebp 的位置。coctx_swap 函數主要分為兩段,其首先將當前的上下文環境保存到 ctx1 結構中:

leal 4(%esp), %eax // eax = old_esp + 4
movl 4(%esp), %esp // 將 esp 的值設為 &ctx1(即ctx1的地址)
leal 32(%esp), %esp // esp = (char*)&ctx1 + 32
pushl %eax // ctx1->regs[EAX] = %eax
pushl %ebp // ctx1->regs[EBP] = %ebp
pushl %esi // ctx1->regs[ESI] = %esi
pushl %edi // ctx1->regs[EDI] = %edi
pushl %edx // ctx1->regs[EDX] = %edx
pushl %ecx // ctx1->regs[ECX] = %ecx
pushl %ebx // ctx1->regs[EBX] = %ebx
pushl -4(%eax) // ctx1->regs[EIP] = RA, 注意:%eax-4=%old_esp

這里需要注意指令 leal 和 movl 的區別。leal 將 eax 的值設置成為 esp 的值加 4,而 movl 將 esp 的值設為 esp+4 所指向的內存上的值,也就是參數 ctx1 的地址。之后該函數將 ctx2 中記錄的上下文恢復到 CPU 寄存器中,并跳轉到其函數地址處運行:

movl 4(%eax), %esp // 將 esp 的值設為 &ctx2(即ctx2的地址)
popl %eax // %eax = ctx1->regs[EIP],也就是 &pfn
popl %ebx // %ebx = ctx1->regs[EBP]
popl %ecx // %ecx = ctx1->regs[ECX]
popl %edx // %edx = ctx1->regs[EDX]
popl %edi // %edi = ctx1->regs[EDI]
popl %esi // %esi = ctx1->regs[ESI]
popl %ebp // %ebp = ctx1->regs[EBP]
popl %esp // %esp = ctx1->regs[ESP],即(char)(sp) - sizeof(void)
pushl %eax // RA = %eax = &pfn,注意此時esp已經指向了新的esp
xorl %eax, %eax // reset eax
ret

上面的代碼看起來可能有些繞:

  • 首先 line 1 將 esp 設置為參數 ctx2 的地址,后續的 popl 操作均在 ctx2 的內存空間上執行。
  • line 2-9 將 ctx2->regs[] 中的內容恢復到相應的寄存器中。還記得在前面 coctx_make 中設置了 regs[EIP] 和 regs[ESP] 嗎?這里剛好就對應恢復了相應的值。
  • 當執行完 line 9 之后,esp 已經指向了 ctx2 中新的棧頂指針,由于在 coctx_make 中預留了一個指針長度的 RA 空間,line 10 剛好將新的函數指針 &pfn 設置到該 RA 上。
  • 最后執行 ret 指令時,函數流程將跳到 pfn 處執行。這樣,整個協程上下文的切換就完成了。

七、如何使用 libco

我們首先以 libco 提供的例子 example_echosvr.cpp 來介紹應用程序如何使用 libco 來編寫服務端程序。在 example_echosvr.cpp 的 main 函數中,主要執行如下幾步:

  1. 創建 socket,監聽在本機的 1024 端口,并設置為非阻塞;
  2. 主線程使用函數 readwrite_coroutine 創建多個讀寫協程,調用 co_resume 啟動協程運行直到其掛起。這里我們忽略掉無關的多進程 fork 的過程;
  3. 主線程繼續創建 socket 接收協程 accpet_co,同樣調用 co_resume 啟動協程直到其掛起;
  4. 主線程調用函數 co_eventloop 實現事件的監聽和協程的循環切換;

函數 readwrite_coroutine 在外層循環中將新創建的讀寫協程都加入到隊列 g_readwrite 中,此時這些讀寫協程都沒有具體與某個 socket 連接對應,可以將隊列 g_readwrite 看成一個 coroutine pool。當加入到隊列中之后,調用函數 co_yield_ct 函數讓出 CPU,此時控制權回到主線程。

主線程中的函數 co_eventloop 監聽網絡事件,將來自于客戶端新進的連接交由協程 accept_co 處理,關于 co_eventloop 如何喚醒 accept_co 的細節我們將在后續介紹。accept_co 調用函數 accept_routine 接收新連接,該函數的流程如下:

檢查隊列 g_readwrite 是否有空閑的讀寫 coroutine,如果沒有,調用函數 poll 將該協程加入到 Epoll 管理的定時器隊列中,也就是 sleep(1000) 的作用;

調用 co_accept 來接收新連接,如果接收連接失敗,那么調用 co_poll 將服務端的 listen_fd 加入到 Epoll 中來觸發下一次連接事件;

對于成功的連接,從 g_readwrite 中取出一個讀寫協程來負責處理讀寫;

再次回到函數 readwrite_coroutine 中,該函數會調用 co_poll 將新建立的連接的 fd 加入到 Epoll 監聽中,并將控制流程返回到 main 協程;當有讀或者寫事件發生時,Epoll 會喚醒對應的 coroutine ,繼續執行 read 函數以及 write 函數。

上面的過程大致說明了控制流程是如何在不同的協程中切換,接下來我們介紹具體的實現細節,即如何通過 Epoll 來管理協程,以及如何對系統函數進行改造以滿足 libco 的調用。

八、通過 Epoll 管理和喚醒協程

上一章節中介紹了協程可以通過函數 co_poll 來將 fd 交由 Epoll 管理,待 Epoll 的相應的事件觸發時,再切換回來執行 read 或者 write 操作,從而實現由 Epoll 管理協程的功能。co_poll 函數原型如下:

int co_poll(stCoEpoll_t *ctx, struct pollfd fds[],
nfds_t nfds, int timeout_ms)

stCoEpoll_t 是為 libco 定制的 Epoll 相關數據結構,fds 是 pollfd 結構的文件句柄,nfds 為 fds 數組的長度,最后一個參數表示定時器時間,也就是在 timeout 毫秒之后觸發處理這些文件句柄。這里可以看到,co_poll 能夠同時將多個文件句柄同時加入到 Epoll 管理中。我們先看 stCoEpoll_t 結構:

struct stCoEpoll_t
{
int iEpollFd; // Epoll 主 FD
static const int _EPOLL_SIZE = 1024 * 10; // Epoll 可以監聽的句柄總數
struct stTimeout_t *pTimeout; // 時間輪定時器
struct stTimeoutItemLink_t *pstTimeoutList; // 已經超時的時間
struct stTimeoutItemLink_t *pstActiveList; // 活躍的事件
co_epoll_res *result; // Epoll 返回的事件結果
};

以 stTimeout_ 開頭的數據結構與 libco 的定時器管理有關,我們在后面介紹。co_epoll_res 是對 Epoll 事件數據結構的封裝,也就是每次觸發 Epoll 事件時的返回結果,在 Unix 和 MaxOS 下,libco 將使用 Kqueue 替代 Epoll,因此這里也保留了 kevent 數據結構。

```clike
struct co_epoll_res
{
int size;
struct epoll_event *events; // for linux epoll
struct kevent *eventlist; // for Unix or MacOs kqueue
};

co_poll 實際是對函數 co_poll_inner 的封裝。我們將 co_epoll_inner 函數的結構分為上下兩半段。在上半段中,調用 co_poll 的協程 CC 將其需要監聽的句柄數組 fds 都加入到 Epoll 管理中,并通過函數 co_yield_env 讓出 CPU;當 main 協程的事件循環 co_eventloop 中觸發了 CC 對應的監聽事件時,會恢復 CC的執行。此時,CC 將開始執行下半段,即將上半段添加的句柄 fds 從 epoll 中移除,清理殘留的數據結構,下面的流程圖簡要說明了控制流的轉移過程:

圖片圖片

有了上面的基本概念,我們來看具體的實現細節。co_poll 首先在內部將傳入的文件句柄數組 fds 轉化為數據結構 stPoll_t,這一步主要是為了方便后續處理。該結構記錄了 iEpollFd,ndfs,fds 數組,以及該協程需要執行的函數和參數。有兩點需要說明的是:

  1. 對于每一個 fd,為其申請一個 stPollItem_t 來管理對應 Epoll 事件以及記錄回調參數。libco 在此做了一個小的優化,對于長度小于 2 的 fds 數組,直接在棧上定義相應的 stPollItem_t 數組,否則從堆中申請內存。這也是一種比較常見的優化,畢竟從堆中申請內存比較耗時;
  2. 函數指針 OnPollProcessEvent 封裝了協程的切換過程。當傳入指定的 stPollItem_t 結構時,即可喚醒對應于該結構的 coroutine,將控制權交由其執行;

co_poll 的第二步,也是最關鍵的一步,就是將 fd 數組全部加入到 Epoll 中進行監聽。協程 CC 會將每一個 epoll_event 的 data.ptr 域設置為對應的 stPollItem_t 結構。這樣當事件觸發時,可以直接從對應的 ptr中取出 stPollItem_t 結構,然后喚醒指定協程。

如果本次操作提供了 Timeout 參數,co_poll 還會將協程 CC 本次操作對應的 stPoll_t 加入到定時器隊列中。這表明在 Timeout 定時觸發之后,也會喚醒協程 CC 的執行。當整個上半段都完成后,co_poll 立即調用 co_yield_env 讓出 CPU,執行流程跳轉回到 main 協程中。

從上面的流程圖中也可以看出,當執行流程再次跳回時,表明協程 CC 添加的讀寫等監聽事件已經觸發,即可以執行相應的讀寫操作了。此時 CC 首先將其在上半段中添加的監聽事件從 Epoll 中刪除,清理殘留的數據結構,然后調用讀寫邏輯。

九、定時器實現

協程 CC 在將一組 fds 加入 Epoll 的同時,還能為其設置一個超時時間。在超時時間到期時,也會再次喚醒 CC 來執行。libco 使用 Timing-Wheel 來實現定時器。關于 Timing-Wheel 算法,可以參考,其優勢是 O(1) 的插入和刪除復雜度,缺點是只有有限的長度,在某些場合下不能滿足需求。

回過去看 stCoEpoll_t 結構,其中 pTimeout 代表時間輪,通過函數 AllocateTimeout 初始化為一個固定大?。?0 1000)的數組。根據 Timing-Wheel 的特性可知,libco 只支持最大 60s 的定時事件。而實際上,在添加定時器時,libco 要求定時時間不超過 40s。成員 pstTimeoutList 記錄在 co_eventloop 中發生超時的事件,而 pstActiveList 記錄當前活躍的事件,包括超時事件。這兩個結構都將在 co_eventloop 中進行處理。

下面我們簡要分析一下加入定時器的實現:

int AddTimeout( stTimeout_t apTimeout, stTimeoutItem_t apItem,
unsigned long long allNow )
{
if( apTimeout->ullStart == 0 ) // 初始化時間輪的基準時間
{
apTimeout->ullStart = allNow;
apTimeout->llStartIdx = 0; // 當前時間輪指針指向數組0
}
// 1. 當前時間不可能小于時間輪的基準時間
// 2. 加入的定時器的超時時間不能小于當前時間
if( allNow < apTimeout->ullStart || apItem->ullExpireTime < allNow )
{
return __LINE__;
}
int diff = apItem->ullExpireTime - apTimeout->ullStart;
if( diff >= apTimeout->iItemSize ) // 添加的事件不能超過時間輪的大小
{
return __LINE__;
}
// 插入到時間輪盤的指定位置
AddTail( apTimeout->pItems +
(apTimeout->llStartIdx + diff ) % apTimeout->iItemSize, apItem );
return 0;
}

定時器的超時檢查在函數 co_eventloop 中執行。

十、EPOLL 事件循環

main 協程通過調用函數 co_eventloop 來監聽 Epoll 事件,并在相應的事件觸發時切換到指定的協程執行。有關 co_eventloop 與 應用協程的交互過程在上一節的流程圖中已經比較清楚了,下面我們主要介紹一下 co_eventloop 函數的實現:

上文中也提到,通過 epoll_wait 返回的事件都保存在 stCoEpoll_t 結構的 co_epoll_res 中。因此 co_eventloop 首先為 co_epoll_res 申請空間,之后通過一個無限循環來監聽所有 coroutine 添加的所有事件:

for(;;)
{
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
...
}

對于每一個觸發的事件,co_eventloop 首先通過指針域 data.ptr 取出保存的 stPollItem_t 結構,并將其添加到 pstActiveList 列表中;之后從定時器輪盤中取出所有已經超時的事件,也將其全部添加到 pstActiveList 中,pstActiveList 中的所有事件都作為活躍事件處理。

對于每一個活躍事件,co_eventloop 將通過調用對應的 pfnProcess 也就是上圖中的OnPollProcessEvent 函數來切換到該事件對應的 coroutine,將流程跳轉到該 coroutine 處執行。

最后 co_eventloop 在調用時也提供一個額外的參數來供調用者傳入一個函數指針 pfn。該函數將會在每次循環完成之后執行;當該函數返回 -1 時,將會終止整個事件循環。用戶可以利用該函數來控制 main 協程的終止或者完成一些統計需求。

責任編輯:武曉燕 來源: 深度Linux
相關推薦

2022-09-06 20:30:48

協程Context主線程

2021-09-16 09:59:13

PythonJavaScript代碼

2023-11-17 11:36:59

協程纖程操作系統

2022-09-12 06:35:00

C++協程協程狀態

2020-11-29 17:03:08

進程線程協程

2023-10-12 09:46:00

并發模型線程

2025-08-06 01:22:00

并發編程數據

2023-11-04 20:00:02

C++20協程

2024-09-25 08:28:45

2022-09-10 18:51:09

C++協程主線程

2021-04-25 09:36:20

Go協程線程

2023-10-24 19:37:34

協程Java

2025-02-08 09:13:40

2021-12-09 06:41:56

Python協程多并發

2025-06-26 02:00:00

2020-07-07 09:19:28

Android 協程開發

2025-03-26 01:22:00

NtyCo協程框架

2017-08-10 15:50:44

PHP協程阻塞

2022-04-19 20:39:03

協程多進程

2020-08-04 10:56:09

進程線程協程
點贊
收藏

51CTO技術棧公眾號

欧美成人国产| 精品国产麻豆| 中文字幕乱码日本亚洲一区二区| 国产成人精品视频| 久久精品色妇熟妇丰满人妻| 日韩午夜视频在线| 一级日本不卡的影视| 精品国产乱码久久久久| 日韩 国产 欧美| 亚洲欧美综合久久久| 欧美精品一区二区三区四区| 丁香婷婷激情网| 18网站在线观看| 91在线云播放| 91久久久国产精品| 日韩精品1区2区| 亚洲不卡av不卡一区二区| 精品久久久久久久久久久久久久久久久 | 国产av无码专区亚洲av毛网站| 538任你躁精品视频网免费| 色综合久久久久| 亚洲色欲久久久综合网东京热| 黄色影院在线播放| 成人免费毛片aaaaa**| 国产精品欧美日韩| 欧美福利视频一区二区| 亚洲成人二区| 亚洲性xxxx| 荫蒂被男人添免费视频| 亚洲日韩中文字幕一区| 色伊人久久综合中文字幕| 人妻av无码专区| 黄色网页网址在线免费| 国产女人aaa级久久久级 | 国产精品国产三级国产专播品爱网 | 三级黄色在线视频| 欧美ab在线视频| 最近日韩中文字幕中文| 中文字字幕码一二三区| 都市激情久久| 欧美电影免费提供在线观看| 亚洲成人av免费看| 中文在线а√天堂| 亚洲va欧美va人人爽午夜| 三上悠亚免费在线观看| 91社区在线观看播放| 久久人人爽爽爽人久久久| 国产精品日韩一区二区免费视频| 国产又粗又大又爽视频| 免费在线看成人av| 日韩av免费网站| 男女啊啊啊视频| 亚洲三级影院| 国产69精品久久久久99| 久久久久黄色片| 女人香蕉久久**毛片精品| 日韩亚洲国产中文字幕| 日本精品久久久久中文| 禁果av一区二区三区| 亚洲欧洲免费视频| 老熟妇一区二区| 国产在视频线精品视频www666| 精品88久久久久88久久久| www.四虎精品| 黄色欧美网站| 亚洲精品720p| 国产精品三级在线观看无码| 欧美日韩看看2015永久免费 | 7777kkkk成人观看| 国产成人免费看| 亚洲一区二区毛片| 日韩美女视频中文字幕| 中文字幕av第一页| 九九热在线视频观看这里只有精品| 国产精品欧美日韩一区二区| 国产又粗又大又爽视频| 国产馆精品极品| 狠狠色综合网站久久久久久久| 天堂在线资源库| 国产午夜亚洲精品羞羞网站| 午夜视频久久久| 黄黄的网站在线观看| 亚洲一区二区三区爽爽爽爽爽| 国产真人做爰毛片视频直播| 黄网av在线| 一本一本大道香蕉久在线精品 | 精品国产乱码久久久久久老虎| 国产精品偷伦视频免费观看了| 成人涩涩网站| 亚洲偷熟乱区亚洲香蕉av| 国产免费一区二区三区四区| 在线欧美三区| 国产精品久久久久久久久影视| 国产手机视频在线| thepron国产精品| 日韩在线观看电影完整版高清免费| 日本在线视频网| 午夜精品久久久久久久99水蜜桃 | 日韩影院在线观看| 成人中文字幕+乱码+中文字幕| 成 人 黄 色 片 在线播放| www.欧美色图| 中文字幕中文字幕在线中一区高清| 性直播体位视频在线观看| 欧美午夜影院在线视频| 亚洲在线观看网站| 妖精视频一区二区三区| 欧美大胆在线视频| 在线免费观看av网址| 国产不卡在线一区| 日韩欧美一区二区视频在线播放| 成人免费在线| 欧美自拍丝袜亚洲| 亚洲av成人精品一区二区三区| 欧美精品一区二区久久| 久久久久久av| 国产精品国产三级国产aⅴ| 成人av动漫在线| 中文字幕久久综合| 成人软件在线观看| 精品福利一二区| 波多野结衣喷潮| 久久久亚洲一区| 国产精品区一区二区三在线播放| 日本高清视频在线播放| 色婷婷综合激情| 久久人妻一区二区| 欧美三级网页| 成人福利在线观看| 国产三区四区在线观看| 精品成人av一区| 中文字幕一二三| 天天做天天爱天天综合网2021| 欧美中文字幕第一页| 懂色av蜜臀av粉嫩av分享吧| 一区精品在线播放| 性欧美videossex精品| 亚洲综合福利| **欧美日韩vr在线| 亚洲人成色777777老人头| 亚洲国产精品一区二区www| 污污视频在线免费| 91精品一区国产高清在线gif| 国产高清视频一区三区| 日漫免费在线观看网站| 精品福利在线视频| 涩视频在线观看| 国产在线成人| 高清不卡日本v二区在线| 国产精品久久麻豆| 91 com成人网| 成人在线观看免费完整| 国产资源在线一区| 亚洲第一综合网站| 97久久中文字幕| 久久香蕉国产线看观看av| 成人av网站在线观看免费| 日本a级片电影一区二区| 日本加勒比一区| 亚洲一区二区精品久久av| 91精品国产三级| 午夜久久黄色| 国产欧美亚洲日本| av在线资源| 亚洲精品suv精品一区二区| 日韩久久精品视频| 26uuu久久天堂性欧美| 国产美女三级视频| 精品国内自产拍在线观看视频| 国产成人av网址| 天天在线视频色| 日韩欧美激情在线| 日本在线观看视频网站| 99国产精品久久久久久久久久久 | 欧美视频自拍偷拍| 国产jizz18女人高潮| 精品综合久久久久久8888| 好色先生视频污| 成功精品影院| 日本精品久久久| 91在线播放网站| 欧美成人video| 亚洲影院在线播放| 国产精品久久久久久久久免费丝袜 | 91精品国产乱| 精品在线免费观看视频| 91丝袜国产在线播放| 在线免费观看av的网站| 欧美成熟视频| 欧美xxxx黑人又粗又长密月 | 亚洲一区二区三区乱码aⅴ| 黄色小说在线播放| 国产亚洲精品久久久久久777| 国产在成人精品线拍偷自揄拍| 亚洲一区二区在线免费观看视频| 亚洲精品成人无码熟妇在线| 捆绑变态av一区二区三区| 日韩在线观看a| 精品一区电影| 国产自产在线视频一区| 久久久免费人体| 97视频在线播放| 天堂аⅴ在线地址8| 亚洲精品黄网在线观看| 国产精品久久久久久免费免熟| 狠狠爱在线视频一区| 久久中文免费视频| 久久久久久久一区| 国产精品果冻传媒| 久久av中文字幕片| 农村妇女精品一二区| 欧美日韩日本国产亚洲在线| 日韩欧美精品一区二区三区经典| 粉嫩的18在线观看极品精品| 国产日韩欧美视频在线| 在线男人天堂| 欧美精品videos另类日本| 日本a在线播放| 亚洲男人av在线| 亚洲精品综合网| 777欧美精品| 日韩中文字幕高清| 狠狠综合久久av一区二区小说| 精品自拍偷拍视频| 国产精品久久久久影院色老大 | 成人观看免费完整观看| 午夜精彩国产免费不卡不顿大片| 日韩中文一区| 亚洲国产国产| 国产一区不卡在线观看| 亚洲综合网狠久久| 成人深夜直播免费观看| 狠狠久久综合| 国产精品网站视频| 美女写真久久影院| 国产ts一区二区| 亚洲人成在线网站| 77777少妇光屁股久久一区| 9lporm自拍视频区在线| 欧美人在线观看| 在线观看wwwxxxx| 久久综合免费视频| 黄色网在线播放| 久久亚洲综合国产精品99麻豆精品福利| 国产高清免费在线播放| 亚洲午夜久久久久久久| 国产中文字幕在线视频| 亚洲视频视频在线| 国产精品视频二区三区| 亚洲深夜福利网站| 成人影院免费观看| 在线视频一区二区| 日本高清中文字幕在线| 久热精品视频在线| 在线视频国产区| 欧美激情a∨在线视频播放| 2024最新电影在线免费观看| 欧美美女操人视频| 福利小视频在线| 97在线精品国自产拍中文| 欧美gv在线| 国产福利精品av综合导导航| 免费高清视频在线一区| 国产女精品视频网站免费| 成人在线视频www| 超碰97在线播放| 牛牛精品成人免费视频| 欧美二区三区在线| 日韩a一区二区| 99中文字幕在线观看| 亚洲午夜极品| 国产最新免费视频| 免费久久精品视频| 亚洲av无码久久精品色欲| 99视频一区二区| www.av天天| 亚洲天堂av一区| 日韩精品成人在线| 色又黄又爽网站www久久| 伊人亚洲综合网| 日韩欧美色综合网站| 亚洲三级黄色片| xxxx性欧美| tube8在线hd| 国产精品国语对白| 999精品视频在这里| 欧美精品二区三区四区免费看视频| 日韩一区二区三区免费播放| 欧美 日韩 国产精品| 久色成人在线| 黑人巨大猛交丰满少妇| 91麻豆swag| 免费在线观看黄色小视频| 五月天婷婷综合| 在线播放国产一区| 亚洲国产精品久久久久秋霞蜜臀| 岛国视频免费在线观看| 欧美高清视频在线| 精品裸体bbb| 国产一区二区高清不卡| 欧美大黑bbbbbbbbb在线| 久久精品国产sm调教网站演员| 蜜臀av性久久久久av蜜臀妖精| 无码人妻一区二区三区精品视频| 国产亚洲污的网站| 国产真人真事毛片| 欧美揉bbbbb揉bbbbb| 蜜桃久久一区二区三区| 中文字幕日韩精品在线| 麻豆视频在线观看免费网站黄| 国产精品吴梦梦| 日本精品影院| 精品一区二区三区无码视频| 裸体在线国模精品偷拍| 中文字幕狠狠干| 亚洲午夜精品在线| 一区二区国产欧美| 亚洲色图25p| 超碰在线cao| 91精品综合久久| 色一区二区三区四区| www.com毛片| 成人免费观看av| 欧美在线视频第一页| 欧美色精品天天在线观看视频| 天天躁日日躁狠狠躁喷水| 另类视频在线观看| 四虎国产精品永久在线国在线| 精品伊人久久大线蕉色首页| 欧美一区免费| 91精品999| 国产精品美女一区二区在线观看| 久久国产黄色片| 日韩精品高清在线观看| 国产www视频在线观看| 亚洲自拍偷拍色片视频| 久久精品久久久| 三级视频中文字幕| 国产日产欧产精品推荐色| 亚洲综合久久网| 精品中文字幕久久久久久| 毛片在线网站| 久久久99国产精品免费| 99成人在线| 中文乱码人妻一区二区三区视频| 亚洲h在线观看| 三级在线观看网站| 97久久久免费福利网址| 欧洲亚洲视频| 久草青青在线观看| 国产婷婷一区二区| 青青艹在线观看| 最近2019年中文视频免费在线观看 | 国产高潮在线观看| 欧美黑人视频一区| 99精品中文字幕在线不卡 | 欧美色老头old∨ideo| av片在线免费观看| 成人欧美一区二区三区在线湿哒哒 | 亚洲精品乱码久久久久久金桔影视| av手机免费在线观看| 狠狠色综合色区| 日韩电影在线一区二区| avhd101老司机| 欧美一卡2卡三卡4卡5免费| 成年人黄视频在线观看| 国产精品区一区二区三在线播放 | 韩剧1988免费观看全集| 精品三级av| 波多野结衣作品集| 国产精品久久精品日日| 99在线小视频| 欧美性做爰毛片| 成人婷婷网色偷偷亚洲男人的天堂| 日本黄大片一区二区三区| 亚洲欧美日韩国产另类专区 | 亚洲一区二区小说| 99久热在线精品视频| 成人18视频在线播放| aaa在线视频| 久久亚洲成人精品| 乱中年女人伦av一区二区| 精品免费国产一区二区| 亚洲色图制服丝袜| 色呦呦视频在线| 国产精品啪视频| 国产精品v日韩精品v欧美精品网站 | 在线免费观看一区二区三区| 国产精品一二三| 欧美性猛交bbbbb精品| 精品精品国产国产自在线| 高清精品xnxxcom| 亚洲人辣妹窥探嘘嘘| 一区二区三区精品久久久| 国内精品在线视频| 亚洲自拍av在线| 久久国产主播| 国产第一页第二页| 在线亚洲国产精品网| 99ri日韩精品视频|