精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

C++ 從零實現協程調度框架

開發 前端
我們通常所熟知的最小調度單元稱為線程(thread)?,亦指內核級線程?,其創建、銷毀、調度過程需要交由操作系統內核?負責. 線程與進程?可以是多對一關系,可以充分利用 CPU 多核?的能力,提高程序運行的并發度。

0 前言

推進掌握一門知識的方法除了溫故知新之外,還可以是觸類旁通. 近期沉寂停更的時間里,我在經歷自學 c++ 理論知識的入門階段,不過大家都知道“紙上得來終覺淺”的道理,因此我決定以復刻 golang gmp 協程調度設計思路為目標,基于c++11的風格實現一個低配乞丐版協程調度框架,以此作為我的首個 c++ 實踐開源項目,并希望以此為契機,在提高 c++ 編程熟練度的同時,也能提供一波旁支輸入,反補提升我對gmp概念的理解.

該項目我已于 github 開源,cbricks 是我基于 c++11 從零實現的基礎工具開源庫:https://github.com/xiaoxuxiansheng/cbricks.其中實現的內容包括但不僅限于 協程調度框架 workerpool、協程 coroutine/線程 thread、并發通信隊列 channel、日志打印組件 logger等基本工具類,而 協程調度框架 workerpool 正是我今天我要向大家介紹的主題.這是我作為 c++ 初學者推進的首個開源項目,完全出于學習實踐目的,難免存在水平不足以及重復造輪的問題,如有考慮不到位、不完善之處請多多包涵,也歡迎批評指正~

在開始正文前,致敬環節 必不可少. 在實現 cbricks 的編程過程中,在很大程度上學習借鑒了sylar-yin 老師的課程,在此特別感謝,也附上其開源項目傳送門供大家參考使用:https://github.com/sylar-yin/sylar . 正因為有前輩們慷慨無私的傾囊分享,我的學習之路才得以更加平坦順暢. 正是以上種種鼓勵著我能有動力把技術分享以及這種開源精神繼續傳播下去.

1 基本概念

首先,需要大家一起理清楚有關協程的基本概念.

1.1 線程與協程

我們通常所熟知的最小調度單元稱為線程(thread),亦指內核級線程,其創建、銷毀、調度過程需要交由操作系統內核負責. 線程與進程可以是多對一關系,可以充分利用 CPU 多核的能力,提高程序運行的并發度。

而協程(coroutine) 又稱為用戶級線程,是在用戶態視角下對線程概念的二次封裝. 一方面,協程與線程關系為多對一,因此在邏輯意義上屬于更細粒度的調度單元;另一方面,因為協程的創建、銷毀、調度行為都在用戶態中完成,而無需內核參與,因此協程是一個更加輕量化的概念. (對于內核來說,其最小調度單元始終為線程不變,至于用戶態下對線程又作了怎樣的邏輯拆分,對于內核而言是完全透明無感知的,因此無需介入)

線程與協程線程與協程

1.2 coroutine 與 goroutine

因為我畢竟有著較長的 golang 開發使用經驗,需要在探討相關問題的時候是無法繞開對 golang 中對 goroutine 這一設計的對比與探討的.

我們把常規的協程稱為 coroutine. 而在 golang 語言層面天然支持一種優化版協程模型,稱為 goroutine,并運轉調度于 go 語言中知名的 gmp(goroutine-machine-processor) 架構之下.

gmp架構gmp架構

有關 gmp 相關內容更細致的講解,可以參見我之前分享的文章:golang gmp 原理

在經歷了 cbricks workerpool 的開發實踐后,我也對 gmp 架構下 groutine 相較于普通 coroutine 存在的優勢有了一些更深刻的體會:

  • ? 線程松耦合:經由 P 的從中斡旋,goroutine 能夠跨 M(thread)調度運行,真正實現 G 與 M 之間的松耦合. 而這一點我所實現的 c++ coroutine 中無法做到.(本文實現的 coroutine 底層依賴于 c 中的 ucontext 庫完成協程棧空間的分配,由于棧的線程私有性,一經分配便不允許被其他線程染指,因此 coroutine 在初始化后就必然是某個 thread 所獨有的)
  • ? 棧空間自適應擴縮:goroutine 棧空間大小可以根據實際需要做到自適應擴縮,并且針對使用方完全屏蔽這一細節. 而我所實現的 c++ coroutine 需要在其初始化時就顯式設定好棧空間大小,并且在運行過程中不便于修改.

用戶視角下的gmp并發用戶視角下的gmp并發

? 阻塞粒度適配:這一點非常重要. golang 為使用方屏蔽了線程的概念,所有并發操作都基于 goroutine 粒度完成,這不僅限于調度,也包括與之相應的一系列并發阻塞工具,例如 鎖 mutex,通道 channel 等,都在語言層面天然支持 goroutine 粒度的被動阻塞(go_park)操作,與 gmp 體系完美適配;而這一點在 c++ 中則有所不同,如 鎖 mutex、信號量 semaphore 等工具的最小阻塞粒度都是線程,這就會導致協程的優勢遭到削弱,因為在一個 coroutine 中的阻塞行為最終會上升到 thread 粒度,并進而導致 thread 下其他 coroutine 也無法得到正常調度.

2 快速上手

做完基本概念鋪墊后,下面我們開始介紹有關協程調度框架 cbricks workerpool 的具體實現內容.

2.1 使用方法

本章我們聚焦在如何快速上手使用 workerpool 這一問題. workerpool 類型聲明于 ./pool/workerpool.h 頭文件中,使用方通常只需關心其構造函數和兩個公開方法:

  • ? 構造函數——WorkerPool:初始化 workerpool 實例,其中唯一入參 threads 為需要啟用的線程個數,默認為 8 個
  • ? 公開方法——submit:往 workerpool 中投遞一個任務 task(以 void() 閉包函數的形式)
  • ? 公開方法——sched:主動讓渡當前 task 的執行權,以實現同線程下協程間的切換
