首页 > 其他分享 >Laravel Schedule 中的 dailyAt 是如何工作的

Laravel Schedule 中的 dailyAt 是如何工作的

时间:2024-05-07 17:46:49浏览次数:10  
标签:Laravel return currentTime invert Schedule DateTime dailyAt parts date

Laravel Schedule 中的 dailyAt 是如何工作的

业务逻辑中通过 dailyAt​ 指定了一个每天都需要执行的定时任务:

$schedule->call(function () {
    // 业务逻辑
 })->dailyAt('14:29');

Illuminate\Console\Scheduling\ManagesFrequencies​ 中的 dailyAt​ 方法,最终是生成 cron 表达式:29 14 * * *

public function dailyAt($time)
{
    $segments = explode(':', $time);

    return $this->spliceIntoPosition(2, (int) $segments[0])
                ->spliceIntoPosition(1, count($segments) === 2 ? (int) $segments[1] : '0');
}

protected function spliceIntoPosition($position, $value)
{
    $segments = explode(' ', $this->expression);

    $segments[$position - 1] = $value;

    return $this->cron(implode(' ', $segments));
}

public function cron($expression)
{
    // 默认 expression  = '* * * * *';
    $this->expression = $expression;

    return $this;
}

Illuminate\Console\Scheduling\ScheduleRunCommand​ 中的 handle​ 方法:

public function handle(Schedule $schedule, Dispatcher $dispatcher, ExceptionHandler $handler)
{
    $this->schedule = $schedule;
    $this->dispatcher = $dispatcher;
    $this->handler = $handler;

    foreach ($this->schedule->dueEvents($this->laravel) as $event) {
        if (! $event->filtersPass($this->laravel)) {
            $this->dispatcher->dispatch(new ScheduledTaskSkipped($event));

            continue;
        }

        if ($event->onOneServer) {
            $this->runSingleServerEvent($event);
        } else {
            $this->runEvent($event);
        }

        $this->eventsRan = true;
    }

    if (! $this->eventsRan) {
        $this->info('No scheduled commands are ready to run.');
    }
}

通过 $this->schedule->dueEvents($this->laravel)​ 获取到期的定时任务,Illuminate\Console\Scheduling\Schedule​ 中的 dueEvents​:

public function dueEvents($app)
{
    return collect($this->events)->filter->isDue($app);
}

Illuminate\Console\Scheduling\Event​ 中的 isDue​ 和 expressionPasses​:

public function isDue($app)
{
    if (! $this->runsInMaintenanceMode() && $app->isDownForMaintenance()) {
        return false;
    }

    return $this->expressionPasses() &&
           $this->runsInEnvironment($app->environment());
}

protected function expressionPasses()
{
    $date = Date::now();

    if ($this->timezone) {
        $date = $date->setTimezone($this->timezone);
    }

    return (new CronExpression($this->expression))->isDue($date->toDateTimeString());
}

Cron\CronExpression​ 中的 isDue​ 方法的最后 $this->getNextRunDate($currentTime, 0, true)->getTimestamp() === $currentTime->getTimestamp();​ 决定是否需要执行:

public function isDue($currentTime = 'now', $timeZone = null): bool
{
    $timeZone = $this->determineTimeZone($currentTime, $timeZone);

    if ('now' === $currentTime) {
        $currentTime = new DateTime();
    } elseif ($currentTime instanceof DateTime) {
        $currentTime = clone $currentTime;
    } elseif ($currentTime instanceof DateTimeImmutable) {
        $currentTime = DateTime::createFromFormat('U', $currentTime->format('U'));
    } elseif (\is_string($currentTime)) {
        $currentTime = new DateTime($currentTime);
    }

    Assert::isInstanceOf($currentTime, DateTime::class);
    $currentTime->setTimezone(new DateTimeZone($timeZone));

    // drop the seconds to 0
    $currentTime->setTime((int) $currentTime->format('H'), (int) $currentTime->format('i'), 0);

    try {
        return $this->getNextRunDate($currentTime, 0, true)->getTimestamp() === $currentTime->getTimestamp();
    } catch (Exception $e) {
        return false;
    }
}

currentTime​ 就是当前的时间,getNextRunDate​ 是根据当前时间和 expression​ (本例中是:29 14 * * *​)计算出下一次任务需要执行的时间点:

