目录
应用
应用1:Leetcode 1834. 单线程 CPU
题目
给你一个二维数组 tasks ,用于表示 n 项从 0 到 n - 1 编号的任务。其中 \(tasks[i] = [enqueueTime_i, processingTime_i]\) 意味着第 i 项任务将会于 enqueueTimei 时进入任务队列,需要 \(processingTime_i\) 的时长完成执行。
现有一个单线程 CPU ,同一时间只能执行 最多一项 任务,该 CPU 将会按照下述方式运行:
如果 CPU 空闲,且任务队列中没有需要执行的任务,则 CPU 保持空闲状态。
如果 CPU 空闲,但任务队列中有需要执行的任务,则 CPU 将会选择 执行时间最短 的任务开始执行。如果多个任务具有同样的最短执行时间,则选择下标最小的任务开始执行。
一旦某项任务开始执行,CPU 在 执行完整个任务 前都不会停止。
CPU 可以在完成一项任务后,立即开始执行一项新任务。
返回 CPU 处理任务的顺序。
示例 1:
输入:\(tasks = [[1,2],[2,4],[3,2],[4,1]]\)
输出:\([0,2,3,1]\)
解释:事件按下述流程运行:
- time = 1 ,任务 0 进入任务队列,可执行任务项 = [0]
- 同样在 time = 1 ,空闲状态的 CPU 开始执行任务 0 ,可执行任务项 = []
- time = 2 ,任务 1 进入任务队列,可执行任务项 = [1]
- time = 3 ,任务 2 进入任务队列,可执行任务项 = [1, 2]
- 同样在 time = 3 ,CPU 完成任务 0 并开始执行队列中用时最短的任务 2 ,可执行任务项 = {1]
- time = 4 ,任务 3 进入任务队列,可执行任务项 = [1, 3]
- time = 5 ,CPU 完成任务 2 并开始执行队列中用时最短的任务 3 ,可执行任务项 = {1]
- time = 6 ,CPU 完成任务 3 并开始执行任务 1 ,可执行任务项 = []
- time = 10 ,CPU 完成任务 1 并进入空闲状态
分析
这里,我们维护一个 \(Task\) 对象,它具有如下属性:
-
进入任务队列的时刻 \(enqueueTime\);
-
执行时长 \(processingTime\);
-
任务编号 \(index\)。
由于 python
是小根堆,我们需要实现该对象的 def __lt__(self, other)
方法。
未完待续。。。
代码实现
class Task(object):
def __init__(self, enqueue_time, processing_time, index):
self.enqueue_time = enqueue_time
self.processing_time = processing_time
self.index = index
def __lt__(self, other):
""" 使用<排序时,会被调用 """
# 如果任务具有相同的执行时间,则比较下标
if self.processing_time == other.processing_time:
return self.index < other.index
# 否则返回执行时间最短的任务
else:
return self.processing_time < other.processing_time
class Solution:
def getOrder(self, tasks: List[List[int]]) -> List[int]:
n = len(tasks)
# 将所有的任务按照入队时刻排序
_tasks = list()
for index, task in enumerate(tasks):
_tasks.append(Task(task[0], task[1], index))
_tasks.sort(key=lambda x: x.enqueue_time)
result = list()
queue = list() # 使用优先级队列来保存所有的等待执行的任务
current_time = 0 # 用于记录CPU执行的时刻
pointer = 0 # 标记已经遍历过的任务数量
for i in range(n):
# 如果堆为空,则当前时刻就是下一个待处理的任务入队时刻
if not queue:
current_time = max(current_time, _tasks[pointer].enqueue_time)
# 将入队时间小于当前时刻的所有任务先放入堆中,并记录遍历过的任务数量
while pointer < n and _tasks[pointer].enqueue_time <= current_time:
# 任务放入堆中,Python是小根堆,排序会使用"<"符号,需要重载"__lt__"方法
heapq.heappush(queue, _tasks[pointer])
pointer += 1
# 从堆中取出一个任务来执行
task = heapq.heappop(queue)
# 将执行过的任务的序号记录下来
result.append(task.index)
# 当前时刻向后偏移任务执行的时长
current_time += task.processing_time
return result
应用2:Leetcode 621. 任务调度器
题目
给你一个用字符数组
tasks
表示的 CPU 需要执行的任务列表。其中每个字母表示一种不同种类的任务。任务可以以任意顺序执行,并且每个任务都可以在 1 个单位时间内执行完。在任何一个单位时间,CPU 可以完成一个任务,或者处于待命状态。
然而,两个 相同种类 的任务之间必须有长度为整数n
的冷却时间,因此至少有连续n
个单位时间内 CPU 在执行不同的任务,或者在待命状态。
你需要计算完成所有任务所需要的 最短时间 。
示例 1:
输入:
tasks = ["A","A","A","A","A","A","B","C","D","E","F","G"]
,n = 2
输出:16
解释:一种可能的解决方案是:A -> B -> C -> A -> D -> E -> A -> F -> G -> A -> (待命) -> (待命) -> A -> (待命) -> (待命) -> A
分析
比较朴素的想法是,我们按照时间顺序,依次给每一个时间单位分配任务。
那么如果当前有多种任务不在冷却中,那么我们应该如何挑选执行的任务呢?直觉上,我们应当选择剩余执行次数最多的那个任务,将每种任务的剩余执行次数尽可能平均,使得 CPU 处于待命状态的时间尽可能少。
我们使用二元组 \((nextValid_i, rest_i)\) 表示 第 \(i\) 类任务,其中,\(nextValid_i\) 表示:第 \(i\) 类任务下一个可以执行的时刻,\(rest_i\) 表示:第 \(i\) 类任务剩余执行次数。
同时,我们维护一个列表 \(taskCategory\) 记录所有的任务种类,\(taskCategory = [(nextValid_i, rest_i)]\)。
那么,初始状态时,所有的任务都可以在第一个时刻执行,因此,将所有的 \(nextValid_i\) 都初始化为 \(1\),\(rest_i\) 初始化为所有任务的出现次数。这里,我们并不关心哪一个任务,只需要关注每种任务的总数即可。
我们使用 \(time\) 记录当前的时刻,每次执行一个任务时,我们需要选择 不在冷却中,并且 剩余执行次数最多的任务 执行。
算法步骤:
-
统计每类任务出现的次数;
-
初始化每类任务的下一个可以执行的时刻及其剩余可执行次数;
初始状态,每类任务的下一个可以执行的时刻都是 \(1\)。
-
将时刻 \(time\) 初始化为 \(0\);
-
遍历所有的任务,对于每一个待执行的任务:
-
首先,将时刻 \(time\) 加 \(1\);
-
然后,跳过 CPU 空转的时间段,计算CPU空转的时间方法:
-
遍历所有的任务种类,找到剩余可执行次数大于 \(1\) 的所有类型的任务中,下一个可执行时刻最小的时间 \(nextValid_{min}\) ;
-
如果 \(nextValid_{min}\) 大于 \(time\),则说明存在冷却时间,可以直接将时间 time 移动到 \(nextValid_{min}\),从而跳过 CPU 冷却的时段。
-
-
找到下一个最优的任务,查找方法如下:
-
遍历所有的 \(taskCategory\)
-
最优的任务需要满足的条件:
-
该类任务的剩余可执行次数大于 0 ,并且下一个可执行时刻小于当前时刻 \(time\),否则,这个任务就是处于冷却中的;
-
该类任务的剩余可执行次数是最多的。
-
-
更新该类任务的下一个可执行的时刻,即 \(nextValid_i = time + n + 1\);
-
同时,该类任务任务的剩余可执行次数减 \(1\);
-
-
代码实现
from typing import List
from collections import Counter
class Solution:
def leastInterval(self, tasks: List[str], n: int) -> int:
frequency = Counter(tasks)
task_category = list()
for _rest in frequency.values():
task_category.append([1, _rest])
time = 0
for i in range(len(tasks)):
time += 1
# 跳过CPU空转的时间
_next_valid = list()
for _task in task_category:
if _task[1] > 0:
_next_valid.append(_task[0])
time = max(time, min(_next_valid))
# 找到下一个最优任务
best = -1
for j, _task in enumerate(task_category):
if _task[1] and _task[0] <= time:
if best == -1 or _task[1] > task_category[best][1]:
best = j
# 下一次执行时刻:当前时刻+冷却时间+1
task_category[best][0] = time + n + 1
task_category[best][1] -= 1
return time
参考:
标签:执行,tasks,任务,力扣,算法,task,time,任务调度,CPU From: https://www.cnblogs.com/larry1024/p/17482786.html