首页 > 编程语言 >Java项目开发中异步调用场景控制并发数

Java项目开发中异步调用场景控制并发数

时间:2024-06-19 21:45:24浏览次数:30  
标签:异步 Java int batch 并发 concurrency total public

场景

项目基于SpringBoot搭建,默认使用Tomcat Web容器,对于每个HTTP请求,Tomcat Web容器会分配1个线程来处理请求。

在pom.xml里查看依赖关系: spring-boot-starter-web添加了tomcat-embed-core依赖

Tomcat线程池配置可在application.yml配置:

server:
  tomcat:
    max-threads: 500

此外,在日常开发中常见有两种用到多线程异步执行的场景:

  1. 服务内部启动1个或多个自定义的业务线程池,用于异步并发处理业务
  2. 调用的中间件或者第三方它本身是异步执行的

第1种场景,由于线程池是开发自己定义的,可配置线程池的核心/最大线程数、阻塞队列、拒绝策略等参数,
通过评估实际需求、资源占用等来进行配置参数,保证服务的稳定性、健壮性、可控性,
如:线程数过大导致服务器鸭梨过大影响性能、队列不是无限大避免OOM等。

而第2种场景,由于多线程、异步不是服务内的,开发经常容易忽略稳定性、可控性。

以使用ElasticSearch为例,RestHighLevelClient客户端提供了多个异步执行的方法,如:updateByQueryAsyncbulkAync等,
由于这些异步方法没有阻塞当前线程,能让接口快速返回,调用来确实很方便,但这隐藏了不可控的"风险"。

举例:
搜索服务里有商品索引,索引里包含商品、店铺、库存、价格等信息,
当某个商品的信息发生变动时,索引里商品相关字段需要更新,
一个商品可能在很多个店铺上架,那么需要批量更新索引里多个文档;

假设商品变动是商品服务通过MQ队列来下发商品变动消息,搜索服务收到消息后调用updateByQueryAsync来批量更新,
由于updateByQueryAsync是异步执行的,在消息监听处理里调用起来很快,通常消息消费者也是多线程的,
这样单个消息处理很快,如果短时间内消息很多,会导致大量updateByQueryAsync调用,这些异步调用由ElasticSearch
处理,而ElasticSearch内部也有线程池和队列,可能出现以下问题:

  1. ElasticSearch服务器有大量的批量更新处理,可能出现索引文档写入冲突,同时服务器CPU/内存/IO/GC等压力变大;
  2. 由于ElasticSearch服务器压力变大,可能影响其它非写入业务的正常运行,如商品搜索相关业务响应变慢或者超时;
  3. ElasticSearch内部的线程数、队列占满,会出现任务拒绝执行的异常,导致某些更新失败,影响搜索结果的正确性;

调整ElasticSearch写入线程池的配置是一个方法,这是站在被调用方的优化思路;

另一个思路是:当遇到这种调中间件或第三方接口本身是异步的场景,由调用方主动做一些控制,
如控制调用的频率、调用并发数等,为系统整体的可用性、稳定性考虑,而不是简单发起调用后就不管。

思路

那么如何控制呢?
考虑到调用方发起调用后,这个调用是由中间件、第三方处理,它们可能是多线程并发执行的,对于发起的1次调用,即有1个线程来处理,处理结束后该线程空闲出来;
因此调用方考虑对同时发起的并发调用数进行控制,比如限制并发调用数最大为50,保证不让被调用方同时超过50个调用异步执行;
发起每次异步调用后,需要拿到调用结果,这样才知道这次调用是否处理完成,从而控制是否发起新的调用。

对于RestHighLevelClient#updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest, RequestOptions options, ActionListener<BulkByScrollResponse> listener)方法,
第3个参数是ActionListener接口,其中onResponse方法里可添加完成回调的逻辑。

方法1---用CountDownLatch来控制:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.springframework.util.Assert;

import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;

/**
 * 限制批处理的异步任务执行器
 * <p>
 * 1. 任务本身需是异步执行的
 * 2. 此执行器控制异步执行并发数, 避免大量并发导致系统负载过大
 *
 * @author cdfive
 */
@Slf4j
public class LimitBatchAsyncTaskExecutor {

