Swoole擴(kuò)展自帶的Task進(jìn)程功能非常強(qiáng)大,可以用來實(shí)現(xiàn)各種復(fù)雜的業(yè)務(wù)邏輯。本文主要介紹使用task/finish功能實(shí)現(xiàn)程序內(nèi)的Map-Reduce并發(fā)任務(wù)處理。一個(gè)聊天服務(wù)經(jīng)常會(huì)有群聊需求,我的群組和群組內(nèi)成員,另外群組內(nèi)成員需要按照積分排序,類似與這樣的功能就可以使用Swoole簡(jiǎn)單實(shí)現(xiàn)。
傳統(tǒng)多線程方案
創(chuàng)建2個(gè)全局變量Map,group_map以group_id為Key,存儲(chǔ)成員set。user_map以u(píng)id為Key存儲(chǔ)當(dāng)前用戶加入的所有g(shù)roup。
多線程環(huán)境下實(shí)際上不能直接操作這2個(gè)Map,必須要加鎖。當(dāng)添加用戶到一個(gè)組或者用戶退出一個(gè)組時(shí)需要操作這2個(gè)map,必須要加鎖。如果操作很頻繁,實(shí)際上鎖的碰撞是很嚴(yán)重的,這部分操作就會(huì)變成串行的。同時(shí)只有一個(gè)線程可以對(duì)map進(jìn)行操作。鎖的爭(zhēng)搶也會(huì)帶來大量線程切換浪費(fèi)很多CPU資源。
lock.lock();
group_map[group_id].append([uid, score]);
user_map[uid].append(group_id);
group_map.sortByScore();
lock.unlock();
基于Swoole的Task功能
基于Swoole的Task功能,可以將任務(wù)切片,然后hash投遞到不同的Task進(jìn)程,完成任務(wù)。排序功能可以直接使用PHP提供的SplHeap實(shí)現(xiàn),時(shí)間復(fù)雜度為O(logn),如果要實(shí)現(xiàn)查詢功能,如根據(jù)UID查詢用戶加入的所有群組,根據(jù)GroupId查詢有哪些成員??梢韵扔?jì)算Hash找到對(duì)應(yīng)Task進(jìn)程,然后通過task/taskwait發(fā)送指令,直接讀取進(jìn)程的變量查找到信息。
$serv->set(array("task_worker_num" => 24));
$serv->task(array("cmd" => "user", "uid" => $uid, "gid" => $gid, "score" => $score), $gid % $task_worker_num);
$serv->task(array("cmd" => "group", "uid" => $uid, "gid" => $gid), $uid % $task_worker_num);
class MyMaxHeap extends SplHeap
{
public function compare($value1, $value2)
{
return ($value1['score'] - $value2['score']);
}
}
function onTask($serv, $taskId, $srcWorkerId, $data) {
static $userMap = array();
static $groupMap = array();
if ($data['cmd'] == 'group')
{
if (!isset($groupMap[$data['gid']]))
{
$groupMap[$data['gid']] = new MyMaxHeap();
}
$heap = $groupMap[$data['gid']];
$heap->insert(array("uid" => $data['uid'], "score" => $data['score']));
}
elseif ($data['cmd'] == 'user')
{
$userMap[$data['uid']][] = $data['gid'];
}
}
由于Task進(jìn)程只有數(shù)組操作,所以是非阻塞的,只需要開啟與CPU核數(shù)相同的進(jìn)程數(shù)量即可。進(jìn)程間無任何加鎖爭(zhēng)搶,性能非常好。Swoole的Task進(jìn)程通信使用UnixSocket,是內(nèi)核提供的全內(nèi)存通信方式無任何IO,一寫一讀單進(jìn)程可達(dá)100萬/秒。雖然沒有直接讀變量的速度快,但性能也足夠了。