5ycode
某信贷cto,专注于java技术研究与应用,包括JVM、DDD、软件设计、源码阅读、以及经验分享
9篇原创内容
公众号
读了《Java线程池实现原理及其在美团业务中的实践》后,我就想到一个问题:如果让我去做线程池的监控,我该怎么做?
要对线程池进行监控,首先得明白,我们监控线程池的目的是什么?
监控是为了防患于未然,防止生产事故的发生,或者在问题真正发生之前就能及时介入处理。
出问题线程池的现象:
- 线程池异步处理,消费速度过慢,导致任务积压,响应过慢,或者队列有限,导致提交被拒绝;
- 使用线程池做并行请求的时候,请求量过大,处理积压,导致响应变慢;
- 业务评估不准确,导致线程池资源设置得不合理;
对线程池监控的指标有以下几种:
1,队列饱和度;
2,单位时间内提交任务的速度远大于消费速度;
监控方案:
方案一:继承ThreadPoolExecutor对部分方法进行重写
/**
 * A {@link ThreadPoolExecutor} that records simple runtime metrics about the tasks it runs.
 *
 * <p>Compared with the stock executor it
 * <ul>
 *   <li>prints a state snapshot (completed / active / total task counts and queued tasks) when
 *       {@link #shutdown()} or {@link #shutdownNow()} is called;</li>
 *   <li>accumulates the wall-clock execution time of every task
 *       ({@link #getTotalExecutionNanos()});</li>
 *   <li>counts and prints tasks that ended with an exception, including exceptions captured inside a
 *       {@link Future} by {@code submit(...)} ({@link #getFailedTaskCount()}).</li>
 * </ul>
 *
 * @author yxkong
 * @version 1.0
 * @date 2021/3/22 13:29
 */
public class ThreadPoolExecutorMonitor extends ThreadPoolExecutor {

    /** {@link System#nanoTime()} at which the calling worker thread started its current task. */
    private final ThreadLocal<Long> taskStartNanos = new ThreadLocal<>();

    /** Sum of the execution times of all finished tasks, in nanoseconds. */
    private final java.util.concurrent.atomic.AtomicLong totalExecutionNanos =
            new java.util.concurrent.atomic.AtomicLong();

    /** Number of tasks that finished with an exception or were cancelled. */
    private final java.util.concurrent.atomic.AtomicLong failedTaskCount =
            new java.util.concurrent.atomic.AtomicLong();

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /** Prints a metrics snapshot, then starts an orderly shutdown. */
    @Override
    public void shutdown() {
        // Previously these counters were computed and discarded; actually report them.
        System.out.println(describeState("shutdown"));
        super.shutdown();
    }

    /** Prints a metrics snapshot, then stops the pool immediately. */
    @Override
    public List<Runnable> shutdownNow() {
        System.out.println(describeState("shutdownNow"));
        return super.shutdownNow();
    }

    /** @return total execution time of all finished tasks, in nanoseconds */
    public long getTotalExecutionNanos() {
        return totalExecutionNanos.get();
    }

    /** @return number of finished tasks that threw or were cancelled */
    public long getFailedTaskCount() {
        return failedTaskCount.get();
    }

    /**
     * Builds a one-line summary of the pool's counters.
     *
     * @param event label prefixed to the summary, e.g. the triggering method name
     * @return the summary text
     */
    protected String describeState(String event) {
        return event
                + ": completedTasks=" + getCompletedTaskCount()
                + ", activeThreads=" + getActiveCount()
                + ", totalTasks=" + getTaskCount()
                + ", queuedTasks=" + getQueue().size()
                + ", failedTasks=" + failedTaskCount.get()
                + ", totalExecutionNanos=" + totalExecutionNanos.get();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskStartNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        recordExecutionTime();
        Throwable failure = (t != null) ? t : failureOf(r);
        if (failure != null) {
            // Handle the exception: count it and print it.
            failedTaskCount.incrementAndGet();
            System.out.println(failure);
        }
    }

    /** Adds the elapsed time of the task that just finished on this thread to the running total. */
    private void recordExecutionTime() {
        Long start = taskStartNanos.get();
        // remove() so the value does not linger on a pooled thread between tasks.
        taskStartNanos.remove();
        if (start != null) {
            totalExecutionNanos.addAndGet(System.nanoTime() - start);
        }
    }

