例如:邮件队列、任务队列、消息队列、Feed队列 ,手机短信,纪录日志,转换视频格式,数据挖掘采集 ,为了增强用户体验可以用队列或异步处理
PHP短耗时异步处理
- <?php
- echo '执行完毕提示';
- fastcgi_finish_request(); /* 响应完成, 关闭连接 */
- echo '不会输出';
- sleep(3); //sleep模拟一些耗时的操作
- /* 记录日志 */
- file_put_contents('log.txt', '生存还是毁灭,这是个问题.');
- ?>
这个部署有一个好处,就是可以不至于让一个页面的响应时间太久,直接返回给用户一个执行完毕的提示,但PHP后台实际还在继续执行,这样做可能会有用户体验上的舒服感。
PHP 长耗时用队列
http://blog.s135.com/httpsqs/
HTTPSQS(HTTP Simple Queue Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务,使用 Tokyo Cabinet 的 B+Tree Key/Value 来做数据的持久化存储。
非常简单,基于 HTTP GET/POST 协议。PHP、、Perl、Shell、、Ruby等支持HTTP协议的编程语言均可调用
https://.oschina.net/664712890/PHP-Redis-Queue/tree/master
内存队列
- <?php
- /**
- * 使用共享内存的PHP循环内存队列实现
- * 支持多进程, 支持各种数据类型的存储
- * 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区
- *
- * @author wangbinandi@gmail.com
- * @created 2009-12-23
- */
- class SHMQueue
- {
- private $maxQSize = 0; // 队列最大长度
- private $front = 0; // 队头指针
- private $rear = 0; // 队尾指针
- private $blockSize = 256; // 块的大小(byte)
- private $memSize = 25600; // 最大共享内存(byte)
- private $shmId = 0;
- private $filePtr = './shmq.ptr';
- private $semId = 0;
- public function __construct()
- {
- $shmkey = ftok(__FILE__, 't');
- $this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );
- $this->maxQSize = $this->memSize / $this->blockSize;
- // 申請一个信号量
- $this->semId = sem_get($shmkey, 1);
- sem_acquire($this->semId); // 申请进入临界区
- $this->init();
- }
- private function init()
- {
- if ( file_exists($this->filePtr) ){
- $contents = file_get_contents($this->filePtr);
- $data = explode( '|', $contents );
- if ( isset($data[0]) && isset($data[1])){
- $this->front = (int)$data[0];
- $this->rear = (int)$data[1];
- }
- }
- }
- public function getLength()
- {
- return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;
- }
- public function enQueue( $value )
- {
- if ( $this->ptrInc($this->rear) == $this->front ){ // 队满
- return false;
- }
- $data = $this->encode($value);
- shmop_write($this->shmId, $data, $this->rear );
- $this->rear = $this->ptrInc($this->rear);
- return true;
- }
- public function deQueue()
- {
- if ( $this->front == $this->rear ){ // 队空
- return false;
- }
- $value = shmop_read($this->shmId, $this->front, $this->blockSize-1);
- $this->front = $this->ptrInc($this->front);
- return $this->decode($value);
- }
- private function ptrInc( $ptr )
- {
- return ($ptr + $this->blockSize) % ($this->memSize);
- }
- private function encode( $value )
- {
- $data = serialize($value) . "__eof";
- if ( strlen($data) > $this->blockSize -1 ){
- throw new Exception(strlen($data)." is overload block size!");
- }
- return $data;
- }
- private function decode( $value )
- {
- $data = explode("__eof", $value);
- return unserialize($data[0]);
- }
- public function __destruct()
- {
- $data = $this->front . '|' . $this->rear;
- file_put_contents($this->filePtr, $data);
- sem_release($this->semId); // 出临界区, 释放信号量
- }
- }
- //使用的样例代码如下:
- // 进队操作
- $shmq = new SHMQueue();
- $data = 'test data';
- $shmq->enQueue($data);
- $data = 'test';
- $shmq->enQueue($data);
- unset($shmq);
- // 出队操作
- $shmq = new SHMQueue();
- $data = $shmq->deQueue();
- unset($shmq);
memcache队列 可以使用比较专业的工具,例如:Apache ActiveMQ、 memcacheq,下面是两个基本简单的实现方式:
- <?php
- /**
- * PHP memcache 队列类
- * @author LKK/lianq.net
- * @version 0.3
- * @修改说明:
- * 1.放弃了之前的AB面轮值思路,使用类似数组的构造,重写了此类.
- * 2.队列默认先进先出,但增加了反向读取功能.
- * @example:
- $obj = new memcacheQueue('duilie');
- for ($i = 1; $i <= 10; $i++) {
- $obj->add($i . 'asdf');
- }
- $count = $obj->getQueueLength();
- $list = $obj->read(10); //读数据10条,没有出栈
- $poplist = $obj->get(8); //出栈
- */
- class memcacheQueue
- {
- public static $client; //memcache客户端连接
- public $access; //队列是否可更新
- private $expire; //过期时间,秒,1~2592000,即30天内
- private $sleepTime; //等待解锁时间,微秒
- private $queueName; //队列名称,唯一值
- private $retryNum; //重试次数,= 10 * 理论并发数
- public $currentHead; //当前队首值
- public $currentTail; //当前队尾值
- const MAXNUM = 20000; //最大队列数,建议上限10K
- const HEAD_KEY = '_lkkQueueHead_'; //队列首key
- const TAIL_KEY = '_lkkQueueTail_'; //队列尾key
- const VALU_KEY = '_lkkQueueValu_'; //队列值key
- const LOCK_KEY = '_lkkQueueLock_'; //队列锁key
- /**
- * 构造函数
- * @param string $queueName 队列名称
- * @param int $expire 过期时间
- * @param array $config memcache配置
- *
- * @return <type>
- */
- public function __construct($queueName = '', $expire = 0, $config = '')
- {
- if (empty($config)) {
- self::$client = memcache_pconnect('127.0.0.1', 11211);
- } elseif (is_array($config)) { //array('host'=>'127.0.0.1','port'=>'11211')
- self::$client = memcache_pconnect($config['host'], $config['port']);
- } elseif (is_string($config)) { //"127.0.0.1:11211"
- $tmp = explode(':', $config);
- $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
- $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
- self::$client = memcache_pconnect($conf['host'], $conf['port']);
- }
- if (!self::$client) return false;
- ignore_user_abort(true); //当客户断开连接,允许继续执行
- set_time_limit(0); //取消脚本执行延时上限
- $this->access = false;
- $this->sleepTime = 1000;
- $expire = empty($expire) ? 3600 : intval($expire) + 1;
- $this->expire = $expire;
- $this->queueName = $queueName;
- $this->retryNum = 1000;
- $this->head_key = $this->queueName . self::HEAD_KEY;
- $this->tail_key = $this->queueName . self::TAIL_KEY;
- $this->lock_key = $this->queueName . self::LOCK_KEY;
- $this->_initSetHeadNTail();
- }
- /**
- * 初始化设置队列首尾值
- */
- private function _initSetHeadNTail()
- {
- //当前队列首的数值
- $this->currentHead = memcache_get(self::$client, $this->head_key);
- if ($this->currentHead === false) $this->currentHead = 0;
- //当前队列尾的数值
- $this->currentTail = memcache_get(self::$client, $this->tail_key);
- if ($this->currentTail === false) $this->currentTail = 0;
- }
- /**
- * 当取出元素时,改变队列首的数值
- * @param int $step 步长值
- */
- private function _changeHead($step = 1)
- {
- $this->currentHead += $step;
- memcache_set(self::$client, $this->head_key, $this->currentHead, false, $this->expire);
- }
- /**
- * 当添加元素时,改变队列尾的数值
- * @param int $step 步长值
- * @param bool $reverse 是否反向
- * @return null
- */
- private function _changeTail($step = 1, $reverse = false)
- {
- if (!$reverse) {
- $this->currentTail += $step;
- } else {
- $this->currentTail -= $step;
- }
- memcache_set(self::$client, $this->tail_key, $this->currentTail, false, $this->expire);
- }
- /**
- * 队列是否为空
- * @return bool
- */
- private function _isEmpty()
- {
- return (bool)($this->currentHead === $this->currentTail);
- }
- /**
- * 队列是否已满
- * @return bool
- */
- private function _isFull()
- {
- $len = $this->currentTail - $this->currentHead;
- return (bool)($len === self::MAXNUM);
- }
- /**
- * 队列加锁
- */
- private function _getLock()
- {
- if ($this->access === false) {
- while (!memcache_add(self::$client, $this->lock_key, 1, false, $this->expire)) {
- usleep($this->sleepTime);
- @$i++;
- if ($i > $this->retryNum) { //尝试等待N次
- return false;
- break;
- }
- }
- $this->_initSetHeadNTail();
- return $this->access = true;
- }
- return $this->access;
- }
- /**
- * 队列解锁
- */
- private function _unLock()
- {
- memcache_delete(self::$client, $this->lock_key, 0);
- $this->access = false;
- }
- /**
- * 获取当前队列的长度
- * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度<=该长度
- * @return int
- */
- public function getQueueLength()
- {
- $this->_initSetHeadNTail();
- return intval($this->currentTail - $this->currentHead);
- }
- /**
- * 添加队列数据
- * @param void $data 要添加的数据
- * @return bool
- */
- public function add($data)
- {
- if (!$this->_getLock()) return false;
- if ($this->_isFull()) {
- $this->_unLock();
- return false;
- }
- $value_key = $this->queueName . self::VALU_KEY . strval($this->currentTail + 1);
- $result = memcache_set(self::$client, $value_key, $data, MEMCACHE_COMPRESSED, $this->expire);
- if ($result) {
- $this->_changeTail();
- }
- $this->_unLock();
- return $result;
- }
- /**
- * 读取队列数据
- * @param int $length 要读取的长度(反向读取使用负数)
- * @return array
- */
- public function read($length = 0)
- {
- if (!is_numeric($length)) return false;
- $this->_initSetHeadNTail();
- if ($this->_isEmpty()) {
- return false;
- }
- if (empty($length)) $length = self::MAXNUM; //默认所有
- $keyArr = array();
- if ($length > 0) { //正向读取(从队列首向队列尾)
- $tmpMin = $this->currentHead;
- $tmpMax = $tmpMin + $length;
- for ($i = $tmpMin; $i <= $tmpMax; $i++) {
- $keyArr[] = $this->queueName . self::VALU_KEY . $i;
- }
- } else { //反向读取(从队列尾向队列首)
- $tmpMax = $this->currentTail;
- $tmpMin = $tmpMax + $length;
- for ($i = $tmpMax; $i > $tmpMin; $i--) {
- $keyArr[] = $this->queueName . self::VALU_KEY . $i;
- }
- }
- $result = @memcache_get(self::$client, $keyArr);
- return $result;
- }
- /**
- * 取出队列数据
- * @param int $length 要取出的长度(反向读取使用负数)
- * @return array
- */
- public function get($length = 0)
- {
- if (!is_numeric($length)) return false;
- if (!$this->_getLock()) return false;
- if ($this->_isEmpty()) {
- $this->_unLock();
- return false;
- }
- if (empty($length)) $length = self::MAXNUM; //默认所有
- $length = intval($length);
- $keyArr = array();
- if ($length > 0) { //正向读取(从队列首向队列尾)
- $tmpMin = $this->currentHead;
- $tmpMax = $tmpMin + $length;
- for ($i = $tmpMin; $i <= $tmpMax; $i++) {
- $keyArr[] = $this->queueName . self::VALU_KEY . $i;
- }
- $this->_changeHead($length);
- } else { //反向读取(从队列尾向队列首)
- $tmpMax = $this->currentTail;
- $tmpMin = $tmpMax + $length;
- for ($i = $tmpMax; $i > $tmpMin; $i--) {
- $keyArr[] = $this->queueName . self::VALU_KEY . $i;
- }
- $this->_changeTail(abs($length), true);
- }
- $result = @memcache_get(self::$client, $keyArr);
- foreach ($keyArr as $v) { //取出之后删除
- @memcache_delete(self::$client, $v, 0);
- }
- $this->_unLock();
- return $result;
- }
- /**
- * 清空队列
- */
- public function clear()
- {
- if (!$this->_getLock()) return false;
- if ($this->_isEmpty()) {
- $this->_unLock();
- return false;
- }
- $tmpMin = $this->currentHead--;
- $tmpMax = $this->currentTail++;
- for ($i = $tmpMin; $i <= $tmpMax; $i++) {
- $tmpKey = $this->queueName . self::VALU_KEY . $i;
- @memcache_delete(self::$client, $tmpKey, 0);
- }
- $this->currentTail = $this->currentHead = 0;
- memcache_set(self::$client, $this->head_key, $this->currentHead, false, $this->expire);
- memcache_set(self::$client, $this->tail_key, $this->currentTail, false, $this->expire);
- $this->_unLock();
- }
- /*
- * 清除所有memcache缓存数据
- */
- public function memFlush()
- {
- memcache_flush(self::$client);
- }
- }