服務(wù)器的演進(jìn)

服務(wù)器的演進(jìn)

1)單進(jìn)程阻塞的網(wǎng)絡(luò)服務(wù)器

說明:

1.創(chuàng)建一個(gè)socket,綁定服務(wù)器端口(bind),監(jiān)聽端口(listen),在PHP中用stream_socket_server一個(gè)函數(shù)就能完成上面3個(gè)步驟

2.進(jìn)入while循環(huán),阻塞在accept操作上,等待客戶端連接進(jìn)入。此時(shí)程序會(huì)進(jìn)入睡眠狀態(tài),直到有新的客戶端發(fā)起connect到服務(wù)器,操作系統(tǒng)會(huì)喚醒此進(jìn)程。accept函數(shù)返回客戶端連接的socket

1.利用fread讀取客戶端socket當(dāng)中的數(shù)據(jù)收到數(shù)據(jù)后服務(wù)器程序進(jìn)行處理然后使用fwrite向客戶端發(fā)送響應(yīng)。長(zhǎng)連接的服務(wù)會(huì)持續(xù)與客戶端交互,而短連接服務(wù)一般收到響應(yīng)就會(huì)close。

缺點(diǎn):

一次只能處理一個(gè)連接,不支持多個(gè)連接同時(shí)處理

每個(gè)連接進(jìn)入到我們的服務(wù)端的時(shí)候,單獨(dú)創(chuàng)建一個(gè)進(jìn)程/線程提供服務(wù)

簡(jiǎn)單實(shí)現(xiàn)

  • 單進(jìn)程阻塞的網(wǎng)絡(luò)服務(wù)器

  • 1 創(chuàng)建一個(gè)socket,綁定服務(wù)器端口(bind),監(jiān)聽端口(listen),在PHP中用stream_socket_server一個(gè)函數(shù)就能完成上面3個(gè)步驟

  • 2 進(jìn)入while循環(huán),阻塞在accept操作上,等待客戶端連接進(jìn)入。此時(shí)程序會(huì)進(jìn)入睡眠狀態(tài),直到有新的客戶端發(fā)起connect到服務(wù)器,操作系統(tǒng)會(huì)喚醒此進(jìn)程。

  • accept函數(shù)返回客戶端連接的socket

  • 3 利用fread讀取客戶端socket當(dāng)中的數(shù)據(jù)收到數(shù)據(jù)后服務(wù)器程序進(jìn)行處理然后使用fwrite向客戶端發(fā)送響應(yīng)。

  • 長(zhǎng)連接的服務(wù)會(huì)持續(xù)與客戶端交互,而短連接服務(wù)一般收到響應(yīng)就會(huì)close。

  • 缺點(diǎn):

  • 1 一次只能處理一個(gè)連接,不支持多個(gè)連接同時(shí)處理

  • 2 每個(gè)連接進(jìn)入到我們的服務(wù)端的時(shí)候,單獨(dú)創(chuàng)建一個(gè)進(jìn)程/線程提供服務(wù)


class Worker{

    protected $socket = null;

    public $onMessage = null;

    public $onConnect = null;

    public function __construct($socket_address)

{

        //綁定地址監(jiān)聽端口

        $this->socket = stream_socket_server($socket_address);

    }

    public function start(){

        while (true){

            //阻塞監(jiān)聽客戶端socket狀態(tài)如連接成功發(fā)送的消息等,調(diào)用相應(yīng)回調(diào)  直到返回

            $clientSocket = stream_socket_accept($this->socket);//返回客戶端資源

            if(!empty($clientSocket) && is_callable($this->onConnect)){

                call_user_func($this->onConnect,$clientSocket);

            }

            //從連接當(dāng)中讀取客戶端的內(nèi)容

            $buffer=fread($clientSocket,65535);

            //正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容

            if(!empty($buffer) && is_callable($this->onMessage)){

                call_user_func($this->onMessage,$clientSocket,$buffer);

            }

            //如果不關(guān)閉連接不能支持大點(diǎn)的并發(fā)請(qǐng)求

//fclose($clientSocket);

        }

}

}

$server = new Worker('tcp://0.0.0.0:9800');

//客戶端連接成功觸發(fā)

