workflow這個(gè)C++開源項(xiàng)目值得學(xué)習(xí)

最近發(fā)現(xiàn)了適合C++開發(fā)者進(jìn)階的開源項(xiàng)目,這個(gè)項(xiàng)目的名字叫workflow,項(xiàng)目地址如下:

sogou/workflowgithub.com/sogou/workflow

workflow是搜狗公司的服務(wù)器引擎,幾乎搜狗所有的后端C++服務(wù)和其他幾十家公司都在使用這個(gè)引擎,每日處理超百億請求。

其實(shí)去年在purecpp大會就聽過一個(gè)美女大佬介紹過這個(gè)項(xiàng)目,當(dāng)時(shí)就感慨這么項(xiàng)目怎么這么牛逼,但是也沒具體了解,直到最近又聽別人提起,我就仔細(xì)看了看項(xiàng)目的介紹,又研究了半個(gè)月項(xiàng)目的源碼,由衷感慨作者超強(qiáng)的架構(gòu)能力,明白了自己和架構(gòu)師之間巨大的差距,還是得多學(xué)習(xí)啊,肝。

本文目錄:

  • 框架有什么特點(diǎn)?
  • 框架能做什么?
  • 我為什么要推薦這個(gè)開源項(xiàng)目?

框架有什么特點(diǎn)?

  • 用戶體驗(yàn)相當(dāng)好:接口簡潔,支持常用協(xié)議,使用簡單,具體怎么簡單我下面會介紹;
  • 性能好:不單網(wǎng)絡(luò)、磁盤IO、CPU計(jì)算等,workflow著眼于所有異步資源都盡可能全部調(diào)起,有相當(dāng)充足的測試數(shù)據(jù)證明該框架的性能較目前主流的服務(wù)端框架更好;
  • 穩(wěn)定性高:搜狗和其他好多公司都在使用這個(gè)引擎,穩(wěn)定性肯定高啊,大家也可以自己去查數(shù)據(jù),我就不貼了;
  • 支持多種平臺:項(xiàng)目支持Linux、macOS、Windows、Android等操作系統(tǒng)。
  • 解放用戶生產(chǎn)力:用戶接觸到的只有任務(wù)(Task)和任務(wù)流(series)兩種概念,框架將資源高度封裝,用戶無需接觸到線程池、連接池、文件IO與各種異步通知機(jī)制等。用戶無需關(guān)心內(nèi)部細(xì)節(jié),可以將更多精力用在實(shí)現(xiàn)復(fù)雜的業(yè)務(wù)邏輯上。
  • 設(shè)計(jì)理念新穎:源碼值得學(xué)習(xí),我也是看了這個(gè)項(xiàng)目的源碼后才推薦給大家的,看完才知道,原來代碼可以這么寫,繼承可以這么玩。

框架能做什么?

框架能做的事情很多,我這里只介紹一些個(gè)人認(rèn)為比較重要的功能,更多功能還需要大家自行解鎖。

輕松的搭建server:不用多說,服務(wù)端框架如果不能搭建server那還玩啥了,但使用這個(gè)框架非常方便,以http server為例,只需要簡單幾行代碼即可:

#include <stdio.h>
#include "workflow/WFHttpServer.h"

int main() {
    WFHttpServer server([](WFHttpTask *task) {
        task->get_resp()->append_output_body("Hello World!");
    });
    if (server.start(8888) == 0) { // start server on port 8888
        getchar(); // press "Enter" to end.
        server.stop();
    }
    return 0;
}

輕松高效的發(fā)起客戶端請求:項(xiàng)目號稱可作為萬能異步客戶端,目前支持http,redis,mysql、websocket和kafka協(xié)議,下面是官方給出的一個(gè)mysql的客戶端示例:

int main(int argc, char *argv[]) {
    ...
    WFMySQLTask *task = WFTaskFactory::create_mysql_task(url, RETRY_MAX, mysql_callback);
    task->get_req()->set_query("SHOW TABLES;");
    ...
    task->start();
    ...
}

以往的C++ server需要訪問mysql時(shí),可能使用的是傳統(tǒng)的客戶端。在一個(gè)線程下以同步阻塞的方式等待數(shù)據(jù)到來。如果有多個(gè)網(wǎng)絡(luò)請求希望并發(fā),那么用戶需要管理多個(gè)mysql cli對象。

