接上一篇,为了可以同时处理多个连接,我们优化成多进程模式,并且还是预先生成进程,而不是动态创建进程.
<?php
/**
* 多进程阻塞式--一个master进程,两个worker进程.
* 其中一个进程挂掉自动启动新的
* 同时处理的连接数受限于启动的进程数
*/
class Xtgxiso_server
{
public $socket = false;
public $onConnect = null;
public $onMessage = null;
public $onClose = null;
public $process_num = 2;
private $pids = array();
function __construct($host="0.0.0.0",$port=1215){
//产生子进程分支
$pid = pcntl_fork();
if ($pid == -1) {
die("could not fork"); //pcntl_fork返回-1标明创建子进程失败
} else if ($pid) {
exit(); //父进程中pcntl_fork返回创建的子进程进程号
} else {
// 子进程pcntl_fork返回的时0
}
// 从当前终端分离
if (posix_setsid() == -1) {
die("could not detach from terminal");
}
umask(0);
$this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
if (!$this->socket) die($errstr."--".$errno);
}
private function start_worker_process(){
$pid = pcntl_fork();
switch ($pid) {
case -1:
echo "fork error : {$i} \r\n";
exit;
case 0:
while ( 1 ) {
echo "waiting...\n";
$conn = stream_socket_accept($this->socket, -1);
if($this->onConnect) {
call_user_func($this->onConnect, $conn);
}
$receive = '';
$buffer = '';
while ( 1 ) {
$buffer = fread($conn, 3);
if($buffer === '' || $buffer === false)
{
if ( $this->onClose ){
call_user_func($this->onClose, $conn);
}
break;
}
$pos = strpos($buffer, "\n");
if($pos === false) {
$receive .= $buffer;
//echo "received:".$buffer.";not all package,continue recdiveing\n";
}else{
$receive .= trim(substr ($buffer,0,$pos+1));
$buffer = substr($buffer,$pos+1);
if($this->onMessage) {
call_user_func($this->onMessage, $conn, $receive);
}
switch ( $receive ){
case "quit":
echo "client close conn\n";
fclose($conn);
break 2;
default:
//echo "all package:\n";
//echo $receive."\n";
break;
}
$receive='';
}
}
}
default:
$this->pids[$pid] = $pid;
break;
}
}
public function run(){
for($i = 1; $i <= $this->process_num; $i++){
$this->start_worker_process();
}
while(1){
foreach ($this->pids as $i => $pid) {
if($pid) {
$res = pcntl_waitpid($pid, $status,WNOHANG);
if ( $res == -1 || $res > 0 ){
$this->start_worker_process();
unset($this->pids[$pid]);
}
}
}
sleep(1);
}
}
function __destruct() {
@fclose($this->socket);
}
}
$server = new Xtgxiso_server();
$server->onConnect = function($conn){
echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
fwrite($conn,"conn success\n");
};
$server->onMessage = function($conn,$msg){
echo "onMessage --" . $msg . "\n";
fwrite($conn,"received ".$msg."\n");
};
$server->onClose = function($conn){
echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
fwrite($conn,"onClose "."\n");
};
$server->run();
这样,我们就实现了一个多进程的socket服务,同时处理的连接跟启动时配置的进程数有关,如果其中一个进程死掉,会自动启动新的!