    // 每批次处理数
    private int batch;

    // 跟踪id
    private String traceId;

    public LimitBatchAsyncTaskExecutor(int batch) {
        this(batch, CommonUtil.getTraceId());
    }

    public LimitBatchAsyncTaskExecutor(int batch, String traceId) {
        Assert.isTrue(batch > 0, "batch must be greater than 0");
        this.batch = batch;
        this.traceId = traceId;
    }

    public <T> void executeTasks(Collection<T> tasks, AsyncTaskExecutor<T> asyncTaskExecutor) {
        Assert.isTrue(tasks != null && tasks.size() > 0, "tasks can't be empty");
        this.executeTasks(tasks.iterator(), tasks.size(), asyncTaskExecutor);
    }

    public <T> void executeTasks(Iterator<T> tasks, int total, AsyncTaskExecutor<T> asyncTaskExecutor) {
        Assert.notNull(tasks, "tasks can't be null");
        Assert.isTrue(total > 0, "total must be greater than 0");
        Assert.notNull(asyncTaskExecutor, "asyncTaskExecutor can't be null");

        log.info(traceId + ",LimitBatchAsyncTaskExecutor executeTasks start,total={},batch={}", total, batch);
        long totalStart = System.currentTimeMillis();
        long batchStart = System.currentTimeMillis();

        CountDownLatch latch = null;
        Runnable callback = null;
        int index = 0;
        int batchIndex = 0;
        int batchTotal = (total / batch) + (total % batch == 0 ? 0 : 1);
        while (tasks.hasNext()) {
            T task = tasks.next();
            index++;
            if (latch == null) {
                batchStart = System.currentTimeMillis();
                batchIndex++;
                latch = new CountDownLatch((batchIndex < batchTotal) ? batch : (total - index + 1));
                CountDownLatch finalLatch = latch;
                callback = new Runnable() {
                    @Override
                    public void run() {
                        finalLatch.countDown();
                    }
                };
            }

            Context context = new Context(index, total, batchIndex, batchTotal);
            log.info(traceId + ",LimitBatchAsyncTaskExecutor executeTask start,batch={},context={}", batch, context);
            long start = System.currentTimeMillis();
            Runnable finalCallback = callback;
            asyncTaskExecutor.executeTask(task, () -> {
                log.info(traceId + ",LimitBatchAsyncTaskExecutor executeTask done,cost={}ms,batch={},context={}", (System.currentTimeMillis() - start), batch, context);
                finalCallback.run();
            }, context);

            if (index % batch == 0 || index == total) {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    log.error(traceId + ",LimitBatchAsyncTaskExecutor await error", e);
                }

                latch = null;
                callback = null;
                log.info(traceId + ",LimitBatchAsyncTaskExecutor batch done,cost={}ms,index=({}/{}),batchIndex=({}/{})"
                        , (System.currentTimeMillis() - batchStart), index, total, batchIndex, batchTotal);
            }
        }

        log.info(traceId + ",LimitBatchAsyncTaskExecutor executeTasks success,total cost={}ms,batch={}", (System.currentTimeMillis() - totalStart), batch);
    }

    public int getBatch() {
        return batch;
    }

    public String getTraceId() {
        return traceId;
    }

    /**
     * 异步任务执行器
     */
    public static interface AsyncTaskExecutor<T> {

        /**
         * 执行任务,任务本身需是异步执行的
         */
        void executeTask(T task, Runnable callback, Context context);
    }

    /**
     * 上下文
     */
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public static class Context implements Serializable {

        private static final long serialVersionUID = 2916376548269524746L;

        // 下标
        private int index;

        // 总数
        private int total;

        // 批量下标
        private int batchIndex;

        // 批量总数
        private int batchTotal;

        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.JSON_STYLE);
        }
    }

    @Override
    public String toString() {
        return "LimitBatchAsyncTaskExecutor{" +
                "batch=" + batch +
                ", traceId='" + traceId + '\'' +
                '}';
    }
}

LimitBatchAsyncTaskExecutor类的batch字段,表示每批次处理的数量,通过CountDownLatch和回调接口,
控制异步任务每批次最多同时执行的任务数量,当每批次任务执行完成后,开始执行下个批次的任务;注意最后1批次执行任务数
可能小于batch

