首页 > 其他分享 >手撕一个异步任务通用组件

手撕一个异步任务通用组件

时间:2022-12-27 10:47:32浏览次数:32  
标签:异步 通用 name process redis 进程 组件 data id

目的

取代linux crontab的计划任务,那玩儿意最小粒度一分钟,意味着服务器不管如何清闲都会有一分钟延迟

实现原理

while (TRUE) {}

没错,就是这么粗暴,一个永不停止的无限循环,具体不多说,代码里注释写的非常明白了,上代码

  1. 首先实现一个进程管理器,因为一个处理进程开启后,ssh客户端再一关,就失去对它的控制权了,这个管理器就是一组命令实现对进程的控制
<?php

namespace app\command;

use app\common\constant\CommonConstant;
use extendRedis\VekiseRedis;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

/**
 * Desc:异步任务进程管理
 * Author:glt 2022/12/20 17:36
 */
class SyncTaskProcessManager extends Command {

    protected function configure()
    {
        $this->setName('SyncTaskProcessManager')
            ->addArgument('id', Argument::OPTIONAL, "操作id")
            ->addOption('operate', NULL, Option::VALUE_REQUIRED, '操作类型')
            ->setDescription('异步任务进程管理');
    }

    protected function execute(Input $input, Output $output)
    {
        if (!$input->hasOption('operate')) {
            dd('请选择操作类型');
        }
        $operate = $input->getOption('operate');
        if (!in_array($operate, ['list', 'stop', 'start', 'del'])) {
            dd('操作类型不存在');
        }
        $redis = VekiseRedis::getInstance();
        $process_ids = $redis->keys('sync_task_process_*');
        //查看进程列表
        if ($operate === 'list') {
            $list = [];
            foreach ($process_ids as $process_id) {
                $item_process = $redis->hGetAll($process_id, FALSE);
                $item_process['id'] = $process_id;
                $item_process['status_cn'] = $item_process['status'] == CommonConstant::SYNC_TASK_PROCESS_STATUS_ON ? '运行中' : '暂停';
                $item_process['type_cn'] = $item_process['type'] == CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE ? '不允许并行' : '允许并行';
                $list[] = $item_process;
            }
            dd($list);
        }
        //暂停进程
        if ($operate === 'stop') {
            $id = trim($input->getArgument('id'));
            if (empty($id)) {
                dd('请选择操作对象id');
            }
            if ($id === 'all') {//操作全部进程
                foreach ($process_ids as $process_id) {
                    $redis->hSet($process_id, 'status', CommonConstant::SYNC_TASK_PROCESS_STATUS_OFF, FALSE);
                }
            } else {//操作单个进程
                $redis->hSet($id, 'status', CommonConstant::SYNC_TASK_PROCESS_STATUS_OFF, FALSE);
            }
            dd('操作成功');
        }
        //开始进程
        if ($operate === 'start') {
            $id = trim($input->getArgument('id'));
            if (empty($id)) {
                dd('请选择操作对象id');
            }
            if ($id === 'all') {//操作全部进程
                foreach ($process_ids as $process_id) {
                    $redis->hSet($process_id, 'status', CommonConstant::SYNC_TASK_PROCESS_STATUS_ON, FALSE);
                }
            } else {//操作单个进程
                $redis->hSet($id, 'status', CommonConstant::SYNC_TASK_PROCESS_STATUS_ON, FALSE);
            }
            dd('操作成功');
        }
        //删除进程
        if ($operate === 'del') {
            $id = trim($input->getArgument('id'));
            if (empty($id)) {
                dd('请选择操作对象id');
            }
            if ($id === 'all') {//操作全部进程
                foreach ($process_ids as $process_id) {
                    $redis->rm($process_id, FALSE);
                }
            } else {//操作单个进程
                $redis->rm($id, FALSE);
            }
            dd('操作成功');
        }
        dd('操作类型非法');
    }
}

2.执行异步任务

<?php

namespace app\command;

use app\common\constant\CommonConstant;
use app\common\model\QueueData;
use extendRedis\VekiseRedis;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;

