利用openresty来优化php-fpm的长轮询

    对于长轮询场景是比较常见的,客户端同步的不断的在同步请求,而服务端根据请求来决定是否响应或者hold请求以继续检查数据.

    由于php-fpm是单进程单线程的,一个进程同时只能处理一个请求,所以对于这种长轮询的场景,需要启动特别多的php-fpm进程来应对高的并发,这是特别浪费资源的,因为大部分请求都是hold在服务器上。

    现在我们用openresty来轮询php-fpm,php-fpm会很快的处理完成,不会hold住,减少了php-fpm的压力,配置文件如下:

location ~ /longpolling(/.*) {
                content_by_lua_block {
                        local request_uri_without_args = ngx.re.sub(ngx.var.request_uri, "\\?.*", "")
                        request_uri_without_args = ngx.re.gsub(request_uri_without_args,"/longpolling","")
                        local url = {request_uri_without_args}
                        local args = ngx.req.get_uri_args()
                        local query = ngx.encode_args(args)
                        if query ~= "" then
                                url[#url + 1] = "?"
                                url[#url + 1] = query
                        end
                        url = table.concat(url)
                        for i = 1, 3 do
                                local sleep = 1
                                local res = ngx.location.capture(url)
                                for k, v in pairs(res.header) do
                                        if type(v) == "table" then
                                                if k == 'sleep' and v == 0 then
                                                        ngx.header[k] = table.concat(v, ",")
                                                        sleep = table.concat(v, ",")
                                                end
                                        else
                                                if k == "sleep" and v == "0" then
                                                        ngx.header[k] = v
                                                        sleep = v
                                               end
                                        end
                                end
                                if sleep == 1 then
                                        ngx.sleep(1)
                                else
                                        ngx.say(res.body)
                                        break
                                end

                        end
                }
        }

    

    将url中以”longpolling”开始的请求转换为长轮询.比如:/longpolling/aa 实际访问是/aa. 长轮询总共时间是3秒,每次停止一秒,根据php-fpm响应头sleep来决定是否输出还是继续轮询.PHP代码如下.

  

<?php
header('Content-type: application/json');
$time = $_GET["time"];
$data = get_new_data($time);
if ( $data ){
        header('sleep:0');
        echo json_encode(array("data"=>$data));
}else{
        echo json_encode(array("data"=>""));
}

 这样我们就做了一个相对通用的仅支持get请求的优化接口。  
继续阅读

发表在 好文推荐, 网站开发, 网站架构 | 标签为 | 14 条评论

socket服务的模型以及实现(7)–总结

    首先,我们来熟悉一下基本知识,再回顾前几篇文章!

   1:阻塞/非阻塞–这两个概念是针对 IO 过程中进程的状态来说的

        阻塞 IO 是指调用结果返回之前,当前线程会被挂起
        非阻塞指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回

   2:同步/异步–这两个概念是针对调用返回结果来说的

        同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回
        当一个异步过程调用发出后,调用者不能立刻得到结果,实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者

   3:多路复用(IO/Multiplexing)

        为了提高数据信息在网络通信线路中传输的效率,在一条物理通信线路上建立多条逻辑通信信道,同时传输若干路信号的技术就叫做多路复用技术。
        对于 Socket 来说,应该说能同时处理多个连接的模型都应该被称为多路复用,目前比较常用的有 select/poll/epoll/kqueue 这些 IO 模型.

    4:惊群效应

        惊群问题是由于系统中有多个进程在等待同一个资源,当资源可用的时候,系统会唤醒所有或部分处于休眠状态的进程去争抢资源,但是最终只会有一个进程能够成功的响应请求并获得资源,但在这个过程中由于系统要对全部的进程唤醒,导致了需要对这些进程进行不必要的切换,从而会产生系统资源的浪费.

        更多资料可参考如下链接

        http://wenda.workerman.net/?/question/179

        https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/

    我们接着回顾下前几篇文章的思绪

        socket服务的模型以及实现(1)–单进程阻塞模型

           单进程阻塞模型,一次只能处理一个连接,效率极低!

        socket服务的模型以及实现(2)–多进程阻塞模型

            多进程阻塞模型,一个进程处理一个连接!

        socket服务的模型以及实现(3)–单进程IO复用select

           单进程IO复用(select),同时处理的连接数不受限于进程数!

        socket服务的模型以及实现(4)–单进程IO复用libevent

           单进程IO复用(libevent),同时处理的连接数比select要好很多!

        socket服务的模型以及实现(5)–多进程IO复用libevent

           多进程IO复用(libevent),可以更好的利用多核!

        socket服务的模型以及实现(6)–多进程IO复用libevent–端口复用技术

          增加了端口复用技术,性能更好!

     通过几上几篇文章,对于我们用PHP写个socket服务有了基本的服务模型认识!

   

发表在 好文推荐, 网站开发, 网站架构 | 标签为 | 25 条评论

socket服务的模型以及实现(6)–多进程IO复用libevent–端口复用技术

端口复用技术,这样就可以很好的解决惊群问题和stream_socket_server性能瓶颈的问题.

<?php
/**
 * 多进程IO复用libevent
 * 同时处理多个连接
 * 端口复用---建议php7
 */
class Xtgxiso_server
{
    public $socket = false;
    public $master = array();
    public $onConnect = null;
    public $onMessage = null;
    public $onClose = null;
    public $process_num = 2;
    private $pids = array();
    public $receive = array();
    private  $host='127.0.0.1';
    private $port = 1215;

    function __construct($host="0.0.0.0",$port=1215){
        //产生子进程分支
        $pid = pcntl_fork();
        if ($pid == -1) {
            die("could not fork"); //pcntl_fork返回-1标明创建子进程失败
        } else if ($pid) {
            exit(); //父进程中pcntl_fork返回创建的子进程进程号
        } else {
            // 子进程pcntl_fork返回的时0
        }
        // 从当前终端分离
        if (posix_setsid() == -1) {
            die("could not detach from terminal");
        }
        umask(0);
        $this->host = $host;
        $this->port = $port;
    }

    private function start_worker_process(){
        $pid = pcntl_fork();
        switch ($pid) {
            case -1:
                echo "fork error : {$i} \r\n";
                exit;
            case 0:
                $context_option['socket']['so_reuseport'] = 1;
                $context = stream_context_create($context_option);
                $this->socket = stream_socket_server("tcp://".$this->host.":".$this->port, $errno, $errstr,STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,$context);
                if (!$this->socket) die($errstr."--".$errno);
                stream_set_blocking($this->socket,0);
                $id = (int)$this->socket;
                $this->master[$id] = $this->socket;
                $base = event_base_new();
                $event = event_new();
                event_set($event, $this->socket, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);
                event_base_set($event, $base);
                event_add($event);
                echo   posix_getpid()." start run...\n";
                event_base_loop($base);
            default:
                $this->pids[$pid] = $pid;
                break;
        }
    }

    public function run(){

        for($i = 1; $i <= $this->process_num; $i++){
            $this->start_worker_process();
        }

        while(1){
            foreach ($this->pids as $i => $pid) {
                if($pid) {
                    $res = pcntl_waitpid($pid, $status,WNOHANG);

                    if ( $res == -1 || $res > 0 ){
                        $this->start_worker_process();
                        unset($this->pids[$pid]);
                    }
                }
            }
            sleep(1);
        }
    }

    public function ev_accept($socket, $flag, $base){
        $connection = @stream_socket_accept($socket);
        echo posix_getpid()." -- accepted " . stream_socket_get_name($connection,true) . "\n";
        if ( !$connection ){
            return;
        }
        stream_set_blocking($connection, 0);
        $id = (int)$connection;
        if($this->onConnect) {
            call_user_func($this->onConnect, $connection);
        }
        $buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);
        event_buffer_base_set($buffer, $base);
        event_buffer_timeout_set($buffer, 30, 30);
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
        event_buffer_priority_set($buffer, 10);
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);
        $this->master[$id] = $connection;
        $this->buffer[$id] = $buffer;
        $this->receive[$id] = '';
    }

    function ev_read($buffer, $id)
    {
        while( 1 ) {
            $read = event_buffer_read($buffer, 3);
            if($read === '' || $read === false)
            {
                break;
            }
            $pos = strpos($read, "\n");
            if($pos === false)
            {
                $this->receive[$id] .= $read;
                //echo "received:".$read.";not all package,continue recdiveing\n";
            }else{
                $this->receive[$id] .= trim(substr ($read,0,$pos+1));
                $read = substr($read,$pos+1);
                if($this->onMessage)
                {
                    call_user_func($this->onMessage,$this->master[$id],$this->receive[$id]);
                }
                switch ( $this->receive[$id] ){
                    case "quit":
                        echo "client close conn\n";
                        if($this->onClose) {
                            call_user_func($this->onClose, $this->master[$id]);
                        }
                        fclose($this->master[$id]);
                        break;
                    default:
                        //echo "all package:\n";
                        //echo $this->receive[$id]."\n";
                        break;
                }
                $this->receive[$id]='';
            }
        }
    }

    function ev_write($buffer, $id)
    {
        echo "$id -- " ."\n";
    }

    function ev_error($buffer, $error, $id)
    {
        echo "ev_error - ".$error."\n";
    }

}
$server =  new Xtgxiso_server();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"conn success\n");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "\n";
    fwrite($conn,"received ".$msg."\n");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
};

