首页 > 其他分享 >远程调用优化之多线程

远程调用优化之多线程

时间:2023-09-15 12:23:26浏览次数:31  
标签:调用 currentThread Thread 任务 CompletableFuture 线程 多线程 远程

1. 通过feign进行远程调用是一种同步调用,只有当一个远程调用执行完毕以后,才会进行下一个远程调用,效率较低。
2. 可以考虑业务的执行逻辑,如果各个远程调用之间互不影响的话,可以考虑使用多线程来进行优化,提高效率。

1. 配置线程池

1.1 在公共的微服务中编写ThreadPoolConfiguration配置类,并自定义线程工厂

默认线程工厂的弊端:不利于对线程池输出的日志进行分析,无法确定日志是哪个微服务产生的。
使用自定义线程工厂,可以控制每一个线程的名称。

@Configuration
@EnableConfigurationProperties(value = ThreadPoolProperties.class)
public class ThreadPoolConfiguration {

    @Autowired
    private ThreadPoolProperties threadPoolProperties;

    @Value("${spring.application.name}")
    private String applicationName;

    /**
     * 配置一个线程池
     * int corePoolSize:核心线程数
     * int maximumPoolSize:最大线程数
     * long keepAliveTime:临时线程最大空闲时间
     * TimeUnit unit:时间单位
     * BlockingQueue<Runnable> workQueue:任务队列
     * ThreadFactory threadFactory:线程工厂
     * RejectedExecutionHandler handler:任务的拒绝策略
     * @return
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                threadPoolProperties.getCorePoolSize(),
                threadPoolProperties.getMaximumPoolSize(),
                threadPoolProperties.getKeepAliveTime(),
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(threadPoolProperties.getWorkQueueSize()),
                new ThreadFactory() {
                    int num = 1;
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("thread-【" + applicationName + "】-" + num++);
                        return thread;
                    }
                },
                new ThreadPoolExecutor.AbortPolicy());
        return threadPoolExecutor;
    }
}

1.2 在公共微服务中编写ThreadPoolProperties实体类进行线程池参数配置

@Data
@ConfigurationProperties(prefix = "app.threadpool")
public class ThreadPoolProperties {
    private int corePoolSize;
    private int maximumPoolSize;
    private long keepAliveTime;
    private int workQueueSize;
}

1.3 使用@Import注解对线程池配置类进行封装,要使用线程池的微服务只需要在启动类上添加@EnableThreadPool注解即可

@Target(value = ElementType.TYPE)
@Retention(value = RetentionPolicy.RUNTIME)
@Import(value = ThreadPoolConfiguration.class)
public @interface EnableThreadPool {
}

1.4 在要使用线程池的微服务中的application.yml中配置线程池的参数信息

# 线程池参数配置
app:
  threadpool:
    corePoolSize: 5
    maximumPoolSize: 10
    keepAliveTime: 2
    workQueueSize: 60

2. 使用线程池对远程调用进行改造,每一次远程调用就向线程池中提交一个任务,配合CountDownLatch进行使用

代码示例:

// 等待其它四个线程执行完毕后,再执行当前线程
CountDownLatch countDownLatch = new CountDownLatch(4);

//远程调用product微服务的接口查询三级分类的数据
threadPoolExecutor.submit(() -> {
	Result<CategoryView> categoryViewResult = skuDetailFeignClient.findCategoryBySkuId(skuId);
	skuDetailVo.setCategoryView(categoryViewResult.getData());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询三级分类的数据");
	countDownLatch.countDown();  // 让CountDownLatch中的计数器减1
});

//远程调用product微服务的接口查询价格数据
threadPoolExecutor.submit(() -> {
	Result<SkuInfo> infoResult = skuDetailFeignClient.findPriceBySkuId(skuId);
	skuDetailVo.setPrice(infoResult.getData().getPrice());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询价格数据");
	countDownLatch.countDown();
});

//远程调用product微服务的接口查询spu的销售属性和销售属性值
threadPoolExecutor.submit(() -> {
	Result<List<SpuSaleAttr>> spuSaleAttrListResult = skuDetailFeignClient.findSpuSaleAttrAndValueBySkuId(skuId);
	skuDetailVo.setSpuSaleAttrList(spuSaleAttrListResult.getData());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询spu的销售属性和销售属性值");
	countDownLatch.countDown();
});

//远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合
threadPoolExecutor.submit(() -> {
	Result<List<AttrValueConcatVo>> brotherSkuSaleAttrValueConcatResult = skuDetailFeignClient.findBrotherSkuSaleAttrValueConcatBySkuId(skuId);
	List<AttrValueConcatVo> attrValueConcatVoList = brotherSkuSaleAttrValueConcatResult.getData();
	//Collectors.toMap将流中的元素转换成Map,方法的第一个参数用来构建map的键,方法的第二个参数用来构建map的值
	Map<String, Long> map = attrValueConcatVoList.stream().collect(Collectors.toMap(attrValueConcatVo -> attrValueConcatVo.getAttrValueConcat(), attrValueConcatVo -> attrValueConcatVo.getSkuId()));
	String valuesSkuJson = JSON.toJSONString(map);
	skuDetailVo.setValuesSkuJson(valuesSkuJson);
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合");
	countDownLatch.countDown();
});

try {
	countDownLatch.await();   // 让执行当前方法的线程阻塞,等其它线程执行完毕以后再执行当前线程
} catch (InterruptedException e) {
	e.printStackTrace();
}

3. 线程池的弊端:无法直接对多个任务进行链式、组合处理。解决方案:使用juc中的CompletableFuture实现对任务编排的能力,可以轻松组织不同任务的运行顺序、规则以及方式。

3.1 异步执行任务

3.1.1 无返回值的方法:

runAsync(runnable):  CompletableFuture<Void> 以异步方式启动一个任务并在默认的线程池(ForkJoinPool)执行
runAsync(runnable,executor):CompletableFuture<Void>    以异步方式启动一个任务并在指定的线程池(executor)执行

3.1.2 有返回值的方法

supplyAsync(supplier): CompletableFuture<U>   以异步方式启动一个任务并在默认的线程池(ForkJoinPool)执行。
supplyAsync(supplier,executor):CompletableFuture<U>  以异步方式启动一个任务并在指定的线程池(executor)执行。

代码演示:

public static void supplyAsyncTest02() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10;
    },service);
    Integer count = supplyAsync.get();
    System.out.println(count);
}

public static void supplyAsyncTest01() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10;
    });
    Integer count = supplyAsync.get();  // 获取异步线程执行结果
    System.out.println(count);
}

3.2 whenComplete方法

在任务执行完毕以后(不论是正常执行完毕还是出现异常)执行某一个操作。

  1. 正常完成:whenComplete返回结果和上级任务一致,异常为null
  2. 出现异常:whenComplete返回结果为null,异常为上级任务的异常
    相关方法:
whenComplete(action) 		使用当前线程执行一个动作,不开启额外的线程
whenCompleteAsync(action)   在默认的线程池中开启一个线程执行该动作
whenCompleteAsync(action, executor) 在指定的线程池中开启一个线程执行该动作

注:上一次任务执行完毕以后产生了异常,此时再调用get方法获取结果就会抛出异常

public static void whenCompleteTest01() throws ExecutionException, InterruptedException {
    /**
         * result参数表示的是上一次任务执行完成以后的结果
         * e:表示的是异常对象
         */
    CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10;
    },service).whenComplete((result , e) -> {  // 使用main线程执行当前任务
        if(e == null) {
            System.out.println(Thread.currentThread() + "上一次任务正常执行完成了,任务的返回结果为:" + result);
        }else {
            System.out.println(Thread.currentThread() + "上一次任务执行时产生了异常,任务的返回结果为:" + result);
        }
    });

    Integer integer = supplyAsync.get(); // 获取异步任务的结果,如果whenComplete上一次执行的产生异常了,那么在调用该方法的时候就会报错
    System.out.println(integer);
}

