GatewayWorker WebSocket分布式服務

1. 進程模型

圖片.png

特點:

從圖上我們可以看出Gateway負責接收客戶端的連接以及連接上的數(shù)據(jù),然后Worker接收Gateway發(fā)來的數(shù)據(jù)做處理,然后再經(jīng)由Gateway把結(jié)果轉(zhuǎn)發(fā)給其它客戶端。每個客戶端都有很多的路由到達另外一個客戶端,例如client⑦與client①可以經(jīng)由藍色路徑完成數(shù)據(jù)通訊

優(yōu)點:

1、可以方便的實現(xiàn)客戶端之間的通訊

2、Gateway與Worker之間是基于socket長連接通訊,也就是說Gateway、Worker可以部署在不同的服務器上,非常容易實現(xiàn)分布式部署,擴容服務器

3、Gateway進程只負責網(wǎng)絡(luò)IO,業(yè)務實現(xiàn)都在Worker進程上,可以reload Worker進程,實現(xiàn)在不影響用戶的情況下完成代碼熱更新。

適用范圍:

適用于客戶端與客戶端需要實時通訊的項目。

2. 目錄結(jié)構(gòu)

圖片.png

3. 注冊服務

注冊一個端口1238 的服務,去維護后端服務進程信息。

Register負責注冊內(nèi)部通訊地址。Gateway進程和BusinessWorker進程啟動后分別向Register進程注冊自己的通訊地址,Gateway進程和BusinessWorker通過Register進程得到通訊地址后,就可以建立起連接并通訊了。

register_start.php

<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php MIT License
 */

use \Workerman\Worker;
use \GatewayWorker\Register;

// 自動加載類
require_once __DIR__ . '/../vendor/autoload.php';

// register 必須是text協(xié)議
$register = new Register('text://0.0.0.0:1238');

// 如果不是在根目錄啟動,則運行runAll方法
if (!defined('GLOBAL_START')) {
    Worker::runAll();
}

4. Gateway

<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php MIT License
 */

use \Workerman\Worker;
use \Workerman\WebServer;
use \GatewayWorker\Gateway;
use \GatewayWorker\BusinessWorker;
use \Workerman\Autoloader;

// 自動加載類
require_once __DIR__ . '/../vendor/autoload.php';

// gateway 進程,這里使用Text協(xié)議,可以用telnet測試
//$gateway = new Gateway("tcp://0.0.0.0:8282");
$gateway = new Gateway("websocket://0.0.0.0:8181");
// gateway名稱,status方便查看
$gateway->name = 'gateway';
// gateway進程數(shù)
$gateway->count = 4;
// 本機ip,分布式部署時使用內(nèi)網(wǎng)ip
$gateway->lanIp = '127.0.0.1';
// 內(nèi)部通訊起始端口,假如$gateway->count=4,起始端口為4000
// 則一般會使用4000 4001 4002 4003 4個端口作為內(nèi)部通訊端口 
$gateway->startPort = 2900;
// 服務注冊地址
$gateway->registerAddress = '127.0.0.1:1238';

// 心跳間隔
$gateway->pingInterval = 60;
// 心跳數(shù)據(jù)
$gateway->pingData = '{"type":"ping"}';

/* 
// 當客戶端連接上來時,設(shè)置連接的onWebSocketConnect,即在websocket握手時的回調(diào)
$gateway->onConnect = function($connection)
{
    $connection->onWebSocketConnect = function($connection , $http_header)
    {
        // 可以在這里判斷連接來源是否合法,不合法就關(guān)掉連接
        // $_SERVER['HTTP_ORIGIN']標識來自哪個站點的頁面發(fā)起的websocket鏈接
        if($_SERVER['HTTP_ORIGIN'] != 'http://kedou.workerman.net')
        {
            $connection->close();
        }
        // onWebSocketConnect 里面$_GET $_SERVER是可用的
        // var_dump($_GET, $_SERVER);
    };
}; 
*/

// 如果不是在根目錄啟動,則運行runAll方法
if (!defined('GLOBAL_START')) {
    Worker::runAll();
}

5. BusinessWorker

<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php MIT License
 */

use \Workerman\Worker;
use \Workerman\WebServer;
use \GatewayWorker\Gateway;
use \GatewayWorker\BusinessWorker;
use \Workerman\Autoloader;

// 自動加載類
require_once __DIR__ . '/../vendor/autoload.php';

