首页 > 其他分享 >自封线程池

自封线程池

时间:2023-10-10 11:44:44浏览次数:33  
标签:private public 任务 线程 new jobInfo jobName 自封

因为在实际的工作上,对于线程池这块也是基本都用的jdk的线程池,要不就是通过completefuture 要不直接就是Callable和Runnable ,因为没有做任务的封装,就导致对于任务的完成结果这类的数据只能通过日志进行查看,并且因为没有相关的规范接口,就导致实际上多线程应用起来很杂乱,所以想着能不能基于线程池来进行一次封装便于规范式开发

相关maven依赖

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>   // 这个依赖就只是用来生成随机的字符串用来测试B任务的
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

先定义bean

1.定义任务的结果状态,也就是执行成功或者失败的枚举,

public enum TaskResultType {
   Success,/*方法执行完成,业务结果也正确*/
   Failure,/*方法执行完成,业务结果错误*/
   Exception/*方法执行抛出了异常*/
}

2.对于任务结果本身的返回上 任务参数的意义也就是后期的出错任务的排查工作,任务执行时间就是个数值的记录

@Data
@AllArgsConstructor
public class TaskResult<R> {
   //方法执行结果
   private final TaskResultType resultType;
   //方法执行后的结果数据
   private final R returnValue;
   //异常原因
   private final String failReason;
   //任务参数
   private Task task;
   //任务执行时间
   private LocalDateTime processTime;

}

规范的定义

1.任务执行器的定义 里面的taskExecute 就是具体想要执行的业务代码逻辑 ,就把它理解为run 方法里面的具体代码就行

public interface ITaskExecutor<R> {
    TaskResult<R> taskExecute(Task data);
}

2.对于任务本身的封装 因为在出错的时候需要数据来排查,所以我需要tracId 来查找任务上下文数据,还需要 t 任务内容 ,泛型是因为他可以兼容各种不同的参数类型,exeCutorName是作为任务执行器的选择,因为 在设想中 线程池能够完成不同类型的任务

使用的方式 就是通过new Task 把想要执行的方法的参数给包装成 Task的形式进行传递到线程池中

@Data
public class Task<T> {
    
    private int traceId;
    //任务内容
    private  T t;
    //工作名称 用于选择 任务执行器
    private String exeCutorName;
}

线程池本身

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/***
 *  类说明:框架的主体类,也是调用者主要使用的类
 这边的线程数和最大线程数可以参照网上的公式,但是太复杂了,公式我贴在最后
 */
public class PendingJobPool {
    //框架运行时的线程数,与机器的CPU数相同
    private static final int THREAD_COUNTS
            = Runtime.getRuntime().availableProcessors();
    private static BlockingQueue<Runnable> taskQueue
            = new ArrayBlockingQueue<Runnable>(5000);
    //这里建议用spring的  ThreadPoolTaskExecutor 他可以命名线程池的前缀,并且他可以使用spring的扩展装饰者模式更灵活
    private static ExecutorService taskExecutor
            = new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS*2,
            60, TimeUnit.SECONDS, taskQueue,new ThreadPoolExecutor.CallerRunsPolicy());


    //用于注册工作任务类型 String 就是工作名
    private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap
            = new ConcurrentHashMap<String, JobInfo<?>>();


    public static Map<String, JobInfo<?>> getMap() {
        return jobInfoMap;
    }

    //以单例模式启动
    private PendingJobPool() {
    }

    private static class JobPoolHolder {
        public static PendingJobPool pool = new PendingJobPool();
    }

    public static PendingJobPool getInstance() {
        return JobPoolHolder.pool;
    }

    //对工作中的任务进行包装,提交给线程池使用,
    //并将处理任务的结果,写入缓存以供查询
    private static class PendingTask<T, R> implements Runnable {

        private JobInfo<R> jobInfo;
        //工作的数据
        private Task<T> task;

        public PendingTask(JobInfo<R> jobInfo, Task task) {
            this.jobInfo = jobInfo;
            this.task = task;
        }

        @Override
        public void run() {
            R r = null;
            ITaskExecutor<R> taskProcesser
                    = jobInfo.getTaskProcesser();
            TaskResult<R> result = null;
            try {
                result = taskProcesser.taskExecute(task);
                LocalDateTime now = LocalDateTime.now();

                if (result == null) {
                    result = new TaskResult<R>(TaskResultType.Exception, r
                            , "result is null",task,now);
                }

            } catch (Exception e) {
                e.printStackTrace();
                result = new TaskResult<R>(TaskResultType.Exception, r
                        , e.getMessage(),task,LocalDateTime.now());
            } finally {
                jobInfo.addTaskResult(result);
            }

        }
    }

    //调用者提交工作中的任务
    public <T, R> void putTask(  Task<T> t) {
        JobInfo<R> jobInfo = getJob(t.getExeCutorName());
        PendingTask<T, R> task = new PendingTask(jobInfo, t);
        taskExecutor.execute(task);
    }


    //调用者注册工作,如工作名,任务的处理器等等
    public <R> void registerJob(String jobName,
                                ITaskExecutor<R> taskProcesser) {
        JobInfo<R> jobInfo =
                new JobInfo<R>(jobName, taskProcesser);
        if (jobInfoMap.putIfAbsent(jobName, jobInfo) != null) {
            throw new RuntimeException(jobName + "已经注册!");
        }

    }

    //搜索工作的信息
    private <R> JobInfo<R> getJob(String jobName) {
        JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
        if (null == jobInfo) {
            throw new RuntimeException(jobName + "是非法任务!");
        }
        return jobInfo;
    }

    //获得工作的整体处理进度
    public <R> String getTaskProgess(String jobName) {
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTotalProcess();
    }

    //获得指定的工作的每个任务的处理详情
    public <R> List<TaskResult<R>> getTaskDetail(String jobName) {
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTaskDetail();
    }

}

