php利用socket开发一个memcached的客户端api

由于个人电脑的原因,memcache.dll无法使用。这样我就无法操作memcached服务了。

思来想去,就自己写一个API吧。顺便也了解一下memcached协议。

贴代码

<?php

/**
 * Minimal memcached client that speaks the memcached text protocol over
 * ext/sockets, for environments where the memcache extension is unavailable.
 *
 * Connections are opened lazily and cached per "host:port", so a server is
 * connected once and reused by later commands. When servers were registered
 * with addServer(), every keyed command is routed to one of them based on the
 * key; otherwise the connection made by connect()/pconnect() is used.
 *
 * Arrays passed to add()/set()/replace() are stored in serialize() form.
 * get() returns that serialized string unchanged, so callers that stored an
 * array must unserialize() the result themselves.
 *
 * Failures to connect still raise the usual ext/sockets warnings; the public
 * methods additionally report failure through their return value.
 */
class my_memcache{

	/** Flags value stored with payloads that this client zlib-compressed. */
	const FLAG_COMPRESSED = 1;

	/** Number of bytes requested from socket_read() per call. */
	const READ_CHUNK = 8192;

	/** Socket used by the next command, or false when nothing is connected. */
	protected $socket = false;
	/** Servers registered through addServer(), in registration order. */
	protected $server_list = array();
	/** Host of the last connect()/pconnect() call. */
	protected $host = false;
	/** Port of the last connect()/pconnect() call. */
	protected $port = false;

	/** Payloads longer than this many bytes are candidates for compression (0 = off). */
	protected $threshold = 0;
	/** Fraction (0..1) a payload must shrink by before the compressed form is stored. */
	protected $min_saving = 0;

	/** Open sockets indexed by "host:port". */
	protected $connections = array();
	/** Bytes received for the current reply that have not been consumed yet. */
	protected $read_buffer = '';


	/**
	 * Sockets are created on demand, so construction performs no I/O.
	 */
	function __construct() {
	}


	/**
	 * Closes every connection that is still open.
	 */
	function __destruct() {
		$this->close();
	}


	/**
	 * Stores a value only if the key does not exist yet.
	 *
	 * @param string $key    Item key (1-250 bytes, no whitespace or control characters).
	 * @param mixed  $var    Value; arrays are serialized, everything else is cast to string.
	 * @param int    $flag   Client flags stored with the item (replaced by FLAG_COMPRESSED when compressed).
	 * @param int    $expire Expiration time as understood by memcached.
	 * @return bool True when the server answered STORED.
	 */
	public function add($key,$var,$flag=0,$expire=0){
		return $this->store('add',$key,$var,$flag,$expire);
	}

	/**
	 * Registers a server for key-based routing.
	 *
	 * Only host, port, persistent and timeout influence this client; the
	 * remaining settings are stored for setServerParams() but not acted upon.
	 *
	 * @param string   $host             Server host.
	 * @param int      $port             Server port.
	 * @param bool     $persistent       Enable TCP keep-alive on the connection.
	 * @param int      $weight           Stored, not used for routing.
	 * @param int      $timeout          Send/receive timeout in seconds (0 = none).
	 * @param int      $retry_interval   Stored, not used.
	 * @param int      $status           Stored, not used.
	 * @param callable $failure_callback Stored, not used.
	 * @param int      $timeoutms        Accepted for signature compatibility, not used.
	 * @return bool False when host or port is empty, true otherwise.
	 */
	public function addServer($host,$port,$persistent=true,$weight=1,$timeout=1,$retry_interval=15,$status=1,$failure_callback = NULL ,$timeoutms = 0){
		if ( empty($host) || empty($port) ){
			return false;
		}
		$this->server_list[] = array(
			'host' => $host,
			'port' => $port,
			'persistent' => (bool)$persistent,
			'weight' => $weight,
			'timeout' => $timeout,
			'retry_interval' => $retry_interval,
			'status' => $status,
			'failure_callback' => $failure_callback,
		);
		return true;
	}