// 命名空間 cbricks::pool
namespace cbricks{namespace pool{
// 協程調度池
classWorkerPool: base::Noncopyable{
public:
// 構造函數  threads——使用的線程個數. 默認為 8 個
WorkerPool(size_t threads =8);
// ...
public:
/**
        公開方法
    */
/**
        * submit: 向協程調度池中提交一個任務 (仿 golang 協程池 ants 風格)
            - task 會被隨機分配到線程手中,保證負載均衡
            - task 被一個線程取到之后,會創建對應協程實例,為其分配本地棧,此時該任務和協程就固定屬于一個線程了
        * param:task——提交的任務  nonblock——是否為阻塞模式
            - 阻塞模式:線程本地任務隊列滿時阻塞等待 
            - 非阻塞模式:線程本地隊列滿時直接返回 false
        * response:true——提交成功 false——提交失敗
    */
bool submit(task task, bool nonblock = false);

// 工作協程調度任務過程中,可以通過執行次方法主動讓出線程的調度權 (仿 golang runtime.Goched 風格)
void sched();
}}

2.2 使用示例

下面是關于 workerpool 的具體使用示例,其中演示了如何完成一個 workerpool 的初始化,并通過 submit 方法向其中批量投遞異步執行的任務,最后對執行結果進行驗收:

#include <iostream>

#include "sync/sem.h"
#include "pool/workerpool.h"

void testWorkerPool();

int main(int argc, char** argv){
// 測試函數
testWorkerPool();
}

void testWorkerPool(){
// 協程調度框架類型別名定義
typedef cbricks::pool::WorkerPool workerPool;
// 信號量類型別名定義
typedef cbricks::sync::Semaphore semaphore;

// 初始化協程調度框架,設置并發的 threads 數量為 8
workerPool::ptr workerPoolPtr(new workerPool(8));

// 初始化一個原子計數器
    std::atomic<int> cnt{0};
// 初始化一個信號量實例
    semaphore sem;

// 投遞 10000 個異步任務到協程調度框架中,執行邏輯就是對 cnt 加 1
for(int i =0; i <10000; i++){
// 執行 submit 方法,將任務提交到協程調度框架中
        workerPoolPtr->submit([&cnt,&sem](){
            cnt++;
            sem.notify();
});
}

// 通過信號量等待 10000 個異步任務執行完成
for(int i =0; i <10000; i++){
        sem.wait();
}

// 輸出 cnt 結果(預期結果為 10000)
    std::cout << cnt << std::endl;
}

3 架構設計

了解完使用方式后,隨后就來揭曉其底層實現原理. 本著由總到分的學習指導綱領,本章我們從全局視角縱覽 workerpool 的設計實現架構.

3.1 整體架構與核心概念

cbricks協程調度架構cbricks協程調度架構

workerpool 自下而上,由粗到細可以分為如下層級概念:

? 線程池 threadPool:workerpool 初始化時就啟動指定數量的常駐線程 thread 實例. 這些 thread 數量固定不變,并且會持續運行,直到整個 workerpool 被析構為止. 由這些 thread 組成的集合,我們稱為 線程池 threadPool.

? 線程 thread:持續運營的 thread 單元,不斷執行著調度邏輯,依次嘗試從本地任務隊列 taskq、本地協程隊列 sched_q 中獲取任務 task /協程 coroutine 進行調度. 如果前兩者都空閑,則 thread 會仿照 gmp 中的 workstealing 機制,從其他 thread 的 taskq 中竊取 task 過來執行. 最后 steal 后仍缺少 task 供執行調度,則會利用 channel 的機制,使 thread 陷入阻塞,從而讓出 cpu 執行權

? 任務 task:用戶提交的異步任務(對應為 void() 閉包函數類型). task 會被均勻分配到特定 thread 的 taskq 中,但還存在被其他 thread 竊取的可能性,因此 task 本質上還是能夠跨 thread 傳遞使用的

? 協程 coroutine:在 workerpool 中,thread 不會直接執行 task,而是會為 task 一對一構建出 coroutine 實例,并切換至 coroutine 中完成對 task 的執行. coroutine 被創建出來后,會完成棧 stack 的初始化和分配,隨后 coroutine 就固定屬于一個 thread 了,終生不可再被其他 thread 染指

? 線程本地任務隊列 taskq:每個 thread 私有的緩存 task 的隊列,底層由并發安全的通信隊列 channel 實現. 當一筆 task 被投遞到 workerpool 時,會基于負載均衡策略投遞到特定 thread 的 taskq 中,接下來會被該 thread 優先調度執行

? 線程本地協程隊列 schedq:每個 thread 私有的緩存 coroutine 的隊列,底層由普通隊列 queue 實現,但屬于線程本地變量 thread_local,因此也是并發安全的. 當一個 coroutine 因主動讓渡 sched 操作而暫停執行時,會將其暫存到 schedq 中,等待后續再找時機完成該 coroutine 的調度工作.

3.2 相比 gmp 的不足之處

我在實現 workerpool 時,一定程度上仿照了 gmp 的風格,包括 thread 本地任務隊列 taskq 的實現以及 workstealing 機制的設計.

cbricks協程調度框架的不足之處cbricks協程調度框架的不足之處

然而受限于我的個人水平以及語言層面的風格差異,相較于 gmp,workerpool 還存在幾個明顯的缺陷:

? coroutine 與 thread 強綁定:當一個 coroutine 被初始化時,我使用的是 c 語言中 ucontext.h 完成 stack 的分配,這樣 coroutine stack 就是 thread 私有的,因此 coroutine 不能做到跨 thread 調度.

? thread 級阻塞粒度:c++ 中,并發工具因此的阻塞行為都是以 thread 為單位. 以互斥鎖 lock 為例,哪怕觸發加鎖阻塞行為的對象是 coroutine,但最終還是會引起整個 thread 對象陷入阻塞,從而導致 thread 下的其他已分配好的 coroutine 也無法得到執行.

要解決這一問題,就必須連帶著對 lock、cond、semaphore 等工具進行改造,使得其能夠支持 coroutine 粒度的阻塞操作,這樣的成本無疑很高,本項目未予以實踐.

4 頭文件源碼

從第 4 章開始,我們正式進入源碼解析環節. 首先給出關于 workerpool 頭文件的完整代碼展示,包含其中的成員屬性以及公私方法定義. 下面的示意圖以及源碼中給出的注釋相對比較完備,在此不再贅述:

workerpool 類定義workerpool 類定義

代碼位于 ./pool/workerpool.h:

// 保證頭文件內容不被重復編譯
#pragma once 

/**
 依賴的標準庫頭文件   
*/
// 標準庫智能指針相關
#include <memory>
// 標準庫函數編程相關
#include <functional>
// 標準庫原子量相關
#include <atomic>
// 標準庫——動態數組,以此作為線程池的載體
#include <vector>

/**
    依賴的項目內部頭文件
*/
// 線程 thread 實現
#include "../sync/thread.h"
// 協程 coroutine 實現
#include "../sync/coroutine.h"
// 阻塞隊列 channel 實現 (一定程度上仿 golang channel 風格)
#include "../sync/channel.h"
// 信號量 semaphore 實現
#include "../sync/sem.h"
// 拷貝禁用工具,用于保證類實例無法被值拷貝和值傳遞
#include "../base/nocopy.h"

// 命名空間 cbricks::pool
namespace cbricks{namespace pool{
// 協程調度池 繼承 Noncopyable 保證禁用值拷貝和值傳遞功能
classWorkerPool: base::Noncopyable{
public:
// 協程池共享指針類型別名
typedef std::shared_ptr<WorkerPool> ptr;
// 一筆需要執行的任務
typedef std::function<void()> task;
// 一個線程持有的本地任務隊列
typedef sync::Channel<task> localq;
// 本地任務隊列指針別名
typedef localq::ptr localqPtr;
// 線程指針別名
typedef sync::Thread* threadPtr;
// 一個分配了運行任務的協程
typedef sync::Coroutine worker;
// 協程智能指針別名
typedef sync::Coroutine::ptr workerPtr;
// 讀寫鎖別名
typedef sync::RWLock rwlock;
// 信號量類型別名
typedef sync::Semaphore semaphore;

public:
/**
      構造/析構函數
    */
// 構造函數  threads——使用的線程個數. 默認為 8 個
WorkerPool(size_t threads =8);
// 析構函數  
~WorkerPool();

public:
/**
        公開方法
    */
/**
        * submit: 向協程調度池中提交一個任務 (仿 golang 協程池 ants 風格)
            - task 會被隨機分配到線程手中,保證負載均衡
            - task 被一個線程取到之后,會創建對應協程實例,為其分配本地棧,此時該任務和協程就固定屬于一個線程了
        * param:task——提交的任務  nonblock——是否為阻塞模式
            - 阻塞模式:線程本地任務隊列滿時阻塞等待 
            - 非阻塞模式:線程本地隊列滿時直接返回 false
        * response:true——提交成功 false——提交失敗
    */
bool submit(task task, bool nonblock = false);

// 工作協程調度任務過程中,可以通過執行次方法主動讓出線程的調度權 (仿 golang runtime.Goched 風格)
void sched();

private:
/**
     * thread——workerPool 中封裝的線程類
     * - index:線程在線程池中的 index
     * - thr:真正的線程實例,類型為 sync/thread.h 中的 Thread
     * - taskq:線程的本地任務隊列,其中數據類型為閉包函數 void()
     * - lock:一把線程實例粒度的讀寫鎖. 用于隔離 submit 操作和 workstealing 操作,避免因任務隊列阻塞導致死鎖
     */
structthread{
typedef std::shared_ptr<thread> ptr;
int index;
        threadPtr thr;
        localqPtr taskq;
        rwlock lock;
/**
         *  構造函數
         * param: index: 線程在線程池中的 index; thr: 底層真正的線程實例; taskq:線程持有的本地任務隊列
        */
thread(int index,threadPtr thr, localqPtr taskq):index(index),thr(thr),taskq(taskq){}
~thread()=default;
};

private:
/**
        私有方法
    */
// work:線程運行主函數,持續不斷地從本地任務隊列 taskq 或本地協程隊列 t_schedq 中獲取任務/協程進行調度. 倘若本地任務為空,會嘗試從其他線程本地任務隊列竊取任務執行
void work();
/**
     * readAndGo:從指定的任務隊列中獲取任務并執行
     * param:taskq——指定的任務隊列 nonblock——是否為阻塞模式
     * reponse:true——成功 false——失敗
    */
bool readAndGo(localqPtr taskq, bool nonblock);
/**
     * goTask: 為一筆任務創建一個協程實例,并調度該任務函數
     * param: cb——待執行任務
     * tip:如果該任務未一次性執行完成(途中使用了 sched 方法),則會在棧中封存好任務的執行信息,然后將該協程實例追加到線程本地的協程隊列 t_schedq 中,等待后續再被線程調度
     */
void goTask(task cb);
/**
     * goWorker:調度某個協程實例,其中已經分配好執行的任務函數
     * param: worker——分配好執行任務函數的協程實例
     * tip:如果該任務未一次性執行完成(途中使用了 sched 方法),則會在棧中封存好任務的執行信息,然后將該協程實例追加到線程本地的協程隊列 t_schedq 中,等待后續再被線程調度
    */
void goWorker(workerPtr worker);

/**
     * workStealing:當其他線程任務隊列 taskq 中竊取半數任務填充到本地隊列
     */
void workStealing();
/**
     * workStealing 重載:從線程 stealFrom 的任務隊列中竊取半數任務填充到線程 stealTo 本地隊列
     */
void workStealing(thread::ptr stealTo, thread::ptr stealFrom);
/**
     * getStealingTarget:隨機獲取一個線程作為竊取目標
     */
thread::ptr getStealingTarget();

/**
     * getThreadByThreadName 通過線程名稱獲取對應的線程實例
     */
thread::ptr getThreadByThreadName(std::string threadName);
/**
     * getThread 獲取當前線程實例
     */
thread::ptr getThread();

private:
/**
     * 靜態私有方法
     */
// getThreadNameByIndex:通過線程 index 映射得到線程名稱
static const std::string getThreadNameByIndex(int index);
// getThreadIndex:獲取當前線程的 index
static const int getThreadIndex();
// getThreadName:獲取當前線程的名稱
static const std::string getThreadName();


private:
/**
     * 私有成員屬性
     */
// 基于 vector 實現的線程池,元素類型為 WorkerPool::thread 對應共享指針
    std::vector<thread::ptr> m_threadPool;

// 基于原子變量標識 workerPool 是否已關閉
    std::atomic<bool> m_closed{false};
};

}}

5 核心實現源碼

接下來針對 workerpool 中的核心流程進行詳細的源碼走讀,有關 workerpool 具體實現代碼位于 ./pool/workerpool.cpp 中.

5.1 依賴的頭文件與變量

圖片圖片

依賴的外部變量

首先涉及到兩個核心變量的定義:

