Author:赵志乾
Date:2024-07-12
Declaration:All Right Reserved!!!
1. 简介
服务器上有些进程需要池化管理,使用SpringBoot构建Web服务提供管理api,内部使用自定义的进程池维护已启动的进程;
核心点:
- 进程池管理的进程都会使用系统的一个端口对外提供服务;
- 进程池管理的进程自身可以决定何时停止服务;
- 进程池管理的进程有失效时间,如需长时间占用资源,需要进行续期操作;
2. 代码示例
// 自定义进程池
public abstract class ProcessorPool {
//******************************核心参数*************************************
// 基础端口: 进程池可使用的最小端口号
private Integer basePort;
// 最大并发数: 进程池最大启动进程数
private Integer maxConcurrent;
// 初始租期: 进程首次启动后最大存活时长,单位分钟
private Double initLeaseTerm;
// 任务标识与进程的映射
private final Map<String, ProcessorWrapper> traceIdToProcessorWrapperMap = new HashMap<>();
// 端口与进程的映射
private final Map<Integer, ProcessorWrapper> portToProcessorWrapperMap = new HashMap<>();
//******************************构造函数************************************
public ProcessorPool(Integer basePort, Integer maxConcurrent, Double initLeaseTerm){
this.basePort = basePort;
this.maxConcurrent = maxConcurrent;
this.initLeaseTerm = initLeaseTerm;
}
//*****************************核心管理函数*********************************
// 获取进程池信息
public ProcessorPoolInfo getProcessorPoolInfo() {
synchronized(this){
// step1: 优先清理已失效的进程
clearProcessor();
// step2: 构造进程池信息: 最大并发数、初始租期、当前并发数
return ProcessorPoolInfo.builder()
.maxConcurrent(maxConcurrent)
.leaseTerm(initLeaseTerm)
.concurrent(portToProcessorWrapperMap.size())
.build();
}
}
// 清理已失效的进程
private void clearProcessor() {
// step1: 按可用端口范围遍历
for (int port = basePort; port < basePort + maxConcurrent; port++) {
ProcessorWrapper processorWrapper = portToProcessorWrapperMap.get(port);
if (processorWrapper != null) {
// step1.1: 进程超期,则主动销毁
if (processorWrapper.getExpireTime().isBeforeNow() && processorWrapper.getProcess().isAlive()) {
processorWrapper.getProcess().destroy();
}
// step1.2: 清除失效的进程
if (!processorWrapper.getProcess().isAlive()) {
portToProcessorWrapperMap.remove(port);
traceIdToProcessorWrapperMap.remove(processorWrapper.getTraceId());
}
}
}
}
// 启动进程: traceId-任务标识, commandTag-进程命令标识 返回结果中的整数为进程所使用的端口号相对基础端口的偏移量
public ResultData<Integer> startProcessor(String traceId, String commandTag) {
synchronized(this){
// step1: 优先清理已失效的进程
clearProcessor();
// step2: 接口幂等
if (traceIdToProcessorWrapperMap.containsKey(traceId)) {
return ResultData.<Integer>builder()
.success(true)
.desc("already start!")
.data(traceIdToProcessorWrapperMap.get(traceId).getPort() - basePort)
.build();
}
// step3: 并发数限制
if (portToProcessorWrapperMap.size() >= maxConcurrent) {
return ResultData.<Integer>builder()
.success(false)
.desc("concurrent limited!")
.data(null)
.build();
}
// step4: 寻找可用端口
int port = basePort;
for (; port < basePort + maxConcurrent; port++) {
ProcessorWrapper processorWrapper = portToProcessorWrapperMap.get(port);
if (processorWrapper == null) {
break;
}
}
// step5: 调用子类启动进程
Process processor = startProcessor(traceId,commandTag,port);
// step6: 处理启动结果
if(processor == null){
return ResultData.<Integer>builder()
.success(false)
.desc("start failed!")
.data(null)
.build();
}
ProcessorWrapper processorWrapper = ProcessorWrapper.builder()
.startTime(DateTime.now())
.traceId(traceId)
.commandTag(commandTag)
.port(port)
.processor(processor)
.expireTime(DateTime.now().plusMillis((int) (initLeaseTerm * 60 * 1000)))
.build();
traceIdToProcessorWrapperMap.put(traceId, processorWrapper);
portToProcessorWrapperMap.put(port, processorWrapper);
return ResultData.<Integer>builder()
.success(true)
.desc("success!")
.data(port - basePort)
.build();
}
}
// 停止进程
public ResultData<String> stopProcessor(String traceId) {
synchronized(this){
// step1: 幂等
if (!traceIdToProcessorWrapperMap.containsKey(traceId)) {
return ResultData.<String>builder()
.success(true)
.desc("already stopped!")
.data(null)
.build();
}
// step2: 停止进程
ProcessorWrapper processorWrapper = traceIdToProcessorWrapperMap.get(traceId);
if (processorWrapper.getProcess().isAlive()) {
processorWrapper.getProcess().destroy();
}
portToProcessorWrapperMap.remove(processorWrapper.getPort());
traceIdToProcessorWrapperMap.remove(traceId);
// step3: 记录结果
Double time = (DateTime.now().getMillis() - processorWrapper.getStartTime().getMillis()) / 1000.0;
log.info("持续时间: traceId={} commandTag={} time={}秒", traceId, processorWrapper.getCommandTag(), time);
return ResultData.<String>builder()
.success(true)
.desc("success!")
.data(null)
.build();
}
}
// 续期进程
public ResultData<String> renew(String traceId, Double minutes) {
synchronized(this){
// step1: 数据合法性校验
if (!traceIdToProcessorWrapperMap.containsKey(traceId)) {
return ResultData.<String>builder()
.success(false)
.desc("traceId is not exist!")
.data(null)
.build();
}
// step2: 续期
ProcessorWrapper processorWrapper = traceIdToProcessorWrapperMap.get(traceId);
processorWrapper.setExpireTime(DateTime.now().plusMillis((int) (minutes * 60 * 1000)));
return ResultData.<String>builder()
.success(true)
.desc("success!")
.data(null)
.build();
}
}
}
//************************************************************************************
//**********************************额外类*********************************************
// 进程包装类
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProcessorWrapper {
/****************************************************************************************
* traceId : 运行任务的唯一标识
* commandTag : 所使用的仿真器
* port : 占用的端口号
* processor : 所使用的进程
* startTime : 进程开始时间
* expireTime : 进程失效时间
****************************************************************************************/
private String traceId;
private String simulator;
private Integer port;
private Process process;
private DateTime startTime;
private DateTime expireTime;
}
// 结果类
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ResultData<T> {
private Boolean success;
private String desc;
private T data;
}
// 进程池信息类
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProcessorPoolInfo {
/****************************************************************************************
* maxConcurrent : 最大并发数
* leaseTerm : 初始租期
* concurrent : 当前并发数
****************************************************************************************/
private Integer maxConcurrent;
private Double leaseTerm;
private Integer concurrent;
}
标签:traceId,Java,processorWrapper,自定义,success,private,日常,进程,port
From: https://blog.csdn.net/zhaoyaxuan001/article/details/140369195