	/**
	 * Closes all open connections. Safe to call repeatedly; a later
	 * connect() or keyed command opens fresh sockets.
	 *
	 * @return bool Always true.
	 */
	public function close(){
		foreach ( $this->connections as $sock ){
			socket_close($sock);
		}
		$this->connections = array();
		$this->socket = false;
		$this->read_buffer = '';
		return true;
	}

	/**
	 * Connects to a single server and makes it the current connection.
	 *
	 * @param string $host    Server host.
	 * @param int    $port    Server port.
	 * @param int    $timeout Send/receive timeout in seconds (0 = none).
	 * @return bool True when the connection is usable.
	 */
	public function connect($host,$port=11211,$timeout=1){
		return $this->connectDirect($host,$port,false,$timeout);
	}


	/**
	 * Routes $key to one of the registered servers and makes that server's
	 * connection the current one.
	 *
	 * @param string $key Item key.
	 * @return bool False when the key is empty, no server is registered, or
	 *              the chosen server cannot be reached.
	 */
	protected function selectconnect($key){
		if ( !is_scalar($key) || (string)$key === '' || empty($this->server_list) ){
			return false;
		}
		$server = $this->server_list[$this->serverIndexForKey((string)$key)];
		$sock = $this->openConnection($server['host'],$server['port'],$server['persistent'],$server['timeout']);
		if ( $sock === false ){
			return false;
		}
		$this->socket = $sock;
		return true;
	}

	/**
	 * Maps a key to an index into $server_list using the key's first byte.
	 *
	 * The modulus is the server count itself, so a single server maps to
	 * index 0 and every registered server is reachable. Keys that share a
	 * first byte always land on the same server.
	 *
	 * @param string $key Non-empty key.
	 * @return int Index into $server_list.
	 */
	protected function serverIndexForKey($key){
		return ord(substr($key,0,1)) % count($this->server_list);
	}


	/**
	 * Decrements a numeric item.
	 *
	 * @param string $key   Item key.
	 * @param int    $value Amount to subtract.
	 * @return string|false New value as a decimal string, or false when the
	 *                      key is missing, not numeric, or the request failed.
	 */
	public function decrement($key,$value=1){
		return $this->arithmetic('decr',$key,$value);
	}

	/**
	 * Deletes an item.
	 *
	 * @param string $key     Item key.
	 * @param int    $timeout Accepted for signature compatibility, not sent to the server.
	 * @return bool True only when the server answered DELETED.
	 */
	public function delete($key,$timeout =0){
		if ( !$this->ready($key) ){
			return false;
		}
		return $this->request($this->socket,"delete $key\r\n") === 'DELETED';
	}


	/**
	 * Fetches an item.
	 *
	 * The payload is read by the byte count announced in the VALUE header,
	 * so values containing CR/LF or binary data are returned intact.
	 *
	 * @param string $key  Item key.
	 * @param int    $flag Accepted for signature compatibility; not sent,
	 *                     because the text protocol would treat it as a second key.
	 * @return string|false Stored payload (decompressed when it was stored
	 *                      with FLAG_COMPRESSED), or false on a miss or error.
	 */
	public function get($key,$flag=0){
		if ( !$this->ready($key) ){
			return false;
		}
		$sock = $this->socket;
		if ( !$this->send($sock,"get $key\r\n") ){
			return false;
		}
		$header = $this->readLine($sock);
		if ( $header === false ){
			$this->dropConnection($sock);
			return false;
		}
		if ( $header === 'END' ){
			return false;
		}
		// Expected: VALUE <key> <flags> <bytes> [<cas>]
		$parts = explode(" ",$header);
		if ( count($parts) < 4 || $parts[0] !== 'VALUE' ){
			// Unknown reply: the stream position is uncertain, start over next time.
			$this->dropConnection($sock);
			return false;
		}
		$flags = intval($parts[2]);
		$bytes = intval($parts[3]);
		// The data block is followed by CRLF and then a closing END line.
		$data = $this->readBytes($sock,$bytes + 2);
		$tail = ( $data === false ) ? false : $this->readLine($sock);
		if ( $data === false || $tail !== 'END' ){
			$this->dropConnection($sock);
			return false;
		}
		$data = (string)substr($data,0,$bytes);
		if ( $flags === self::FLAG_COMPRESSED ){
			return gzuncompress($data);
		}
		return $data;
	}