  • ? 全局變量 s_taskId:全局單調遞增的原子計數器,為每個到來的 task 分配全局唯一 task id,并依據此 id 明確 task 應該指派給哪個 thread
  • ? 線程本地變量(thread_local) t_schedq:線程私有的協程隊列. 運行過程因主動讓渡而暫停的 coroutine,會被暫存到其中,等待后續被相同的 thread 繼續調度執行.
// 標準庫隊列實現. 依賴隊列作為線程本地協程隊列的存儲載體
#include <queue>

// workerpool 頭文件
#include "workerpool.h"
// 本項目定義的斷言頭文件
#include "../trace/assert.h"

// namespace cbricks::pool
namespace cbricks{namespace pool{

/**
 * 全局變量 s_taskId:用于分配任務 id 的全局遞增計數器,通過原子變量保證并發安全
 * 每個任務函數會根據分配到的 id,被均勻地分發給各個線程,以此實現負載均衡
*/
static std::atomic<int> s_taskId{0};

/**
 * 線程本地變量  t_schedq:線程私有的協程隊列
 * 當線程下某個協程沒有一次性將任務執行完成時(任務調用了 sched 讓渡函數),則該協程會被暫存于此隊列中,等待后續被相同的線程繼續調度
 */
staticthread_local std::queue<WorkerPool::workerPtr> t_schedq;

// ...

}}

5.2 構造函數與析構函數

workerpool 構造函數workerpool 構造函數

下面介紹workerpool 的構造函數,其任務很明確,就是初始化好指定數量的 thread,為其分配好對應的 taskq,并將 thread 一一投遞進入到線程池 threadPool 中.

此處值得一提的是,thread 啟動后異步運行的方法是 WorkerPool::work,其中會涉及到從 threadPool 中取出當前 thread 實例的操作,因此這里需要通過信號量 semaphore 保證 thread 實例先被投遞進入 threadPool 后,對應 WorkerPool::work 方法才能被放行.

// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
 * workerpool 構造函數:
 * - 初始化好各個線程實例 thread
 * - 將各 thread 添加到線程池 m_threadPool 中
 */
WorkerPool::WorkerPool(size_t threads){
CBRICKS_ASSERT(threads >0,"worker pool init with nonpositive threads num");

// 為線程池預留好對應的容量
this->m_threadPool.reserve(threads);

/**
     * 構造好對應于每個 thread 的信號量
     * 這是為了保證 thread 實例先被添加進入 m_threadPool,thread 中的調度函數才能往下執行
     * 這樣做是因為 thread 調度函數有依賴于從 m_threadPool 獲取自身實例的操作
    */
std::vector<semaphore> sems(threads);
// 另一個信號量,用于保證所有 thread 調度函數都正常啟動后,當前構造函數才能退出,避免 sems 被提前析構
    semaphore waitGroup;

// 初始化好對應數量的 thread 實例并添加進入 m_threadPool
for(int i =0; i < threads; i++){
// 根據 index 映射得到 thread 名稱
        std::string threadName =WorkerPool::getThreadNameByIndex(i);
// 將 thread 實例添加進入 m_threadPool
this->m_threadPool.push_back(thread::ptr(
// thread 實例初始化
newthread(
            i,
// 
new sync::Thread([this,&sems,&waitGroup](){
/**
                 * 此處 wait 操作是需要等待對應 thread 實例先被推送進入 m_threadPool
                 * 因為一旦后續的 work 函數運行,就會涉及從 m_threadPool 中獲取 thread 實例的操作
                 * 因此先后順序不能顛倒
                 */
                sems[getThreadIndex()].wait();
/**
                 * 此處 notify 操作是配合外層的 waitGroup.wait 操作
                 * 保證所有 thread 都正常啟動后,workerPool 構造函數才能退出
                 * 這是為了防止 sems 被提前析構
                 */
                waitGroup.notify();
// 異步啟動的 thread,最終運行的調度函數是 workerpool::work
this->work();

},
// 注入 thread 名稱,與 index 有映射關系
            threadName),
// 分配給 thread 的本地任務隊列
localqPtr(new localq))));
/**
         * 在 thread 實例被推送入 m_threadPool 后進行 notify
         * 這樣 thread 調度函數才會被向下放行
         */
        sems[i].notify();
}

/**
     * 等待所有 thread 實例正常啟動后,構造函數再退出
     */
for(int i =0; i < threads; i++){
        waitGroup.wait();
}
}

在析構函數中,要做的處理是將 workerpool 關閉標識 m_closed 置為 true,并且一一關閉所有 thread 下的 taskq ,這樣運行中的 thread 在感知到這一信息后都會主動退出.

// 析構函數
WorkerPool::~WorkerPool(){
// 將 workpool 的關閉標識置為 true,后續運行中的線程感知到此標識后會主動退出
this->m_closed.store(true);
// 等待所有線程都退出后,再退出 workpool 的析構函數
for(int i =0; i <this->m_threadPool.size(); i++){
// 關閉各 thread 的本地任務隊列
this->m_threadPool[i]->taskq->close();
// 等待各 thread 退出
this->m_threadPool[i]->thr->join();
}
}

// ...

}}

