1. 通过feign进行远程调用是一种同步调用,只有当一个远程调用执行完毕以后,才会进行下一个远程调用,效率较低。
2. 可以考虑业务的执行逻辑,如果各个远程调用之间互不影响的话,可以考虑使用多线程来进行优化,提高效率。
1. 配置线程池
1.1 在公共的微服务中编写ThreadPoolConfiguration配置类,并自定义线程工厂
默认线程工厂的弊端:不利于对线程池输出的日志进行分析,无法确定日志是哪个微服务产生的。
使用自定义线程工厂,可以控制每一个线程的名称。
@Configuration
@EnableConfigurationProperties(value = ThreadPoolProperties.class)
public class ThreadPoolConfiguration {
@Autowired
private ThreadPoolProperties threadPoolProperties;
@Value("${spring.application.name}")
private String applicationName;
/**
* 配置一个线程池
* int corePoolSize:核心线程数
* int maximumPoolSize:最大线程数
* long keepAliveTime:临时线程最大空闲时间
* TimeUnit unit:时间单位
* BlockingQueue<Runnable> workQueue:任务队列
* ThreadFactory threadFactory:线程工厂
* RejectedExecutionHandler handler:任务的拒绝策略
* @return
*/
@Bean
public ThreadPoolExecutor threadPoolExecutor(){
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
threadPoolProperties.getCorePoolSize(),
threadPoolProperties.getMaximumPoolSize(),
threadPoolProperties.getKeepAliveTime(),
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(threadPoolProperties.getWorkQueueSize()),
new ThreadFactory() {
int num = 1;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("thread-【" + applicationName + "】-" + num++);
return thread;
}
},
new ThreadPoolExecutor.AbortPolicy());
return threadPoolExecutor;
}
}
1.2 在公共微服务中编写ThreadPoolProperties实体类进行线程池参数配置
@Data
@ConfigurationProperties(prefix = "app.threadpool")
public class ThreadPoolProperties {
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
private int workQueueSize;
}
1.3 使用@Import注解对线程池配置类进行封装,要使用线程池的微服务只需要在启动类上添加@EnableThreadPool注解即可
@Target(value = ElementType.TYPE)
@Retention(value = RetentionPolicy.RUNTIME)
@Import(value = ThreadPoolConfiguration.class)
public @interface EnableThreadPool {
}
1.4 在要使用线程池的微服务中的application.yml中配置线程池的参数信息
# 线程池参数配置
app:
threadpool:
corePoolSize: 5
maximumPoolSize: 10
keepAliveTime: 2
workQueueSize: 60
2. 使用线程池对远程调用进行改造,每一次远程调用就向线程池中提交一个任务,配合CountDownLatch进行使用
代码示例:
// 等待其它四个线程执行完毕后,再执行当前线程
CountDownLatch countDownLatch = new CountDownLatch(4);
//远程调用product微服务的接口查询三级分类的数据
threadPoolExecutor.submit(() -> {
Result<CategoryView> categoryViewResult = skuDetailFeignClient.findCategoryBySkuId(skuId);
skuDetailVo.setCategoryView(categoryViewResult.getData());
log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询三级分类的数据");
countDownLatch.countDown(); // 让CountDownLatch中的计数器减1
});
//远程调用product微服务的接口查询价格数据
threadPoolExecutor.submit(() -> {
Result<SkuInfo> infoResult = skuDetailFeignClient.findPriceBySkuId(skuId);
skuDetailVo.setPrice(infoResult.getData().getPrice());
log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询价格数据");
countDownLatch.countDown();
});
//远程调用product微服务的接口查询spu的销售属性和销售属性值
threadPoolExecutor.submit(() -> {
Result<List<SpuSaleAttr>> spuSaleAttrListResult = skuDetailFeignClient.findSpuSaleAttrAndValueBySkuId(skuId);
skuDetailVo.setSpuSaleAttrList(spuSaleAttrListResult.getData());
log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询spu的销售属性和销售属性值");
countDownLatch.countDown();
});
//远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合
threadPoolExecutor.submit(() -> {
Result<List<AttrValueConcatVo>> brotherSkuSaleAttrValueConcatResult = skuDetailFeignClient.findBrotherSkuSaleAttrValueConcatBySkuId(skuId);
List<AttrValueConcatVo> attrValueConcatVoList = brotherSkuSaleAttrValueConcatResult.getData();
//Collectors.toMap将流中的元素转换成Map,方法的第一个参数用来构建map的键,方法的第二个参数用来构建map的值
Map<String, Long> map = attrValueConcatVoList.stream().collect(Collectors.toMap(attrValueConcatVo -> attrValueConcatVo.getAttrValueConcat(), attrValueConcatVo -> attrValueConcatVo.getSkuId()));
String valuesSkuJson = JSON.toJSONString(map);
skuDetailVo.setValuesSkuJson(valuesSkuJson);
log.info(Thread.currentThread().getName() + "---->远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合");
countDownLatch.countDown();
});
try {
countDownLatch.await(); // 让执行当前方法的线程阻塞,等其它线程执行完毕以后再执行当前线程
} catch (InterruptedException e) {
e.printStackTrace();
}
3. 线程池的弊端:无法直接对多个任务进行链式、组合处理。解决方案:使用juc中的CompletableFuture实现对任务编排的能力,可以轻松组织不同任务的运行顺序、规则以及方式。
3.1 异步执行任务
3.1.1 无返回值的方法:
runAsync(runnable): CompletableFuture<Void> 以异步方式启动一个任务并在默认的线程池(ForkJoinPool)执行
runAsync(runnable,executor):CompletableFuture<Void> 以异步方式启动一个任务并在指定的线程池(executor)执行
3.1.2 有返回值的方法
supplyAsync(supplier): CompletableFuture<U> 以异步方式启动一个任务并在默认的线程池(ForkJoinPool)执行。
supplyAsync(supplier,executor):CompletableFuture<U> 以异步方式启动一个任务并在指定的线程池(executor)执行。
代码演示:
public static void supplyAsyncTest02() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread());
return 10;
},service);
Integer count = supplyAsync.get();
System.out.println(count);
}
public static void supplyAsyncTest01() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread());
return 10;
});
Integer count = supplyAsync.get(); // 获取异步线程执行结果
System.out.println(count);
}
3.2 whenComplete方法
在任务执行完毕以后(不论是正常执行完毕还是出现异常)执行某一个操作。
- 正常完成:whenComplete返回结果和上级任务一致,异常为null
- 出现异常:whenComplete返回结果为null,异常为上级任务的异常
相关方法:
whenComplete(action) 使用当前线程执行一个动作,不开启额外的线程
whenCompleteAsync(action) 在默认的线程池中开启一个线程执行该动作
whenCompleteAsync(action, executor) 在指定的线程池中开启一个线程执行该动作
注:上一次任务执行完毕以后产生了异常,此时再调用get方法获取结果就会抛出异常
public static void whenCompleteTest01() throws ExecutionException, InterruptedException {
/**
* result参数表示的是上一次任务执行完成以后的结果
* e:表示的是异常对象
*/
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread());
return 10;
},service).whenComplete((result , e) -> { // 使用main线程执行当前任务
if(e == null) {
System.out.println(Thread.currentThread() + "上一次任务正常执行完成了,任务的返回结果为:" + result);
}else {
System.out.println(Thread.currentThread() + "上一次任务执行时产生了异常,任务的返回结果为:" + result);
}
});
Integer integer = supplyAsync.get(); // 获取异步任务的结果,如果whenComplete上一次执行的产生异常了,那么在调用该方法的时候就会报错
System.out.println(integer);
}
public static void whenCompleteTest02() throws ExecutionException, InterruptedException {
/**
* result参数表示的是上一次任务执行完成以后的结果
* e:表示的是异常对象
*/
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread());
return 10 / 0 ;
},service).whenComplete((result , e) -> {
if(e == null) {
System.out.println(Thread.currentThread() + "上一次任务正常执行完成了,任务的返回结果为:" + result);
}else {
System.out.println(Thread.currentThread() + "上一次任务执行时产生了异常,任务的返回结果为:" + result);
}
}).exceptionally((e) -> { // 配合exceptionally方法可以在产生异常以后返回一个默认值。
System.out.println(e);
return 20 ;
});
Integer integer = supplyAsync.get();
System.out.println(integer);
}
3.3 thenRun方法
相关方法:
// 无法获取到上一次任务的执行结果
thenRun(runnable): 接下来跑一个任务,以当前线程作为跑任务的线程,不开额外的异步线程
thenRunAsync(runnable): 接下来跑一个任务,用默认线程池新开一个异步线程
thenRunAsync(runnable,executor): 接下来跑一个任务,用指定线程池新开一个异步线程
和whenComplete区别:thenRun无法获取到上一个任务产生的异常。当上一个任务执行完毕以后产生了异常,那么该任务无法执行。
3.4 thenAccept方法
相关方法:
thenAccept(consumer): 接下来跑一个任务,接受到上次的结果,以当前线程作为跑任务的线程,不开额外的异步线程
thenAcceptAsync(consumer): 接下来跑一个任务,接受到上次的结果,用默认线程池新开一个异步线程
thenAcceptAsync(consumer,executor) 接下来跑一个任务,接受到上次的结果,用指定线程池新开一个异步线程
代码示例:
public static void thenAcceptTest01() {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread());
return 10 ;
},service).thenAccept((result) -> System.out.println(result * 10));
}
3.5 thenApply方法
相关方法:
thenApply(function) 接下来跑一个任务,接受到上次的结果,并且返回一个新的结果,以当前线程作为跑任务的线程,不开额外的异步线程
thenApplyAsync(function) 接下来跑一个任务,接受到上次的结果,并且返回一个新的结果,用默认线程池新开一个异步线程
thenApplyAsync(function, executor)接下来跑一个任务,接受到上次的结果,并且返回一个新的结果,用指定线程池新开一个异步线程
代码演示:
public static void thenApplyAsyncTest01() throws ExecutionException, InterruptedException {
Integer count = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread());
return 10;
}, service).thenApplyAsync((result) -> {
System.out.println(Thread.currentThread() + "---" + result * 10);
return result * 2;
}).get(); // 获取最终的执行结果
System.out.println(count);
}
3.6 组合多任务
CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 当所有的任务执行完毕以后,线程再向下进行执行
CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 当任意一个任务执行完毕以后,线程再向下进行执行
CompletableFuture<Void> runAfterBoth(other,action) 当两个任务执行完毕以后在执行一个新的任务
3.7 代码演示
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
//远程调用product微服务的接口查询三级分类的数据
Result<CategoryView> categoryViewResult = skuDetailFeignClient.findCategoryBySkuId(skuId);
skuDetailVo.setCategoryView(categoryViewResult.getData());
log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询三级分类的数据");
}, threadPoolExecutor);
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
//远程调用product微服务的接口查询价格数据
Result<SkuInfo> infoResult = skuDetailFeignClient.findPriceBySkuId(skuId);
skuDetailVo.setPrice(infoResult.getData().getPrice());
log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询价格数据");
}, threadPoolExecutor);
CompletableFuture<Void> cf3 = CompletableFuture.runAsync(() -> {
//远程调用product微服务的接口查询spu的销售属性和销售属性值
Result<List<SpuSaleAttr>> spuSaleAttrListResult = skuDetailFeignClient.findSpuSaleAttrAndValueBySkuId(skuId);
skuDetailVo.setSpuSaleAttrList(spuSaleAttrListResult.getData());
log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询spu的销售属性和销售属性值");
}, threadPoolExecutor);
CompletableFuture<Void> cf4 = CompletableFuture.runAsync(() -> {
//远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合
Result<List<AttrValueConcatVo>> brotherSkuSaleAttrValueConcatResult = skuDetailFeignClient.findBrotherSkuSaleAttrValueConcatBySkuId(skuId);
List<AttrValueConcatVo> attrValueConcatVoList = brotherSkuSaleAttrValueConcatResult.getData();
//Collectors.toMap将流中的元素转换成Map,方法的第一个参数用来构建map的键,方法的第二个参数用来构建map的值
Map<String, Long> map = attrValueConcatVoList.stream().collect(Collectors.toMap(attrValueConcatVo -> attrValueConcatVo.getAttrValueConcat(), attrValueConcatVo -> attrValueConcatVo.getSkuId()));
String valuesSkuJson = JSON.toJSONString(map);
skuDetailVo.setValuesSkuJson(valuesSkuJson);
log.info(Thread.currentThread().getName() + "---->远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合");
}, threadPoolExecutor);
// 让四个异步任务执行完毕以后,再进行返回
CompletableFuture.allOf(cf1, cf2, cf3, cf4).join();
标签:调用,currentThread,Thread,任务,CompletableFuture,线程,多线程,远程
From: https://www.cnblogs.com/insilently/p/17704761.html