首页 > 系统相关 >【日常记录-Java】自定义进程池

【日常记录-Java】自定义进程池

时间:2024-07-12 13:02:00浏览次数:15  
标签:traceId Java processorWrapper 自定义 success private 日常 进程 port

Author:赵志乾
Date:2024-07-12
Declaration:All Right Reserved!!!

1. 简介

        服务器上有些进程需要池化管理,使用SpringBoot构建Web服务提供管理api,内部使用自定义的进程池维护已启动的进程;

        核心点:

  •   进程池管理的进程都会使用系统的一个端口对外提供服务;
  •   进程池管理的进程自身可以决定何时停止服务;
  •   进程池管理的进程有失效时间,如需长时间占用资源,需要进行续期操作;

2. 代码示例

// 自定义进程池
public abstract class ProcessorPool {
    //******************************核心参数*************************************
    // 基础端口: 进程池可使用的最小端口号
    private Integer basePort;
    // 最大并发数: 进程池最大启动进程数
    private Integer maxConcurrent;
    // 初始租期: 进程首次启动后最大存活时长,单位分钟
    private Double initLeaseTerm;
    // 任务标识与进程的映射
    private final Map<String, ProcessorWrapper> traceIdToProcessorWrapperMap = new HashMap<>();
    // 端口与进程的映射
    private final Map<Integer, ProcessorWrapper> portToProcessorWrapperMap = new HashMap<>();

    
    //******************************构造函数************************************
    public ProcessorPool(Integer basePort, Integer maxConcurrent, Double initLeaseTerm){
        this.basePort = basePort;
        this.maxConcurrent = maxConcurrent;
        this.initLeaseTerm = initLeaseTerm;
    }

    //*****************************核心管理函数*********************************
    // 获取进程池信息
    public ProcessorPoolInfo getProcessorPoolInfo() {
        synchronized(this){
            // step1: 优先清理已失效的进程
            clearProcessor();
            // step2: 构造进程池信息: 最大并发数、初始租期、当前并发数
            return ProcessorPoolInfo.builder()
                .maxConcurrent(maxConcurrent)
                .leaseTerm(initLeaseTerm)
                .concurrent(portToProcessorWrapperMap.size())
                .build();
        }
    }
    
    // 清理已失效的进程
    private void clearProcessor() {
        // step1: 按可用端口范围遍历
        for (int port = basePort; port < basePort + maxConcurrent; port++) {
            ProcessorWrapper processorWrapper = portToProcessorWrapperMap.get(port);
            if (processorWrapper != null) {
                // step1.1: 进程超期,则主动销毁
                if (processorWrapper.getExpireTime().isBeforeNow() && processorWrapper.getProcess().isAlive()) {
                    processorWrapper.getProcess().destroy();
                }
                // step1.2: 清除失效的进程
                if (!processorWrapper.getProcess().isAlive()) {
                    portToProcessorWrapperMap.remove(port);
                    traceIdToProcessorWrapperMap.remove(processorWrapper.getTraceId());
                }
            }
        }
    }

