thinkphp-queue队列随笔

发布于:2025-06-09 ⋅ 阅读:(15) ⋅ 点赞:(0)

 安装

# 创建项目
composer create-project topthink/think 5.0.*

# 安装队列扩展
composer require topthink/think-queue

配置

// application/extra/queue.php

<?php
return [
    'connector'  => 'Redis',      // Redis 驱动
    'expire'     => 0,            // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',    // 默认的队列名称
    'host'       => '127.0.0.1',  // redis 主机ip
    'port'       => 6379,         // redis 端口
    'password'   => '',           // redis 密码
    'select'     => 0,            // 使用哪一个 db,默认为 db0
    'timeout'    => 0,            // redis连接的超时时间
    'persistent' => false,
];

数据库

CREATE TABLE `qf_test` (
`id` int(10) NOT NULL AUTO_INCREMENT,
`task_type` varchar(50) DEFAULT '' COMMENT '任务类型',
`data` text COMMENT '数据',
`pdate` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

创建队列(入队)-生产者

<?php

namespace app\api\controller;

use think\Controller;

class Index extends Controller {

      // 生产者-入队
      public function qf() {
          // 1.当前任务将由哪个类来负责处理。
          $jobHandlerClassName = 'app\api\job\QfDev';
    
          // 2.队列名称,如果为新队列,会自动创建
          $jobQueueName = "qfDevQueue";
    
          // 3.当前任务所需的业务数据.
          $jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'data' => $_GET ];
    
          // 4.将该任务推送到消息队列,等待对应的消费者去执行
          $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );        
          // 把任务分配到队列中,延迟10s后执行
          // $isPushed = Queue::later(10,$jobHandlerClassName,$jobData,$jobQueueName); 
    
          // database驱动时,返回值:1|false;
          // redis驱动时,返回值:随机字符串|false
          if( $isPushed !== false ){
              echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
          }else{
              echo 'something went wrong.';
          }
      }
      
}

消费队列(出队)-消费者

<?php

namespace app\api\job;

use think\queue\Job;

class QfDev {
    public function fire(Job $job,$data) {
        // 检查数据【可省】
        $flag = $this->checkJob($data);
        if(!$flag){
            $job->delete();
            return;
        }

        $isJobDone = $this->doJob($data);
        if ($isJobDone) {
            // ...
            // 执行完,删除任务
            $job->delete();
        }else{
            // 检查方法执行次数
            if ($job->attempts() > 3) {               
                $job->delete();
                // 重新发布,延期2秒再次执行
                //$job->release(2);
            }
        }
    }

    // 检查数据
    private function checkJob($data){
        // ... 数据检查
        return true;
    }

    // 业务处理
    private function doJob($data)
    {
        // ... 业务处理
        return true;
    }
}

访问

// 请求接口
http://localhost/api/index/qf

队列命令

# 单次执行
开始一个队列
php think queue:work
停止所有队列
php think queue:restart
重启所有消息队列
php think queue:restart
php think queue:work

# 多次执行
php think queue:work --daemon --queue helloJobQueue

宝塔任务进程管理器