首页 > 其他分享 >高并发多线程大数据批处理任务工具类设计

高并发多线程大数据批处理任务工具类设计

时间:2024-07-15 18:29:22浏览次数:12  
标签:批处理 System threadPool 并发 任务 线程 println 多线程 public

多线程大数据批处理任务工具类设计:

  1. 多线程企业级使用,100%上线程池,问题来了,线程池你怎么配?怎么用?

  2. 如何保证不丢包?怎么确认全部包裹或数据全部下发完成了?方便你统计或者重试

  3. 如何做到通用?这次发优惠卷,下次可以复用发短信/二维码/验证码等。

一、线程池参数问题

​ 为了简化配置和方便压测,动态调节线程池参数,这里使用Spring的org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor而不是JUC的java.util.concurrent.ThreadPoolExecutor

ThreadPoolTaskExecutor 介绍:

ThreadPoolTaskExecutor 是 Spring 提供的对 ThreadPoolExecutor 的封装,提供了更多的功能和更好的集成,适合在 Spring 应用中使用。它提供的一个方便的线程池实现,用于异步执行任务或处理并发请求。

ThreadPoolExecutor 通过构造函数进行配置,而 ThreadPoolTaskExecutor 通过 Spring 配置文件或注解进行配置,更加灵活方便,而且提供了更多的扩展功能,例如设置线程名称前缀、允许核心线程超时、设置拒绝策略等。

​ ThreadPoolTaskExecutor是 Spring 在使用 ThreadPoolTaskExecutor作为 Spring Bean 注册到容器中后,Spring 会负责在应用程序关闭时自动关闭所有注册的线程池, 所以不需要手动关闭。这样不仅可以确保线程池中的线程正确地停止,还可以防止资源泄露和潜在的并发问题。

​ 总的来说,ThreadPoolExecutor 提供了更高的灵活性和可配置性,适用于需要精细控制线程池行为的场景。而 ThreadPoolTaskExecutor 则简化了配置和管理,适用于Spring应用,特别是需要异步任务和调度的场景。

特性 ThreadPoolExecutor ThreadPoolTaskExecutor
配置复杂度 高,需要手动配置各个参数 低,通过Spring配置文件或注解配置
集成性 需要手动管理线程池的生命周期 与Spring无缝集成,自动管理线程池的生命周期
使用场景 适用于需要高度自定义和精细控制的场景 适用于Spring应用,特别是需要异步任务和调度的场景
拒绝策略 提供多种拒绝策略 默认使用ThreadPoolExecutor的拒绝策略
线程工厂 需要手动指定 默认使用Spring的线程工厂

二、示例代码

2.1 线程池配置Config

@Data
@Configuration
@ConfigurationProperties(prefix = "thread.pool") //注意前缀
public class ThreadPoolProperties {

    /**
     * 核心线程池大小
     */
    private int corePoolSize;

    /**
     * 最大可创建的线程数
     */
    private int maxPoolSize;

    /**
     * 队列最大长度
     */
    private int queueCapacity;

    /**
     * 线程池维护线程所允许的空闲时间
     */
    private int keepAliveSeconds;
}
@Configuration
public class ThreadPoolConfig
{
    /*
    @Value("${thread.pool.corePoolSize}")
    private String corePoolSize;

    @Value("${thread.pool.maxPoolSize}")
    private String maxPoolSize;

    @Value("${thread.pool.queueCapacity}")
    private String queueCapacity;

    @Value("${thread.pool.keepAliveSeconds}")
    private String keepAliveSeconds;
    */

    //线程池配置
    @Resource
    private ThreadPoolProperties threadPoolProperties;

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();

        // 核心线程池大小
        threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
        // 最大可创建的线程数
        threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
        // 等待队列最大长度
        threadPool.setQueueCapacity(threadPoolProperties.getQueueCapacity());
        // 线程池维护线程所允许的空闲时间
        threadPool.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
        //异步方法内部线程名称
        threadPool.setThreadNamePrefix("spring默认线程池-");
        // 线程池对拒绝任务(无线程可用)的处理策略
        threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 任务都完成再关闭线程池
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        // 任务初始化
        threadPool.initialize();

        return threadPool;
    }

}

2.2 配置文件application.properties

server.port=24618
spring.application.name=demo
# ========================Thread Pool Config==================
# 可以使用System.out.println(Runtime.getRuntime().availableProcessors());查看电脑核心数
thread.pool.corePoolSize=16
thread.pool.maxPoolSize=32
thread.pool.queueCapacity=50
thread.pool.keepAliveSeconds=2

2.3 主启动类