$server->onConnect = function ($fd){

    echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;

};

//客戶端端發(fā)消息過來觸發(fā)

$server->onMessage = function ($conn, $message){

    //事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯

//var_dump($conn,$message);

    $content="我是peter";

    $http_resonse = "HTTP/1.1 200 OK\r\n";

    $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";

  //  $http_resonse .= "Connection: keep-alive\r\n"; //連接保持

    $http_resonse .= "Server: php socket server\r\n";

    $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";

    $http_resonse .= $content;

    fwrite($conn, $http_resonse);

};

$server->start(); //啟動(dòng)


結(jié)論:

一次只能處理一個(gè)連接,不支持多個(gè)連接同時(shí)處理

每個(gè)連接進(jìn)入到我們的服務(wù)端的時(shí)候,單獨(dú)創(chuàng)建一個(gè)進(jìn)程/線程提供服務(wù)

1)預(yù)派生子進(jìn)程模式 類似php-fpm

1、程序啟動(dòng)后就會(huì)創(chuàng)建N個(gè)進(jìn)程。每個(gè)子進(jìn)程進(jìn)入 Accept,等待新的連接進(jìn)入。當(dāng)客戶端連接到服務(wù)器時(shí),其中一個(gè)子進(jìn)程會(huì)被喚醒,開始處理客戶端請(qǐng)求,并且不再接受新的TCP連接。當(dāng)此連接關(guān)閉時(shí),子進(jìn)程會(huì)釋放,重新進(jìn)入 Accept,參與處理新的連接。這個(gè)模型的優(yōu)勢(shì)是完全可以復(fù)用進(jìn)程,不需要太多的上下文切換,比如php-fpm基于此模型的。

缺點(diǎn):

1.這種模型嚴(yán)重依賴進(jìn)程的數(shù)量解決并發(fā)問題,一個(gè)客戶端連接就需要占用一個(gè)進(jìn)程,工作進(jìn)程的數(shù)量有多少,并發(fā)處理能力就有多少。操作系統(tǒng)可以創(chuàng)建的進(jìn)程數(shù)量是有限的。

2、操作系統(tǒng)生成一個(gè)子進(jìn)程需要進(jìn)行內(nèi)存復(fù)制等操作,在資源和時(shí)間上會(huì)產(chǎn)生一定的開銷;當(dāng)有大量請(qǐng)求時(shí),會(huì)導(dǎo)致系統(tǒng)性能下降;

例如:即時(shí)聊天程序,一臺(tái)服務(wù)器可能要維持?jǐn)?shù)十萬(wàn)的連接,那么就要啟動(dòng)數(shù)十萬(wàn)的進(jìn)程來維持。這顯然不可能

基于上面的模式我們發(fā)現(xiàn)我們只能通過每次(accept)處理單個(gè)請(qǐng)求,沒辦法一次性處理多個(gè)請(qǐng)求?

簡(jiǎn)單實(shí)現(xiàn)如下

class Worker{

    protected $socket = null;

    public $onMessage = null;

    public $onConnect = null;

    public $workNum = 10;

    public function __construct($socket_address)

{

        //綁定地址監(jiān)聽端口

        $this->socket = stream_socket_server($socket_address);

    }

    public function start() {

        //獲取配置文件

        $this->fork(); //用來創(chuàng)建多個(gè)助教老師,創(chuàng)建多個(gè)子進(jìn)程負(fù)責(zé)接收請(qǐng)求的

    }

    public function fork(){

        for ($i=0;$i<$this->workNum;$i++){

            $pid = pcntl_fork();//下面的代碼父子進(jìn)程都會(huì)執(zhí)行

            if ($pid<0){

                exit('創(chuàng)建失敗');

            }elseif($pid>0){

//                $status=0;

//                $pid=pcntl_wait($status);這邊會(huì)阻塞等待子進(jìn)程結(jié)束后在創(chuàng)建進(jìn)程  所以放到for 后面  等待子進(jìn)程創(chuàng)建 執(zhí)行完成  回收子進(jìn)程

//                echo "子進(jìn)程回收了:$pid".PHP_EOL;

            }else{

                $this->accept();

                return; //這邊要return 否則子進(jìn)程還會(huì)創(chuàng)建子進(jìn)程 因?yàn)閒ork 在for 循環(huán)里面當(dāng)然這里在阻塞監(jiān)聽

            }

}

        //放在父進(jìn)程空間,結(jié)束的子進(jìn)程信息,阻塞狀態(tài)

        $status=0;

        $pid=pcntl_wait($status);

        echo "子進(jìn)程回收了:$pid".PHP_EOL;

    }