public function getNextRunDate($currentTime = 'now', int $nth = 0, bool $allowCurrentDate = false, $timeZone = null): DateTime
{
    return $this->getRunDate($currentTime, $nth, false, $allowCurrentDate, $timeZone);
}


protected function getRunDate($currentTime = null, int $nth = 0, bool $invert = false, bool $allowCurrentDate = false, $timeZone = null): DateTime
{
    $timeZone = $this->determineTimeZone($currentTime, $timeZone);

    if ($currentTime instanceof DateTime) {
        $currentDate = clone $currentTime;
    } elseif ($currentTime instanceof DateTimeImmutable) {
        $currentDate = DateTime::createFromFormat('U', $currentTime->format('U'));
    } elseif (\is_string($currentTime)) {
        $currentDate = new DateTime($currentTime);
    } else {
        $currentDate = new DateTime('now');
    }

    Assert::isInstanceOf($currentDate, DateTime::class);
    $currentDate->setTimezone(new DateTimeZone($timeZone));
    $currentDate->setTime((int) $currentDate->format('H'), (int) $currentDate->format('i'), 0);

    $nextRun = clone $currentDate;
    \Log::info('init nextRun = ' . $nextRun->format('Y-m-d H:i:s'));

    // We don't have to satisfy * or null fields
    $parts = [];
    $fields = [];
    // [5,3,2,4,1,0]
    foreach (self::$order as $position) {
        $part = $this->getExpression($position);
        if (null === $part || '*' === $part) {
            continue;
        }
        $parts[$position] = $part;
        $fields[$position] = $this->fieldFactory->getField($position);
    }
    // parts = [1 => 14, 0 => 29]
    // fields = [1 => HoursField, 0 => MinutesField]; // 对应的取值范围是 [0 ~ 59, 0 ~ 23]

    if (isset($parts[2]) && isset($parts[4])) {
        $domExpression = sprintf('%s %s %s %s *', $this->getExpression(0), $this->getExpression(1), $this->getExpression(2), $this->getExpression(3));
        $dowExpression = sprintf('%s %s * %s %s', $this->getExpression(0), $this->getExpression(1), $this->getExpression(3), $this->getExpression(4));

        $domExpression = new self($domExpression);
        $dowExpression = new self($dowExpression);

        $domRunDates = $domExpression->getMultipleRunDates($nth + 1, $currentTime, $invert, $allowCurrentDate, $timeZone);
        $dowRunDates = $dowExpression->getMultipleRunDates($nth + 1, $currentTime, $invert, $allowCurrentDate, $timeZone);

        $combined = array_merge($domRunDates, $dowRunDates);
        usort($combined, function ($a, $b) {
            return $a->format('Y-m-d H:i:s') <=> $b->format('Y-m-d H:i:s');
        });

        return $combined[$nth];
    }

    // Set a hard limit to bail on an impossible date
    for ($i = 0; $i < $this->maxIterationCount; ++$i) {
        foreach ($parts as $position => $part) {
            $satisfied = false;
            // Get the field object used to validate this part
            $field = $fields[$position];
            // Check if this is singular or a list
            if (false === strpos($part, ',')) {
                $satisfied = $field->isSatisfiedBy($nextRun, $part);
            } else {
                foreach (array_map('trim', explode(',', $part)) as $listPart) {
                    if ($field->isSatisfiedBy($nextRun, $listPart)) {
                        $satisfied = true;

                        break;
                    }
                }
            }

            // If the field is not satisfied, then start over
            if (!$satisfied) {
                $field->increment($nextRun, $invert, $part);

                continue 2;
            }
        }

        // Skip this match if needed
        if ((!$allowCurrentDate && $nextRun == $currentDate) || --$nth > -1) {
            $this->fieldFactory->getField(0)->increment($nextRun, $invert, $parts[0] ?? null);

            continue;
        }

        return $nextRun;
    }

    // @codeCoverageIgnoreStart
    throw new RuntimeException('Impossible CRON expression');
    // @codeCoverageIgnoreEnd
}

月、周、日、天、小时这几个字段都实现了 FieldInterface​ 接口

interface FieldInterface
{
    /**
     * Check if the respective value of a DateTime field satisfies a CRON exp. (检查 DateTime 字段的相应值是否满足 CRON exp。)
     *
     * @param DateTimeInterface $date  DateTime object to check
     * @param string            $value CRON expression to test against
     *
     * @return bool Returns TRUE if satisfied, FALSE otherwise
     */
    public function isSatisfiedBy(DateTimeInterface $date, $value): bool;