	/**
	 * Checks whether a TCP connection to the server can be established.
	 *
	 * @param string $host Server host.
	 * @param int    $port Server port.
	 * @return int|false 1 when reachable, 0 when not, false when $host is empty.
	 */
	public function getServerStatus($host,$port=11211){
		if ( empty($host) ){
			return false;
		}
		$probe = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
		if ( $probe === false ){
			return 0;
		}
		$reachable = socket_connect($probe,$host,$port);
		// Close the probe on both outcomes so no socket is leaked.
		socket_close($probe);
		return $reachable ? 1 : 0;
	}


	/**
	 * Increments a numeric item.
	 *
	 * @param string $key   Item key.
	 * @param int    $value Amount to add.
	 * @return string|false New value as a decimal string, or false when the
	 *                      key is missing, not numeric, or the request failed.
	 */
	public function increment($key,$value=1){
		return $this->arithmetic('incr',$key,$value);
	}

	/**
	 * Like connect(), but enables TCP keep-alive on the connection.
	 *
	 * @param string $host    Server host.
	 * @param int    $port    Server port.
	 * @param int    $timeout Send/receive timeout in seconds (0 = none).
	 * @return bool True when the connection is usable.
	 */
	public function pconnect($host,$port=11211,$timeout=1){
		return $this->connectDirect($host,$port,true,$timeout);
	}

	/**
	 * Stores a value only if the key already exists.
	 *
	 * @param string $key    Item key.
	 * @param mixed  $var    Value; arrays are serialized, everything else is cast to string.
	 * @param int    $flag   Client flags stored with the item.
	 * @param int    $expire Expiration time as understood by memcached.
	 * @return bool True when the server answered STORED.
	 */
	public function replace($key,$var,$flag=0,$expire=0){
		return $this->store('replace',$key,$var,$flag,$expire);
	}


	/**
	 * Stores a value unconditionally.
	 *
	 * @param string $key    Item key.
	 * @param mixed  $var    Value; arrays are serialized, everything else is cast to string.
	 * @param int    $flag   Client flags stored with the item.
	 * @param int    $expire Expiration time as understood by memcached.
	 * @return bool True when the server answered STORED.
	 */
	public function set($key,$var,$flag=0,$expire=0){
		return $this->store('set',$key,$var,$flag,$expire);
	}


	/**
	 * Asks every target server for its version.
	 *
	 * @return array "host:port" => version string, or false for a server
	 *               that did not answer.
	 */
	public function getVersion(){
		$result = array();
		foreach ( $this->targets() as $id => $sock ){
			$reply = $this->request($sock,"version\r\n");
			$result[$id] = ( $reply === false ) ? false : str_replace("VERSION ","",$reply);
		}
		return $result;
	}


	/**
	 * Invalidates all items on every target server.
	 *
	 * @return array "host:port" => true when that server answered OK, false otherwise.
	 */
	public function flush(){
		$result = array();
		foreach ( $this->targets() as $id => $sock ){
			$result[$id] = ( $this->request($sock,"flush_all\r\n") === "OK" );
		}
		return $result;
	}

	/**
	 * Alias of getExtendedStats().
	 *
	 * @param string|int $type   Stats sub-command, see getExtendedStats().
	 * @param int        $slabid Slab id for "cachedump".
	 * @param int        $limit  Entry limit for "cachedump".
	 * @return array|false Same value as getExtendedStats().
	 */
	public function getStats($type=0,$slabid=0,$limit=0){
		return $this->getExtendedStats($type,$slabid,$limit);
	}


