基于ucontext.h的輕量級協(xié)程庫

本文主要是對自己學(xué)習(xí)協(xié)程并實(shí)現(xiàn)輕量級協(xié)程過程的一個記錄, 語言略顯啰嗦, 各位見諒. 水平有限, 如有疏漏, 歡迎各位指正.

一 了解協(xié)程

  • 協(xié)程可以理解為一種用戶態(tài)的輕量級線程, 切換由用戶定義
  • 協(xié)程上下文切換很快, 因?yàn)椴粫萑雰?nèi)核態(tài)
  • 協(xié)程擁有自己的寄存器上下文和棧, 協(xié)程調(diào)度切換時,將寄存器上下文和棧保存到其他地方,在切換回來的時候,恢復(fù)先前保存的寄存器上下文和棧

優(yōu)點(diǎn)

  • 協(xié)程具有極高的執(zhí)行效率 因?yàn)樽映绦蚯袚Q不是線程切換,是由程序自身控制,因此協(xié)程沒有線程切換的開銷, 多線程的線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯
  • 訪問共享資源不需要多線程的鎖機(jī)制, 因?yàn)橹挥幸粋€線程, 也不存在同時寫變量沖突, 所以在協(xié)程中控制共享資源無需加鎖, 只需要判斷狀態(tài)就好了,執(zhí)行效率比多線程高很多, 而且代碼編寫難度也可以相應(yīng)降低
  • 以同步代碼的方式寫異步邏輯

缺點(diǎn)

  • 無法利用多核資源, 除非和多進(jìn)程配合


二 了解ucontext

  • ucontext組件是什么
    • 頭文件<ucontext.h>定義了兩個數(shù)據(jù)結(jié)構(gòu), mcontext_t(暫時用不到)和ucontext_t和四個函數(shù), 可以被用來實(shí)現(xiàn)一個進(jìn)程中的用戶級線程(協(xié)程)切換

數(shù)據(jù)結(jié)構(gòu)

  • ucontext_t 結(jié)構(gòu)體至少擁有如下幾個域
    typedef struct ucontext {
        struct ucontext *uc_link;
        sigset_t uc_sigmask;
        stack_t uc_stack;
        mcontext_t uc_mcontext;
        ...
    } ucontext_t;
    
    • uc_link指向后繼上下文, 當(dāng)前上下文運(yùn)行終止時系統(tǒng)會恢復(fù)指向的上下文
    • uc_sigmask為該上下文中的阻塞信號集合
    • uc_stack為該上下文中使用的棧
    • uc_mcontex保存上下文的特定機(jī)器, 包括調(diào)用線程的特定寄存器等等
    • 簡而言之這個數(shù)據(jù)結(jié)構(gòu)是用來保存上下文的

函數(shù)

  1. int getcontext(ucontext_t * ucp);

    • 獲取當(dāng)前上下文, 初始化ucp結(jié)構(gòu)體, 將當(dāng)前上下文保存到ucp中
    • 成功返回0; 失敗返回-1, 并設(shè)置errno
  2. void makecontext(ucontext_t *ucp, void(*func)(), int argc, ...);

    • 創(chuàng)建一個目標(biāo)上下文 創(chuàng)建方式: (1) getcontext, (2) 指定分配給上下文的棧uc_stack.ss_sp, (3) 指定這塊棧的大小uc_stack.ss_size, (4) 指定uc_stack.ss_flags, (5) 指定后繼上下文uc_link
    • 協(xié)程運(yùn)行時使用主協(xié)程劃分的??臻g,而協(xié)程切回主線程時需要將該部分棧空間的內(nèi)容copy到每個協(xié)程各自的一個空間緩存起來,因?yàn)橹鲄f(xié)程中劃分的棧空間并不是只用于一個協(xié)程,而是會用于多個協(xié)程
    • makecontext可以修改通過getcontext初始化得到的上下文, (必須先調(diào)用getcontext), 然后為ucp指定一個??臻gucp->stack, 設(shè)置后繼的上下文ucp->uc_link
    • 當(dāng)上下文通過setcontext或者swapcontext激活后, 執(zhí)行func函數(shù)(argc為后續(xù)的參數(shù)個數(shù), 可變參數(shù)). 當(dāng)func執(zhí)行返回后, 繼承的上下文被激活(ucp->uc_link), 如果為NULL, 則線程退出
    ucontext_t tar_ctx;
    ucontext_t next_ctx;
    char stack[100];
    getcontext(&tar_ctx);
    tar_ctx.uc_stack.ss_sp = stack;
    tar_ctx.uc_stack.ss_sp = sizeof(stack);
    tar_ctx.uc_stack.ss_flags = 0;
    tar_ctx.uc_link = &next_ctx;
    
  3. int setcontext(const ucontext_t *ucp)

    • 設(shè)置當(dāng)前的上下文為ucp(激活ucp)
    • ucp來自getcontext, 那么上下文恢復(fù)至ucp
    • ucp來自makecontext, 那么將會調(diào)用makecontext函數(shù)的第二個參數(shù)指向的函數(shù)func, 如果func返回, 則恢復(fù)至ucp->uc_link指定的后繼上下文, 如果該ucp中的uc_link為NULL, 那么線程退出
    • 成功不返回, 失敗返回-1, 設(shè)置errno
  4. int swapcontext(ucontext_t *oucp, ucontext_t *ucp)

    • 切換上下文
    • 保存當(dāng)前上下文至oucp, 激活ucp上下文(先執(zhí)行makecontext指定的ucp入口函數(shù), 而后會執(zhí)行ucp->uc_link指向的后繼上下文)
    • 成功不返回, 失敗返回-1, 設(shè)置errno


