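1. Definition of reactive programming
Reactive programming is a style of asynchronous, non-blocking programming.
The goal of asynchronous programming is higher execution efficiency: with asynchronous, non-blocking code, the program can switch from the task it is currently executing to another task, and come back to the original flow once the asynchronous work has completed.
2. Java provides two asynchronous programming models:
2.1 Callback:
When one method calls another and, instead of waiting, is notified of the result once the other method has finished, this is called a callback.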
public interface CallBack {
    void stepBCallstepA(String result);
}
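import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class StepA implements CallBack {

    @Autowired
    private StepB stepB;

    // StepA asks StepB to do some work on a separate thread
    public void stepACallStepB() {
        long start = System.currentTimeMillis();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // StepA invokes StepB's logic and passes itself as the callback
                stepB.stepBprocess(StepA.this, "stepA call stepB");
            }
        }).start();
        stepASelfBusiness();
        log.info("time consume:{}", System.currentTimeMillis() - start);
    }

    // Callback exposed to StepB so that it can hand its result back to StepA
    @Override
    public void stepBCallstepA(String result) {
        // The result is concatenated instead of passed as an argument, so "{}" is
        // printed literally; log.info("...{}", result) would substitute it.
        log.info("stepB return stepA result:{}" + result);
    }

    // StepA's own business logic
    public void stepASelfBusiness() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}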
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class StepB {

    public void stepBprocess(CallBack callBack, String input) {
        String result = businessProcess(input);
        callBack.stepBCallstepA(result); // invoke StepA's method through the callback
    }

    public String stepBPurelyprocess(String input) {
        return businessProcess(input);
    }

    private String businessProcess(String input) {
        // simulated business processing: 10000 sleeps of 1 ms each
        for (int i = 0; i < 10000; i++) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return "B businessProcess completed";
    }
}
The output is shown below. StepA has already finished its own work by the time StepB hands its result back to StepA through the callback.
2022-12-11 20:05:32.042 c.c.a.m.s.q.r.c.StepA http-nio-8080-exec-2 [INFO] time consume:10004
2022-12-11 20:05:37.650 c.c.a.m.s.q.r.c.StepA Thread-2 [INFO] stepB return stepA result:{}B businessProcess completed
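Without a callback, StepC calls StepB directly and waits for its result before doing its own work:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class StepC {

    @Autowired
    private StepB stepB;

    public void stepCCallStepB() {
        long start = System.currentTimeMillis();
        // Call StepB directly; its result is returned to StepC synchronously
        String result = stepB.stepBPurelyprocess("stepC call stepB");
        // The result is concatenated, so "{}" is printed literally in the log
        log.info("stepB return stepC result:{}" + result);
        stepCSelfBusiness();
        log.info("time consume:{}", System.currentTimeMillis() - start);
    }

    public void stepCSelfBusiness() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Output: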
2022-12-11 20:04:59.487 c.c.a.m.s.q.r.c.StepC http-nio-8080-exec-1 [INFO] stepB return stepC result:{}B businessProcess completed
2022-12-11 20:05:09.499 c.c.a.m.s.q.r.c.StepC http-nio-8080-exec-1 [INFO] time consume:25666
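The callback flow of this section can also be tried without Spring. The following is a minimal standalone sketch, not the author's code: the names CallbackSketch, ResultCallback, slowWork and runWithCallback are hypothetical, and the sleeps are shortened to keep the run fast. The latch at the end exists only so the sketch can return the received value to its caller.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical standalone version of the StepA/StepB callback flow (no Spring).
final class CallbackSketch {

    /** Plays the role of the CallBack interface. */
    interface ResultCallback {
        void onResult(String result);
    }

    /** Plays the role of StepB: does slow work, then notifies the caller. */
    static void slowWork(String input, ResultCallback callback) {
        sleep(200); // simulated business processing
        callback.onResult("processed: " + input);
    }

    /** Plays the role of StepA: starts the work on another thread and keeps going. */
    static String runWithCallback(String input) {
        AtomicReference<String> received = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        new Thread(() -> slowWork(input, result -> {
            received.set(result); // the callback runs on the worker thread
            done.countDown();
        })).start();

        sleep(50); // the caller's own business logic runs concurrently

        try {
            // Wait only so that this sketch can return the value it received.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was not invoked in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithCallback("stepA call stepB"));
    }
}
```

Because the caller's own work overlaps with the slow work, the total time is close to the longer of the two rather than their sum, which matches the difference between the StepA and StepC timings above.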
2.2 Future:
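An asynchronous method immediately returns a Future<T>. The asynchronous process computes a value of type T, and the Future object wraps the logic for accessing that value. The value is not available immediately; a thread can poll the object until it is. For example, an ExecutorService that runs Callable<T> tasks hands back Future objects.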
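The polling described above can be sketched as follows. This is an illustrative example only: the class FuturePollingSketch and the method computeAsync are hypothetical names, and the sleep stands in for a slow computation. It uses only standard java.util.concurrent calls (ExecutorService.submit, Future.isDone, Future.get).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: submit a Callable, then poll the returned Future.
final class FuturePollingSketch {

    static int computeAsync(int a, int b) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                Thread.sleep(100); // simulated slow computation
                return a + b;
            };
            // submit() returns at once; the value is computed in the background
            Future<Integer> future = executor.submit(task);

            // Poll until the value is available; other work could go here
            while (!future.isDone()) {
                Thread.sleep(10);
            }
            return future.get(); // the task is done, so this returns immediately
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeAsync(20, 22));
    }
}
```

Calling future.get() without polling is also possible; it simply blocks the calling thread until the value is ready.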
2.2.1 Callable & Runnable
Ways to create a thread: Thread, Runnable, Callable. Unlike Runnable, Callable returns a result.
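A small sketch of the three approaches, under the assumption that each task only needs to produce one string. The class and method names are hypothetical. Only the Callable variant returns its result directly; the other two have to pass it through shared state.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical sketch comparing Thread, Runnable and Callable.
final class ThreadCreationSketch {

    /** Subclass Thread and override run(); run() cannot return a value. */
    static String viaThreadSubclass() {
        String[] sink = new String[1];
        Thread t = new Thread() {
            @Override
            public void run() {
                sink[0] = "thread done";
            }
        };
        t.start();
        join(t);
        return sink[0];
    }

    /** Pass a Runnable to a Thread; the result still goes through shared state. */
    static String viaRunnable() {
        StringBuilder sink = new StringBuilder();
        Runnable task = () -> sink.append("runnable done");
        Thread t = new Thread(task);
        t.start();
        join(t);
        return sink.toString();
    }

    /** Wrap a Callable in a FutureTask; get() hands back the returned value. */
    static String viaCallable() {
        Callable<String> task = () -> "callable done";
        FutureTask<String> futureTask = new FutureTask<>(task);
        new Thread(futureTask).start();
        try {
            return futureTask.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    private static void join(Thread t) {
        try {
            t.join(); // after join() the writes made by the thread are visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaThreadSubclass());
        System.out.println(viaRunnable());
        System.out.println(viaCallable());
    }
}
```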
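When to use multiple threads: if the tasks are not very time-consuming, multithreading helps little and can even be slower than a single thread. When multiple threads are used, CompletableFuture is the recommended way to create and start the tasks; it tends to perform better than the Callable + FutureTask combination because it runs tasks on a thread pool instead of creating a new thread for each one (see 2.2.2).
Single thread vs FutureTask + Callable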
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class CallableTest {

    @Autowired
    private MongoTemplate mongoTemplate;

    // Multithreaded version: one FutureTask and one new thread per query
    public void implementCallable() {
        try {
            long start = System.currentTimeMillis();
            FutureTask<List<Map>> table1Task = new FutureTask<>(new MongoCallble1());
            FutureTask<List<Map>> table3Task = new FutureTask<>(new MongoCallble2());
            FutureTask<List<Map>> table2Task = new FutureTask<>(new MongoCallble3());
            FutureTask<List<Map>> ranktable2Task = new FutureTask<>(new MongoCallble4());
            new Thread(table1Task).start();
            new Thread(table3Task).start();
            new Thread(table2Task).start();
            new Thread(ranktable2Task).start();

            // One more query runs on the calling thread in the meantime
            List<Integer> date2 = Arrays.asList(20221101);
            Query query2 = new Query();
            query2.addCriteria(Criteria.where("businessDate").in(date2));
            List<Map> result2 = mongoTemplate.find(query2, Map.class, "table1-v2");

            // Only these two tasks are awaited; the other two are started but not joined
            List<Map> table1List = table1Task.get();
            List<Map> table3List = table3Task.get();
            log.info("time consume:{}", System.currentTimeMillis() - start);
        } catch (Exception e) {
            log.error("query failed", e);
        }
    }

    // Single-threaded version: the same queries run one after another
    public void noCallable() {
        long start = System.currentTimeMillis();

        List<Integer> date = Arrays.asList(20221102);
        Query query = new Query();
        query.addCriteria(Criteria.where("businessDate").in(date));
        List<Map> result = mongoTemplate.find(query, Map.class, "table1-v2");

        List<Integer> businessTag1 = Arrays.asList(20221102);
        Query query1 = new Query();
        query1.addCriteria(Criteria.where("businessDate").in(businessTag1));
        List<Map> result1 = mongoTemplate.find(query1, Map.class, "table3-v2");

        List<Integer> date2 = Arrays.asList(20221102);
        Query query2 = new Query();
        query2.addCriteria(Criteria.where("businessDate").in(date2));
        List<Map> result2 = mongoTemplate.find(query2, Map.class, "table1-v2");

        List<Integer> date3 = Arrays.asList(20221102);
        Query query3 = new Query();
        query3.addCriteria(Criteria.where("businessDate").in(date3));
        List<Map> result3 = mongoTemplate.find(query3, Map.class, "table2-v2");

        List<Integer> date4 = Arrays.asList(20221102);
        Query query4 = new Query();
        query4.addCriteria(Criteria.where("businessDate").in(date4));
        List<Map> result4 = mongoTemplate.find(query4, Map.class, "ranktable2-v2");

        log.info("time consume:{}", System.currentTimeMillis() - start);
    }

    class MongoCallble1 implements Callable<List<Map>> {
        @Override
        public List<Map> call() throws Exception {
            List<Integer> date = Arrays.asList(20221102);
            Query query = new Query();
            query.addCriteria(Criteria.where("businessDate").in(date));
            return mongoTemplate.find(query, Map.class, "table1-v2");
        }
    }

    class MongoCallble2 implements Callable<List<Map>> {
        @Override
        public List<Map> call() throws Exception {
            List<Integer> date = Arrays.asList(20221102);
            Query query = new Query();
            query.addCriteria(Criteria.where("businessDate").in(date));
            return mongoTemplate.find(query, Map.class, "table3-v2");
        }
    }

    class MongoCallble3 implements Callable<List<Map>> {
        @Override
        public List<Map> call() throws Exception {
            List<Integer> date = Arrays.asList(20221102);
            Query query = new Query();
            query.addCriteria(Criteria.where("businessDate").in(date));
            return mongoTemplate.find(query, Map.class, "table2-v2");
        }
    }

    class MongoCallble4 implements Callable<List<Map>> {
        @Override
        public List<Map> call() throws Exception {
            List<Integer> date = Arrays.asList(20221102);
            Query query = new Query();
            query.addCriteria(Criteria.where("businessDate").in(date));
            return mongoTemplate.find(query, Map.class, "ranktable2-v2");
        }
    }
}
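The logs show that when query3 and query4 are not executed, the tasks are cheap enough that the single-threaded version is faster than the Callable-based multithreaded one. Once query3, query4, or further time-consuming tasks are added, the multithreaded version is faster.
2.2.2 FutureTask + Callable vs CompletableFuture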
public void completableService() {
    try {
        long start = System.currentTimeMillis();
        CompletableFuture<List<Map>> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            List<Integer> date = Arrays.asList(20221102);
            Query query = new Query();
            query.addCriteria(Criteria.where("businessDate").in(date));
            return mongoTemplate.find(query, Map.class, "table1-v2");
        });
        CompletableFuture<List<Map>> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            List<Integer> date = Arrays.asList(20221102);
            Query query = new Query();
            query.addCriteria(Criteria.where("businessDate").in(date));
            return mongoTemplate.find(query, Map.class, "table2-v2");
        });
        CompletableFuture<List<Map>> completableFuture3 = CompletableFuture.supplyAsync(() -> {
            List<Integer> date = Arrays.asList(20221102);
            Query query = new Query();
            query.addCriteria(Criteria.where("businessDate").in(date));
            return mongoTemplate.find(query, Map.class, "table3-v2");
        });
        CompletableFuture<List<Map>> completableFuture4 = CompletableFuture.supplyAsync(() -> {
            List<Integer> date = Arrays.asList(20221102);
            Query query = new Query();
            query.addCriteria(Criteria.where("businessDate").in(date));
            return mongoTemplate.find(query, Map.class, "table4-v2");
        });
        // Wait for all four queries before measuring the elapsed time
        List<Map> list1 = completableFuture1.get();
        completableFuture2.get();
        completableFuture3.get();
        completableFuture4.get();
        log.info("time consume:{}", System.currentTimeMillis() - start);
    } catch (Exception e) {
        log.error("query failed", e);
    }
}
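Why CompletableFuture does better than the Callable + FutureTask combination:
The source of CompletableFuture.supplyAsync() (JDK 8) passes asyncPool, an Executor, to asyncSupplyStage, so the task runs on a thread pool. In JDK 8 asyncPool is ForkJoinPool.commonPool() unless the common pool's parallelism is 1 or less, in which case a new thread is created per task.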
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}
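Since the executor is just a parameter of asyncSupplyStage, the public overload supplyAsync(Supplier, Executor) lets the caller choose the pool. The following is a minimal sketch under stated assumptions: PooledSupplyAsyncSketch, fetchAll and fakeQuery are hypothetical names, fakeQuery only simulates a database call with a sleep, and the pool size of 4 is arbitrary.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical sketch: run simulated queries on a caller-supplied thread pool.
final class PooledSupplyAsyncSketch {

    static List<String> fetchAll(List<String> tables) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // size is an assumption
        try {
            // Start every query first so that they run concurrently on the pool
            List<CompletableFuture<String>> futures = tables.stream()
                    .map(table -> CompletableFuture.supplyAsync(() -> fakeQuery(table), pool))
                    .collect(Collectors.toList());
            // Then wait for each result, keeping the input order
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    /** Stand-in for a real database query. */
    static String fakeQuery(String table) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "rows from " + table;
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("table1-v2", "table2-v2", "table3-v2")));
    }
}
```

A dedicated pool is often preferred for blocking I/O such as database calls, because blocking tasks on the shared common pool can starve other users of that pool. In a long-running service the pool would normally be created once and reused rather than per call.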
2.3 Publish-subscribe
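A minimal sketch of the publish-subscribe model, assuming Java 9 or later and the JDK's own java.util.concurrent.Flow interfaces together with SubmissionPublisher. The names PublishSubscribeSketch, CollectingSubscriber and publishAll are hypothetical and serve only as illustration. The subscriber requests one item at a time, which is how it signals to the publisher how much it can handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of publish-subscribe with the JDK Flow API (Java 9+).
final class PublishSubscribeSketch {

    /** Collects every item it receives and records when the stream ends. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    static List<String> publishAll(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit); // items are delivered asynchronously
        } // close() signals onComplete once pending items have been delivered
        try {
            if (!subscriber.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAll(List.of("a", "b", "c")));
    }
}
```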
References:
Callback: https://www.jianshu.com/p/7ee7edac1a13 https://blog.csdn.net/u012393791/article/details/52777029
Callable: https://blog.csdn.net/qq_22744093/article/details/111468896
From: https://www.cnblogs.com/enhance/p/16972230.html