首页 > 编程语言 >thinkphp 6 消息队列 (转载)

thinkphp 6 消息队列 (转载)

时间:2024-01-15 09:55:56浏览次数:30  
标签:队列 Job queue print job thinkphp 转载 data MultiTask

1.安装think-queue

composer require topthink/think-queue

 

2.配置消息队列,将config/queue.php将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动

 

如选择database,需创建表

CREATE TABLE `prefix_jobs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `queue` varchar(255) NOT NULL, `payload` longtext NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserve_time` int(10) unsigned DEFAULT NULL, `available_time` int(10) unsigned NOT NULL, `create_time` int(10) unsigned NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

3.创建生产者

 

复制代码
class Index extends BaseController
{
    
    /**
     * 单任务
     */
    public function singleTask()
    {
        //当前任务将由哪个类来负责处理
        $jobHandlerClassName = 'app\job\Job1';
        //业务数据 对象需要手动转序列化
        $jobData = ['ts' => time()];
        //队列名称
        $jobQueueName = "createOrderJob";
        //入队列,later延时发送,单位秒。push立即发送
        $isPushed = Queue::later(2, $jobHandlerClassName, $jobData,$jobQueueName);
        //$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
        // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
        if( $isPushed !== false ){
            echo '执行成功';
        }else{
            echo '执行失败';
        }
        //php think queue:listen --queue createOrderJob  执行队列
        //nohup php think queue:listen --queue createOrderJob &  不以守护进程执行
    }
 
    /**
     * 多任务
     */
    public function multiTask(){
        $taskType = $_GET['taskType'];
        switch ($taskType) {
            case 'taskA':
                $jobHandlerClassName  = 'app\job\MultiTask@taskA';
                $jobDataArr = ['a'   => '1'];
                $jobQueueName = "multiTaskJobQueue";
                break;
            case 'taskB':
                $jobHandlerClassName  = 'app\job\MultiTask@taskB';
                $jobDataArr = ['b'   => '2'];
                $jobQueueName = "multiTaskJobQueue";
                break;
            default:
                break;
        }
 
        $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
        if ($isPushed !== false) {
            echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>");
        }else{
            echo "push a new $taskType of MultiTask Job Failed!";
        }
    }
 
}
复制代码

4.创建消费者

单任务

复制代码
class Job1
{
    public function fire(Job $job, $data)
    {
        //业务处理代码,具体不贴出来了
        $isJobDone = $this->jobDone($data);
        //执行成功删除
        if($isJobDone){
            $job->delete();
            print("任务已经被执行成功并且删除");
        }else{
            $job->release(3); //$delay为延迟时间 表示该任务延迟3秒后再执行
            print("任务3s后再次被执行");
        }
        //通过这个方法可以检查任务重试了几次
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }
 
    public function failed($data)
    {
        // ...任务达到最大重试次数后,失败了
    }
 
    private function jobDone($data){
        Log::write('这是数据 ' . json_encode($data));
        return true;
    }
复制代码

多任务

复制代码
class MultiTask{
    public function taskA(Job $job,$data){
 
        $isJobDone = $this->_doTaskA($data);
 
        if ($isJobDone) {
            $job->delete();
            print("Info: TaskA of Job MultiTask has been done and deleted"."\n");
        }else{
            $job->release(3);
            print("任务3s后再次被执行");
        }
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }
 
    public function taskB(Job $job,$data){
 
        $isJobDone = $this->_doTaskB($data);
 
        if ($isJobDone) {
            $job->delete();
            print("Info: TaskB of Job MultiTask has been done and deleted"."\n");
        }else{
            $job->release(3);
            print("任务3s后再次被执行");
        }
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }
 
    private function _doTaskA($data) {
        print("Info: doing TaskA of Job MultiTask "."\n");
        return true;
    }
 
    private function _doTaskB($data) {
        print("Info: doing TaskB of Job MultiTask "."\n");
        return true;
    }
}
复制代码

5.执行

php think queue:listen --queue createOrderJob   //(队列名)

 --daemon  //可后台运行,具体看手册

 

//$job->delete();   删除任务
//$job->attempts();  查看任务执行次数
// 注意:执行完任务后必须删除任务

 

 

 

 

在database 模式下,2.7.1 和 2.7.2 中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3 中的重发时机是直接更新原任务。

而在redis 模式下,3种重发都是先删除再插入。

不管是哪种重发方式,重发之后,任务的已尝试次数会在原来的基础上 +1 。

此外,消费者类中需要注意,如果 fire() 方法中可能抛出异常,那么如果不需要自动重发的话,

请在抛出异常之前将任务删除 job−>delete(),以免产生bug。如果需要自动重发的话,请直接抛出异常,

不要在fire()方法中又手动使用job->release() , 这样会导致该任务被重发两次,产生两个一样的新任务。

 

