首页 > 其他分享 >7、定时进行数据批处理任务

7、定时进行数据批处理任务

时间:2022-08-15 17:24:00浏览次数:80  
标签:log 批处理 任务 线程 StopWatch 定时 public

一、StopWatch时间控制类:

StopWatch 是spring工具包org.springframework.util下的一个工具类,主要用于计算同步单线程执行时间。

1、StopWatch优缺点:

优点:

(1)、spring自带工具类,可直接使用;

(2)、代码实现简单,使用更简单;

(3)、统一归纳,展示每项任务耗时与占用总时间的百分比,展示结果直观;

(4)、性能消耗相对较小,并且最大程度的保证了start与stop之间的时间记录的准确性;

(5)、可在start时直接指定任务名字,从而更加直观的显示记录结果;

缺点:

(1)、一个StopWatch实例一次只能开启一个task,不能同时start多个task,并且在该task未stop之前不能start一个新的task,必须在该task stop之后才能开启新的task,若要一次开启多个,需要new不同的StopWatch实例;

(2)、代码侵入式使用,需要改动多处代码;

2、StopWatch模板:

    public Object stopWatchDemo(){
        StopWatch stopWatch = new StopWatch("StopWatch测量业务逻辑的运行时间");
        stopWatch.start();
        log.info("====================StopWatch计时任务开始========================");

        //业务逻辑1
        //业务逻辑2
        //......

        stopWatch.stop();
        log.info("====================StopWatch计时任务结束========================");
        log.info(stopWatch.prettyPrint());  //打印执行时间
        return null;
    }

 

二、数据的批处理:

Java中的批处理用于执行一组插入操作或批处理,因为一次又一次地执行单个插入会浪费时间并降低性能。因此,使用批处理可以一次执行多个插入操作,这样可以提高程序的性能。

1、批处理方式一:

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * 批处理插入
     */
    private void upSave1(List<BatchSaveVO> itemList) {
        SqlSession session = null;
        try {
            // 新获取一个模式为BATCH,自动提交为false的session
            // 如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交,可能导致内存溢出
            session = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
            BatchSaveMapper mapper = session.getMapper(BatchSaveMapper.class);
            int batchCount = 1000;// 每批commit的个数
            int batchLastIndex = batchCount;// 每批最后一个的下标
            for (int index = 0; index < itemList.size(); ) {
                if (batchLastIndex >= itemList.size()) {
                    batchLastIndex = itemList.size();
                    log.info("index:{}", batchLastIndex);
                    mapper.batchInsert(itemList.subList(index, batchLastIndex));
                    session.commit();
                    log.info("index:" + index + " batchLastIndex:" + batchLastIndex);
                    log.info("数据插入完毕");
                    break;// 数据插入完毕,退出循环
                } else {
                    mapper.batchInsert(itemList.subList(index, batchLastIndex));
                    session.commit();
                    log.info("index:" + index + " batchLastIndex:" + batchLastIndex);
                    index = batchLastIndex;// 设置下一批下标
                    batchLastIndex = index + (batchCount - 1);
                }
            }
            session.commit();
        } finally {
            session.close();
        }
    }

2、批处理方式二:

    @Autowired
    private SqlSessionTemplate sqlSessionTemplate;

    private void upSave2(List<BatchSaveVO> newReports) {
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        // 如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交,可能导致内存溢出
        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, true);
        try {
            Optional.ofNullable(newReports).ifPresent(reports -> {
                BatchSaveMapper batchMapper = sqlSession.getMapper(BatchSaveMapper.class);
                batchMapper.batchInsert(newReports);
                sqlSession.commit();
            });
        }catch (Exception e){
            sqlSession.rollback();
            log.error("================保存失败================:{}",e.getMessage());
        }finally {
            sqlSession.close();
        }
    }

3、SqlSessionFactory详解:

(1)、介绍:

SqlSessionFactory是MyBatis的关键字,它是单个数据库映射关系经过编译后的内存镜像,SqlSessionFactory对象的实例可以通过SqlSessionFactoryBuilder对象来获得,而SqlSessionFactoryBuildr则可以从XML配置文件或一个预先定制的Configuration的实例构建出SqlSessionFactory的实例每一个MyBatis的应用程序都以一个SqlSessionFactory对象的实例为核心。SqlSessionFactory是线程安全的,他一旦被创建,应该在应用执行期间都存在,在应用运行期间不要重复创建多次,建议使用单例模式,SqlSessionFactory是创建SqlSession的工厂。

(2)、SqlSessionFactory.openSession()方法:

//新获取一个模式为BATCH,自动提交为false的session
// 如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交,可能导致内存溢出
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);