    public  function  accept(){

        //創(chuàng)建多個(gè)子進(jìn)程阻塞接收服務(wù)端socket

        while (true){

            $clientSocket=stream_socket_accept($this->socket); //阻塞監(jiān)聽

            var_dump(posix_getpid());

            //觸發(fā)事件的連接的回調(diào)

            if(!empty($clientSocket) && is_callable($this->onConnect)){

                call_user_func($this->onConnect,$clientSocket);

            }

            //從連接當(dāng)中讀取客戶端的內(nèi)容

            $buffer=fread($clientSocket,65535);

            //正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容

            if(!empty($buffer) && is_callable($this->onMessage)){

                call_user_func($this->onMessage,$clientSocket,$buffer);

            }

            fclose($clientSocket); //必須關(guān)閉,子進(jìn)程不會(huì)釋放不會(huì)成功拿下進(jìn)入accpet

        }

}

}

$server = new Worker('tcp://0.0.0.0:9800');

//客戶端連接成功觸發(fā)

$server->onConnect = function ($fd){

    echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;

};

//客戶端端發(fā)消息過來觸發(fā)

$server->onMessage = function ($conn, $message){

    //事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯

//var_dump($conn,$message);

    $content="我是peter";

    $http_resonse = "HTTP/1.1 200 OK\r\n";

    $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";

  //  $http_resonse .= "Connection: keep-alive\r\n"; //連接保持

    $http_resonse .= "Server: php socket server\r\n";

    $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";

    $http_resonse .= $content;

    fwrite($conn, $http_resonse);

};

$server->start(); //啟動(dòng)

3)單進(jìn)程阻塞復(fù)用的網(wǎng)絡(luò)服務(wù)器

說明:

服務(wù)監(jiān)聽流程如上

1.保存所有的socket,通過select系統(tǒng)調(diào)用,監(jiān)聽socket描述符的可讀事件

2.Select會(huì)在內(nèi)核空間監(jiān)聽一旦發(fā)現(xiàn)socket可讀,會(huì)從內(nèi)核空間傳遞至用戶空間,在用戶空間通過邏輯判斷是服務(wù)端socket可讀,還是客戶端的socket可讀

3.如果是服務(wù)端的socket可讀,說明有新的客戶端建立,將socket保留到監(jiān)聽數(shù)組當(dāng)中 【第一次建立連接服務(wù)端可讀】

1.如果是客戶端的socket可讀,說明當(dāng)前已經(jīng)可以去讀取客戶端發(fā)送過來的內(nèi)容了,讀取內(nèi)容,然后響應(yīng)給客戶端。【客戶端給服務(wù)器發(fā)送數(shù)據(jù) 客戶端可讀】

缺點(diǎn):

1.select模式本身的缺點(diǎn)(1、循環(huán)遍歷處理事件、2、內(nèi)核空間傳遞數(shù)據(jù)的消耗)

2.單進(jìn)程對(duì)于大量任務(wù)處理乏力

class Worker{

    //監(jiān)聽socket

    protected $socket = NULL;

    //連接事件回調(diào)

    public $onConnect = NULL;

    //接收消息事件回調(diào)

    public $onMessage = NULL;

    public $workerNum=4; //子進(jìn)程個(gè)數(shù)

    public  $allSocket; //存放所有socket