$server->run();

   经过多次服务模型的演变,基本我们实现了一个高性能的服务模型!

  

发表在 好文推荐, 网站开发, 网站架构 | 标签为 | 31 条评论

socket服务的模型以及实现(5)–多进程IO复用libevent

在利用了IO复用的情况下,再结合多进程,实现性能更好的服务.

<?php
/**
 * 多进程IO复用libevent
 * 同时处理多个连接
 * 惊群效应没处理
 */
class Xtgxiso_server
{
    public $socket = false;
    public $master = array();
    public $onConnect = null;
    public $onMessage = null;
    public $onClose = null;
    public $process_num = 2;
    private $pids = array();
    public $receive = array();

    function __construct($host="0.0.0.0",$port=1215){
        //产生子进程分支
        $pid = pcntl_fork();
        if ($pid == -1) {
            die("could not fork"); //pcntl_fork返回-1标明创建子进程失败
        } else if ($pid) {
            exit(); //父进程中pcntl_fork返回创建的子进程进程号
        } else {
            // 子进程pcntl_fork返回的时0
        }
        // 从当前终端分离
        if (posix_setsid() == -1) {
            die("could not detach from terminal");
        }
        umask(0);
        $this->socket = stream_socket_server("tcp://".$host.":".$port, $errno, $errstr);
        if (!$this->socket) die($errstr."--".$errno);
        stream_set_blocking($this->socket,0);
        $id = (int)$this->socket;
        $this->master[$id] = $this->socket;
    }