    /**
     * When a CRON expression is not satisfied, this method is used to increment
     * or decrement a DateTime object by the unit of the cron field. (当不满足 CRON 表达式时,此方法用于按 cron 字段的单位递增或递减 DateTime 对象。)
     *
     * @param DateTimeInterface $date DateTime object to change
     * @param bool $invert (optional) Set to TRUE to decrement
     * @param string|null $parts (optional) Set parts to use
     *
     * @return FieldInterface
     */
    public function increment(DateTimeInterface &$date, $invert = false, $parts = null): FieldInterface;

    /**
     * Validates a CRON expression for a given field.
     *
     * @param string $value CRON expression value to validate
     *
     * @return bool Returns TRUE if valid, FALSE otherwise
     */
    public function validate(string $value): bool;
}

// 小时字段(HoursField)中实现的 increment 方法
public function increment(DateTimeInterface &$date, $invert = false, $parts = null): FieldInterface
{
    // Change timezone to UTC temporarily. This will
    // allow us to go back or forwards and hour even
    // if DST will be changed between the hours.
    if (null === $parts || '*' === $parts) {
        $timezone = $date->getTimezone();
        $date = $date->setTimezone(new DateTimeZone('UTC'));
        $date = $date->modify(($invert ? '-' : '+') . '1 hour');
        $date = $date->setTimezone($timezone);

        $date = $date->setTime((int)$date->format('H'), $invert ? 59 : 0);
        return $this;
    }

    $parts = false !== strpos($parts, ',') ? explode(',', $parts) : [$parts];
    $hours = [];
    foreach ($parts as $part) {
        $hours = array_merge($hours, $this->getRangeForExpression($part, 23));
    }

    $current_hour = $date->format('H');
    $position = $invert ? \count($hours) - 1 : 0;
    $countHours = \count($hours);
    if ($countHours > 1) {
        for ($i = 0; $i < $countHours - 1; ++$i) {
            if ((!$invert && $current_hour >= $hours[$i] && $current_hour < $hours[$i + 1]) ||
                ($invert && $current_hour > $hours[$i] && $current_hour <= $hours[$i + 1])) {
                $position = $invert ? $i : $i + 1;

                break;
            }
        }
    }

    $hour = (int) $hours[$position];
    if ((!$invert && (int) $date->format('H') >= $hour) || ($invert && (int) $date->format('H') <= $hour)) {
        $date = $date->modify(($invert ? '-' : '+') . '1 day');
        $date = $date->setTime($invert ? 23 : 0, $invert ? 59 : 0);
    } else {
        $date = $date->setTime($hour, $invert ? 59 : 0);
    }

    return $this;
}

getRunDate​ 方法中先用 $satisfied = $field->isSatisfiedBy($nextRun, $part);​ 检查 parts​ 中,对应位置上的值是否符合下一次任务执行的时间点,不满足时,使用 $field->increment($nextRun, $invert, $part);​ 对时间点进行调整

self::$order​ 中大的周期在上面,组装的 parts​ 中也是大的周期在最前面,所以后面处理的时候,会先处理大周期:

private static $order = [
    self::YEAR,
    self::MONTH,
    self::DAY,
    self::WEEKDAY,
    self::HOUR,
    self::MINUTE,
];

当前时间 2024-05-07 16:01:00​,定时任务的表达式是 29 14 * * *​,先处理大周期,所以第一个处理的周期是小时 14

当前时间中的小时是 16​,已经超过了 14​,HoursField​的 increment​ 方法中会加一天,同时将小时和分钟设置为 0,调整后的时间是:2024-05-08 00:00:00

getRunDate​ 方法中调用了 increment​ 方法之后,会 continue 2​,即:重新再验证一遍 parts​ 是否符合;

第二次验证时,时间中的小时是 00​,小于 14​,即时间还未到,此时会将小时 14​ 设置到时间里,调整后的时间是:2024-05-08 14:00:00

小时验证通过后,第二个处理的周期是分钟29​,此时时间中的分钟是 00​,小于 29​,再将分钟 29​ 设置到时间里,调整后的时间是:2024-05-08 14:29:00

getRunDate​ 最终返回的下一次任务执行时间是 2024-05-08 14:29:00​,如果当前时间也走到了 2024-05-08 14:29:00​ 的时候,任务就会被触发。