    public function __construct($socket_address) {

        //監(jiān)聽地址+端口

        $this->socket=stream_socket_server($socket_address);

    //1、stream_set_blocking 當(dāng) socket處于阻塞模式時(shí),

    //比如:網(wǎng)絡(luò)io fread系統(tǒng)調(diào)用必須等待socket有數(shù)據(jù)返回,即進(jìn)程因系統(tǒng)調(diào)用阻塞;相反若處于非阻塞模式,內(nèi)核不管socket數(shù)據(jù)有沒有準(zhǔn)備好,都會(huì)立即返回給進(jìn)程。
    
    //2、stream_set_blocking 另外進(jìn)程阻塞和socket阻塞不是一個(gè)概念,進(jìn)程阻塞是因?yàn)橄到y(tǒng)調(diào)用所致,socket是否阻塞只是說明socket上事件是不是可以內(nèi)核即刻處理。

    //1、select是系統(tǒng)調(diào)用,必然會(huì)阻塞進(jìn)程的,和socket是否阻塞并沒有關(guān)系,我第2點(diǎn)備注了呢。

    //2、這里的IO就是針對(duì)socket的網(wǎng)絡(luò)IO,是否是阻塞的,正是你題示所問的問題。

    //3、socket之所以設(shè)置成非阻塞,是為了同一個(gè)進(jìn)程里可以更多的處理更多的tcp連接,這正是 select、poll 或者 epoll等多路復(fù)用模型能夠處理高并發(fā)的原因所在。

        stream_set_blocking($this->socket,0); //設(shè)置網(wǎng)絡(luò)io 比如 fread 非阻塞

    // 0是非阻塞,1是阻塞


//阻塞的意義是什么呢?


//某個(gè)函數(shù)讀取一個(gè)網(wǎng)絡(luò)流,當(dāng)沒有未讀取字節(jié)的時(shí)候,程序該怎么辦?


//是一直等待,直到下一個(gè)未讀取的字節(jié)的出現(xiàn),還是立即告訴調(diào)用者當(dāng)前沒有新內(nèi)容?


//前者是阻塞的,后者是非阻塞的。

//

//阻塞的好處是,排除其它非正常因素,阻塞的是按順序執(zhí)行的同步的讀取。

//

//借用小說里的說法就是“神刀出鞘,無(wú)血不歸”。在讀到新內(nèi)容之前,它不會(huì)往下走,什么別的事情都不做。

//

//而非阻塞,因?yàn)椴槐氐却齼?nèi)容,所以能異步的執(zhí)行,現(xiàn)在讀到讀不到都沒關(guān)系,執(zhí)行讀取操作后立刻就繼續(xù)往下做別的事情。

        $this->allSocket[(int)$this->socket]=$this->socket;

    }

    public function start() {

        //獲取配置文件

        $this->fork();

    }

    public function fork(){

        $this->accept();//子進(jìn)程負(fù)責(zé)接收客戶端請(qǐng)求

    }

    public  function  accept(){

        //創(chuàng)建多個(gè)子進(jìn)程阻塞接收服務(wù)端socket

        while (true){

            $write=$except=[];

            //需要監(jiān)聽socket

            $read=$this->allSocket;

            //建議socket狀態(tài)誰(shuí)改變

// var_dump($read);

            stream_select($read,$write,$except,60);//內(nèi)核遍歷循環(huán)哪些改變會(huì)阻塞 如果只有一個(gè)改變也會(huì)循環(huán)很多次的問題

//怎么區(qū)分服務(wù)端跟客戶端剛啟動(dòng)服務(wù)沒有客戶端連接進(jìn)來socket 沒有改變 當(dāng)有新的客戶端連接進(jìn)來當(dāng)前改變的是服務(wù)端所以循環(huán)read

            foreach ($read as $index=>$val){

                //循環(huán)每一個(gè)改變 返回響應(yīng)

//當(dāng)前發(fā)生改變的是服務(wù)端,有連接進(jìn)入

                if($val === $this->socket){

                    $clientSocket=stream_socket_accept($this->socket); //阻塞監(jiān)聽

//觸發(fā)事件的連接的回調(diào)

                    if(!empty($clientSocket) && is_callable($this->onConnect)){

                        call_user_func($this->onConnect,$clientSocket);

                    }

                    $this->allSocket[(int)$clientSocket]=$clientSocket;//先把資源放入數(shù)組 客戶端可寫時(shí)循環(huán)響應(yīng)避免阻塞

                }else{

                    //從連接當(dāng)中讀取客戶端的內(nèi)容

                    $buffer=fread($val,1024);

                    //如果數(shù)據(jù)為空,或者為false,不是資源類型

                    if(empty($buffer)){

                        if(feof($val) || !is_resource($val)){

                            //觸發(fā)關(guān)閉事件

                            fclose($val);

                            unset($this->allSocket[(int)$val]);

                            continue;

                        }

}

                    //正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容

                    if(!empty($buffer) && is_callable($this->onMessage)){

                        call_user_func($this->onMessage,$val,$buffer);

                    }

}

}

}

}

}

