目的
取代linux crontab的计划任务,那玩儿意最小粒度一分钟,意味着服务器不管如何清闲都会有一分钟延迟
实现原理
while (TRUE) {}
没错,就是这么粗暴,一个永不停止的无限循环,具体不多说,代码里注释写的非常明白了,上代码
- 首先实现一个进程管理器,因为一个处理进程开启后,ssh客户端再一关,就失去对它的控制权了,这个管理器就是一组命令实现对进程的控制
<?php
namespace app\command;
use app\common\constant\CommonConstant;
use extendRedis\VekiseRedis;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
/**
* Desc:异步任务进程管理
* Author:glt 2022/12/20 17:36
*/
class SyncTaskProcessManager extends Command {
protected function configure()
{
$this->setName('SyncTaskProcessManager')
->addArgument('id', Argument::OPTIONAL, "操作id")
->addOption('operate', NULL, Option::VALUE_REQUIRED, '操作类型')
->setDescription('异步任务进程管理');
}
protected function execute(Input $input, Output $output)
{
if (!$input->hasOption('operate')) {
dd('请选择操作类型');
}
$operate = $input->getOption('operate');
if (!in_array($operate, ['list', 'stop', 'start', 'del'])) {
dd('操作类型不存在');
}
$redis = VekiseRedis::getInstance();
$process_ids = $redis->keys('sync_task_process_*');
//查看进程列表
if ($operate === 'list') {
$list = [];
foreach ($process_ids as $process_id) {
$item_process = $redis->hGetAll($process_id, FALSE);
$item_process['id'] = $process_id;
$item_process['status_cn'] = $item_process['status'] == CommonConstant::SYNC_TASK_PROCESS_STATUS_ON ? '运行中' : '暂停';
$item_process['type_cn'] = $item_process['type'] == CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE ? '不允许并行' : '允许并行';
$list[] = $item_process;
}
dd($list);
}
//暂停进程
if ($operate === 'stop') {
$id = trim($input->getArgument('id'));
if (empty($id)) {
dd('请选择操作对象id');
}
if ($id === 'all') {//操作全部进程
foreach ($process_ids as $process_id) {
$redis->hSet($process_id, 'status', CommonConstant::SYNC_TASK_PROCESS_STATUS_OFF, FALSE);
}
} else {//操作单个进程
$redis->hSet($id, 'status', CommonConstant::SYNC_TASK_PROCESS_STATUS_OFF, FALSE);
}
dd('操作成功');
}
//开始进程
if ($operate === 'start') {
$id = trim($input->getArgument('id'));
if (empty($id)) {
dd('请选择操作对象id');
}
if ($id === 'all') {//操作全部进程
foreach ($process_ids as $process_id) {
$redis->hSet($process_id, 'status', CommonConstant::SYNC_TASK_PROCESS_STATUS_ON, FALSE);
}
} else {//操作单个进程
$redis->hSet($id, 'status', CommonConstant::SYNC_TASK_PROCESS_STATUS_ON, FALSE);
}
dd('操作成功');
}
//删除进程
if ($operate === 'del') {
$id = trim($input->getArgument('id'));
if (empty($id)) {
dd('请选择操作对象id');
}
if ($id === 'all') {//操作全部进程
foreach ($process_ids as $process_id) {
$redis->rm($process_id, FALSE);
}
} else {//操作单个进程
$redis->rm($id, FALSE);
}
dd('操作成功');
}
dd('操作类型非法');
}
}
2.执行异步任务
<?php
namespace app\command;
use app\common\constant\CommonConstant;
use app\common\model\QueueData;
use extendRedis\VekiseRedis;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;
/**
* Desc:即时执行的异步任务
* Author:glt 2022/12/20 17:36
*/
class SyncTask extends Command {
protected function configure()
{
$this->setName('SyncTask')
->addArgument('type', Argument::REQUIRED, "任务类型")
->setDescription('即时执行的异步任务');
}
protected function execute(Input $input, Output $output)
{
//进程管理思路:每次启动异步消费者,生成一个进程id写入redis,
//之后每次轮询都检查一遍进程状态,通过修改这个状态可以控制当前进程结束
//生成进程id
$redis = VekiseRedis::getInstance();
$process_id = date('YmdHis') . uniqid();
//在系统资源有富余的前提下,多进程同时执行明显消费速度更快,但是特定场景下同时执行会带来mysql死锁的问题,比如订单上传库存占用场景
//所以进程应该有两种,一种并发进程可以开n多窗口一起跑,一种非并发进程只允许一个窗口运行
$type = trim($input->getArgument('type'));
if (!in_array($type, [CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE, CommonConstant::SYNC_TASK_PROCESS_TYPE_PARALLEL])) {
dd('类型参数错误');
}
if ($type == CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE) {//单进程不允许并发
$process_ids = $redis->keys('sync_task_process_*');
foreach ($process_ids as $process_id) {
$process_type = $redis->hGet($process_id, 'type', FALSE);
if ($process_type == CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE) {
dd('已经开启了一个单进程消费者');
}
}
//两种进程分别消费自己独立的队列
$queue_field = 'sync_task_queue_list_single';
} else {
$queue_field = 'sync_task_queue_list_parallel';
}
//进程id写入
$redis->hMset('sync_task_process_' . $process_id, [
'memory_usage' => 0,//内存消耗
'last_exec_time' => '',//最后一次执行时间
'status' => CommonConstant::SYNC_TASK_PROCESS_STATUS_ON,//进程开启状态
'type' => $type//进程类型
]);
while (TRUE) {
//检查进程状态
$status = $redis->hGet('sync_task_process_' . $process_id, 'status');
//进程状态为2,中断执行,这样就实现了手动控制异步任务执行
if (!$status) {//需注意,常驻进程的任务,如果改了代码需要重启进程,这里的设计是能做到平滑重启的
dd('进程被删除');
}
if ($status == CommonConstant::SYNC_TASK_PROCESS_STATUS_OFF) {
dd('进程设置为暂停');
}
//开始执行
//轮询redis消息列表,这个消息表为什么不使用mysql作为存储介质?
//1:90%的情况是查不到数据的,空转2:频率太高
//但是可以在消息消费完后写一份结果到mysql,追溯排查bug用,mysql的查询还是比redis强大舒服了太多
$task_str = $redis->lpop($queue_field);//先进先出,rpush尾部插入,lpop头部取出
if ($task_str !== FALSE) {//消息不为空
try {//任何地方不能出现报错,会导致进程中断,所以提前捕获错误
$data = json_decode($task_str, TRUE);
$response = $this->execTask($data);
$res = $response['res'];
$msg = $response['msg'];
} catch (\Exception $e) {
$res = FALSE;
$msg = $e->getMessage();
}
//写入一条数据到mysql备份或者人工排查问题,这张表只写不查,消息id用于唯一标识当前消息,用于判断重试次数
$msg_id = QueueData::insertGetId([
'param' => json_encode($data['param']),
'class_name' => $data['class_name'],
'func_name' => $data['func_name'],
'res' => $res ? 1 : 2,
'msg' => $msg,
'create_time' => date(CommonConstant::YMDHIS_FORMAT),
'origin_id' => $data['origin_id'],
'scene' => $data['scene']
]);
if ($res === FALSE) {//三次重试机制
if (empty($data['origin_id'])) {//没有原始消息id,认为第一次出错,直接重新触发生产者
self::producer($data['param'], $data['class_name'], $data['func_name'], $data['scene'], $type, $msg_id);
$redis->set('sync_task_queue_error_num_' . $msg_id, 1);//记录错误一次
} else {//有原始消息id,认为不是第一次出错,去查找它错了几回
$error_num = $redis->get('sync_task_queue_error_num_' . $data['origin_id']);
if ($error_num == 3) {//已经重试了三次,不再重试,且把重试次数这个键删掉,没用了
$redis->rm('sync_task_queue_error_num_' . $data['origin_id']);
} else {//小于三次,继续重试
self::producer($data['param'], $data['class_name'], $data['func_name'], $data['scene'], $type, $data['origin_id']);
$redis->inc('sync_task_queue_error_num_' . $data['origin_id']);//记录错误一次+1
}
}
} else {
if (!empty($data['origin_id'])) {//执行成功,且存在原始消息id,认为是重试后成功的,删掉重试次数
$redis->rm('sync_task_queue_error_num_' . $data['origin_id']);
}
}
}
//执行结束,更新进程
$redis->hMset('sync_task_process_' . $process_id, [
'memory_usage' => round(memory_get_usage() / 1024 / 1024, 2) . 'M',//内存消耗
'last_exec_time' => date(CommonConstant::YMDHIS_FORMAT),//最后一次执行时间,这个时间长期不更新的话,认为是死进程清理掉,比如ctrl+c人为停止的就会这样
]);
}
}
/**
* Desc:执行任务
* Author:glt 2022/12/22 10:46
*
* @param $data
*
* @return array
*/
private function execTask($data)
{
$class_name = $data['class_name'];
$func_name = $data['func_name'];
$target_class = (new $class_name);
$exec_res = $target_class->$func_name($data['param']);
if ($exec_res === FALSE) {
//这里要适应下现有的代码写法,错误消息定义为当前逻辑类的一个属性error
//一层调用没问题,很清楚去拿谁的error属性,但是好几层调用那就是地狱体验了,直接抛异常不香吗???
return ['res' => FALSE, 'msg' => $target_class->error ?? ''];
}
return ['res' => TRUE, 'msg' => is_array($exec_res) ? json_encode($exec_res) : $exec_res];
}
/**
* Desc:异步任务生产者
* Author:glt 2022/12/22 10:22
*
* @param $param
* @param $class_name
* @param $func_name
* @param $scene
* @param int $type
* @param int $origin_id
*
* @return bool|int
*/
public static function producer($param, $class_name, $func_name, $scene, $type = CommonConstant::SYNC_TASK_PROCESS_TYPE_PARALLEL, $origin_id = 0)
{
//分别写入不同的list
$queue_field = $type == CommonConstant::SYNC_TASK_PROCESS_TYPE_SINGLE ? 'sync_task_queue_list_single' : 'sync_task_queue_list_parallel';
$redis = VekiseRedis::getInstance();
return $redis->rpush($queue_field, json_encode([
'param' => $param,
'class_name' => $class_name,
'func_name' => $func_name,
'scene' => $scene,//场景,方便数据库查看而已,没别的用
'origin_id' => $origin_id//原始消息id,用于标识重试的消息
]));
}
/**
* Desc:都知道异步任务调试麻烦,所以先写个入口,用于同步执行异步任务
* Author:glt 2022/12/22 14:54
*
* @param $msg_id
*/
public function handleExec($msg_id)
{
$msg = QueueData::where('id', $msg_id)->find()->toArray();
$this->execTask([
'class_name' => $msg['class_name'],
'func_name' => $msg['func_name'],
'param' => json_decode($msg['param'], TRUE),
]);
}
}
标签:异步,通用,name,process,redis,进程,组件,data,id
From: https://www.cnblogs.com/gltt/p/17007538.html