三 輕量級協(xié)程實(shí)現(xiàn)

名詞說明

  • 協(xié)程調(diào)度器 代碼中的SingleSchedule
  • 用戶協(xié)程 代碼中的Coroutine
  • ??臻g/棧區(qū) 對應(yīng)SingleSchedule中的成員stack
  • 棧緩存/緩存區(qū) 對應(yīng)的是Coroutine中的成員Buffer
  • 主協(xié)程上下文 SingleSchedule中的main_ctx, 對應(yīng)main函數(shù)中的上下文
  • 用戶協(xié)程上下文 Coroutine中的ctx, 對應(yīng)每個用戶協(xié)程自身的上下文

思路

  • 本文基于ucontext.h實(shí)現(xiàn)協(xié)程庫
  • 基本思路:
    • 構(gòu)造一個協(xié)程調(diào)度類, 該類用于調(diào)度所有的用戶協(xié)程, 提供一個協(xié)程池ctxPool, 使用單例模式實(shí)現(xiàn).
    • 構(gòu)造一個用戶協(xié)程類, 該類對象對應(yīng)每個用戶協(xié)程, 提供一個用戶協(xié)程邏輯虛函數(shù)CoProcess, 提供一個用戶協(xié)程上下文ctx
    • 用戶協(xié)程首次激活, 將會為其分配協(xié)程調(diào)度器提供的棧區(qū)stack
    • 用戶協(xié)程被掛起, 那么會將該協(xié)程的棧信息棧區(qū)stack保存到其自身的緩存區(qū)buffer;
    • 用戶協(xié)程被喚醒, 那么會將該用戶協(xié)程的棧信息緩存區(qū)buffer中Reload至棧區(qū)
  • 協(xié)程庫框架
    • 激活 初始化用戶協(xié)程(指定協(xié)程狀態(tài)RUNNING), 初始化用戶協(xié)程上下文(指定協(xié)程??臻gstack, 指定后繼上下文), 將協(xié)程加入?yún)f(xié)程池
    • 掛起 將??臻gstack對應(yīng)的數(shù)據(jù)緩存至當(dāng)前用戶協(xié)程的棧緩存buffer中, 更改協(xié)程狀態(tài)SUSPEND
    • 恢復(fù) 將用戶協(xié)程棧緩存buffer中的數(shù)據(jù)reload進(jìn)棧空間stack
      示意圖

1 用戶協(xié)程類 Coroutine

數(shù)據(jù)成員

  • 協(xié)程狀態(tài)CoState state(FREE, RUNNING, SUSPEND)
  • 協(xié)程號int id(對應(yīng)協(xié)程調(diào)度類中的協(xié)程池的id)
  • 棧緩存char * Buffer, 一個動態(tài)數(shù)組, 當(dāng)協(xié)程被切出時, 緩存??臻g
  • 所需緩存區(qū)尺寸int stack_size
  • 用戶協(xié)程棧容量尺寸int cap cap如果小于stack_size, 那么需要重新分配緩存區(qū), 否則可以直接緩存
  • 用戶協(xié)程上下文ucontext_t ctx

