健康检查案例实现
案例要求:能够动态的实现,健康检查的间隔时间,重试次数,动态拉去配置信息,校验发现配置变更,进行健康检查任务的动态变更
1)配置远程调用
Spring 允许我们通过定义接口的方式,给任意位置发送 http 请求,实现远程调用,可以用来简化 HTTP 远程访问。需要webflux场景才可
1.1)导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
1.2)编写配置属性
//算法客户端主要用到 private String tankOcrUrl = "http://127.0.0.1:6013";的属性
@ConfigurationProperties(prefix = "edge.client")
@Data
@RefreshScope
public class EdgeClientProperties {
/**
* Configuration properties imagePath
*/
private String imagePath = "/srv/www/upload";
/**
* Configuration properties imageServerPrefix
*/
private String imageServerPrefix = "";
/**
* Configuration properties
*/
/**
* 危化车OCR识别地址
*/
private String tankOcrUrl = "http://127.0.0.1:6013";
}
//****************************************平台客户端,配置属性, private String endpoint;*******************************************
@ConfigurationProperties(prefix = "app.edge")
@Data
@RefreshScope
public class EdgeProperties {
/**
* 络端编码
*/
private String code;
/**
* 平台信息
*/
@NestedConfigurationProperty
private Platform platform = new Platform();
/**
* 配置信息
*/
@NestedConfigurationProperty
private EdgeConfig edgeConfig = new EdgeConfig();
/**
* 平台信息
*/
@Data
public static class Platform {
/**
* 平台地址
*/
private String endpoint;
}
/**
* 边缘端配置信息
*/
@Data
public static class EdgeConfig {
/**
* 注册地址
*/
private String registerUrl;
/**
* 心跳地址
*/
private String heartbeatUrl;
/**
* 账号
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 心跳时间
*/
private Integer heartTime;
/**
* 心跳最大等待时间
*/
private Integer maxHeartTime;
}
}
/******************************************** Yaml 配置文件********************************************************************/
--- #配置边缘端信息
app:
edge:
code: ${APP_EDGE_CODE:BD123456789}
platform:
endpoint: ${APP_PLATFORM_URL:http://192.168.3.11:32240}
edge-config:
register-url: ${APP_REGISTER_URL:/api/v1/itd/basic/edge/device/register}
heartbeat-url: ${APP_HEART_URL:/api/v1/itd/basic/edge/device/heartbeat}
heart-time: ${APP_HEART_TIME:10}
max-heart-time: ${APP_HEART_TIME:100}
username: ${APP_USERNAME:admin}
password: ${APP_PASSWORD:123456}
--- #算法服务器基础信息
edge:
client:
tank-ocr-url: http://192.168.3.68:31869
1.3)编写远程调用接口
/*************************************************平台端远程调用接口***********************************************/
@HttpExchange
public interface PlatformClient {
/**
* @param registerReq
* @return register
* @description 平台注册
*/
@PostExchange(url = "/api/v1/itd/basic/edge/device/register")
JsonResult<EdgeRegisterHeartbeatDTO> register(@RequestBody RegisterReq registerReq);
/**
* @param heartbeatReq
* @return register
* @description 健康检查
*/
@PostExchange(url = "/api/v1/itd/basic/edge/device/heartbeat")
JsonResult<EdgeRegisterHeartbeatDTO> heartbeatCheck(@RequestBody HeartbeatReq heartbeatReq);
}
/*************************************************算法识别远程调用接口***********************************************/
@HttpExchange
public interface RecognizeClient {
/**
* @param request
* @return RecognizeResponse
*/
@PostExchange
RecognizeResponse recognize(@RequestBody RecognizeRequest request);
/**
* @param request
* @return OcrResponse
*/
@PostExchange
DgCarResponse recognizeDgCar(@RequestBody RecognizeRequest request);
/**
* @param request
* @return CarTypeResponse
*/
@PostExchange
CarTypeResponse recognizeCarType(@RequestBody RecognizeRequest request);
}
1.4)编写配置类
/**
*在启动类我们已经将这两个Properties进行导入 @EnableConfigurationProperties({EdgeClientProperties.class, EdgeProperties.class})
*/
@Configuration(proxyBeanMethods = false)
@RequiredArgsConstructor
public class EdgeConfiguration {
/**
* @description 边缘客户端属性
*/
private final EdgeClientProperties properties;
/**
* @description 边缘属性
*/
private final EdgeProperties edgeProperties;
/**
* @description token管理容器
*/
private final TokenManager tokenManager;
/**
* 构建HttpServiceProxyFactory
* @param baseUrl 基础url
* @param objectMapper 对象映射器
* @return HttpServiceProxyFactory
*/
@NonNull
private HttpServiceProxyFactory buildFactory(String baseUrl, ObjectMapper objectMapper) {
AtomicReference<Integer> heartTime = new AtomicReference<>(edgeProperties.getEdgeConfig().getHeartTime());
//进行JSON序列化操作,后续会出《内容协商相关博客》
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder().codecs(configurer -> {
configurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper));
configurer.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper));
}).build();
/*通过构建WebClient 进行远程调用,*/
val webClientBuild = WebClient.builder().baseUrl(baseUrl).exchangeStrategies(exchangeStrategies);
if (edgeProperties.getPlatform().getEndpoint().equals(baseUrl)) {
// 配置边缘端对平台的请求调用,由于心跳的时候需要设置token校验,每次心跳的时候进行token的设置
webClientBuild.filter((request, next) -> next
.exchange(ClientRequest.from(request).header("register", tokenManager.getToken()).build()));
// 超时等待时间,根据token管理容器动态的设置超时相应时间
Optional.ofNullable(tokenManager.getHeartbeat()).ifPresent(heartTime::set);
}
WebClient webClient = webClientBuild.build();
WebClientAdapter exchangeAdapter = WebClientAdapter.create(webClient);
exchangeAdapter.setBlockTimeout(Duration.of(heartTime.get(), ChronoUnit.SECONDS));
return HttpServiceProxyFactory.builderFor(exchangeAdapter).build();
}
/*************************************************算法识别远程调用********************************************************/
@Bean
public RecognizeClient recognizeClient(ObjectProvider<ObjectMapper> objectMappers) {
val objectMapper = objectMappers.getObject();
//调用构建方法,构建算法调用客户端
val factory = buildFactory(properties.getTankOcrUrl(), objectMapper);
return factory.createClient(RecognizeClient.class);
}
@Bean
public PlatformClient platformClient(ObjectProvider<ObjectMapper> objectMappers) {
val objectMapper = objectMappers.getObject();
//调用构建方法,构建平台调用客户端
val factory = buildFactory(edgeProperties.getPlatform().getEndpoint(), objectMapper);
return factory.createClient(PlatformClient.class);
}
}
2)健康检查定时任务
@Service
@Slf4j
@RequiredArgsConstructor
public class HeartbeatTask {
/**
* @description 边缘端属性
*/
private final EdgeProperties edgeProperties;
/**
* @description 边缘服务,进行业务相关的查询,这里就不为其展示业务的调用功能
*/
private final EdgeService edgeService;
/**
* @description 推送事件,当发生配置变更等情况进行变更
*/
private final ApplicationEventPublisher publisher;
/**
* @description 平台端远程调用接口
*/
private final PlatformClient platformClient;
/**
* 开启心跳检测
*/
public void startHeartbeat() {
//进行动态变更的时候,每次需要重新创建一个任务线程池
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Runnable heartbeatTask = new Runnable() {
@Override
public void run() {
log.info("Heartbeat sent at:{}", LocalDateTime.now());
HeartbeatReq heartbeatReq = new HeartbeatReq();//构建请求体
BeanUtil.copyProperties(edgeDeviceHeartbeatDTO, heartbeatReq);
JsonResult<EdgeRegisterHeartbeatDTO> result = new JsonResult<>();
try {
//向平台端发送健康检测请求
result = platformClient.heartbeatCheck(heartbeatReq);
log.info("platform-service with return {}.", result);
if (result.getData().getIsSuccess()) {
log.info("Heartbeat successfully sent.");
//业务代码,校验数据变更,进行动态更新
edgeService.verifyHeartbeatDevice(result.getData().getEdgeDeviceListDTO().getBindDeviceList());
Boolean verified = edgeService.verifyHeartbeatConfig(result.getData().getEdgeDeviceListDTO(),
edgeProperties.getCode());
if (verified) {
log.error("Configuration changed!!!");
//配送发生变更,发送变更事件进行后续的变更处理
pushHeartEvent("The configuration has changed. Restart the scheduled task",
HeartbeatStatus.CONFIG_CHANGE.status, scheduler);
}
}
} catch (WebClientResponseException e) {
log.error("---heartbeat request error !!! while making the HTTP request: ", e);
if (e.getStatusCode().value() == 403) {
log.error("token expires error, Heartbeat failed: {}", e.getMessage());
//token 过期,发送token失效事件
tokenExpired(scheduler);
}
}
}
};
scheduler.scheduleAtFixedRate(heartbeatTask, 5, edgeConfig.getHeartbeatTime(), TimeUnit.SECONDS);
}
/**
* @param message
* @param status
* @param scheduler
*/
private void pushHeartEvent(String message, Integer status, ScheduledExecutorService scheduler) {
HeartbeatEvent event = new HeartbeatEvent(this, message, status, scheduler);
publisher.publishEvent(event);
}
/**
* @param scheduler
* @return
*/
private void tokenExpired(ScheduledExecutorService scheduler) {
pushHeartEvent("token过期,重新开启定时任务", HeartbeatStatus.TOKEN_EXPIRATION.status, scheduler);
}
/**
* @param scheduler
* @description 停止心跳检测
*/
public void stopHeartbeat(ScheduledExecutorService scheduler) {
// 重新开启任务
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
log.error("Failed to gracefully shutdown scheduler");
}
}
} catch (InterruptedException ex) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
3)事件触发
3.1)心跳事件
/*** ******************************** 一、定义心跳事件,继承ApplicationEvent,自定义事件监听*******************************************/
@Getter
@Setter
public class HeartbeatEvent extends ApplicationEvent {
/**
* @description Java Attribute variables
/
private String message;
/**
* @description Java Attribute variables
*/
private Integer status;
/**
* @description Java Attribute variables
*/
private ScheduledExecutorService scheduler;
/**
* @param source 需要构造父类的source
* @param message
* @param status
* @param scheduler
* @return
*/
public HeartbeatEvent(Object source, String message, Integer status, ScheduledExecutorService scheduler) {
super(source);
this.message = message;
this.status = status;
this.scheduler = scheduler;
}
}
3.1)心跳监听
/*** *********************************** 二、定义心跳监听器,监听心跳事件*************************************************************/
@Slf4j
@Component
@RequiredArgsConstructor
public class HeartbeatEventListener {
/**
* 监听心跳返回的信息
*
* @param event
*/
@EventListener
public void onApplicationEvent(HeartbeatEvent event) {
log.info("onApplicationEvent received: {}", event.getMessage());
// 1.token过期
if (event.getStatus() == HeartbeatStatus.TOKEN_EXPIRATION.status) {
log.error("Token expired, stopping the heartbeat task and reloading register........");
//进行相关业务处理
restartHeart();
}
// 2.配置变更,重新加载
if (event.getStatus() == HeartbeatStatus.CONFIG_CHANGE.status) {
log.info("The event was accepted successfully, the configuration changed");
//进行相关业务处理
restartHeart();
}
}
/**
*进行业务代码处理,发生变更,重新关闭心跳,开启新的心跳检测
*/
private void restartHeart(){
heartbeatTask.stopHeartbeat(event.getScheduler());
heartbeatTask.startHeartbeat();
}
}
标签:Springboot,description,private,案例,param,scheduler,健康检查,public,String
From: https://www.cnblogs.com/zxlyy/p/18635299