DAG(Directed Acyclic Graph,有向无环图)是指一个有向图,其中不包含任何环。在任务调度中,DAG被用来表示任务之间的依赖关系。一个任务的执行必须等待其依赖的任务完成之后才能开始。因此,DAG任务调度是一个非常重要的问题。
下面是一个Java实现的轻量级DAG任务调度的示例:
import java.util.ArrayList;
import java.util.List;
public class DAGTaskScheduler {
// 存储任务
private List<Task> tasks = new ArrayList<>();
// 添加任务
public void addTask(Task task) {
tasks.add(task);
}
// 调度任务
public void schedule() {
// 创建一个列表来存储未完成的任务
List<Task> unfinishedTasks = new ArrayList<>();
unfinishedTasks.addAll(tasks);
// 遍历任务列表
while (!unfinishedTasks.isEmpty()) {
boolean taskExecuted = false;
for (Task task : unfinishedTasks) {
if (task.dependenciesAreMet()) {
task.execute();
unfinishedTasks.remove(task);
taskExecuted = true;
break;
}
}
if (!taskExecuted) {
throw new RuntimeException("任务执行顺序错误");
}
}
}
// 任务类
public static class Task {
// 任务名称
private String name;
// 依赖的任务列表
private List<Task> dependencies = new ArrayList<>();
// 执行任务的方法(可以根据实际情况修改)
public void execute() {
System.out.println(name + " 任务执行");
}
// 添加依赖的任务
public void addDependency(Task dependency) {
dependencies.add(dependency);
}
// 判断依赖的任务是否都已经完成
public boolean dependenciesAreMet() {
for (Task dependency : dependencies) {
if (!dependency.isExecuted()) {
return false;
}
}
return true;
}
// 判断任务是否已经执行完成
public boolean isExecuted() {
return executed;
}
// 设置任务执行状态
public void setExecuted(boolean executed) {
this.executed = executed;
}
// 构造方法
public Task(String name) {
this.name = name;
}
// 是否已经执行
private boolean executed = false;
}
}
该实现中,任务调度器DAGTaskScheduler
包含了一个任务列表。通过addTask
方法可以添加任务。调用schedule
方法可以按照任务之间的依赖关系来调度任务的执行。任务类Task
包含了任务的名称、依赖的任务列表和执行任务的方法。在addDependency
方法中可以添加依赖的任务,在dependenciesAreMet
方法中可以判断依赖的任务是否都已经完成,在execute
方法中可以执行具体的任务。在该实现中,每个任务都是只能执行一次的。
需要注意的是,该实现只是一个轻量级的DAG任务调度实现,可能并不够完善。实际应用中,可能需要考虑更复杂的情况,比如任务之间的并发执行、任务出错处理等。同时,对于更大的任务量,可能需要更高效的算法来实现调度。
下面是一个基本的DAG任务调度代码,基于Java的CompletableFuture API:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class DAGTaskScheduler {
private final Map<String, CompletableFuture<?>> tasks = new HashMap<>();
public DAGTaskScheduler(List<Task> tasks) {
for (Task task : tasks) {
CompletableFuture<?> future = CompletableFuture.supplyAsync(task::execute);
this.tasks.put(task.getName(), future);
}
for (Task task : tasks) {
CompletableFuture<?> future = this.tasks.get(task.getName());
for (String dependency : task.getDependencies()) {
CompletableFuture<?> dependencyFuture = this.tasks.get(dependency);
future.thenRun(() -> {
try {
dependencyFuture.get();
} catch (InterruptedException | ExecutionException e) {
System.err.println("Task '" + task.getName() + "' failed due to dependency error: " + e);
}
});
}
}
}
public void start() throws TimeoutException {
for (CompletableFuture<?> future : tasks.values()) {
try {
future.get(1, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Task '" + future + "' failed: " + e);
}
}
}
public static class Task {
private final String name;
private final List<String> dependencies;
private final Runnable action;
public Task(String name, List<String> dependencies, Runnable action) {
this.name = name;
this.dependencies = dependencies;
this.action = action;
}
public String getName() {
return name;
}
public List<String> getDependencies() {
return dependencies;
}
public Runnable getAction() {
return action;
}
public Object execute() {
action.run();
return null;
}
}
}
使用示例:
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
public class App {
public static void main(String[] args) throws TimeoutException {
List<DAGTaskScheduler.Task> tasks = Arrays.asList(
new DAGTaskScheduler.Task("task1", Collections.emptyList(), () -> System.out.println("Task 1 executed")),
new DAGTaskScheduler.Task("task2", Collections.singletonList("task1"), () -> System.out.println("Task 2 executed")),
new DAGTaskScheduler.Task("task3", Collections.singletonList("task2"), () -> System.out.println("Task 3 executed")),
new DAGTaskScheduler.Task("task4", Arrays.asList("task1", "task3"), () -> System.out.println("Task 4 executed"))
);
DAGTaskScheduler scheduler = new DAGTaskScheduler(tasks);
scheduler.start(); // Blocks until all tasks complete or timeout
}
}
上述代码中,我们通过“CompletableFuture.supplyAsync(task::execute)”创建了一个CompletableFuture对象,用于异步执行一个任务。我们还在代码中,使用“thenRun()”方法创建了一个新的CompletionStage对象,它表示当前任务完成之后要执行的操作。如果这个CompletionStage对象依赖于另一个CompletionStage对象(代表另一个任务),则我们对其设置了一个Callback函数,如果依赖的CompletableFuture对象执行失败,则将打印错误消息。最后,我们在start()方法中,等待所有任务执行完成,如果任何一个任务出现错误,则打印错误消息。
在实际情况中,我们可能会有更多的复杂任务依赖关系,并且在任务执行失败时,可能需要采取其他操作,例如自动重试,发送错误报告等。因此,上述代码仅提供了一个简单的示例用于参考。
以下是一个基于Java的CompletableFuture API的复杂任务依赖关系的DAG任务调度demo。这个demo使用了一个工作线程池来执行异步任务,并在任务失败时执行重试操作和发送错误报告的逻辑。
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;
public class TaskScheduler {
private final ExecutorService executor;
private final Map<String, CompletableFuture<Void>> tasks = new HashMap<>();
public TaskScheduler(int numThreads) {
this.executor = Executors.newFixedThreadPool(numThreads);
}
// Schedule a task with no dependencies
public void scheduleTask(String taskId, Supplier<Void> taskFn, int numRetries) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(taskFn, executor)
.thenRun(() -> System.out.printf("Task %s completed successfully%n", taskId));
addTask(taskId, future, numRetries);
}
// Schedule a task with dependencies
public void scheduleTask(String taskId, List<String> dependencies, Supplier<Void> taskFn, int numRetries) {
CompletableFuture<Void> future = CompletableFuture.allOf(dependencies.stream()
.map(this::getTask)
.toArray(CompletableFuture[]::new))
.thenComposeAsync(unused -> CompletableFuture.supplyAsync(taskFn, executor))
.thenRun(() -> System.out.printf("Task %s completed successfully%n", taskId));
addTask(taskId, future, numRetries);
}
// Retrieve a task by ID, or create a completed one if not found
private CompletableFuture<Void> getTask(String taskId) {
return tasks.getOrDefault(taskId, CompletableFuture.completedFuture(null));
}
// Add a task to the task map
private void addTask(String taskId, CompletableFuture<Void> future, int numRetries) {
tasks.put(taskId, future.handleAsync((_, ex) -> {
if (ex != null && numRetries > 0) {
System.err.printf("Task %s failed with error: %s%n", taskId, ex);
scheduleTask(taskId, future.join(), numRetries - 1); // Retry task
} else if (ex != null) {
System.err.printf("Task %s failed with error: %s%n", taskId, ex);
// Send error report to admin or alerting system
}
return null;
}));
}
// Wait for all tasks to complete and shut down the executor
public void awaitCompletion() {
CompletableFuture.allOf(tasks.values().toArray(new CompletableFuture[0]))
.join();
executor.shutdown();
}
// Main method for testing
public static void main(String[] args) {
TaskScheduler scheduler = new TaskScheduler(4);
scheduler.scheduleTask("A", () -> {
Thread.sleep(1000);
return null;
}, 2);
scheduler.scheduleTask("B", Arrays.asList("A"), () -> {
Thread.sleep(2000);
return null;
}, 1);
scheduler.scheduleTask("C", Arrays.asList("A"), () -> {
Thread.sleep(1500);
throw new RuntimeException("Task C failed!");
}, 3);
scheduler.scheduleTask("D", Arrays.asList("B", "C"), () -> {
Thread.sleep(3000);
return null;
}, 1);
scheduler.awaitCompletion();
}
}
在这个demo中,我们首先定义了一个TaskScheduler类来封装DAG任务调度的逻辑。这个类有一个ExecutorService对象,用于执行异步任务,以及一个Map对象,用于存储任务ID和对应的CompletableFuture对象。
接下来,我们定义了scheduleTask方法来添加新的任务。如果任务没有依赖关系,则我们只需要创建一个CompletableFuture对象并执行它。如果任务有依赖关系,则我们需要等待依赖的任务完成,然后再执行当前的任务。
在添加任务时,我们使用了handleAsync方法来处理任务执行过程中的异常。如果任务执行失败并且还有重试次数,则我们重新安排任务的执行。如果任务执行失败并且没有重试次数了,则我们发送错误报告。
最后,我们定义了一个main方法,来测试我们的任务调度代码。在这个例子中,我们定义了4个任务,以及它们之间的依赖关系。任务A没有依赖关系,任务B和任务C都依赖于任务A,而任务D又依赖于任务B和任务C。我们的任务调度代码会自动处理任务的依赖关系,并在任务执行失败时采取相应措施。
这个demo只是一个简单的例子,但它可以为你提供参考,帮助你更好地理解Java的CompletableFuture API,并且实现一个复杂任务依赖关系的DAG任务调度。
标签:Task,tasks,task,任务,DAG,CompletableFuture,任务调度,轻量,public From: https://www.cnblogs.com/Roni-i/p/17202280.html