方法2---用Semaphore来控制:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.springframework.util.Assert;

import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Semaphore;

/**
 * 限制并发度的异步任务执行器
 * <p>
 * 1. 任务本身需是异步执行的
 * 2. 此执行器控制异步执行并发数, 避免大量并发导致系统负载过大
 *
 * @author cdfive
 */
@Slf4j
public class LimitConcurrencyAsyncTaskExecutor {

    // 并发数
    private int concurrency;

    // 跟踪id
    private String traceId;

    public LimitConcurrencyAsyncTaskExecutor(int concurrency) {
        this(concurrency, CommonUtil.getTraceId());
    }

    public LimitConcurrencyAsyncTaskExecutor(int concurrency, String traceId) {
        Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
        this.concurrency = concurrency;
        this.traceId = traceId;
    }

    public <T> void executeTasks(Collection<T> tasks, AsyncTaskExecutor<T> asyncTaskExecutor) {
        Assert.isTrue(tasks != null && tasks.size() > 0, "tasks can't be empty");
        this.executeTasks(tasks.iterator(), tasks.size(), asyncTaskExecutor);
    }

    public <T> void executeTasks(Iterator<T> tasks, int total, AsyncTaskExecutor<T> asyncTaskExecutor) {
        Assert.notNull(tasks, "tasks can't be null");
        Assert.isTrue(total > 0, "total must be greater than 0");
        Assert.notNull(asyncTaskExecutor, "asyncTaskExecutor can't be null");

        log.info(traceId + ",LimitConcurrencyAsyncTaskExecutor executeTasks start,total={},concurrency={}", total, concurrency);
        long totalStart = System.currentTimeMillis();
        long batchStart = System.currentTimeMillis();

        Semaphore semaphore = new Semaphore(concurrency);
        Runnable callback = new Runnable() {
            @Override
            public void run() {
                semaphore.release();
            }
        };
        int index = 0;
        while (tasks.hasNext()) {
            T task = tasks.next();
            index++;

            semaphore.acquireUninterruptibly();
            Context context = new Context(index, total);
            log.info(traceId + ",LimitConcurrencyAsyncTaskExecutor executeTask start,concurrency={},context={}", concurrency, context);
            long start = System.currentTimeMillis();

            asyncTaskExecutor.executeTask(task, () -> {
                log.info(traceId + ",LimitConcurrencyAsyncTaskExecutor executeTask done,cost={}ms,concurrency={},context={}", (System.currentTimeMillis() - start), concurrency, context);
                callback.run();
            }, context);
        }

        log.info(traceId + ",LimitConcurrencyAsyncTaskExecutor executeTasks success,total cost={}ms,concurrency={}", (System.currentTimeMillis() - totalStart), concurrency);
    }

    public int getConcurrency() {
        return concurrency;
    }

    public String getTraceId() {
        return traceId;
    }

    /**
     * 异步任务执行器
     */
    public static interface AsyncTaskExecutor<T> {

        /**
         * 执行任务,任务本身需是异步执行的
         */
        void executeTask(T task, Runnable callback, Context context);
    }

    /**
     * 上下文
     */
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public static class Context implements Serializable {

        private static final long serialVersionUID = 2916376548269524746L;

        // 下标
        private int index;

        // 总数
        private int total;

        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.JSON_STYLE);
        }
    }

    @Override
    public String toString() {
        return "LimitConcurrencyAsyncTaskExecutor{" +
                "concurrency=" + concurrency +
                ", traceId='" + traceId + '\'' +
                '}';
    }
}

LimitConcurrencyAsyncTaskExecutor类的concurrency字段,表示异步同时执行任务的并发数,
通过Semaphore和回调接口,限制了异步任务同时调用执行的并发数量,concurrency个任务同时异步执行,
当其中某个任务执行完成后资源空闲出来,立即调用执行下个任务。

2个方法比较和测试

LimitBatchAsyncTaskExecutorLimitConcurrencyAsyncTaskExecutor都限制了执行异步任务的并发数,
不同的是LimitBatchAsyncTaskExecutor是每批次执行n个任务,需要等这个批次所有任务执行完成后,开始下个批次的执行,
LimitConcurrencyAsyncTaskExecutor是限制最多n个任务同时执行,当其中1个任务执行完成立即执行下个任务,相比较
而言它没有时间和资源上的浪费,性能更好。