public static void whenCompleteTest02() throws ExecutionException, InterruptedException {
    /**
         * result参数表示的是上一次任务执行完成以后的结果
         * e:表示的是异常对象
         */
    CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10 / 0 ;
    },service).whenComplete((result , e) -> {
        if(e == null) {
            System.out.println(Thread.currentThread() + "上一次任务正常执行完成了,任务的返回结果为:" + result);
        }else {
            System.out.println(Thread.currentThread() + "上一次任务执行时产生了异常,任务的返回结果为:" + result);
        }
    }).exceptionally((e) -> {   // 配合exceptionally方法可以在产生异常以后返回一个默认值。
        System.out.println(e);
        return 20 ;
    });

    Integer integer = supplyAsync.get();
    System.out.println(integer);
}

3.3 thenRun方法

相关方法:

// 无法获取到上一次任务的执行结果
thenRun(runnable):                 接下来跑一个任务,以当前线程作为跑任务的线程,不开额外的异步线程
thenRunAsync(runnable):            接下来跑一个任务,用默认线程池新开一个异步线程
thenRunAsync(runnable,executor):   接下来跑一个任务,用指定线程池新开一个异步线程

和whenComplete区别:thenRun无法获取到上一个任务产生的异常。当上一个任务执行完毕以后产生了异常,那么该任务无法执行。