5.3 公有方法:提交任務

workerpool提交任務流程

用戶通過 submit 方法,能夠將 task 提交到 workerpool 中. 在 submit 流程中:

  • ? 首先,為 task 分配全局唯一的 taskId.
  • ? 然后,對 threadPool 長度取模后,找到 task 從屬的 thread.
  • ? 接下來,將 task 投遞到該 thread 的 taskq 中即可.

這里需要注意的是,在投遞任務到 thread 的 taskq 前,需要先加上該 thread 的讀鎖 readlock. 這是為了和該 thread 下可能正在執行的 workStealing 操作進行互斥,避免因 taskq 空間不足而導致死鎖問題. 這個點在竊取流程的講解中詳細展開.

// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
 * submit: 提交一個任務到協程調度池中,任務以閉包函數 void() 的形式組裝
 * - 為任務分配全局遞增且唯一的 taskId
 * - 根據 taskId 將任務均勻分發給指定 thread
 * - 將任務寫入到指定 thread 的本地任務隊列中
 */
bool WorkerPool::submit(task task, bool nonblock){
// 若 workerpool 已關閉,則提交失敗
if(this->m_closed.load()){
returnfalse;
}

// 基于任務 id 對 m_threadPool 長度取模,將任務映射到指定 thread
int targetThreadId =(s_taskId++)%(this->m_threadPool.size());
    thread::ptr targetThr =this->m_threadPool[targetThreadId];

// 針對目標 thread 加讀鎖,這是為了防止和目標 thread 的 workstealing 操作并發最終因任務隊列 taskq 容量溢出而導致死鎖
rwlock::readLockGuard guard(targetThr->lock);

// 往對應 thread 的本地任務隊列中寫入任務
return targetThr->taskq->write(task, nonblock);
}

