首页 > 编程语言 >动态线程池框架 hippo4j 源码解析

动态线程池框架 hippo4j 源码解析

时间:2022-10-14 23:01:00浏览次数:82  
标签:hippo4j void 源码 线程 executor null threadPoolId log

hippo4j 是一个动态管理和监控线程池的开源框架,它有两种运行模式:轻量级依赖配置中心以及无中间件依赖版本。

文档地址参见 https://hippo4j.cn/docs/user_docs/intro

其中无中间件依赖版本支持的功能更丰富,代码也更复杂一些,本文以该版本为例分析 hippo4j 的代码,以供参考。

1. 定义了一个新的线程池类 DynamicThreadPoolExecutor

解析:
直接使用 JUC 的 ThreadPoolExecutor 的问题:Spring 容器关闭的时候可能任务队列里的任务还没处理完,有丢失任务的风险。
为了解决该问题,可以实现 InitializingBean 和 DisposableBean接口,在 bean 初始化、容器关闭时做相应处理。这里的 AbstractDynamicExecutorSupport 即是这样实现的。
部分代码如下:

    @Override
    public void destroy() {
        shutdownSupport();
    }

    public void shutdownSupport() {
        if (log.isInfoEnabled()) {
            log.info("Shutting down ExecutorService" + (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : ""));
        }
        if (this.executor != null) {
            if (this.waitForTasksToCompleteOnShutdown) {
                this.executor.shutdown();
            } else {
                for (Runnable remainingTask : this.executor.shutdownNow()) {
                    cancelRemainingTask(remainingTask);
                }
            }
            awaitTerminationIfNecessary(this.executor);
        }
    }

    protected void cancelRemainingTask(Runnable task) {
        if (task instanceof Future) {
            ((Future<?>) task).cancel(true);
        }
    }

    private void awaitTerminationIfNecessary(ExecutorService executor) {
        if (this.awaitTerminationMillis > 0) {
            try {
                if (!executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS)) {
                    if (log.isWarnEnabled()) {
                        log.warn("Timed out while waiting for executor" +
                                (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
                    }
                }
            } catch (InterruptedException ex) {
                if (log.isWarnEnabled()) {
                    log.warn("Interrupted while waiting for executor" +
                            (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
                }
                Thread.currentThread().interrupt();
            }
        }
    }

可以看到在 bean destroy 时,先检查是否需要等待任务完成再关闭线程池。
DynamicThreadPool 继承了 AbstractDynamicExecutorSupport,主要实现了任务计时和超时触发报警。

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (executeTimeOut == null || executeTimeOut <= 0) {
            return;
        }
        startTimeThreadLocal.set(SystemClock.now());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Long startTime;
        if ((startTime = startTimeThreadLocal.get()) == null) {
            return;
        }
        try {
            long endTime = SystemClock.now();
            long executeTime;
            boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut;
            if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) {
                ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
                if (notifyAlarmHandler != null) {
                    notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this);
                }
            }
        } finally {
            startTimeThreadLocal.remove();
        }
    }

2. DynamicThreadPoolPostProcessor

该类继承了 BeanPostProcessor,在 Bean 初始化前后对 ThreadPoolExecutor 及其子类进行一些处理,主要用来获取线程池对象注册到 server 或 框架内部定义的容器中(如果server没有对应的配置)。主要代码如下:

    protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
        String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId();
        ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor();
        Map<String, String> queryStrMap = new HashMap(3);
        queryStrMap.put(TP_ID, threadPoolId);
        queryStrMap.put(ITEM_ID, properties.getItemId());
        queryStrMap.put(NAMESPACE, properties.getNamespace());
        boolean isSubscribe = false;
        ThreadPoolExecutor newDynamicThreadPoolExecutor = null;
        ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo();
        try {
            Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L);
            if (result.isSuccess() && result.getData() != null) {
                String resultJsonStr = JSONUtil.toJSONString(result.getData());
                if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) {
                    // Create a thread pool with relevant parameters.
                    // 省略。。。
                }
            } else {
                // DynamicThreadPool configuration undefined in server
                // 省略。。。
        
            }
        } catch (Exception ex) {
            // 省略。。。
        } finally {
            // 省略。。。
        }
        // 线程池对象注册到 hippo4j 框架内部定义的容器中,实际就是个map存储
        GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper);
        return newDynamicThreadPoolExecutor;
    }

3. AbstractRefreshListener 及其子类。

实现了 ApplicationListener,监听事件并做处理,包含四个实现:

  • WebExecutorRefreshListener,监听 Web 容器(Tomcat/Jetty/Undertow) 的线程池配置变更。
  • PlatformsRefreshListener,监听报警平台配置变更。
  • DynamicThreadPoolRefreshListener,监听线程池本身的配置变更。
  • AdapterExecutorsRefreshListener,监听第三方框架线程池变更(Dubbo/RocketMQ 等)。
    代码比较简单,不再单独解析了。

4. 继承 ApplicationRunner/CommandLineRunner 的类

负责容器启动后回调,执行初始化工作,例如 DynamicThreadPoolMonitorExecutor,定时收集监控结果,主要代码如下:

@Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolMonitorExecutor implements ApplicationRunner {

    private final BootstrapConfigProperties properties;

    private ScheduledThreadPoolExecutor collectExecutor;

    private List<ThreadPoolMonitor> threadPoolMonitors;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 省略。。。
        // Get dynamic thread pool monitoring component.
        threadPoolMonitors = new ArrayList<>();
        collectExecutor = new ScheduledThreadPoolExecutor(
                new Integer(1),
                ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.collect.data").build());
        // 配置文件中已有的 collectType 添加到 threadPoolMonitors,此处省略。。。
       
        // 自定义扩展的 monitor 添加到 threadPoolMonitors,此处省略。。。
        
        // Execute dynamic thread pool monitoring component.
        collectExecutor.scheduleWithFixedDelay(
                () -> scheduleRunnable(),
                properties.getInitialDelay(),
                properties.getCollectInterval(),
                TimeUnit.MILLISECONDS);
        if (GlobalThreadPoolManage.getThreadPoolNum() > 0) {
            log.info("Dynamic thread pool: [{}]. The dynamic thread pool starts data collection and reporting.", getThreadPoolNum());
        }
    }

    private void scheduleRunnable() {
        for (ThreadPoolMonitor each : threadPoolMonitors) {
            try {
                each.collect();
            } catch (Exception ex) {
                log.error("Error monitoring the running status of dynamic thread pool. Type: {}", each.getType(), ex);
            }
        }
    }
}

标签:hippo4j,void,源码,线程,executor,null,threadPoolId,log
From: https://www.cnblogs.com/ylty/p/16782030.html

相关文章