多线程大数据批处理任务工具类设计:
-
多线程企业级使用,100%上线程池,问题来了,线程池你怎么配?怎么用?
-
如何保证不丢包?怎么确认全部包裹或数据全部下发完成了?方便你统计或者重试
-
如何做到通用?这次发优惠卷,下次可以复用发短信/二维码/验证码等。
一、线程池参数问题
为了简化配置和方便压测,动态调节线程池参数,这里使用Spring的org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
而不是JUC的java.util.concurrent.ThreadPoolExecutor
。
ThreadPoolTaskExecutor 介绍:
ThreadPoolTaskExecutor
是 Spring 提供的对ThreadPoolExecutor
的封装,提供了更多的功能和更好的集成,适合在 Spring 应用中使用。它提供的一个方便的线程池实现,用于异步执行任务或处理并发请求。
ThreadPoolExecutor
通过构造函数进行配置,而ThreadPoolTaskExecutor
通过 Spring 配置文件或注解进行配置,更加灵活方便,而且提供了更多的扩展功能,例如设置线程名称前缀、允许核心线程超时、设置拒绝策略等。 ThreadPoolTaskExecutor是 Spring 在使用 ThreadPoolTaskExecutor作为 Spring Bean 注册到容器中后,Spring 会负责在应用程序关闭时自动关闭所有注册的线程池, 所以不需要手动关闭。这样不仅可以确保线程池中的线程正确地停止,还可以防止资源泄露和潜在的并发问题。
总的来说,
ThreadPoolExecutor
提供了更高的灵活性和可配置性,适用于需要精细控制线程池行为的场景。而ThreadPoolTaskExecutor
则简化了配置和管理,适用于Spring应用,特别是需要异步任务和调度的场景。
特性 ThreadPoolExecutor
ThreadPoolTaskExecutor
配置复杂度 高,需要手动配置各个参数 低,通过Spring配置文件或注解配置 集成性 需要手动管理线程池的生命周期 与Spring无缝集成,自动管理线程池的生命周期 使用场景 适用于需要高度自定义和精细控制的场景 适用于Spring应用,特别是需要异步任务和调度的场景 拒绝策略 提供多种拒绝策略 默认使用 ThreadPoolExecutor
的拒绝策略线程工厂 需要手动指定 默认使用Spring的线程工厂
二、示例代码
2.1 线程池配置Config
@Data
@Configuration
@ConfigurationProperties(prefix = "thread.pool") //注意前缀
public class ThreadPoolProperties {
/**
* 核心线程池大小
*/
private int corePoolSize;
/**
* 最大可创建的线程数
*/
private int maxPoolSize;
/**
* 队列最大长度
*/
private int queueCapacity;
/**
* 线程池维护线程所允许的空闲时间
*/
private int keepAliveSeconds;
}
@Configuration
public class ThreadPoolConfig
{
/*
@Value("${thread.pool.corePoolSize}")
private String corePoolSize;
@Value("${thread.pool.maxPoolSize}")
private String maxPoolSize;
@Value("${thread.pool.queueCapacity}")
private String queueCapacity;
@Value("${thread.pool.keepAliveSeconds}")
private String keepAliveSeconds;
*/
//线程池配置
@Resource
private ThreadPoolProperties threadPoolProperties;
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor()
{
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
// 核心线程池大小
threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
// 最大可创建的线程数
threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
// 等待队列最大长度
threadPool.setQueueCapacity(threadPoolProperties.getQueueCapacity());
// 线程池维护线程所允许的空闲时间
threadPool.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
//异步方法内部线程名称
threadPool.setThreadNamePrefix("spring默认线程池-");
// 线程池对拒绝任务(无线程可用)的处理策略
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 任务都完成再关闭线程池
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 任务初始化
threadPool.initialize();
return threadPool;
}
}
2.2 配置文件application.properties
server.port=24618
spring.application.name=demo
# ========================Thread Pool Config==================
# 可以使用System.out.println(Runtime.getRuntime().availableProcessors());查看电脑核心数
thread.pool.corePoolSize=16
thread.pool.maxPoolSize=32
thread.pool.queueCapacity=50
thread.pool.keepAliveSeconds=2
2.3 主启动类
@SpringBootApplication
@MapperScan("com.zk.mapper") //import tk.mybatis.spring.annotation.MapperScan;
public class ThreadPoolTaskExecutorApplication
{
//先启动微服务测试一次看看电脑核心数,之后注释掉
@Resource
private ThreadPoolTaskExecutor threadPool;
@PostConstruct
public void getThreadPoolConfig()
{
System.out.println("*******测试threadPool getCorePoolSize: "+threadPool.getCorePoolSize());
System.out.println("*******测试threadPool getMaxPoolSize: "+threadPool.getMaxPoolSize());
System.out.println("*******测试threadPool getQueueCapacity: "+threadPool.getQueueCapacity());
System.out.println("*******测试threadPool getKeepAliveSeconds: "+threadPool.getKeepAliveSeconds());
}
public static void main(String[] args)
{
SpringApplication.run(ThreadPoolTaskExecutorApplication.class, args);
}
}
三、业务类
@RestController
@Slf4j
public class CouponController
{
@Resource
private CouponService couponService;
@GetMapping(value = "/coupon/send")
public void send()
{
couponService.batchTaskAction();
}
}
public interface CouponService
{
public void batchTaskAction();
}
@Service
public class CouponServiceImpl implements CouponService {
//下发优惠卷数量
public static final Integer COUPON_NUMBER = 50;
@Resource
private ThreadPoolTaskExecutor threadPool;
/**
* 下发50条优惠卷
*/
@Override
public void batchTaskAction()
{
//1 模拟要下发的50条优惠卷,上游系统给我们的下发优惠卷源头
List<String> coupons = new ArrayList<>(COUPON_NUMBER);
for (int i = 1; i <= COUPON_NUMBER; i++)
{
coupons.add("优惠卷--"+i);
}
//2 创建CountDownLatch,构造器参数为任务数量
CountDownLatch countDownLatch = new CountDownLatch(coupons.size());
long startTime = System.currentTimeMillis();
try
{
//3 将优惠卷集合逐条发送进线程池高并发处理
for (String coupon : coupons)
{
threadPool.execute(() -> {
try
{
//4 交个线程池处理的下发业务逻辑,可以提出成一个方法
System.out.println(String.format("【%s】发送成功", coupon));
}finally {
//5 发送一个少一个任务,计数减少一个
countDownLatch.countDown();
}
});
}
//6 阻塞当前发送完毕后,方法才能继续向下走
countDownLatch.await();
}catch (Exception e){
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println("----任务处理完毕costTime: "+(endTime - startTime) +" 毫秒");
}
}
为什么使用
CountDownLatch
?
- 确保任务完成:
CountDownLatch
可以确保所有提交的任务都已经完成。在所有任务完成之前,主线程会一直阻塞。- 避免丢包:通过
CountDownLatch
,可以确保每个优惠券下发任务都被执行,不会因为主线程提前结束而丢失任务。- 统计和重试:在所有任务完成后,可以统计任务的执行情况。如果需要,可以根据任务的执行结果进行重试或其他处理。
如果有任务失败,在原有代码中添加统计和重试逻辑,代码如下:
@Service public class CouponServiceImpl implements CouponService { // 下发优惠券数量 public static final Integer COUPON_NUMBER = 50; @Resource private ThreadPoolTaskExecutor threadPool; /** * 下发50条优惠券 */ @Override public void batchTaskAction() { // 1. 模拟要下发的50条优惠券,上游系统给我们的下发优惠券源头 List<String> coupons = new ArrayList<>(COUPON_NUMBER); for (int i = 1; i <= COUPON_NUMBER; i++) { coupons.add("优惠券--" + i); } // 2. 创建CountDownLatch,构造器参数为任务数量 CountDownLatch countDownLatch = new CountDownLatch(coupons.size()); AtomicInteger successCount = new AtomicInteger(0); AtomicInteger failureCount = new AtomicInteger(0); List<String> failedCoupons = new ArrayList<>(); long startTime = System.currentTimeMillis(); try { // 3. 将优惠券集合逐条发送进线程池高并发处理 for (String coupon : coupons) { threadPool.execute(() -> { try { // 4. 交给线程池处理的下发业务逻辑,可以提取成一个方法 boolean success = sendCoupon(coupon); if (success) { successCount.incrementAndGet(); } else { failureCount.incrementAndGet(); synchronized (failedCoupons) { failedCoupons.add(coupon); } } } finally { // 5. 发送一个少一个任务,计数减少一个 countDownLatch.countDown(); } }); } // 6. 阻塞当前线程,直到所有任务完成 countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.println("----任务处理完毕,耗时: " + (endTime - startTime) + " 毫秒"); System.out.println("成功发送的优惠券数量: " + successCount.get()); System.out.println("失败发送的优惠券数量: " + failureCount.get()); // 7. 重试失败的任务 if (!failedCoupons.isEmpty()) { System.out.println("开始重试失败的任务..."); retryFailedTasks(failedCoupons); } } private boolean sendCoupon(String coupon) { // 模拟发送优惠券的逻辑,随机成功或失败 boolean success = Math.random() > 0.2; // 80% 成功率 System.out.println(String.format("【%s】发送%s", coupon, success ? "成功" : "失败")); return success; } // 重试失败的任务 private void retryFailedTasks(List<String> failedCoupons) { CountDownLatch retryCountDownLatch = new CountDownLatch(failedCoupons.size()); AtomicInteger retrySuccessCount = new AtomicInteger(0); AtomicInteger retryFailureCount = new AtomicInteger(0); try { for (String coupon : failedCoupons) { threadPool.execute(() -> { try { boolean success = sendCoupon(coupon); if (success) { retrySuccessCount.incrementAndGet(); } else { retryFailureCount.incrementAndGet(); } } finally { retryCountDownLatch.countDown(); } }); } retryCountDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("重试完毕,成功发送的优惠券数量: " + retrySuccessCount.get()); System.out.println("重试完毕,失败发送的优惠券数量: " + retryFailureCount.get()); } }
关键点解释
- 统计成功和失败的任务:
- 使用
AtomicInteger
来统计成功和失败的任务数量。- 使用
List<String>
来记录失败的优惠券。- 任务执行逻辑:
- 在每个任务执行完毕后,根据执行结果更新统计数据。
- 如果任务失败,将失败的优惠券添加到
failedCoupons
列表中。- 重试逻辑:
- 在所有任务完成后,检查
failedCoupons
列表是否为空。- 如果有失败的任务,创建一个新的
CountDownLatch
并重新提交失败的任务进行重试。- 重试任务的执行逻辑与初始任务相同,统计重试的成功和失败数量。
四、高并发批处理下发通用工具类
4.1 工具类
为了代码更加模块化、可复用和易于维护,这里将任务批量发送的逻辑抽取成一个通用的工具类 TaskBatchSendUtils
,下次可以复用发短信/二维码/验证码等。
public class TaskBatchSendUtils
{
public static <T> void send(List<T> taskList, Executor threadPool, Consumer<? super T> consumer) throws InterruptedException
{
if (taskList == null || taskList.size() == 0)
{
return;
}
if(Objects.isNull(consumer))
{
return;
}
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
for (T couponOrShortMsg : taskList)
{
threadPool.execute(() ->
{
try
{
consumer.accept(couponOrShortMsg);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
public static void disposeTask(String task)
{
System.out.println(String.format("【%s】disposeTask下发优惠卷或短信成功", task));
}
public static void disposeTaskV2(String task)
{
System.out.println(String.format("【%s】disposeTask下发邮件成功", task));
}
public static void disposeTaskV3(String task)
{
System.out.println(String.format("【%s】disposeTask下发二维码序号成功", task));
}
}
这里使用泛型和
Consumer
接口,使得send
方法可以处理任意类型的任务(有输入无输出),并且可以传入任意的任务处理逻辑。
4.2 业务类V2版本
public interface CouponServiceV2
{
public void batchTaskActionV2();
}
@Service
public class CouponServiceImplV2 implements CouponServiceV2 {
// 下发优惠卷数量
public static final Integer COUPON_NUMBER = 50;
@Resource
private ThreadPoolTaskExecutor threadPool;
@SneakyThrows
@Override
public void batchTaskActionV2() {
// 1 模拟要下发的50条优惠卷,上游系统给我们的下发优惠卷源头
List<String> coupons = getCoupons();
long startTime = System.currentTimeMillis();
// 2 调用工具类批处理任务,这些优惠卷coupons,放入线程池threadPool,做什么业务disposeTask下发
/*TaskBatchSendUtils.send(coupons, threadPool, new Consumer<String>() {
@Override
public void accept(String coupon) {
TaskBatchSendUtils.disposeTaskV2(coupon);
}
});*/
TaskBatchSendUtils.send(coupons,threadPool,TaskBatchSendUtils::disposeTaskV2);
long endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");
}
private static List<String> getCoupons() {
List<String> coupons = new ArrayList<>(COUPON_NUMBER);
for (int i = 1; i <= COUPON_NUMBER; i++) {
coupons.add("优惠卷--" + i);
}
return coupons;
}
}
@RestController
@Slf4j
public class CouponController{
@Resource
private CouponServiceV2 couponServiceV2;
@GetMapping(value = "/coupon/sendv2")
public void sendv2()
{
couponServiceV2.batchTaskActionV2();
}
}
标签:批处理,System,threadPool,并发,任务,线程,println,多线程,public
From: https://www.cnblogs.com/lyraUU/p/18303734