本文介紹了協(xié)程的概念,并討論了 Tars Cpp 協(xié)程的實現(xiàn)原理和源碼分析。
一、前言
Tars 是 Linux 基金會的開源項目,它是基于名字服務使用 Tars 協(xié)議的高性能 RPC 開發(fā)框架,配套一體化的運營管理平臺,并通過伸縮調度,實現(xiàn)運維半托管服務。Tars 集可擴展協(xié)議編解碼、高性能 RPC 通信框架、名字路由與發(fā)現(xiàn)、發(fā)布監(jiān)控、日志統(tǒng)計、配置管理等于一體,通過它可以快速用微服務的方式構建自己的穩(wěn)定可靠的分布式應用,并實現(xiàn)完整有效的服務治理。
Tars 目前支持 C++,Java,PHP,Nodejs,Go 語言,其中 TarsCpp 3.x 全面啟用對協(xié)程的支持,服務框架全面融合協(xié)程。本文基于TarsCpp-v3.0.0版本,討論了協(xié)程在TarsCpp服務框架的實現(xiàn)。
二、協(xié)程的介紹
2.1 什么是協(xié)程
協(xié)程的概念最早出現(xiàn)在Melvin Conway在1963年的論文("Design of a separable transition-diagram compiler"),協(xié)程認為是“可以暫停和恢復執(zhí)行”的函數(shù)。

協(xié)程可以看成一種特殊的函數(shù),相比于函數(shù),協(xié)程最大的特點就是支持掛起(yield)和恢復(resume)的能力。如上圖所示:函數(shù)不能主動中斷執(zhí)行流;而協(xié)程支持主動掛起,中斷執(zhí)行流,并在一定時機恢復執(zhí)行。
協(xié)程的作用:
降低并發(fā)編碼的復雜度,尤其是異步編程(callback hell)。
協(xié)程在用戶態(tài)中實現(xiàn)調度,避免了陷入內核,上下文切換開銷小。
2.2 進程、線程和協(xié)程
我們可以簡單的認為協(xié)程是用戶態(tài)的線程。協(xié)程和線程主要異同:
相同點:都可以實現(xiàn)上下文切換(保存和恢復執(zhí)行流)
不同點:線程的上下文切換在內核實現(xiàn),切換的時機由內核調度器控制。協(xié)程的上下文切換在用戶態(tài)實現(xiàn),切換的時機由調用方自身控制。
進程、線程和協(xié)程的比較:

2.3 協(xié)程的分類
按控制傳遞(Control-transfer)機制分為:對稱(Symmetric)協(xié)程和非對稱(Asymmetric)協(xié)程。
對稱協(xié)程:協(xié)程之間相互獨立,調度權(CPU)可以在任意協(xié)程之間轉移。協(xié)程只有一種控制傳遞操作(yield)。對稱協(xié)程一般需要調度器支持,通過調度算法選擇下一個目標協(xié)程。
非對稱協(xié)程:協(xié)程之間存在調用關系,協(xié)程讓出的調度權只能返回給調用者。協(xié)程有兩種控制操作:恢復(resume)和掛起(yield)。
下圖演示了對稱協(xié)程的調度權轉移流程,協(xié)程只有一個操作yield,表示讓出CPU,返回給調度器。

對稱協(xié)程示意圖
下圖演示了非對稱協(xié)程的調度權轉移流程。協(xié)程可以有兩個操作,即resume和yield。resume表示轉移CPU給被調用者,yield表示被調用者返回CPU給調用者。