    // 启动进程: traceId-任务标识, commandTag-进程命令标识   返回结果中的整数为进程所使用的端口号相对基础端口的偏移量
    public ResultData<Integer> startProcessor(String traceId, String commandTag) {
        synchronized(this){
            // step1: 优先清理已失效的进程
            clearProcessor();
            // step2: 接口幂等
            if (traceIdToProcessorWrapperMap.containsKey(traceId)) {
                return ResultData.<Integer>builder()
                    .success(true)
                    .desc("already start!")
                    .data(traceIdToProcessorWrapperMap.get(traceId).getPort() - basePort)
                    .build();
            }
            // step3: 并发数限制
            if (portToProcessorWrapperMap.size() >= maxConcurrent) {
                return ResultData.<Integer>builder()
                    .success(false)
                    .desc("concurrent limited!")
                    .data(null)
                    .build();
            }
            // step4: 寻找可用端口
            int port = basePort;
            for (; port < basePort + maxConcurrent; port++) {
                ProcessorWrapper processorWrapper = portToProcessorWrapperMap.get(port);
                if (processorWrapper == null) {
                    break;
                }
            }
            // step5: 调用子类启动进程
            Process processor = startProcessor(traceId,commandTag,port);
            // step6: 处理启动结果
            if(processor == null){
                return ResultData.<Integer>builder()
                    .success(false)
                    .desc("start failed!")
                    .data(null)
                    .build();
            }
            ProcessorWrapper processorWrapper = ProcessorWrapper.builder()
                .startTime(DateTime.now())
                .traceId(traceId)
                .commandTag(commandTag)
                .port(port)
                .processor(processor)
                .expireTime(DateTime.now().plusMillis((int) (initLeaseTerm * 60 * 1000)))
                .build();
            traceIdToProcessorWrapperMap.put(traceId, processorWrapper);
            portToProcessorWrapperMap.put(port, processorWrapper);

            return ResultData.<Integer>builder()
                    .success(true)
                    .desc("success!")
                    .data(port - basePort)
                    .build();
        }
    }

    // 停止进程
    public ResultData<String> stopProcessor(String traceId) {
        synchronized(this){
            // step1: 幂等
            if (!traceIdToProcessorWrapperMap.containsKey(traceId)) {
                return ResultData.<String>builder()
                    .success(true)
                    .desc("already stopped!")
                    .data(null)
                    .build();
            }
            // step2: 停止进程
            ProcessorWrapper processorWrapper = traceIdToProcessorWrapperMap.get(traceId);
            if (processorWrapper.getProcess().isAlive()) {
                processorWrapper.getProcess().destroy();
            }
            portToProcessorWrapperMap.remove(processorWrapper.getPort());
            traceIdToProcessorWrapperMap.remove(traceId);
            // step3: 记录结果
            Double time = (DateTime.now().getMillis() -     processorWrapper.getStartTime().getMillis()) / 1000.0;
            log.info("持续时间: traceId={} commandTag={} time={}秒", traceId, processorWrapper.getCommandTag(), time);
            return ResultData.<String>builder()
                .success(true)
                .desc("success!")
                .data(null)
                .build();
        }
    }

    // 续期进程
    public ResultData<String> renew(String traceId, Double minutes) {
        synchronized(this){
            // step1: 数据合法性校验
            if (!traceIdToProcessorWrapperMap.containsKey(traceId)) {
                return ResultData.<String>builder()
                    .success(false)
                    .desc("traceId is not exist!")
                    .data(null)
                    .build();
            }
            // step2: 续期
            ProcessorWrapper processorWrapper = traceIdToProcessorWrapperMap.get(traceId);
            processorWrapper.setExpireTime(DateTime.now().plusMillis((int) (minutes * 60 * 1000)));
            return ResultData.<String>builder()
                .success(true)
                .desc("success!")
                .data(null)
                .build();
        }
    }
}

//************************************************************************************
//**********************************额外类*********************************************
// 进程包装类
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProcessorWrapper {
 /****************************************************************************************
     *   traceId      : 运行任务的唯一标识
     *   commandTag   : 所使用的仿真器
     *   port         : 占用的端口号
     *   processor    : 所使用的进程
     *   startTime    : 进程开始时间
     *   expireTime   : 进程失效时间 
  ****************************************************************************************/
    private String traceId;
    private String simulator;
    private Integer port;
    private Process process;
    private DateTime startTime;
    private DateTime expireTime;
}

// 结果类
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ResultData<T> {
    private Boolean success;
    private String desc;
    private T data;
}

// 进程池信息类
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProcessorPoolInfo {
    /****************************************************************************************
     *  maxConcurrent   :  最大并发数
     *  leaseTerm       :  初始租期
     *  concurrent      :  当前并发数
     ****************************************************************************************/
    private Integer maxConcurrent;
    private Double leaseTerm;
    private Integer concurrent;
}

标签:traceId,Java,processorWrapper,自定义,success,private,日常,进程,port
From: https://blog.csdn.net/zhaoyaxuan001/article/details/140369195

