5万字长文
趁还没火赶紧收藏,比知识星球的项目质量高还不付费,你去哪儿找?
基于spring环境开发,拒绝CRUD,让你体验后端的美
我们为什么需要任务定时调度系统?
从后端角度讲,任务定时调度系统广泛应用于数据备份、日志清理、系统维护等场景,能够显著提高系统的自动化程度和运行效率
后端场景
1. 数据备份与恢复
1.1 场景描述
- 数据备份:定期备份数据库、文件系统等关键数据,防止数据丢失。
- 数据恢复:在数据丢失或损坏时,能够快速恢复数据。
1.2 任务定时调度系统应用
- 任务定义:定义备份任务,包括备份时间、备份路径、备份策略等。
- 调度策略:使用Cron表达式或固定间隔,定期执行备份任务。
- 执行器:执行备份命令,如
mysqldump
、rsync
等。 - 监控与日志:记录备份任务的执行情况,监控备份成功率。
2. 日志清理与归档
2.1 场景描述
- 日志清理:定期清理过期的日志文件,释放存储空间。
- 日志归档:将日志文件归档到长期存储系统,便于后续分析。
2.2 任务定时调度系统应用
- 任务定义:定义日志清理和归档任务,包括清理时间、归档路径、保留策略等。
- 调度策略:使用Cron表达式或固定间隔,定期执行清理和归档任务。
- 执行器:执行清理和归档命令,如
rm
、mv
等。 - 监控与日志:记录清理和归档任务的执行情况,监控日志文件的存储使用情况。
3. 系统维护与更新
3.1 场景描述
- 系统更新:定期更新系统组件或软件包,确保系统安全性和稳定性。
- 资源监控:定期监控系统资源使用情况,如CPU、内存、磁盘等。
3.2 任务定时调度系统应用
- 任务定义:定义系统更新和资源监控任务,包括更新时间、更新内容、监控指标等。
- 调度策略:使用Cron表达式或固定间隔,定期执行系统更新和资源监控任务。
- 执行器:执行系统更新和资源监控命令,如
apt-get update
、top
等。 - 监控与日志:记录系统更新和资源监控任务的执行情况,监控系统资源的使用情况。
4. 批处理任务
4.1 场景描述
- 批处理任务:在特定时间点或时间间隔内执行批处理任务,如数据清洗、数据转换、数据导入导出等。
4.2 任务定时调度系统应用
- 任务定义:定义批处理任务,包括任务时间、任务类型、任务参数等。
- 调度策略:使用Cron表达式或固定间隔,定期执行批处理任务。
- 执行器:执行批处理命令,如调用数据处理API、执行Shell脚本等。
- 监控与日志:记录批处理任务的执行情况,监控任务的执行效率和成功率。
5. 自动化测试
5.1 场景描述
- 自动化测试:定期执行自动化测试任务,确保系统的功能和性能符合预期。
5.2 任务定时调度系统应用
- 任务定义:定义自动化测试任务,包括测试时间、测试类型、测试用例等。
- 调度策略:使用Cron表达式或固定间隔,定期执行自动化测试任务。
- 执行器:执行测试命令,如调用测试框架(如JUnit、Selenium)执行测试用例。
- 监控与日志:记录自动化测试任务的执行情况,监控测试结果和覆盖率。
在互联网业务中,任务定时调度系统同样扮演着至关重要的角色,它确保了各种自动化流程按时、按需执行,从而维持业务的正常运转和优化用户体验。
互联网业务
1. 业务自动化
-
数据同步与备份: 定时任务可以用于跨数据库或跨系统的数据同步,确保数据的一致性,以及进行定期的数据备份,防止数据丢失。
-
缓存更新: 对于一些不经常变动但访问频繁的数据,可以设置定时任务定期更新缓存,提高系统响应速度。
-
报表生成: 每天、每周或每月生成业务报表,用于分析和决策支持。
2. 用户体验优化
-
定时推送: 如新闻应用的每日新闻摘要推送,或电商平台的促销活动提醒。
-
会员服务: 自动续费、会员等级调整等都可以通过定时任务实现。
-
内容更新: 如社交媒体内容的定时发布,保持平台内容的活跃度。
3. 系统维护与优化
-
日志清理: 定期清理旧的日志文件,释放存储空间。
-
性能监控: 定期运行性能检测任务,提前发现并解决潜在问题。
-
索引重建: 在搜索引擎服务中,定时重建索引以提升搜索效率。
4. 业务流程
-
订单处理: 如订单自动取消、自动确认收货等。
-
支付结算: 定时对账、结算给商家或用户的账户。
-
库存管理: 定时检查库存,触发自动补货或预警。
5. 营销与运营
-
活动管理: 自动开启或结束促销活动、限时抢购等。
-
用户行为分析: 定期分析用户行为数据,为个性化推荐或营销策略提供数据支持。
6. 安全与合规
-
安全扫描: 定期进行系统的安全检查,防范潜在的安全威胁。
-
合规检查: 确保业务操作符合法律法规,如数据隐私保护的定期审计。
7. 用户行为分析
7.1 场景描述
- 用户行为分析:定期分析用户行为数据,生成用户画像、推荐模型等。
7.2 任务定时调度系统应用
- 任务定义:定义用户行为分析任务,包括分析时间、分析指标、输出结果等。
- 调度策略:使用Cron表达式或固定间隔,定期执行用户行为分析任务。
- 执行器:执行分析命令,如调用数据分析工具(如Spark、Hadoop)进行数据处理。
- 监控与日志:记录用户行为分析任务的执行情况,监控分析结果的准确性和时效性。
为什么要变成分布式的?
问就是高可用,可扩展,高容错。
前言
从后端的角度来看,定时任务是指在预定的时间点或时间间隔内自动执行的特定任务。这些任务通常由服务器或分布式系统中的某个组件触发,并由后端服务负责执行。以下是从后端角度详细介绍定时任务的各个方面:
1. 定时任务的定义与管理
1.1 任务定义
- 任务名称:任务的唯一标识符,用于区分不同的任务。
- 任务描述:任务的简要描述,帮助理解任务的目的和功能。
- 执行时间:任务的执行时间,可以是具体的日期时间、Cron表达式或固定的时间间隔。
- 执行命令:任务执行的具体命令或脚本,可以是Shell命令、Python脚本、Java程序等。
- 参数:任务执行时所需的参数,可以是命令行参数、环境变量等。
- 重试策略:任务执行失败时的重试策略,包括最大重试次数和重试间隔。
- 超时时间:任务执行的最大允许时间,超过该时间任务将被终止。
1.2 任务管理
- 任务注册:通过API或UI界面将任务注册到系统中。
- 任务修改:允许用户修改已注册任务的定义。
- 任务删除:允许用户删除不再需要的任务。
- 任务查询:允许用户查询任务的状态、执行历史等信息。
2. 调度器
2.1 调度策略
- Cron表达式:支持标准的Cron表达式,用于定义任务的执行时间。
- 固定间隔:支持以固定时间间隔执行任务。
- 依赖任务:任务可以依赖其他任务的完成情况,只有当依赖任务成功执行后,当前任务才会被调度。
2.2 调度算法
- 优先级调度:根据任务的优先级进行调度。
- 负载均衡:根据执行器的负载情况,将任务分配到负载较低的执行器上。
2.3 调度记录
- 调度时间:记录任务的调度时间。
- 调度节点:记录任务被调度的节点。
- 调度状态:记录任务的调度状态,如已调度、未调度、调度失败等。
3. 执行器
3.1 任务执行
- 任务获取:执行器从消息队列或调度器获取待执行的任务。
- 任务执行:执行器执行任务的命令或脚本。
- 执行结果:执行结果(成功或失败)通过消息队列或直接返回给调度器。
3.2 容错机制
- 任务重试:如果任务执行失败,根据重试策略进行重试。
- 任务超时:如果任务执行时间超过预设的超时时间,执行器会终止任务并标记为失败。
4. 存储层
4.1 数据存储
- 任务定义存储:存储任务的定义信息。
- 调度记录存储:记录任务的调度历史。
- 执行结果存储:存储任务的执行结果,包括成功、失败、重试次数等信息。
4.2 存储选择
- 关系型数据库:如MySQL、PostgreSQL,适合存储结构化数据。
- NoSQL数据库:如MongoDB、Redis,适合存储非结构化或半结构化数据。
5. 消息队列
5.1 消息队列选择
- Kafka:适合高吞吐量的场景,支持分布式消息传递。
- RabbitMQ:适合需要可靠消息传递的场景。
- Redis Pub/Sub:适合轻量级、低延迟的场景。
5.2 消息传递
- 任务调度消息:调度器将任务调度信息发送到消息队列。
- 任务执行消息:执行器从消息队列中获取任务并执行。
- 执行结果消息:执行结果(成功或失败)通过消息队列返回给调度器。
6. 监控与日志
6.1 监控
- 系统状态监控:监控调度器、执行器、存储层等组件的状态。
- 任务执行监控:监控任务的执行情况,包括执行时间、执行结果等。
6.2 日志
- 任务执行日志:记录任务的执行过程,包括输入参数、执行命令、输出结果等。
- 系统日志:记录系统的运行状态、错误信息等。
7. 安全性
7.1 身份验证
- API身份验证:对任务定义与管理模块的API进行身份验证,防止未授权访问。
- 用户认证:对用户进行身份认证,确保只有授权用户才能操作任务。
7.2 数据加密
- 数据传输加密:在数据传输过程中使用SSL/TLS加密,确保数据的安全性。
- 数据存储加密:对存储在数据库中的敏感数据进行加密。
7.3 访问控制
- 权限管理:对不同用户或角色设置不同的访问权限,确保任务的安全管理。
具体设计与实现
设计一个分布式任务定时调度系统涉及多个层面的考虑,包括任务调度、任务分发、任务执行、状态监控、高可用性、容错机制和扩展性。一个健壮的分布式定时调度系统必须确保在多个节点环境中任务能够按时、高效且可靠地执行。
核心设计原则
- 分布式调度:系统需要支持在多个节点上分布式运行,并保证任务不会因为节点故障而丢失或重复执行。
- 任务的高可用性:调度系统要能够处理节点故障,确保任务按时执行,并提供故障恢复能力。
- 任务的负载均衡:需要将任务均衡分配给不同的节点,避免某些节点过载或空闲。
- 可扩展性:系统应能够方便地横向扩展,以应对任务和节点的增长。
- 可监控和可追踪性:系统应该对任务的调度、执行、结果、失败等状态提供全面的监控与日志。
系统的核心组件
-
任务定义与调度配置
- 任务类型:定时任务、周期任务、触发任务等。任务可以以Cron表达式等格式定义调度时间。
- 任务元数据存储:任务的元数据(任务ID、时间配置、任务状态等)可以存储在数据库中,如MySQL、PostgreSQL,或者使用分布式数据库如Etcd、ZooKeeper、Redis。
-
调度器(Scheduler)
-
时间驱动的调度器:通过解析任务的时间表达式(如Cron)确定任务的执行时间。
-
分布式锁:由于调度器可能有多个实例同时运行,使用分布式锁(如基于Redis、ZooKeeper)保证同一任务不会被多个调度器重复调度。最常见的分布式锁实现包括:
- Redis分布式锁:使用Redis的
SETNX
命令确保同一时刻只有一个调度器可以调度某个任务。 - ZooKeeper/Etcd:通过分布式一致性来实现锁。
- Redis分布式锁:使用Redis的
-
-
任务分配与负载均衡
-
任务分发器(Dispatcher) :调度器在调度任务时,需要将任务分发到执行节点。可以基于轮询、随机分配、或任务权重等策略分配任务,常见的负载均衡策略包括:
- 哈希取模:通过对任务ID哈希后取模来分配任务。
- Consistent Hashing:一致性哈希可以保证任务分配的稳定性,当节点变动时,最少数量的任务会受到影响。
- 基于节点负载的分配:调度系统监控节点的负载情况,将任务优先分配到负载较低的节点上。
-
-
任务执行器(Executor)
- 任务执行:执行器从任务分发器处接收任务后,按照设定的时间进行执行。执行器需要具备一定的容错机制,确保任务不会因为瞬时的失败而永久失败。支持任务的重试机制。
- 任务状态同步:任务执行过程中,执行器会向调度器反馈任务的执行状态(例如执行中、执行成功、执行失败),确保调度器可以实时掌握任务状态。
- 任务日志记录:执行过程中,生成日志并记录执行过程中的异常情况。
-
任务状态管理与恢复机制
- 任务状态:任务从创建、调度、分发、执行中、执行成功到执行失败,需要有完整的状态流转。任务状态可存储在数据库中,调度器或执行器通过查询数据库的任务状态来判断任务的当前进展。
- 任务恢复机制:当某个节点宕机时,系统应自动将任务重新分配给其他节点执行(利用分布式锁来确保同一任务不会被多个节点同时执行)。
-
监控与告警
-
监控:需要对整个调度系统进行全面的监控,监控内容包括:
- 任务的执行时间、执行次数、成功/失败次数等。
- 每个节点的负载情况、任务执行时长等。
- 任务的状态流转(从调度到执行的各个步骤)。
-
告警:任务执行失败、超时、节点故障时触发告警,并支持多种告警方式(如邮件、短信、钉钉,企业微信,飞书等)。
-
-
高可用与扩展性设计
- 调度器的高可用:可以采用主从架构,主调度器负责任务调度,当主调度器发生故障时,自动选举从调度器接管工作(如使用ZooKeeper实现主从选举)。
- 任务的容错性:任务执行失败后可以重试,重试次数和重试间隔可配置。某些任务执行时间可能较长,任务系统需要支持长时间运行任务。
- 水平扩展:通过增加调度器或执行器节点来横向扩展系统。调度器和执行器之间应该是松耦合的,增加节点时不需要修改任务系统的整体架构。
- 数据库的扩展:由于任务元数据和状态信息需要频繁读写,可能需要考虑数据库的扩展性,如读写分离或使用分布式数据库(如Cassandra、HBase)存储任务信息。
系统架构图示(简化版)
+--------------------+
| API Gateway | <--- 用户通过API创建、管理任务
+--------------------+
|
+--------------------+
| Task Scheduler | <--- 任务调度器,负责任务的时间触发、分发
+--------------------+
|
v
+--------------------+ +--------------------+
| Task Dispatcher | ---> | Distributed Lock | <--- 保证任务调度的唯一性(Redis/ZooKeeper)
+--------------------+ +--------------------+
|
v
+---------------------+ +---------------------+ +---------------------+
| Task Executor Node | | Task Executor Node | | Task Executor Node | <--- 执行器,执行任务
+---------------------+ +---------------------+ +---------------------+
|
v
+--------------------+
| Status Tracker | <--- 监控任务执行状态,记录任务日志
+--------------------+
|
v
+--------------------+
| Monitoring/Alert | <--- 监控系统,任务失败或异常时告警
+--------------------+
设计细节与技术选型
- 分布式锁:选择 Redis 或 ZooKeeper 实现分布式锁,防止任务重复调度和分发。
- 消息队列:在任务调度器和任务执行器之间可以使用消息队列(如 Kafka、RabbitMQ、ActiveMQ)作为任务的分发通道,保证任务在不同节点之间的可靠分发。
- 任务持久化:任务元数据和状态信息可以使用关系型数据库(如 MySQL)或者 NoSQL 数据库(如 MongoDB)进行存储,具体选型依据系统需求和扩展性考虑。
- 任务负载均衡:可以使用一致性哈希或基于权重的负载均衡策略,或者借助已有的负载均衡工具(如 NGINX、Kubernetes 自带的调度器)。
具体实现方案和技术细节
1. 任务存储设计
-- 任务定义表
CREATE TABLE task_definition (
task_id BIGINT PRIMARY KEY,
task_name VARCHAR(100),
task_desc VARCHAR(500),
cron_expression VARCHAR(100),
task_handler VARCHAR(200), -- 任务处理类
task_param TEXT, -- 任务参数(JSON格式)
retry_times INT, -- 重试次数
retry_interval INT, -- 重试间隔(秒)
status TINYINT, -- 任务状态(0:停用 1:启用)
create_time DATETIME,
update_time DATETIME
);
-- 任务执行记录表
CREATE TABLE task_execution (
execution_id BIGINT PRIMARY KEY,
task_id BIGINT,
executor_ip VARCHAR(50), -- 执行器IP
start_time DATETIME, -- 开始时间
end_time DATETIME, -- 结束时间
status TINYINT, -- 执行状态(0:执行中 1:成功 2:失败)
error_msg TEXT, -- 错误信息
create_time DATETIME
);
2. 调度器实现
@Component
public class TaskScheduler {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private TaskExecutorRegistry executorRegistry;
// 使用Redis分布式锁实现任务调度
public void scheduleTask(TaskDefinition task) {
String lockKey = "task_lock:" + task.getTaskId();
try {
// 获取分布式锁
boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", 30, TimeUnit.SECONDS);
if (locked) {
// 计算下次执行时间
Date nextFireTime = CronUtils.getNextFireTime(task.getCronExpression());
if (shouldExecuteNow(nextFireTime)) {
// 选择执行器节点
String executorNode = selectExecutor();
// 发送任务到执行器
dispatchTask(task, executorNode);
}
}
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
}
// 选择执行器节点(一致性哈希实现)
private String selectExecutor() {
List<String> nodes = executorRegistry.getAvailableNodes();
ConsistentHash<String> hash = new ConsistentHash<>(nodes);
return hash.getNode(UUID.randomUUID().toString());
}
}
3. 任务执行器实现
@Component
public class TaskExecutor {
@Autowired
private TaskExecutionRepository executionRepository;
@Autowired
private ApplicationContext applicationContext;
public void executeTask(TaskDefinition task) {
TaskExecution execution = new TaskExecution();
execution.setTaskId(task.getTaskId());
execution.setExecutorIp(IpUtils.getLocalIp());
execution.setStartTime(new Date());
try {
// 获取任务处理器
TaskHandler handler = (TaskHandler) applicationContext
.getBean(task.getTaskHandler());
// 执行任务
handler.execute(task.getTaskParam());
// 更新执行状态为成功
execution.setStatus(1);
execution.setEndTime(new Date());
} catch (Exception e) {
// 任务执行失败,进行重试
if (shouldRetry(task)) {
retryTask(task);
}
// 更新执行状态为失败
execution.setStatus(2);
execution.setErrorMsg(e.getMessage());
execution.setEndTime(new Date());
}
// 保存执行记录
executionRepository.save(execution);
}
// 任务重试逻辑
private void retryTask(TaskDefinition task) {
// 使用延迟队列实现重试
DelayQueue<RetryTask> delayQueue = new DelayQueue<>();
delayQueue.offer(new RetryTask(task,
task.getRetryInterval(), TimeUnit.SECONDS));
}
}
4. 监控告警实现
@Component
public class TaskMonitor {
@Autowired
private AlertService alertService;
// 监控任务执行时长
public void monitorExecutionDuration(TaskExecution execution) {
long duration = execution.getEndTime().getTime() -
execution.getStartTime().getTime();
// 如果执行时间超过阈值,发送告警
if (duration > getThreshold(execution.getTaskId())) {
AlertMessage message = new AlertMessage();
message.setType(AlertType.TASK_TIMEOUT);
message.setContent("Task execution timeout: " + execution.getTaskId());
alertService.sendAlert(message);
}
}
// 监控任务失败
@EventListener(TaskFailedEvent.class)
public void onTaskFailed(TaskFailedEvent event) {
AlertMessage message = new AlertMessage();
message.setType(AlertType.TASK_FAILED);
message.setContent("Task failed: " + event.getTaskId() +
", error: " + event.getErrorMessage());
alertService.sendAlert(message);
}
}
5. 高可用实现
@Component
public class SchedulerHA {
@Autowired
private CuratorFramework zkClient;
private static final String MASTER_PATH = "/scheduler/master";
// 使用ZooKeeper实现主从选举
public void electMaster() {
try {
// 创建临时节点
zkClient.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(MASTER_PATH, IpUtils.getLocalIp().getBytes());
// 成为主节点,开始调度任务
startScheduling();
} catch (Exception e) {
// 创建节点失败,说明已经有主节点
// 注册监听器,等待成为主节点
registerMasterListener();
}
}
// 监听主节点变化
private void registerMasterListener() {
PathChildrenCache cache = new PathChildrenCache(
zkClient, MASTER_PATH, true);
cache.getListenable().addListener((client, event) -> {
if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
// 主节点下线,重新选举
electMaster();
}
});
}
}
6. 任务分发实现
@Component
public class TaskDispatcher {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// Kafka消息发送回调处理
private class TaskDispatchCallback
implements ListenableFutureCallback<SendResult<String, String>> {
@Override
public void onSuccess(SendResult<String, String> result) {
// 任务发送成功,记录日志
log.info("Task dispatched successfully, offset: {}",
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
// 任务发送失败,进行重试
log.error("Failed to dispatch task", ex);
retryDispatch();
}
}
// 任务分发重试机制
private void retryDispatch(TaskDefinition task, String executorNode) {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(3); // 最多重试3次
retryTemplate.setRetryPolicy(retryPolicy);
// 指数退避策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000L);
backOffPolicy.setMultiplier(2.0);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.execute(context -> {
dispatchTask(task, executorNode);
return null;
});
}
}
7. 执行器注册与发现
@Component
public class ExecutorRegistry {
@Autowired
private CuratorFramework zkClient;
private static final String EXECUTOR_PATH = "/task/executors";
// 执行器节点注册
public void registerExecutor() {
String path = EXECUTOR_PATH + "/" + IpUtils.getLocalIp();
try {
// 创建临时节点
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "1".getBytes());
// 定期更新节点状态(心跳)
startHeartbeat(path);
} catch (Exception e) {
log.error("Failed to register executor", e);
}
}
// 获取可用执行器列表
public List<String> getAvailableExecutors() {
try {
List<String> children = zkClient.getChildren()
.forPath(EXECUTOR_PATH);
return children.stream()
.map(child -> new String(zkClient.getData()
.forPath(EXECUTOR_PATH + "/" + child)))
.collect(Collectors.toList());
} catch (Exception e) {
log.error("Failed to get available executors", e);
return Collections.emptyList();
}
}
// 心跳更新
private void startHeartbeat(String path) {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(() -> {
try {
// 更新节点数据
zkClient.setData()
.forPath(path, String.valueOf(System.currentTimeMillis())
.getBytes());
} catch (Exception e) {
log.error("Failed to update heartbeat", e);
}
}, 0, 30, TimeUnit.SECONDS);
}
}
8. 任务执行状态追踪
@Component
public class TaskTracker {
@Autowired
private TaskExecutionRepository executionRepository;
@Autowired
private TaskMonitor taskMonitor;
// 任务状态更新
public void updateTaskStatus(TaskExecution execution) {
// 保存执行记录
executionRepository.save(execution);
// 发送任务状态变更事件
ApplicationEventPublisher.publishEvent(
new TaskStatusChangeEvent(execution));
// 监控任务执行情况
taskMonitor.monitorExecution(execution);
}
// 任务执行超时检测
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkTimeout() {
List<TaskExecution> runningTasks =
executionRepository.findByStatus(TaskStatus.RUNNING);
for (TaskExecution task : runningTasks) {
if (isTimeout(task)) {
// 标记任务超时
task.setStatus(TaskStatus.TIMEOUT);
updateTaskStatus(task);
// 终止任务执行
terminateTask(task);
}
}
}
// 任务执行历史查询
public Page<TaskExecution> queryTaskHistory(
Long taskId, Date startTime, Date endTime, Pageable pageable) {
return executionRepository.findByTaskIdAndTimeRange(
taskId, startTime, endTime, pageable);
}
}
9. 任务执行统计
@Component
public class TaskStatistics {
@Autowired
private RedisTemplate redisTemplate;
// 统计任务执行次数
public void incrementExecutionCount(Long taskId) {
String key = "task:stats:exec_count:" + taskId;
redisTemplate.opsForValue().increment(key);
}
// 统计任务执行时长
public void recordExecutionDuration(Long taskId, long duration) {
String key = "task:stats:duration:" + taskId;
redisTemplate.opsForZSet().add(key,
String.valueOf(System.currentTimeMillis()), duration);
}
// 获取任务统计信息
public TaskStats getTaskStats(Long taskId) {
TaskStats stats = new TaskStats();
// 获取执行次数
String countKey = "task:stats:exec_count:" + taskId;
stats.setExecutionCount(
redisTemplate.opsForValue().get(countKey));
// 获取平均执行时长
String durationKey = "task:stats:duration:" + taskId;
Set<ZSetOperations.TypedTuple<String>> durations =
redisTemplate.opsForZSet().rangeWithScores(
durationKey, 0, -1);
stats.setAvgDuration(calculateAvgDuration(durations));
return stats;
}
}
10. 配置管理接口
@RestController
@RequestMapping("/api/task")
public class TaskController {
@Autowired
private TaskService taskService;
// 创建任务
@PostMapping
public ResponseEntity<TaskDefinition> createTask(
@RequestBody TaskDefinition task) {
return ResponseEntity.ok(taskService.createTask(task));
}
// 修改任务
@PutMapping("/{taskId}")
public ResponseEntity<TaskDefinition> updateTask(
@PathVariable Long taskId,
@RequestBody TaskDefinition task) {
task.setTaskId(taskId);
return ResponseEntity.ok(taskService.updateTask(task));
}
// 删除任务
@DeleteMapping("/{taskId}")
public ResponseEntity<Void> deleteTask(@PathVariable Long taskId) {
taskService.deleteTask(taskId);
return ResponseEntity.ok().build();
}
@PutMapping("/{taskId}/pause")
public ResponseEntity<Void> pauseTask(@PathVariable Long taskId) {
taskService.pauseTask(taskId);
return ResponseEntity.ok().build();
}
// 恢复任务
@PutMapping("/{taskId}/resume")
public ResponseEntity<Void> resumeTask(@PathVariable Long taskId) {
taskService.resumeTask(taskId);
return ResponseEntity.ok().build();
}
// 立即执行任务
@PostMapping("/{taskId}/execute")
public ResponseEntity<Void> executeNow(@PathVariable Long taskId) {
taskService.executeTaskNow(taskId);
return ResponseEntity.ok().build();
}
// 查询任务执行历史
@GetMapping("/{taskId}/history")
public ResponseEntity<Page<TaskExecution>> queryHistory(
@PathVariable Long taskId,
@RequestParam(required = false) Date startTime,
@RequestParam(required = false) Date endTime,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "10") int size) {
Pageable pageable = PageRequest.of(page, size);
return ResponseEntity.ok(
taskService.queryTaskHistory(taskId, startTime, endTime, pageable));
}
// 获取任务统计信息
@GetMapping("/{taskId}/stats")
public ResponseEntity<TaskStats> getTaskStats(@PathVariable Long taskId) {
return ResponseEntity.ok(taskService.getTaskStats(taskId));
}
}
11. 任务执行器实现(补充重试和超时控制)
@Component
public class TaskExecutorEnhanced {
@Autowired
private TaskExecutionRepository executionRepository;
// 带超时控制的任务执行
public void executeWithTimeout(TaskDefinition task) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
executeTask(task);
});
try {
// 设置任务超时时间
future.get(task.getTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
// 任务超时处理
handleTimeout(task);
} catch (Exception e) {
// 任务执行异常处理
handleExecutionError(task, e);
}
}
// 任务超时处理
private void handleTimeout(TaskDefinition task) {
TaskExecution execution = new TaskExecution();
execution.setTaskId(task.getTaskId());
execution.setStatus(TaskStatus.TIMEOUT);
execution.setErrorMsg("Task execution timeout");
executionRepository.save(execution);
// 发送任务超时事件
ApplicationEventPublisher.publishEvent(
new TaskTimeoutEvent(task.getTaskId()));
}
// 任务重试处理
private void handleRetry(TaskDefinition task, Exception e) {
int retryCount = getRetryCount(task.getTaskId());
if (retryCount < task.getMaxRetries()) {
// 计算重试延迟时间(指数退避)
long delay = calculateRetryDelay(retryCount);
// 将任务加入重试队列
RetryQueue.addRetryTask(task, delay);
// 更新重试次数
updateRetryCount(task.getTaskId(), retryCount + 1);
} else {
// 达到最大重试次数,标记任务最终失败
markTaskAsFailed(task, e);
}
}
}
12. 分布式锁增强实现
@Component
public class DistributedLock {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private CuratorFramework zkClient;
// 多级分布式锁(Redis + ZooKeeper)
public boolean acquireLock(String lockKey, long timeout) {
// 首先尝试获取Redis锁
boolean redisLocked = acquireRedisLock(lockKey, timeout);
if (!redisLocked) {
return false;
}
try {
// 再尝试获取ZooKeeper锁
return acquireZkLock(lockKey, timeout);
} catch (Exception e) {
// 如果获取ZK锁失败,释放Redis锁
releaseRedisLock(lockKey);
return false;
}
}
// Redis分布式锁实现
private boolean acquireRedisLock(String lockKey, long timeout) {
String value = UUID.randomUUID().toString();
return redisTemplate.opsForValue()
.setIfAbsent(lockKey, value, timeout, TimeUnit.MILLISECONDS);
}
// ZooKeeper分布式锁实现
private boolean acquireZkLock(String lockKey, long timeout) {
InterProcessMutex mutex = new InterProcessMutex(
zkClient, "/locks/" + lockKey);
try {
return mutex.acquire(timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
return false;
}
}
}
13. 任务优先级队列实现
@Component
public class PriorityTaskQueue {
private PriorityBlockingQueue<PriorityTask> queue;
public PriorityTaskQueue() {
this.queue = new PriorityBlockingQueue<>(
1000,
Comparator.comparingInt(PriorityTask::getPriority).reversed()
);
}
// 添加优先级任务
public void addTask(TaskDefinition task, int priority) {
PriorityTask priorityTask = new PriorityTask(task, priority);
queue.offer(priorityTask);
}
// 获取下一个要执行的任务
public TaskDefinition getNextTask() throws InterruptedException {
PriorityTask priorityTask = queue.take();
return priorityTask.getTask();
}
// 优先级任务包装类
@Data
@AllArgsConstructor
private static class PriorityTask {
private TaskDefinition task;
private int priority;
}
}
14. 任务执行性能优化
@Component
public class TaskExecutorOptimized {
// 任务执行线程池配置
private ThreadPoolExecutor executor;
@PostConstruct
public void init() {
// 核心线程数
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
// 最大线程数
int maxPoolSize = corePoolSize * 2;
// 使用有界队列
BlockingQueue<Runnable> workQueue =
new ArrayBlockingQueue<>(10000);
// 自定义拒绝策略
RejectedExecutionHandler handler =
new CallerRunsPolicy();
executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L,
TimeUnit.SECONDS,
workQueue,
new NamedThreadFactory("task")};
// 自定义线程工厂
private static class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public NamedThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix +
threadNumber.getAndIncrement());
// 设置为守护线程
t.setDaemon(true);
// 设置线程优先级
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
// 批量任务执行优化
public void executeBatchTasks(List<TaskDefinition> tasks) {
// 任务分组
Map<String, List<TaskDefinition>> taskGroups =
tasks.stream().collect(Collectors.groupingBy(
TaskDefinition::getTaskHandler));
// 并行执行不同组的任务
CompletableFuture[] futures = taskGroups.entrySet().stream()
.map(entry -> CompletableFuture.runAsync(() -> {
executeTaskGroup(entry.getKey(), entry.getValue());
}, executor))
.toArray(CompletableFuture[]::new);
// 等待所有任务组执行完成
CompletableFuture.allOf(futures).join();
}
// 任务组执行
private void executeTaskGroup(String handler, List<TaskDefinition> tasks) {
// 获取任务处理器实例
TaskHandler taskHandler = getTaskHandler(handler);
// 预热处理器
taskHandler.warmup();
// 批量执行任务
for (TaskDefinition task : tasks) {
try {
taskHandler.execute(task);
} catch (Exception e) {
handleExecutionError(task, e);
}
}
}
}
15. 任务执行日志管理
@Component
public class TaskLogManager {
@Autowired
private ElasticsearchTemplate esTemplate;
// 异步写入任务日志
@Async
public void writeTaskLog(TaskLog taskLog) {
// 构建索引名称(按日期分表)
String indexName = "task_log_" +
DateFormatUtils.format(new Date(), "yyyy_MM_dd");
// 写入ES
esTemplate.save(indexName, taskLog);
}
// 查询任务日志
public Page<TaskLog> queryTaskLogs(TaskLogQuery query) {
// 构建ES查询条件
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 添加任务ID过滤
if (query.getTaskId() != null) {
boolQuery.must(QueryBuilders.termQuery("taskId", query.getTaskId()));
}
// 添加时间范围过滤
if (query.getStartTime() != null && query.getEndTime() != null) {
boolQuery.must(QueryBuilders.rangeQuery("createTime")
.from(query.getStartTime())
.to(query.getEndTime()));
}
// 添加执行状态过滤
if (query.getStatus() != null) {
boolQuery.must(QueryBuilders.termQuery("status", query.getStatus()));
}
// 执行查询
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(boolQuery)
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(query.getPage(), query.getSize()))
.build();
return esTemplate.queryForPage(searchQuery, TaskLog.class);
}
}
16. 任务依赖管理
@Component
public class TaskDependencyManager {
@Autowired
private TaskDefinitionRepository taskRepository;
// 任务依赖图(DAG)
private DirectedAcyclicGraph<Long, DefaultEdge> dependencyGraph;
@PostConstruct
public void init() {
// 初始化依赖图
dependencyGraph = new DirectedAcyclicGraph<>(DefaultEdge.class);
// 加载所有任务依赖关系
List<TaskDependency> dependencies = taskRepository.findAllDependencies();
for (TaskDependency dependency : dependencies) {
addDependency(dependency.getTaskId(),
dependency.getDependentTaskId());
}
}
// 添加任务依赖
public void addDependency(Long taskId, Long dependentTaskId) {
// 添加顶点
dependencyGraph.addVertex(taskId);
dependencyGraph.addVertex(dependentTaskId);
try {
// 添加边
dependencyGraph.addEdge(dependentTaskId, taskId);
// 检查是否有环
CycleDetector<Long, DefaultEdge> cycleDetector =
new CycleDetector<>(dependencyGraph);
if (cycleDetector.detectCycles()) {
// 如果存在环,回滚添加的边
dependencyGraph.removeEdge(dependentTaskId, taskId);
throw new IllegalArgumentException("Cyclic dependency detected");
}
} catch (IllegalArgumentException e) {
throw new RuntimeException("Failed to add dependency", e);
}
}
// 检查任务是否可以执行
public boolean canExecute(Long taskId) {
// 获取所有依赖任务
Set<Long> dependencies = getDependencies(taskId);
// 检查所有依赖任务是否都已完成
return dependencies.stream()
.allMatch(this::isTaskCompleted);
}
// 获取任务的所有依赖
private Set<Long> getDependencies(Long taskId) {
Set<Long> dependencies = new HashSet<>();
for (DefaultEdge edge : dependencyGraph.incomingEdgesOf(taskId)) {
dependencies.add(dependencyGraph.getEdgeSource(edge));
}
return dependencies;
}
}
17. 任务熔断器实现
@Component
public class TaskCircuitBreaker {
private Map<Long, CircuitBreakerStatus> breakerStatusMap =
new ConcurrentHashMap<>();
@Value("${task.circuit-breaker.failure-threshold}")
private int failureThreshold;
@Value("${task.circuit-breaker.reset-timeout}")
private long resetTimeout;
// 检查任务是否可以执行
public boolean allowExecution(Long taskId) {
CircuitBreakerStatus status = breakerStatusMap
.computeIfAbsent(taskId, k -> new CircuitBreakerStatus());
switch (status.getState()) {
case CLOSED:
return true;
case OPEN:
// 检查是否到达重置时间
if (System.currentTimeMillis() - status.getLastFailureTime()
>= resetTimeout) {
status.setState(CircuitBreakerState.HALF_
@Data
private static class CircuitBreakerStatus {
private CircuitBreakerState state = CircuitBreakerState.CLOSED;
private int failureCount = 0;
private long lastFailureTime;
private AtomicInteger halfOpenSuccessCount = new AtomicInteger(0);
}
// 记录任务执行失败
public void recordFailure(Long taskId) {
CircuitBreakerStatus status = breakerStatusMap.get(taskId);
if (status != null) {
status.setFailureCount(status.getFailureCount() + 1);
status.setLastFailureTime(System.currentTimeMillis());
// 检查是否达到失败阈值
if (status.getFailureCount() >= failureThreshold) {
status.setState(CircuitBreakerState.OPEN);
}
}
}
// 记录任务执行成功
public void recordSuccess(Long taskId) {
CircuitBreakerStatus status = breakerStatusMap.get(taskId);
if (status != null) {
if (status.getState() == CircuitBreakerState.HALF_OPEN) {
// 半开状态下的成功计数
if (status.getHalfOpenSuccessCount().incrementAndGet() >= 3) {
// 连续成功3次后关闭熔断器
status.setState(CircuitBreakerState.CLOSED);
status.setFailureCount(0);
status.getHalfOpenSuccessCount().set(0);
}
}
}
}
}
18. 任务限流实现
@Component
public class TaskRateLimiter {
@Autowired
private RedisTemplate redisTemplate;
// 基于Redis的滑动窗口限流实现
public boolean tryAcquire(String taskHandler, int limit, int windowSeconds) {
String key = "rate_limit:" + taskHandler;
long now = System.currentTimeMillis();
try {
// 使用Redis事务保证原子性
return redisTemplate.execute(new SessionCallback<Boolean>() {
@Override
public Boolean execute(RedisOperations operations) {
operations.multi();
// 移除窗口之外的数据
operations.opsForZSet().removeRangeByScore(
key, 0, now - windowSeconds * 1000);
// 获取当前窗口的请求数
Long count = operations.opsForZSet().count(
key, now - windowSeconds * 1000, now);
if (count != null && count < limit) {
// 添加当前请求
operations.opsForZSet().add(key,
UUID.randomUUID().toString(), now);
// 设置过期时间
operations.expire(key, windowSeconds * 2, TimeUnit.SECONDS);
List<Object> results = operations.exec();
return results != null && !results.isEmpty();
} else {
operations.discard();
return false;
}
}
});
} catch (Exception e) {
log.error("Rate limit error", e);
return false;
}
}
}
19. 任务预热机制
@Component
public class TaskWarmUpManager {
private Map<String, WarmUpStatus> warmUpStatusMap =
new ConcurrentHashMap<>();
// 任务预热
public void warmUp(TaskDefinition task) {
String handler = task.getTaskHandler();
WarmUpStatus status = warmUpStatusMap.computeIfAbsent(
handler, k -> new WarmUpStatus());
// 如果已经预热,直接返回
if (status.isWarmedUp()) {
return;
}
synchronized (status) {
if (!status.isWarmedUp()) {
try {
// 执行预热逻辑
doWarmUp(task);
status.setWarmedUp(true);
status.setLastWarmUpTime(System.currentTimeMillis());
} catch (Exception e) {
log.error("Task warm up failed", e);
}
}
}
}
// 执行预热逻辑
private void doWarmUp(TaskDefinition task) {
// 1. 预加载任务所需的配置
loadTaskConfig(task);
// 2. 预热数据库连接池
warmUpDataSource();
// 3. 预热缓存
warmUpCache(task);
// 4. 预热类加载
warmUpClassLoader(task.getTaskHandler());
}
@Data
private static class WarmUpStatus {
private boolean warmedUp = false;
private long lastWarmUpTime;
}
}
20. 任务监控指标收集
@Component
public class TaskMetricsCollector {
@Autowired
private MeterRegistry meterRegistry;
// 记录任务执行时间
public void recordExecutionTime(String taskHandler, long timeMs) {
Timer timer = Timer.builder("task.execution.time")
.tag("handler", taskHandler)
.register(meterRegistry);
timer.record(timeMs, TimeUnit.MILLISECONDS);
}
// 记录任务执行结果
public void recordExecutionResult(String taskHandler, boolean success) {
Counter counter = Counter.builder("task.execution.result")
.tag("handler", taskHandler)
.tag("result", success ? "success" : "failure")
.register(meterRegistry);
counter.increment();
}
// 记录任务队列大小
public void recordQueueSize(String taskHandler, int size) {
Gauge.builder("task.queue.size", () -> size)
.tag("handler", taskHandler)
.register(meterRegistry);
}
// 记录任务执行延迟
public void recordExecutionDelay(String taskHandler, long delayMs) {
DistributionSummary summary = DistributionSummary
.builder("task.execution.delay")
.tag("handler", taskHandler)
.register(meterRegistry);
summary.record(delayMs);
}
}
21. 任务告警配置
@Component
public class TaskAlertConfig {
@Data
public static class AlertRule {
private String metric; // 监控指标
private String operator; // 操作符
private double threshold; // 阈值
private int duration; // 持续时间(秒)
private String severity; // 告警级别
private List<String> channels; // 告警渠道
}
// 加载告警规则
@PostConstruct
public void loadAlertRules() {
// 从配置中心加载告警规则
List<AlertRule> rules = loadRulesFromConfig();
// 注册告警规则
for (AlertRule rule : rules) {
registerAlertRule(rule);
}
}
// 注册告警规则
private void registerAlertRule(AlertRule rule) {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
checkAlertCondition(rule);
}
}, 0, rule.getDuration() * 1000);
}
// 检查告警条件
private void checkAlertCondition(AlertRule rule) {
double currentValue = getMetricValue(rule.getMetric());
boolean triggered = evaluateCondition(
currentValue, rule.getOperator(), rule.getThreshold());
if (triggered) {
AlertMessage message = buildAlertMessage(rule, currentValue);
alertNotifier.notify(message, rule.getChannels());
}
}
// 构建告警消息
private AlertMessage buildAlertMessage(AlertRule rule, double currentValue) {
return AlertMessage.builder()
.title("Task Alert: " + rule.getMetric())
.content(String.format("Metric %s current value: %f, threshold: %f",
rule.getMetric(), currentValue, rule.getThreshold()))
.severity(rule.getSeverity())
.timestamp(new Date())
.build();
}
}
@Component
public class AlertNotifier {
@Autowired
private EmailSender emailSender;
@Autowired
private DingTalkClient dingTalkClient;
@Autowired
private SlackClient slackClient;
// 发送告警通知
public void notify(AlertMessage message, List<String> channels) {
for (String channel : channels) {
switch (channel) {
case "email":
sendEmailAlert(message);
break;
case "dingtalk":
sendDingTalkAlert(message);
break;
case "slack":
sendSlackAlert(message);
break;
}
}
}
private void sendEmailAlert(AlertMessage message) {
EmailMessage email = EmailMessage.builder()
.subject(message.getTitle())
.content(message.getContent())
.build();
emailSender.send(email);
}
private void sendDingTalkAlert(AlertMessage message) {
DingTalkMessage dingTalk = DingTalkMessage.builder()
.title(message.getTitle())
.content(message.getContent())
.build();
dingTalkClient.send(dingTalk);
}
}
22. 任务运行时上下文
@Component
public class TaskContext {
private static final ThreadLocal<TaskContextInfo> contextHolder =
new ThreadLocal<>();
@Data
@Builder
public static class TaskContextInfo {
private Long taskId;
private String taskHandler;
private Map<String, Object> parameters;
private Date startTime;
private String executorIp;
private String traceId;
}
// 设置任务上下文
public static void setContext(TaskContextInfo context) {
contextHolder.set(context);
}
// 获取任务上下文
public static TaskContextInfo getContext() {
return contextHolder.get();
}
// 清除任务上下文
public static void clearContext() {
contextHolder.remove();
}
// 获取任务参数
public static Object getParameter(String key) {
TaskContextInfo context = getContext();
return context != null && context.getParameters() != null ?
context.getParameters().get(key) : null;
}
}
23. 任务执行器优化增强
@Component
public class EnhancedTaskExecutor {
@Autowired
private TaskContext taskContext;
@Autowired
private TaskMetricsCollector metricsCollector;
@Autowired
private TaskCircuitBreaker circuitBreaker;
@Autowired
private TaskRateLimiter rateLimiter;
// 增强的任务执行方法
public void execute(TaskDefinition task) {
String taskHandler = task.getTaskHandler();
// 1. 检查熔断器状态
if (!circuitBreaker.allowExecution(task.getTaskId())) {
throw new TaskExecutionException("Circuit breaker is open");
}
// 2. 检查限流
if (!rateLimiter.tryAcquire(taskHandler, 100, 60)) {
throw new TaskExecutionException("Rate limit exceeded");
}
// 3. 设置任务上下文
TaskContextInfo contextInfo = buildTaskContext(task);
TaskContext.setContext(contextInfo);
try {
// 4. 记录开始时间
long startTime = System.currentTimeMillis();
// 5. 执行任务
doExecute(task);
// 6. 记录执行指标
long executionTime = System.currentTimeMillis() - startTime;
metricsCollector.recordExecutionTime(taskHandler, executionTime);
metricsCollector.recordExecutionResult(taskHandler, true);
// 7. 记录成功状态
circuitBreaker.recordSuccess(task.getTaskId());
} catch (Exception e) {
// 8. 记录失败状态
circuitBreaker.recordFailure(task.getTaskId());
metricsCollector.recordExecutionResult(taskHandler, false);
throw e;
} finally {
// 9. 清理上下文
TaskContext.clearContext();
}
}
private TaskContextInfo buildTaskContext(TaskDefinition task) {
return TaskContextInfo.builder()
.taskId(task.getTaskId())
.taskHandler(task.getTaskHandler())
.parameters(task.getParameters())
.startTime(new Date())
.executorIp(IpUtils.getLocalIp())
.traceId(UUID.randomUUID().toString())
.build();
}
}
24. 任务执行状态跟踪
@Component
public class TaskStateTracker {
@Autowired
private TaskExecutionRepository executionRepository;
@Autowired
private TaskMetricsCollector metricsCollector;
// 更新任务状态
public void updateTaskState(TaskExecution execution, TaskState newState) {
// 1. 记录状态变更
execution.setState(newState);
execution.setUpdateTime(new Date());
// 2. 持久化状态
executionRepository.save(execution);
// 3. 发布状态变更事件
publishStateChangeEvent(execution);
// 4. 记录状态指标
recordStateMetrics(execution);
}
// 发布状态变更事件
private void publishStateChangeEvent(TaskExecution execution) {
TaskStateChangeEvent event = new TaskStateChangeEvent(
execution.getTaskId(),
execution.getState(),
execution.getUpdateTime()
);
ApplicationEventPublisher.publishEvent(event);
}
// 记录状态指标
private void recordStateMetrics(TaskExecution execution) {
String taskHandler = execution.getTaskHandler();
String state = execution.getState().name();
// 记录状态转换计数
metricsCollector.recordStateTransition(taskHandler, state);
// 记录状态持续时间
if (execution.getStartTime() != null) {
long duration = System.currentTimeMillis() -
execution.getStartTime().getTime();
metricsCollector.recordStateDuration(taskHandler, state, duration);
}
}
// 获取任务执行历史
public List<TaskStateHistory> getTaskStateHistory(Long taskId) {
return executionRepository.findStateHistory(taskId);
}
// 检查任务是否处于特定状态
public boolean isTaskInState(Long taskId, TaskState state) {
TaskExecution execution = executionRepository
.findLatestExecution(taskId);
return execution != null && execution.getState() == state;
}
}
25. 任务重试策略实现
@Component
public class TaskRetryStrategy {
@Data
public static class RetryPolicy {
private int maxAttempts;
private long initialInterval;
private double multiplier;
private long maxInterval;
}
// 执行重试逻辑
public void executeWithRetry(TaskDefinition task, RetryPolicy policy) {
int attempts = 0;
long interval = policy.getInitialInterval();
while (attempts < policy.getMaxAttempts()) {
try {
executeTask(task);
return; // 执行成功,直接返回
} catch (Exception e) {
attempts++;
if (attempts >= policy.getMaxAttempts()) {
throw new TaskExecutionException(
"Max retry attempts reached", e);
}
// 等待重试间隔
try {
Thread.sleep(interval);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new TaskExecutionException(
"Retry interrupted", ie);
}
// 计算下次重试间隔
interval = calculateNextInterval(interval, policy);
}
}
}
// 计算下次重试间隔
private long calculateNextInterval(long currentInterval, RetryPolicy policy) {
long nextInterval = (long) (currentInterval * policy.getMultiplier());
return Math.min(nextInterval, policy.getMaxInterval());
}
}
26. 任务优先级管理
@Component
public class TaskPriorityManager {
private final PriorityBlockingQueue<PrioritizedTask> taskQueue;
public TaskPriorityManager() {
this.taskQueue = new PriorityBlockingQueue<>(
1000,
Comparator.comparingInt(PrioritizedTask::getPriority)
.reversed()
.thenComparing(PrioritizedTask::getCreateTime)
);
}
@Data
@AllArgsConstructor
public static class PrioritizedTask {
private TaskDefinition task;
private int priority;
private Date createTime;
}
// 添加优先级任务
public void addTask(TaskDefinition task, int priority) {
PrioritizedTask prioritizedTask = new PrioritizedTask(
task, priority, new Date());
taskQueue.offer(prioritizedTask);
}
// 获取下一个要执行的任务
public TaskDefinition getNextTask() throws InterruptedException {
PrioritizedTask prioritizedTask = taskQueue.take();
return prioritizedTask.getTask();
}
// 动态调整任务优先级
public void adjustPriority(Long taskId, int newPriority) {
// 移除旧任务
List<PrioritizedTask> tasks = new ArrayList<>();
taskQueue.drainTo(tasks);
// 更新优先级并重新入队
tasks.stream()
.map(task -> {
if (task.getTask().getTaskId().equals(taskId)) {
return new PrioritizedTask(
task.getTask(), newPriority, task.getCreateTime());
}
return task;
})
.forEach(taskQueue::offer);
}
}
27. 任务资源管理
@Component
public class TaskResourceManager {
@Data
public static class ResourceQuota {
private int maxConcurrency;
private long maxMemory;
private int maxCpu;
}
private Map<String, ResourceQuota> quotaMap = new ConcurrentHashMap<>();
private Map<String, AtomicInteger> concurrencyCounter =
new ConcurrentHashMap<>();
// 检查资源是否可用
public boolean checkResourceAvailable(String taskHandler) {
ResourceQuota quota = quotaMap.get(taskHandler);
if (quota == null) {
return true; // 没有配额限制
}
// 检查并发数
AtomicInteger counter = concurrencyCounter
.computeIfAbsent(taskHandler, k -> new AtomicInteger(0));
if (counter.get() >= quota.getMaxConcurrency()) {
return false;
}
// 检查内存使用
if (Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory() > quota.getMaxMemory()) {
return false;
}
return true;
}
// 申请资源
public boolean acquireResource(String taskHandler) {
if (!checkResourceAvailable(taskHandler)) {
return false;
}
AtomicInteger counter = concurrencyCounter
.computeIfAbsent(taskHandler, k -> new AtomicInteger(0));
return counter.incrementAndGet() <=
quotaMap.get(taskHandler).getMaxConcurrency();
}
// 释放资源
public void releaseResource(String taskHandler) {
AtomicInteger counter = concurrencyCounter.get(taskHandler);
if (counter != null) {
counter.decrementAndGet();
}
}
}
28. 任务执行器健康检查
@Component
public class TaskExecutorHealthChecker {
@Autowired
private TaskExecutorRegistry executorRegistry;
@Scheduled(fixedRate = 30000) // 每30秒检查一次
public void checkExecutorHealth() {
List<String> executors = executorRegistry.getRegisteredExecutors();
for (String executor : executors) {
try {
// 发送心跳请求
boolean healthy = checkExecutorHealth(executor);
if (!healthy) {
// 处理不健康的执行器
handleUnhealthyExecutor(executor);
}
} catch (Exception e) {
log.error("Health check failed for executor: " + executor, e);
}
}
}
private boolean checkExecutorHealth(String executor) {
try {
// 2. 检查系统资源
boolean resourceOk = checkSystemResources(executor);
// 3. 检查网络连接
boolean networkOk = checkNetworkConnectivity(executor);
return heartbeatOk && resourceOk && networkOk;
} catch (Exception e) {
log.error("Health check failed", e);
return false;
}
}
private void handleUnhealthyExecutor(String executor) {
// 1. 标记执行器为不健康状态
executorRegistry.markUnhealthy(executor);
// 2. 重新分配该执行器上的任务
redistributeTasks(executor);
// 3. 发送告警通知
sendHealthAlert(executor);
}
private void redistributeTasks(String executor) {
List<TaskExecution> runningTasks =
executionRepository.findRunningTasksByExecutor(executor);
for (TaskExecution task : runningTasks) {
// 将任务重新分配给其他健康的执行器
String newExecutor = executorRegistry.selectHealthyExecutor();
if (newExecutor != null) {
task.setExecutorIp(newExecutor);
task.setState(TaskState.PENDING);
executionRepository.save(task);
}
}
}
}
29. 任务执行数据采集
@Component
public class TaskExecutionDataCollector {
@Autowired
private ElasticsearchTemplate esTemplate;
@Data
@Document(indexName = "task_execution_metrics")
public static class TaskExecutionMetrics {
private String taskId;
private String executorIp;
private Date timestamp;
private long executionTime;
private long cpuUsage;
private long memoryUsage;
private int threadCount;
private Map<String, Object> customMetrics;
}
// 收集任务执行指标
public void collectMetrics(TaskExecution execution) {
TaskExecutionMetrics metrics = new TaskExecutionMetrics();
metrics.setTaskId(execution.getTaskId().toString());
metrics.setExecutorIp(execution.getExecutorIp());
metrics.setTimestamp(new Date());
// 收集系统指标
OperatingSystemMXBean osMxBean =
ManagementFactory.getOperatingSystemMXBean();
ThreadMXBean threadMxBean =
ManagementFactory.getThreadMXBean();
metrics.setCpuUsage(osMxBean.getSystemLoadAverage());
metrics.setMemoryUsage(Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory());
metrics.setThreadCount(threadMxBean.getThreadCount());
// 保存指标数据
esTemplate.save(metrics);
}
// 查询任务执行指标
public List<TaskExecutionMetrics> queryMetrics(
String taskId, Date startTime, Date endTime) {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("taskId", taskId))
.must(QueryBuilders.rangeQuery("timestamp")
.from(startTime)
.to(endTime));
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(queryBuilder)
.withSort(SortBuilders.fieldSort("timestamp").order(SortOrder.DESC))
.build();
return esTemplate.queryForList(searchQuery, TaskExecutionMetrics.class);
}
}
30. 任务执行性能分析
@Component
public class TaskPerformanceAnalyzer {
@Autowired
private TaskExecutionDataCollector dataCollector;
@Data
public static class PerformanceReport {
private double avgExecutionTime;
private double p95ExecutionTime;
private double p99ExecutionTime;
private double avgCpuUsage;
private double avgMemoryUsage;
private List<PerformanceAnomaly> anomalies;
}
// 生成任务性能报告
public PerformanceReport analyzePerformance(
String taskId, Date startTime, Date endTime) {
// 获取执行指标数据
List<TaskExecutionMetrics> metrics =
dataCollector.queryMetrics(taskId, startTime, endTime);
PerformanceReport report = new PerformanceReport();
// 计算执行时间统计
List<Long> executionTimes = metrics.stream()
.map(TaskExecutionMetrics::getExecutionTime)
.collect(Collectors.toList());
report.setAvgExecutionTime(calculateAverage(executionTimes));
report.setP95ExecutionTime(calculatePercentile(executionTimes, 95));
report.setP99ExecutionTime(calculatePercentile(executionTimes, 99));
// 计算资源使用统计
report.setAvgCpuUsage(metrics.stream()
.mapToDouble(TaskExecutionMetrics::getCpuUsage)
.average()
.orElse(0.0));
report.setAvgMemoryUsage(metrics.stream()
.mapToDouble(TaskExecutionMetrics::getMemoryUsage)
.average()
.orElse(0.0));
// 检测性能异常
report.setAnomalies(detectAnomalies(metrics));
return report;
}
// 检测性能异常
private List<PerformanceAnomaly> detectAnomalies(
List<TaskExecutionMetrics> metrics) {
List<PerformanceAnomaly> anomalies = new ArrayList<>();
// 使用Z-score方法检测异常值
double avgExecTime = metrics.stream()
.mapToLong(TaskExecutionMetrics::getExecutionTime)
.average()
.orElse(0.0);
double stdDev = calculateStandardDeviation(metrics, avgExecTime);
for (TaskExecutionMetrics metric : metrics) {
double zScore = (metric.getExecutionTime() - avgExecTime) / stdDev;
if (Math.abs(zScore) > 3.0) { // 超过3个标准差认为是异常
anomalies.add(new PerformanceAnomaly(
metric.getTimestamp(),
"Execution time anomaly",
metric.getExecutionTime(),
zScore
));
}
}
return anomalies;
}
// 计算标准差
private double calculateStandardDeviation(
List<TaskExecutionMetrics> metrics, double mean) {
return Math.sqrt(metrics.stream()
.mapToDouble(m -> Math.pow(m.getExecutionTime() - mean, 2))
.average()
.orElse(0.0));
}
}
31. 任务执行器自动扩缩容
@Component
public class TaskExecutorScaler {
@Autowired
private TaskExecutorRegistry executorRegistry;
@Autowired
private TaskMetricsCollector metricsCollector;
@Value("${task.executor.min-instances}")
private int minInstances;
@Value("${task.executor.max-instances}")
private int maxInstances;
@Value("${task.executor.cpu-threshold}")
private double cpuThreshold;
@Value("${task.executor.scale-up-cooldown}")
private long scaleUpCooldown;
private Date lastScaleTime;
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkScaling() {
// 获取当前执行器负载情况
Map<String, ExecutorMetrics> metricsMap =
collectExecutorMetrics();
// 计算平均负载
double avgLoad = calculateAverageLoad(metricsMap);
if (shouldScaleUp(avgLoad)) {
scaleUp();
} else if (shouldScaleDown(avgLoad)) {
scaleDown();
}
}
private boolean shouldScaleUp(double avgLoad) {
if (avgLoad > cpuThreshold) {
// 检查冷却时间
if (lastScaleTime == null ||
System.currentTimeMillis() - lastScaleTime.getTime() > scaleUpCooldown) {
return true;
}
}
return false;
}
private void scaleUp() {
int currentCount = executorRegistry.getExecutorCount();
if (currentCount < maxInstances) {
// 创建新的执行器实例
String newExecutor = createExecutorInstance();
executorRegistry.registerExecutor(newExecutor);
lastScaleTime = new Date();
log.info("Scaled up executor count to: {}", currentCount + 1);
}
}
private void scaleDown() {
int currentCount = executorRegistry.getExecutorCount();
if (currentCount > minInstances) {
// 选择负载最低的执行器进行下线
String executorToRemove = selectExecutorForRemoval();
gracefulShutdown(executorToRemove);
log.info("Scaled down executor count to: {}", currentCount - 1);
}
}
private void gracefulShutdown(String executor) {
// 1. 停止接收新任务
executorRegistry.markShuttingDown(executor);
// 2. 等待当前任务完成
waitForTasksCompletion(executor);
// 3. 从注册表中移除
executorRegistry.deregisterExecutor(executor);
// 4. 释放资源
destroyExecutorInstance(executor);
}
}
32. 任务执行器负载均衡
@Component
public class TaskLoadBalancer {
@Autowired
private TaskExecutorRegistry executorRegistry;
@Autowired
private TaskMetricsCollector metricsCollector;
// 负载均衡策略枚举
public enum BalanceStrategy {
ROUND_ROBIN,
LEAST_CONNECTIONS,
WEIGHTED_RESPONSE_TIME,
CONSISTENT_HASH
}
// 选择执行器
public String selectExecutor(TaskDefinition task, BalanceStrategy strategy) {
List<String> availableExecutors =
executorRegistry.getHealthyExecutors();
if (availableExecutors.isEmpty()) {
throw new NoAvailableExecutorException("No healthy executor available");
}
switch (strategy) {
case ROUND_ROBIN:
return selectByRoundRobin(availableExecutors);
case LEAST_CONNECTIONS:
return selectByLeastConnections(availableExecutors);
case WEIGHTED_RESPONSE_TIME:
return selectByWeightedResponseTime(availableExecutors);
case CONSISTENT_HASH:
return selectByConsistentHash(availableExecutors, task);
default:
return selectByRoundRobin(availableExecutors);
}
}
private String selectByLeastConnections(List<String> executors) {
return executors.stream()
.min(Comparator.comparingInt(
executor -> metricsCollector.getCurrentConnections(executor)))
.orElse(executors.get(0));
}
private String selectByWeightedResponseTime(List<String> executors) {
Map<String, Double> responseTimeMap = new HashMap<>();
// 计算每个执行器的平均响应时间
for (String executor : executors) {
double avgResponseTime =
metricsCollector.getAverageResponseTime(executor);
responseTimeMap.put(executor, avgResponseTime);
}
// 计算权重
double totalWeight = responseTimeMap.values().stream()
.mapToDouble(time -> 1.0 / time)
.sum();
double random = Math.random() * totalWeight;
double weightSum = 0;
for (Map.Entry<String, Double> entry : responseTimeMap.entrySet()) {
weightSum += 1.0 / entry.getValue();
if (weightSum >= random) {
return entry.getKey();
}
}
return executors.get(0);
}
private String selectByConsistentHash(
List<String> executors, TaskDefinition task) {
ConsistentHash<String> consistentHash =
new ConsistentHash<>(executors);
return consistentHash.getNode(task.getTaskId().toString());
}
}
33. 任务执行超时处理
@Component
public class TaskTimeoutHandler {
@Autowired
private TaskExecutionRepository executionRepository;
@Autowired
private TaskAlertConfig alertConfig;
@Scheduled(fixedRate = 10000) // 每10秒检查一次
public void checkTimeoutTasks() {
List<TaskExecution> runningTasks =
executionRepository.findRunningTasks();
for (TaskExecution task : runningTasks) {
if (isTimeout(task)) {
handleTimeout(task);
}
}
}
private boolean isTimeout(TaskExecution task) {
if (task.getStartTime() == null) {
return false;
}
long executionTime = System.currentTimeMillis() -
task.getStartTime().getTime();
return executionTime > task.getTimeout();
}
private void handleTimeout(TaskExecution task) {
// 1. 更新任务状态
task.setState(TaskState.TIMEOUT);
executionRepository.save(task);
// 2. 终止任务执行
terminateExecution(task);
// 3. 发送超时告警
sendTimeoutAlert(task);
// 4. 执行超时补偿策略
executeTimeoutCompensation(task);
}
private void terminateExecution(TaskExecution task) {
// 获取任务进程信息
Optional<ProcessHandle> processHandle = ProcessHandle.of(task.getPid());
processHandle.ifPresent(handle -> {
// 先发送中断信号
handle.destroy();
// 等待一段时间后强制终止
try {
if (handle.继续补充任务超时处理和其他核心功能实现。
### 33. 任务执行超时处理(续)
```java
@Component
public class TaskTimeoutHandler {
private void terminateExecution(TaskExecution task) {
// 获取任务进程信息
Optional<ProcessHandle> processHandle = ProcessHandle.of(task.getPid());
processHandle.ifPresent(handle -> {
// 先发送中断信号
handle.destroy();
// 等待一段时间后强制终止
try {
if (handle.isAlive()) {
Thread.sleep(5000); // 等待5秒
if (handle.isAlive()) {
handle.destroyForcibly();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
private void executeTimeoutCompensation(TaskExecution task) {
// 根据任务配置执行补偿策略
TaskTimeoutStrategy strategy = task.getTimeoutStrategy();
switch (strategy) {
case RETRY:
scheduleRetry(task);
break;
case CALLBACK:
executeCallbackTask(task);
break;
case ALERT_ONLY:
// 仅发送告警,无需其他操作
break;
}
}
private void scheduleRetry(TaskExecution task) {
if (task.getRetryCount() < task.getMaxRetries()) {
// 创建重试任务
TaskExecution retryTask = new TaskExecution();
BeanUtils.copyProperties(task, retryTask);
retryTask.setRetryCount(task.getRetryCount() + 1);
retryTask.setState(TaskState.PENDING);
retryTask.setStartTime(null);
retryTask.setEndTime(null);
// 计算重试延迟
long delay = calculateRetryDelay(task.getRetryCount());
scheduleTaskWithDelay(retryTask, delay);
}
}
}
34. 任务执行器资源隔离
@Component
public class TaskResourceIsolation {
@Data
public static class ResourceLimit {
private int cpuLimit; // CPU限制(核心数)
private long memoryLimit; // 内存限制(字节)
private int threadLimit; // 线程数限制
}
private Map<String, ResourceLimit> resourceLimits = new ConcurrentHashMap<>();
// 使用cgroup进行资源隔离
public void applyResourceLimits(String taskHandler, ResourceLimit limit) {
try {
// 创建cgroup
String cgroupPath = "/sys/fs/cgroup/task_" + taskHandler;
Files.createDirectories(Paths.get(cgroupPath));
// 设置CPU限制
Files.write(Paths.get(cgroupPath + "/cpu.cfs_quota_us"),
String.valueOf(limit.getCpuLimit() * 100000).getBytes());
// 设置内存限制
Files.write(Paths.get(cgroupPath + "/memory.limit_in_bytes"),
String.valueOf(limit.getMemoryLimit()).getBytes());
// 将当前进程添加到cgroup
Files.write(Paths.get(cgroupPath + "/tasks"),
String.valueOf(ProcessHandle.current().pid()).getBytes());
} catch (IOException e) {
log.error("Failed to apply resource limits", e);
throw new ResourceIsolationException("Failed to apply resource limits", e);
}
}
// 监控资源使用情况
public ResourceUsage getResourceUsage(String taskHandler) {
ResourceUsage usage = new ResourceUsage();
String cgroupPath = "/sys/fs/cgroup/task_" + taskHandler;
try {
// 读取CPU使用率
String cpuUsage = new String(Files.readAllBytes(
Paths.get(cgroupPath + "/cpuacct.usage")));
usage.setCpuUsage(Long.parseLong(cpuUsage));
// 读取内存使用量
String memoryUsage = new String(Files.readAllBytes(
Paths.get(cgroupPath + "/memory.usage_in_bytes")));
usage.setMemoryUsage(Long.parseLong(memoryUsage));
return usage;
} catch (IOException e) {
log.error("Failed to get resource usage", e);
throw new ResourceMonitorException("Failed to get resource usage", e);
}
}
}
35. 任务执行器网络隔离
@Component
public class TaskNetworkIsolation {
@Data
public static class NetworkPolicy {
private List<String> allowedHosts;
private List<Integer> allowedPorts;
private long bandwidthLimit; // bits/s
}
// 配置网络隔离策略
public void applyNetworkPolicy(String taskHandler, NetworkPolicy policy) {
try {
// 创建网络命名空间
String nsName = "task_" + taskHandler;
Runtime.getRuntime().exec("ip netns add " + nsName);
// 配置防火墙规则
configureFirewall(nsName, policy);
// 配置带宽限制
configureBandwidthLimit(nsName, policy);
} catch (IOException e) {
log.error("Failed to apply network policy", e);
throw new NetworkIsolationException("Failed to apply network policy", e);
}
}
private void configureFirewall(String namespace, NetworkPolicy policy) {
try {
// 默认禁止所有流量
executeCommand("ip netns exec " + namespace +
" iptables -P INPUT DROP");
executeCommand("ip netns exec " + namespace +
" iptables -P OUTPUT DROP");
// 允许指定主机访问
for (String host : policy.getAllowedHosts()) {
executeCommand("ip netns exec " + namespace +
" iptables -A OUTPUT -d " + host + " -j ACCEPT");
executeCommand("ip netns exec " + namespace +
" iptables -A INPUT -s " + host + " -j ACCEPT");
}
// 允许指定端口访问
for (Integer port : policy.getAllowedPorts()) {
executeCommand("ip netns exec " + namespace +
" iptables -A OUTPUT -p tcp --dport " + port + " -j ACCEPT");
executeCommand("ip netns exec " + namespace +
" iptables -A INPUT -p tcp --sport " + port + " -j ACCEPT");
}
} catch (IOException e) {
throw new NetworkIsolationException("Failed to configure firewall", e);
}
}
private void configureBandwidthLimit(String namespace, NetworkPolicy policy) {
try {
// 创建流量控制规则
executeCommand("ip netns exec " + namespace +
" tc qdisc add dev eth0 root tbf rate " +
policy.getBandwidthLimit() + "bit burst 32kbit latency 400ms");
} catch (IOException e) {
throw new NetworkIsolationException(
"Failed to configure bandwidth limit", e);
}
}
}
36.任务执行器故障转移
在分布式任务调度系统中,任务执行器(Task Executor)是负责实际执行任务的节点。如果一个执行器发生故障,我们需要一种机制来检测这个故障,并且将任务转移到另一个健康的执行器上执行。这就是故障转移。
实现思路:
-
心跳检测: 每个执行器定期发送心跳信号到中央管理系统(如ZooKeeper、Etcd等),如果心跳在预定时间内未收到,则认为该节点故障。
-
任务重新分配: 一旦检测到故障,调度系统需要将原本分配给故障节点的任务重新分配到其他健康节点。
-
状态管理: 保持任务的状态更新,以便在故障转移后能正确地继续或重试任务。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Scheduled;
@Component
public class TaskFailoverManager {
@Autowired
private TaskExecutorRegistry executorRegistry;
@Autowired
private TaskScheduler taskScheduler;
@Scheduled(fixedDelay = 30000) // 每30秒检查一次
public void checkAndFailoverTasks() {
// 遍历所有注册的执行器
for (String executorId : executorRegistry.getAllExecutorIds()) {
if (!executorRegistry.isExecutorAlive(executorId)) {
// 如果执行器被认为是故障的
failoverTasks(executorId);
}
}
}
private void failoverTasks(String failedExecutorId) {
// 获取故障执行器上的所有任务
List<Task> tasksToFailover = executorRegistry.getTasksForExecutor(failedExecutorId);
for (Task task : tasksToFailover) {
// 重新分配任务到一个可用的执行器
String newExecutorId = executorRegistry.getAvailableExecutor();
if (newExecutorId != null) {
taskScheduler.schedule(task.getRunnable(), task.getTrigger());
// 更新任务状态,表示任务已被转移
executorRegistry.updateTaskExecutor(task.getId(), newExecutorId);
} else {
// 如果没有可用的执行器,记录日志或触发告警
System.out.println("No available executor for task failover: " + task.getId());
}
}
}
}
标签:task,String,private,容错,任务,new,任务调度,public,分布式
From: https://blog.csdn.net/jsjbrdzhh/article/details/143581591