因为之前提到不同类型的任务需要有不同类型的处理器,所以就有了这个类JobInfo 里面对于任务类型的整体数据

就比如A类型的任务的配套执行器,A类型任务总共的执行次数,成功次数等,以及A类型任务的所有任务结果都保存在了taskDetailQueues 里面,但是因为这个任务结果时间久了就没任何意义了,所以这任务数据可以去定时删除clearResultQueue(在测试下 判断条件是 所有任务都执行过了,并且最后一次任务的执行时间在15s前)

/***
 *类说明:提交给框架执行的工作实体类,
* 工作:表示本批次需要处理的同性质任务(Task)的一个集合
 */
@Data
public class JobInfo<R> {
   //工作名,用以区分框架中唯一的工作
   private final String jobName;

   //处理工作中任务的处理器
   private final ITaskExecutor<R> taskProcesser;
   //任务的成功次数
   private AtomicInteger successCount;
   //工作中任务目前已经处理的次数
   private AtomicInteger taskProcessCount;
   //工作中任务数
   private  AtomicInteger taskCount;
   //存放每个任务的处理结果,供查询用
   private List<TaskResult<R>> taskDetailQueues;

   public JobInfo(String jobName,
      ITaskExecutor<R> taskProcesser
       ) {
         this.jobName = jobName;
         this.taskCount = new AtomicInteger(0);;
         successCount = new AtomicInteger(0);
         taskProcessCount = new AtomicInteger(0);
         this.taskProcesser = taskProcesser;
         taskDetailQueues = Collections.synchronizedList(new ArrayList<>());

   }


    //提供工作的整体进度信息
    public String getTotalProcess() {
        return "Success["+successCount.get()+"]/Current["+taskProcessCount.get()
                +"] Total["+ taskCount +"]";
    }
   
   //提供工作中每个任务的处理结果
   public  List<TaskResult<R>> getTaskDetail(){
      return  taskDetailQueues;
   }
   
   //每个任务处理完成后,记录任务的处理结果,因为从业务应用的角度来说,
//  对查询任务进度数据的一致性要不高
// 我们保证最终一致性即可,无需对整个方法加锁
   public void addTaskResult(TaskResult<R> taskResult){
      if(TaskResultType.Success.equals(taskResult.getResultType())){
         successCount.incrementAndGet();
      }
      taskProcessCount.incrementAndGet();
      taskCount.incrementAndGet();
      taskDetailQueues.add(taskResult);
   }
   //删除任务结果队列,清除任务相关数据 ,减少内存空间 ,适合定时任务
   public void clearResultQueue(){
      successCount = new AtomicInteger(0);
      taskProcessCount = new AtomicInteger(0);
      taskCount=new AtomicInteger(0);
      taskDetailQueues = Collections.synchronizedList(new ArrayList<>());
   }

}

代码示例:

定义2个任务的执行器

/**
 * 实际的任务执行器 Thread.sleep(taskTime)相当于就是任务的执行
 */
public class ATaskExecutor implements ITaskExecutor<Integer> {