	/**
	 * Collects statistics from every target server.
	 *
	 * Supported $type values (anything else requests the general stats):
	 *   reset                        clears the counters; the entry becomes true
	 *   slabs                        per-slab information
	 *   cachedump (slabid, limit)    first $limit keys of slab $slabid
	 *   items, sizes, malloc, maps   passed to the server as "stats <type>"
	 *
	 * @param string|int $type   Stats sub-command, or 0 for the general stats.
	 * @param int        $slabid Slab id for "cachedump".
	 * @param int        $limit  Entry limit for "cachedump".
	 * @return array|false With registered servers: "host:port" => stats array
	 *                     (servers that failed are omitted). With a direct
	 *                     connection: a one-entry array, or false on failure.
	 */
	public function getExtendedStats($type=0,$slabid=0,$limit=0){
		$type_list = array('reset','malloc','maps','cachedump','slabs','items','sizes');
		if ( !is_string($type) || !in_array($type,$type_list,true) ){
			$type = '';
		}
		if ( $type === 'cachedump' ){
			$command = "stats cachedump ".intval($slabid)." ".intval($limit)."\r\n";
		}else if ( $type !== '' ){
			$command = "stats $type\r\n";
		}else{
			$command = "stats\r\n";
		}

		if ( !empty($this->server_list) ){
			$result = array();
			foreach ( $this->targets() as $id => $sock ){
				$stats = $this->fetchStats($sock,$command);
				if ( $stats !== false ){
					$result[$id] = $stats;
				}
			}
			return $result;
		}

		$stats = $this->fetchStats($this->socket,$command);
		if ( $stats === false ){
			return false;
		}
		return array($this->host.":".$this->port => $stats);
	}

	/**
	 * Enables automatic compression of large values.
	 *
	 * @param int   $threshold  Payload length in bytes above which compression is attempted.
	 * @param float $min_saving Required saving as a fraction, exclusive range (0, 1).
	 * @return bool False when an argument is out of range (settings unchanged).
	 */
	public function setCompressThreshold($threshold,$min_saving=0.2){
		$threshold = intval($threshold);
		$min_saving = floatval($min_saving);
		if ( $threshold <= 0 || $min_saving <= 0 || $min_saving >= 1 ){
			return false;
		}
		$this->threshold = $threshold;
		$this->min_saving = $min_saving;
		return true;
	}

	/**
	 * Compresses (type 1) or decompresses (any other type) a string with zlib.
	 *
	 * @param string $string Input data.
	 * @param int    $type   1 to compress, anything else to decompress.
	 * @return string|false Result of gzcompress()/gzuncompress().
	 */
	protected function gz_ungz_compress($string,$type=1){
		if ( $type == 1 ){
			return gzcompress($string);
		}
		return gzuncompress($string);
	}


	/**
	 * Updates the stored settings of a registered server.
	 *
	 * A connection that is already open keeps the timeout it was opened with.
	 *
	 * @param string   $host             Host of the server to update.
	 * @param int      $port             Port of the server to update.
	 * @param int      $timeout          New timeout in seconds.
	 * @param int      $retry_interval   New retry interval.
	 * @param int      $status           New status.
	 * @param callable $failure_callback New failure callback.
	 * @return bool True when a matching server was found and updated.
	 */
	public function setServerParams($host,$port,$timeout=1,$retry_interval=15,$status=1,$failure_callback = NULL){
		if ( empty($host) || empty($port) ){
			return false;
		}
		foreach ($this->server_list as $index => $row ) {
			if ( $row['host'] == $host && $row['port'] == $port ){
				$this->server_list[$index]['timeout'] = $timeout;
				$this->server_list[$index]['retry_interval'] = $retry_interval;
				$this->server_list[$index]['status'] = $status;
				$this->server_list[$index]['failure_callback'] = $failure_callback;
				return true;
			}
		}
		return false;
	}

	/**
	 * Shared implementation of connect() and pconnect().
	 */
	protected function connectDirect($host,$port,$persistent,$timeout){
		$this->host = $host;
		$this->port = $port;
		$sock = $this->openConnection($host,$port,$persistent,$timeout);
		if ( $sock === false ){
			return false;
		}
		$this->socket = $sock;
		return true;
	}