    private function start_worker_process(){
        $pid = pcntl_fork();
        switch ($pid) {
            case -1:
                echo "fork error : {$i} \r\n";
                exit;
            case 0:
                $base = event_base_new();
                $event = event_new();
                event_set($event, $this->socket, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);
                event_base_set($event, $base);
                event_add($event);
                echo   posix_getpid()." start run...\n";
                event_base_loop($base);
            default:
                $this->pids[$pid] = $pid;
                break;
        }
    }

    public function run(){

        for($i = 1; $i <= $this->process_num; $i++){
            $this->start_worker_process();
        }

        while(1){
            foreach ($this->pids as $i => $pid) {
                if($pid) {
                    $res = pcntl_waitpid($pid, $status,WNOHANG);

                    if ( $res == -1 || $res > 0 ){
                        $this->start_worker_process();
                        unset($this->pids[$pid]);
                    }
                }
            }
            sleep(1);
        }
    }

    public function ev_accept($socket, $flag, $base){
        $connection = @stream_socket_accept($socket);
        echo posix_getpid()." -- accepted " . stream_socket_get_name($connection,true) . "\n";
        if ( !$connection ){//惊群效应
            return;
        }
        stream_set_blocking($connection, 0);
        $id = (int)$connection;
        if($this->onConnect) {
            call_user_func($this->onConnect, $connection);
        }
        $buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);
        event_buffer_base_set($buffer, $base);
        event_buffer_timeout_set($buffer, 30, 30);
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
        event_buffer_priority_set($buffer, 10);
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);
        $this->master[$id] = $connection;
        $this->buffer[$id] = $buffer;
        $this->receive[$id] = '';
    }

    function ev_read($buffer, $id)
    {
        while( 1 ) {
            $read = event_buffer_read($buffer, 3);
            if($read === '' || $read === false)
            {
                break;
            }
            $pos = strpos($read, "\n");
            if($pos === false)
            {
                $this->receive[$id] .= $read;
                //echo "received:".$read.";not all package,continue recdiveing\n";
            }else{
                $this->receive[$id] .= trim(substr ($read,0,$pos+1));
                $read = substr($read,$pos+1);
                if($this->onMessage)
                {
                    call_user_func($this->onMessage,$this->master[$id],$this->receive[$id]);
                }
                switch ( $this->receive[$id] ){
                    case "quit":
                        echo "client close conn\n";
                        if($this->onClose) {
                            call_user_func($this->onClose, $this->master[$id]);
                        }
                        fclose($this->master[$id]);
                        break;
                    default:
                        //echo "all package:\n";
                        //echo $this->receive[$id]."\n";
                        break;
                }
                $this->receive[$id]='';
            }
        }
    }

    function ev_write($buffer, $id)
    {
        echo "$id -- " ."\n";
    }

    function ev_error($buffer, $error, $id)
    {
        echo "ev_error - ".$error."\n";
    }

}
$server =  new Xtgxiso_server();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"conn success\n");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "\n";
    fwrite($conn,"received ".$msg."\n");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
};