    @Override
    public TaskResult<Integer> taskExecute(Task task) {
       System.out.println("这是工作A在执行相应的任务,任务数据=" + task.toString());
        Random r = new Random();
        int taskTime = r.nextInt(500);
        AppTest.sleep(taskTime);
        //任务成功
        LocalDateTime processTime = LocalDateTime.now();
        if (taskTime <= 200) {
            Integer dataResult = 1;
            return new TaskResult<Integer>(TaskResultType.Success, dataResult, "success", task, processTime);

        } else if (taskTime > 201 && taskTime <= 400) {//任务出现问题
            return new TaskResult<Integer>(TaskResultType.Failure, -1, "Failure", task, processTime);
        } else {//抛出异常
            try {
                throw new RuntimeException("发生异常!!");
            } catch (Exception e) {
                return new TaskResult<Integer>(TaskResultType.Exception,
                        -2, e.getMessage(), task, processTime);
            }
        }
    }

}
/**
 *实际的任务执行器 Thread.sleep(taskTime)相当于就是任务的执行
 */
public class BTaskExecutor implements ITaskExecutor<String> {

   @Override
   public TaskResult<String> taskExecute(Task task) {
      System.out.println("这是工作B,任务数据="+ task.toString());
      Random r = new Random();
      int taskTime = r.nextInt(500);
      try {
         Thread.sleep(taskTime);
      } catch (InterruptedException e) {

      }
      LocalDateTime processTime = LocalDateTime.now();
      //任务成功
      if (taskTime <= 300) {
         return new TaskResult<String>(TaskResultType.Success, "任务B执行成功", "success",task,processTime);
      } else if (taskTime > 301 && taskTime <= 400) {//任务出现问题
         return new TaskResult<String>(TaskResultType.Failure, "任务B执行失败", "Failure",task,processTime);
      } else {//抛出异常
         try {
            throw new RuntimeException("发生异常!!");
         } catch (Exception e) {
            return new TaskResult<String>(TaskResultType.Exception,
                  "任务B 因为抛出异常执行失败", e.getMessage(),task,processTime);
         }
      }
   }

}

main 方法执行

public class AppTest {


    public static void main(String[] args) throws Exception {
        ATaskExecutor aTaskExecutor = new ATaskExecutor();
        BTaskExecutor bTaskExecutor = new BTaskExecutor();
        PendingJobPool pool = PendingJobPool.getInstance();
        //线程池注册工作
        pool.registerJob("ATask", aTaskExecutor);
        pool.registerJob("BTask", bTaskExecutor);
        Random r = new Random();
        for (int i = 0; i < 5; i++) {

            Task<Integer> aTask = new Task<>();
            int aTaskTraceId = r.nextInt(1000);
            aTask.setT(r.nextInt(1000));
            System.out.println("aTask的traceID=" + aTaskTraceId);
            aTask.setTraceId(aTaskTraceId);
            aTask.setExeCutorName("ATask");
            pool.putTask(aTask);

            int bTaskTraceId = r.nextInt(1000);
            System.out.println("bTask的traceID=" + bTaskTraceId);
            Task<String> bTask = new Task<>();
            bTask.setTraceId(bTaskTraceId);
            bTask.setExeCutorName("BTask");
            bTask.setT(RandomStringUtils.random(3));
            pool.putTask(bTask);

        }
        Map<String, JobInfo<?>> map = PendingJobPool.getMap();


        //没有新任务时 删除任务结果队列,清除任务相关数据 ,减少内存空间 ,适合定时任务


        new Thread(() -> {
            while (true) {
                sleep(3000);
                map.values().stream().forEach((jobInfo) -> {
                    LocalDateTime now = LocalDateTime.now();
                    //删除数据要求是 任务全部执行成功并且最后一次任务的执行时间大于15s
                    if (jobInfo.getTaskProcessCount().get()!=0&&jobInfo.getTaskProcessCount().get() == jobInfo.getTaskProcessCount().get() && jobInfo.getTaskDetailQueues().get(jobInfo.getTaskProcessCount().get()-1).getProcessTime().isBefore(now.minusSeconds(15))) {
                        System.out.println(jobInfo.getJobName()+"任务均执行完成,此时最后执行完成的任务的完成时间是" + jobInfo.getTaskDetailQueues().get(jobInfo.getTaskProcessCount().get()-1).getProcessTime());
                        jobInfo.clearResultQueue();
                    }
                });
            }

        }).start();

        new Thread(()->{
            //查看任务进度
            while (true) {
                sleep(3000);
                map.keySet().stream().forEach((jobName) -> {
                    List<TaskResult<String>> taskDetail = pool.getTaskDetail(jobName);
                    if (!taskDetail.isEmpty()) {
                        System.out.println(jobName + "获得工作的整体处理进度=" + pool.getTaskProgess(jobName));
                        System.out.println(jobName + "任务详情=" + taskDetail);
                    }else {
                        System.out.println(jobName + "此时没有相对应的任务");
                    }
                });
            }
        }).start();



        sleep(10000);

        Task<String> bTask = new Task<>();
        int bTaskTraceId = r.nextInt(1000);
        bTask.setTraceId(bTaskTraceId);
        bTask.setExeCutorName("BTask");
        bTask.setT(RandomStringUtils.random(3));
        pool.putTask(bTask);


    }