workflow完美的解決了這一系列問題,把所有這種用戶請求交給內(nèi)部的poller線程統(tǒng)一管理,實(shí)現(xiàn)了高效的非阻塞IO行為,提升了server作為客戶端請求數(shù)據(jù)時(shí)的性能表現(xiàn)。再也不用擔(dān)心這種客戶端行為影響server整體的性能。

再看個(gè)使用http協(xié)議的wget示例:

int main(int argc, char *argv[]) {
    WFHttpTask *task; 
    std::string url = argv[1];
    url = "http://" + url;
    task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
                                           wget_callback);
    protocol::HttpRequest *req = task->get_req();
    req->add_header_pair("Accept", "*/*");
    req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
    req->add_header_pair("Connection", "close");
    task->start();
    wait_group.wait();
    return 0;
}

首先發(fā)起了http請求,在wget_callback中處理http返回的消息體:

void wget_callback(WFHttpTask *task) {
    protocol::HttpRequest *req = task->get_req();
    protocol::HttpResponse *resp = task->get_resp();
    int state = task->get_state();
    int error = task->get_error();

    std::string name;
    std::string value;
    protocol::HttpHeaderCursor req_cursor(req);

    while (req_cursor.next(name, value))
        fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str());
    fprintf(stderr, "\r\n");

    /* Print response header. */
    fprintf(stderr, "%s %s %s\r\n", resp->get_http_version(),
                                    resp->get_status_code(),
                                    resp->get_reason_phrase());

    protocol::HttpHeaderCursor resp_cursor(resp);
    while (resp_cursor.next(name, value))
        fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str());
    fprintf(stderr, "\r\n");
    /* Print response body. */
    const void *body;
    size_t body_len;

    resp->get_parsed_body(&body, &body_len);
    fwrite(body, 1, body_len, stdout);
    fflush(stdout);

    fprintf(stderr, "\nSuccess. Press Ctrl-C to exit.\n");
}

就這么輕松的完成了wget的功能。

支持自定義協(xié)議client/server:用戶可構(gòu)建自己的RPC系統(tǒng),搜狗有個(gè)開源項(xiàng)目srpc就是以這個(gè)框架為基礎(chǔ)實(shí)現(xiàn)的。

可構(gòu)建異步任務(wù)流:支持串連,支持并聯(lián),支持串并聯(lián)的組合體,也支持復(fù)雜的DAG結(jié)構(gòu)。

異步IO:在Linux系統(tǒng)下可作為文件異步IO工具使用,性能超過任何標(biāo)準(zhǔn)調(diào)用。

通信與計(jì)算一體化:多數(shù)框架都著重于網(wǎng)絡(luò)IO的效率問題,而計(jì)算與任務(wù)調(diào)度等需要用戶自己實(shí)現(xiàn),workflow會自動對任務(wù)進(jìn)行調(diào)度,打通網(wǎng)絡(luò)和磁盤等資源,特別適合需要網(wǎng)絡(luò)通信的重計(jì)算模塊。

我為什么要推薦這個(gè)項(xiàng)目?

主要就一點(diǎn):值得學(xué)習(xí),適合C++開發(fā)者進(jìn)階,那具體學(xué)習(xí)什么?

學(xué)習(xí)系統(tǒng)的設(shè)計(jì),所謂初級重實(shí)現(xiàn),中級重炫技,高級重設(shè)計(jì)。

在作者的設(shè)計(jì)理念中,一切業(yè)務(wù)邏輯皆是任務(wù),多個(gè)任務(wù)會組成任務(wù)流,任務(wù)流可組成圖,這個(gè)圖可能是串連圖,可能是并聯(lián)圖,也有可能是串并聯(lián)圖,類似于這種:

[圖片上傳失敗...(image-fc5367-1628426033482)]

也有可能是這種復(fù)雜的DAG圖:

[圖片上傳失敗...(image-855834-1628426033481)]

當(dāng)然圖的層次結(jié)構(gòu)可由用戶自定義,個(gè)人認(rèn)為框架最牛逼的一點(diǎn)就是支持動態(tài)創(chuàng)建任務(wù)流。