标签:Laravel,return,currentTime,invert,Schedule,DateTime,dailyAt,parts,date
From: https://www.cnblogs.com/zhpj/p/18178031/how-does-dailyat-work-in-laravel-schedule-work-zg1

相关文章

  • [转帖]Release Schedule of Current Database Releases (Doc ID 742060.1)
    https://support.oracle.com/knowledge/Oracle%20Database%20Products/742060_1.html APPLIESTO:OracleDatabase-StandardEdition-Version11.2.0.4andlaterOracleDatabaseCloudService-VersionN/AandlaterGen2ExadataCloudatCustomer-VersionAl......
  • PHP框架Laravel+Vue3+前后端分离开发模式+实战项目
    1、本实战项目采用前后端分离的开发模式,前端框架vue3,后端框架laravel10。所谓的前后端分离的开发,就是有别于利用cookie,session的基于会话机制的开发模式;前后端分离的开发模式是基于jwt的开发模式,也就是说后端的接口数据不仅可以支持web页面,也可以支持微信小程序,公众号,app等移动端......
  • Apache DolphinScheduler支持Flink吗?
    随着大数据技术的快速发展,很多企业开始将Flink引入到生产环境中,以满足日益复杂的数据处理需求。而作为一款企业级的数据调度平台,ApacheDolphinScheduler也跟上了时代步伐,推出了对Flink任务类型的支持。Flink是一个开源的分布式流处理框架,具有高吞吐量、低延迟和准确性等特点,广泛......
  • laravel 监听数据库查询
    laravel监听数据库查询DB::listen在Laravel框架中,DB::listen是一个监听器,用于监听数据库查询。这段代码注册了一个闭包(匿名函数)作为监听器,当执行数据库查询时,闭包会被调用,并且传递一个包含查询信息的对象作为参数。在这个例子中,传递给闭包的参数是$query,它包含了执行的SQL语句......
  • 使用dolphinscheduler调度flink实时任务
    1.在“项目管理”>>"工作流定义"里边创建工作流2.选择flink_stream3.选择安装flink服务的节点worker分组程序类型选择sql4.在“脚本”编写框输入flink-sql主代码5.选择flink版本,指定任务名称,配置资源参数最后点击确认保存6.在任务定义>>实时任务列表找到刚......
  • 利用 Amazon EMR Serverless、Amazon Athena、Apache Dolphinscheduler 以及本地 TiDB
    引言在数据驱动的世界中,企业正在寻求可靠且高性能的解决方案来管理其不断增长的数据需求。本系列博客从一个重视数据安全和合规性的B2C金融科技客户的角度来讨论云上云下混合部署的情况下如何利用亚马逊云科技云原生服务、开源社区产品以及第三方工具构建无服务器数据仓库的解......
  • 用DolphinScheduler轻松实现Flume数据采集任务自动化!
    转载自天地风雷水火山泽目的因为我们的数仓数据源是Kafka,离线数仓需要用Flume采集Kafka中的数据到HDFS中。在实际项目中,我们不可能一直在Xshell中启动Flume任务,一是因为项目的Flume任务很多,二是一旦Xshell页面关闭Flume任务就会停止,这样非常不方便,因此必须在后台启动Flume任务......
  • 30 天精通 RxJS (28):Scheduler 基本观念
    不晓得读者们还记不记得,我们在前面的文章中有提到Scheduler是为了解决RxJS衍生的最后一个问题,而我们现在就在揭晓这个谜底。其实RxJS用久了之后就会发现Observable有一个优势是可以同时处理同步和非同步行为,但这个优势也带来了一个问题,就是我们常常会搞不清处现在的......
  • 快速理解Laravel容器(IOC、DI、Provider、Contract)
    源码理解思维的提升分享一些个人见解。Laravel里面的某些概念,就像魔术一样,看起来很厉害,当知道魔术怎么变的,就会认为也不过如此。所以不必感觉Laravel里有些概念难以理解。应当抛除被框架约束思维的枷锁,用PHP设计的角度去思考,关注大概,而不是在在框架层面逐行磨叽。毕竟源码那么......
  • WhaleScheduler为银行业全信创环境打造统一调度管理平台解决方案
    项目背景数字金融是数字经济的重要支撑和驱动力。近年来,我国针对数字金融的发展政策频频出台,《金融科技发展规划(2022-2025年)》、《“十四五”数字经济发展规划》、《关于银行业保险业数字化转型的指导意见》、《金融标准化“十四五”发展规划》等相继发布,顶层设计逐步完善。2......