1、安装think-queue插件

先用宝塔安装redis服务、php的redis扩展,再安装think-queue插件

composer require topthink/think-queue

2、配置redis连接参数 application/extra/queue.php

<?php
/**消息队列配置*/
use think\Env;
return [
    //Redis驱动
    'connector' => 'redis',
    "expire"    =>null,    //任务过期时间默认为60秒,禁用可设为null
    "default"   =>"lucas",    //默认队列名称
    "host"      =>Env::get("redis.host", "127.0.0.1"),    //Redis主机IP地址
    "port"      =>Env::get("redis.port", 6379),    //Redis端口
    "password"  =>Env::get("redis.password", ""),    //Redis密码
    "select"    =>5,    //Redis数据库索引,默认db0
    "timeout"   =>0,    //Redis连接超时时间
    "persistent"=>false,    //是否长连接
];

3、创建addons\faqueue\library\jobs\TestJob.php文件

<?php

namespace addons\faqueue\library\jobs;

use think\Log;
use think\queue\job;
use addons\faqueue\model\FaqueueLog;

class TestJob
{
    public function fire(Job $job, $data){

        // php think queue:work --queue lucas --tries 2 --delay 3 --sleep 4
        // 或者指定php版本执行 /www/server/php/73/bin/php think queue:work --queue lucas --tries 2 --delay 3 --sleep 4
        // throw new \Exception("测试抛出异常");
        try {
            $result = (new FaqueueLog())->order('id desc')->find();
            // $k = $reulst['ceshia'];
        } catch (\Exception $e) {
            // 回滚事务
            // Db::rollback();
            // throw new \Exception($e->getMessage());
            $result = false;
        }

        if($result){
            $data['result_id'] = $result['id'];
            Log::write("测试队列TestJob执行成功:",'info');
            $job->delete();
            (new FaqueueLog())->log($job->getQueue(),$job->getName(),$data);
        }else{
            $job->delete();
            // $job->release();
            (new FaqueueLog())->log($job->getQueue(),$job->getName(),['msg'=>'执行失败']);
            Log::write("测试队列TestJob执行失败:",'error');
        }

    }

    public function failed($data){
        Log::write("测试队列TestJob执行失败:".print_r(['data' => $data,],true),'error');
    }
}

4、开启监听服务,在网站目录命令行执行

php think queue:listen --queue lucas

命令参数详情如下:

Work 模式

php think queue:work \
-- daemon \            // 是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
--queue helloLucas \   // 要处理的队列的名称
--delay 0 \            // 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认0
--force \               // 系统处于维护状态时,是否仍然处理任务,并未找到相关说明
--memory 128 \           // 该进程允许使用的内存上线,以 M 为单位
--sleep 3 \               // 如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
--tries 2               // 如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0


Listen 模式

php think queue:listen \
--queue helloLucas \  // 监听的队列的名称
--delay 0 \           // 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认0
--memory 128 \          // 该进程允许使用的内存上线,以 M 为单位
--sleep 3 \              // 如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
--tries 0 \              // 如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
--timeout 60          // 创建的work子进程的允许执行的最长时间,以秒为单位

5、创建addons\faqueue\controller\Index.php测试访问

<?php

namespace addons\faqueue\controller;

use think\addons\Controller;
use addons\faqueue\library\QueueApi;

class Index extends Controller
{

    public function index()
    {
        $data = ['create_time' => date('Y-m-d H:i:s')];
        $res = QueueApi::sendTest($data);
        halt($res);
    }

}

注意查看否有监听到redis信息,或者查看runtime\log下的cli日志。

6、使用supervisord进程守护管理器

可以通过命令sudo ps -aux|grep supervisor * 查看supervisor相关运行的端口号

常见问题:

1、think-queue + supervisor 报错 General error: 2006 MySQL server has gone away

解决方案:原因是长连接很久没有新的请求发起,达到了 server 端的 timeout,被 server 强行关闭。之后再通过 connection 发起请求的时候,就会报错 server has gone away。所以,可以在database.php中增加如下配置。

// 是否需要断线重连
'break_reconnect' => true,
// 断线标识字符串
'break_match_str'  => ['2006'],

查看服务器中think-queue服务列表

# 查看
ps -aux | grep queue
# 不需要可以杀死进程PID,15为正常终止服务,9为强制杀死进程
kill -15 进程号
最后修改:2024 年 11 月 19 日
如果觉得我的文章对你有用,请随意赞赏