$server->run();

在多进程的情况下,多个进程都在accep会有惊群效应的情况出现,性能瓶颈在stream_socket_server上面,这个会在下一篇介绍!

发表在 好文推荐, 网站开发, 网站架构 | 标签为 | 24 条评论

socket服务的模型以及实现(4)–单进程IO复用libevent

利用系统select提供的IO复用技术,会有性能问题,这次我们采用比较好的libevent库.

<?php
/**
 * 单进程IO复用libevent
 * 同时处理多个连接
 */
class Xtgxiso_server
{
    public $socket = false;
    public $master = array();
    public $onConnect = null;
    public $onMessage = null;
    public $onClose = null;
    public $receive = array();

    function __construct($host="0.0.0.0",$port=1215)
    {
        if (!extension_loaded('libevent')) {
            die("Please install libevent extension \n");
        }
        $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
        if (!$this->socket) die($errstr."--".$errno);
        stream_set_blocking($this->socket,0);
        $id = (int)$this->socket;
        $this->master[$id] = $this->socket;
    }

    public function run()
    {
        $base = event_base_new();
        $event = event_new();
        event_set($event, $this->socket, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);
        event_base_set($event, $base);
        event_add($event);
        echo  "start run...\n";
        event_base_loop($base);
    }

    public function ev_accept($socket, $flag, $base){
        $connection = stream_socket_accept($socket);
        stream_set_blocking($connection, 0);
        $id = (int)$connection;
        if($this->onConnect) {
            call_user_func($this->onConnect, $connection);
        }
        $buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);
        event_buffer_base_set($buffer, $base);
        event_buffer_timeout_set($buffer, 30, 30);
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
        event_buffer_priority_set($buffer, 10);
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);
        $this->master[$id] = $connection;
        $this->buffer[$id] = $buffer;
        $this->receive[$id] = '';
    }

    function ev_read($buffer, $id)
    {
        while( 1 ) {
            $read = event_buffer_read($buffer, 3);
            if($read === '' || $read === false)
            {
                break;
            }
            $pos = strpos($read, "\n");
            if($pos === false)
            {
                $this->receive[$id] .= $read;
                //echo "received:".$read.";not all package,continue recdiveing\n";
            }else{
                $this->receive[$id] .= trim(substr ($read,0,$pos+1));
                $read = substr($read,$pos+1);
                if($this->onMessage)
                {
                    call_user_func($this->onMessage,$this->master[$id],$this->receive[$id]);
                }
                switch ( $this->receive[$id] ){
                    case "quit":
                        //echo "client close conn\n";
                        if($this->onClose) {
                            call_user_func($this->onClose, $this->master[$id]);
                        }
                        fclose($this->master[$id]);
                        break;
                    default:
                        //echo "all package:\n";
                        //echo $this->receive[$id]."\n";
                        break;
                }
                $this->receive[$id]='';
            }
        }
    }

    function ev_write($buffer, $id)
    {
        echo "$id -- " ."\n";
    }

    function ev_error($buffer, $error, $id)
    {
        echo "ev_error - ".$error."\n";
    }

}


$server =  new Xtgxiso_server();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"conn success\n");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "\n";
    fwrite($conn,"received ".$msg."\n");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
};

$server->run();

这样的方式,相对来说是一个比较好的socket下的IO复用实现.

发表在 好文推荐, 福利分享, 网站开发, 网站架构 | 标签为 | 22 条评论

socket服务的模型以及实现(3)–单进程IO复用select

利用多进程来提高并发连接数并不理想,进程消耗太大了,这次我们利用IO复用技术来实现.

