最近發(fā)現(xiàn)了適合C++開發(fā)者進(jìn)階的開源項(xiàng)目,這個(gè)項(xiàng)目的名字叫workflow,項(xiàng)目地址如下:
sogou/workflowgithub.com/sogou/workflow
workflow是搜狗公司的服務(wù)器引擎,幾乎搜狗所有的后端C++服務(wù)和其他幾十家公司都在使用這個(gè)引擎,每日處理超百億請求。
其實(shí)去年在purecpp大會就聽過一個(gè)美女大佬介紹過這個(gè)項(xiàng)目,當(dāng)時(shí)就感慨這么項(xiàng)目怎么這么牛逼,但是也沒具體了解,直到最近又聽別人提起,我就仔細(xì)看了看項(xiàng)目的介紹,又研究了半個(gè)月項(xiàng)目的源碼,由衷感慨作者超強(qiáng)的架構(gòu)能力,明白了自己和架構(gòu)師之間巨大的差距,還是得多學(xué)習(xí)啊,肝。
本文目錄:
- 框架有什么特點(diǎn)?
- 框架能做什么?
- 我為什么要推薦這個(gè)開源項(xiàng)目?
框架有什么特點(diǎn)?
- 用戶體驗(yàn)相當(dāng)好:接口簡潔,支持常用協(xié)議,使用簡單,具體怎么簡單我下面會介紹;
- 性能好:不單網(wǎng)絡(luò)、磁盤IO、CPU計(jì)算等,workflow著眼于所有異步資源都盡可能全部調(diào)起,有相當(dāng)充足的測試數(shù)據(jù)證明該框架的性能較目前主流的服務(wù)端框架更好;
- 穩(wěn)定性高:搜狗和其他好多公司都在使用這個(gè)引擎,穩(wěn)定性肯定高啊,大家也可以自己去查數(shù)據(jù),我就不貼了;
- 支持多種平臺:項(xiàng)目支持Linux、macOS、Windows、Android等操作系統(tǒng)。
- 解放用戶生產(chǎn)力:用戶接觸到的只有任務(wù)(Task)和任務(wù)流(series)兩種概念,框架將資源高度封裝,用戶無需接觸到線程池、連接池、文件IO與各種異步通知機(jī)制等。用戶無需關(guān)心內(nèi)部細(xì)節(jié),可以將更多精力用在實(shí)現(xiàn)復(fù)雜的業(yè)務(wù)邏輯上。
- 設(shè)計(jì)理念新穎:源碼值得學(xué)習(xí),我也是看了這個(gè)項(xiàng)目的源碼后才推薦給大家的,看完才知道,原來代碼可以這么寫,繼承可以這么玩。
框架能做什么?
框架能做的事情很多,我這里只介紹一些個(gè)人認(rèn)為比較重要的功能,更多功能還需要大家自行解鎖。
輕松的搭建server:不用多說,服務(wù)端框架如果不能搭建server那還玩啥了,但使用這個(gè)框架非常方便,以http server為例,只需要簡單幾行代碼即可:
#include <stdio.h>
#include "workflow/WFHttpServer.h"
int main() {
WFHttpServer server([](WFHttpTask *task) {
task->get_resp()->append_output_body("Hello World!");
});
if (server.start(8888) == 0) { // start server on port 8888
getchar(); // press "Enter" to end.
server.stop();
}
return 0;
}
輕松高效的發(fā)起客戶端請求:項(xiàng)目號稱可作為萬能異步客戶端,目前支持http,redis,mysql、websocket和kafka協(xié)議,下面是官方給出的一個(gè)mysql的客戶端示例:
int main(int argc, char *argv[]) {
...
WFMySQLTask *task = WFTaskFactory::create_mysql_task(url, RETRY_MAX, mysql_callback);
task->get_req()->set_query("SHOW TABLES;");
...
task->start();
...
}
以往的C++ server需要訪問mysql時(shí),可能使用的是傳統(tǒng)的客戶端。在一個(gè)線程下以同步阻塞的方式等待數(shù)據(jù)到來。如果有多個(gè)網(wǎng)絡(luò)請求希望并發(fā),那么用戶需要管理多個(gè)mysql cli對象。
workflow完美的解決了這一系列問題,把所有這種用戶請求交給內(nèi)部的poller線程統(tǒng)一管理,實(shí)現(xiàn)了高效的非阻塞IO行為,提升了server作為客戶端請求數(shù)據(jù)時(shí)的性能表現(xiàn)。再也不用擔(dān)心這種客戶端行為影響server整體的性能。
再看個(gè)使用http協(xié)議的wget示例:
int main(int argc, char *argv[]) {
WFHttpTask *task;
std::string url = argv[1];
url = "http://" + url;
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
wget_callback);
protocol::HttpRequest *req = task->get_req();
req->add_header_pair("Accept", "*/*");
req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
req->add_header_pair("Connection", "close");
task->start();
wait_group.wait();
return 0;
}
首先發(fā)起了http請求,在wget_callback中處理http返回的消息體:
void wget_callback(WFHttpTask *task) {
protocol::HttpRequest *req = task->get_req();
protocol::HttpResponse *resp = task->get_resp();
int state = task->get_state();
int error = task->get_error();
std::string name;
std::string value;
protocol::HttpHeaderCursor req_cursor(req);
while (req_cursor.next(name, value))
fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str());
fprintf(stderr, "\r\n");
/* Print response header. */
fprintf(stderr, "%s %s %s\r\n", resp->get_http_version(),
resp->get_status_code(),
resp->get_reason_phrase());
protocol::HttpHeaderCursor resp_cursor(resp);
while (resp_cursor.next(name, value))
fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str());
fprintf(stderr, "\r\n");
/* Print response body. */
const void *body;
size_t body_len;
resp->get_parsed_body(&body, &body_len);
fwrite(body, 1, body_len, stdout);
fflush(stdout);
fprintf(stderr, "\nSuccess. Press Ctrl-C to exit.\n");
}
就這么輕松的完成了wget的功能。
支持自定義協(xié)議client/server:用戶可構(gòu)建自己的RPC系統(tǒng),搜狗有個(gè)開源項(xiàng)目srpc就是以這個(gè)框架為基礎(chǔ)實(shí)現(xiàn)的。
可構(gòu)建異步任務(wù)流:支持串連,支持并聯(lián),支持串并聯(lián)的組合體,也支持復(fù)雜的DAG結(jié)構(gòu)。
異步IO:在Linux系統(tǒng)下可作為文件異步IO工具使用,性能超過任何標(biāo)準(zhǔn)調(diào)用。
通信與計(jì)算一體化:多數(shù)框架都著重于網(wǎng)絡(luò)IO的效率問題,而計(jì)算與任務(wù)調(diào)度等需要用戶自己實(shí)現(xiàn),workflow會自動對任務(wù)進(jìn)行調(diào)度,打通網(wǎng)絡(luò)和磁盤等資源,特別適合需要網(wǎng)絡(luò)通信的重計(jì)算模塊。
我為什么要推薦這個(gè)項(xiàng)目?
主要就一點(diǎn):值得學(xué)習(xí),適合C++開發(fā)者進(jìn)階,那具體學(xué)習(xí)什么?
學(xué)習(xí)系統(tǒng)的設(shè)計(jì),所謂初級重實(shí)現(xiàn),中級重炫技,高級重設(shè)計(jì)。
在作者的設(shè)計(jì)理念中,一切業(yè)務(wù)邏輯皆是任務(wù),多個(gè)任務(wù)會組成任務(wù)流,任務(wù)流可組成圖,這個(gè)圖可能是串連圖,可能是并聯(lián)圖,也有可能是串并聯(lián)圖,類似于這種:
[圖片上傳失敗...(image-fc5367-1628426033482)]
也有可能是這種復(fù)雜的DAG圖:
[圖片上傳失敗...(image-855834-1628426033481)]
當(dāng)然圖的層次結(jié)構(gòu)可由用戶自定義,個(gè)人認(rèn)為框架最牛逼的一點(diǎn)就是支持動態(tài)創(chuàng)建任務(wù)流。
使用下面這段代碼可以很直觀友好的構(gòu)造出圖的結(jié)構(gòu),你有沒有很好奇這段代碼是怎么實(shí)現(xiàn)的?
WFGraphNode a, b, c, d;
a-->b;
a-->c;
b-->d;
c-->d;
這里先賣個(gè)關(guān)子,感興趣的自己去看吧,總貼人家代碼也不好。
我認(rèn)為項(xiàng)目最值得大家學(xué)習(xí)的就是架構(gòu)的設(shè)計(jì),特別任務(wù)與任務(wù)流的設(shè)計(jì),我現(xiàn)在還沒看完代碼,畫不出架構(gòu)的設(shè)計(jì)圖(也怕畫錯(cuò)了),只能籠統(tǒng)的說一句牛逼,因?yàn)榇_實(shí)驚艷到我了。
貼一個(gè)workflow的關(guān)于Task的架構(gòu)圖:

再簡單的貼一個(gè)定時(shí)器Task的實(shí)現(xiàn)代碼給大家看看:
項(xiàng)目里所有的Task都通過工廠創(chuàng)建:
static WFTimerTask *create_timer_task(const std::string& timer_name,
unsigned int microseconds,
timer_callback_t callback);
看下WFTimerTask的設(shè)計(jì):
class WFTimerTask : public SleepRequest {
public:
void start() {
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
void dismiss() {
assert(!series_of(this));
delete this;
}
protected:
virtual SubTask *done() {
if (this->callback)
this->callback(this);
}
};
再看下Workflow::start_series_work()的方法:
inline void Workflow::start_series_work(SubTask *first, series_callback_t callback) {
new SeriesWork(first, std::move(callback));
first->dispatch();
}
然后是SleepRequest:
class SleepRequest : public SubTask, public SleepSession {
public:
virtual void dispatch() {
if (this->scheduler->sleep(this) < 0) {
this->state = SS_STATE_ERROR;
this->error = errno;
this->subtask_done();
}
}
protected:
CommScheduler *scheduler;
virtual void handle(int state, int error) {
this->state = state;
this->error = error;
this->subtask_done();
}
};
再看下scheduler中的這個(gè)sleep()方法:
int Communicator::sleep(SleepSession *session) {
struct timespec value;
if (session->duration(&value) >= 0) {
if (mpoller_add_timer(&value, session, this->mpoller) >= 0)
return 0;
}
return -1;
}
然后是SubTask:
class SubTask {
public:
virtual void dispatch() = 0;
private:
virtual SubTask *done() = 0;
protected:
void subtask_done();
};
然后是subtask_done()方法的實(shí)現(xiàn):
void SubTask::subtask_done() {
SubTask *cur = this;
ParallelTask *parent;
SubTask **entry;
while (1) {
parent = cur->parent;
entry = cur->entry;
cur = cur->done();
xxx
break;
}
}
然后是SleepSession:
class SleepSession {
private:
virtual int duration(struct timespec *value) = 0;
virtual void handle(int state, int error) = 0;
public:
virtual ~SleepSession() { }
friend class Communicator;
};
看了這么多源碼,那WFTimerTask是如何實(shí)現(xiàn)的定時(shí)功能呢?
我總結(jié)了下面幾步:
步驟1:用戶調(diào)用WFTimerTask的start();
步驟2:start()中調(diào)用Workflow::start_series_work()方法;
步驟3:start_series_work()中調(diào)用SubTask的dispatch()方法,這個(gè)dispatch()方法由SubTask的子類SleepRequest(WFTimerTask的父類)實(shí)現(xiàn);
步驟4:SleepRequest類的dispath()方法會調(diào)用scheduler->sleep()方法;
步驟5:sleep()方法會調(diào)用SleepSession的duration()方法獲取具體sleep的時(shí)間,框架內(nèi)部用了timerfd把超時(shí)時(shí)間交給操作系統(tǒng),時(shí)間到了會通知框架層,進(jìn)而觸發(fā)SleepSession中的handle()調(diào)用;
步驟6:handle()的實(shí)現(xiàn)中調(diào)用subtask_done()方法;
步驟7:subtask_done()中會調(diào)用SubTask中的done()方法;
步驟8:這個(gè)done()方法具體由WFTimerTask覆蓋,實(shí)現(xiàn)中會調(diào)用到具體時(shí)間后觸發(fā)的回調(diào)函數(shù)。
乍一看可能感覺非常麻煩,為什么實(shí)現(xiàn)一個(gè)普通的定時(shí)功能會搞這么多繼承關(guān)系,但你真正看了源碼后就會發(fā)現(xiàn),項(xiàng)目抽象出的所有Task,比如計(jì)數(shù)器Task、文件IOTask、網(wǎng)絡(luò)Task、MySQLTask等,都是通過這種SubTask、XXXRequest、XXXSession的形式來實(shí)現(xiàn),后期再來個(gè)XXXTask可以很方便的擴(kuò)展,這才是優(yōu)秀項(xiàng)目該有的架構(gòu),真的佩服。
讀者們,你們可以設(shè)計(jì)出這么高端的架構(gòu)嗎?反正我要肝完這個(gè)項(xiàng)目,也推薦給大家一起學(xué)習(xí)!
小總結(jié)
最后總結(jié)了該項(xiàng)目中個(gè)人認(rèn)為值得我們學(xué)習(xí)的地方:
- 接口的設(shè)計(jì):項(xiàng)目的接入極其簡單,幾行代碼就可搭建個(gè)client或者server,幾行代碼也可構(gòu)建出簡單的任務(wù)流圖,可用于處理復(fù)雜的業(yè)務(wù)邏輯;
- 架構(gòu)的設(shè)計(jì):項(xiàng)目中的各種類是如何派生的,作者的設(shè)計(jì)思路是怎么樣的;
- 網(wǎng)絡(luò)通信:項(xiàng)目沒有使用任何網(wǎng)絡(luò)框架,而是使用網(wǎng)絡(luò)裸接口進(jìn)行網(wǎng)絡(luò)通信,我們都知道在大型項(xiàng)目中使用網(wǎng)絡(luò)裸接口進(jìn)行網(wǎng)絡(luò)通信需要處理很多異常條件,這里值得學(xué)習(xí)一波;
- 任務(wù)流的封裝:為什么可以動態(tài)的構(gòu)建任務(wù)流的串并聯(lián)圖,并在項(xiàng)目內(nèi)部靈活的調(diào)度呢?
- 文件I/O:項(xiàng)目號稱內(nèi)部文件I/O操作比標(biāo)準(zhǔn)調(diào)用性能還好,它是怎么做到的?
- 內(nèi)存的管理:項(xiàng)目沒有使用任何智能指針,卻能管理好內(nèi)存問題,這是個(gè)技術(shù)活,當(dāng)然,也得益于這優(yōu)秀的架構(gòu)設(shè)計(jì)。
我發(fā)現(xiàn)workflow團(tuán)隊(duì)對這個(gè)項(xiàng)目相當(dāng)重視,還特意建了個(gè)QQ交流群(群號碼是618773193),對此項(xiàng)目有任何問題都可以在這個(gè)群里探討,也方便了我們學(xué)習(xí),真的不錯(cuò)。
參考資料:
<u style="text-decoration: none; border-bottom: 1px dashed grey;">https://zhuanlan.zhihu.com/p/358869362</u>
<u style="text-decoration: none; border-bottom: 1px dashed grey;">https://zhuanlan.zhihu.com/p/165638263</u>
項(xiàng)目地址如下:
sogou/workflowgithub.com/sogou/workflow
在訪問GitHub遇到困難時(shí),可使用他們的Gitee官方倉庫:
搜狗開源/workflowgitee.com/sogou/workflow[圖片上傳失敗...(image-45b969-1628426033468)]
感覺這個(gè)項(xiàng)目值得學(xué)習(xí)的話就給人家個(gè)star,不要白嫖哈,對項(xiàng)目團(tuán)隊(duì)來說也是一種認(rèn)可和鼓勵(lì)。