相关文章

  • 演示:【Avalonia-Controls】Avalonia皮肤,主题,自定义控件,数据库,系统模块资源库
    一、目的:分享一个Avalonia皮肤,主题,自定义控件,数据库,系统模块资源库开源地址:GitHub-HeBianGu/Avalonia-Controls:Avalonia控件库Nuget包地址:NuGetGallery|PackagesmatchingHeBianGu.AvaloniaUI.演示视频地址:【Avalonia-Controls】Avalonia工具组件皮肤库v1.0.0_......
  • 最新AI一站式系统源码-ChatGPT商业版系统源码,支持自定义AI智能体应用、AI绘画、AI视频
     一、前言人工智能语言模型和AI绘画在多个领域都有广泛的应用.....SparkAi创作系统是一款基于ChatGPT和Midjourney开发的智能问答和绘画系统,提供一站式AIB/C端解决方案,涵盖AI大模型提问、AI绘画、AI视频、文档分析、图像识别和理解、TTS&语音识别、AI换脸等多项功能。......
  • Java怎么统计每个项目下的每个类别的数据
    为了演示如何在Java中统计每个项目下的每个类别的数据,我们可以考虑一个简单的场景:假设我们有一个电商系统,需要统计每个商品分类在每个店铺下的销售数量。这里我们将使用Java的集合框架,如HashMap和ArrayList,来存储和统计数据。1.使用Java的集合框架HashMap和ArrayList来存储和统计......
  • 关于Java内存区域的理解和记录
    近期做项目遇到了FullGC的问题,干脆总结一下Java内存区域分布和垃圾回收是咋回事。Java内存区域按照线程隔离状态直接分成三大块空间:线程私有:程序计数器是一块较小的内存空间,可以看做当前线程所执行的字节码的行号指示器。在虚拟机概念模型里,字节码解释器工作时就是通过改变这......
  • Java中的递归算法详解
    Java中的递归算法详解大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!1.什么是递归算法?递归算法是指在函数的定义中使用函数自身调用的方法。在算法中,递归通常用于解决可以被拆分为相似子问题的问题,每个子问题都是原始问题的一部分。2.递归算法的基本......
  • Java中的反序列化详解
    Java中的反序列化详解大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!1.什么是反序列化?反序列化是将对象的字节序列转换回对象的过程。在Java中,对象序列化是将对象转换为字节序列以便存储或传输,而反序列化则是将这些字节序列重新转换为对象。2.Java中......
  • Java Redis多限流
    JavaRedis多限流在Java中实现Redis多限流通常涉及使用Redis的某些特性,如INCR、EXPIRE、Lua脚本或者更高级的Redis数据结构如RedisBitmaps、RedisStreams结合RedisPub/Sub,或者使用Redis的第三方库如RedisRateLimiter(基于Lua脚本或Redis自身功能实现)。然而,为了直接和易于实现......
  • Java中的排序算法详解
    Java中的排序算法详解大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!1.排序算法概述排序算法是计算机科学中的基础问题,它将一组元素按照特定的顺序重新排列。在实际开发中,选择合适的排序算法可以显著提高程序的性能。2.冒泡排序(BubbleSort)冒泡排序......
  • 【热门】Java银行交易处理系统讲解【含源代码】
    本作品由老程个人著作,经供参考以下是一个关于Java银行交易处理系统的简要讲解:系统概述:银行交易处理系统是一个复杂但关键的应用程序,用于处理各种金融交易,如存款、取款、转账、账户查询等。主要功能模块:1.用户账户管理:开户:创建新的用户账户,包括收集个人信息、设置......
  • Java毕业设计基于Vue+SpringBoot的电影院订票选座管理系统(代码+数据库+文档LW+运行成
    很多朋友发现后期找不到文章,收藏关注不迷路文章目录项目介绍技术介绍项目界面关键代码目录项目介绍在飞速发展的今天,网络已成为人们重要的交流平台。电影院每天都有大量的需要通过网络发布,为此,本人开发了一个基于B/S;浏览器/服务器;模式的电影院管理系统。该系......