主要成員函數(shù)

  • 掛起協(xié)程函數(shù)void Yield();
    • 掛起當(dāng)前協(xié)程, 并SaveStack??臻g, 切換狀態(tài)至SUSPEND
  • 恢復(fù)協(xié)程函數(shù)void Resume()
    • 恢復(fù)該協(xié)程, 并ReloadStack??臻g
  • 緩存堆棧函數(shù)void SaveStack();
    • 調(diào)用時機(jī)是協(xié)程被切出, 會將協(xié)程調(diào)度對象中的堆棧緩存入用戶協(xié)程自身的緩存區(qū)
  • 載入堆棧函數(shù)void ReloadStack();
    • 調(diào)用時機(jī)是協(xié)程被恢復(fù)時, 會將該用戶協(xié)程的堆棧信息從緩存區(qū)回復(fù)到協(xié)程調(diào)度對象的堆棧中
  • 用戶協(xié)程邏輯虛函數(shù)virtual void CoProcess();
    • 用于派生成員中定義業(yè)務(wù)邏輯

2 協(xié)程調(diào)度類

  • 單例

數(shù)據(jù)成員

  • 協(xié)程池std::map<int, Coroutine*> crtPool;
  • 主協(xié)程上下文ucontext_t main_ctx
  • 協(xié)程堆棧char stack[DEFAULT_STACK_SIZE], 所有的協(xié)程都利用這塊區(qū)域

成員函數(shù)

  • 協(xié)程啟動函數(shù) void CoroutineNew(Coroutine * crt);
    • 初始化用戶協(xié)程(指定協(xié)程狀態(tài)RUNNING), 初始化用戶協(xié)程上下文(指定協(xié)程??臻gstack, 指定后繼上下文), 將協(xié)程加入?yún)f(xié)程池
  • 用戶協(xié)程入口函數(shù)static void CoroutineEntry(void * crt);
    • 指向用戶協(xié)程的入口函數(shù)
  • 協(xié)程恢復(fù)函數(shù)void Resume(int id);
    • 根據(jù)id恢復(fù)對應(yīng)協(xié)程
  • 檢查并清理協(xié)程池int HasCoroutine();
    • 清理FREE的協(xié)程, 并返回剩余的協(xié)程數(shù)量
  • 協(xié)程刪除函數(shù)void Remove(int id);
    • 刪除對應(yīng)協(xié)程

注意點(diǎn)

  1. 所有的用戶協(xié)程都使用調(diào)度器的棧空間, 每個用戶協(xié)程自身的buffer只不過用來作緩存
  2. SaveStack和ReloadStack函數(shù)的實(shí)現(xiàn)需要注意, 如何緩存協(xié)程棧
    • 協(xié)程棧是由用戶分配的, 如代碼中stack數(shù)組, 由于該數(shù)組的目的是用作??臻g, 而進(jìn)程中棧是預(yù)分配的, 即首先確定棧的高地址, 從高地址開始往低使用, 根據(jù)這一點(diǎn)我們可以確定需要被緩存的棧空間大小.
    char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();    // 獲取到棧底, 即高地址
    char dumy = 0;                                                       // 最后創(chuàng)建的變量, 必然分配在棧頂
    assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);                     // 被棧緩存不能大于??臻g
    if (cap<stackBottom-&dumy){                                          // cap 代表當(dāng)前棧緩存大小, 如果不夠需要重分配
        if(buffer){                                                      // 釋放當(dāng)前棧緩存
            delete [] buffer;
        }
        cap = stackBottom-&dumy;
        buffer = new char[cap];
    }
    stack_size = stackBottom-&dumy;                                      // ??臻g大小
    memcpy(buffer, &dumy, stack_size);                                   // 緩存
    

代碼實(shí)現(xiàn)

https://github.com/trioZwShen/my_code_rep/tree/master/My_Coroutine

1 用戶協(xié)程

/**
 * @file    : Coroutine.h
 * @author  : neilzwshen
 * @time    : 2018-7-31
 * @version : 3.0
 * @remark  : 用戶協(xié)程類
 */

#ifndef COROUTINE_H_
#define COROUTINE_H_
#define DEFAULT_STACK_SIZE (1024*1024)
#include <stdio.h>
#include <string.h>
#include <ucontext.h>


enum CoState {FREE = 0, RUNNING = 1, SUSPEND = 2};

class Coroutine
{
public:
    Coroutine();
    virtual ~Coroutine();

    /**
     * 用戶協(xié)程入口函數(shù)
     */
    virtual void CoProcess();
    
    /**
     * 用戶協(xié)程恢復(fù)函數(shù)
     */
    void Resume();

