think-queue for ThinkPHP6 使用方法教程
1、安装方法
composer require topthink/think-queue
2、配置
配置文件位于 config/queue.php
公共配置
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
use think\facade\Env;return ['default' => 'redis','prefix' => 'xx_','connections' => ['sync' => ['driver' => 'sync',],'database' => ['driver' => 'database','queue' => 'default','table' => 'jobs',],'redis' => ['driver' => 'redis','queue' => 'xx','host' => Env::get('redis.redis_hostname', '127.0.0.1'),'port' => Env::get('redis.port', 6379),'password' => Env::get('redis.redis_password', '123456'),'select' => Env::get('redis.select', 1),'timeout' => 0,'persistent' => false,],],'failed' => ['type' => 'database','table' => 'failed_jobs',],
];
3、创建任务类
推荐使用 app\job
作为任务类的命名空间
也可以放在任意可以自动加载到的地方
任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire
方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数 think\queue\Job $job
(当前的任务对象) 和 $data
(发布任务时自定义的数据)
还有个可选的任务失败执行的方法 failed
传入的参数为$data
(发布任务时自定义的数据)
下面写两个例子
//示例1
namespace app\job;use think\queue\Job;class Job1{public function fire(Job $job, $data){//....这里执行具体的任务 if ($job->attempts() > 3) {//通过这个方法可以检查这个任务已经重试了几次了}//如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法$job->delete();// 也可以重新发布这个任务$job->release($delay); //$delay为延迟时间}public function failed($data){// ...任务达到最大重试次数后,失败了}}
//示例2
namespace app\lib\job;use think\queue\Job;class Job2{public function task1(Job $job, $data){}public function task2(Job $job, $data){}public function failed($data){}}
4、发布任务
$job
是任务名
命名空间是app\job
的,比如上面的例子一,写Job1
类名即可
其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2
如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1
、app\lib\job\Job2@task2
$data
是你要传到任务里的参数
$queue
队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填
think\facade\Queue::push($job, $data = '', $queue = null)
和 think\facade\Queue::later($delay, $job, $data = '', $queue = null)
两个方法,前者是立即执行,后者是在$delay
秒后执行
5、监听任务并执行
&> php think queue:listen //主要用于开发和调试。一个监视器,它会循环地、频繁地启动和终止 work 进程。&> php think queue:work //高性能的生产环境首选。一个长驻留的守护进程,启动后一直运行,高效地处理任务。
两种,具体的可选参数可以输入命令加 --help
查看
可配合supervisor使用,保证进程常驻
6、queue:listen 与 queue:work 区别
特性 | php think queue:work (工作者/守护模式) | php think queue:listen (监听器) |
---|---|---|
工作原理 | 启动一个长驻留的 PHP 进程,该进程在内存中保持框架和应用状态,循环地从队列中取出并执行任务。 | 启动一个监视进程,这个进程本身不处理任务,而是周期性地调用 queue:work 命令来执行任务,每次调用后都会结束该子进程。 |
性能 | 极高。框架和代码只在启动时加载一次(守护模式),后续任务处理都在同一个进程中完成,资源开销极小。 | 很低。每次处理任务(或一批任务)都需要重新初始化整个 PHP 和 ThinkPHP 框架,带来大量的 I/O 和 CPU 开销。 |
资源占用 | 低且稳定。内存占用基本恒定。 | 高且波动。因为不断有进程创建和销毁,CPU 和内存占用会周期性飙升。 |
代码更新 | 不自动响应。由于进程常驻内存,如果你修改了业务代码,必须手动重启 work 进程才能生效。 | 自动响应。因为每次处理任务都会启动新的进程,所以代码修改会自动生效,无需重启 listen 命令。 |
稳定性 | 高。但需要配合 Supervisor 等进程管理工具,以确保进程意外退出后能自动重启。 | 较低。进程频繁启停本身可能引入不稳定性,且性能瓶颈明显。 |
适用场景 | 生产环境(Production) | 开发/调试环境(Development) |
7、深入原理分析
1. php think queue:work --daemon
(守护模式 Work)
这是生产环境的正确用法。
流程:
你执行命令
php think queue:work --daemon --queue my_queue
。系统启动一个 PHP 进程,加载整个 ThinkPHP 框架和你的应用代码。
该进程进入一个无限循环:
检查队列:询问 Redis(或数据库):“
my_queue
里有新任务吗?”执行任务:如果有任务,从队列中取出,并在当前进程中直接执行任务逻辑。
循环继续:处理完后,立即回到循环开头,检查下一个任务。
这个进程会一直运行,直到你手动终止它,或者它遇到致命错误而退出。
优点:极快的任务处理速度,因为避免了重复的启动开销。
缺点:代码更新后必须重启进程。必须使用
--daemon
参数才能进入这种高性能模式。
注:必须使用 queue:work --daemon
,并配合进程管理器(如 Supervisor)。
绝对不要在生产环境使用 listen
,它的性能开销是你无法承受的
使用守护模式:
# 基本的守护模式命令
php think queue:work --daemon --queue my_queue# 推荐使用更多参数的完整命令,例如:
php think queue:work --daemon --queue my_queue --tries 3 --sleep 3
使用 Supervisor 管理进程:
单个 work 进程可能崩溃,也无法利用多核CPU。使用 Supervisor 可以解决这两个问题。
自动重启:如果 work 进程意外退出,Supervisor 会立即重启它。
多进程:可以轻松启动多个 work 进程并行处理任务,极大提升吞吐量。
一个典型的 Supervisor 配置 (/etc/supervisor/conf.d/think-queue.conf
):
[program:think-queue]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/project/think queue:work --daemon --queue my_queue
directory=/path/to/your/project
autostart=true
autorestart=true
user=www-data
numprocs=4 ; 启动4个进程并行处理
redirect_stderr=true
stdout_logfile=/path/to/your/log/supervisor_think-queue.log
2. php think queue:listen
流程:
你执行命令
php think queue:listen --queue my_queue
。系统启动一个监听进程。
这个监听进程自己也进入一个循环,但它的工作是:
创建子进程:在每一个循环中,它都会像在命令行中一样,重新执行一次
php think queue:work
(注意:不是守护模式)命令。处理任务:这个新创建的
work
子进程会加载框架,检查并处理队列中的一个或多个任务(取决于--tries
等参数)。销毁子进程:任务处理完成后,这个
work
子进程就退出了。循环继续:监听进程等待片刻(可配置),然后再次创建新的
work
子进程去处理下一个任务。
优点:开发时非常方便,修改代码后立即生效,无需重启。
缺点:性能极差,因为每个任务(或每批任务)都需要经历一次完整的“启动PHP->加载框架->处理任务->关闭PHP”的过程。