1. 生成Mono对象
private Mono<String> getEosDuplicateLastScanId(EosProject eosProject) {
Mono<String> scanIdMono = webClient.get()
.uri(uriBuilder ->
UriComponentsBuilder.fromHttpUrl(HOST)
.path("/eos/data/duplicate/moduleLatestScanId")
.queryParam("moduleId", eosProject.getId().toString())
.queryParam("_", System.currentTimeMillis())
.build().toUri())
.headers(headers -> headers.add("Cookie", eosScanConfig.getCookie()))
.retrieve()
.bodyToMono(JSONObject.class)
.log()
.filter(jsonObject -> Objects.nonNull(jsonObject.getString("data")))
.map(jsonObject -> jsonObject.getString("data"));
return scanIdMono;
}
- url的完整参数需要使用uri方法产地进Function函数来实现童工UriComponentsBuilder.fromHttoUrl()来传递http中的host,最后通过toUri返回uri对象
- bodyToMono 是返回的数据解析到JSONObject类型
- filter前置过滤,需要判断下返回数据不为空的类型,如果不加filter条件,map返回为null的话会抛异常
2. mono对象的异步回调
eosProjectDuplicateMono.subscribe(
eosDuplicateDto -> {
Optional<InstanceReference> instanceReferenceOptional = buildInstanceReference(eosDuplicateDto, context, eosProject);
instanceReferenceOptional.ifPresent(instanceReference -> {
log.info("saveInstanceReference:{}", instanceReference);
instanceReferenceDao.save(instanceReference);
});
},
error -> {
log.error("fetch {} EosDuplicate reference error" ,eosProject.getId(), error);
latch.countDown();
},
() -> {
log.info("fetch {} EosDuplicate reference complete",eosProject.getId());
latch.countDown();
}
);
- eosProjectDuplicateMono 是mono对象
- eosProjectDuplicateMono 如果是Mono.empty()的话,执行subscribe的话,会执行最后onComplete逻辑
() -> {
log.info("fetch {} EosDuplicate reference complete",eosProject.getId());
latch.countDown();
}- onNext()是如果mono中有数据并正常返回的时候调用执行
eosDuplicateDto -> {
OptionalinstanceReferenceOptional = buildInstanceReference(eosDuplicateDto, context, eosProject);
instanceReferenceOptional.ifPresent(instanceReference -> {
log.info("saveInstanceReference:{}", instanceReference);
instanceReferenceDao.save(instanceReference);
});
}- onFailed()是如果mono在异步执行的过程中出现报错的时候被回调
error -> {
log.error("fetch {} EosDuplicate reference error" ,eosProject.getId(), error);
latch.countDown();
}- onNext()中的代码块如果执行报错,也会走到onFailed
3. 等待mono的结果返回
mon可以多个放入一个list,然后等待都执行完毕,类似线程池中同步等待,防止执行太快,控制批次
这个不能和subscribe 同时使用,所以如果使用subscribe订阅返回消息的时候,控制批次的话可以引入countDonwLatch,在onError和onComplete的时候加上countDown方法
4.2个mono串起来
private Mono<EosDuplicateDto> getEosProjectDuplicateResult(EosProject eosProject) {
Mono<String> lastScanIdMono = getEosDuplicateLastScanId(eosProject);
Mono<EosDuplicateDto> eosDuplicateMono = lastScanIdMono.flatMap(scanId -> {
// 使用从 mono1 返回的数据创建第二个 Mono
return getEosDuplicateResponse(scanId);
});
return eosDuplicateMono;
}
标签:instanceReference,log,Mono,mono,使用,webflux,error,eosProject From: https://www.cnblogs.com/PythonOrg/p/18399831
- lastScanIdMono 第一个mono的返回值,需要传递个第二个eosDuplicateMono 中,依赖flatMap 的流进行操作
- 如果第一个mono返回的是mono.empty()这种类型的话,调用flatMap的话,不会执行里面的流程,而是也直接返回一个mono.empty()