/**
 * Desc:即时执行的异步任务
 * Author:glt 2022/12/20 17:36
 */
class SyncTask extends Command {

    protected function configure()
    {
        $this->setName('SyncTask')
            ->addArgument('type', Argument::REQUIRED, "任务类型")
            ->setDescription('即时执行的异步任务');
    }

    protected function execute(Input $input, Output $output)
    {
        //进程管理思路:每次启动异步消费者,生成一个进程id写入redis,
        //之后每次轮询都检查一遍进程状态,通过修改这个状态可以控制当前进程结束
        //生成进程id
        $redis = VekiseRedis::getInstance();
        $process_id = date('YmdHis') . uniqid();
        //在系统资源有富余的前提下,多进程同时执行明显消费速度更快,但是特定场景下同时执行会带来mysql死锁的问题,比如订单上传库存占用场景
        //所以进程应该有两种,一种并发进程可以开n多窗口一起跑,一种非并发进程只允许一个窗口运行
        $type = trim($input->getArgument('type'));
        if (!in_array($type, [CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE, CommonConstant::SYNC_TASK_PROCESS_TYPE_PARALLEL])) {
            dd('类型参数错误');
        }
        if ($type == CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE) {//单进程不允许并发
            $process_ids = $redis->keys('sync_task_process_*');
            foreach ($process_ids as $process_id) {
                $process_type = $redis->hGet($process_id, 'type', FALSE);
                if ($process_type == CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE) {
                    dd('已经开启了一个单进程消费者');
                }
            }
            //两种进程分别消费自己独立的队列
            $queue_field = 'sync_task_queue_list_single';
        } else {
            $queue_field = 'sync_task_queue_list_parallel';
        }
        //进程id写入
        $redis->hMset('sync_task_process_' . $process_id, [
            'memory_usage'   => 0,//内存消耗
            'last_exec_time' => '',//最后一次执行时间
            'status'         => CommonConstant::SYNC_TASK_PROCESS_STATUS_ON,//进程开启状态
            'type'           => $type//进程类型
        ]);
        while (TRUE) {
            //检查进程状态
            $status = $redis->hGet('sync_task_process_' . $process_id, 'status');
            //进程状态为2,中断执行,这样就实现了手动控制异步任务执行
            if (!$status) {//需注意,常驻进程的任务,如果改了代码需要重启进程,这里的设计是能做到平滑重启的
                dd('进程被删除');
            }
            if ($status == CommonConstant::SYNC_TASK_PROCESS_STATUS_OFF) {
                dd('进程设置为暂停');
            }
            //开始执行
            //轮询redis消息列表,这个消息表为什么不使用mysql作为存储介质?
            //1:90%的情况是查不到数据的,空转2:频率太高
            //但是可以在消息消费完后写一份结果到mysql,追溯排查bug用,mysql的查询还是比redis强大舒服了太多
            $task_str = $redis->lpop($queue_field);//先进先出,rpush尾部插入,lpop头部取出
            if ($task_str !== FALSE) {//消息不为空
                try {//任何地方不能出现报错,会导致进程中断,所以提前捕获错误
                    $data = json_decode($task_str, TRUE);
                    $response = $this->execTask($data);
                    $res = $response['res'];
                    $msg = $response['msg'];
                } catch (\Exception $e) {
                    $res = FALSE;
                    $msg = $e->getMessage();
                }
                //写入一条数据到mysql备份或者人工排查问题,这张表只写不查,消息id用于唯一标识当前消息,用于判断重试次数
                $msg_id = QueueData::insertGetId([
                    'param'       => json_encode($data['param']),
                    'class_name'  => $data['class_name'],
                    'func_name'   => $data['func_name'],
                    'res'         => $res ? 1 : 2,
                    'msg'         => $msg,
                    'create_time' => date(CommonConstant::YMDHIS_FORMAT),
                    'origin_id'   => $data['origin_id'],
                    'scene'       => $data['scene']
                ]);
                if ($res === FALSE) {//三次重试机制
                    if (empty($data['origin_id'])) {//没有原始消息id,认为第一次出错,直接重新触发生产者
                        self::producer($data['param'], $data['class_name'], $data['func_name'], $data['scene'], $type, $msg_id);
                        $redis->set('sync_task_queue_error_num_' . $msg_id, 1);//记录错误一次
                    } else {//有原始消息id,认为不是第一次出错,去查找它错了几回
                        $error_num = $redis->get('sync_task_queue_error_num_' . $data['origin_id']);
                        if ($error_num == 3) {//已经重试了三次,不再重试,且把重试次数这个键删掉,没用了
                            $redis->rm('sync_task_queue_error_num_' . $data['origin_id']);
                        } else {//小于三次,继续重试
                            self::producer($data['param'], $data['class_name'], $data['func_name'], $data['scene'], $type, $data['origin_id']);
                            $redis->inc('sync_task_queue_error_num_' . $data['origin_id']);//记录错误一次+1
                        }
                    }
                } else {
                    if (!empty($data['origin_id'])) {//执行成功,且存在原始消息id,认为是重试后成功的,删掉重试次数
                        $redis->rm('sync_task_queue_error_num_' . $data['origin_id']);
                    }
                }
            }
            //执行结束,更新进程
            $redis->hMset('sync_task_process_' . $process_id, [
                'memory_usage'   => round(memory_get_usage() / 1024 / 1024, 2) . 'M',//内存消耗
                'last_exec_time' => date(CommonConstant::YMDHIS_FORMAT),//最后一次执行时间,这个时间长期不更新的话,认为是死进程清理掉,比如ctrl+c人为停止的就会这样
            ]);
        }
    }