 可配合supervisor使用,保证进程常驻

 

转: https://www.freesion.com/article/81081354933/

https://www.sxxblog.com/index/detail/archive/18.html

https://blog.csdn.net/chengzheng5879/article/details/100913455

 https://www.thinkphp.cn/code/4796.html

https://www.cnblogs.com/fps2tao/p/15145470.html

 

标签:队列,Job,queue,print,job,thinkphp,转载,data,MultiTask
From: https://www.cnblogs.com/hh1137054072/p/17964742

相关文章

  • 前K个高频元素 347 暴力法和优先级队列
    关于暴力法简单说一点,我是用链表进行比较排序的,但是有一个细节要注意。点击查看代码structnode{intval;intcount;node*next;node(intval,intcount):val(val),count(count),next(nullptr){}};关于节点的构造函数,一定要构造......
  • 布隆过滤器详解——转载自IT老暖男
    前言我们之前讲了Redis的缓存雪崩、穿透、击穿。在文章里我们说了解决缓存穿透的办法之一,就是布隆过滤器,但是上次并没有讲如何使用布隆过滤器。作为暖男的老哥,给你们补上,请叫我IT老暖男。什么是布隆过滤器布隆过滤器(BloomFilter),是1970年,由一个叫布隆的小伙子提出的,距今已......
  • 滑动窗口的最大值 239 单调队列初识
    最开始做的时候,暴力解法结果不管怎么剪枝,还是超时了。后来看到了卡哥的方法,学到了单调队列,其实就是自定义队列。用deque来实现。有三个关键点:pop,push,front.pop,如果遍历的元素等于队头元素,则头删。push,把比遍历元素小的都进行尾部删。front,就是普通的查找队头。循环遍历的时......
  • CF-514-D-单调队列
    514-D题目大意给定\(n\)个人,每个人有\(m\)个属性,第\(i\)个人的第\(j\)个属性值为\(a[i][j]\)。最多可以执行\(k\)次操作,每次操作选定一个属性,把所有人的该属性减\(1\),求一段最长的区间,满足执行所有操作之后该区间中所有人的所有属性全部为\(0\)。Solution转换一下思考方向,求......
  • 栈和队列
    栈和队列目录栈和队列栈栈的基本定义顺序栈链栈栈栈的基本定义栈是一种特殊的线性表,特点是先进后出栈的操作只能在表的一端进行,出栈和入栈都只能在栈顶(表尾)进行栈的两种实现:顺序栈和链栈顺序栈采用数组,用一个指针指向栈顶双向顺序栈(空间利用率更高)两个栈栈尾是数组的头......
  • thinkphp 便捷关联简写
    /**   *关联会员表   */   publicfunctionmember(){return$this->hasOne(Member::class,'member_id','member_id');}关联写法/***详情*@paramint$order_id*@returnarray*/publicfunc......
  • js 队列方法
    就像栈是以LIFO形式限制访问的数据结构一样,队列以先进先出(FIFO,First-In-First-Out)形式限制访问。队列在列表末尾添加数据,但从列表开头获取数据。因为有了在数据末尾添加数据的push()方法,所以要模拟队列就差一个从数组开头取得数据的方法了。这个数组方法叫shift(),它会删除......
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者
    ​ 系列文章目录第一章Kafka配置部署及SASL_PLAINTEXT安全认证第二章  SpringBoot整合Kafka消息队列 生产者第三章  SpringBoot整合Kafka消息队列 消息者(待续) 前言        Kafka是一个消息队列产品,基于Topicpartitions的设计,能达到非常高的消息......
  • 【转载】淘宝爬虫sign、token详解
    【转载】淘宝爬虫sign、token详解淘宝对于h5的访问采用了和客户端不同的方式,由于在h5的js代码中保存appsercret具有较高的风险,mtop采用了随机分配令牌的方式,为每个访问端分配一个token,保存在用户的cookie中,通过cookie带回服务端分配的token,客户端利用分配的token对请求的URL参......
  • thinkphp6的join连表查询
    说明join要关联的(完整)表名以及别名,支持三种写法:写法1:['完整表名或者子查询'=>'别名']写法2:'完整表名别名'写法3:'不带数据表前缀的表名'condition关联条件,可以为字符串或数组,为数组时每一个元素都是一个关联条件。type关联类型,可以为:INNER、LEFT、RIGHT、FULL,......