本文主要是對自己學(xué)習(xí)協(xié)程并實(shí)現(xiàn)輕量級協(xié)程過程的一個記錄, 語言略顯啰嗦, 各位見諒. 水平有限, 如有疏漏, 歡迎各位指正.
一 了解協(xié)程
- 協(xié)程可以理解為一種用戶態(tài)的輕量級線程, 切換由用戶定義
- 協(xié)程上下文切換很快, 因?yàn)椴粫萑雰?nèi)核態(tài)
- 協(xié)程擁有自己的寄存器上下文和棧, 協(xié)程調(diào)度切換時,將寄存器上下文和棧保存到其他地方,在切換回來的時候,恢復(fù)先前保存的寄存器上下文和棧
優(yōu)點(diǎn)
- 協(xié)程具有極高的執(zhí)行效率 因?yàn)樽映绦蚯袚Q不是線程切換,是由程序自身控制,因此協(xié)程沒有線程切換的開銷, 多線程的線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯
- 訪問共享資源不需要多線程的鎖機(jī)制, 因?yàn)橹挥幸粋€線程, 也不存在同時寫變量沖突, 所以在協(xié)程中控制共享資源無需加鎖, 只需要判斷狀態(tài)就好了,執(zhí)行效率比多線程高很多, 而且代碼編寫難度也可以相應(yīng)降低
- 以同步代碼的方式寫異步邏輯
缺點(diǎn)
- 無法利用多核資源, 除非和多進(jìn)程配合
二 了解ucontext
-
ucontext組件是什么
- 頭文件
<ucontext.h>定義了兩個數(shù)據(jù)結(jié)構(gòu),mcontext_t(暫時用不到)和ucontext_t和四個函數(shù), 可以被用來實(shí)現(xiàn)一個進(jìn)程中的用戶級線程(協(xié)程)切換
- 頭文件
數(shù)據(jù)結(jié)構(gòu)
-
ucontext_t結(jié)構(gòu)體至少擁有如下幾個域typedef struct ucontext { struct ucontext *uc_link; sigset_t uc_sigmask; stack_t uc_stack; mcontext_t uc_mcontext; ... } ucontext_t;-
uc_link指向后繼上下文, 當(dāng)前上下文運(yùn)行終止時系統(tǒng)會恢復(fù)指向的上下文 -
uc_sigmask為該上下文中的阻塞信號集合 -
uc_stack為該上下文中使用的棧 -
uc_mcontex保存上下文的特定機(jī)器, 包括調(diào)用線程的特定寄存器等等 - 簡而言之這個數(shù)據(jù)結(jié)構(gòu)是用來保存上下文的
-
函數(shù)
-
int getcontext(ucontext_t * ucp);- 獲取當(dāng)前上下文, 初始化ucp結(jié)構(gòu)體, 將當(dāng)前上下文保存到ucp中
- 成功返回0; 失敗返回-1, 并設(shè)置errno
-
void makecontext(ucontext_t *ucp, void(*func)(), int argc, ...);-
創(chuàng)建一個目標(biāo)上下文 創(chuàng)建方式: (1) getcontext, (2) 指定分配給上下文的棧
uc_stack.ss_sp, (3) 指定這塊棧的大小uc_stack.ss_size, (4) 指定uc_stack.ss_flags, (5) 指定后繼上下文uc_link - 協(xié)程運(yùn)行時使用主協(xié)程劃分的??臻g,而協(xié)程切回主線程時需要將該部分棧空間的內(nèi)容copy到每個協(xié)程各自的一個空間緩存起來,因?yàn)橹鲄f(xié)程中劃分的棧空間并不是只用于一個協(xié)程,而是會用于多個協(xié)程
- makecontext可以修改通過getcontext初始化得到的上下文, (必須先調(diào)用getcontext), 然后為ucp指定一個??臻g
ucp->stack, 設(shè)置后繼的上下文ucp->uc_link - 當(dāng)上下文通過setcontext或者swapcontext激活后, 執(zhí)行func函數(shù)(argc為后續(xù)的參數(shù)個數(shù), 可變參數(shù)). 當(dāng)func執(zhí)行返回后, 繼承的上下文被激活(
ucp->uc_link), 如果為NULL, 則線程退出
ucontext_t tar_ctx; ucontext_t next_ctx; char stack[100]; getcontext(&tar_ctx); tar_ctx.uc_stack.ss_sp = stack; tar_ctx.uc_stack.ss_sp = sizeof(stack); tar_ctx.uc_stack.ss_flags = 0; tar_ctx.uc_link = &next_ctx; -
創(chuàng)建一個目標(biāo)上下文 創(chuàng)建方式: (1) getcontext, (2) 指定分配給上下文的棧
-
int setcontext(const ucontext_t *ucp)- 設(shè)置當(dāng)前的上下文為ucp(激活ucp)
- ucp來自getcontext, 那么上下文恢復(fù)至ucp
- ucp來自makecontext, 那么將會調(diào)用makecontext函數(shù)的第二個參數(shù)指向的函數(shù)func, 如果func返回, 則恢復(fù)至
ucp->uc_link指定的后繼上下文, 如果該ucp中的uc_link為NULL, 那么線程退出 - 成功不返回, 失敗返回-1, 設(shè)置errno
-
int swapcontext(ucontext_t *oucp, ucontext_t *ucp)- 切換上下文
- 保存當(dāng)前上下文至oucp, 激活ucp上下文(先執(zhí)行makecontext指定的ucp入口函數(shù), 而后會執(zhí)行
ucp->uc_link指向的后繼上下文) - 成功不返回, 失敗返回-1, 設(shè)置errno
三 輕量級協(xié)程實(shí)現(xiàn)
名詞說明
- 協(xié)程調(diào)度器 代碼中的SingleSchedule
- 用戶協(xié)程 代碼中的Coroutine
- ??臻g/棧區(qū) 對應(yīng)SingleSchedule中的成員stack
- 棧緩存/緩存區(qū) 對應(yīng)的是Coroutine中的成員Buffer
-
主協(xié)程上下文 SingleSchedule中的
main_ctx, 對應(yīng)main函數(shù)中的上下文 -
用戶協(xié)程上下文 Coroutine中的
ctx, 對應(yīng)每個用戶協(xié)程自身的上下文
思路
- 本文基于
ucontext.h實(shí)現(xiàn)協(xié)程庫 -
基本思路:
- 構(gòu)造一個協(xié)程調(diào)度類, 該類用于調(diào)度所有的用戶協(xié)程, 提供一個協(xié)程池ctxPool, 使用單例模式實(shí)現(xiàn).
- 構(gòu)造一個用戶協(xié)程類, 該類對象對應(yīng)每個用戶協(xié)程, 提供一個用戶協(xié)程邏輯虛函數(shù)CoProcess, 提供一個用戶協(xié)程上下文ctx
- 用戶協(xié)程首次激活, 將會為其分配協(xié)程調(diào)度器提供的棧區(qū)stack
- 用戶協(xié)程被掛起, 那么會將該協(xié)程的棧信息由棧區(qū)stack保存到其自身的緩存區(qū)buffer;
- 用戶協(xié)程被喚醒, 那么會將該用戶協(xié)程的棧信息從緩存區(qū)buffer中Reload至棧區(qū)
-
協(xié)程庫框架
- 激活 初始化用戶協(xié)程(指定協(xié)程狀態(tài)RUNNING), 初始化用戶協(xié)程上下文(指定協(xié)程??臻gstack, 指定后繼上下文), 將協(xié)程加入?yún)f(xié)程池
- 掛起 將??臻gstack對應(yīng)的數(shù)據(jù)緩存至當(dāng)前用戶協(xié)程的棧緩存buffer中, 更改協(xié)程狀態(tài)SUSPEND
-
恢復(fù) 將用戶協(xié)程棧緩存buffer中的數(shù)據(jù)reload進(jìn)棧空間stack
示意圖
1 用戶協(xié)程類 Coroutine
數(shù)據(jù)成員
- 協(xié)程狀態(tài)
CoState state(FREE, RUNNING, SUSPEND) - 協(xié)程號
int id(對應(yīng)協(xié)程調(diào)度類中的協(xié)程池的id) - 棧緩存
char * Buffer, 一個動態(tài)數(shù)組, 當(dāng)協(xié)程被切出時, 緩存??臻g - 所需緩存區(qū)尺寸
int stack_size - 用戶協(xié)程棧容量尺寸
int capcap如果小于stack_size, 那么需要重新分配緩存區(qū), 否則可以直接緩存 - 用戶協(xié)程上下文
ucontext_t ctx
主要成員函數(shù)
- 掛起協(xié)程函數(shù)
void Yield();- 掛起當(dāng)前協(xié)程, 并SaveStack??臻g, 切換狀態(tài)至SUSPEND
- 恢復(fù)協(xié)程函數(shù)
void Resume()- 恢復(fù)該協(xié)程, 并ReloadStack??臻g
- 緩存堆棧函數(shù)
void SaveStack();- 調(diào)用時機(jī)是協(xié)程被切出, 會將協(xié)程調(diào)度對象中的堆棧緩存入用戶協(xié)程自身的緩存區(qū)
- 載入堆棧函數(shù)
void ReloadStack();- 調(diào)用時機(jī)是協(xié)程被恢復(fù)時, 會將該用戶協(xié)程的堆棧信息從緩存區(qū)回復(fù)到協(xié)程調(diào)度對象的堆棧中
- 用戶協(xié)程邏輯虛函數(shù)
virtual void CoProcess();- 用于派生成員中定義業(yè)務(wù)邏輯
2 協(xié)程調(diào)度類
- 單例
數(shù)據(jù)成員
- 協(xié)程池
std::map<int, Coroutine*> crtPool; - 主協(xié)程上下文
ucontext_t main_ctx - 協(xié)程堆棧
char stack[DEFAULT_STACK_SIZE], 所有的協(xié)程都利用這塊區(qū)域
成員函數(shù)
- 協(xié)程啟動函數(shù)
void CoroutineNew(Coroutine * crt);- 初始化用戶協(xié)程(指定協(xié)程狀態(tài)RUNNING), 初始化用戶協(xié)程上下文(指定協(xié)程??臻gstack, 指定后繼上下文), 將協(xié)程加入?yún)f(xié)程池
- 用戶協(xié)程入口函數(shù)
static void CoroutineEntry(void * crt);- 指向用戶協(xié)程的入口函數(shù)
- 協(xié)程恢復(fù)函數(shù)
void Resume(int id);- 根據(jù)id恢復(fù)對應(yīng)協(xié)程
- 檢查并清理協(xié)程池
int HasCoroutine();- 清理FREE的協(xié)程, 并返回剩余的協(xié)程數(shù)量
- 協(xié)程刪除函數(shù)
void Remove(int id);- 刪除對應(yīng)協(xié)程
注意點(diǎn)
- 所有的用戶協(xié)程都使用調(diào)度器的棧空間, 每個用戶協(xié)程自身的buffer只不過用來作緩存
- SaveStack和ReloadStack函數(shù)的實(shí)現(xiàn)需要注意, 如何緩存協(xié)程棧
- 協(xié)程棧是由用戶分配的, 如代碼中stack數(shù)組, 由于該數(shù)組的目的是用作??臻g, 而進(jìn)程中棧是預(yù)分配的, 即首先確定棧的高地址, 從高地址開始往低使用, 根據(jù)這一點(diǎn)我們可以確定需要被緩存的棧空間大小.
char * stackBottom = SingleSchedule::GetInst()->GetStackBottom(); // 獲取到棧底, 即高地址 char dumy = 0; // 最后創(chuàng)建的變量, 必然分配在棧頂 assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE); // 被棧緩存不能大于??臻g if (cap<stackBottom-&dumy){ // cap 代表當(dāng)前棧緩存大小, 如果不夠需要重分配 if(buffer){ // 釋放當(dāng)前棧緩存 delete [] buffer; } cap = stackBottom-&dumy; buffer = new char[cap]; } stack_size = stackBottom-&dumy; // ??臻g大小 memcpy(buffer, &dumy, stack_size); // 緩存
代碼實(shí)現(xiàn)
https://github.com/trioZwShen/my_code_rep/tree/master/My_Coroutine
1 用戶協(xié)程
/**
* @file : Coroutine.h
* @author : neilzwshen
* @time : 2018-7-31
* @version : 3.0
* @remark : 用戶協(xié)程類
*/
#ifndef COROUTINE_H_
#define COROUTINE_H_
#define DEFAULT_STACK_SIZE (1024*1024)
#include <stdio.h>
#include <string.h>
#include <ucontext.h>
enum CoState {FREE = 0, RUNNING = 1, SUSPEND = 2};
class Coroutine
{
public:
Coroutine();
virtual ~Coroutine();
/**
* 用戶協(xié)程入口函數(shù)
*/
virtual void CoProcess();
/**
* 用戶協(xié)程恢復(fù)函數(shù)
*/
void Resume();
/**
* 獲取協(xié)程id
* @return [返回id]
*/
int GetId()const {
return id;
}
/**
* 設(shè)置協(xié)程id
*/
void SetId(int _id) {
id = _id;
}
/**
* 獲取協(xié)程狀態(tài)
* @return [返回協(xié)程狀態(tài)]
*/
int GetState()const {
return state;
}
/**
* 設(shè)置協(xié)程狀態(tài)
*/
void SetState(CoState _state) {
state = _state;
}
protected:
/**
* 用戶協(xié)程掛起函數(shù)
*/
void Yield();
/**
* 堆棧緩存
*/
void SaveStack();
/**
* 堆?;謴?fù)
*/
void ReloadStack();
public:
char *buffer; // 緩存協(xié)程堆棧
ucontext_t ctx;
private:
int stack_size;
int cap;
int id;
CoState state;
};
#endif
#include <assert.h>
#include "Coroutine.h"
#include "Schedule.h"
Coroutine::Coroutine()
:id(0),state(FREE),cap(0),stack_size(0),buffer(nullptr)
{
}
Coroutine::~Coroutine()
{
delete [] buffer;
}
void Coroutine::CoProcess()
{
}
void Coroutine::Resume()
{
if(state==SUSPEND){
ReloadStack();
state = RUNNING;
swapcontext(&(SingleSchedule::GetInst()->mainCtx), &ctx);
}
}
void Coroutine::Yield()
{
if (state == RUNNING){
SaveStack();
state = SUSPEND;
swapcontext(&ctx, &(SingleSchedule::GetInst()->mainCtx));
}
}
void Coroutine::SaveStack()
{
char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();
char dumy = 0;
assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);
if (cap<stackBottom-&dumy){
if(buffer){
delete [] buffer;
}
cap = stackBottom-&dumy;
buffer = new char[cap];
}
stack_size = stackBottom-&dumy;
memcpy(buffer, &dumy, stack_size);
}
void Coroutine::ReloadStack()
{
memcpy(SingleSchedule::GetInst()->GetStackBottom()-stack_size, buffer, stack_size);
}
2 單例模板
/**
* @file : Singleton.h
* @author : neilzwshen
* @time : 2018-7-30
* @version : 1.0
* @remark : 單例模板, 只要將對象作為T, 就可以獲取到一個單例對象, 構(gòu)造函數(shù)不能傳參
*/
#ifndef SINGLETON_H_
#define SINGLETON_H_
template<class T>
class Singleton {
public:
/**
* 單例獲取
* @return [返回T的單例對象]
*/
static T* GetInst(){
if (!flag_instance){
flag_instance = new Singleton();
}
return &flag_instance->_instance;
}
protected:
/**
* 單例構(gòu)造
*/
Singleton(){}
private:
/**
* T對象實(shí)例
*/
T _instance;
/**
* 單例模板實(shí)例,
*/
static Singleton<T> * flag_instance;
};
template<class T>
Singleton<T> * Singleton<T>::flag_instance = 0;
#endif
3 協(xié)程調(diào)度器
/**
* @file : Schedule.h
* @author : neilzwshen
* @time : 2018-7-31
* @version : 3.0
* @remark : 協(xié)程調(diào)度類
*/
#ifndef SCHEDULE_H_
#define SCHEDULE_H_
#include <stdio.h>
#include <map>
#include <ucontext.h>
#include "Coroutine.h"
#include "Singleton.h"
typedef std::map<int, Coroutine*> CrtMap;
class Schedule
{
public:
Schedule();
virtual ~Schedule();
/**
* 用戶協(xié)程入口函數(shù)
*/
static void CoroutineEntry(void * crt);
/**
* 將協(xié)程crt加入?yún)f(xié)程池, 并開啟
* @param crt [協(xié)程指針]
*/
void CoroutineNew(Coroutine * crt);
/**
* 恢復(fù)用戶協(xié)程
* @param id [description]
*/
void Resume(int id);
/**
* 判斷協(xié)程池中是否還有未完成的協(xié)程, 并將已經(jīng)終止的協(xié)程刪除
* @return [返回協(xié)程數(shù)]
*/
int HasCoroutine();
/**
* 根據(jù)協(xié)程id刪除協(xié)程
* @param id [協(xié)程id]
*/
void Remove(int id);
/**
* 獲取到棧底
* @return [返回棧底地址]
*/
char* GetStackBottom(){
return stack + DEFAULT_STACK_SIZE;
}
public:
ucontext_t mainCtx;
char stack[DEFAULT_STACK_SIZE]; // 運(yùn)行協(xié)程堆棧
private:
CrtMap crtPool;
};
typedef Singleton<Schedule> SingleSchedule;
#endif
#include <assert.h>
#include "Schedule.h"
Schedule::Schedule()
{
}
Schedule::~Schedule()
{
}
void Schedule::CoroutineEntry(void * crt) {
((Coroutine *)crt)->SetState(RUNNING);
((Coroutine *)crt)->CoProcess();
((Coroutine *)crt)->SetState(FREE);
}
void Schedule::CoroutineNew(Coroutine * crt) {
int id = crt->GetId();
CoState state = CoState(crt->GetState());
assert(id != 0);
assert(state == FREE);
//printf("--%d,%d--\n",id, state);
if (crtPool[id] != nullptr) {
CrtMap::iterator it = crtPool.find(id);
crtPool.erase(it);
}
// 構(gòu)建用戶協(xié)程上下文
getcontext(&(crt->ctx));
//memset(stack, 0, DEFAULT_STACK_SIZE);
crt->ctx.uc_stack.ss_sp = stack;
crt->ctx.uc_stack.ss_size = DEFAULT_STACK_SIZE;
crt->ctx.uc_stack.ss_flags = 0;
crt->ctx.uc_link = &mainCtx;
crtPool[id] = crt;
makecontext(&crt->ctx, (void(*)(void))CoroutineEntry, 1, (void *)crt);
swapcontext(&mainCtx, &crt->ctx);
}
void Schedule::Resume(int id){
if (crtPool[id] != nullptr) {
crtPool[id]->Resume();
}
}
int Schedule::HasCoroutine() {
int count = 0;
CrtMap::iterator it;
for (it = crtPool.begin(); it != crtPool.end(); it++) {
if (it->second->GetState() != FREE) {
count++;
}else{
it=crtPool.erase(it);
it--;
}
}
return count;
}
void Schedule::Remove(int id) {
if (crtPool[id] != nullptr) {
crtPool.erase(crtPool.find(id));
}
}
4 示例
#include <stdio.h>
#include <memory>
#include "Coroutine.h"
#include "Schedule.h"
class Logic1 : public Coroutine{
void CoProcess(){
puts("1");
Yield();
puts("4");
Yield();
puts("7");
}
};
class Logic2 : public Coroutine{
void CoProcess(){
puts("2");
Yield();
puts("5");
Yield();
puts("8");
}
};
class Logic3 : public Coroutine{
void CoProcess(){
puts("3");
Yield();
puts("6");
Yield();
puts("9");
}
};
int main() {
std::shared_ptr<Coroutine> ct1(new Logic1());
std::shared_ptr<Coroutine> ct2(new Logic2());
std::shared_ptr<Coroutine> ct3(new Logic3());
ct1->SetId(1);
ct2->SetId(2);
ct3->SetId(3);
SingleSchedule::GetInst()->CoroutineNew(ct1.get());
SingleSchedule::GetInst()->CoroutineNew(ct2.get());
SingleSchedule::GetInst()->CoroutineNew(ct3.get());
SingleSchedule::GetInst()->Resume(1);
SingleSchedule::GetInst()->Resume(2);
SingleSchedule::GetInst()->Resume(3);
SingleSchedule::GetInst()->Resume(1);
SingleSchedule::GetInst()->Resume(2);
SingleSchedule::GetInst()->Resume(3);
//SingleSchedule::GetInst()->Remove(1);
//SingleSchedule::GetInst()->Remove(2);
//SingleSchedule::GetInst()->Remove(3);
int count = SingleSchedule::GetInst()->HasCoroutine();
printf("%d\n", count);
return 0;
}
5 執(zhí)行結(jié)果
szw@szw-VirtualBox:~/code/coroutine/temp/My_Coroutine_3_0$ ./main
1
2
3
4
5
6
7
8
9
0