    /**
     * Desc:执行任务
     * Author:glt 2022/12/22 10:46
     *
     * @param $data
     *
     * @return array
     */
    private function execTask($data)
    {
        $class_name = $data['class_name'];
        $func_name = $data['func_name'];
        $target_class = (new $class_name);
        $exec_res = $target_class->$func_name($data['param']);
        if ($exec_res === FALSE) {
            //这里要适应下现有的代码写法,错误消息定义为当前逻辑类的一个属性error
            //一层调用没问题,很清楚去拿谁的error属性,但是好几层调用那就是地狱体验了,直接抛异常不香吗???
            return ['res' => FALSE, 'msg' => $target_class->error ?? ''];
        }

        return ['res' => TRUE, 'msg' => is_array($exec_res) ? json_encode($exec_res) : $exec_res];
    }

    /**
     * Desc:异步任务生产者
     * Author:glt 2022/12/22 10:22
     *
     * @param       $param
     * @param       $class_name
     * @param       $func_name
     * @param       $scene
     * @param int   $type
     * @param int   $origin_id
     *
     * @return bool|int
     */
    public static function producer($param, $class_name, $func_name, $scene, $type = CommonConstant::SYNC_TASK_PROCESS_TYPE_PARALLEL, $origin_id = 0)
    {
        //分别写入不同的list
        $queue_field = $type == CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE ? 'sync_task_queue_list_single' : 'sync_task_queue_list_parallel';
        $redis = VekiseRedis::getInstance();

        return $redis->rpush($queue_field, json_encode([
            'param'      => $param,
            'class_name' => $class_name,
            'func_name'  => $func_name,
            'scene'      => $scene,//场景,方便数据库查看而已,没别的用
            'origin_id'  => $origin_id//原始消息id,用于标识重试的消息
        ]));
    }

    /**
     * Desc:都知道异步任务调试麻烦,所以先写个入口,用于同步执行异步任务
     * Author:glt 2022/12/22 14:54
     *
     * @param $msg_id
     */
    public function handleExec($msg_id)
    {
        $msg = QueueData::where('id', $msg_id)->find()->toArray();
        $this->execTask([
            'class_name' => $msg['class_name'],
            'func_name'  => $msg['func_name'],
            'param'      => json_decode($msg['param'], TRUE),
        ]);
    }
}

标签:异步,通用,name,process,redis,进程,组件,data,id
From: https://www.cnblogs.com/gltt/p/17007538.html

相关文章