// ...

}}

5.4 公有方法:讓渡執行權

workerpool協程讓渡流程workerpool協程讓渡流程

task 在運行過程中,可以通過調用 workerpool::sched 方法完成執行權的主動讓渡. 此時 task 對應 coroutine 會暫停運行,并將執行權切換回到 thread 主函數中,然后 thread 會將該 coroutine 暫存到本地協程隊列 schedq 中,等待后續再對其調度執行.

// namespace cbricks::pool
namespace cbricks{ namespace pool{
// ...
// sched:讓渡函數. 在任務執行過程中,可以通過該方法主動讓出線程的執行權,則此時任務所屬的協程會被添加到 thread 的本地協程隊列 t_schedq 中,等待后續再被調度執行
void WorkerPool::sched(){
    worker::GetThis()->sched();
}

// ...

}}

5.5 線程調度任務主流程

workerpool線程調度主流程workerpool線程調度主流程

workerpool::work 方法是各 thread 循環運行的主函數,其中包含了 thread 調度 task 和 coroutine 的核心邏輯:

  • ? 調度優先級一:從 thread 的本地任務隊列 taskq 中獲取 task 并調度執行
  • ? 調度優先級二:當 taskq 為空或者連續獲取 10 次 taskq 后(為避免 schedq 產生饑餓),會主動獲取一次本地協程隊列 schedq 中的 coroutine 進行調度
  • ? 調度優先級三:如果 taskq 和 schedq 都是空的,則進入 workstealing 流程,嘗試從其他 thread taskq 中竊取半數 taskq 填充到當前 thread taskq 中
  • ? 必要性阻塞:如果經歷完上述流程,仍沒有合適的目標供 thread 調度,則 thread 會依賴 channel 的阻塞消費能力陷入阻塞,從而讓出 cpu 執行權,避免資源浪費
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
 * work: 線程運行的主函數
 * 1) 獲取需要調度的協程(下述任意步驟執行成功,則跳到步驟 2))
 *   - 從本地任務隊列 taskq 中取任務,獲取成功則為之初始化協程實例
 *   - 從本地協程隊列 schedq 中取協程
 *   - 從其他線程的任務隊列 taskq 中偷取一半任務到本地任務隊列
 * 2) 調度協程執行任務
 * 3) 針對主動讓渡而退出的協程,添加到本地協程隊列
 * 4) 循環上述流程
 */
void WorkerPool::work(){
// 獲取到當前 thread 對應的本地任務隊列 taskq
    localqPtr taskq =this->getThread()->taskq;

// main loop
while(true){
// 如果 workerpool 已關閉 則主動退出
if(this->m_closed.load()){
return;
}

/**  
         * 執行優先級為 本地任務隊列 taskq -> 本地協程隊列 t_t_schedq -> 竊取其他線程任務隊列 other_taskq
         * 為防止饑餓,至多調度 10 次的 taskq 后,必須嘗試處理一次 t_schedq 
        */

// 標識本地任務隊列 taskq 是否為空
bool taskqEmpty =false;
// 至多調度 10 次本地任務隊列 taskq
for(int i =0; i <10; i++){
// 以【非阻塞模式】從 taskq 獲取任務并為之分配協程實例和調度執行
if(!this->readAndGo(taskq,false)){
// 如果 taskq 為空,將 taskqEmpty 置為 true 并直接退出循環
                taskqEmpty =true;
break;
}
}

// 嘗試從線程本地的協程隊列 t_schedq 中獲取協程并進行調度
if(!t_schedq.empty()){
// 從協程隊列中取出頭部的協程實例
            workerPtr worker = t_schedq.front();
            t_schedq.pop();
// 進行協程調度
this->goWorker(worker);
// 處理完成后直接進入下一輪循環
continue;
}

// 如果未發現 taskq 為空,則無需 workstealing,直接進入下一輪循環
if(!taskqEmpty){
continue;
}

/** 
         * 走到這里意味著 taskq 和 schedq 都是空的,則要嘗試發起竊取操作
         * 隨機選擇一個目標線程竊取半數任務添加到本地隊列中
        */
this->workStealing();

/**  
         * 以【阻塞模式】嘗試從本地任務獲取任務并調度執行
         * 若此時仍沒有可調度的任務,則當前 thread 陷入阻塞,讓出 cpu 執行權
         * 直到有新任務分配給當前 thread 時,thread 才會被喚醒
        */
this->readAndGo(taskq,true);
}
}
// ...

}}

workerpool單個任務處理流程workerpool單個任務處理流程

以 readAndGo 方法為入口,thread 會嘗試從 taskq 中獲取一筆 task;獲取到后,會為 task 構建一一對應的 coroutine 實例(至此 task/coroutine 與 thread 完全綁定),然后通過 coroutine::go 方法,將 thread 執行權切換至 coroutine 手中,由 coroutine 執行其中的 task. 只有在 task 執行結束或者主動讓渡時,執行權才會返還到 thread 主函數中,此時 thread 會判斷 coroutine 是否是因為主動讓渡而暫停執行,如果是的話,則會將該 coroutine 實例追加到 schedq 中,等待后續尋找合適時機再作調度執行.

// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
 * readAndGo:
 *   - 從指定任務隊列中獲取一個任務
 *   - 為之分配協程實例并調度執行
 *   - 若協程實例未一次性執行完成(執行了讓渡 sched),則將協程添加到線程本地的協程隊列 schedq 中
 * param:taskq——任務隊列;nonblock——是否以非阻塞模式從任務隊列中獲取任務
 * response:true——成功;false,失敗(任務隊列 taskq 為空)
 */