非對稱協(xié)程示意圖
根據(jù)協(xié)程是否有獨立的??臻g,協(xié)程分為有棧協(xié)程(stackful)和無棧協(xié)程(stackless)兩種。
有棧協(xié)程:每個協(xié)程有獨立的棧空間,保存獨立的上下文(執(zhí)行棧、寄存器等),協(xié)程的喚醒和掛起就是拷貝和切換上下文。優(yōu)點:協(xié)程調度可以嵌套,在內存中的任意位置、任意時刻進行。局限:協(xié)程數(shù)目增大,內存開銷增大。
無棧協(xié)程:單個線程內所有協(xié)程都共享同一個棧空間(共享棧),協(xié)程的切換就是簡單的函數(shù)調用和返回,無棧協(xié)程通常是基于狀態(tài)機或閉包來實現(xiàn)。優(yōu)點:減小內存開銷。局限:協(xié)程調度產(chǎn)生的局部變量都在共享棧上, 一旦新的協(xié)程運行后共享棧中的數(shù)據(jù)就會被覆蓋, 先前協(xié)程的局部變量也就不再有效, 進而無法實現(xiàn)參數(shù)傳遞、嵌套調用等高級協(xié)程交互。
Golang 中的 goroutine、Lua 中的協(xié)程都是有棧協(xié)程;ES6的 await/async、Python 的 Generator、C++20 中的 cooroutine 都是無棧協(xié)程。
三、Tars 協(xié)程實現(xiàn)
實現(xiàn)協(xié)程的核心有兩點:
實現(xiàn)用戶態(tài)的上下文切換。
實現(xiàn)協(xié)程的調度。
Tars 協(xié)程的由下面幾個類實現(xiàn):
TC_CoroutineInfo 協(xié)程信息類:實現(xiàn)協(xié)程的上下文切換。每個協(xié)程對應一個 TC_CoroutineInfo 對象,上下文切換基于boost.context實現(xiàn)。
TC_CoroutineScheduler 協(xié)程調度器類:實現(xiàn)了協(xié)程的管理和調度。
TC_Coroutine 協(xié)程類:繼承于線程類(TC_Thread),方便業(yè)務快速使用協(xié)程。
Tars 協(xié)程有幾個特點:
有棧協(xié)程。每個協(xié)程都分配了獨立的??臻g。
對稱協(xié)程。協(xié)程之間相互獨立,由調度器負責調度。
基于 epoll 實現(xiàn)協(xié)程調度,和網(wǎng)絡IO無縫結合。
3.1 用戶態(tài)上下文切換的實現(xiàn)方式
協(xié)程可以看成一種特殊的函數(shù),和普通函數(shù)不同,協(xié)程函數(shù)有掛起(yield)和恢復(resume)的能力,即可以中斷自己的執(zhí)行流,并且在合適的時候恢復執(zhí)行流,這也稱為上下文切換的能力。
協(xié)程執(zhí)行的過程,依賴兩個關鍵要素:協(xié)程棧和寄存器,協(xié)程的上下文環(huán)境其實就是寄存器和棧的狀態(tài)。實現(xiàn)上下文切換的核心就是實現(xiàn)保存并恢復當前執(zhí)行環(huán)境的寄存器狀態(tài)的能力。
實現(xiàn)用戶態(tài)上下文切換一般有以下方式:

3.2 基于boost.context實現(xiàn)上下文切換
Tars 協(xié)程是基于 boost.context 實現(xiàn),boost.context 提供了兩個接口(make_fcontext, jump_fcontext)實現(xiàn)協(xié)程的上下文切換。
代碼1:
/**
* @biref 執(zhí)行環(huán)境上下文
*/
typedef void* fcontext_t;
/**
* @biref 事件參數(shù)包裝
*/
struct transfer_t {
fcontext_t fctx; // 來源的執(zhí)行上下文。來源的上下文指的是從什么位置跳轉過來的
void* data; // 接口傳入的自定義的指針
};
/**
* @biref 初始化執(zhí)行環(huán)境上下文
* @param sp ??臻g地址
* @param size ??臻g的大小
* @param fn 入口函數(shù)
* @return 返回初始化完成后的執(zhí)行環(huán)境上下文
*/
extern "C" fcontext_t make_fcontext(void * stack, std::size_t stack_size, void (* fn)( transfer_t));
/**
* @biref 跳轉到目標上下文
* @param to 目標上下文
* @param vp 目標上下文的附加參數(shù),會設置為transfer_t里的data成員
* @return 跳轉來源
*/
extern "C" transfer_t jump_fcontext(fcontext_t const to, void * vp);
(1)make_fcontext創(chuàng)建協(xié)程
接受三個參數(shù),stack是為協(xié)程分配的棧底,stack_size是棧的大小,fn是協(xié)程的入口函數(shù)
返回初始化完成后的執(zhí)行環(huán)境上下文
(2)jump_fcontext切換協(xié)程
接受兩個參數(shù),目標上下文地址和參數(shù)指針
返回一個上下文,指向當前上下文從哪個上下文跳轉過來