使用下面這段代碼可以很直觀友好的構(gòu)造出圖的結(jié)構(gòu),你有沒有很好奇這段代碼是怎么實(shí)現(xiàn)的?

WFGraphNode a, b, c, d;
a-->b;
a-->c;
b-->d;
c-->d;

這里先賣個(gè)關(guān)子,感興趣的自己去看吧,總貼人家代碼也不好。

我認(rèn)為項(xiàng)目最值得大家學(xué)習(xí)的就是架構(gòu)的設(shè)計(jì),特別任務(wù)與任務(wù)流的設(shè)計(jì),我現(xiàn)在還沒看完代碼,畫不出架構(gòu)的設(shè)計(jì)圖(也怕畫錯(cuò)了),只能籠統(tǒng)的說一句牛逼,因?yàn)榇_實(shí)驚艷到我了。

貼一個(gè)workflow的關(guān)于Task的架構(gòu)圖:

image

再簡單的貼一個(gè)定時(shí)器Task的實(shí)現(xiàn)代碼給大家看看:

項(xiàng)目里所有的Task都通過工廠創(chuàng)建:

static WFTimerTask *create_timer_task(const std::string& timer_name,
                                          unsigned int microseconds,
                                          timer_callback_t callback);

看下WFTimerTask的設(shè)計(jì):

class WFTimerTask : public SleepRequest {
public:
    void start() {
        assert(!series_of(this));
        Workflow::start_series_work(this, nullptr);
    }
    void dismiss() {
        assert(!series_of(this));
        delete this;
    }
protected:
    virtual SubTask *done() {
        if (this->callback)
            this->callback(this);
    }
};

再看下Workflow::start_series_work()的方法:

inline void Workflow::start_series_work(SubTask *first, series_callback_t callback) {
    new SeriesWork(first, std::move(callback));
    first->dispatch();
}

然后是SleepRequest:

class SleepRequest : public SubTask, public SleepSession {
public:
    virtual void dispatch() {
        if (this->scheduler->sleep(this) < 0) {
            this->state = SS_STATE_ERROR;
            this->error = errno;
            this->subtask_done();
        }
    }
protected:
    CommScheduler *scheduler;
    virtual void handle(int state, int error) {
        this->state = state;
        this->error = error;
        this->subtask_done();
    }
};

再看下scheduler中的這個(gè)sleep()方法:

int Communicator::sleep(SleepSession *session) {
    struct timespec value;
    if (session->duration(&value) >= 0) {
        if (mpoller_add_timer(&value, session, this->mpoller) >= 0)
            return 0;
    }
    return -1;
}

然后是SubTask:

class SubTask {
public:
    virtual void dispatch() = 0;
private:
    virtual SubTask *done() = 0;
protected:
    void subtask_done();
};

然后是subtask_done()方法的實(shí)現(xiàn):

void SubTask::subtask_done() {
    SubTask *cur = this;
    ParallelTask *parent;
    SubTask **entry;
    while (1) {
        parent = cur->parent;
        entry = cur->entry;
        cur = cur->done();
        xxx
        break;
    }
}

然后是SleepSession:

class SleepSession {
private:
    virtual int duration(struct timespec *value) = 0;
    virtual void handle(int state, int error) = 0;
public:
    virtual ~SleepSession() { }
    friend class Communicator;
};

看了這么多源碼,那WFTimerTask是如何實(shí)現(xiàn)的定時(shí)功能呢?

我總結(jié)了下面幾步:

步驟1:用戶調(diào)用WFTimerTask的start();

步驟2:start()中調(diào)用Workflow::start_series_work()方法;

步驟3:start_series_work()中調(diào)用SubTask的dispatch()方法,這個(gè)dispatch()方法由SubTask的子類SleepRequest(WFTimerTask的父類)實(shí)現(xiàn);

步驟4:SleepRequest類的dispath()方法會調(diào)用scheduler->sleep()方法;

步驟5:sleep()方法會調(diào)用SleepSession的duration()方法獲取具體sleep的時(shí)間,框架內(nèi)部用了timerfd把超時(shí)時(shí)間交給操作系統(tǒng),時(shí)間到了會通知框架層,進(jìn)而觸發(fā)SleepSession中的handle()調(diào)用;