    /**
     * 獲取協(xié)程id
     * @return [返回id]
     */
    int GetId()const {
        return id;
    }

    /**
     * 設(shè)置協(xié)程id
     */
    void SetId(int _id) {
        id = _id;
    }

    /**
     * 獲取協(xié)程狀態(tài)
     * @return [返回協(xié)程狀態(tài)]
     */
    int GetState()const {
        return state;
    }

    /**
     * 設(shè)置協(xié)程狀態(tài)
     */
    void SetState(CoState _state) {
        state = _state;
    }

protected:
    /**
     * 用戶協(xié)程掛起函數(shù)
     */
    void Yield();

    /**
     * 堆棧緩存
     */
    void SaveStack();

    /**
     * 堆?;謴?fù)
     */
    void ReloadStack();
    
public:
    char *buffer;       // 緩存協(xié)程堆棧
    ucontext_t ctx;

private:
    int stack_size;
    int cap;
    int id;
    CoState state;
};

#endif
#include <assert.h>
#include "Coroutine.h"
#include "Schedule.h"

Coroutine::Coroutine()
        :id(0),state(FREE),cap(0),stack_size(0),buffer(nullptr)
{

}

Coroutine::~Coroutine()
{
    delete [] buffer;
}

void Coroutine::CoProcess()
{

}

void Coroutine::Resume()
{
    if(state==SUSPEND){
        ReloadStack();
        state = RUNNING;
        swapcontext(&(SingleSchedule::GetInst()->mainCtx), &ctx);
    }
}

void Coroutine::Yield()
{
    if (state == RUNNING){
        SaveStack();
        state = SUSPEND;
        swapcontext(&ctx, &(SingleSchedule::GetInst()->mainCtx));
    }
}

void Coroutine::SaveStack()
{
    char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();
    char dumy = 0;

    assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);
    if (cap<stackBottom-&dumy){
        if(buffer){
            delete [] buffer;
        }
        cap = stackBottom-&dumy;
        buffer = new char[cap];
    }
    stack_size = stackBottom-&dumy;
    memcpy(buffer, &dumy, stack_size);
}

void Coroutine::ReloadStack()
{
    memcpy(SingleSchedule::GetInst()->GetStackBottom()-stack_size, buffer, stack_size);
}

2 單例模板

/**
 * @file    : Singleton.h
 * @author  : neilzwshen
 * @time    : 2018-7-30
 * @version : 1.0
 * @remark  : 單例模板, 只要將對象作為T, 就可以獲取到一個單例對象, 構(gòu)造函數(shù)不能傳參
 */

#ifndef SINGLETON_H_
#define SINGLETON_H_

template<class T>
class Singleton {
public:
    /**
     * 單例獲取
     * @return [返回T的單例對象]
     */
    static T* GetInst(){
        if (!flag_instance){
            flag_instance = new Singleton();
        }
        return &flag_instance->_instance;
    }

protected:
    /**
     * 單例構(gòu)造
     */
    Singleton(){}

private:
    /**
     * T對象實(shí)例
     */
    T _instance;
    /**
     * 單例模板實(shí)例,
     */
    static Singleton<T> * flag_instance;
};

template<class T>
Singleton<T> * Singleton<T>::flag_instance = 0;

#endif

3 協(xié)程調(diào)度器

/**
 * @file    : Schedule.h
 * @author  : neilzwshen
 * @time    : 2018-7-31
 * @version : 3.0
 * @remark  : 協(xié)程調(diào)度類
 */

#ifndef SCHEDULE_H_
#define SCHEDULE_H_
#include <stdio.h>
#include <map>
#include <ucontext.h>
#include "Coroutine.h"
#include "Singleton.h"


typedef std::map<int, Coroutine*> CrtMap;

class Schedule
{
public:
    Schedule();
    virtual ~Schedule();

    /**
     * 用戶協(xié)程入口函數(shù)
     */
    static void CoroutineEntry(void * crt);

    /**
     * 將協(xié)程crt加入?yún)f(xié)程池, 并開啟
     * @param  crt [協(xié)程指針]
     */
    void CoroutineNew(Coroutine * crt);

    /**
     * 恢復(fù)用戶協(xié)程
     * @param id [description]
     */
    void Resume(int id);

    /**
     * 判斷協(xié)程池中是否還有未完成的協(xié)程, 并將已經(jīng)終止的協(xié)程刪除
     * @return [返回協(xié)程數(shù)]
     */
    int HasCoroutine();

