首页 > 编程语言 >ThinkPHPP5.0 Queue

ThinkPHPP5.0 Queue

时间:2023-11-23 11:12:51浏览次数:35  
标签:queue -- redis ThinkPHPP5.0 Queue job 日志 channel

composer 安装 think-queue

# tp5.0
composer require topthink/think-queue=1.1.6  --no-plugins

# tp5.1.x
composer require topthink/think-queue 2.0.4  --no-plugins

# tp6
composer require topthink/think-queue  --no-plugins

验证安装结果

php think queue:work -h

配置文件

Tp5.0
文件位置:根目录/config/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驱动
    'expire'     => 60,        // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',        // 默认的队列名称
    'host'       => '127.0.0.1',    // redis 主机ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密码
    'select'     => 1,        // 使用哪一个 db,默认为 db0
    'timeout'    => 0,        // redis连接的超时时间
    'persistent' => false,        // 是否是长连接
];

Tp5.1.x
文件位置:根目录/config/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驱动
    'expire'     => 60,        // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',        // 默认的队列名称
    'host'       => '127.0.0.1',    // redis 主机ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密码
    'select'     => 1,        // 使用哪一个 db,默认为 db0
    'timeout'    => 0,        // redis连接的超时时间
    'persistent' => false,        // 是否是长连接
];

Tp6
配置文件在统一目录下/config/queue.php

<?php