3.4 thenAccept方法

相关方法:

thenAccept(consumer):              接下来跑一个任务,接受到上次的结果,以当前线程作为跑任务的线程,不开额外的异步线程
thenAcceptAsync(consumer):         接下来跑一个任务,接受到上次的结果,用默认线程池新开一个异步线程
thenAcceptAsync(consumer,executor) 接下来跑一个任务,接受到上次的结果,用指定线程池新开一个异步线程

代码示例:

public static void thenAcceptTest01() {
    CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10 ;
    },service).thenAccept((result) -> System.out.println(result * 10));
}

3.5 thenApply方法

相关方法:

thenApply(function) 接下来跑一个任务,接受到上次的结果,并且返回一个新的结果,以当前线程作为跑任务的线程,不开额外的异步线程
thenApplyAsync(function) 接下来跑一个任务,接受到上次的结果,并且返回一个新的结果,用默认线程池新开一个异步线程
thenApplyAsync(function, executor)接下来跑一个任务,接受到上次的结果,并且返回一个新的结果,用指定线程池新开一个异步线程

代码演示:

public static void thenApplyAsyncTest01() throws ExecutionException, InterruptedException {
    Integer count = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10;
    }, service).thenApplyAsync((result) -> {
        System.out.println(Thread.currentThread() + "---" + result * 10);
        return result * 2;
    }).get();  // 获取最终的执行结果
    System.out.println(count);
}

3.6 组合多任务

CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)   当所有的任务执行完毕以后,线程再向下进行执行
CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 当任意一个任务执行完毕以后,线程再向下进行执行
CompletableFuture<Void> runAfterBoth(other,action) 当两个任务执行完毕以后在执行一个新的任务

3.7 代码演示

CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
	//远程调用product微服务的接口查询三级分类的数据
	Result<CategoryView> categoryViewResult = skuDetailFeignClient.findCategoryBySkuId(skuId);
	skuDetailVo.setCategoryView(categoryViewResult.getData());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询三级分类的数据");
}, threadPoolExecutor);

CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
	//远程调用product微服务的接口查询价格数据
	Result<SkuInfo> infoResult = skuDetailFeignClient.findPriceBySkuId(skuId);
	skuDetailVo.setPrice(infoResult.getData().getPrice());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询价格数据");
}, threadPoolExecutor);

CompletableFuture<Void> cf3 = CompletableFuture.runAsync(() -> {
	//远程调用product微服务的接口查询spu的销售属性和销售属性值
	Result<List<SpuSaleAttr>> spuSaleAttrListResult = skuDetailFeignClient.findSpuSaleAttrAndValueBySkuId(skuId);
	skuDetailVo.setSpuSaleAttrList(spuSaleAttrListResult.getData());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询spu的销售属性和销售属性值");
}, threadPoolExecutor);

CompletableFuture<Void> cf4 = CompletableFuture.runAsync(() -> {
	//远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合
	Result<List<AttrValueConcatVo>> brotherSkuSaleAttrValueConcatResult = skuDetailFeignClient.findBrotherSkuSaleAttrValueConcatBySkuId(skuId);
	List<AttrValueConcatVo> attrValueConcatVoList = brotherSkuSaleAttrValueConcatResult.getData();
	//Collectors.toMap将流中的元素转换成Map,方法的第一个参数用来构建map的键,方法的第二个参数用来构建map的值
	Map<String, Long> map = attrValueConcatVoList.stream().collect(Collectors.toMap(attrValueConcatVo -> attrValueConcatVo.getAttrValueConcat(), attrValueConcatVo -> attrValueConcatVo.getSkuId()));
	String valuesSkuJson = JSON.toJSONString(map);
	skuDetailVo.setValuesSkuJson(valuesSkuJson);
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合");
}, threadPoolExecutor);

// 让四个异步任务执行完毕以后,再进行返回
CompletableFuture.allOf(cf1, cf2, cf3, cf4).join();