	/**
	 * Returns the cached socket for host:port, opening it first if needed.
	 *
	 * @return mixed Connected socket, or false on failure.
	 */
	protected function openConnection($host,$port,$persistent=false,$timeout=1){
		$id = $host.":".$port;
		if ( isset($this->connections[$id]) ){
			return $this->connections[$id];
		}
		$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
		if ( $sock === false ){
			return false;
		}
		if ( $persistent && !socket_set_option($sock, SOL_SOCKET, SO_KEEPALIVE, 1) ){
			socket_close($sock);
			return false;
		}
		$seconds = intval($timeout);
		if ( $seconds > 0 ){
			// Best effort: without these a silent server would block reads forever.
			$limit = array('sec' => $seconds, 'usec' => 0);
			socket_set_option($sock, SOL_SOCKET, SO_RCVTIMEO, $limit);
			socket_set_option($sock, SOL_SOCKET, SO_SNDTIMEO, $limit);
		}
		if ( !socket_connect($sock,$host,$port) ){
			socket_close($sock);
			return false;
		}
		$this->connections[$id] = $sock;
		return $sock;
	}

	/**
	 * Closes a socket and forgets it, so the next command reconnects.
	 */
	protected function dropConnection($sock){
		if ( $sock === false || $sock === null ){
			return;
		}
		$id = array_search($sock,$this->connections,true);
		if ( $id !== false ){
			unset($this->connections[$id]);
		}
		if ( $this->socket === $sock ){
			$this->socket = false;
		}
		socket_close($sock);
	}

	/**
	 * Lists the sockets that server-wide commands should address.
	 *
	 * @return array "host:port" => socket, or false for an unreachable server.
	 */
	protected function targets(){
		if ( empty($this->server_list) ){
			return array($this->host.":".$this->port => $this->socket);
		}
		$targets = array();
		foreach ( $this->server_list as $server ){
			$id = $server['host'].":".$server['port'];
			$targets[$id] = $this->openConnection($server['host'],$server['port'],$server['persistent'],$server['timeout']);
		}
		return $targets;
	}

	/**
	 * Tells whether $key can be placed on a protocol line as-is.
	 *
	 * Whitespace and control characters are rejected because they would
	 * split the command line or start a new command.
	 */
	protected function isValidKey($key){
		return is_scalar($key) && preg_match('/\A[^\x00-\x20\x7f]{1,250}\z/',(string)$key) === 1;
	}

	/**
	 * Validates the key and makes sure $this->socket is the right connection.
	 *
	 * @return bool True when a command for $key may be sent on $this->socket.
	 */
	protected function ready($key){
		if ( !$this->isValidKey($key) ){
			return false;
		}
		if ( !empty($this->server_list) ){
			return $this->selectconnect($key);
		}
		return $this->socket !== false;
	}

	/**
	 * Shared implementation of add(), set() and replace().
	 */
	protected function store($command,$key,$var,$flag,$expire){
		if ( !$this->ready($key) ){
			return false;
		}
		if ( is_array($var) ){
			$var = serialize($var);
		}
		$var = (string)$var;
		$flag = intval($flag);
		$expire = intval($expire);
		$len = strlen($var);
		if ( $this->threshold > 0 && $len > $this->threshold ){
			$packed = $this->gz_ungz_compress($var,1);
			// Keep the compressed form only when it saves at least min_saving.
			if ( $packed !== false && strlen($packed) <= $len * (1 - $this->min_saving) ){
				$var = $packed;
				$len = strlen($var);
				$flag = self::FLAG_COMPRESSED;
			}
		}
		$reply = $this->request($this->socket,"$command $key $flag $expire $len\r\n".$var."\r\n");
		return $reply !== false && strtoupper($reply) == "STORED";
	}

	/**
	 * Shared implementation of increment() and decrement().
	 */
	protected function arithmetic($command,$key,$value){
		if ( !$this->ready($key) ){
			return false;
		}
		$val = intval($value);
		$reply = $this->request($this->socket,"$command $key $val\r\n");
		// Anything but a plain number (NOT_FOUND, CLIENT_ERROR ...) is a failure.
		if ( $reply === false || preg_match('/\A\d+\z/',$reply) !== 1 ){
			return false;
		}
		return $reply;
	}