@SpringBootApplication
@MapperScan("com.zk.mapper") //import tk.mybatis.spring.annotation.MapperScan;
public class ThreadPoolTaskExecutorApplication
{
    //先启动微服务测试一次看看电脑核心数,之后注释掉
    @Resource
    private ThreadPoolTaskExecutor threadPool;

    @PostConstruct
    public void getThreadPoolConfig()
    {
        System.out.println("*******测试threadPool getCorePoolSize: "+threadPool.getCorePoolSize());
        System.out.println("*******测试threadPool getMaxPoolSize: "+threadPool.getMaxPoolSize());
        System.out.println("*******测试threadPool getQueueCapacity: "+threadPool.getQueueCapacity());
        System.out.println("*******测试threadPool getKeepAliveSeconds: "+threadPool.getKeepAliveSeconds());
    }

    public static void main(String[] args)
    {
        SpringApplication.run(ThreadPoolTaskExecutorApplication.class, args);
    }

}

三、业务类

@RestController
@Slf4j
public class CouponController
{
    @Resource
    private CouponService couponService;

    @GetMapping(value = "/coupon/send")
    public void send()
    {
        couponService.batchTaskAction();
    }
}
public interface CouponService
{
    public void batchTaskAction();
}
@Service
public class CouponServiceImpl implements CouponService {
    //下发优惠卷数量
    public  static final Integer COUPON_NUMBER = 50;

    @Resource
    private ThreadPoolTaskExecutor threadPool;

    /**
     * 下发50条优惠卷
     */
    @Override
    public void batchTaskAction()
    {
        //1 模拟要下发的50条优惠卷,上游系统给我们的下发优惠卷源头
        List<String> coupons = new ArrayList<>(COUPON_NUMBER);
        for (int i = 1; i <= COUPON_NUMBER; i++)
        {
            coupons.add("优惠卷--"+i);
        }

        //2 创建CountDownLatch,构造器参数为任务数量
        CountDownLatch countDownLatch = new CountDownLatch(coupons.size());

        long startTime = System.currentTimeMillis();
        try
        {
            //3 将优惠卷集合逐条发送进线程池高并发处理
            for (String coupon : coupons)
            {
                threadPool.execute(() -> {
                    try
                    {
                        //4 交个线程池处理的下发业务逻辑,可以提出成一个方法
                        System.out.println(String.format("【%s】发送成功", coupon));
                    }finally {
                        //5 发送一个少一个任务,计数减少一个
                        countDownLatch.countDown();
                    }
                });
            }
            //6 阻塞当前发送完毕后,方法才能继续向下走
            countDownLatch.await();
        }catch (Exception e){
            e.printStackTrace();
        }

        long endTime = System.currentTimeMillis();
        System.out.println("----任务处理完毕costTime: "+(endTime - startTime) +" 毫秒");
    }
}

为什么使用 CountDownLatch?

  • 确保任务完成CountDownLatch 可以确保所有提交的任务都已经完成。在所有任务完成之前,主线程会一直阻塞。
  • 避免丢包:通过 CountDownLatch,可以确保每个优惠券下发任务都被执行,不会因为主线程提前结束而丢失任务。
  • 统计和重试:在所有任务完成后,可以统计任务的执行情况。如果需要,可以根据任务的执行结果进行重试或其他处理。

如果有任务失败,在原有代码中添加统计和重试逻辑,代码如下:

@Service
public class CouponServiceImpl implements CouponService {
    // 下发优惠券数量
    public static final Integer COUPON_NUMBER = 50;

    @Resource
    private ThreadPoolTaskExecutor threadPool;

    /**
     * 下发50条优惠券
     */
    @Override
    public void batchTaskAction() {
        // 1. 模拟要下发的50条优惠券,上游系统给我们的下发优惠券源头
        List<String> coupons = new ArrayList<>(COUPON_NUMBER);
        for (int i = 1; i <= COUPON_NUMBER; i++) {
            coupons.add("优惠券--" + i);
        }

        // 2. 创建CountDownLatch,构造器参数为任务数量
        CountDownLatch countDownLatch = new CountDownLatch(coupons.size());
        
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger failureCount = new AtomicInteger(0);
        List<String> failedCoupons = new ArrayList<>();

        long startTime = System.currentTimeMillis();
        try {
            // 3. 将优惠券集合逐条发送进线程池高并发处理
            for (String coupon : coupons) {
                threadPool.execute(() -> {
                    try {
                        // 4. 交给线程池处理的下发业务逻辑,可以提取成一个方法
                        boolean success = sendCoupon(coupon);
                        if (success) {
                            successCount.incrementAndGet();
                        } else {
                            failureCount.incrementAndGet();
                            synchronized (failedCoupons) {
                                failedCoupons.add(coupon);
                            }
                        }
                    } finally {
                        // 5. 发送一个少一个任务,计数减少一个
                        countDownLatch.countDown();
                    }
                });
            }
            // 6. 阻塞当前线程,直到所有任务完成
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }

        long endTime = System.currentTimeMillis();
        System.out.println("----任务处理完毕,耗时: " + (endTime - startTime) + " 毫秒");
        System.out.println("成功发送的优惠券数量: " + successCount.get());
        System.out.println("失败发送的优惠券数量: " + failureCount.get());

        // 7. 重试失败的任务
        if (!failedCoupons.isEmpty()) {
            System.out.println("开始重试失败的任务...");
            retryFailedTasks(failedCoupons);
        }
    }