$worker = new Worker('tcp://0.0.0.0:9800');

//連接事件

$worker->onConnect = function ($fd) {

    //echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;

};

//消息接收

$worker->onMessage = function ($conn, $message) {

    //事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯

//var_dump($conn,$message);

    $content="我是peter";

    $http_resonse = "HTTP/1.1 200 OK\r\n";

    $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";

    $http_resonse .= "Connection: keep-alive\r\n"; //連接保持

    $http_resonse .= "Server: php socket server\r\n";

    $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";

    $http_resonse .= $content;

    fwrite($conn, $http_resonse);

};

$worker->start(); //啟動(dòng)

4)多進(jìn)程master-worker模型

1.master進(jìn)程,負(fù)責(zé)處理配置文件讀取,啟動(dòng),終止和維護(hù)工作(worker)進(jìn)程數(shù),當(dāng)woker進(jìn)程退出后(異常情況下),會(huì)自動(dòng)重新啟動(dòng)新的woker

2.worker進(jìn)程的主要任務(wù)是完成具體的任務(wù)邏輯,啟動(dòng)端口監(jiān)聽,接收客戶端請(qǐng)求、使用epoll接收請(qǐng)求,執(zhí)行業(yè)務(wù)邏輯然后關(guān)閉連接。

是類似于nginx和workmen采用的Reactor 多進(jìn)程的模式,具體差異表現(xiàn)為主進(jìn)程中僅僅創(chuàng)建了監(jiān)聽,并沒有創(chuàng)建 mainReactor 來“accept”連接,而是由子進(jìn)程的 Reactor 來“accept”連接,通過負(fù)載均衡,一次只有一個(gè)子進(jìn)程進(jìn)行“accept”,子進(jìn)程“accept”新連接后就放到自己的 Reactor中進(jìn)行處理,不會(huì)再分配給其他子進(jìn)程 區(qū)別swoole

epoll:

epoll是在2.6內(nèi)核中提出的,是之前的select和poll的增強(qiáng)版本。相對(duì)于select和poll來說,epoll更加靈活,沒有描述符限制,無(wú)需輪詢。epoll使用一個(gè)文件描述符管理多個(gè)描述符,將用戶關(guān)系的文件描述符的事件存放到內(nèi)核的一個(gè)事件表中。

簡(jiǎn)單點(diǎn)來說就是當(dāng)連接有I/O流事件產(chǎn)生的時(shí)候,epoll就會(huì)去告訴進(jìn)程哪個(gè)連接有I/O流事件產(chǎn)生,然后進(jìn)程就去處理這個(gè)事件

也就是執(zhí)行回調(diào)。

class Worker{

    //監(jiān)聽socket

    protected $socket = NULL;

    //連接事件回調(diào)

    public $onConnect = NULL;

    //接收消息事件回調(diào)

    public $onMessage = NULL;

    public $workerNum=4; //子進(jìn)程個(gè)數(shù)

    public  $allSocket; //存放所有socket

    public function __construct($socket_address) {

        //監(jiān)聽地址+端口

        $this->socket=stream_socket_server($socket_address);

    }

    public function start() {

        //獲取配置文件

        $this->fork();

    }

    public function fork(){

        $this->accept();//子進(jìn)程負(fù)責(zé)接收客戶端請(qǐng)求

    }