	/**
	 * Sends a stats command and parses the reply lines into name => value.
	 *
	 * @return array|bool Parsed lines, true for a RESET reply, false on error.
	 */
	protected function fetchStats($sock,$command){
		if ( !$this->send($sock,$command) ){
			return false;
		}
		$stats = array();
		while ( true ){
			$line = $this->readLine($sock);
			if ( $line === false ){
				$this->dropConnection($sock);
				return false;
			}
			if ( $line === 'END' ){
				return $stats;
			}
			if ( $line === 'RESET' ){
				return true;
			}
			if ( $line === 'ERROR' || strpos($line,'CLIENT_ERROR') === 0 || strpos($line,'SERVER_ERROR') === 0 ){
				return false;
			}
			// "STAT name value" / "ITEM key [details]": keep the rest of the line as value.
			$temp = explode(" ",$line,3);
			if ( count($temp) > 2 ){
				$stats[$temp[1]] = $temp[2];
			}
		}
	}

	/**
	 * Sends a command and returns the first reply line, trimmed.
	 *
	 * @return string|false Reply line, or false when sending or reading failed.
	 */
	protected function request($sock,$payload){
		if ( !$this->send($sock,$payload) ){
			return false;
		}
		$line = $this->readLine($sock);
		if ( $line === false ){
			$this->dropConnection($sock);
			return false;
		}
		return trim($line);
	}

	/**
	 * Writes the whole payload, looping over partial writes.
	 *
	 * @return bool False when there is no socket or a write failed.
	 */
	protected function send($sock,$payload){
		$this->read_buffer = '';
		if ( $sock === false || $sock === null ){
			return false;
		}
		$total = strlen($payload);
		$sent = 0;
		while ( $sent < $total ){
			$written = socket_write($sock,substr($payload,$sent));
			if ( $written === false ){
				$this->dropConnection($sock);
				return false;
			}
			$sent += $written;
		}
		return true;
	}

	/**
	 * Pulls more bytes from the socket into the read buffer.
	 *
	 * @return bool False on error, timeout or when the peer closed the connection.
	 */
	protected function fillBuffer($sock){
		$chunk = socket_read($sock,self::READ_CHUNK);
		if ( $chunk === false || $chunk === '' ){
			return false;
		}
		$this->read_buffer .= $chunk;
		return true;
	}

	/**
	 * Reads one CRLF-terminated line of the current reply.
	 *
	 * @return string|false Line without its CRLF, or false when reading failed.
	 */
	protected function readLine($sock){
		while ( ($end = strpos($this->read_buffer,"\r\n")) === false ){
			if ( !$this->fillBuffer($sock) ){
				return false;
			}
		}
		$line = (string)substr($this->read_buffer,0,$end);
		$this->read_buffer = (string)substr($this->read_buffer,$end + 2);
		return $line;
	}

	/**
	 * Reads exactly $length bytes of the current reply.
	 *
	 * @return string|false The bytes, or false when reading failed.
	 */
	protected function readBytes($sock,$length){
		while ( strlen($this->read_buffer) < $length ){
			if ( !$this->fillBuffer($sock) ){
				return false;
			}
		}
		$data = (string)substr($this->read_buffer,0,$length);
		$this->read_buffer = (string)substr($this->read_buffer,$length);
		return $data;
	}

}

用法跟使用 PHP 的 memcache 扩展是一样的。这次更深入地了解了它的内部机制。

extra clothes for the 3 year old and all the baby gear
porno Where Does Your Power Come From

sometimes pumps if I need to be super dressy
snooki weight lossMETTLER TOLEDO Offers Free System Handbook To Integrate Weigh Modules
how to lose weight fast
此条目发表在 好文推荐, 网站开发, 网站架构 分类目录,贴了 , 标签。将固定链接加入收藏夹。

评论功能已关闭。