    /**
     * 根據(jù)協(xié)程id刪除協(xié)程
     * @param id [協(xié)程id]
     */
    void Remove(int id);

    /**
     * 獲取到棧底
     * @return [返回棧底地址]
     */
    char* GetStackBottom(){
        return stack + DEFAULT_STACK_SIZE;
    }

public:
    ucontext_t mainCtx;
    char stack[DEFAULT_STACK_SIZE];     // 運(yùn)行協(xié)程堆棧

private:
    CrtMap crtPool;
};

typedef Singleton<Schedule> SingleSchedule;

#endif

#include <assert.h>
#include "Schedule.h"

Schedule::Schedule()
{

}

Schedule::~Schedule()
{

}

void Schedule::CoroutineEntry(void * crt) {
    ((Coroutine *)crt)->SetState(RUNNING);
    ((Coroutine *)crt)->CoProcess();
    ((Coroutine *)crt)->SetState(FREE);
}

void Schedule::CoroutineNew(Coroutine * crt) {
    
    int id = crt->GetId();
    CoState state = CoState(crt->GetState());
    assert(id != 0);
    assert(state == FREE);
    //printf("--%d,%d--\n",id, state);

    if (crtPool[id] != nullptr) {
        CrtMap::iterator it = crtPool.find(id);
        crtPool.erase(it);
    }
    
    // 構(gòu)建用戶協(xié)程上下文
    getcontext(&(crt->ctx));
    //memset(stack, 0, DEFAULT_STACK_SIZE);
    crt->ctx.uc_stack.ss_sp = stack;
    crt->ctx.uc_stack.ss_size = DEFAULT_STACK_SIZE;
    crt->ctx.uc_stack.ss_flags = 0;
    crt->ctx.uc_link = &mainCtx;
    crtPool[id] = crt;
    
    makecontext(&crt->ctx, (void(*)(void))CoroutineEntry, 1, (void *)crt);
    swapcontext(&mainCtx, &crt->ctx);
}

void Schedule::Resume(int id){
    if (crtPool[id] != nullptr) {
        crtPool[id]->Resume();
    }
}

int Schedule::HasCoroutine() {
    int count = 0;
    CrtMap::iterator it;
    for (it = crtPool.begin(); it != crtPool.end(); it++) {
        if (it->second->GetState() != FREE) {
            count++;
        }else{
            it=crtPool.erase(it);
            it--;
        }
    }
    return count;
}

void Schedule::Remove(int id) {
    if (crtPool[id] != nullptr) {
        crtPool.erase(crtPool.find(id));
    }
}

4 示例

#include <stdio.h>
#include <memory>
#include "Coroutine.h"
#include "Schedule.h"


class Logic1 : public Coroutine{
    void CoProcess(){
        puts("1");
        Yield();
        puts("4");
        Yield();
        puts("7");
    }
};

class Logic2 : public Coroutine{
    void CoProcess(){
        puts("2");
        Yield();
        puts("5");
        Yield();
        puts("8");
    }
};

class Logic3 : public Coroutine{
    void CoProcess(){
        puts("3");
        Yield();
        puts("6");
        Yield();
        puts("9");
    }
};

int main() {

    std::shared_ptr<Coroutine> ct1(new Logic1());
    std::shared_ptr<Coroutine> ct2(new Logic2());
    std::shared_ptr<Coroutine> ct3(new Logic3());

    ct1->SetId(1);
    ct2->SetId(2);
    ct3->SetId(3);

    SingleSchedule::GetInst()->CoroutineNew(ct1.get());
    SingleSchedule::GetInst()->CoroutineNew(ct2.get());
    SingleSchedule::GetInst()->CoroutineNew(ct3.get());

    SingleSchedule::GetInst()->Resume(1);
    SingleSchedule::GetInst()->Resume(2);
    SingleSchedule::GetInst()->Resume(3);
    SingleSchedule::GetInst()->Resume(1);
    SingleSchedule::GetInst()->Resume(2);
    SingleSchedule::GetInst()->Resume(3);


    //SingleSchedule::GetInst()->Remove(1);
    //SingleSchedule::GetInst()->Remove(2);
    //SingleSchedule::GetInst()->Remove(3);

    int count = SingleSchedule::GetInst()->HasCoroutine();
    printf("%d\n", count);

    return 0;
}

5 執(zhí)行結(jié)果

szw@szw-VirtualBox:~/code/coroutine/temp/My_Coroutine_3_0$ ./main 
1
2
3
4
5
6
7
8
9
0
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容