    public static void sleep(Integer time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

线程池参数配置

等待队列数=queuePoolSize=(corePoolSize / taskCose) * responseTime
          代入数据maxPoolSize=(80/5~10)*1=16~8,也就是说,queuePoolSize的取值范围是8~16,意思是等待队列里面的线程,可以等待8~16秒,超过了这个时间就需要开新的线程来执行。

最大线程池数=maxPoolSize=(max(taskSecond) - queuePoolSize) / (1 / taskCost)=(100-(8~6)) / (1/5~10) = (92~94) / ( 1/ 5~10) =(92~94)* (5~10)=(92*5)~(94*10)=460~940
           最大线程数=( 最大任务数-等待队列容量)/ 每个线程每秒处理能力

因为是最近有空了才想到是去封装一下的,还没有在工作中使用过

标签:private,public,任务,线程,new,jobInfo,jobName,自封
From: https://www.cnblogs.com/zz0203/p/17754279.html

相关文章

  • python多线程
    importdatetimeimportthreadingfromtimeimportsleep#创建一个信号量,限制最多同时运行2个线程semaphore=threading.Semaphore(2)#创建一个线程锁threadLock=threading.Lock()defworker(i):withsemaphore:current_datetime=datetime.datet......
  • Java 21新特性-虚拟线程 审核中
    本文翻译自国外论坛medium,原文地址:https://medium.com/@benweidig/looking-at-java-21-virtual-threads-0ddda4ac1be1Java21版本更新中最重要的功能之一就是虚拟线程(JEP444)。这些轻量级线程减少了编写、维护和观察高吞吐量并发应用程序所需的工作量。正如我的许多其他文......
  • 创建线程的三种方式
    「有点收获」三种基本方法创建线程(qq.com)创建线程的三种方法线程英译是Thread,这也是Java中线程对应的类名,在java.lang包下。注意下它实现了Runnable接口,下文会详细解释。线程与任务合并——直接继承Thread类线程创建出来自然是需要执行一些特定的任务的,一个线程......
  • Java21上手体验-分代ZGC和虚拟线程 | 京东云技术团队
    一、导语几天前Oracle刚刚发布了Java21,由于这是最新的LTS版本,引起了大家的关注。我也第一时间在个人项目中进行了升级体验。一探究竟,和大家分享。二、Java21更新内容介绍官方release公告:https://jdk.java.net/21/release-notes开源中国介绍:https://my.oschina.net/waylau/blog/10......
  • 1、为何说只有 1 种实现线程的方法?
    1、为何说只有1种实现线程的方法?为什么说本质只有一种实现线程的方式?实现Runnable接口究竟比继承Thread类实现线程好在哪里?目录1、为何说只有1种实现线程的方法?实现多线程的多种方式1、通过实现Runnable接口的方式实现多线程2、继承Thread类3、通过线程池创建线......
  • 万字长文详解Java线程池面试题
    王有志,一个分享硬核Java技术的互金摸鱼侠加入Java人的提桶跑路群:共同富裕的Java人今天是《面霸的自我修养》第6篇文章,我们一起来看看面试中会问到哪些关于线程池的问题吧。数据来源:大部分来自于各机构(Java之父,Java继父,某灵,某泡,某客)以及各博主整理文档;小部分来自于......
  • 多线程,线程同步(synchronized),并发问题
    多个线程同时操作一个对象,就会出现并发问题,所以需要线程同步,线程同步是一种等待机制。 线程同步的形成条件:队列+锁(锁就是例如上厕所,一个进去锁住避免其他进入。到下一个进去再锁住)线程同步来解决线程的不安全性弊端!: ......
  • 多线程,守护线程daemon
    简介: 下面例子:首先两个线程类实现Runnable接口 然后在主线程模拟一下上帝守护你 其中,setDaemon方法可以切换线程模式......
  • 多线程,线程优先级Priority
    线程优先级(Priority)用数字表示,范围从1~10,优先级越高,给的资源就多一点,被执行的可能就高一些  优先级默认为5 注意!!!要先设置优先级再启动线程!!! ......
  • 多线程,线程状态-停止
    五大线程状态!1、创建状态2、就绪状态3、运行状态4、阻塞状态5、死亡状态如下图: 这里主要先学习线程的停止  ......