return [
    'default'     => 'redis', // 使用redis
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 1,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

在项目下新建一个Job目录,编JOB类

<?php


namespace app\job;

use think\facade\Log;
use think\queue\Job;

/**
 * 消费者类
 * 用于处理队列中的任务
 */
class CronJob
{

    /**
     * fire是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array|mixed $data 发布任务时自定义的数据
     */
    public function fire(Job $job, $data)
    {
        Log::channel('job')->info('一条测试日志');
        if (empty($data)) {
            Log::channel('job')->error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__));
            return;
        }

        //有效消息到达消费者时可能已经不再需要执行了
        if (!$this->checkJob($data)) {
            $job->delete();
            Log::channel('job')->record("Job does not need to be executed");
            return;
        }
        //执行业务处理
        if ($this->doJob($data)) {
            $job->delete();//任务执行成功后删除
            Log::channel('job')->record("job has been down and deleted");
        } else {
            //检查任务重试次数
            if ($job->attempts() > 3) {
                Log::channel('job')->record("job has been retried more that 3 times");
//                $job->release(10); // 10秒后在执行
                $job->delete(); // 删除任务
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param array|mixed $data 发布任务时自定义的数据
     * @return boolean 任务执行的结果
     */
    private function checkJob($data)
    {
        Log::channel('job')->record('验证任务是否需要执行');
        return true;
    }

    /**
     * 根据消息中的数据进行实际的业务处理
     */
    private function doJob($data)
    {
        // 实际业务流程处理
        print_r($data['msg'] ?? '实际业务流程处理');
        Log::channel('job')->record('实际业务流程处理');
        return true;
    }

    function task1(){
        print_r("task 1");
    }


    public function failed($data)
    {
        // ...任务达到最大重试次数后,失败了
        Log::channel('job')->error('任务达到最大重试次数后,失败了');
    }

}

配置日志

<?php

// +----------------------------------------------------------------------
// | 日志设置
// +----------------------------------------------------------------------
return [
    // 默认日志记录通道
    'default'      => env('log.channel', 'file'),
    // 日志记录级别
    'level'        => [],
    // 日志类型记录的通道 ['error'=>'email',...]
    'type_channel'    =>    [],
    // 关闭全局日志写入
    'close'        => false,
    // 全局日志处理 支持闭包
    'processor'    => null,

    // 日志通道列表
    'channels'     => [
        'file' => [
            // 日志记录方式
            'type'           => 'File',
            // 日志保存目录
            'path'           => '',
            // 单文件日志写入
            'single'         => false,
            // 独立日志级别
            'apart_level'    => [],
            // 最大日志文件数量
            'max_files'      => 0,
            // 使用JSON格式记录
            'json'           => false,
            // 日志处理
            'processor'      => null,
            // 关闭通道日志写入
            'close'          => false,
            // 日志输出格式化
            'format'         => '[%s][%s] %s',
            // 是否实时写入
            'realtime_write' => false,
        ],
        // 其它日志通道配置
        'job'    =>    [
            'type' => 'File',
//            'path' => app()->getRootPath() . 'runtime/pay', // 重点这个路径要写
            'path' => app()->getRuntimePath() . 'pay', // 重点这个路径要写
            'time_format' => 'Y-m-d H:i:s',
            'format' => '[%s][%s]:%s'
        ],
    ],

];

编写控制器调用

<?php

namespace app\api\controller\v1;


use app\job\CronJob;
use think\facade\Queue;

class User
{
    public function index()
    {
        return 'v1/user/index2';
    }

    /**
     * 投递消息(生产者)
     * @return string
     */
    public function push()
    {
        //  queue的 push方法 第一个参数可以接收字符或者对象字符串
        $job = 'app\Job\CronJob'; // 当前任务由哪个类负责处理
        $queueName = 'cron_job_queue';  // 当前队列归属的队列名称
        //  // 当前任务所需的业务数据
        $data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $data['user_id'] = 1;
//        $res = Queue::push(CronJob::class, $data, $queueName);  // 可以自动获取
        $res = Queue::push($job, $data, $queueName);   // 可以手动指定 -
        $data['msg'] = 'later Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $res = Queue::later(10, $job, $data, $queueName);   // 10秒后执行
        $data['msg'] = 'task1---,time:' . date('Y-m-d H:i:s', time());
        $res = Queue::later(30, "app\Job\CronJob@task1", $data, $queueName);   // 10秒后执行
        if ($res == false) {
            return '消息投递失败';
        } else {
            return '消息投递成功';
        }
    }
}

守护运行

php think queue:work --queue cron_job_queue

php think queue:listen --daemon --queue helloJobQueue

# linux上以守护进程方式运行
nohup php think queue:work  --daemon --queue cron_job_queue &

 

标签:queue,--,redis,ThinkPHPP5.0,Queue,job,日志,channel
From: https://www.cnblogs.com/catyxiao/p/17851106.html

相关文章

  • Odoo16_queue_job第三方异步队列
    1.安装第三方模块queue_jobqueue/queue_jobat16.0·OCA/queue·GitHub2.odoo配置文件,启动多workersworkers=3proxy_mode=Trueserver_wide_modules=web,queue_job[queue_job]channels=root:23.使用方法fromodooimportmodels,fields,apiclass......
  • 队列和循环队列(ArrayQueueAndCircleQueue)
    队列数组队列1.初始化队列privateintmaxsize;//最大长度privateintfront;//指向队首的前一个位置privateintrear;//指向队尾privateint[]arr;publicArrayQueue(intmaxsize){this.maxsize=maxsize;arr=newint[maxsize];......
  • python—多线程之线程之间共享数据(Queue)
    Queue理解从一个线程向另一个线程发送数据最安全的方式可能就是使用queue库中的队列了。创建一个被多个线程共享的Queue对象,这些线程通过使用put()和get()操作来向队列中添加或者删除元素。Queue对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。生产者和......
  • CF1765H Hospital Queue
    题意给定一张有向无环图,一个合法方案定以为每个点拓扑序满足对应限制,求每个点所有合法方案中的最小拓扑序。\(1\leqn,m\le2000\),数据保证存在合法方案。solution:对拓扑序的字典序的限制可以用优先队列维护,这道题也可以直接开桶。倒着考虑每个时刻能让那些点成为答案,当......
  • 20231110_stack_queue
    课程笔记https://www.cnblogs.com/hellohebin/p/15677386.html上课代码//1-10/*//test1#include<bits/stdc++.h>usingnamespacestd;constintN=1e6+10;intsta[N],head=0;intn,x;intmain(){cin>>n;for(inti=1;i<=n;i++){ci......
  • BlockingQueue队列详解
    /**本例介绍一个特殊的队列:BlockingQueue,如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被......
  • .NET(C#) LinkedList、Queue<T>和Stack<T>的使用
    本文主要介绍.NET(C#)中,LinkedList链表、Queue<T>队列和Stack<T>堆栈的使用,以及相关的示例代码。1、LinkedList(链表)链表中元素存储内存中是不连续分配,每个元素都有记录前后节点,节点值可以重复,不能通过下标访问,泛型的使用保证类型安全,可以避免装箱拆箱,找元素就只能遍历,查找不方......
  • 队列(Queue):先进先出(FIFO)的数据结构
    队列是一种基本的数据结构,用于在计算机科学和编程中管理数据的存储和访问。队列遵循先进先出(FirstIn,FirstOut,FIFO)原则,即最早入队的元素首先出队。这种数据结构模拟了物理世界中的队列,如排队等待服务的人。在本篇博客中,我们将详细介绍队列的概念、用途、实现以及如何在编程中......
  • Java拾贝第十六天——集合之Queue、Stack
    Queue(队列)Queue是一种先进先出(FIFO:FirstInFirstOut)的有序集合:Queue是Collection的子接口,其定义如下publicinterfaceQueue<E>extendsCollection<E>LinkedList实现了Queue的子接口,根据多态性可以使用Queue创建LinkedList实例。Queue接口常用方法如下:方法类型......
  • Princeton Algorithms, Part I week2 stack&queue
    stack:先进后出queue:先进先出首先是stack有两种实现方式,第一种是用链表,第二种是用数组。Stack:linked-listrepresentation   stack:arrayimplementation  上面这个实现是固定长度的arrayimplementation非常不方便,所以引入可变长度的实现resizing-array......