因为在实际的工作上,对于线程池这块也是基本都用的jdk的线程池,要不就是通过completefuture 要不直接就是Callable和Runnable ,因为没有做任务的封装,就导致对于任务的完成结果这类的数据只能通过日志进行查看,并且因为没有相关的规范接口,就导致实际上多线程应用起来很杂乱,所以想着能不能基于线程池来进行一次封装便于规范式开发
相关maven依赖
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId> // 这个依赖就只是用来生成随机的字符串用来测试B任务的
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
先定义bean
1.定义任务的结果状态,也就是执行成功或者失败的枚举,
public enum TaskResultType {
Success,/*方法执行完成,业务结果也正确*/
Failure,/*方法执行完成,业务结果错误*/
Exception/*方法执行抛出了异常*/
}
2.对于任务结果本身的返回上 任务参数的意义也就是后期的出错任务的排查工作,任务执行时间就是个数值的记录
@Data
@AllArgsConstructor
public class TaskResult<R> {
//方法执行结果
private final TaskResultType resultType;
//方法执行后的结果数据
private final R returnValue;
//异常原因
private final String failReason;
//任务参数
private Task task;
//任务执行时间
private LocalDateTime processTime;
}
规范的定义
1.任务执行器的定义 里面的taskExecute 就是具体想要执行的业务代码逻辑 ,就把它理解为run 方法里面的具体代码就行
public interface ITaskExecutor<R> {
TaskResult<R> taskExecute(Task data);
}
2.对于任务本身的封装 因为在出错的时候需要数据来排查,所以我需要tracId 来查找任务上下文数据,还需要 t 任务内容 ,泛型是因为他可以兼容各种不同的参数类型,exeCutorName是作为任务执行器的选择,因为 在设想中 线程池能够完成不同类型的任务
使用的方式 就是通过new Task 把想要执行的方法的参数给包装成 Task的形式进行传递到线程池中
@Data
public class Task<T> {
private int traceId;
//任务内容
private T t;
//工作名称 用于选择 任务执行器
private String exeCutorName;
}
线程池本身
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/***
* 类说明:框架的主体类,也是调用者主要使用的类
这边的线程数和最大线程数可以参照网上的公式,但是太复杂了,公式我贴在最后
*/
public class PendingJobPool {
//框架运行时的线程数,与机器的CPU数相同
private static final int THREAD_COUNTS
= Runtime.getRuntime().availableProcessors();
private static BlockingQueue<Runnable> taskQueue
= new ArrayBlockingQueue<Runnable>(5000);
//这里建议用spring的 ThreadPoolTaskExecutor 他可以命名线程池的前缀,并且他可以使用spring的扩展装饰者模式更灵活
private static ExecutorService taskExecutor
= new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS*2,
60, TimeUnit.SECONDS, taskQueue,new ThreadPoolExecutor.CallerRunsPolicy());
//用于注册工作任务类型 String 就是工作名
private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap
= new ConcurrentHashMap<String, JobInfo<?>>();
public static Map<String, JobInfo<?>> getMap() {
return jobInfoMap;
}
//以单例模式启动
private PendingJobPool() {
}
private static class JobPoolHolder {
public static PendingJobPool pool = new PendingJobPool();
}
public static PendingJobPool getInstance() {
return JobPoolHolder.pool;
}
//对工作中的任务进行包装,提交给线程池使用,
//并将处理任务的结果,写入缓存以供查询
private static class PendingTask<T, R> implements Runnable {
private JobInfo<R> jobInfo;
//工作的数据
private Task<T> task;
public PendingTask(JobInfo<R> jobInfo, Task task) {
this.jobInfo = jobInfo;
this.task = task;
}
@Override
public void run() {
R r = null;
ITaskExecutor<R> taskProcesser
= jobInfo.getTaskProcesser();
TaskResult<R> result = null;
try {
result = taskProcesser.taskExecute(task);
LocalDateTime now = LocalDateTime.now();
if (result == null) {
result = new TaskResult<R>(TaskResultType.Exception, r
, "result is null",task,now);
}
} catch (Exception e) {
e.printStackTrace();
result = new TaskResult<R>(TaskResultType.Exception, r
, e.getMessage(),task,LocalDateTime.now());
} finally {
jobInfo.addTaskResult(result);
}
}
}
//调用者提交工作中的任务
public <T, R> void putTask( Task<T> t) {
JobInfo<R> jobInfo = getJob(t.getExeCutorName());
PendingTask<T, R> task = new PendingTask(jobInfo, t);
taskExecutor.execute(task);
}
//调用者注册工作,如工作名,任务的处理器等等
public <R> void registerJob(String jobName,
ITaskExecutor<R> taskProcesser) {
JobInfo<R> jobInfo =
new JobInfo<R>(jobName, taskProcesser);
if (jobInfoMap.putIfAbsent(jobName, jobInfo) != null) {
throw new RuntimeException(jobName + "已经注册!");
}
}
//搜索工作的信息
private <R> JobInfo<R> getJob(String jobName) {
JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
if (null == jobInfo) {
throw new RuntimeException(jobName + "是非法任务!");
}
return jobInfo;
}
//获得工作的整体处理进度
public <R> String getTaskProgess(String jobName) {
JobInfo<R> jobInfo = getJob(jobName);
return jobInfo.getTotalProcess();
}
//获得指定的工作的每个任务的处理详情
public <R> List<TaskResult<R>> getTaskDetail(String jobName) {
JobInfo<R> jobInfo = getJob(jobName);
return jobInfo.getTaskDetail();
}
}
因为之前提到不同类型的任务需要有不同类型的处理器,所以就有了这个类JobInfo 里面对于任务类型的整体数据
就比如A类型的任务的配套执行器,A类型任务总共的执行次数,成功次数等,以及A类型任务的所有任务结果都保存在了taskDetailQueues 里面,但是因为这个任务结果时间久了就没任何意义了,所以这任务数据可以去定时删除clearResultQueue(在测试下 判断条件是 所有任务都执行过了,并且最后一次任务的执行时间在15s前)
/***
*类说明:提交给框架执行的工作实体类,
* 工作:表示本批次需要处理的同性质任务(Task)的一个集合
*/
@Data
public class JobInfo<R> {
//工作名,用以区分框架中唯一的工作
private final String jobName;
//处理工作中任务的处理器
private final ITaskExecutor<R> taskProcesser;
//任务的成功次数
private AtomicInteger successCount;
//工作中任务目前已经处理的次数
private AtomicInteger taskProcessCount;
//工作中任务数
private AtomicInteger taskCount;
//存放每个任务的处理结果,供查询用
private List<TaskResult<R>> taskDetailQueues;
public JobInfo(String jobName,
ITaskExecutor<R> taskProcesser
) {
this.jobName = jobName;
this.taskCount = new AtomicInteger(0);;
successCount = new AtomicInteger(0);
taskProcessCount = new AtomicInteger(0);
this.taskProcesser = taskProcesser;
taskDetailQueues = Collections.synchronizedList(new ArrayList<>());
}
//提供工作的整体进度信息
public String getTotalProcess() {
return "Success["+successCount.get()+"]/Current["+taskProcessCount.get()
+"] Total["+ taskCount +"]";
}
//提供工作中每个任务的处理结果
public List<TaskResult<R>> getTaskDetail(){
return taskDetailQueues;
}
//每个任务处理完成后,记录任务的处理结果,因为从业务应用的角度来说,
// 对查询任务进度数据的一致性要不高
// 我们保证最终一致性即可,无需对整个方法加锁
public void addTaskResult(TaskResult<R> taskResult){
if(TaskResultType.Success.equals(taskResult.getResultType())){
successCount.incrementAndGet();
}
taskProcessCount.incrementAndGet();
taskCount.incrementAndGet();
taskDetailQueues.add(taskResult);
}
//删除任务结果队列,清除任务相关数据 ,减少内存空间 ,适合定时任务
public void clearResultQueue(){
successCount = new AtomicInteger(0);
taskProcessCount = new AtomicInteger(0);
taskCount=new AtomicInteger(0);
taskDetailQueues = Collections.synchronizedList(new ArrayList<>());
}
}
代码示例:
定义2个任务的执行器
/**
* 实际的任务执行器 Thread.sleep(taskTime)相当于就是任务的执行
*/
public class ATaskExecutor implements ITaskExecutor<Integer> {
@Override
public TaskResult<Integer> taskExecute(Task task) {
System.out.println("这是工作A在执行相应的任务,任务数据=" + task.toString());
Random r = new Random();
int taskTime = r.nextInt(500);
AppTest.sleep(taskTime);
//任务成功
LocalDateTime processTime = LocalDateTime.now();
if (taskTime <= 200) {
Integer dataResult = 1;
return new TaskResult<Integer>(TaskResultType.Success, dataResult, "success", task, processTime);
} else if (taskTime > 201 && taskTime <= 400) {//任务出现问题
return new TaskResult<Integer>(TaskResultType.Failure, -1, "Failure", task, processTime);
} else {//抛出异常
try {
throw new RuntimeException("发生异常!!");
} catch (Exception e) {
return new TaskResult<Integer>(TaskResultType.Exception,
-2, e.getMessage(), task, processTime);
}
}
}
}
/**
*实际的任务执行器 Thread.sleep(taskTime)相当于就是任务的执行
*/
public class BTaskExecutor implements ITaskExecutor<String> {
@Override
public TaskResult<String> taskExecute(Task task) {
System.out.println("这是工作B,任务数据="+ task.toString());
Random r = new Random();
int taskTime = r.nextInt(500);
try {
Thread.sleep(taskTime);
} catch (InterruptedException e) {
}
LocalDateTime processTime = LocalDateTime.now();
//任务成功
if (taskTime <= 300) {
return new TaskResult<String>(TaskResultType.Success, "任务B执行成功", "success",task,processTime);
} else if (taskTime > 301 && taskTime <= 400) {//任务出现问题
return new TaskResult<String>(TaskResultType.Failure, "任务B执行失败", "Failure",task,processTime);
} else {//抛出异常
try {
throw new RuntimeException("发生异常!!");
} catch (Exception e) {
return new TaskResult<String>(TaskResultType.Exception,
"任务B 因为抛出异常执行失败", e.getMessage(),task,processTime);
}
}
}
}
main 方法执行
public class AppTest {
public static void main(String[] args) throws Exception {
ATaskExecutor aTaskExecutor = new ATaskExecutor();
BTaskExecutor bTaskExecutor = new BTaskExecutor();
PendingJobPool pool = PendingJobPool.getInstance();
//线程池注册工作
pool.registerJob("ATask", aTaskExecutor);
pool.registerJob("BTask", bTaskExecutor);
Random r = new Random();
for (int i = 0; i < 5; i++) {
Task<Integer> aTask = new Task<>();
int aTaskTraceId = r.nextInt(1000);
aTask.setT(r.nextInt(1000));
System.out.println("aTask的traceID=" + aTaskTraceId);
aTask.setTraceId(aTaskTraceId);
aTask.setExeCutorName("ATask");
pool.putTask(aTask);
int bTaskTraceId = r.nextInt(1000);
System.out.println("bTask的traceID=" + bTaskTraceId);
Task<String> bTask = new Task<>();
bTask.setTraceId(bTaskTraceId);
bTask.setExeCutorName("BTask");
bTask.setT(RandomStringUtils.random(3));
pool.putTask(bTask);
}
Map<String, JobInfo<?>> map = PendingJobPool.getMap();
//没有新任务时 删除任务结果队列,清除任务相关数据 ,减少内存空间 ,适合定时任务
new Thread(() -> {
while (true) {
sleep(3000);
map.values().stream().forEach((jobInfo) -> {
LocalDateTime now = LocalDateTime.now();
//删除数据要求是 任务全部执行成功并且最后一次任务的执行时间大于15s
if (jobInfo.getTaskProcessCount().get()!=0&&jobInfo.getTaskProcessCount().get() == jobInfo.getTaskProcessCount().get() && jobInfo.getTaskDetailQueues().get(jobInfo.getTaskProcessCount().get()-1).getProcessTime().isBefore(now.minusSeconds(15))) {
System.out.println(jobInfo.getJobName()+"任务均执行完成,此时最后执行完成的任务的完成时间是" + jobInfo.getTaskDetailQueues().get(jobInfo.getTaskProcessCount().get()-1).getProcessTime());
jobInfo.clearResultQueue();
}
});
}
}).start();
new Thread(()->{
//查看任务进度
while (true) {
sleep(3000);
map.keySet().stream().forEach((jobName) -> {
List<TaskResult<String>> taskDetail = pool.getTaskDetail(jobName);
if (!taskDetail.isEmpty()) {
System.out.println(jobName + "获得工作的整体处理进度=" + pool.getTaskProgess(jobName));
System.out.println(jobName + "任务详情=" + taskDetail);
}else {
System.out.println(jobName + "此时没有相对应的任务");
}
});
}
}).start();
sleep(10000);
Task<String> bTask = new Task<>();
int bTaskTraceId = r.nextInt(1000);
bTask.setTraceId(bTaskTraceId);
bTask.setExeCutorName("BTask");
bTask.setT(RandomStringUtils.random(3));
pool.putTask(bTask);
}
public static void sleep(Integer time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
线程池参数配置
等待队列数=queuePoolSize=(corePoolSize / taskCose) * responseTime
代入数据maxPoolSize=(80/5~10)*1=16~8,也就是说,queuePoolSize的取值范围是8~16,意思是等待队列里面的线程,可以等待8~16秒,超过了这个时间就需要开新的线程来执行。
最大线程池数=maxPoolSize=(max(taskSecond) - queuePoolSize) / (1 / taskCost)=(100-(8~6)) / (1/5~10) = (92~94) / ( 1/ 5~10) =(92~94)* (5~10)=(92*5)~(94*10)=460~940
最大线程数=( 最大任务数-等待队列容量)/ 每个线程每秒处理能力
因为是最近有空了才想到是去封装一下的,还没有在工作中使用过
标签:private,public,任务,线程,new,jobInfo,jobName,自封 From: https://www.cnblogs.com/zz0203/p/17754279.html