编写测试类验证:

/**
 * @author cdfive
 */
public class LimitAsyncTaskExecutorTest {

    @Test
    public void testLimitBatchAsyncTaskExecutor() {
        long start = System.currentTimeMillis();
        int batch = 5;
        int total = 1000;
        CountDownLatch latch = new CountDownLatch(total);

        LimitBatchAsyncTaskExecutor limitBatchAsyncTaskExecutor = new LimitBatchAsyncTaskExecutor(batch);

        List<String> codes = IntStream.range(1, 1 + total).mapToObj(i -> String.valueOf(i)).collect(Collectors.toList());

        LimitBatchAsyncTaskExecutor.AsyncTaskExecutor<String> asyncTaskExecutor = new LimitBatchAsyncTaskExecutor.AsyncTaskExecutor<String>() {
            @Override
            public void executeTask(String code, Runnable callback, LimitBatchAsyncTaskExecutor.Context context) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(180, 200));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        System.err.println(limitBatchAsyncTaskExecutor.getTraceId() + "," + Thread.currentThread().getName() + "=>code=" + code + ",context=" + context);

                        callback.run();

                        latch.countDown();
                    }
                }).start();
            }
        };

        limitBatchAsyncTaskExecutor.executeTasks(codes, asyncTaskExecutor);

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // more than 40s
        System.out.println("total done,cost=" + (System.currentTimeMillis() - start) + "ms");
    }
    
    @Test
    public void testLimitConcurrencyAsyncTaskExecutor() {
        long start = System.currentTimeMillis();
        int concurrency = 5;
        int total = 1000;
        CountDownLatch latch = new CountDownLatch(total);

        LimitConcurrencyAsyncTaskExecutor limitConcurrencyAsyncTaskExecutor = new LimitConcurrencyAsyncTaskExecutor(concurrency);

        List<String> codes = IntStream.range(1, 1 + total).mapToObj(i -> String.valueOf(i)).collect(Collectors.toList());

        LimitConcurrencyAsyncTaskExecutor.AsyncTaskExecutor<String> asyncTaskExecutor = new LimitConcurrencyAsyncTaskExecutor.AsyncTaskExecutor<String>() {
            @Override
            public void executeTask(String code, Runnable callback, LimitConcurrencyAsyncTaskExecutor.Context context) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(180, 200));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        System.err.println(limitConcurrencyAsyncTaskExecutor.getTraceId() + "," + Thread.currentThread().getName() + "=>code=" + code + ",context=" + context);

                        callback.run();

                        latch.countDown();
                    }
                }).start();
            }
        };

        limitConcurrencyAsyncTaskExecutor.executeTasks(codes, asyncTaskExecutor);

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // less than 40s
        System.out.println("total done,cost=" + (System.currentTimeMillis() - start) + "ms");
    }
}

1000个任务,每个任务运行时间为180-200毫秒之间,限制参数均设置为5;
估算:1000 * 0.2s / 5 = 40s,5个异步并发,大概40s执行完成;
测试结果:使用LimitBatchAsyncTaskExecutor运行时间略大于40秒,使用LimitConcurrencyAsyncTaskExecutor略小于40秒。
由此可见,方法2的LimitConcurrencyAsyncTaskExecutor性能更好,因为它没有资源闲置;
也可根据需求选择使用。

总结

  • 多线程和异步是开发中常见和重要的场景,在合适的时候使用能提高程序处理效率

  • 对于自定义的线程池根据需求、硬件、性能等评估设置相关参数,可考虑参数被动态配置或者线程池动态创建

  • 对于调中间件或第三方接口本身是异步执行的情况,要考虑调用方的资源和承受能力,可从调用方角度进行限制和控制

  • 被调用方是异步,从调用方控制并发数可使用java.util.concurrent包下的并发工具类,配合异步回调来控制

  • 在程序设计和开发过程中要多考虑风险可控,调用方/被调用方、中间件、第三方等,关注系统的稳定性和和健壮性