    /**
     * Extracts the exception hidden in a completed {@link Future} task. Tasks created by
     * {@code submit(...)} catch their own exceptions, so {@code afterExecute} receives
     * {@code t == null} for them.
     *
     * @return the task's failure, or {@code null} if {@code r} is not a completed Future or completed
     *         normally
     */
    private static Throwable failureOf(Runnable r) {
        // isDone() guard, as in the JDK's afterExecute example: never block a worker thread on a
        // Future that has not completed yet.
        if (!(r instanceof Future<?>) || !((Future<?>) r).isDone()) {
            return null;
        }
        try {
            ((Future<?>) r).get();
            return null;
        } catch (CancellationException ce) {
            return ce;
        } catch (ExecutionException ee) {
            return ee.getCause();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // ignore/reset
            return null;
        }
    }
}
方案二:自定义ThreadFactory、BlockingQueue、RejectedExecutionHandler
- ThreadFactory:是为了给线程命名,方便统一管理;
- BlockingQueue:是为了能动态调整队列的长度(基于数组实现时,扩缩容需要考虑锁以及性能问题;基于链表实现则不用考虑)
- RejectedExecutionHandler: 决定队列满了之后如何处理(可以动态扩容,但要小心把JVM撑爆,或者无法创建队列)
/**
 * {@link ThreadFactory} that gives every thread a readable name of the form
 * {@code <name><poolNo>-thread-<threadNo>}. Threads are created non-daemon with normal priority.
 */
public class NamedThreadFactory implements ThreadFactory, Serializable {

    private static final long serialVersionUID = 1L;

    /** Shared across all factories, so two factories built with the same name get distinct prefixes. */
    private static final AtomicInteger poolNumber = new AtomicInteger(1);

    /**
     * Group for new threads. {@code transient} because {@link ThreadGroup} is not serializable.
     * Without it, serializing this (declared {@code Serializable}) factory threw
     * {@code NotSerializableException}. After deserialization the field is {@code null}.
     * {@link Thread} then picks the group the same way the constructor below does: the security
     * manager's group, else the creating thread's group.
     */
    private final transient ThreadGroup group;

    /** Per-factory thread sequence. */
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    private final String namePrefix;

    /**
     * @param name base name of the pool; used as the prefix of every thread name
     */
    public NamedThreadFactory(String name) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = name + poolNumber.getAndIncrement() + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        // Do not inherit daemon status / priority from the creating thread.
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}
/**
 * Bounded blocking FIFO queue whose capacity can be changed at runtime via {@link #setCapacity(int)}.
 * This lets a thread pool's work queue be resized without rebuilding the pool.
 * {@link java.util.concurrent.LinkedBlockingQueue} cannot do this: its capacity is a private final
 * field.
 *
 * <p>All state is guarded by a single lock. If the capacity is lowered below the current size, the
 * queued elements are kept. New insertions are refused (or block) until consumers drain the queue
 * below the new limit. Null elements are rejected, as {@link BlockingQueue} requires. Iterators
 * operate on a snapshot taken when the iterator is created.
 *
 * @param <E> element type
 */
