前言
使用线程池 ThreadPoolExecutor 过程中你是否有以下痛点呢?
-
代码中创建了一个 ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适
-
凭经验设置参数值,上线后发现需要调整,改代码重新发布服务,非常麻烦
-
线程池相对开发人员来说是个黑盒,运行情况不能及时感知到,直到出现问题
如果你有以上痛点,这块开源的动态可监控线程池框架(DynamicTp)或许能帮助到你
此项目由Dromara社区开源,基于配置中心的轻量级动态可监控线程池,项目地址:https://dynamictp.cn/
功能特性
- 代码零侵入:所有配置都放在配置中心,对业务代码零侵入
- 轻量简单:基于 springboot 实现,引入 starter,接入只需简单4步就可完成,顺利3分钟搞定
- 高可扩展:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装等等)
- 线上大规模应用:参考美团线程池实践open in new window,美团内部已经有该理论成熟的应用经验
- 多平台通知报警:提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝触发报警、任务执行或等待超时报警),已支持企业微信、钉钉、飞书报警,同时提供 SPI 接口可自定义扩展实现
- 监控:定时采集线程池指标数据,支持通过 MicroMeter、JsonLog 日志输出、Endpoint 三种方式,可通过 SPI 接口自定义扩展实现
- 任务增强:提供任务包装功能,实现TaskWrapper接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper,可以支持线程池上下文信息传递
- 兼容性:JUC 普通线程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被框架监控,@Bean 定义时加 @DynamicTp 注解即可
- 可靠性:框架提供的线程池实现 Spring 生命周期方法,可以在 Spring 容器关闭前尽可能多的处理队列中的任务
- 多模式:参考Tomcat线程池提供了 IO 密集型场景使用的 EagerDtpExecutor 线程池
- 支持多配置中心:基于主流配置中心实现线程池参数动态调整,实时生效,已支持 Nacos、Apollo、Zookeeper、Consul、Etcd,同时也提供 SPI 接口可自定义扩展实现
- 中间件线程池管理:集成管理常用第三方组件的线程池,已集成Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix等组件的线程池管理(调参、监控报警)
系统架构图
接入指南
Maven 依赖
下面只介绍几个常用场景接入pom,zk、consul这些请查看官网
-
apollo 应用用接入用此依赖
<dependency> <groupId>cn.dynamictp</groupId> <artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId> <version>1.0.8</version> </dependency>
-
spring-cloud 场景下的 nacos 应用接入用此依赖
<dependency> <groupId>cn.dynamictp</groupId> <artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId> <version>1.0.8</version> </dependency>
-
非 spring-cloud 场景下的 nacos 应用接入用此依赖
<dependency> <groupId>cn.dynamictp</groupId> <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId> <version>1.0.8</version> </dependency>
注意版本:nacos-config-spring-boot-starter 0.2.10 及以下版本对应 springboot 2.3.12.RELEASE及以下版本, 0.2.11-beta及以上版本对应springboot 版本2.4.0及以上版本,具体看官方说明
配置文件
线程池定义可以配置在文件中,然后在Spring应用中可以通过@Resource、@Autowire、或通过工具类来获取实例,以下配置文件,除了维护公共属性还定义了名称为austin.im.notice、execute-xxl-thread-pool线程池,其中格式支持yml、properties 类型、json 类型、zk文件
spring:
dynamic:
tp:
enabled: true
enabledBanner: true # 是否开启banner打印,默认true
enabledCollect: true # 是否开启监控指标采集,默认false
collectorType: micrometer # 监控数据采集器类型(JsonLog | MicroMeter),默认logging
monitorInterval: 5 # 监控时间间隔(报警判断、指标采集),默认5s
apollo: # apollo配置,不配置默认拿apollo配置第一个namespace
namespace: dynamic-tp-apollo-dtp.yml
configType: yml
platforms:
- platform: wechat
urlKey: 38aa7eff500-1287
receivers: apollo
- platform: ding
urlKey: f80dad441fcd65bac48473d4a88dcd6a
secret: SECb544445a6a34f0315d08b17de41
receivers: 18888888888
executors:
- threadPoolName: austin.im.notice
corePoolSize: 6
maximumPoolSize: 8
queueCapacity: 200
queueType: VariableLinkedBlockingQueue # 任务队列,查看源码QueueTypeEnum枚举类
rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类
keepAliveTime: 50
allowCoreThreadTimeOut: false
threadNamePrefix: austin- # 线程名前缀
- threadPoolName: execute-xxl-thread-pool
corePoolSize: 3
maximumPoolSize: 3
queueCapacity: 200
queueType: VariableLinkedBlockingQueue # 任务队列,查看源码QueueTypeEnum枚举类
rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类
keepAliveTime: 50
allowCoreThreadTimeOut: false
threadNamePrefix: austin- # 线程名前缀
notifyItems: # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警)
- type: capacity # 报警项类型,查看源码 NotifyTypeEnum枚举类
enabled: true
threshold: 80 # 报警阈值
platforms: [ding,wechat] # 可选配置,不配置默认拿上层platforms配置的所以平台
interval: 120 # 报警间隔(单位:s)
- type: change
enabled: true
- type: liveness
enabled: true
threshold: 80
- type: reject
enabled: true
threshold: 1
代码使用
线程池实例定义
如果不再配置文件中进行定义,也可以在代码中直接定义
@Configuration
public class DtpConfig {
/**
* 通过{@link DynamicTp} 注解定义普通juc线程池,会享受到该框架监控功能,注解名称优先级高于方法名
*
* @return 线程池实例
*/
@DynamicTp("commonExecutor")
@Bean
public ThreadPoolExecutor commonExecutor() {
return (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
}
/**
* 通过{@link ThreadPoolCreator} 快速创建一些简单配置的动态线程池
* tips: 建议直接在配置中心配置就行,不用@Bean声明
*
* @return 线程池实例
*/
@Bean
public DtpExecutor dtpExecutor1() {
return ThreadPoolCreator.createDynamicFast("dtpExecutor1");
}
/**
* 通过{@link ThreadPoolBuilder} 设置详细参数创建动态线程池(推荐方式),
* ioIntensive,参考tomcat线程池设计,实现了处理io密集型任务的线程池,具体参数可以看代码注释
*
* tips: 建议直接在配置中心配置就行,不用@Bean声明
* @return 线程池实例
*/
@Bean
public DtpExecutor ioIntensiveExecutor() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName("ioIntensiveExecutor")
.corePoolSize(20)
.maximumPoolSize(50)
.queueCapacity(2048)
.ioIntensive(true)
.buildDynamic();
}
/**
* tips: 建议直接在配置中心配置就行,不用@Bean声明
* @return 线程池实例
*/
@Bean
public ThreadPoolExecutor dtpExecutor2() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName("dtpExecutor2")
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(50)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationSeconds(5)
.buildDynamic();
}
}
代码调用
从DtpRegistry中根据线程池名称获取,或者通过依赖注入方式(推荐,更优雅)
- 依赖注入方式使用,优先推荐依赖注入方式,不能使用依赖注入的场景可以使用方式2
@Resource
private ThreadPoolExecutor dtpExecutor1;
public void exec() {
dtpExecutor1.execute(() -> System.out.println("test"));
}
- 通过DtpRegistry注册器获取
public static void main(String[] args) {
DtpExecutor dtpExecutor = DtpRegistry.getDtpExecutor("dtpExecutor1");
dtpExecutor.execute(() -> System.out.println("test"));
}
通知报警
调参通知
运行报警
线程池活跃度告警
活跃度 = activeCount / maximumPoolSize
队列容量告警
容量使用率 = queueSize / queueCapacity
拒绝策略告警
线程池线程数达到配置的最大线程数,且任务队列已满,再提交任务会触发拒绝策略
任务队列超时告警
重写ThreadPoolExecutor的execute()方法和beforeExecute()方法,如果配置了执行超时或排队超时值,则会进行报警
任务执行超时告警
重写ThreadPoolExecutor的afterExecute()方法,根据当前时间和beforeExecute()中设置的startTime的差值即可算出任务的实际执行时间,然后判断如果差值大于配置的runTimeout则累加排队超时任务数量,则会进行告警
监控
支持接入prometheus+grafana做监控,效果如下
总结
DynamicTcp是一个功能实用,上手简单动态线程池组件,很轻量对业务无侵入,目前我们业务系统已经开始介入,读者朋友们赶紧使用起来吧。
标签:报警,DynamicTp,配置,Bean,开源,线程,监控,true From: https://www.cnblogs.com/waldron/p/16733047.html