<?php
/**
 * 单进程IO复用select
 * 同时处理多个连接
 */
class Xtgxiso_server
{
    public $socket = false;
    public $master = array();
    public $onConnect = null;
    public $onMessage = null;
    public $onClose = null;

    function __construct($host="0.0.0.0",$port=1215)
    {
        $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
        if (!$this->socket) die($errstr."--".$errno);
        stream_set_blocking($this->socket,0);
        $id = (int)$this->socket;
        $this->master[$id] = $this->socket;
    }

    public function run(){
        $read = $this->master;
        $receive = array();
        echo  "start run...\n";
        while ( 1 ) {
            $read = $this->master;
            //echo  "waiting...\n";
            $mod_fd = @stream_select($read, $_w = NULL, $_e = NULL, 60);
            if ($mod_fd === FALSE) {
                break;
            }
            foreach ( $read as $k => $v ) {
                if ( $v === $this->socket ) {
                    //echo "new conn\n";
                    $conn = stream_socket_accept($this->socket);
                    if($this->onConnect) {
                        call_user_func($this->onConnect, $conn);
                    }
                    $id = (int)$conn;
                    $this->master[$id] = $conn;
                } else {
                    //echo "read data\n";
                    if ( !isset($receive[$k]) ){
                        $receive[$k]="";
                    }
                    $buffer = fread($v, 10);
                    //echo $buffer."\n";
                    if ( strlen($buffer) === 0 ) {
                        if ( $this->onClose ){
                            call_user_func($this->onClose,$v);
                        }
                        fclose($v);
                        $id = (int)$v;
                        unset($this->master[$id]);
                    } else if ( $buffer === FALSE ) {
                        if ( $this->onClose ){
                            call_user_func($this->onClose, $this->master[$key_to_del]);
                        }
                        fclose($v);
                        $id = (int)$v;
                        unset($this->master[$id]);
                    } else {
                        $pos = strpos($buffer, "\n");
                        if ( $pos === false) {
                            $receive[$k] .= $buffer;
                            //echo "received:".$buffer.";not all package,continue recdiveing\n";
                        }else{
                            $receive[$k] .= trim(substr ($buffer,0,$pos+1));
                            $buffer = substr($buffer,$pos+1);
                            if($this->onMessage) {
                                call_user_func($this->onMessage,$v,$receive[$k]);
                            }
                            switch ( $receive[$k] ){
                                case "quit":
                                    echo "client close conn\n";
                                    fclose($v);
                                    $id = (int)$v;
                                    unset($this->master[$id]);
                                    break;
                                default:
                                    //echo "all package:\n";
                                    //echo $receive[$k]."\n";
                                    break;
                            }
                            $receive[$k]='';
                        }
                    }
                }
            }
            usleep(10000);
        }
    }
}
$server =  new Xtgxiso_server();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"conn success\n");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "\n";
    fwrite($conn,"received ".$msg."\n");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"onClose "."\n");
};

$server->run();

这样我们同时处理的连接数就不受限于进程数了,相对于靠多进程来提高连接数的方式明显好了很多.

发表在 好文推荐, 网站开发, 网站架构 | 标签为 | 29 条评论

socket服务的模型以及实现(2)–多进程阻塞模型

接上一篇,为了可以同时处理多个连接,我们优化成多进程模式,并且还是预先生成进程,而不是动态创建进程.

<?php
/**
 * 多进程阻塞式--一个master进程,两个worker进程.
 * 其中一个进程挂掉自动启动新的
 * 同时处理的连接数受限于启动的进程数
 */
class Xtgxiso_server
{
    public $socket = false;
    public $onConnect = null;
    public $onMessage = null;
    public $onClose = null;
    public $process_num = 2;
    private $pids = array();

    function __construct($host="0.0.0.0",$port=1215){
        //产生子进程分支
        $pid = pcntl_fork();
        if ($pid == -1) {
            die("could not fork"); //pcntl_fork返回-1标明创建子进程失败
        } else if ($pid) {
            exit(); //父进程中pcntl_fork返回创建的子进程进程号
        } else {
            // 子进程pcntl_fork返回的时0
        }
        // 从当前终端分离
        if (posix_setsid() == -1) {
            die("could not detach from terminal");
        }
        umask(0);
        $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
        if (!$this->socket) die($errstr."--".$errno);
    }