(3)、ExecutorType三种SQL执行器模式:

1)、ExecutorType.SIMPLE:相当于JDBC的stmt.execute(sql) 执行完毕即关闭即stmt.close()

2)、ExecutorType.REUSE:相当于JDBC的stmt.execute(sql) 执行完不关闭,而是将stmt存入Map<String, Statement>中缓存,其中key为执行的sql模板;

3)、ExecutorType.BATCH:相当于JDBC语句的stmt.addBatch(sql),即仅将执行SQL加入到批量计划但是不真正执行,所以此时不会执行返回受影响的行数,而只有执行stmt.execteBatch()后才会真正执行sql。

方式

优势

劣势

ExecutorType.SIMPLE

默认执行器, 节约服务器资源

每次都要开关Statement

ExecutorType.REUSE

提升后端接口处理效率

每次一个新sql都缓存,增大JVM内存压力

ExecutorType.BATCH

专门用于更新插入操作,效率最快

对select 不适用,另外特殊要求,比如限制一次execteBatch的数量时需要写过滤器定制

 

 

三、定时任务:

1、@Schedule定时任务方式:

在日常开发中比较简单的实现方式就是使用Spring的@Scheduled()注解。但是在修改服务器时间时会导致定时任务不执行情况的发生,解决的办法是当修改服务器时间后,将服务进行重启就可以避免此现象的发生。

(1)、@Schedule详解:

@Schedule详解参考

(2)、@Schedule的使用:

要使用@Scheduled注解,首先需要在相关类上添加@EnableScheduling,启用Spring的计划任务执行功能,这样可以在容器中的任何Spring管理的bean上检测@Scheduled注解,执行计划任务。

    @Scheduled(cron = "0 0 1 * * ?")
    public Object batchSaveWay() {
        StopWatch stopWatch = new StopWatch("@Scheduled定时任务");
        stopWatch.start();
        log.info("====================@Scheduled定时任务开始========================");

        //业务逻辑

        stopWatch.stop();
        log.info("====================@Scheduled定时任务结束========================");
        log.info(stopWatch.prettyPrint());
        return null;
    }

 

2、SchedulingConfigurer接口线程定时任务方式:

@Schedule 注解有一个缺点,其定时的时间不能动态的改变,而基于SchedulingConfigurer 接口的方式可以做到(例如将cron表达式存入数据库,通过修改数据中的cron表达式,动态改变定时任务的执行周期,而不需要重启项目。)

(1)、操作流程:

1)、声明ThreadPoolTaskScheduler配置类:

@Configuration
public class TaskPoolConfig {
    /**
     * ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。
     * ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。
     *
     * 拒绝策略:
     * (1)、CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用 线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。
     *     但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
     * (2)、AbortPolicy: 丢弃任务,并抛出拒绝执行
     * (3)、RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
     * (4)、DiscardPolicy: 直接丢弃,其他啥都没有
     * (5)、DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
     */

    @Bean
    public ThreadPoolTaskScheduler taskSchedulerExecutor() {
        // 创建一个线程池对象
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        // 定义一个线程池大小
        scheduler.setPoolSize(100);
        // 线程池名的前缀
        scheduler.setThreadNamePrefix("taskExecutor-");
        // 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        // 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
        scheduler.setAwaitTerminationSeconds(60);
        // 线程池对拒绝任务的处理策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return scheduler;
    }
}

2)、实现SchedulingConfigurer接口:

@EnableScheduling
@Component
@Configuration
public abstract class ThreadSchedulingConfigurerWay implements SchedulingConfigurer {
    @Autowired
    private TaskPoolConfig taskPoolConfig;

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.setScheduler(taskPoolConfig.taskSchedulerExecutor());
        scheduledTaskRegistrar.addTriggerTask(new Runnable() {
            @Override
            public void run() {
                // 定时任务要执行的内容
                executionWay();
            }
        }, new Trigger() {
            @Override
            public Date nextExecutionTime(TriggerContext triggerContext) {
                // 定时任务触发,可修改定时任务的执行周期
                String cron = getCron(); //可以将表达式配置在数据库中,动态改变定时的时间
                CronTrigger trigger = new CronTrigger(cron);
                Date nextExecDate = trigger.nextExecutionTime(triggerContext);
                return nextExecDate;
            }
        });
    }

    public abstract String getCron();
    public abstract void executionWay();
    
}

3)、实现定时线程任务:

任务一:

//定时线程任务一
@Configuration
public class ThreadSchedulingConfigurerWayOneImpl extends ThreadSchedulingConfigurerWay {
    @Override
    public String getCron() {
        String cron = "0 0/1 * * * ?";
        return cron;
    }