标签:调用,currentThread,Thread,任务,CompletableFuture,线程,多线程,远程
From: https://www.cnblogs.com/insilently/p/17704761.html

相关文章

  • 25届实习秋招-Java面试-JUC多线程面试题整理-牛客网
    JUC介绍一下JUC下的锁(如何使用及应用场景)线程什么是进程:特征什么是线程:资源为什么多线程,什么使用用单线程,什么时候多线程,什么条件下多线程快。进程和线程的对比:进程如何通信,每种通信存放的介质。||线程的通信,几种方式。join进程和线程的区别,在JVM层面的体现一......
  • 解决vue3中抽离出来的js如何调用页面的方法
    有时我们会用render渲染表格的columns,里面的按钮如何去调用.vue文件的方法?思路;在.vue文件中我们通过参数的方式传给.js文件,然后用变量接收,点击时执行(注意:.vue文件中setup执行比较早,按钮是点击事件,不会主动执行函数。为防止函数未声名就当做参数传递,必须在最后执行getFn函数,和d......
  • 多个feign接口使用@FeignClient注解调用同一个名称的微服务时,启动会发生异常
    解决方案:方法1.将feign接口合并方法2.在application.yml文件中增加配置spring.main.allow-bean-definition-overriding=true方法3.在@FeignClient注解上增加contextId属性,确保每个feignclient的contextId唯一。如@FeignClient(name="服务名",contextId="唯一名称")......
  • .Net多线程读取pdf文本
    1.nuget安装UglyToad.PdfPig2.SemaphoreSlimsemaphore=newSemaphoreSlim(10);同时启动10个线程读取指定页面文本。C#代码:staticstringGetPdfText(stringfilePath){FileInfofile=newFileInfo(filePath);if(file.Extension.ToLower().Contains("pdf"))......
  • 基于Spring事务的可靠异步调用实践
    SpringTxAsync组件是仓储平台组(WMS6)自主研发的一个专门用于解决可靠异步调用问题的组件。通过使用SpringTxAsync组件,我们成功地解决了在仓储平台(WMS6)中的异步调用需求。经过近二年多的实践并经历了两次618活动以及两次双11活动,该组件已经在我们的所有应用中稳定运行并成功应用于......
  • 本地部署Jellyfin影音服务器 - 公网远程影音库
    1.前言随着移动智能设备的普及,各种各样的使用需求也被开发出来,从最早的移动听音乐、看图片(MP3时代),到之后的移动视频需求(MP4时代)到现在的移动流媒体需求(智能手机看视频)。但当我们习惯这些需求后,忽然发现自己不知不觉间成了待割的韭菜(3台设备就要加钱)。作为一颗倔强的韭菜,自然不会甘......
  • 项目开发中难点-项目使用v-if控制表单/元素/组件显示隐藏,例如调用接口后赋值需重新加
    项目中使用v-if="show"  控制组件的显示或隐藏,当接口返回后this.show=false,进行赋值,后this.show=true显示 。但是页面没有正常显示,此时使用this.$nextTick。 一、$nextTick()概述1.$nextTick()原理$nextTick()是Vue.js框架中的一个方法,它主要用于DOM操作......
  • 调用excel数据自动生成word文档
    应用场景:相信很多时候,您是否有过和博主一样在WORD里面重复制作某种资料的工作。比如给定了一份模板,需要根据不同内容制作出不同的word,但模板是一样的。一般情况下就是老老实实的一份一份的去填写(但人力填写难保证不出错,精力有限)。那么有没有可以自动生完成word的办法呢?答案是肯定......
  • Navicat远程链接openGauss数据库
    文章目录前言一、环境准备二、openGauss服务设置步骤2.1切换至用户openGauss2.2添加放行IP2.3修改加密方式3.4重启openGauss服务3.5创建远程连接角色备注总结 前言最近这段时间再整理openGauss数据库相关内容,在这里总结记录并分享一些基础的操作以及遇......
  • Dami 本地过程调用框架(主打解耦),v0.24 发布
    Dami,专为本地多模块之间通讯解耦而设计(尤其是未知模块、隔离模块、领域模块)。零依赖,特适合DDD。特点结合Bus与RPC的概念,可作事件分发,可作接口调用,可作异步响应。支持事务传导(同步分发、异常透传)支持事件标识、拦截器(方便跟踪)支持监听者排序、附件传递(多监听时,可相互合......