    private function start_worker_process(){
        $pid = pcntl_fork();
        switch ($pid) {
            case -1:
                echo "fork error : {$i} \r\n";
                exit;
            case 0:
                while ( 1 ) {
                    echo  "waiting...\n";
                    $conn = stream_socket_accept($this->socket, -1);
                    if($this->onConnect) {
                        call_user_func($this->onConnect, $conn);
                    }
                    $receive = '';
                    $buffer = '';
                    while ( 1 ) {
                        $buffer = fread($conn, 3);
                        if($buffer === '' || $buffer === false)
                        {
                            if ( $this->onClose ){
                                call_user_func($this->onClose, $conn);
                            }
                            break;
                        }
                        $pos = strpos($buffer, "\n");
                        if($pos === false) {
                            $receive .= $buffer;
                            //echo "received:".$buffer.";not all package,continue recdiveing\n";
                        }else{
                            $receive .= trim(substr ($buffer,0,$pos+1));
                            $buffer = substr($buffer,$pos+1);
                            if($this->onMessage) {
                                call_user_func($this->onMessage, $conn, $receive);
                            }
                            switch ( $receive ){
                                case "quit":
                                    echo "client close conn\n";
                                    fclose($conn);
                                    break 2;
                                default:
                                    //echo "all package:\n";
                                    //echo $receive."\n";
                                    break;
                            }
                            $receive='';
                        }
                    }
                }
            default:
                $this->pids[$pid] = $pid;
                break;
        }
    }

    public function run(){

        for($i = 1; $i <= $this->process_num; $i++){
            $this->start_worker_process();
        }

        while(1){
            foreach ($this->pids as $i => $pid) {
                if($pid) {
                    $res = pcntl_waitpid($pid, $status,WNOHANG);

                    if ( $res == -1 || $res > 0 ){
                        $this->start_worker_process();
                        unset($this->pids[$pid]);
                    }
                }
            }
            sleep(1);
        }
    }

    function __destruct() {
        @fclose($this->socket);
    }

}
$server =  new Xtgxiso_server();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"conn success\n");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "\n";
    fwrite($conn,"received ".$msg."\n");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"onClose "."\n");
};

$server->run();

这样,我们就实现了一个多进程的socket服务,同时处理的连接跟启动时配置的进程数有关,如果其中一个进程死掉,会自动启动新的!

发表在 好文推荐, 网站开发, 网站架构 | 标签为 | 29 条评论

socket服务的模型以及实现(1)–单进程阻塞模型

这里我们只以tcp服务举例,协议以换行符举例.我们先看最简单的单进程阻塞模型

首先一个socket服务的代码如下:

<?php
/**
 * 单进程阻塞式--同时只能处理一个连接
 */
class Xtgxiso_server
{
    public $socket = false;
    public $onConnect = null;
    public $onMessage = null;
    public $onClose = null;

    function __construct($host="0.0.0.0",$port=1215)
    {
        $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
        if (!$this->socket) die($errstr."--".$errno);
    }

    public function run(){
        while ( 1 ) {
            echo  "waiting...\n";
            $conn = stream_socket_accept($this->socket, -1);
            if ( !$conn ){
                continue;
            }
            if($this->onConnect)
            {
                call_user_func($this->onConnect, $conn);
            }
            $receive = '';
            $buffer = '';
            while ( 1 ) {
                $buffer = fread($conn, 3);
                if($buffer === '' || $buffer === false)
                {
                    if ( $this->onClose ){
                        call_user_func($this->onClose, $conn);
                    }
                    break;
                }
                $pos = strpos($buffer, "\n");
                if($pos === false)
                {
                    $receive .= $buffer;
                    //echo "received:".$buffer.";not all package,continue recdiveing\n";
                }else{
                    $receive .= trim(substr ($buffer,0,$pos+1));
                    $buffer = substr($buffer,$pos+1);
                    if($this->onMessage)
                    {
                        call_user_func($this->onMessage, $conn, $receive);
                    }
                    switch ( $receive ){
                        case "quit":
                            echo "client close conn\n";
                            fclose($conn);
                            break 2;
                        default:
                            //echo "all package:\n";
                            //echo $receive."\n";
                            break;
                    }
                    $receive='';
                }
            }
        }
        fclose($socket);
    }
}
$server =  new Xtgxiso_server();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"conn success\n");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "\n";
    fwrite($conn,"received ".$msg."\n");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"onClose "."\n");
};

$server->run();

