基于swoole的task功能實(shí)現(xiàn)Map-Reduce

Swoole擴(kuò)展自帶的Task進(jìn)程功能非常強(qiáng)大,可以用來實(shí)現(xiàn)各種復(fù)雜的業(yè)務(wù)邏輯。本文主要介紹使用task/finish功能實(shí)現(xiàn)程序內(nèi)的Map-Reduce并發(fā)任務(wù)處理。一個(gè)聊天服務(wù)經(jīng)常會(huì)有群聊需求,我的群組和群組內(nèi)成員,另外群組內(nèi)成員需要按照積分排序,類似與這樣的功能就可以使用Swoole簡(jiǎn)單實(shí)現(xiàn)。

傳統(tǒng)多線程方案

創(chuàng)建2個(gè)全局變量Map,group_map以group_id為Key,存儲(chǔ)成員set。user_map以u(píng)id為Key存儲(chǔ)當(dāng)前用戶加入的所有g(shù)roup。

多線程環(huán)境下實(shí)際上不能直接操作這2個(gè)Map,必須要加鎖。當(dāng)添加用戶到一個(gè)組或者用戶退出一個(gè)組時(shí)需要操作這2個(gè)map,必須要加鎖。如果操作很頻繁,實(shí)際上鎖的碰撞是很嚴(yán)重的,這部分操作就會(huì)變成串行的。同時(shí)只有一個(gè)線程可以對(duì)map進(jìn)行操作。鎖的爭(zhēng)搶也會(huì)帶來大量線程切換浪費(fèi)很多CPU資源。

lock.lock();
group_map[group_id].append([uid, score]);
user_map[uid].append(group_id);
group_map.sortByScore();
lock.unlock();

基于Swoole的Task功能

基于Swoole的Task功能,可以將任務(wù)切片,然后hash投遞到不同的Task進(jìn)程,完成任務(wù)。排序功能可以直接使用PHP提供的SplHeap實(shí)現(xiàn),時(shí)間復(fù)雜度為O(logn),如果要實(shí)現(xiàn)查詢功能,如根據(jù)UID查詢用戶加入的所有群組,根據(jù)GroupId查詢有哪些成員??梢韵扔?jì)算Hash找到對(duì)應(yīng)Task進(jìn)程,然后通過task/taskwait發(fā)送指令,直接讀取進(jìn)程的變量查找到信息。

$serv->set(array("task_worker_num" => 24));

$serv->task(array("cmd" => "user", "uid" => $uid, "gid" => $gid, "score" => $score), $gid % $task_worker_num);
$serv->task(array("cmd" => "group", "uid" => $uid, "gid" => $gid), $uid % $task_worker_num);

class MyMaxHeap extends SplHeap
{
    public function compare($value1, $value2)
    {
        return ($value1['score'] - $value2['score']);
    }
}


function onTask($serv, $taskId, $srcWorkerId, $data) {
    static $userMap = array();
    static $groupMap = array();
    
    if ($data['cmd'] == 'group')
    {
        if (!isset($groupMap[$data['gid']]))
        {
            $groupMap[$data['gid']] = new MyMaxHeap();
        }
        $heap = $groupMap[$data['gid']];
        $heap->insert(array("uid" => $data['uid'], "score" => $data['score']));
    }
    elseif ($data['cmd'] == 'user')
    {
        $userMap[$data['uid']][] = $data['gid'];
    }
}

由于Task進(jìn)程只有數(shù)組操作,所以是非阻塞的,只需要開啟與CPU核數(shù)相同的進(jìn)程數(shù)量即可。進(jìn)程間無任何加鎖爭(zhēng)搶,性能非常好。Swoole的Task進(jìn)程通信使用UnixSocket,是內(nèi)核提供的全內(nèi)存通信方式無任何IO,一寫一讀單進(jìn)程可達(dá)100萬/秒。雖然沒有直接讀變量的速度快,但性能也足夠了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容