public class CustomLinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Queued elements, head first. The deque's own storage growth is independent of the logical
     * {@link #capacity}.
     */
    private final java.util.ArrayDeque<E> items = new java.util.ArrayDeque<>();

    private final java.util.concurrent.locks.ReentrantLock lock =
            new java.util.concurrent.locks.ReentrantLock();

    /** Signalled when an element is added. */
    private final java.util.concurrent.locks.Condition notEmpty = lock.newCondition();

    /** Signalled when space may have become available (element removed or capacity raised). */
    private final java.util.concurrent.locks.Condition notFull = lock.newCondition();

    /** Maximum number of elements accepted; written under {@link #lock}. */
    private volatile int capacity;

    /** Creates a queue with capacity {@link Integer#MAX_VALUE}. */
    public CustomLinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * @param capacity initial maximum number of elements
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public CustomLinkedBlockingQueue(int capacity) {
        this.capacity = checkCapacity(capacity);
    }

    private static int checkCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        return capacity;
    }

    /**
     * Changes the maximum number of elements the queue accepts. Producers blocked in
     * {@link #put} or a timed {@link #offer} are woken so they can re-check for space.
     *
     * @param capacity new maximum, must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public void setCapacity(int capacity) {
        checkCapacity(capacity);
        lock.lock();
        try {
            this.capacity = capacity;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** @return the current maximum number of elements */
    public int getCapacity() {
        return capacity;
    }

    /** Appends {@code e}; caller holds the lock and has checked there is room. */
    private void enqueue(E e) {
        items.addLast(e);
        notEmpty.signal();
    }

    /** Removes the head; caller holds the lock and has checked the queue is non-empty. */
    private E dequeue() {
        E head = items.pollFirst();
        notFull.signal();
        return head;
    }

    @Override
    public int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public int remainingCapacity() {
        lock.lock();
        try {
            return Math.max(0, capacity - items.size());
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            if (items.size() >= capacity) {
                return false;
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        lock.lockInterruptibly();
        try {
            while (items.size() >= capacity) {
                notFull.await();
            }
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e, long timeout, java.util.concurrent.TimeUnit unit)
            throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (items.size() >= capacity) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E poll(long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                if (nanos <= 0L) {
                    return null;
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E poll() {
        lock.lock();
        try {
            return items.isEmpty() ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E peek() {
        lock.lock();
        try {
            return items.peekFirst();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        lock.lock();
        try {
            boolean removed = items.remove(o);
            if (removed) {
                notFull.signal();
            }
            return removed;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean contains(Object o) {
        if (o == null) {
            return false;
        }
        lock.lock();
        try {
            return items.contains(o);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void clear() {
        lock.lock();
        try {
            items.clear();
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public int drainTo(java.util.Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    @Override
    public int drainTo(java.util.Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements && !items.isEmpty()) {
                // Add before removing, so an element is not lost if c.add throws.
                c.add(items.peekFirst());
                items.pollFirst();
                n++;
            }
            if (n > 0) {
                notFull.signalAll();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Object[] toArray() {
        lock.lock();
        try {
            return items.toArray();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            return items.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over a snapshot of the queue. {@code remove()} removes the first element
     * equal to the last returned one from the live queue.
     */
    @Override
    public java.util.Iterator<E> iterator() {
        final java.util.Iterator<E> snapshot;
        lock.lock();
        try {
            snapshot = new java.util.ArrayList<>(items).iterator();
        } finally {
            lock.unlock();
        }
        return new java.util.Iterator<E>() {
            private E lastReturned;

            @Override
            public boolean hasNext() {
                return snapshot.hasNext();
            }

            @Override
            public E next() {
                lastReturned = snapshot.next();
                return lastReturned;
            }

            @Override
            public void remove() {
                if (lastReturned == null) {
                    throw new IllegalStateException();
                }
                CustomLinkedBlockingQueue.this.remove(lastReturned);
                lastReturned = null;
            }
        };
    }
}
/**
 * Rejection handler that raises an alarm when a pool cannot accept more work. It then rejects the
 * task the same way {@link ThreadPoolExecutor.AbortPolicy} does.
 *
 * <p>The previous empty body silently dropped the task, which is dangerous. A task passed to
 * {@code submit(...)} returned a {@code Future} that would never complete, so a caller blocked in
 * {@code get()} would hang, and the overload went unnoticed. Throwing keeps the failure visible to
 * the submitter.
 */
public class MyRejectPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        String message = "Task " + r + " rejected from " + executor;
        // Custom handling goes here, e.g. monitoring alarms for "queue full".
        // Replace this println with the real alerting call.
        System.err.println(message);
        throw new java.util.concurrent.RejectedExecutionException(message);
    }
}
自定义线程池
/\*\*
\* 自定义业务线程池
\* @return
\*/
@Bean("bizThreadPool")
public ThreadPoolExecutor bizThreadPool(){
return new ThreadPoolExecutor(5,
10,
200,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
new NamedThreadFactory("bizThreadPool"));
}
/\*\*
\* 自定义log线程池
\* @return
\*/
@Bean("logThreadPool")
public ThreadPoolExecutor logThreadPool(){
return new ThreadPoolExecutor(5,
10,
200,
TimeUnit.SECONDS,
new CustomLinkedBlockingQueue<>(10),
new NamedThreadFactory("bizThreadPool"));
针对线程池的监控以及动态调整
@RestController
@RequestMapping("/threadpool")
@Slf4j
public class ThreadPoolController {
/\*\*
\* 收集所有的线程池,线程池建议自己手动实现,不要用spring默认的
\* 这里是偷懒了,用了spring的特性,如果是java项目,实现后自己注册
\*/
@Autowired
public Map<String, ThreadPoolExecutor> map;
/\*\*
\* 获取所有的线程池
\* @return
\*/
@GetMapping("/list")
public ResultBean<Map<String,ThreadPoolExecutor>> list(){
return ResultBeanUtil.success("获取所有线程池成功!",map);
}
@GetMapping("/get")
public ResultBean<ThreadPoolExecutor> getThreadPool(String threadPool){
ThreadPoolExecutor executor = map.get(threadPool);
if(executor == null){
return ResultBeanUtil.noData("未找到对应的线程池");
}
return ResultBeanUtil.success("获取线程池成功!",executor);
}
@PostMapping("/modify")
public ResultBean<ThreadPoolExecutor> modifyThreadPool(String threadPool,Integer coreSize,Integer maximumPoolSize,Integer capacity){
ThreadPoolExecutor executor = map.get(threadPool);
if(executor == null){
return ResultBeanUtil.noData("未找到对应的线程池");
}
executor.setCorePoolSize(coreSize);
executor.setMaximumPoolSize(maximumPoolSize);
//启动所有的核心线程数,getTask中不会根据核心线程数修改workers,如果再有新线程,会动态调整
executor.prestartAllCoreThreads();
//如果将线程池改小,设置下,默认核心线程数是不会回收的
executor.allowCoreThreadTimeOut(true);
BlockingQueue<Runnable> queue = executor.getQueue();
if(queue instanceof CustomLinkedBlockingQueue){
CustomLinkedBlockingQueue customQueue = (CustomLinkedBlockingQueue) queue;
customQueue.setCapacity(capacity);
}
return ResultBeanUtil.success("获取线程池成功!",executor);
}
@PostMapping("test")
public ResultBean<Void> test(String threadPool,Integer size){
if (size == null || size ==0){
return ResultBeanUtil.paramEmpty("size不能为空");
}
ThreadPoolExecutor executor = map.get(threadPool);
if(executor == null){
return ResultBeanUtil.noData("未找到对应的线程池");
}
for (int i = 0; i < size; i++) {
int finalI = i;
executor.submit(new Runnable() {
@Override
public void run() {
log.info("任务{}执行",Integer.valueOf(finalI));
}
});
}
return ResultBeanUtil.success();
}
}
方案三:通过agent进行监控,并对外暴露http服务
这里需要注意几点:
1,ThreadPoolExecutor 是由 Bootstrap ClassLoader 加载的,因此用于承载(保存)线程池实例的类也必须由 Bootstrap ClassLoader 加载,否则会出现找不到类定义的问题;
2,如果是继承 ThreadPoolExecutor 自定义实现的 Executor 类,则不需要考虑类加载的问题;
问题一的解决方案:
1,使用 -Xbootclasspath/a:../a.jar 让承载容器由 Bootstrap ClassLoader 加载;
2,使用byte-buddy 增强某个类,强制让Bootstrap ClassLoader加载
/**
 * Instruments {@link ThreadPoolExecutor} with Byte Buddy so that pools can be monitored from an
 * agent.
 *
 * <p>Weaves {@code ThreadPoolExecutorFinalizeAdvice} into {@code finalize} and
 * {@code ThreadPoolExecutorExecuteAdvice} into {@code execute}. ThreadPoolExecutor is loaded by the
 * bootstrap class loader, so everything the advice code references must also be loadable by the
 * bootstrap class loader.
 *
 * <p>NOTE(review): {@code ThreadPoolExecutor.finalize} is deprecated in newer JDKs and may never be
 * invoked there. Verify that the finalize hook actually fires on the target JDK.
 *
 * @param instrumentation the JVM instrumentation handle the agent was started with
 *                        (presumably from premain/agentmain; the caller is not shown here)
 */
private static void threadPoolExecutor(Instrumentation instrumentation){
new AgentBuilder.Default()
// Avoid class-file changes that add members (fields/methods); the JVM forbids adding them when
// redefining an already-loaded class.
.disableClassFormatChanges()
// By default the agent builder does not instrument types loaded by the bootstrap class loader.
// Replacing the ignore matcher with "ignore everything except ThreadPoolExecutor" makes that one
// type eligible.
.ignore(ElementMatchers.noneOf(ThreadPoolExecutor.class))
// Do not inject any type-initialization code into the instrumented class.
.with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
// ThreadPoolExecutor is already loaded when the agent starts, so it has to be redefined
// (Instrumentation#redefineClasses) instead of only being transformed on first load.
.with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
// Rewrite the existing type in place instead of rebasing it (rebasing would add methods).
.with(AgentBuilder.TypeStrategy.Default.REDEFINE)
// Inject any auxiliary classes through Unsafe, which can also target the bootstrap class loader.
.with(AgentBuilder.InjectionStrategy.UsingUnsafe.INSTANCE)
.type(ElementMatchers.is(ThreadPoolExecutor.class))
// Disabled alternatives: additionally match any Executor / ExecutorService implementation.
//.or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.Executor")))
//.or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.ExecutorService")))
// Weave the advice classes into the methods with the given names.
.transform((builder, typeDescription, classLoader, javaModule) ->
builder.visit(Advice.to(ThreadPoolExecutorFinalizeAdvice.class).on(ElementMatchers.named("finalize")))
.visit(Advice.to(ThreadPoolExecutorExecuteAdvice.class).on(ElementMatchers.named("execute")))
)
.installOn(instrumentation);
}
暴露一个统一的接口,不需要各项目去实现。
public class MonitorTest {
    /**
     * Demonstrates agent-based monitoring. It creates two pools, submits a few tasks, and prints what
     * {@code ThreadPoolMonitorData} and {@code ThreadPoolMonitor} have recorded.
     *
     * <p>Entries presumably only appear when the monitoring agent is attached (registration happens
     * in the execute advice). Both pools are shut down at the end. Previously they were never shut
     * down, so their threads leaked past the test; the fixed 3-second sleep is replaced by a bounded
     * wait for termination.
     */
    @Test
    public void test(){
        System.out.println(ThreadPoolMonitorData.class.getClassLoader());
        System.out.println(ThreadPoolMonitorData.alls());
        System.out.println(ThreadPoolMonitor.class.getClassLoader());
        ThreadPoolExecutor pool = threadpool();
        ExecutorService executorService = threadpool1();
        try {
            pool.submit(()->{
                System.out.println("线程池pool执行中1:"+Thread.currentThread().getName());
            });
            pool.submit(()->{
                System.out.println("线程池pool执行中2:"+Thread.currentThread().getName());
            });
            pool.submit(()->{
                System.out.println("线程池pool执行中3:"+Thread.currentThread().getName());
            });
            executorService.submit(()->{
                System.out.println("线程池executorService执行中1:"+Thread.currentThread().getName());
            });
            ThreadPoolMonitorData.alls().forEach((key,val) ->{
                System.out.println("ThreadPoolMonitorData key="+key+" val:"+val);
            });
            ThreadPoolMonitor monitor = new ThreadPoolMonitor();
            monitor.alls().forEach((key,val)->{
                System.out.println("ThreadPoolMonitor key="+key+" val:"+val);
            });
        } finally {
            shutdownAndAwait(pool);
            shutdownAndAwait(executorService);
        }
    }

    /**
     * Shuts {@code executor} down and waits up to 3 seconds for submitted tasks to finish.
     * If they do not finish in time, running tasks are interrupted.
     */
    private static void shutdownAndAwait(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt for the test runner instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /** 5 core / 10 max threads, 200s keep-alive, queue bounded to 10 tasks. */
    private ThreadPoolExecutor threadpool(){
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5,
                10,
                200,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10));
        return pool;
    }

    /** A pool created through {@code Executors}, to show that factory-made pools are also seen. */
    private ExecutorService threadpool1(){
        return Executors.newCachedThreadPool();
    }
}
public class ThreadPoolExecutorExecuteAdvice {
/\*\*
\* 对所有的线程的execute 进入方法进行监听
\* byteBuddy不支持对constructor
\* @Advice.OnMethodEnter 必须作用与static方法
\* @param obj
\* @param abc
\*/
@Advice.OnMethodEnter
public static void executeBefore(@Advice.This Object obj,@Advice.Argument(0) Object abc){
try{
ThreadPoolExecutor executor = (ThreadPoolExecutor) obj;
ThreadPoolMonitorData.add(executor.hashCode()+"",(ThreadPoolExecutor) obj);
}catch (Exception e){
e.printStackTrace();
}
}
}
null BootstrapClassLoader 输出是null
{}
sun.misc.Launcher$AppClassLoader@18b4aac2
线程池pool执行中1:pool-3-thread-1
线程池pool执行中2:pool-3-thread-2
线程池pool执行中3:pool-3-thread-3
线程池executorService执行中1:pool-4-thread-1
ThreadPoolMonitorData key=1564698139 val:java.util.concurrent.ThreadPoolExecutor@5d43661b\[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 3\]
ThreadPoolMonitorData key=171421438 val:java.util.concurrent.ThreadPoolExecutor@a37aefe\[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1\]
监控获取到的数据,需要在某个地方进行统一采集。
建议的方案是:统一标准 以及 agent采集,根据实际情况采集需要的数据进行监控以及动态调整。
具体代码实现,请看:
线程池监控-bytebuddy-agent模式