    @Override
    public void executionWay() {
        System.out.println("定时线程任务demo1:"
                + LocalDateTime.now()+",线程名称:"+Thread.currentThread().getName()
                + " 线程id:"+Thread.currentThread().getId());
    }
}

任务二:

//定时线程任务二
@Configuration
public class ThreadSchedulingConfigurerWayTwoImpl extends ThreadSchedulingConfigurerWay {
    @Override
    public String getCron() {
        String cron = "0 0/1 * * * ?";
        return cron;
    }

    @Override
    public void executionWay() {
        System.out.println("定时线程任务demo2:"
                + LocalDateTime.now()+",线程名称:"+Thread.currentThread().getName()
                + " 线程id:"+Thread.currentThread().getId());
    }
}

(2)、ThreadPoolTaskExecutor和ThreadPoolTaskScheduler的区别:

相关参考

 

3、Quartz框架定时任务方式:

Quartz框架定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行,如果定时任执行太长,会长时间占用资源,导致其它任务堵塞。

(1)、添加Quartz依赖:

<!-- quartz定时框架-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

(2)、声明Job任务(继承QuartzJobBean抽象类):

@Slf4j
public class QuartzBatchSaveJob extends QuartzJobBean {
    @Autowired
    @Qualifier("QuartzWay")
    private BatchSaveService quartzWay;

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        //定时任务的业务逻辑
        quartzWay.batchSaveWay();
    }
}

(3)、编写业务类:

    public Object batchSaveWay() {
        StopWatch stopWatch = new StopWatch("Quartz定时框架启动任务");
        stopWatch.start();
        log.info("====================Quartz定时框架启动任务开始========================");

        //业务逻辑

        stopWatch.stop();
        log.info("====================Quartz定时框架启动任务结束========================");
        log.info(stopWatch.prettyPrint());
        return null;
    }

(4)、将Job任务注册到Spring容器中:

@Configuration
public class QuartzConfig {
    //Quartz定时框架启动任务
    @Bean
    public JobDetail businessJobDetail(){
        return JobBuilder.newJob(QuartzBatchSaveJob.class)
                .withDescription("Quartz定时框架启动任务")
                .withIdentity(QuartzBatchSaveJob.class.getSimpleName())
                .storeDurably().build();
    }
    @Bean
    public Trigger businessJobTrigger() {
        String cron = "0 0 1 * * ?";
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cron);
        return TriggerBuilder.newTrigger().forJob(businessJobDetail())
                .withIdentity("QuartzBatchSaveJobTrigger")
                .withSchedule(cronScheduleBuilder).build();
    }
}

 

 

 

 

 

 

 

 

搜索

复制

<iframe height="240" width="320"></iframe>

标签:log,批处理,任务,线程,StopWatch,定时,public
From: https://www.cnblogs.com/Iven-L/p/16587692.html

相关文章

  • 批处理命令—set
    set命令:设置变量(1)set显示环境变量set会显示当前所有的系统环境变量setc会显示所有以c或C开头的变量。(即:不区分大小写)setcom显示所有以com或......
  • cmd批处理中使用mvn命令 解决自动退出窗口问题
    为了方便打包程序,在批处理中可以使用mvn命令(mvncleanpackage-DskipTests)但使用后发现,mvn命令后面的批处理命令不会执行,黑窗口会直接退出。原因:mvn本身是一个批处理命......
  • Alchemy Nft黑客松任务(第一周)
    Alchemy是什么项目?2019年12月,Alchemy完成1500万美元A轮融资,资方为PanteraCapital,斯坦福大学,Coinbase,三星等。2021年4月,Alchemy以5.05亿美元估值完成8000万美元B轮融资,Co......
  • 转:批处理bat命令 获取当前盘符和当前目录和上级目录的代码
     批处理命令获取当前盘符和当前目录当前盘符:%~d0当前路径:%cd%当前执行命令行:%0当前bat文件路径:%~dp0当前bat文件短路径:%~sdp0测试12345echo当前盘符:%......
  • 如何可视化编写和编排你的 K8s 任务
    作者:学仁简介K8sJob是Kubernetes中的一种资源,用来处理短周期的Pod,相当于一次性任务,跑完就会把Pod销毁,不会一直占用资源,可以节省成本,提高资源利用率阿里任务调度......
  • nginx的日志切割-每天定时脚本执行
    日志切割时企业中常规动作,我们不会每天去手工执行一遍命令,通常是让脚本自己执行,于是我们可以将这个动作写进任务计划,每天凌晨自动执行。#!/usr/bin/bashs_log="/usr/l......