composer 安装 think-queue
# tp5.0 composer require topthink/think-queue=1.1.6 --no-plugins # tp5.1.x composer require topthink/think-queue 2.0.4 --no-plugins # tp6 composer require topthink/think-queue --no-plugins
验证安装结果
php think queue:work -h
配置文件
Tp5.0 文件位置:根目录/config/queue.php return [ // 'connector' => 'Sync' 'connector' => 'Redis', // Redis 驱动 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称 'host' => '127.0.0.1', // redis 主机ip 'port' => 6379, // redis 端口 'password' => '', // redis 密码 'select' => 1, // 使用哪一个 db,默认为 db0 'timeout' => 0, // redis连接的超时时间 'persistent' => false, // 是否是长连接 ]; Tp5.1.x 文件位置:根目录/config/queue.php return [ // 'connector' => 'Sync' 'connector' => 'Redis', // Redis 驱动 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称 'host' => '127.0.0.1', // redis 主机ip 'port' => 6379, // redis 端口 'password' => '', // redis 密码 'select' => 1, // 使用哪一个 db,默认为 db0 'timeout' => 0, // redis连接的超时时间 'persistent' => false, // 是否是长连接 ]; Tp6 配置文件在统一目录下/config/queue.php <?php return [ 'default' => 'redis', // 使用redis 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 1, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];
在项目下新建一个Job目录,编JOB类
<?php namespace app\job; use think\facade\Log; use think\queue\Job; /** * 消费者类 * 用于处理队列中的任务 */ class CronJob { /** * fire是消息队列默认调用的方法 * @param Job $job 当前的任务对象 * @param array|mixed $data 发布任务时自定义的数据 */ public function fire(Job $job, $data) { Log::channel('job')->info('一条测试日志'); if (empty($data)) { Log::channel('job')->error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__)); return; } //有效消息到达消费者时可能已经不再需要执行了 if (!$this->checkJob($data)) { $job->delete(); Log::channel('job')->record("Job does not need to be executed"); return; } //执行业务处理 if ($this->doJob($data)) { $job->delete();//任务执行成功后删除 Log::channel('job')->record("job has been down and deleted"); } else { //检查任务重试次数 if ($job->attempts() > 3) { Log::channel('job')->record("job has been retried more that 3 times"); // $job->release(10); // 10秒后在执行 $job->delete(); // 删除任务 } } } /** * 消息在到达消费者时可能已经不需要执行了 * @param array|mixed $data 发布任务时自定义的数据 * @return boolean 任务执行的结果 */ private function checkJob($data) { Log::channel('job')->record('验证任务是否需要执行'); return true; } /** * 根据消息中的数据进行实际的业务处理 */ private function doJob($data) { // 实际业务流程处理 print_r($data['msg'] ?? '实际业务流程处理'); Log::channel('job')->record('实际业务流程处理'); return true; } function task1(){ print_r("task 1"); } public function failed($data) { // ...任务达到最大重试次数后,失败了 Log::channel('job')->error('任务达到最大重试次数后,失败了'); } }
配置日志
<?php // +---------------------------------------------------------------------- // | 日志设置 // +---------------------------------------------------------------------- return [ // 默认日志记录通道 'default' => env('log.channel', 'file'), // 日志记录级别 'level' => [], // 日志类型记录的通道 ['error'=>'email',...] 'type_channel' => [], // 关闭全局日志写入 'close' => false, // 全局日志处理 支持闭包 'processor' => null, // 日志通道列表 'channels' => [ 'file' => [ // 日志记录方式 'type' => 'File', // 日志保存目录 'path' => '', // 单文件日志写入 'single' => false, // 独立日志级别 'apart_level' => [], // 最大日志文件数量 'max_files' => 0, // 使用JSON格式记录 'json' => false, // 日志处理 'processor' => null, // 关闭通道日志写入 'close' => false, // 日志输出格式化 'format' => '[%s][%s] %s', // 是否实时写入 'realtime_write' => false, ], // 其它日志通道配置 'job' => [ 'type' => 'File', // 'path' => app()->getRootPath() . 'runtime/pay', // 重点这个路径要写 'path' => app()->getRuntimePath() . 'pay', // 重点这个路径要写 'time_format' => 'Y-m-d H:i:s', 'format' => '[%s][%s]:%s' ], ], ];
编写控制器调用
<?php namespace app\api\controller\v1; use app\job\CronJob; use think\facade\Queue; class User { public function index() { return 'v1/user/index2'; } /** * 投递消息(生产者) * @return string */ public function push() { // queue的 push方法 第一个参数可以接收字符或者对象字符串 $job = 'app\Job\CronJob'; // 当前任务由哪个类负责处理 $queueName = 'cron_job_queue'; // 当前队列归属的队列名称 // // 当前任务所需的业务数据 $data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time()); $data['user_id'] = 1; // $res = Queue::push(CronJob::class, $data, $queueName); // 可以自动获取 $res = Queue::push($job, $data, $queueName); // 可以手动指定 - $data['msg'] = 'later Test queue msg,time:' . date('Y-m-d H:i:s', time()); $res = Queue::later(10, $job, $data, $queueName); // 10秒后执行 $data['msg'] = 'task1---,time:' . date('Y-m-d H:i:s', time()); $res = Queue::later(30, "app\Job\CronJob@task1", $data, $queueName); // 10秒后执行 if ($res == false) { return '消息投递失败'; } else { return '消息投递成功'; } } }
守护运行
php think queue:work --queue cron_job_queue php think queue:listen --daemon --queue helloJobQueue # linux上以守护进程方式运行 nohup php think queue:work --daemon --queue cron_job_queue &
标签:queue,--,redis,ThinkPHPP5.0,Queue,job,日志,channel From: https://www.cnblogs.com/catyxiao/p/17851106.html