    private boolean sendCoupon(String coupon) {
        // 模拟发送优惠券的逻辑,随机成功或失败
        boolean success = Math.random() > 0.2; // 80% 成功率
        System.out.println(String.format("【%s】发送%s", coupon, success ? "成功" : "失败"));
        return success;
    }

    // 重试失败的任务
    private void retryFailedTasks(List<String> failedCoupons) {
        CountDownLatch retryCountDownLatch = new CountDownLatch(failedCoupons.size());
        AtomicInteger retrySuccessCount = new AtomicInteger(0);
        AtomicInteger retryFailureCount = new AtomicInteger(0);

        try {
            for (String coupon : failedCoupons) {
                threadPool.execute(() -> {
                    try {
                        boolean success = sendCoupon(coupon);
                        if (success) {
                            retrySuccessCount.incrementAndGet();
                        } else {
                            retryFailureCount.incrementAndGet();
                        }
                    } finally {
                        retryCountDownLatch.countDown();
                    }
                });
            }
            retryCountDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("重试完毕,成功发送的优惠券数量: " + retrySuccessCount.get());
        System.out.println("重试完毕,失败发送的优惠券数量: " + retryFailureCount.get());
    }
}

关键点解释

  1. 统计成功和失败的任务
    • 使用 AtomicInteger 来统计成功和失败的任务数量。
    • 使用 List<String> 来记录失败的优惠券。
  2. 任务执行逻辑
    • 在每个任务执行完毕后,根据执行结果更新统计数据。
    • 如果任务失败,将失败的优惠券添加到 failedCoupons 列表中。
  3. 重试逻辑
    • 在所有任务完成后,检查 failedCoupons 列表是否为空。
    • 如果有失败的任务,创建一个新的 CountDownLatch 并重新提交失败的任务进行重试。
    • 重试任务的执行逻辑与初始任务相同,统计重试的成功和失败数量。

四、高并发批处理下发通用工具类

4.1 工具类

​ 为了代码更加模块化、可复用和易于维护,这里将任务批量发送的逻辑抽取成一个通用的工具类 TaskBatchSendUtils,下次可以复用发短信/二维码/验证码等。

public class TaskBatchSendUtils
{
    public static <T> void send(List<T> taskList, Executor threadPool, Consumer<? super T> consumer) throws InterruptedException
    {
        if (taskList == null || taskList.size() == 0)
        {
            return;
        }

        if(Objects.isNull(consumer))
        {
            return;
        }

        CountDownLatch countDownLatch = new CountDownLatch(taskList.size());

        for (T couponOrShortMsg : taskList)
        {
            threadPool.execute(() ->
            {
                try
                {
                    consumer.accept(couponOrShortMsg);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
    }

    public static void disposeTask(String task)
    {
        System.out.println(String.format("【%s】disposeTask下发优惠卷或短信成功", task));
    }

    public static void disposeTaskV2(String task)
    {
        System.out.println(String.format("【%s】disposeTask下发邮件成功", task));
    }

    public static void disposeTaskV3(String task)
    {
        System.out.println(String.format("【%s】disposeTask下发二维码序号成功", task));
    }
}

这里使用泛型和 Consumer 接口,使得 send 方法可以处理任意类型的任务(有输入无输出),并且可以传入任意的任务处理逻辑。

4.2 业务类V2版本

public interface CouponServiceV2
{
    public void batchTaskActionV2();
}
@Service
public class CouponServiceImplV2 implements CouponServiceV2 {
    // 下发优惠卷数量
    public static final Integer COUPON_NUMBER = 50;

    @Resource
    private ThreadPoolTaskExecutor threadPool;

    @SneakyThrows
    @Override
    public void batchTaskActionV2() {
        // 1 模拟要下发的50条优惠卷,上游系统给我们的下发优惠卷源头
        List<String> coupons = getCoupons();

        long startTime = System.currentTimeMillis();

        // 2 调用工具类批处理任务,这些优惠卷coupons,放入线程池threadPool,做什么业务disposeTask下发
        /*TaskBatchSendUtils.send(coupons, threadPool, new Consumer<String>() {
            @Override
            public void accept(String coupon) {
                TaskBatchSendUtils.disposeTaskV2(coupon);
            }
        });*/
        TaskBatchSendUtils.send(coupons,threadPool,TaskBatchSendUtils::disposeTaskV2);


        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");

    }

    private static List<String> getCoupons() {
        List<String> coupons = new ArrayList<>(COUPON_NUMBER);
        for (int i = 1; i <= COUPON_NUMBER; i++) {
            coupons.add("优惠卷--" + i);
        }
        return coupons;
    }
}
@RestController
@Slf4j
public class CouponController{
    @Resource
    private CouponServiceV2 couponServiceV2;
    
    @GetMapping(value = "/coupon/sendv2")
    public void sendv2()
    {
        couponServiceV2.batchTaskActionV2();
    }
}

标签:批处理,System,threadPool,并发,任务,线程,println,多线程,public
From: https://www.cnblogs.com/lyraUU/p/18303734

相关文章

  • MySQL PXC集群多个节点同时大量并发update同一行
    如本文标题,MySQLPXC集群多个节点同时大量并发update同一行数据,会怎样?为此,本人做了一个测试,来验证到底会怎样!一、生成测试数据mysql>CREATETABLEtest(->`a`int(11)NOTNULLDEFAULT0,->`b`int(11)DEFAULTNULL,->`c`int(11)DEFAULTNULL,......
  • MySQL的并发问题的解决方案
    怎么解决脏读、不可重复读、幻读这些问题呢?其实有两种可选的解决方案方案一、读操作利用MVCC(多版本并发控制),写操作进行加锁。所谓的MVCC,就是生成一个ReadView,通过ReadView找到符合条件的记录版本(历史版本由undolog日志构成)。查询语句只能读到在生成ReadView之前已提交事所做的......
  • Go语言--广播式并发聊天服务器
    实现功能每个客户端上线,服务端可以向其他客户端广播上线信息;发送的消息可以广播给其他在线的客户支持改名支持客户端主动退出支持通过who查找当前在线的用户超时退出流程变量用户结构体保存用户的管道,用户名以及网络地址信息typeClientstruct{ Cchanstring......
  • java操作Oracle 方式二 ( 多线程 )
    多线程方式 也是 连接-》操作-》断开连接  这样的操作过程,只是采用了多线程这种方式的特点是每次都是新的连接,多线程,解决了网络环境不好时连接oracle比较费时,影响主程序其它功能的响应。OracleUtil.java基础类代码详见:https://www.cnblogs.com/hailexuexi/p/1830273......
  • Linux多线程
    多线程1.线程概念1.1地址空间和页表的映射关系地址空间是进程能看到的拥有的资源的大小页表决定了进程实际拥有资源的大小和位置所以对进程地址空间和页表进行适当的资源划分,就可以对一个进程的所有资源进行分类进程地址空间如何与页表和物理内存产生映射的过程首先进......
  • Java中的并发集合详解
    Java中的并发集合详解大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!介绍在并发编程中,安全地操作共享数据是一项关键任务。Java提供了一系列的并发集合类,用于在多线程环境下安全地操作数据。本文将详细讨论Java中几种常用的并发集合,包括并发映射、并发......
  • 使用分布式锁解决淘客返利系统中的并发问题
    使用分布式锁解决淘客返利系统中的并发问题大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在大型淘客返利系统中,高并发是一个常见的挑战。为了保证数据的一致性和系统的稳定性,我们需要有效地管理并发访问,特别是在涉及关键资源或业务操作时。本文将......
  • 使用Java实现高并发编程
    使用Java实现高并发编程大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天我们来讨论Java中的高并发编程。Java提供了丰富的并发编程工具和框架,包括线程、线程池、并发集合和锁机制等。本文将通过代码示例详细介绍如何使用这些工具实现高并发编程。1.......
  • java基础篇(java多线程)
    在Java中,多进程通常指的是通过创建新的操作系统进程来执行任务。Java提供了ProcessBuilder和Runtime.exec()方法来实现这一点。以下是一个简单的示例代码,展示了如何使用ProcessBuilder创建一个新的进程。示例代码importjava.io.BufferedReader;importjava.io.IOExceptio......
  • [Java并发]
    sleep()、wait()、join()、yield()yield在Java中,yield()是一个Thread类的方法,它用于暂停当前线程并允许其他线程运行。它是一种线程调度的建议,告诉调度器可以切换到其他就绪状态的线程。调用yield()方法后,线程会从运行状态转换为就绪状态,让其他线程有机会执行。下面是一个简单......