标签:异步,Java,int,batch,并发,concurrency,total,public
From: https://www.cnblogs.com/cdfive2018/p/18256107

相关文章

  • SEETF-2023 express-javascript-security ejs相关漏洞
    今天做个ejs相关题目。进入页面只发现一个输入框,题目标签是ejs相关,去github看看源码,发现ejs版本为3.1.9,可以确定地是rce漏洞。接下来说说这个rce漏洞。3.1.9版本的rce漏洞主要是因为使用了这个模板来构建网页逻辑导致的。点击查看代码//index.jsconstexpress=require('e......
  • 袈裟哥java程序语言终极版
    第一章:Java环境搭建    Java是一种计算机编程语言;除了java编程语言,还有很多的编程语言:c、c++、c#、python等不同编程语言类比于不同国家语言;每个编程语言的语法不同;应用场景不同Java是一个用于后端开发的编程语言 一、Java历史    1.1995年,sun公司推出的一款......
  • 【JavaScript脚本宇宙】终极对决:JavaScript表单库比较指南
    简化你的表单开发:六种流行JavaScript库的深入比较前言在现代网页开发中,表单处理是一个常见的任务。为了简化这个过程并提供更好的用户体验,许多开发人员使用JavaScript库来管理表单数据、验证和提交。本文将介绍六种流行的JavaScript表单库,它们具有不同的功能和适用场景。......
  • 对于java中向上转型后调用.getClass()方法还是子类的原因
    在java中有一个概念叫引用。官方给的定义是这样的:在Java中,"引用"(Reference)是指一个变量,它存储了对象的内存地址,而不是对象本身。换句话说,引用是指向对象的指针或者句柄。在Java中,所有的对象都通过引用来访问和操作,而不是直接访问对象本身。 通俗的将就是一个指路人,当有人询......
  • java基础·小白入门(二)
    目录Java数组、字符串、正则表达式数组基本知识二维数组字符串初始化基本用法正则表达式相关知识点Java语言的内存分配Java的增强for循环类和对象基本概念定义与创建应用Java数组、字符串、正则表达式数组基本知识Java中,数组元素可以为简单数据类型,也可以为......
  • Java设置JSON字符串参数编码
    1.如何在Java中创建JSON字符串在Java中创建JSON字符串,我们可以使用多个库,其中最流行的是Jackson、Gson和org.json。以下是使用这些库创建JSON字符串的示例:1.1使用Jackson库(1)首先,确保我们的项目中包含了Jackson的依赖(如果我们使用Maven,可以参考前面的示例)。(2)创建一个Java对象(例......
  • python爬虫之aiohttp多任务异步爬虫
    python爬虫之aiohttp多任务异步爬虫爬取的flash服务如下:fromflaskimportFlaskimporttimeapp=Flask(__name__)@app.route('/bobo')defindex_bobo():time.sleep(2)return'Hellobobo'@app.route('/jay')defindex_jay():time.......
  • java datetime数据类型去掉时分秒
    在Java中,如果我们想要表示一个日期而不包括时间(时分秒),我们通常会使用java.time包中的LocalDate类。LocalDate是一个不可变的日期对象,它只包含年、月、日三个字段。1.datetime数据类型去掉时分秒案例一以下是如何使用LocalDate类以及如何从一个包含时间的日期时间对象(比如LocalD......
  • 1950 Springboot汽修技能点评系统idea开发mysql数据库APP应用java编程计算机网页源码m
    一、源码特点 springboot汽修技能点评系统是一套完善的信息系统,结合springboot框架和bootstrap完成本系统,对理解JSPjava编程开发语言有帮助系统采用springboot框架(MVC模式开发),系统具有完整的源代码和数据库,系统主要采用B/S模式开发。前段主要技术bootstrap.cssjquery......
  • 第五站:Java金——Spring框架的璀璨殿堂(一)
    第五站:Java金——Spring框架的璀璨殿堂踏入Java金的领域,我们来到了Spring框架的璀璨殿堂,这里是现代Java企业级应用开发的瑰宝。Spring通过其核心特性——依赖注入(IoC)和面向切面编程(AOP),以及SpringBoot的便捷启动与配置,为开发者提供了一条通往高效、简洁开发之路的金光大道......