本文介紹了協(xié)程的概念,并討論了 Tars Cpp 協(xié)程的實現(xiàn)原理和源碼分析。
一、前言
Tars 是 Linux 基金會的開源項目,它是基于名字服務(wù)使用 Tars 協(xié)議的高性能 RPC 開發(fā)框架,配套一體化的運營管理平臺,并通過伸縮調(diào)度,實現(xiàn)運維半托管服務(wù)。Tars 集可擴展協(xié)議編解碼、高性能 RPC 通信框架、名字路由與發(fā)現(xiàn)、發(fā)布監(jiān)控、日志統(tǒng)計、配置管理等于一體,通過它可以快速用微服務(wù)的方式構(gòu)建自己的穩(wěn)定可靠的分布式應(yīng)用,并實現(xiàn)完整有效的服務(wù)治理。
Tars 目前支持 C++,Java,PHP,Nodejs,Go 語言,其中 TarsCpp 3.x 全面啟用對協(xié)程的支持,服務(wù)框架全面融合協(xié)程。本文基于TarsCpp-v3.0.0版本,討論了協(xié)程在TarsCpp服務(wù)框架的實現(xiàn)。
二、協(xié)程的介紹
2.1 什么是協(xié)程
協(xié)程的概念最早出現(xiàn)在Melvin Conway在1963年的論文("Design of a separable transition-diagram compiler"),協(xié)程認為是“可以暫停和恢復(fù)執(zhí)行”的函數(shù)。
協(xié)程可以看成一種特殊的函數(shù),相比于函數(shù),協(xié)程最大的特點就是支持掛起(yield)和恢復(fù)(resume)的能力。如上圖所示:函數(shù)不能主動中斷執(zhí)行流;而協(xié)程支持主動掛起,中斷執(zhí)行流,并在一定時機恢復(fù)執(zhí)行。
協(xié)程的作用:
降低并發(fā)編碼的復(fù)雜度,尤其是異步編程(callback hell)。
協(xié)程在用戶態(tài)中實現(xiàn)調(diào)度,避免了陷入內(nèi)核,上下文切換開銷小。
2.2 進程、線程和協(xié)程
我們可以簡單的認為協(xié)程是用戶態(tài)的線程。協(xié)程和線程主要異同:
相同點:都可以實現(xiàn)上下文切換(保存和恢復(fù)執(zhí)行流)
不同點:線程的上下文切換在內(nèi)核實現(xiàn),切換的時機由內(nèi)核調(diào)度器控制。協(xié)程的上下文切換在用戶態(tài)實現(xiàn),切換的時機由調(diào)用方自身控制。
進程、線程和協(xié)程的比較:
2.3 協(xié)程的分類
按控制傳遞(Control-transfer)機制分為:對稱(Symmetric)協(xié)程和非對稱(Asymmetric)協(xié)程。
對稱協(xié)程:協(xié)程之間相互獨立,調(diào)度權(quán)(CPU)可以在任意協(xié)程之間轉(zhuǎn)移。協(xié)程只有一種控制傳遞操作(yield)。對稱協(xié)程一般需要調(diào)度器支持,通過調(diào)度算法選擇下一個目標協(xié)程。
非對稱協(xié)程:協(xié)程之間存在調(diào)用關(guān)系,協(xié)程讓出的調(diào)度權(quán)只能返回給調(diào)用者。協(xié)程有兩種控制操作:恢復(fù)(resume)和掛起(yield)。
下圖演示了對稱協(xié)程的調(diào)度權(quán)轉(zhuǎn)移流程,協(xié)程只有一個操作yield,表示讓出CPU,返回給調(diào)度器。
對稱協(xié)程示意圖
下圖演示了非對稱協(xié)程的調(diào)度權(quán)轉(zhuǎn)移流程。協(xié)程可以有兩個操作,即resume和yield。resume表示轉(zhuǎn)移CPU給被調(diào)用者,yield表示被調(diào)用者返回CPU給調(diào)用者。
非對稱協(xié)程示意圖
根據(jù)協(xié)程是否有獨立的棧空間,協(xié)程分為有棧協(xié)程(stackful)和無棧協(xié)程(stackless)兩種。
有棧協(xié)程:每個協(xié)程有獨立的??臻g,保存獨立的上下文(執(zhí)行棧、寄存器等),協(xié)程的喚醒和掛起就是拷貝和切換上下文。優(yōu)點:協(xié)程調(diào)度可以嵌套,在內(nèi)存中的任意位置、任意時刻進行。局限:協(xié)程數(shù)目增大,內(nèi)存開銷增大。
無棧協(xié)程:單個線程內(nèi)所有協(xié)程都共享同一個??臻g(共享棧),協(xié)程的切換就是簡單的函數(shù)調(diào)用和返回,無棧協(xié)程通常是基于狀態(tài)機或閉包來實現(xiàn)。優(yōu)點:減小內(nèi)存開銷。局限:協(xié)程調(diào)度產(chǎn)生的局部變量都在共享棧上, 一旦新的協(xié)程運行后共享棧中的數(shù)據(jù)就會被覆蓋, 先前協(xié)程的局部變量也就不再有效, 進而無法實現(xiàn)參數(shù)傳遞、嵌套調(diào)用等高級協(xié)程交互。
Golang 中的 goroutine、Lua 中的協(xié)程都是有棧協(xié)程;ES6的 await/async、Python 的 Generator、C++20 中的 cooroutine 都是無棧協(xié)程。
三、Tars 協(xié)程實現(xiàn)
實現(xiàn)協(xié)程的核心有兩點:
實現(xiàn)用戶態(tài)的上下文切換。
實現(xiàn)協(xié)程的調(diào)度。
Tars 協(xié)程的由下面幾個類實現(xiàn):
TC_CoroutineInfo 協(xié)程信息類:實現(xiàn)協(xié)程的上下文切換。每個協(xié)程對應(yīng)一個 TC_CoroutineInfo 對象,上下文切換基于boost.context實現(xiàn)。
TC_CoroutineScheduler 協(xié)程調(diào)度器類:實現(xiàn)了協(xié)程的管理和調(diào)度。
TC_Coroutine 協(xié)程類:繼承于線程類(TC_Thread),方便業(yè)務(wù)快速使用協(xié)程。
Tars 協(xié)程有幾個特點:
有棧協(xié)程。每個協(xié)程都分配了獨立的??臻g。
對稱協(xié)程。協(xié)程之間相互獨立,由調(diào)度器負責(zé)調(diào)度。
基于 epoll 實現(xiàn)協(xié)程調(diào)度,和網(wǎng)絡(luò)IO無縫結(jié)合。
3.1 用戶態(tài)上下文切換的實現(xiàn)方式
協(xié)程可以看成一種特殊的函數(shù),和普通函數(shù)不同,協(xié)程函數(shù)有掛起(yield)和恢復(fù)(resume)的能力,即可以中斷自己的執(zhí)行流,并且在合適的時候恢復(fù)執(zhí)行流,這也稱為上下文切換的能力。
協(xié)程執(zhí)行的過程,依賴兩個關(guān)鍵要素:協(xié)程棧和寄存器,協(xié)程的上下文環(huán)境其實就是寄存器和棧的狀態(tài)。實現(xiàn)上下文切換的核心就是實現(xiàn)保存并恢復(fù)當(dāng)前執(zhí)行環(huán)境的寄存器狀態(tài)的能力。
實現(xiàn)用戶態(tài)上下文切換一般有以下方式:
3.2 基于boost.context實現(xiàn)上下文切換
Tars 協(xié)程是基于 boost.context 實現(xiàn),boost.context 提供了兩個接口(make_fcontext, jump_fcontext)實現(xiàn)協(xié)程的上下文切換。
代碼1:
/** * @biref 執(zhí)行環(huán)境上下文 */ typedef void* fcontext_t; /** * @biref 事件參數(shù)包裝 */ struct transfer_t { fcontext_t fctx; // 來源的執(zhí)行上下文。來源的上下文指的是從什么位置跳轉(zhuǎn)過來的 void* data; // 接口傳入的自定義的指針 }; /** * @biref 初始化執(zhí)行環(huán)境上下文 * @param sp ??臻g地址 * @param size 棧空間的大小 * @param fn 入口函數(shù) * @return 返回初始化完成后的執(zhí)行環(huán)境上下文 */ extern "C" fcontext_t make_fcontext(void * stack, std::size_t stack_size, void (* fn)( transfer_t)); /** * @biref 跳轉(zhuǎn)到目標上下文 * @param to 目標上下文 * @param vp 目標上下文的附加參數(shù),會設(shè)置為transfer_t里的data成員 * @return 跳轉(zhuǎn)來源 */ extern "C" transfer_t jump_fcontext(fcontext_t const to, void * vp);
(1)make_fcontext創(chuàng)建協(xié)程
接受三個參數(shù),stack是為協(xié)程分配的棧底,stack_size是棧的大小,fn是協(xié)程的入口函數(shù)
返回初始化完成后的執(zhí)行環(huán)境上下文
(2)jump_fcontext切換協(xié)程
接受兩個參數(shù),目標上下文地址和參數(shù)指針
返回一個上下文,指向當(dāng)前上下文從哪個上下文跳轉(zhuǎn)過來
fcontext的結(jié)構(gòu)
boost context 是通過 fcontext_t結(jié)構(gòu)體來保存協(xié)程狀態(tài)。相對于其它匯編實現(xiàn)的協(xié)程庫,boost的context和stack是一起的,棧底指針就是context,切換context就是切換stack。
3.3 Tars協(xié)程信息類
TC_CoroutineInfo 協(xié)程信息類,包裝了 boost.context 提供的接口,表示一個 TARS 協(xié)程。
其中,TC_CoroutineInfo::registerFunc 定義了協(xié)程的創(chuàng)建。
代碼2:
void TC_CoroutineInfo::registerFunc(const std::function& callback) { _callback = callback; _init_func.coroFunc = TC_CoroutineInfo::corotineProc; _init_func.args = this; fcontext_t ctx = make_fcontext(_stack_ctx.sp, _stack_ctx.size, TC_CoroutineInfo::corotineEntry); // 創(chuàng)建協(xié)程 transfer_t tf = jump_fcontext(ctx, this); // context 切換 //實際的ctx this->setCtx(tf.fctx); } void TC_CoroutineInfo::corotineEntry(transfer_t tf) { TC_CoroutineInfo * coro = static_cast< TC_CoroutineInfo * >(tf.data); // this auto func = coro->_init_func.coroFunc; void* args = coro->_init_func.args; transfer_t t = jump_fcontext(tf.fctx, NULL); //拿到自己的協(xié)程堆棧, 當(dāng)前協(xié)程結(jié)束以后, 好跳轉(zhuǎn)到main coro->_scheduler->setMainCtx(t.fctx); //再跳轉(zhuǎn)到具體函數(shù) func(args, t); }
TC_CoroutineInfo::switchCoro 定義了協(xié)程切換。
代碼3
void TC_CoroutineScheduler::switchCoro(TC_CoroutineInfo *to) { //跳轉(zhuǎn)到to協(xié)程 _currentCoro = to; transfer_t t = jump_fcontext(to->getCtx(), NULL); //并保存協(xié)程堆棧 to->setCtx(t.fctx); }
四、Tars 協(xié)程調(diào)度器
基于 boost.context 的 TC_CoroutineInfo 類實現(xiàn)了協(xié)程的上下文切換,協(xié)程的管理和調(diào)度,則是由 TC_CoroutineScheduler 協(xié)程調(diào)度器類來負責(zé),分管理和調(diào)度兩個方面來說明 TC_CoroutineScheduler 調(diào)度類。
協(xié)程管理:目的是需要合理的數(shù)據(jù)結(jié)構(gòu)來組織協(xié)程(TC_CoroutineInfo),方便調(diào)度的實現(xiàn)。
協(xié)程調(diào)度:目的是控制協(xié)程的啟動、休眠和喚醒,實現(xiàn)了 yield, sleep 等功能,本質(zhì)就是實現(xiàn)協(xié)程的狀態(tài)機,完成協(xié)程的狀態(tài)切換。Tars 協(xié)程分為 5 個狀態(tài):FREE, ACTIVE, AVAIL, INACTIVE, TIMEOUT
代碼4:
/** * 協(xié)程的狀態(tài)信息 */ enum CORO_STATUS { CORO_FREE = 0, CORO_ACTIVE = 1, CORO_AVAIL = 2, CORO_INACTIVE = 3, CORO_TIMEOUT = 4 };
4.1 Tars 協(xié)程的管理
TC_CoroutineScheduler 主要通過以下方法管理協(xié)程:
TC_CoroutineScheduler::create()
創(chuàng)建TC_CoroutineScheduler對象
TC_CoroutineScheduler::init()初始化,分配協(xié)程棧內(nèi)存
TC_CoroutineScheduler::run()啟動調(diào)度
TC_CoroutineScheduler::terminate()停止調(diào)度
TC_CoroutineScheduler::destroy()資源銷毀,釋放協(xié)程棧內(nèi)存
我們可以通過 TC_CoroutineScheduler::init()看到數(shù)據(jù)結(jié)構(gòu)的初始化過程。
代碼5:
void TC_CoroutineScheduler::init() { ... .... createCoroutineInfo(_poolSize); // _all_coro = new TC_CoroutineInfo*[_poolSize+1]; TC_CoroutineInfo::CoroutineHeadInit(&_active); TC_CoroutineInfo::CoroutineHeadInit(&_avail); TC_CoroutineInfo::CoroutineHeadInit(&_inactive); TC_CoroutineInfo::CoroutineHeadInit(&_timeout); TC_CoroutineInfo::CoroutineHeadInit(&_free); int iSucc = 0; for(size_t i = 0; i < _currentSize; ++i) { //iId=0不使用, 給mainCoro使用!!!! uint32_t iId = generateId(); stack_context s_ctx = stack_traits::allocate(_stackSize); // 分配協(xié)程棧內(nèi)存 TC_CoroutineInfo *coro = new TC_CoroutineInfo(this, iId, s_ctx); _all_coro[iId] = coro; TC_CoroutineInfo::CoroutineAddTail(coro, &_free); ++iSucc; } _currentSize = iSucc; _mainCoro.setUid(0); _mainCoro.setStatus(TC_CoroutineInfo::CORO_FREE); _currentCoro = &_mainCoro; }
通過下面的 TC_CoroutineScheduler 調(diào)度類數(shù)據(jù)結(jié)構(gòu)圖,可以更清楚的看到協(xié)程的組織方式:
Tars調(diào)度類數(shù)據(jù)結(jié)構(gòu)
使用協(xié)程之前,需要在協(xié)程數(shù)組(_all_coro),創(chuàng)建指定數(shù)量的協(xié)程對象,并為每個協(xié)程分配協(xié)程棧內(nèi)存。
通過鏈表的方式管理協(xié)程,每個狀態(tài)都有一個鏈表。協(xié)程狀態(tài)切換,對應(yīng)協(xié)程在不同狀態(tài)鏈表的轉(zhuǎn)移。
4.2Tars 協(xié)程的調(diào)度
Tars 調(diào)度是基于epoll實現(xiàn),在 epoll 循環(huán)里檢查是否有需要執(zhí)行的協(xié)程, 有則執(zhí)行之, 沒有則等待在epoll對象上, 直到有喚醒或者超時。使用 epoll 實現(xiàn)的好處是可以和網(wǎng)絡(luò)IO無縫粘合, 當(dāng)有數(shù)據(jù)發(fā)送/接收時, 喚醒epoll對象, 從而完成協(xié)程的切換。
Tars協(xié)程調(diào)度的核心邏輯是
TC_CoroutineScheduler::run()
代碼6:
void TC_CoroutineScheduler::run() { ... ... while(!_epoller->isTerminate()) { if(_activeCoroQueue.empty() && TC_CoroutineInfo::CoroutineHeadEmpty(&_avail) && TC_CoroutineInfo::CoroutineHeadEmpty(&_active)) { _epoller->done(1000); // epoll_wait(..., 1000ms) 先處理epoll的網(wǎng)絡(luò)事件 } //喚醒需要激活的協(xié)程 wakeup(); //喚醒sleep的協(xié)程 wakeupbytimeout(); //喚醒yield的協(xié)程 wakeupbyself(); int iLoop = 100; //執(zhí)行active協(xié)程, 每次執(zhí)行100個, 避免占滿cpu while(iLoop > 0 && !TC_CoroutineInfo::CoroutineHeadEmpty(&_active)) { TC_CoroutineInfo *coro = _active._next; switchCoro(coro); --iLoop; } //執(zhí)行available協(xié)程, 每次執(zhí)行1個 if(!TC_CoroutineInfo::CoroutineHeadEmpty(&_avail)) { TC_CoroutineInfo *coro = _avail._next; switchCoro(coro); } } ... ... }
下圖可以更清楚得看到協(xié)程調(diào)度和狀態(tài)轉(zhuǎn)移的過程。
Tars協(xié)程調(diào)度狀態(tài)轉(zhuǎn)移圖
TC_CoroutineScheduler 提供了下面四種方法實現(xiàn)協(xié)程的調(diào)度:
(1) TC_CoroutineScheduler:啟動協(xié)程。
(2)TC_CoroutineScheduler:當(dāng)前協(xié)程放棄繼續(xù)執(zhí)行。并提供了兩種方式,支持不同的喚醒策略。
yield(true):會自動喚醒(等到下次協(xié)程調(diào)度,都會再激活當(dāng)前線程)
yield(false):不再自動喚醒,除非自己調(diào)度該協(xié)程(比如put到調(diào)度器中)
(3)TC_CoroutineScheduler:當(dāng)前協(xié)程休眠iSleepTime時間(單位:毫秒),然后會被喚醒繼續(xù)執(zhí)行。
(4)TC_CoroutineScheduler:放入需要喚醒的協(xié)程, 將協(xié)程放入到調(diào)度器中, 馬上會被調(diào)度器調(diào)度。
五、總結(jié)
本文介紹了協(xié)程的概念,并討論了 Tars Cpp 協(xié)程的實現(xiàn)原理和源碼分析。
審核編輯:劉清
-
RPC
+關(guān)注
關(guān)注
0文章
111瀏覽量
11493 -
C++語言
+關(guān)注
關(guān)注
0文章
147瀏覽量
6951 -
Lua語言
+關(guān)注
關(guān)注
0文章
8瀏覽量
1482 -
調(diào)度器
+關(guān)注
關(guān)注
0文章
98瀏覽量
5232
原文標題:Tars-Cpp協(xié)程實現(xiàn)分析
文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論