大家可以用telnet命令来测试,这也是用换行符协议的原因,方便测试。可以很明显的看到,这个服务同时只能处理一个连接,肯定不会用人用这种方式来实现socket服务的

发表在 好文推荐, 网站开发, 网站架构 | 标签为 | 24 条评论

搭建高可用搜索服务(2)–安装haproxy代理coreseek服务

1:在安装有coreseek服务的主机上添加监控sphinx状态的服务

安装xineed服务

yum install xinetd

添加端口

echo "sphinxcheck     9322/tcp                # sphinx status check" >> /etc/services 
使用xinetd守护进程运行sphinx状态检测
vim /etc/xinetd.d/sphinxchk
service sphinxcheck
{
        disable         = no
        flags           = REUSE
        socket_type     = stream
        port            = 9322
        wait            = no
        user            = root
        server          = /usr/local/coreseek4.1/shell/sphinxchk_status.sh log_on_failure  += USERID
        only_from       = 192.168.1.126
}

新建状态监控shell

vim /usr/local/coreseek4.1/shell/sphinxchk_status.sh
#!/bin/bash
ERROR_MSG=`ps -aef|grep coreseek4.1|grep -v grep |wc -l`

if [ "$ERROR_MSG" != "0" ]
then
        # sphinx is fine, return http 200
        /bin/echo -e "HTTP/1.1 200 OK\r\n"
        /bin/echo -e "Content-Type: Content-Type: text/plain\r\n"
        /bin/echo -e "\r\n"
        /bin/echo -e "sphinx is running.\r\n"
        /bin/echo -e "\r\n"
else
        # sphinx is fine, return http 503
        /bin/echo -e "HTTP/1.1 503 Service Unavailable\r\n"
        /bin/echo -e "Content-Type: Content-Type: text/plain\r\n"
        /bin/echo -e "\r\n"
        /bin/echo -e "sphinx is *down*.\r\n"
        /bin/echo -e "\r\n"
fi

重启下xinetd服务

service xinetd restart

2:安装并配置haproxy

安装

yum -y install haproxy

配置文件/etc/haproxy/haproxy.cfg

#全局配置参数,进程级的,用来控制Haproxy启动前的一些进程及系统设置
global
    log         127.0.0.1 local2 #local2在/etc/rsyslog.conf中有一行local2.*    /var/log/haproxy.log
    chroot      /var/lib/haproxy
    pidfile     /var/run/haproxy.pid
    user        haproxy
    group       haproxy
    daemon
    maxconn     4096
    stats socket /var/lib/haproxy/stats

#配置一些默认的参数,可以被frontend,backend,listen段继承使用
defaults
    mode                    http
    log                     global
    option                  dontlognull
    option                  log-health-checks //记录健康检测日志
    option                  redispatch //在连接失败或断开情况下,允许当前会话被重新分发
    retries                 3
    timeout http-request    10s
    timeout queue           1m
    timeout connect         10s
    timeout client          1m
    timeout server          1m
    timeout http-keep-alive 10s
    timeout check           10s
    maxconn                 100000

#统计信息
listen  haproxy-status 0.0.0.0:8888
    mode          http
    option        httplog
    stats uri     /status  //统计页面url
    stats realm   Global\ statistics
    stats auth    test:123456 //登录用户和密码


#监听
listen  proxy-sphinx 192.168.1.123:9312
        mode tcp
        balance leastconn
        option tcpka
        option httpchk OPTIONS * HTTP/1.1\r\nHost:\ www
        server sphinx1  192.168.1.124:9312 weight 1 check port 9322 inter 1s rise 2 fall 2
        server sphinx2  192.168.1.125:9312 weight 1 check port 9322 inter 1s rise 2 fall 2

启动haproxy服务

service haproxy restart

访问http://192.168.1.123:8888/status 如看到如下类似界面,则表示成功了!

发表在 好文推荐, 网站开发, 网站架构 | 标签为 , , , | 20 条评论

搭建高可用搜索服务(1)–安装coreseek服务

1:首先安装coreseek

wget http://www.coreseek.cn/uploads/csft/4.0/coreseek-4.1-beta.tar.gz
tar -zxvf coreseek-4.1-beta.tar.gz
yum -y install gcc gcc-c++ libtool autoconf automake imake mysql-devel libxml2-devel expat-devel
cd coreseek-4.1-beta
cd mmseg-3.2.14
./bootstrap
./configure --prefix=/usr/local/mmseg3.2.14
make
make install
cd ..