// businessWorker 進程
$worker = new BusinessWorker();
// worker名稱
$worker->name = 'PHPWorker01';
// businessWorker進程數(shù)量
$worker->count = 4;
// 服務注冊地址
$worker->registerAddress = '127.0.0.1:1238';

// 如果不是在根目錄啟動,則運行runAll方法
if (!defined('GLOBAL_START')) {
    Worker::runAll();
}

6. Events

<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php MIT License
 */

/**
 * 用于檢測業(yè)務代碼死循環(huán)或者長時間阻塞等問題
 * 如果發(fā)現(xiàn)業(yè)務卡死,可以將下面declare打開(去掉//注釋),并執(zhí)行php start.php reload
 * 然后觀察一段時間workerman.log看是否有process_timeout異常
 */

//declare(ticks=1);

use \GatewayWorker\Lib\Gateway;

/**
 * 主邏輯
 * 主要是處理 onConnect onMessage onClose 三個方法
 * onConnect 和 onClose 如果不需要可以不用實現(xiàn)并刪除
 */
class Events
{
    /**
     * 當客戶端連接時觸發(fā)
     * 如果業(yè)務不需此回調(diào)可以刪除onConnect
     *
     * @param int $client_id 連接id
     * @throws Exception
     */
    public static function onConnect($client_id)
    {
        $count = Gateway::getAllClientCount();
        $list  = Gateway::getAllClientIdList();
        // 向當前client_id發(fā)送數(shù)據(jù) 
        Gateway::sendToClient($client_id, "Hello $client_id\r\n");
        // 向所有人發(fā)送
        Gateway::sendToAll("$client_id login count:{$count} list:" . json_encode($list) . "\r\n");
    }

    /**
     * 當客戶端發(fā)來消息時觸發(fā)
     * @param int $client_id 連接id
     * @param mixed $message 具體消息
     * @throws Exception
     */
    public static function onMessage($client_id, $message)
    {
        // 向所有人發(fā)送 
        Gateway::sendToAll("$client_id said $message\r\n");
    }

    /**
     * 當用戶斷開連接時觸發(fā)
     * @param int $client_id 連接id
     * @throws Exception
     */
    public static function onClose($client_id)
    {
        // 向所有人發(fā)送
        GateWay::sendToAll("$client_id logout\r\n");
    }
}

7. Lib\Gateway類接口

文件位置:GatewayWorker/Lib/Gateway.php

Lib\Gateway類是Gateway/BusinessWorker模型中給客戶端發(fā)送數(shù)據(jù)的類。

提供了單發(fā)、群發(fā)以及關(guān)閉客戶端連接的接口。

7.1 sendToAll

void Gateway::sendToAll(string $send_data [, array $client_id_array = null [, array $exclude_client_id = null [, bool $raw = false]]]);

向所有客戶端或者client_id_array指定的客戶端發(fā)送send_data數(shù)據(jù)。如果指定的client_id_array中的client_id不存在則自動丟棄

7.2 sendToClient

void Gateway::sendToClient(string $client_id, string $send_data);

向客戶端client_id發(fā)送$send_data數(shù)據(jù)。如果client_id對應的客戶端不存在或者不在線則自動丟棄發(fā)送數(shù)據(jù)

7.3 closeClient

void Gateway::closeClient(string $client_id);

斷開與client_id對應的客戶端的連接

7.4 isOnline

int Gateway::isOnline(string $client_id);

判斷$client_id是否還在線

是否在線取決于對應client_id是否觸發(fā)過onClose回調(diào)。

7.5 bindUid

void Gateway::bindUid(string $client_id, mixed $uid);

將client_id與uid綁定,以便通過Gateway::sendToUid($uid)發(fā)送數(shù)據(jù),通過Gateway::isUidOnline($uid)用戶是否在線。

uid解釋:這里uid泛指用戶id或者設(shè)備id,用來唯一確定一個客戶端用戶或者設(shè)備。

7.6 sendToUid

void Gateway::sendToUid(mixed $uid, string $message);

向uid綁定的所有在線client_id發(fā)送數(shù)據(jù)。

注意:默認uid與client_id是一對多的關(guān)系,如果當前uid下綁定了多個client_id,則多個client_id對應的客戶端都會收到消息,這類似于PC QQ和手機QQ同時在線接收消息。

更多方法可參考官方文檔。

后期規(guī)劃的架構(gòu),可橫向擴展。

圖片.png

參考資料

gateway-worker

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容