開源項目Workflow是C++異步調度的高性能框架,廣泛用于高吞吐低延遲的網絡服務器、并行計算和組裝復雜網絡請求的客戶端等領域。在異步調度的編程范式下,想要實現(xiàn)并發(fā)控制是非常困難的,因為一旦無法做到無阻塞的調度,那么框架性能就會大打折扣。
線上非常常見的場景是:異步服務器需要限制用戶的并發(fā),從而保護有限的后端資源比如GPU計算,并在超載時可以立刻拒絕用戶或者實施排隊等待的處理策略。
一個好的并發(fā)控制組件,應該是因框架制宜,實現(xiàn)上能夠做到完全非阻塞,而語義上能做到足夠簡單、抽象、通用。因此在這里介紹一下C++ Workflow項目中的ResourcePool資源池,歡迎正在使用Workflow的小伙伴嘗試,相信可以讓你的代碼簡化不少,也歡迎有類似場景的開發(fā)者們參考與交流~~~
https://github.com/sogou/workflow
一、信號量Semaphore
信號量(Semaphore)小伙伴們都知道,它由大名鼎鼎的Dijkstra發(fā)明(就是那個也發(fā)明與自己同名的最短路算法的計算機科學家)~ 這是一個用于并發(fā)和同步場景下的抽象概念:假設對任何對象我們都想要控制訪問它的并發(fā)度為n,那么定義出兩個簡單的操作就可以做到。
- P操作:將n減1
- V操作:將n加1
結合定義,我們不難思考出這兩個操作的底層邏輯:
- 我們想操作任何一個資源的時候,就可以執(zhí)行P;用完資源想還回去,可以執(zhí)行V;
- 資源不是無限的,執(zhí)行P如果n減到要小于0了,說明資源不夠,那你就等著;
- 執(zhí)行V把資源歸還,意味著有人在等的時候,你可以叫醒這個人;
這個抽象的語義,具體到每個系統(tǒng)中的實現(xiàn)是不一樣的。
雖然Workflow構思資源池的時候并不是奔著實現(xiàn)信號量的語義去做的,但是基于任務流的語義想要解決并發(fā)控制的問題時,會發(fā)現(xiàn)與信號量的概念殊途同歸。

二、資源池ResourcePool
了解了語義和要解決的問題,那么我們看看資源池的接口,進一步擁有更具體的理解:
class WFResourcePool
{
public:
WFConditional *get(SubTask *task, void **resbuf);
WFConditional *get(SubTask *task);
void post(void *res);
...
protected:
virtual void *pop()
{
return this->data.res[this->data.index++];
}
virtual void push(void *res)
{
this->data.res[--this->data.index] = res;
}
...
public:
WFResourcePool(void *const *res, size_t n);
WFResourcePool(size_t n);
...
};
上述代碼個人認為只需要劃三個重點:構造函數(shù)、get()和post()。
構造函數(shù)有兩個可選,簡單版可以只傳一個n。如果對于資源池有傳遞資源的需求,比如不知,那么還可以傳入一個長度為n資源數(shù)組,數(shù)組每個元素為一個void *,內部會再分配一份相同大小的內存,把數(shù)組復制走。
因此相對上面兩種情況,get()函數(shù)也有兩個。get()等價于上述的P操作,用于拿一個訪問資源的資格,如果使用第二個接口,那么是可以通過一個void **resbuf獲得具體的資源。
post()函數(shù)用于資源使用完畢想歸還的時候,也就是V操作,這里post()的res參數(shù)無需與get()得到res的一致。
三、與任務流結合:WFConditional
資源池的實現(xiàn)本身是很簡單的,巧妙之處是它如何與Workflow當前的異步任務結合。答案就在上述get()函數(shù)的返回值,我們需要引入一層條件任務:WFConditional。
先回顧一下,Workflow中一個異步任務可以這樣原地發(fā)射:
auto *task = WFTaskFactory::create_xxx_task(x, x, callback);
task->start();
也可以扔到一個任務流中,待前序邏輯完成后執(zhí)行,以實現(xiàn)任務編排和調度:
SeriesWork *series; // 假設這一個現(xiàn)成的任務流。一般來自于我們自己創(chuàng)建,或者其他任務的callback中拿到
series->push_back(task);
在Workflow的實現(xiàn)中,它們的調度都不會卡住任何線程。
那么,現(xiàn)在加了一個資源池的約束,也就意味著:一個任務的發(fā)起,應當是'當前流程允許執(zhí)行'并且'從資源池中拿到資格'才能真正得到調度。
WFConditional條件任務就是這個中間層:我們通過get()接口,把任務是否能通過資源池獲得資格這個工作,交給了WFConditional,而WFConditional替代了這個任務本身被start或者放到任務流中。

用一個并發(fā)為1的小spider,展示一下常用的使用方式:
WFResourcePool pool(1); // 0. 構造資源池,表示并發(fā)只允許1
// 1. 構造一個http任務。在它的callback中,我們把資源池的資源歸還
WFHttpTask *t = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});
// 2. 構造一個條件任務,它會在真正dispatch的時候才去嘗試獲取資源
WFConditional *c = pool.get(t, &t->user_data); // 用user_data來保存res是一種實用方法。
// 3. 接下來用這個條件任務,替代http任務去和任務流結合
c->start(); // or series->push_back(c);
四、更多...
回到剛開始的場景,可以參考這個issue,這個也是內部用戶咨詢得最多的需求之一:《如何根據用戶配置去限制server的qps》,?? https://github.com/sogou/workflow/issues/1319
當然我們實現(xiàn)一個server的時候,實際上是要控制用戶請求的并發(fā)而非QPS,因為QPS是由資源處理能力附加了時間維度而得出的,核心指標應該是同時能處理的并發(fā)數(shù)。
細心的小伙伴還會發(fā)現(xiàn),資源池有兩個protected的接口,這是供聰明的你派生使用的。比如上述的post要叫醒最早在排隊的那個人以實現(xiàn)先來先服務、還是叫醒最晚等待的那個人來減少超時用戶的數(shù)量,這都是可以通過派生資源池來實現(xiàn),而資源池本身只是一個非常通用的組件。
最近學習其他領域的知識也越來越發(fā)現(xiàn),計算機中的語義無論是在操作系統(tǒng)還是框架還是算法層實現(xiàn),都是相通的。也就是說如果對計算機底層邏輯能做到融會貫通,那么無論投身哪個細分領域都可以有新發(fā)現(xiàn)新貢獻。雖然博主已經墮落到了Q更,最近才有空寫一下資源池的介紹,但其實WFResourcePool的推出已經有2年了,本人單方面宣布它有可能是Workflow在開源之后才增加的組件中最重要的一個。非常感謝開發(fā)者們的支持,期待大家對更多場景的交流和共建,一起打造更多有趣的組件,發(fā)現(xiàn)更多通用的方案~