hippo4j 是一个动态管理和监控线程池的开源框架,它有两种运行模式:轻量级依赖配置中心以及无中间件依赖版本。
文档地址参见 https://hippo4j.cn/docs/user_docs/intro
其中无中间件依赖版本支持的功能更丰富,代码也更复杂一些,本文以该版本为例分析 hippo4j 的代码,以供参考。
1. 定义了一个新的线程池类 DynamicThreadPoolExecutor
解析:
直接使用 JUC 的 ThreadPoolExecutor 的问题:Spring 容器关闭的时候可能任务队列里的任务还没处理完,有丢失任务的风险。
为了解决该问题,可以实现 InitializingBean 和 DisposableBean接口,在 bean 初始化、容器关闭时做相应处理。这里的 AbstractDynamicExecutorSupport 即是这样实现的。
部分代码如下:
@Override
public void destroy() {
shutdownSupport();
}
public void shutdownSupport() {
if (log.isInfoEnabled()) {
log.info("Shutting down ExecutorService" + (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : ""));
}
if (this.executor != null) {
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
} else {
for (Runnable remainingTask : this.executor.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary(this.executor);
}
}
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
private void awaitTerminationIfNecessary(ExecutorService executor) {
if (this.awaitTerminationMillis > 0) {
try {
if (!executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS)) {
if (log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor" +
(this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
}
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor" +
(this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
}
Thread.currentThread().interrupt();
}
}
}
可以看到在 bean destroy 时,先检查是否需要等待任务完成再关闭线程池。
DynamicThreadPool 继承了 AbstractDynamicExecutorSupport,主要实现了任务计时和超时触发报警。
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (executeTimeOut == null || executeTimeOut <= 0) {
return;
}
startTimeThreadLocal.set(SystemClock.now());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
Long startTime;
if ((startTime = startTimeThreadLocal.get()) == null) {
return;
}
try {
long endTime = SystemClock.now();
long executeTime;
boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut;
if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) {
ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
if (notifyAlarmHandler != null) {
notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this);
}
}
} finally {
startTimeThreadLocal.remove();
}
}
2. DynamicThreadPoolPostProcessor
该类继承了 BeanPostProcessor,在 Bean 初始化前后对 ThreadPoolExecutor 及其子类进行一些处理,主要用来获取线程池对象注册到 server 或 框架内部定义的容器中(如果server没有对应的配置)。主要代码如下:
protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId();
ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor();
Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put(TP_ID, threadPoolId);
queryStrMap.put(ITEM_ID, properties.getItemId());
queryStrMap.put(NAMESPACE, properties.getNamespace());
boolean isSubscribe = false;
ThreadPoolExecutor newDynamicThreadPoolExecutor = null;
ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo();
try {
Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L);
if (result.isSuccess() && result.getData() != null) {
String resultJsonStr = JSONUtil.toJSONString(result.getData());
if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) {
// Create a thread pool with relevant parameters.
// 省略。。。
}
} else {
// DynamicThreadPool configuration undefined in server
// 省略。。。
}
} catch (Exception ex) {
// 省略。。。
} finally {
// 省略。。。
}
// 线程池对象注册到 hippo4j 框架内部定义的容器中,实际就是个map存储
GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper);
return newDynamicThreadPoolExecutor;
}
3. AbstractRefreshListener 及其子类。
实现了 ApplicationListener,监听事件并做处理,包含四个实现:
- WebExecutorRefreshListener,监听 Web 容器(Tomcat/Jetty/Undertow) 的线程池配置变更。
- PlatformsRefreshListener,监听报警平台配置变更。
- DynamicThreadPoolRefreshListener,监听线程池本身的配置变更。
- AdapterExecutorsRefreshListener,监听第三方框架线程池变更(Dubbo/RocketMQ 等)。
代码比较简单,不再单独解析了。
4. 继承 ApplicationRunner/CommandLineRunner 的类
负责容器启动后回调,执行初始化工作,例如 DynamicThreadPoolMonitorExecutor,定时收集监控结果,主要代码如下:
@Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolMonitorExecutor implements ApplicationRunner {
private final BootstrapConfigProperties properties;
private ScheduledThreadPoolExecutor collectExecutor;
private List<ThreadPoolMonitor> threadPoolMonitors;
@Override
public void run(ApplicationArguments args) throws Exception {
// 省略。。。
// Get dynamic thread pool monitoring component.
threadPoolMonitors = new ArrayList<>();
collectExecutor = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.collect.data").build());
// 配置文件中已有的 collectType 添加到 threadPoolMonitors,此处省略。。。
// 自定义扩展的 monitor 添加到 threadPoolMonitors,此处省略。。。
// Execute dynamic thread pool monitoring component.
collectExecutor.scheduleWithFixedDelay(
() -> scheduleRunnable(),
properties.getInitialDelay(),
properties.getCollectInterval(),
TimeUnit.MILLISECONDS);
if (GlobalThreadPoolManage.getThreadPoolNum() > 0) {
log.info("Dynamic thread pool: [{}]. The dynamic thread pool starts data collection and reporting.", getThreadPoolNum());
}
}
private void scheduleRunnable() {
for (ThreadPoolMonitor each : threadPoolMonitors) {
try {
each.collect();
} catch (Exception ex) {
log.error("Error monitoring the running status of dynamic thread pool. Type: {}", each.getType(), ex);
}
}
}
}
标签:hippo4j,void,源码,线程,executor,null,threadPoolId,log
From: https://www.cnblogs.com/ylty/p/16782030.html