    public  function  accept(){

        //第一個(gè)需要監(jiān)聽的事件(服務(wù)端socket的事件),一旦監(jiān)聽到可讀事件之后會(huì)觸發(fā)

        swoole_event_add($this->socket,function ($fd){

                $clientSocket=stream_socket_accept($fd);

                //觸發(fā)事件的連接的回調(diào)

                if(!empty($clientSocket) && is_callable($this->onConnect)){

                    call_user_func($this->onConnect,$clientSocket);

                }

            //監(jiān)聽客戶端可讀

            swoole_event_add($clientSocket,function ($fd){

                //從連接當(dāng)中讀取客戶端的內(nèi)容

                $buffer=fread($fd,1024);

                //如果數(shù)據(jù)為空,或者為false,不是資源類型

                if(empty($buffer)){

                    if(feof($fd) || !is_resource($fd)){

                        //觸發(fā)關(guān)閉事件

                        fclose($fd);

                    }

}

                //正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容

                if(!empty($buffer) && is_callable($this->onMessage)){

                    call_user_func($this->onMessage,$fd,$buffer);

                }

            });

        });

        echo "非阻塞";

    }

}

$worker = new Worker('tcp://0.0.0.0:9805');

//連接事件

$worker->onConnect = function ($fd) {

    //echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;

};

//消息接收

$worker->onMessage = function ($conn, $message) {

    //事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯

//var_dump($conn,$message);

    $content="我是peter";

    $http_resonse = "HTTP/1.1 200 OK\r\n";

    $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";

    $http_resonse .= "Connection: keep-alive\r\n"; //連接保持

    $http_resonse .= "Server: php socket server\r\n";

    $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";

    $http_resonse .= $content;

    fwrite($conn, $http_resonse);

};

$worker->start(); //啟動(dòng)



高效的事件處理模式Reactor 模式 例如swoole

Reactor模型,Reactor顧名思義就是反應(yīng)堆的意思,它本身不處理任何數(shù)據(jù)收發(fā)。只是可以監(jiān)視一個(gè)socket句柄的事件變化。

1) 主進(jìn)程/線程往epoll內(nèi)核亊件中注冊(cè)socket上的讀就緒亊件。

2) 主進(jìn)程/線程調(diào)用epoll_wait等待socket上有數(shù)據(jù)可讀。

3) 當(dāng)socket上有數(shù)據(jù)可讀時(shí),epoll_wait通知主進(jìn)程/線程。主進(jìn)程/線程則將socket可讀事件放人請(qǐng)求隊(duì)列。

4) 睡眠在請(qǐng)求隊(duì)列上的某個(gè)工作線程被喚醒,它從socket讀取數(shù)據(jù),并處理客戶請(qǐng)求, 然后往epoll內(nèi)核事件表中注冊(cè)該socket上的寫就緒事件。

5) 主線程調(diào)用epoll_wait等待socket可寫。

6) 當(dāng)socket可寫時(shí),epoll_wait通知主進(jìn)程/線程將socket可寫亊件放人清求隊(duì)列。

7) 睡眠在請(qǐng)求隊(duì)列上的某個(gè)工作線程被喚醒,它往socket上寫人服務(wù)器處理客戶淸求

例如 swoole 中

每一個(gè)線程都有自己的用途,下面多每個(gè)線程有一個(gè)了解

1.1、MainReactor(主線程)

主線程會(huì)負(fù)責(zé)監(jiān)聽server socket,如果有新的連接accept,主線程會(huì)評(píng)估每個(gè)Reactor線程的連接數(shù)量。將此連接分配給連接數(shù)最少的reactor線程,做一個(gè)負(fù)載均衡。

1.2 、Reactor線程組

Reactor線程負(fù)責(zé)維護(hù)客戶端機(jī)器的TCP連接、處理網(wǎng)絡(luò)IO、收發(fā)數(shù)據(jù)完全是異步非阻塞的模式。

swoole的主線程在Accept新的連接后,會(huì)將這個(gè)連接分配給一個(gè)固定的Reactor線程,在socket可讀時(shí)讀取數(shù)據(jù),并進(jìn)行協(xié)議解析,將請(qǐng)求投遞到Worker進(jìn)程。在socket可寫時(shí)將數(shù)據(jù)發(fā)送給TCP客戶端。

nginx 類似 workermen