cd csft-4.1
sh buildconf.sh
#如果sh buildconf.sh最后没有生成configure脚本,且提示automake: warnings are treated as errors,可以将configure.ac中的这行
AM_INIT_AUTOMAKE([-Wall -Werror foreign])
修改为
AM_INIT_AUTOMAKE([-Wall foreign])

./configure --prefix=/usr/local/coreseek4.1 --without-unixodbc --with-mmseg --with-mmseg-includes=/usr/local/mmseg3.2.14/include/mmseg/ --with-mmseg-libs=/usr/local/mmseg3.2.14/lib/ --with-mysql
make
make install

常见错误文章见:https://www.mawenbao.com/note/sphinx-coreseek-summary.html

2:索引配置文件大致如下:

source test
{
    type                    = mysql
    sql_host                = 127.0.0.1
    sql_user                = test
    sql_pass                = 123456
    sql_db                  = test
    sql_port                = 3306
    sql_query_pre           = SET NAMES utf8
    sql_query_pre           = REPLACE INTO sphinx_counter SELECT 1, MAX(id) FROM news
    sql_query_range         = SELECT MIN(id),MAX(id) FROM news
    sql_range_step          = 10000
    sql_ranged_throttle     = 1000
    sql_query               = SELECT id,author_id,title,content FROM news where  id>=$start AND id<=$end
    sql_attr_uint           = author_id
    sql_query_info_pre      = SET NAMES utf8
    sql_query_info          = SELECT * FROM news  WHERE id=$id
}

source extend : test
{
    sql_query_pre = SET NAMES utf8
    sql_query_range =
    sql_range_step = 0
    sql_query = SELECT id,author_id,title,content FROM news WHERE id > ( SELECT max_doc_id FROM sphinx_counter WHERE counter_id=2 )
}



index test
{
    source              = test
    path                = /usr/local/coreseek4.1/data/test
    docinfo             = extern
    mlock               = 0
    morphology          = none
    min_word_len        = 1
    html_strip          = 0

    charset_dictpath = /usr/local/mmseg3.2.14/etc/
    charset_type        = zh_cn.utf-8
}

index extend : test
{
    source              = extend
    path                = /usr/local/coreseek4.1/data/extend
    docinfo             = extern
    mlock               = 0
    morphology          = none
    min_word_len        = 1
    html_strip          = 0

    charset_dictpath = /usr/local/mmseg3.2.14/etc/
    charset_type        = zh_cn.utf-8
}

indexer
{
    mem_limit            = 2000M
}

searchd
{
    listen                = 127.0.0.1:9312
    listen                = 127.0.0.1:9313:mysql41
    read_timeout          = 10
    max_children          = 0
    max_matches           = 5000
    seamless_rotate       = 0
    preopen_indexes       = 0
    unlink_old            = 1
    pid_file              = /usr/local/coreseek4.1/log/searchd_mysql.pid
    log                   = /usr/local/coreseek4.1/log/searchd_mysql.log
    query_log             = /usr/local/coreseek4.1/log/query_mysql.log   
}

3:建立索引和启动服务

/usr/local/coreseek4.1/bin/indexer --all -c /usr/local/coreseek4.1/test.conf
/usr/local/coreseek4.1/bin/searchd -c /usr/local/coreseek4.1/test.conf

4:设置任务来定时重建主索引和增量索引

#test.sh主索引shell(* */3 * * * /usr/local/coreseek4.1/test.sh)

#!/bin/bash
date=`date "+%Y-%m-%d %H:%M:%S"`
echo -e "$date--start\n"
/usr/local/coreseek4.1/bin/indexer test --rotate -c /usr/local/coreseek4.1/test.conf
date=`date "+%Y-%m-%d %H:%M:%S"`
echo -e "$date--end\n"

#extend.sh增量索引shell(*/10 * * * * /usr/local/coreseek4.1/extend.sh)

#!/bin/bash
date=`date "+%Y-%m-%d %H:%M:%S"`
echo -e "$date--start\n"
/usr/local/coreseek4.1/bin/indexer extend  --rotate -c /usr/local/coreseek4.1/test.conf
date=`date "+%Y-%m-%d %H:%M:%S"`
echo -e "$date--end\n"

这样我们就完成了一个基本的搜索服务,同时为了避免程序意外退出,你也可以写个检查任务来自动启动coreseek服务.

发表在 好文推荐, 网站架构 | 标签为 , | 13 条评论