fcontext的結構
boost context 是通過 fcontext_t結構體來保存協(xié)程狀態(tài)。相對于其它匯編實現(xiàn)的協(xié)程庫,boost的context和stack是一起的,棧底指針就是context,切換context就是切換stack。
3.3 Tars協(xié)程信息類
TC_CoroutineInfo 協(xié)程信息類,包裝了 boost.context 提供的接口,表示一個 TARS 協(xié)程。
其中,TC_CoroutineInfo::registerFunc 定義了協(xié)程的創(chuàng)建。
代碼2:
void TC_CoroutineInfo::registerFunc(const std::function& callback) { _callback = callback; _init_func.coroFunc = TC_CoroutineInfo::corotineProc; _init_func.args = this; fcontext_t ctx = make_fcontext(_stack_ctx.sp, _stack_ctx.size, TC_CoroutineInfo::corotineEntry); // 創(chuàng)建協(xié)程 transfer_t tf = jump_fcontext(ctx, this); // context 切換 //實際的ctx this->setCtx(tf.fctx); } void TC_CoroutineInfo::corotineEntry(transfer_t tf) { TC_CoroutineInfo * coro = static_cast< TC_CoroutineInfo * >(tf.data); // this auto func = coro->_init_func.coroFunc; void* args = coro->_init_func.args; transfer_t t = jump_fcontext(tf.fctx, NULL); //拿到自己的協(xié)程堆棧, 當前協(xié)程結束以后, 好跳轉到main coro->_scheduler->setMainCtx(t.fctx); //再跳轉到具體函數(shù) func(args, t); }
TC_CoroutineInfo::switchCoro 定義了協(xié)程切換。
代碼3
void TC_CoroutineScheduler::switchCoro(TC_CoroutineInfo *to)
{
//跳轉到to協(xié)程
_currentCoro = to;
transfer_t t = jump_fcontext(to->getCtx(), NULL);
//并保存協(xié)程堆棧
to->setCtx(t.fctx);
}
四、Tars 協(xié)程調度器
基于 boost.context 的 TC_CoroutineInfo 類實現(xiàn)了協(xié)程的上下文切換,協(xié)程的管理和調度,則是由 TC_CoroutineScheduler 協(xié)程調度器類來負責,分管理和調度兩個方面來說明 TC_CoroutineScheduler 調度類。
協(xié)程管理:目的是需要合理的數(shù)據(jù)結構來組織協(xié)程(TC_CoroutineInfo),方便調度的實現(xiàn)。
協(xié)程調度:目的是控制協(xié)程的啟動、休眠和喚醒,實現(xiàn)了 yield, sleep 等功能,本質就是實現(xiàn)協(xié)程的狀態(tài)機,完成協(xié)程的狀態(tài)切換。Tars 協(xié)程分為 5 個狀態(tài):FREE, ACTIVE, AVAIL, INACTIVE, TIMEOUT
代碼4:
/**
* 協(xié)程的狀態(tài)信息
*/
enum CORO_STATUS
{
CORO_FREE = 0,
CORO_ACTIVE = 1,
CORO_AVAIL = 2,
CORO_INACTIVE = 3,
CORO_TIMEOUT = 4
};
4.1 Tars 協(xié)程的管理
TC_CoroutineScheduler 主要通過以下方法管理協(xié)程:
TC_CoroutineScheduler::create()
創(chuàng)建TC_CoroutineScheduler對象
TC_CoroutineScheduler::init()初始化,分配協(xié)程棧內存
TC_CoroutineScheduler::run()啟動調度
TC_CoroutineScheduler::terminate()停止調度
TC_CoroutineScheduler::destroy()資源銷毀,釋放協(xié)程棧內存
我們可以通過 TC_CoroutineScheduler::init()看到數(shù)據(jù)結構的初始化過程。
代碼5:
void TC_CoroutineScheduler::init()
{
... ....
createCoroutineInfo(_poolSize); // _all_coro = new TC_CoroutineInfo*[_poolSize+1];
TC_CoroutineInfo::CoroutineHeadInit(&_active);
TC_CoroutineInfo::CoroutineHeadInit(&_avail);
TC_CoroutineInfo::CoroutineHeadInit(&_inactive);
TC_CoroutineInfo::CoroutineHeadInit(&_timeout);
TC_CoroutineInfo::CoroutineHeadInit(&_free);
int iSucc = 0;
for(size_t i = 0; i < _currentSize; ++i)
{
//iId=0不使用, 給mainCoro使用!!!!
uint32_t iId = generateId();
stack_context s_ctx = stack_traits::allocate(_stackSize); // 分配協(xié)程棧內存
TC_CoroutineInfo *coro = new TC_CoroutineInfo(this, iId, s_ctx);
_all_coro[iId] = coro;
TC_CoroutineInfo::CoroutineAddTail(coro, &_free);
++iSucc;
}
_currentSize = iSucc;
_mainCoro.setUid(0);
_mainCoro.setStatus(TC_CoroutineInfo::CORO_FREE);
_currentCoro = &_mainCoro;
}
通過下面的 TC_CoroutineScheduler 調度類數(shù)據(jù)結構圖,可以更清楚的看到協(xié)程的組織方式:

Tars調度類數(shù)據(jù)結構
使用協(xié)程之前,需要在協(xié)程數(shù)組(_all_coro),創(chuàng)建指定數(shù)量的協(xié)程對象,并為每個協(xié)程分配協(xié)程棧內存。
通過鏈表的方式管理協(xié)程,每個狀態(tài)都有一個鏈表。協(xié)程狀態(tài)切換,對應協(xié)程在不同狀態(tài)鏈表的轉移。
4.2Tars 協(xié)程的調度
Tars 調度是基于epoll實現(xiàn),在 epoll 循環(huán)里檢查是否有需要執(zhí)行的協(xié)程, 有則執(zhí)行之, 沒有則等待在epoll對象上, 直到有喚醒或者超時。使用 epoll 實現(xiàn)的好處是可以和網(wǎng)絡IO無縫粘合, 當有數(shù)據(jù)發(fā)送/接收時, 喚醒epoll對象, 從而完成協(xié)程的切換。
Tars協(xié)程調度的核心邏輯是
TC_CoroutineScheduler::run()
代碼6:
void TC_CoroutineScheduler::run()
{
... ...
while(!_epoller->isTerminate())
{
if(_activeCoroQueue.empty() && TC_CoroutineInfo::CoroutineHeadEmpty(&_avail) && TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
{
_epoller->done(1000); // epoll_wait(..., 1000ms) 先處理epoll的網(wǎng)絡事件
}
//喚醒需要激活的協(xié)程
wakeup();
//喚醒sleep的協(xié)程
wakeupbytimeout();
//喚醒yield的協(xié)程
wakeupbyself();
int iLoop = 100;
//執(zhí)行active協(xié)程, 每次執(zhí)行100個, 避免占滿cpu
while(iLoop > 0 && !TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
{
TC_CoroutineInfo *coro = _active._next;
switchCoro(coro);
--iLoop;
}
//執(zhí)行available協(xié)程, 每次執(zhí)行1個
if(!TC_CoroutineInfo::CoroutineHeadEmpty(&_avail))
{
TC_CoroutineInfo *coro = _avail._next;
switchCoro(coro);
}
}
... ...
}
下圖可以更清楚得看到協(xié)程調度和狀態(tài)轉移的過程。

Tars協(xié)程調度狀態(tài)轉移圖
TC_CoroutineScheduler 提供了下面四種方法實現(xiàn)協(xié)程的調度:
(1) TC_CoroutineScheduler:啟動協(xié)程。
(2)TC_CoroutineScheduler:當前協(xié)程放棄繼續(xù)執(zhí)行。并提供了兩種方式,支持不同的喚醒策略。
yield(true):會自動喚醒(等到下次協(xié)程調度,都會再激活當前線程)
yield(false):不再自動喚醒,除非自己調度該協(xié)程(比如put到調度器中)
(3)TC_CoroutineScheduler:當前協(xié)程休眠iSleepTime時間(單位:毫秒),然后會被喚醒繼續(xù)執(zhí)行。
(4)TC_CoroutineScheduler:放入需要喚醒的協(xié)程, 將協(xié)程放入到調度器中, 馬上會被調度器調度。
五、總結
本文介紹了協(xié)程的概念,并討論了 Tars Cpp 協(xié)程的實現(xiàn)原理和源碼分析。
審核編輯:劉清
-
RPC
+關注
關注
0文章
113瀏覽量
12207 -
C++語言
+關注
關注
0文章
147瀏覽量
7604 -
Lua語言
+關注
關注
0文章
9瀏覽量
1629 -
調度器
+關注
關注
0文章
99瀏覽量
5638
原文標題:Tars-Cpp協(xié)程實現(xiàn)分析
文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關注!文章轉載請注明出處。
發(fā)布評論請先 登錄
談談協(xié)程的那些事兒

基于TarsCpp-v3.0.0討論協(xié)程在TarsCpp服務框架的實現(xiàn)
評論