是類似于nginx采用的Reactor 多進(jìn)程的模式,具體差異表現(xiàn)為主進(jìn)程中僅僅創(chuàng)建了監(jiān)聽,并沒有創(chuàng)建 mainReactor 來“accept”連接,而是由子進(jìn)程的 Reactor 來“accept”連接,通過負(fù)載均衡,一次只有一個(gè)子進(jìn)程進(jìn)行“accept”,子進(jìn)程“accept”新連接后就放到自己的 Reactor中進(jìn)行處理,不會(huì)再分配給其他子進(jìn)程

class Worker{

    //監(jiān)聽socket

    protected $socket = NULL;

    //連接事件回調(diào)

    public $onConnect = NULL;

    public  $reusePort=1;

    //接收消息事件回調(diào)

    public $onMessage = NULL;

    public $workerNum=3; //子進(jìn)程個(gè)數(shù)

    public  $allSocket; //存放所有socket

    public  $addr;

    public function __construct($socket_address) {

        //監(jiān)聽地址+端口

        $this->addr=$socket_address;

    }

    public function start() {

        //獲取配置文件

        $this->fork();

    }

    public function fork(){

        for ($i=0;$i<$this->workerNum;$i++){

            $pid=pcntl_fork(); //創(chuàng)建成功會(huì)返回子進(jìn)程id

            if($pid<0){

                exit('創(chuàng)建失敗');

            }else if($pid>0){

                //父進(jìn)程空間,返回子進(jìn)程id

            }else{ //返回為0子進(jìn)程空間

                $this->accept();//子進(jìn)程負(fù)責(zé)接收客戶端請(qǐng)求

                exit;    //父進(jìn)程繼續(xù)執(zhí)行循環(huán)創(chuàng)建子進(jìn)程

            }

}

        //放在父進(jìn)程空間,結(jié)束的子進(jìn)程信息,阻塞狀態(tài)

        $status=0;

        for ($i=0;$i<$this->workerNum;$i++) {

            $pid = pcntl_wait($status);

        }

}

    public  function  accept(){

        $opts = array(

            'socket' => array(

                'backlog' =>10240, //成功建立socket連接的等待個(gè)數(shù)

            ),

        );

      $context = stream_context_create($opts);

      //開啟多端口監(jiān)聽,并且實(shí)現(xiàn)負(fù)載均衡

      stream_context_set_option($context,'socket','so_reuseport',1);

      stream_context_set_option($context,'socket','so_reuseaddr',1);

      $this->socket=stream_socket_server($this->addr,$errno,$errstr,STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,$context);

        //第一個(gè)需要監(jiān)聽的事件(服務(wù)端socket的事件),一旦監(jiān)聽到可讀事件之后會(huì)觸發(fā)

        swoole_event_add($this->socket,function ($fd){

                $clientSocket=stream_socket_accept($fd);

                //觸發(fā)事件的連接的回調(diào)

                if(!empty($clientSocket) && is_callable($this->onConnect)){

                    call_user_func($this->onConnect,$clientSocket);

                }

            //監(jiān)聽客戶端可讀

            swoole_event_add($clientSocket,function ($fd){

                //從連接當(dāng)中讀取客戶端的內(nèi)容

                $buffer=fread($fd,1024);

                //如果數(shù)據(jù)為空,或者為false,不是資源類型

                if(empty($buffer)){

                    if(!is_resource($fd) || feof($fd) ){

                        //觸發(fā)關(guān)閉事件

                        fclose($fd);

                    }

}

                //正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容

                if(!empty($buffer) && is_callable($this->onMessage)){

                    call_user_func($this->onMessage,$fd,$buffer);

                }

            });

        });

    }

}
$worker = new Worker('tcp://0.0.0.0:9810');

//開啟多進(jìn)程的端口監(jiān)聽

$worker->reusePort = true;

//連接事件

$worker->onConnect = function ($fd) {

    //echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;

};

//消息接收

$worker->onMessage = function ($conn, $message) {

    //事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯

// $a=include 'index.php';

// var_dump($a);

//var_dump($conn,$message);

    $content="我是peter";

    $http_resonse = "HTTP/1.1 200 OK\r\n";

    $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";

    $http_resonse .= "Connection: keep-alive\r\n"; //連接保持

    $http_resonse .= "Server: php socket server\r\n";

    $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";

    $http_resonse .= $content;

    fwrite($conn, $http_resonse);

};

$worker->start(); //啟動(dòng)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容