// 將一個任務包裝成協程并進行調度. 如果沒有一次性調度完成,則將協程實例添加到線程本地的協程隊列 t_schedq
bool WorkerPool::readAndGo(cbricks::pool::WorkerPool::localqPtr taskq, bool nonblock){
// 任務容器
    task cb;
// 從 taskq 中獲取任務
if(!taskq->read(cb,nonblock)){
returnfalse;
}

// 對任務進行調度
this->goTask(cb);
returntrue;
}

/**
 * goTask
 *   - 為指定任務分配協程實例
 *   - 執行協程
 *   - 若協程實例未一次性執行完成(執行了讓渡 sched),則將協程添加到線程本地的協程隊列 schedq 中
 * param:cb——待執行的任務
 */
void WorkerPool::goTask(task cb){
// 初始化協程實例
    workerPtr _worker(newworker(cb));
// 調度協程
this->goWorker(_worker);
}

/**
 * goWorker
 *   - 執行協程
 *   - 若協程實例未一次性執行完成(執行了讓渡 sched),則將協程添加到線程本地的協程隊列 schedq 中
 * param:worker——待運行的協程
 */
void WorkerPool::goWorker(workerPtr worker){
// 調度協程,此時線程的執行權會切換進入到協程對應的方法棧中
    worker->go();
// 走到此處意味著線程執行權已經從協程切換回來
// 如果此時協程并非已完成的狀態,則需要將其添加到線程本地的協程隊列 schedq 中,等待后續繼續調度
if(worker->getState()!= sync::Coroutine::Dead){
        t_schedq.push(worker);
}
}
// ...

}}

5.6 任務竊取流程

workerpool跨線程任務竊取流程workerpool跨線程任務竊取流程

當 thread 發現 taskq 和 schedq 都空閑時,則會嘗試執行竊取操作. 此時 thread 隨機選取另一個 thread 作為竊取目標,竊取其 taskq 中的半數 task,追加到本地 taskq 中.

在執行竊取操作的過程中,需要對當前 thread 加寫鎖,以避免發生死鎖問題:

比如在竊取前,當前 thread 判定自己的 taskq 還有足夠空間用于承載竊取來的 task;但是此期間若有新的任務 submit 到來,則可能把 taskq 的空間占據,最后導致沒有足夠容量承載竊取到的 task,最終導致 thread 調度流程 hang 死在 workstealing 流程無法退出.

上述問題的解法就是,在竊取前,先加 thread 寫鎖(這樣并發到來的 submit 操作就無法完成 task 投遞)然后再檢查一遍 taskq 并確認容量充足后,再發起實際的竊取操作.

// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
// 從某個 thread 中竊取一半任務給到本 thread 的 taskq
void WorkerPool::workStealing(){
// 選擇一個竊取的目標 thread 
    thread::ptr stealFrom =this->getStealingTarget();
if(!stealFrom){
return;
}
// 從目標 thread 中竊取半數任務添加到本 thread taskq 中
this->workStealing(this->getThread(),stealFrom);
}

// 從 thread:stealFrom 中竊取半數任務給到 thread:stealTo
void WorkerPool::workStealing(thread::ptr stealTo, thread::ptr stealFrom){
// 確定竊取任務數量:目標本地任務隊列 taskq 中任務總數的一半
int stealNum = stealFrom->taskq->size()/2;
if(stealNum <=0){
return;
}

// 針對 thread:stealTo 加寫鎖,防止因 workstealing 和 submit 行為并發,導致線程因 taskq 容量溢出而發生死鎖
rwlock::lockGuard guard(stealTo->lock);
// 檢查此時 stealTo 中的 taskq 如果剩余容量已不足以承載擬竊取的任務量,則直接退出
if(stealTo->taskq->size()+ stealNum > stealTo->taskq->cap()){
return;
}

// 創建任務容器,以非阻塞模式從 stealFrom 的 taskq 中竊取指定數量的任務
std::vector<task> containers(stealNum);
if(!stealFrom->taskq->readN(containers,true)){
return;
}

// 將竊取到的任務添加到 stealTo 的 taskq 中
    stealTo->taskq->writeN(containers,false);
}

// 隨機選擇本 thread 外的一個 thread 作為竊取的目標
WorkerPool::thread::ptr WorkerPool::getStealingTarget(){
// 如果線程池長度不足 2,直接返回
if(this->m_threadPool.size()<2){
returnnullptr;
}

// 通過隨機數,獲取本 thread 之外的一個目標 thread index
int threadIndex =WorkerPool::getThreadIndex();
int targetIndex =rand()%this->m_threadPool.size();
while( targetIndex == threadIndex){
        targetIndex =rand()%this->m_threadPool.size();
}

// 返回目標 thread
returnthis->m_threadPool[targetIndex];
}
// ...

}}

6 總結

祝賀,至此本文結束. 本篇和大家探討了,如何基于 c++ 從零到一實現一個協程調度框架,其核心功能包括:

  • ? 創建指定數量線程持續復用,調度后續到來的任務
  • ? 以閉包函數的風格提交任務到框架中,由異步協程完成執行
  • ? 任務運行過程中支持通過主動讓渡操作讓出調度執行權
  • ? 支持線程間的任務竊取操作,使得各調度線程間忙閑有度、負載均衡
責任編輯:武曉燕 來源: 小徐先生的編程世界
相關推薦

2022-09-06 20:30:48

協程Context主線程

2025-06-26 04:10:00

2022-09-12 06:35:00

C++協程協程狀態

2022-09-10 18:51:09

C++協程主線程

2024-12-03 15:15:22

2025-01-03 09:00:00

代碼C++gTest

2023-11-04 20:00:02

C++20協程

2017-05-02 11:38:00

PHP協程實現過程

2024-02-05 09:06:25

Python協程Asyncio庫

2022-12-30 07:50:05

無棧協程Linux

2021-09-16 09:59:13

PythonJavaScript代碼

2023-04-19 21:20:49

Tars-Cpp協程

2025-06-03 00:00:02

Go協程鎖機制

2019-03-01 08:57:47

iOScoobjc協程

2025-03-26 01:22:00

NtyCo協程框架

2025-08-08 08:23:49

2023-11-17 11:36:59

協程纖程操作系統

2025-01-26 00:00:15

PHP協程控制權

2021-05-06 10:33:30

C++Napiv8

