服務(wù)器的演進(jìn)
1)單進(jìn)程阻塞的網(wǎng)絡(luò)服務(wù)器
說明:
1.創(chuàng)建一個(gè)socket,綁定服務(wù)器端口(bind),監(jiān)聽端口(listen),在PHP中用stream_socket_server一個(gè)函數(shù)就能完成上面3個(gè)步驟
2.進(jìn)入while循環(huán),阻塞在accept操作上,等待客戶端連接進(jìn)入。此時(shí)程序會(huì)進(jìn)入睡眠狀態(tài),直到有新的客戶端發(fā)起connect到服務(wù)器,操作系統(tǒng)會(huì)喚醒此進(jìn)程。accept函數(shù)返回客戶端連接的socket
1.利用fread讀取客戶端socket當(dāng)中的數(shù)據(jù)收到數(shù)據(jù)后服務(wù)器程序進(jìn)行處理然后使用fwrite向客戶端發(fā)送響應(yīng)。長(zhǎng)連接的服務(wù)會(huì)持續(xù)與客戶端交互,而短連接服務(wù)一般收到響應(yīng)就會(huì)close。
缺點(diǎn):
一次只能處理一個(gè)連接,不支持多個(gè)連接同時(shí)處理
每個(gè)連接進(jìn)入到我們的服務(wù)端的時(shí)候,單獨(dú)創(chuàng)建一個(gè)進(jìn)程/線程提供服務(wù)
簡(jiǎn)單實(shí)現(xiàn)
單進(jìn)程阻塞的網(wǎng)絡(luò)服務(wù)器
1 創(chuàng)建一個(gè)socket,綁定服務(wù)器端口(bind),監(jiān)聽端口(listen),在PHP中用stream_socket_server一個(gè)函數(shù)就能完成上面3個(gè)步驟
2 進(jìn)入while循環(huán),阻塞在accept操作上,等待客戶端連接進(jìn)入。此時(shí)程序會(huì)進(jìn)入睡眠狀態(tài),直到有新的客戶端發(fā)起connect到服務(wù)器,操作系統(tǒng)會(huì)喚醒此進(jìn)程。
accept函數(shù)返回客戶端連接的socket
3 利用fread讀取客戶端socket當(dāng)中的數(shù)據(jù)收到數(shù)據(jù)后服務(wù)器程序進(jìn)行處理然后使用fwrite向客戶端發(fā)送響應(yīng)。
長(zhǎng)連接的服務(wù)會(huì)持續(xù)與客戶端交互,而短連接服務(wù)一般收到響應(yīng)就會(huì)close。
缺點(diǎn):
1 一次只能處理一個(gè)連接,不支持多個(gè)連接同時(shí)處理
2 每個(gè)連接進(jìn)入到我們的服務(wù)端的時(shí)候,單獨(dú)創(chuàng)建一個(gè)進(jìn)程/線程提供服務(wù)
class Worker{
protected $socket = null;
public $onMessage = null;
public $onConnect = null;
public function __construct($socket_address)
{
//綁定地址監(jiān)聽端口
$this->socket = stream_socket_server($socket_address);
}
public function start(){
while (true){
//阻塞監(jiān)聽客戶端socket狀態(tài)如連接成功發(fā)送的消息等,調(diào)用相應(yīng)回調(diào) 直到返回
$clientSocket = stream_socket_accept($this->socket);//返回客戶端資源
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
//從連接當(dāng)中讀取客戶端的內(nèi)容
$buffer=fread($clientSocket,65535);
//正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$clientSocket,$buffer);
}
//如果不關(guān)閉連接不能支持大點(diǎn)的并發(fā)請(qǐng)求
//fclose($clientSocket);
}
}
}
$server = new Worker('tcp://0.0.0.0:9800');
//客戶端連接成功觸發(fā)
$server->onConnect = function ($fd){
echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;
};
//客戶端端發(fā)消息過來觸發(fā)
$server->onMessage = function ($conn, $message){
//事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
// $http_resonse .= "Connection: keep-alive\r\n"; //連接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$server->start(); //啟動(dòng)
結(jié)論:
一次只能處理一個(gè)連接,不支持多個(gè)連接同時(shí)處理
每個(gè)連接進(jìn)入到我們的服務(wù)端的時(shí)候,單獨(dú)創(chuàng)建一個(gè)進(jìn)程/線程提供服務(wù)
1)預(yù)派生子進(jìn)程模式 類似php-fpm
1、程序啟動(dòng)后就會(huì)創(chuàng)建N個(gè)進(jìn)程。每個(gè)子進(jìn)程進(jìn)入 Accept,等待新的連接進(jìn)入。當(dāng)客戶端連接到服務(wù)器時(shí),其中一個(gè)子進(jìn)程會(huì)被喚醒,開始處理客戶端請(qǐng)求,并且不再接受新的TCP連接。當(dāng)此連接關(guān)閉時(shí),子進(jìn)程會(huì)釋放,重新進(jìn)入 Accept,參與處理新的連接。這個(gè)模型的優(yōu)勢(shì)是完全可以復(fù)用進(jìn)程,不需要太多的上下文切換,比如php-fpm基于此模型的。
缺點(diǎn):
1.這種模型嚴(yán)重依賴進(jìn)程的數(shù)量解決并發(fā)問題,一個(gè)客戶端連接就需要占用一個(gè)進(jìn)程,工作進(jìn)程的數(shù)量有多少,并發(fā)處理能力就有多少。操作系統(tǒng)可以創(chuàng)建的進(jìn)程數(shù)量是有限的。
2、操作系統(tǒng)生成一個(gè)子進(jìn)程需要進(jìn)行內(nèi)存復(fù)制等操作,在資源和時(shí)間上會(huì)產(chǎn)生一定的開銷;當(dāng)有大量請(qǐng)求時(shí),會(huì)導(dǎo)致系統(tǒng)性能下降;
例如:即時(shí)聊天程序,一臺(tái)服務(wù)器可能要維持?jǐn)?shù)十萬(wàn)的連接,那么就要啟動(dòng)數(shù)十萬(wàn)的進(jìn)程來維持。這顯然不可能
基于上面的模式我們發(fā)現(xiàn)我們只能通過每次(accept)處理單個(gè)請(qǐng)求,沒辦法一次性處理多個(gè)請(qǐng)求?
簡(jiǎn)單實(shí)現(xiàn)如下
class Worker{
protected $socket = null;
public $onMessage = null;
public $onConnect = null;
public $workNum = 10;
public function __construct($socket_address)
{
//綁定地址監(jiān)聽端口
$this->socket = stream_socket_server($socket_address);
}
public function start() {
//獲取配置文件
$this->fork(); //用來創(chuàng)建多個(gè)助教老師,創(chuàng)建多個(gè)子進(jìn)程負(fù)責(zé)接收請(qǐng)求的
}
public function fork(){
for ($i=0;$i<$this->workNum;$i++){
$pid = pcntl_fork();//下面的代碼父子進(jìn)程都會(huì)執(zhí)行
if ($pid<0){
exit('創(chuàng)建失敗');
}elseif($pid>0){
// $status=0;
// $pid=pcntl_wait($status);這邊會(huì)阻塞等待子進(jìn)程結(jié)束后在創(chuàng)建進(jìn)程 所以放到for 后面 等待子進(jìn)程創(chuàng)建 執(zhí)行完成 回收子進(jìn)程
// echo "子進(jìn)程回收了:$pid".PHP_EOL;
}else{
$this->accept();
return; //這邊要return 否則子進(jìn)程還會(huì)創(chuàng)建子進(jìn)程 因?yàn)閒ork 在for 循環(huán)里面當(dāng)然這里在阻塞監(jiān)聽
}
}
//放在父進(jìn)程空間,結(jié)束的子進(jìn)程信息,阻塞狀態(tài)
$status=0;
$pid=pcntl_wait($status);
echo "子進(jìn)程回收了:$pid".PHP_EOL;
}
public function accept(){
//創(chuàng)建多個(gè)子進(jìn)程阻塞接收服務(wù)端socket
while (true){
$clientSocket=stream_socket_accept($this->socket); //阻塞監(jiān)聽
var_dump(posix_getpid());
//觸發(fā)事件的連接的回調(diào)
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
//從連接當(dāng)中讀取客戶端的內(nèi)容
$buffer=fread($clientSocket,65535);
//正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$clientSocket,$buffer);
}
fclose($clientSocket); //必須關(guān)閉,子進(jìn)程不會(huì)釋放不會(huì)成功拿下進(jìn)入accpet
}
}
}
$server = new Worker('tcp://0.0.0.0:9800');
//客戶端連接成功觸發(fā)
$server->onConnect = function ($fd){
echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;
};
//客戶端端發(fā)消息過來觸發(fā)
$server->onMessage = function ($conn, $message){
//事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
// $http_resonse .= "Connection: keep-alive\r\n"; //連接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$server->start(); //啟動(dòng)
3)單進(jìn)程阻塞復(fù)用的網(wǎng)絡(luò)服務(wù)器
說明:
服務(wù)監(jiān)聽流程如上
1.保存所有的socket,通過select系統(tǒng)調(diào)用,監(jiān)聽socket描述符的可讀事件
2.Select會(huì)在內(nèi)核空間監(jiān)聽一旦發(fā)現(xiàn)socket可讀,會(huì)從內(nèi)核空間傳遞至用戶空間,在用戶空間通過邏輯判斷是服務(wù)端socket可讀,還是客戶端的socket可讀
3.如果是服務(wù)端的socket可讀,說明有新的客戶端建立,將socket保留到監(jiān)聽數(shù)組當(dāng)中 【第一次建立連接服務(wù)端可讀】
1.如果是客戶端的socket可讀,說明當(dāng)前已經(jīng)可以去讀取客戶端發(fā)送過來的內(nèi)容了,讀取內(nèi)容,然后響應(yīng)給客戶端。【客戶端給服務(wù)器發(fā)送數(shù)據(jù) 客戶端可讀】
缺點(diǎn):
1.select模式本身的缺點(diǎn)(1、循環(huán)遍歷處理事件、2、內(nèi)核空間傳遞數(shù)據(jù)的消耗)
2.單進(jìn)程對(duì)于大量任務(wù)處理乏力
class Worker{
//監(jiān)聽socket
protected $socket = NULL;
//連接事件回調(diào)
public $onConnect = NULL;
//接收消息事件回調(diào)
public $onMessage = NULL;
public $workerNum=4; //子進(jìn)程個(gè)數(shù)
public $allSocket; //存放所有socket
public function __construct($socket_address) {
//監(jiān)聽地址+端口
$this->socket=stream_socket_server($socket_address);
//1、stream_set_blocking 當(dāng) socket處于阻塞模式時(shí),
//比如:網(wǎng)絡(luò)io fread系統(tǒng)調(diào)用必須等待socket有數(shù)據(jù)返回,即進(jìn)程因系統(tǒng)調(diào)用阻塞;相反若處于非阻塞模式,內(nèi)核不管socket數(shù)據(jù)有沒有準(zhǔn)備好,都會(huì)立即返回給進(jìn)程。
//2、stream_set_blocking 另外進(jìn)程阻塞和socket阻塞不是一個(gè)概念,進(jìn)程阻塞是因?yàn)橄到y(tǒng)調(diào)用所致,socket是否阻塞只是說明socket上事件是不是可以內(nèi)核即刻處理。
//1、select是系統(tǒng)調(diào)用,必然會(huì)阻塞進(jìn)程的,和socket是否阻塞并沒有關(guān)系,我第2點(diǎn)備注了呢。
//2、這里的IO就是針對(duì)socket的網(wǎng)絡(luò)IO,是否是阻塞的,正是你題示所問的問題。
//3、socket之所以設(shè)置成非阻塞,是為了同一個(gè)進(jìn)程里可以更多的處理更多的tcp連接,這正是 select、poll 或者 epoll等多路復(fù)用模型能夠處理高并發(fā)的原因所在。
stream_set_blocking($this->socket,0); //設(shè)置網(wǎng)絡(luò)io 比如 fread 非阻塞
// 0是非阻塞,1是阻塞
//阻塞的意義是什么呢?
//某個(gè)函數(shù)讀取一個(gè)網(wǎng)絡(luò)流,當(dāng)沒有未讀取字節(jié)的時(shí)候,程序該怎么辦?
//是一直等待,直到下一個(gè)未讀取的字節(jié)的出現(xiàn),還是立即告訴調(diào)用者當(dāng)前沒有新內(nèi)容?
//前者是阻塞的,后者是非阻塞的。
//
//阻塞的好處是,排除其它非正常因素,阻塞的是按順序執(zhí)行的同步的讀取。
//
//借用小說里的說法就是“神刀出鞘,無(wú)血不歸”。在讀到新內(nèi)容之前,它不會(huì)往下走,什么別的事情都不做。
//
//而非阻塞,因?yàn)椴槐氐却齼?nèi)容,所以能異步的執(zhí)行,現(xiàn)在讀到讀不到都沒關(guān)系,執(zhí)行讀取操作后立刻就繼續(xù)往下做別的事情。
$this->allSocket[(int)$this->socket]=$this->socket;
}
public function start() {
//獲取配置文件
$this->fork();
}
public function fork(){
$this->accept();//子進(jìn)程負(fù)責(zé)接收客戶端請(qǐng)求
}
public function accept(){
//創(chuàng)建多個(gè)子進(jìn)程阻塞接收服務(wù)端socket
while (true){
$write=$except=[];
//需要監(jiān)聽socket
$read=$this->allSocket;
//建議socket狀態(tài)誰(shuí)改變
// var_dump($read);
stream_select($read,$write,$except,60);//內(nèi)核遍歷循環(huán)哪些改變會(huì)阻塞 如果只有一個(gè)改變也會(huì)循環(huán)很多次的問題
//怎么區(qū)分服務(wù)端跟客戶端剛啟動(dòng)服務(wù)沒有客戶端連接進(jìn)來socket 沒有改變 當(dāng)有新的客戶端連接進(jìn)來當(dāng)前改變的是服務(wù)端所以循環(huán)read
foreach ($read as $index=>$val){
//循環(huán)每一個(gè)改變 返回響應(yīng)
//當(dāng)前發(fā)生改變的是服務(wù)端,有連接進(jìn)入
if($val === $this->socket){
$clientSocket=stream_socket_accept($this->socket); //阻塞監(jiān)聽
//觸發(fā)事件的連接的回調(diào)
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
$this->allSocket[(int)$clientSocket]=$clientSocket;//先把資源放入數(shù)組 客戶端可寫時(shí)循環(huán)響應(yīng)避免阻塞
}else{
//從連接當(dāng)中讀取客戶端的內(nèi)容
$buffer=fread($val,1024);
//如果數(shù)據(jù)為空,或者為false,不是資源類型
if(empty($buffer)){
if(feof($val) || !is_resource($val)){
//觸發(fā)關(guān)閉事件
fclose($val);
unset($this->allSocket[(int)$val]);
continue;
}
}
//正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$val,$buffer);
}
}
}
}
}
}
$worker = new Worker('tcp://0.0.0.0:9800');
//連接事件
$worker->onConnect = function ($fd) {
//echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;
};
//消息接收
$worker->onMessage = function ($conn, $message) {
//事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
$http_resonse .= "Connection: keep-alive\r\n"; //連接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$worker->start(); //啟動(dòng)
4)多進(jìn)程master-worker模型
1.master進(jìn)程,負(fù)責(zé)處理配置文件讀取,啟動(dòng),終止和維護(hù)工作(worker)進(jìn)程數(shù),當(dāng)woker進(jìn)程退出后(異常情況下),會(huì)自動(dòng)重新啟動(dòng)新的woker
2.worker進(jìn)程的主要任務(wù)是完成具體的任務(wù)邏輯,啟動(dòng)端口監(jiān)聽,接收客戶端請(qǐng)求、使用epoll接收請(qǐng)求,執(zhí)行業(yè)務(wù)邏輯然后關(guān)閉連接。
是類似于nginx和workmen采用的Reactor 多進(jìn)程的模式,具體差異表現(xiàn)為主進(jìn)程中僅僅創(chuàng)建了監(jiān)聽,并沒有創(chuàng)建 mainReactor 來“accept”連接,而是由子進(jìn)程的 Reactor 來“accept”連接,通過負(fù)載均衡,一次只有一個(gè)子進(jìn)程進(jìn)行“accept”,子進(jìn)程“accept”新連接后就放到自己的 Reactor中進(jìn)行處理,不會(huì)再分配給其他子進(jìn)程 區(qū)別swoole
epoll:
epoll是在2.6內(nèi)核中提出的,是之前的select和poll的增強(qiáng)版本。相對(duì)于select和poll來說,epoll更加靈活,沒有描述符限制,無(wú)需輪詢。epoll使用一個(gè)文件描述符管理多個(gè)描述符,將用戶關(guān)系的文件描述符的事件存放到內(nèi)核的一個(gè)事件表中。
簡(jiǎn)單點(diǎn)來說就是當(dāng)連接有I/O流事件產(chǎn)生的時(shí)候,epoll就會(huì)去告訴進(jìn)程哪個(gè)連接有I/O流事件產(chǎn)生,然后進(jìn)程就去處理這個(gè)事件
也就是執(zhí)行回調(diào)。
class Worker{
//監(jiān)聽socket
protected $socket = NULL;
//連接事件回調(diào)
public $onConnect = NULL;
//接收消息事件回調(diào)
public $onMessage = NULL;
public $workerNum=4; //子進(jìn)程個(gè)數(shù)
public $allSocket; //存放所有socket
public function __construct($socket_address) {
//監(jiān)聽地址+端口
$this->socket=stream_socket_server($socket_address);
}
public function start() {
//獲取配置文件
$this->fork();
}
public function fork(){
$this->accept();//子進(jìn)程負(fù)責(zé)接收客戶端請(qǐng)求
}
public function accept(){
//第一個(gè)需要監(jiān)聽的事件(服務(wù)端socket的事件),一旦監(jiān)聽到可讀事件之后會(huì)觸發(fā)
swoole_event_add($this->socket,function ($fd){
$clientSocket=stream_socket_accept($fd);
//觸發(fā)事件的連接的回調(diào)
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
//監(jiān)聽客戶端可讀
swoole_event_add($clientSocket,function ($fd){
//從連接當(dāng)中讀取客戶端的內(nèi)容
$buffer=fread($fd,1024);
//如果數(shù)據(jù)為空,或者為false,不是資源類型
if(empty($buffer)){
if(feof($fd) || !is_resource($fd)){
//觸發(fā)關(guān)閉事件
fclose($fd);
}
}
//正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$fd,$buffer);
}
});
});
echo "非阻塞";
}
}
$worker = new Worker('tcp://0.0.0.0:9805');
//連接事件
$worker->onConnect = function ($fd) {
//echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;
};
//消息接收
$worker->onMessage = function ($conn, $message) {
//事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
$http_resonse .= "Connection: keep-alive\r\n"; //連接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$worker->start(); //啟動(dòng)
高效的事件處理模式Reactor 模式 例如swoole
Reactor模型,Reactor顧名思義就是反應(yīng)堆的意思,它本身不處理任何數(shù)據(jù)收發(fā)。只是可以監(jiān)視一個(gè)socket句柄的事件變化。
1) 主進(jìn)程/線程往epoll內(nèi)核亊件中注冊(cè)socket上的讀就緒亊件。
2) 主進(jìn)程/線程調(diào)用epoll_wait等待socket上有數(shù)據(jù)可讀。
3) 當(dāng)socket上有數(shù)據(jù)可讀時(shí),epoll_wait通知主進(jìn)程/線程。主進(jìn)程/線程則將socket可讀事件放人請(qǐng)求隊(duì)列。
4) 睡眠在請(qǐng)求隊(duì)列上的某個(gè)工作線程被喚醒,它從socket讀取數(shù)據(jù),并處理客戶請(qǐng)求, 然后往epoll內(nèi)核事件表中注冊(cè)該socket上的寫就緒事件。
5) 主線程調(diào)用epoll_wait等待socket可寫。
6) 當(dāng)socket可寫時(shí),epoll_wait通知主進(jìn)程/線程將socket可寫亊件放人清求隊(duì)列。
7) 睡眠在請(qǐng)求隊(duì)列上的某個(gè)工作線程被喚醒,它往socket上寫人服務(wù)器處理客戶淸求
例如 swoole 中
每一個(gè)線程都有自己的用途,下面多每個(gè)線程有一個(gè)了解
1.1、MainReactor(主線程)
主線程會(huì)負(fù)責(zé)監(jiān)聽server socket,如果有新的連接accept,主線程會(huì)評(píng)估每個(gè)Reactor線程的連接數(shù)量。將此連接分配給連接數(shù)最少的reactor線程,做一個(gè)負(fù)載均衡。
1.2 、Reactor線程組
Reactor線程負(fù)責(zé)維護(hù)客戶端機(jī)器的TCP連接、處理網(wǎng)絡(luò)IO、收發(fā)數(shù)據(jù)完全是異步非阻塞的模式。
swoole的主線程在Accept新的連接后,會(huì)將這個(gè)連接分配給一個(gè)固定的Reactor線程,在socket可讀時(shí)讀取數(shù)據(jù),并進(jìn)行協(xié)議解析,將請(qǐng)求投遞到Worker進(jìn)程。在socket可寫時(shí)將數(shù)據(jù)發(fā)送給TCP客戶端。
nginx 類似 workermen
是類似于nginx采用的Reactor 多進(jìn)程的模式,具體差異表現(xiàn)為主進(jìn)程中僅僅創(chuàng)建了監(jiān)聽,并沒有創(chuàng)建 mainReactor 來“accept”連接,而是由子進(jìn)程的 Reactor 來“accept”連接,通過負(fù)載均衡,一次只有一個(gè)子進(jìn)程進(jìn)行“accept”,子進(jìn)程“accept”新連接后就放到自己的 Reactor中進(jìn)行處理,不會(huì)再分配給其他子進(jìn)程
例
class Worker{
//監(jiān)聽socket
protected $socket = NULL;
//連接事件回調(diào)
public $onConnect = NULL;
public $reusePort=1;
//接收消息事件回調(diào)
public $onMessage = NULL;
public $workerNum=3; //子進(jìn)程個(gè)數(shù)
public $allSocket; //存放所有socket
public $addr;
public function __construct($socket_address) {
//監(jiān)聽地址+端口
$this->addr=$socket_address;
}
public function start() {
//獲取配置文件
$this->fork();
}
public function fork(){
for ($i=0;$i<$this->workerNum;$i++){
$pid=pcntl_fork(); //創(chuàng)建成功會(huì)返回子進(jìn)程id
if($pid<0){
exit('創(chuàng)建失敗');
}else if($pid>0){
//父進(jìn)程空間,返回子進(jìn)程id
}else{ //返回為0子進(jìn)程空間
$this->accept();//子進(jìn)程負(fù)責(zé)接收客戶端請(qǐng)求
exit; //父進(jìn)程繼續(xù)執(zhí)行循環(huán)創(chuàng)建子進(jìn)程
}
}
//放在父進(jìn)程空間,結(jié)束的子進(jìn)程信息,阻塞狀態(tài)
$status=0;
for ($i=0;$i<$this->workerNum;$i++) {
$pid = pcntl_wait($status);
}
}
public function accept(){
$opts = array(
'socket' => array(
'backlog' =>10240, //成功建立socket連接的等待個(gè)數(shù)
),
);
$context = stream_context_create($opts);
//開啟多端口監(jiān)聽,并且實(shí)現(xiàn)負(fù)載均衡
stream_context_set_option($context,'socket','so_reuseport',1);
stream_context_set_option($context,'socket','so_reuseaddr',1);
$this->socket=stream_socket_server($this->addr,$errno,$errstr,STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,$context);
//第一個(gè)需要監(jiān)聽的事件(服務(wù)端socket的事件),一旦監(jiān)聽到可讀事件之后會(huì)觸發(fā)
swoole_event_add($this->socket,function ($fd){
$clientSocket=stream_socket_accept($fd);
//觸發(fā)事件的連接的回調(diào)
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
//監(jiān)聽客戶端可讀
swoole_event_add($clientSocket,function ($fd){
//從連接當(dāng)中讀取客戶端的內(nèi)容
$buffer=fread($fd,1024);
//如果數(shù)據(jù)為空,或者為false,不是資源類型
if(empty($buffer)){
if(!is_resource($fd) || feof($fd) ){
//觸發(fā)關(guān)閉事件
fclose($fd);
}
}
//正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$fd,$buffer);
}
});
});
}
}
$worker = new Worker('tcp://0.0.0.0:9810');
//開啟多進(jìn)程的端口監(jiān)聽
$worker->reusePort = true;
//連接事件
$worker->onConnect = function ($fd) {
//echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;
};
//消息接收
$worker->onMessage = function ($conn, $message) {
//事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯
// $a=include 'index.php';
// var_dump($a);
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
$http_resonse .= "Connection: keep-alive\r\n"; //連接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$worker->start(); //啟動(dòng)