步驟6:handle()的實(shí)現(xiàn)中調(diào)用subtask_done()方法;

步驟7:subtask_done()中會調(diào)用SubTask中的done()方法;

步驟8:這個(gè)done()方法具體由WFTimerTask覆蓋,實(shí)現(xiàn)中會調(diào)用到具體時(shí)間后觸發(fā)的回調(diào)函數(shù)。

乍一看可能感覺非常麻煩,為什么實(shí)現(xiàn)一個(gè)普通的定時(shí)功能會搞這么多繼承關(guān)系,但你真正看了源碼后就會發(fā)現(xiàn),項(xiàng)目抽象出的所有Task,比如計(jì)數(shù)器Task、文件IOTask、網(wǎng)絡(luò)Task、MySQLTask等,都是通過這種SubTask、XXXRequest、XXXSession的形式來實(shí)現(xiàn),后期再來個(gè)XXXTask可以很方便的擴(kuò)展,這才是優(yōu)秀項(xiàng)目該有的架構(gòu),真的佩服。

讀者們,你們可以設(shè)計(jì)出這么高端的架構(gòu)嗎?反正我要肝完這個(gè)項(xiàng)目,也推薦給大家一起學(xué)習(xí)!

小總結(jié)

最后總結(jié)了該項(xiàng)目中個(gè)人認(rèn)為值得我們學(xué)習(xí)的地方:

  • 接口的設(shè)計(jì):項(xiàng)目的接入極其簡單,幾行代碼就可搭建個(gè)client或者server,幾行代碼也可構(gòu)建出簡單的任務(wù)流圖,可用于處理復(fù)雜的業(yè)務(wù)邏輯;
  • 架構(gòu)的設(shè)計(jì):項(xiàng)目中的各種類是如何派生的,作者的設(shè)計(jì)思路是怎么樣的;
  • 網(wǎng)絡(luò)通信:項(xiàng)目沒有使用任何網(wǎng)絡(luò)框架,而是使用網(wǎng)絡(luò)裸接口進(jìn)行網(wǎng)絡(luò)通信,我們都知道在大型項(xiàng)目中使用網(wǎng)絡(luò)裸接口進(jìn)行網(wǎng)絡(luò)通信需要處理很多異常條件,這里值得學(xué)習(xí)一波;
  • 任務(wù)流的封裝:為什么可以動態(tài)的構(gòu)建任務(wù)流的串并聯(lián)圖,并在項(xiàng)目內(nèi)部靈活的調(diào)度呢?
  • 文件I/O:項(xiàng)目號稱內(nèi)部文件I/O操作比標(biāo)準(zhǔn)調(diào)用性能還好,它是怎么做到的?
  • 內(nèi)存的管理:項(xiàng)目沒有使用任何智能指針,卻能管理好內(nèi)存問題,這是個(gè)技術(shù)活,當(dāng)然,也得益于這優(yōu)秀的架構(gòu)設(shè)計(jì)。

我發(fā)現(xiàn)workflow團(tuán)隊(duì)對這個(gè)項(xiàng)目相當(dāng)重視,還特意建了個(gè)QQ交流群(群號碼是618773193),對此項(xiàng)目有任何問題都可以在這個(gè)群里探討,也方便了我們學(xué)習(xí),真的不錯(cuò)。

參考資料:

<u style="text-decoration: none; border-bottom: 1px dashed grey;">https://zhuanlan.zhihu.com/p/358869362</u>

<u style="text-decoration: none; border-bottom: 1px dashed grey;">https://zhuanlan.zhihu.com/p/165638263</u>

項(xiàng)目地址如下:

sogou/workflowgithub.com/sogou/workflow

在訪問GitHub遇到困難時(shí),可使用他們的Gitee官方倉庫:

搜狗開源/workflowgitee.com/sogou/workflow[圖片上傳失敗...(image-45b969-1628426033468)]

感覺這個(gè)項(xiàng)目值得學(xué)習(xí)的話就給人家個(gè)star,不要白嫖哈,對項(xiàng)目團(tuán)隊(duì)來說也是一種認(rèn)可和鼓勵(lì)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容