2023-12-27 08:07:49

Golang協程池Ants
點贊
收藏

51CTO技術棧公眾號

最近中文字幕免费| 精品少妇人妻av免费久久洗澡| 午夜精品一区二| 成人aaaa| 欧美大片一区二区| 欧美日韩国产精品激情在线播放| 国产永久免费高清在线观看| 另类调教123区| 欧美老少配视频| 全黄一级裸体片| 95精品视频| 精品久久中文字幕久久av| 色姑娘综合av| 黄色一级大片在线免费看国产一| 久久精品观看| 久久综合久中文字幕青草| 欧美大喷水吹潮合集在线观看| av有声小说一区二区三区| 亚洲免费观看视频| 日本成人三级| 男人天堂网在线视频| 奇米精品一区二区三区在线观看 | 亚洲最大天堂网| 麻豆网站免费在线观看| 亚洲视频中文字幕| 蜜桃网站成人| 午夜免费福利视频| 免费在线一区观看| 欧美专区在线观看| 国产在线综合网| 99久久精品费精品国产| 日韩精品小视频| 无码人妻久久一区二区三区蜜桃| 91超碰碰碰碰久久久久久综合| 亚洲成人1区2区| 亚洲一区 在线播放| 99青草视频在线播放视| 久久影院视频免费| 国内一区二区三区在线视频| 99免费在线视频| 视频一区中文字幕国产| 久久久久国色av免费观看性色| 精品在线观看一区| 欧美一区二区三| 亚洲香蕉伊综合在人在线视看| 97人妻精品一区二区三区免费| 久久99成人| 91精品久久久久久久99蜜桃| 潘金莲激情呻吟欲求不满视频| 巨茎人妖videos另类| 性做久久久久久久免费看| 在线观看18视频网站| 国产一二区在线| 中文字幕一区三区| 亚洲欧美日韩国产yyy| 高清在线观看av| 国产欧美日韩久久| 日韩亚洲视频| 欧美一区二区三区| 亚洲视频在线一区二区| 日本免费黄色小视频| 50度灰在线| 亚洲伊人色欲综合网| 欧美久久在线观看| 91av久久| 色综合中文字幕国产| 无码人妻丰满熟妇区五十路百度| 欧美成人黑人| 欧美三级视频在线| 亚洲视频一二三四| 亚洲午夜免费| 亚洲韩国日本中文字幕| 免费在线观看你懂的| 国产一区二区观看| 最新国产成人av网站网址麻豆| 蜜桃视频最新网址| 欧美涩涩网站| 91豆花精品一区| 免费av中文字幕| 久久成人免费网站| 成人羞羞视频免费| 午夜福利一区二区三区| 国产欧美精品一区aⅴ影院| 在线播放豆国产99亚洲| 天堂va在线| 狠狠色噜噜狠狠狠狠97| 免费一级特黄录像| 日韩精品三级| 亚洲精品日韩在线| 农村老熟妇乱子伦视频| 国产精品www.| 国产精品99蜜臀久久不卡二区| 一区二区三区精彩视频| 国产精品99久久久久久久女警| 精品久久久久久一区二区里番| 国产中文字幕在线观看| 亚洲乱码中文字幕| heyzo国产| 国产精品一区免费在线 | 亚洲免费二区| 韩日精品中文字幕| 中文av免费观看| 国产99久久久国产精品免费看| 欧美日韩精品一区| 在线网址91| 在线精品视频小说1| 日韩精品aaa| 蜜桃国内精品久久久久软件9| 北条麻妃99精品青青久久| 日本在线小视频| 国产麻豆精品在线观看| 日本不卡一区二区三区视频| 羞羞的视频在线看| 欧美性感一区二区三区| 精品一区二区视频在线观看| 91影院成人| 欧洲美女7788成人免费视频| 国产欧美一区二区三区视频在线观看| 91麻豆国产精品久久| 久久亚洲国产成人精品无码区| 日韩视频网站在线观看| 亚洲国产97在线精品一区| 手机av在线看| 蜜臀99久久精品久久久久久软件| 精品一区在线播放| 亚洲卡一卡二| 3d动漫精品啪啪1区2区免费| 在线观看日本中文字幕| 亚洲高清av| 999热视频在线观看| 日本三级在线视频| 欧美性xxxxxx少妇| 日本少妇高潮喷水xxxxxxx| 夜夜嗨网站十八久久| 91av免费看| 成人在线影视| 6080yy午夜一二三区久久| 黄色三级生活片| 久久av一区二区三区| 激情视频在线观看一区二区三区| 丝袜美腿av在线| 91精品国产全国免费观看| 国产精品麻豆一区| 日本中文字幕一区二区视频 | 国产精品国产自产拍高清av水多| 无码h黄肉3d动漫在线观看| 亚洲一二三区在线观看| 稀缺呦国内精品呦| 激情综合久久| 国产一区二区高清不卡| 久久免费电影| 亚洲电影天堂av | 在线日韩欧美| 国产一区二区无遮挡 | 国产精品黄色av| 精品一二三区视频| 在线免费精品视频| 国产jk精品白丝av在线观看| 噜噜噜躁狠狠躁狠狠精品视频 | 国产精品aaaa| 午夜在线小视频| 88在线观看91蜜桃国自产| 色偷偷www8888| 国产电影一区二区三区| 国产曰肥老太婆无遮挡| 激情av综合| 欧美亚洲成人精品| 久久这里精品| 精品视频在线免费看| 岛国片在线免费观看| 国产一区二区三区观看| 大荫蒂性生交片| 台湾佬综合网| 国产精品久久久久9999| 免费的黄网站在线观看| 日韩欧美一二三区| 国产成人无码精品久久久久| 久久久久亚洲蜜桃| 色啦啦av综合| 91久久午夜| 欧美日韩国产高清视频| 日韩午夜视频在线| 欧美精品久久久久久久久| 女人天堂在线| 在线播放欧美女士性生活| 伊人国产在线观看| 久久久久久夜精品精品免费| 三级av免费观看| 亚洲小说区图片区| 欧美福利一区二区三区| 日日夜夜精品| 91精品国产91久久久| h视频网站在线观看| 日韩精品专区在线| 亚洲av无码不卡| 樱花草国产18久久久久| 国内精品久久99人妻无码| 精品一区二区在线观看| 免费看又黄又无码的网站| 久久一区91| 精品无人区一区二区三区| 日本亚洲欧洲无免费码在线| 91精品国产99| 成人日韩欧美| 伊是香蕉大人久久| 手机在线观看毛片| 91精品国产综合久久精品图片| 可以免费在线观看的av| 亚洲欧美日韩小说| 波多野结衣av在线观看| 成人免费毛片aaaaa**| 中文字幕第88页| 亚洲黄色影片| 性生活免费观看视频| 国产一区二区三区电影在线观看 | 国产不卡av在线免费观看| 影音先锋中文在线视频| 一区二区日韩精品| 天堂中文在线观看视频| 欧美一区二区成人6969| 中文字幕乱码人妻无码久久 | 欧美不卡视频一区发布| 国产永久免费高清在线观看| 亚洲国产精品嫩草影院久久| 国产叼嘿视频在线观看| 欧美日产在线观看| 无码人妻黑人中文字幕| 亚洲成人一区二区在线观看| 国产1区2区3区4区| 国产精品久久三| 在线免费看黄视频| 91婷婷韩国欧美一区二区| 女女调教被c哭捆绑喷水百合| 国精品**一区二区三区在线蜜桃| 美女一区二区三区视频| 丝袜亚洲另类欧美| 1024精品视频| 欧美专区18| 九九九九免费视频| 一本久久综合| 999在线观看视频| 午夜电影亚洲| 免费极品av一视觉盛宴| 在线一区电影| www.国产亚洲| 国产精品草草| www.日本在线视频| 亚洲婷婷免费| 国产69精品久久久久久久| 99热这里只有精品8| 久久久久免费看黄a片app| 亚洲国产专区校园欧美| 精品久久久久久久久久中文字幕| 亚洲精品黄色| 97成人在线观看视频| 免费一区视频| 亚洲成人av免费看| 精品午夜久久福利影院| 一级黄色高清视频| 成人毛片在线观看| 黄色性生活一级片| 久久综合av免费| 久久久久久久毛片| 中文字幕一区二区在线播放| 三级黄色在线观看| 亚洲精品日韩专区silk| 日韩字幕在线观看| 一本一道综合狠狠老| 中文字幕码精品视频网站| 3751色影院一区二区三区| 成人av免费播放| 亚洲国产成人在线播放| 欧美另类自拍| 日韩在线免费观看视频| 五月婷婷视频在线观看| 98精品在线视频| 成人黄色图片网站| 亚洲一区二区三区久久| 国偷自产视频一区二区久| 美日韩免费视频| 成人同人动漫免费观看| 日韩中文在线字幕| 亚洲精品专区| 色乱码一区二区三区在线| 国产激情精品久久久第一区二区 | 粉色视频免费看| 国产98色在线|日韩| 深爱五月激情网| 日韩毛片精品高清免费| 日韩精品视频免费看| 欧美亚洲动漫制服丝袜| 亚洲成人黄色片| 国产一区av在线| 少女频道在线观看免费播放电视剧| 97精品国产97久久久久久| 99riav视频一区二区| 99蜜桃在线观看免费视频网站| 精品国产精品国产偷麻豆| 日本丰满大乳奶| 米奇777在线欧美播放| 一级做a爱视频| 久久久99免费| 久久久久久久久久久久久久免费看 | 亚洲女同性videos| 中文字幕资源网在线观看| 日本三级韩国三级久久| 美国十次综合久久| 日本不卡久久| 国产欧美日韩综合一区在线播放 | 久久国产精品网| 男男成人高潮片免费网站| 800av在线播放| 亚洲情趣在线观看| www.久久视频| 日韩精品在线免费观看视频| 四虎影视成人| 成人天堂噜噜噜| 成人aaaa| 欧美日韩在线成人| 99在线精品一区二区三区| 午夜精品一区二区三区视频| 欧洲色大大久久| 日韩三级电影网| 久久免费在线观看| 欧美一级网址| 一区二区三区四区欧美| 久久精品男女| 免费在线观看成年人视频| 亚洲午夜一区二区| 国产成人久久精品77777综合| 色七七影院综合| 成人四虎影院| 日韩精品国内| 丝袜亚洲另类欧美| 91中文字幕永久在线| 婷婷成人综合网| 亚洲 欧美 激情 另类| 欧美精品激情在线观看| 日日夜夜精品视频| 丰满人妻一区二区三区53号| 激情国产一区二区| 国精品人伦一区二区三区蜜桃| 精品视频123区在线观看| 97在线观看免费观看高清| 国产精品www色诱视频| 中国av一区| 少妇性l交大片| 国产欧美日韩精品一区| 在线观看你懂的网站| 国产亚洲视频在线观看| 九九热线视频只有这里最精品| 欧美日韩国产高清视频| 天使萌一区二区三区免费观看| 亚洲自拍偷拍图| 欧美日韩在线播放| 黄色网址视频在线观看| 亚洲伊人一本大道中文字幕| 欧美国内亚洲| 亚洲天堂av网站| 日韩欧美在线免费| 黄色av网站在线免费观看| 国产精品久久久久77777| 午夜精品毛片| 亚洲一区和二区| 亚洲国产精品久久久男人的天堂| 婷婷丁香花五月天| 国产精品99免视看9| 888久久久| 极品白嫩的小少妇| 日韩欧美精品网站| 在线观看二区| 99超碰麻豆| 亚洲一区二区免费看| 蜜臀久久99精品久久久久久| 欧美日本国产视频| 国产偷倩在线播放| 欧美不卡在线一区二区三区| 亚洲精品久久久蜜桃动漫 | 久久久性生活视频| 久久久另类综合| 国产精品视频久久久久久| 欧美人在线观看| 亚洲国产国产| 91亚洲精品久久久蜜桃借种| 亚洲一区中文日韩| 国产系列在线观看| 99久久久精品免费观看国产| 国产精品视区| 青花影视在线观看免费高清| 亚洲精品国产精品国自产在线 | 蜜桃精品久久久久久久免费影院| 日本欧美大码aⅴ在线播放| 91视频青青草| 亚洲精品乱码久久久久久金桔影视 | 人妻在线日韩免费视频| 欧美色偷偷大香| аⅴ资源天堂资源库在线| 先锋影音一区二区三区| 成人动漫在线一区| 国产精品久久久久久免费播放|