首页 > 其他分享 >线程池监控方案

线程池监控方案

时间:2023-05-01 16:06:09浏览次数:44  
标签:方案 return 线程 监控 executor public pool ThreadPoolExecutor


线程池监控方案_线程池监控

5ycode

某信贷cto,专注于java技术研究与应用,包括JVM、DDD、软件设计、源码阅读、以及经验分享

9篇原创内容

公众号

读了Java线程池实现原理及其在美团业务中的实践 后,我就想一个问题,如果让我去做这个线程池的监控,我该怎么做?

要对线程池进行监控,首先得明白,我们监控线程池的目的是什么?

监控是为了防患于未然,防止生产事故的发生。或者能在未发生时就进行入状态。

出问题线程池的现象:

  • 线程池异步处理,消费速度过慢,导致任务积压,响应过慢,或者队列有限,导致提交被拒绝;
  • 使用线程池做并行请求的时候,请求量过大,处理积压,导致响应变慢;
  • 业务评估不准确,导致线程池资源设置的合理;

对线程池监控的指标有以下几种:

1,队列饱和度;

2,单位时间内提交任务的速度远大于消费速度;

监控方案:

方案一:继承ThreadPoolExecutor对部分方法进行重写

/\*\*
 \* 创建可监控的线程池
 \* @author yxkong
 \* @version 1.0
 \* @date 2021/3/22 13:29
 \*/
public class ThreadPoolExecutorMonitor  extends ThreadPoolExecutor {

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void shutdown() {
        //获取执行任务
        this.getCompletedTaskCount();
        //获取正在运行的线程数
        this.getActiveCount();
        //获取任务数
        this.getTaskCount();
        //队列剩余个数
        this.getQueue().size();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                //获取线程执行结果
                Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null) {
            //处理异常
            System.out.println(t);
        }
        //记录线程执行时间
    }
}

方案二:自定义ThreadFactory、BlockingQueue、RejectedExecutionHandler

  • ThreadFactory:是了为了线程的命名,方便统一管理;
  • BlockingQueue:是为能动态调整队列的长度(数组扩缩容时,需要考虑锁以及性能,链表不用考虑)
  • RejectedExecutionHandler: 队列满了如何处理(可以动态扩容,小心把jvm撑爆,或者无法创建队列)
public class NamedThreadFactory implements ThreadFactory, Serializable {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public NamedThreadFactory(String name) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = name +poolNumber.getAndIncrement() +"-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon()){
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM\_PRIORITY){
            t.setPriority(Thread.NORM\_PRIORITY);
        }
        return t;
    }
}
//自定义 LinkedBlockingQueue,将队列长度对外暴露可修改
public class CustomLinkedBlockingQueue <E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable 
}
public class MyRejectPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        //自定义处理逻辑 ,比如监控报警,队列满了
    }
}

自定义线程池

/\*\*
     \* 自定义业务线程池
     \* @return
     \*/
    @Bean("bizThreadPool")
    public ThreadPoolExecutor bizThreadPool(){
        return new ThreadPoolExecutor(5,
                10,
                200,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                new NamedThreadFactory("bizThreadPool"));
    }

    /\*\*
     \* 自定义log线程池
     \* @return
     \*/
    @Bean("logThreadPool")
    public ThreadPoolExecutor logThreadPool(){
        return new ThreadPoolExecutor(5,
                10,
                200,
                TimeUnit.SECONDS,
                new CustomLinkedBlockingQueue<>(10),
                new NamedThreadFactory("bizThreadPool"));

针对线程池的监控以及动态调整

@RestController
@RequestMapping("/threadpool")
@Slf4j
public class ThreadPoolController {

    /\*\*
     \* 收集所有的线程池,线程池建议自己手动实现,不要用spring默认的
     \* 这里是偷懒了,用了spring的特性,如果是java项目,实现后自己注册
     \*/

    @Autowired
    public Map<String, ThreadPoolExecutor> map;

    /\*\*
     \* 获取所有的线程池
     \* @return
     \*/
    @GetMapping("/list")
    public ResultBean<Map<String,ThreadPoolExecutor>> list(){
        return ResultBeanUtil.success("获取所有线程池成功!",map);
    }
    @GetMapping("/get")
    public ResultBean<ThreadPoolExecutor> getThreadPool(String threadPool){
        ThreadPoolExecutor executor = map.get(threadPool);
        if(executor == null){
            return ResultBeanUtil.noData("未找到对应的线程池");
        }
        return ResultBeanUtil.success("获取线程池成功!",executor);
    }
    @PostMapping("/modify")
    public ResultBean<ThreadPoolExecutor> modifyThreadPool(String threadPool,Integer coreSize,Integer maximumPoolSize,Integer capacity){
        ThreadPoolExecutor executor = map.get(threadPool);
        if(executor == null){
            return ResultBeanUtil.noData("未找到对应的线程池");
        }
        executor.setCorePoolSize(coreSize);
        executor.setMaximumPoolSize(maximumPoolSize);
        //启动所有的核心线程数,getTask中不会根据核心线程数修改workers,如果再有新线程,会动态调整
        executor.prestartAllCoreThreads();
        //如果将线程池改小,设置下,默认核心线程数是不会回收的
        executor.allowCoreThreadTimeOut(true);
        BlockingQueue<Runnable> queue = executor.getQueue();
        if(queue instanceof CustomLinkedBlockingQueue){
            CustomLinkedBlockingQueue customQueue = (CustomLinkedBlockingQueue) queue;
            customQueue.setCapacity(capacity);
        }
        return ResultBeanUtil.success("获取线程池成功!",executor);
    }
    @PostMapping("test")
    public ResultBean<Void> test(String threadPool,Integer size){
        if (size == null || size ==0){
            return ResultBeanUtil.paramEmpty("size不能为空");
        }
        ThreadPoolExecutor executor = map.get(threadPool);
        if(executor == null){
            return ResultBeanUtil.noData("未找到对应的线程池");
        }
        for (int i = 0; i < size; i++) {
            int finalI = i;
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    log.info("任务{}执行",Integer.valueOf(finalI));
                }
            });
        }
        return ResultBeanUtil.success();
    }
}

方案三:通过agent进行监控,并对外暴露http服务

这里需要注意几点:

1,ThreadPoolExecutor 是由Bootstrap ClassLoader加载,承载的线程池的类必须也是Bootstrap ClassLoader 加载,否则会出现找不到类定义的问题;

2,如果是实现ThreadPoolExecutor自定义的的Executor类,不需要考虑类加载的问题;

问题一的解决方案:

1,使用-Xbootclasspath/a: …/a.jar 让承载容器由Bootstrap ClassLoader加载;

2,使用byte-buddy 增强某个类,强制让Bootstrap ClassLoader加载

/\*\*
     \* 针对threadPoolExecutor 的增强
     \* @param instrumentation
     \*/
    private static void threadPoolExecutor(Instrumentation instrumentation){
        new AgentBuilder.Default()
                .disableClassFormatChanges()
                //默认是不对bootstrap类加载器加载的对象instrumentation,忽略某个type后,就可以了
                .ignore(ElementMatchers.noneOf(ThreadPoolExecutor.class))
                //
                .with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
                //
                .with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
                .with(AgentBuilder.TypeStrategy.Default.REDEFINE)
                .with(AgentBuilder.InjectionStrategy.UsingUnsafe.INSTANCE)
                .type(ElementMatchers.is(ThreadPoolExecutor.class))
                //.or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.Executor")))
                //.or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.ExecutorService")))
                .transform((builder, typeDescription, classLoader, javaModule) ->
                        builder.visit(Advice.to(ThreadPoolExecutorFinalizeAdvice.class).on(ElementMatchers.named("finalize")))
                                .visit(Advice.to(ThreadPoolExecutorExecuteAdvice.class).on(ElementMatchers.named("execute")))
                )
                .installOn(instrumentation);
    }

暴露一个统一的接口,不需要各项目去实现。

public class MonitorTest {

    @Test
    public void test(){
        System.out.println(ThreadPoolMonitorData.class.getClassLoader());
        System.out.println(ThreadPoolMonitorData.alls());
        System.out.println(ThreadPoolMonitor.class.getClassLoader());
        ThreadPoolExecutor pool= threadpool();
        pool.submit(()->{
            System.out.println("线程池pool执行中1:"+Thread.currentThread().getName());
        });
        pool.submit(()->{
            System.out.println("线程池pool执行中2:"+Thread.currentThread().getName());
        });
        pool.submit(()->{
            System.out.println("线程池pool执行中3:"+Thread.currentThread().getName());
        });

        ExecutorService executorService =  threadpool1();
        executorService.submit(()->{
            System.out.println("线程池executorService执行中1:"+Thread.currentThread().getName());
        });
        ThreadPoolMonitorData.alls().forEach((key,val) ->{
            System.out.println("ThreadPoolMonitorData key="+key+" val:"+val);
        });

        ThreadPoolMonitor monitor = new ThreadPoolMonitor();
        monitor.alls().forEach((key,val)->{
            System.out.println("ThreadPoolMonitor key="+key+" val:"+val);
        });

        try {
            Thread.sleep(3000);
        }catch (Exception e){
            e.printStackTrace();
        }

    }


    private ThreadPoolExecutor threadpool(){
        ThreadPoolExecutor pool =  new ThreadPoolExecutor(5,
                10,
                200,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10));
        return pool;
    }
    private  ExecutorService threadpool1(){
        return Executors.newCachedThreadPool();
    }
}
public class ThreadPoolExecutorExecuteAdvice {
    /\*\*
     \* 对所有的线程的execute 进入方法进行监听
     \* byteBuddy不支持对constructor
     \* @Advice.OnMethodEnter 必须作用与static方法
     \* @param obj
     \* @param abc
     \*/
    @Advice.OnMethodEnter
    public static void executeBefore(@Advice.This Object obj,@Advice.Argument(0) Object abc){
       try{
           ThreadPoolExecutor executor = (ThreadPoolExecutor) obj;
           ThreadPoolMonitorData.add(executor.hashCode()+"",(ThreadPoolExecutor) obj);
       }catch (Exception e){
           e.printStackTrace();
       }
    }
}
null   BootstrapClassLoader 输出是null
{}
sun.misc.Launcher$AppClassLoader@18b4aac2
线程池pool执行中1:pool-3-thread-1
线程池pool执行中2:pool-3-thread-2
线程池pool执行中3:pool-3-thread-3
线程池executorService执行中1:pool-4-thread-1
ThreadPoolMonitorData key=1564698139 val:java.util.concurrent.ThreadPoolExecutor@5d43661b\[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 3\]
ThreadPoolMonitorData key=171421438 val:java.util.concurrent.ThreadPoolExecutor@a37aefe\[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1\]

监控获取到的数据,需要在某个地方进行统一采集。

建议的方案是:统一标准 以及 agent采集,根据实际情况采集需要的数据进行监控以及动态调整。
具体代码实现,请看:
线程池监控-bytebuddy-agent模式

线程池监控方案_线程池监控_02


标签:方案,return,线程,监控,executor,public,pool,ThreadPoolExecutor
From: https://blog.51cto.com/yxkong/6238931

相关文章

  • SRIO接口卡航电总线解决方案
    TES600是天津拓航科技的一款基于FPGA与DSP协同处理架构的通用高性能实时信号处理平台,该平台采用1片TI的KeyStone系列多核浮点/定点DSPTMS320C6678作为主处理单元,采用1片Xilinx的Kintex-7系列FPGAXC7K325T作为协处理单元,具有1个FMC子卡接口,具有4路SFP+万兆光纤接口,处理节点之间通......
  • Java线程池中的四种拒绝策略
    CallerRunsPolicy:这是默认的拒绝策略,当线程池队列已满并且无法处理新任务时,将由提交任务的线程来执行该任务。这种策略可以降低新任务的流量,但也会增加提交任务的线程的负载。AbortPolicy:当线程池队列已满并且无法处理新任务时,将抛出RejectedExecutionException异常,阻止新任......
  • Linux各监控指标及问题排查
    htop/tophtop足以覆盖大多数指标,详细直接查看帮助即可。sort:bymem/cpu/state.根据进程状态排序也至关重要,特别在loadaverage过高的时候。根据内存以及CPU使用率排序用以定位高资源占用者。filterfieldsprocess/count...CPU基本信息在linux中一切皆文件,查看/proc/cpuin......
  • 线程安全
    1、前言 先看看下面的代码输出是什么:publicclassTestSyncimplementsRunnable{intb=100;synchronizedvoidm1()throwsInterruptedException{b=1000;Thread.sleep(500);//6System.out.println("b="+b);......
  • 从CPU的视角看 多线程代码为什么那么难写!
      当我们提到多线程、并发的时候,我们就会回想起各种诡异的bug,比如各种线程安全问题甚至是应用崩溃,而且这些诡异的bug还很难复现。我们不禁发出了灵魂拷问“为什么代码测试环境运行好好的,一上线就不行了?”。为了解决线程安全的问题,我们的先辈们在编程语言中引入了各种各样新名......
  • Prometheus 监控系统安装
    目录下载启动使用参考Prometheus既是一个时序数据库,又是一个监控系统,更是一套完备的监控生态解决方案。本文简要介绍Prometheus的安装和使用。下载根据系统下载Download版本,并解压tarxvfzprometheus-*.tar.gzcdprometheus-*启动./prometheus--config.file=prometh......
  • cocoa线程同步synchronized
    synchronized关键字   1.synchronized方法:通过在方法声明中加入synchronized关键字来声明synchronized方法。如:publicsynchronizedvoidaccessVal(intnewVal);synchronized方法控制对类成员变量的访问:每个类实例对应一把锁,每个synchroni......
  • Qt中Socket跨线程通讯问题
    对于一个QTcpServer服务器来说,每当有新客户端连接时,系统会为其分配一个新的QTcpSocket对象进行管理。默认情况下,在incomingConnection函数中创建的QTcpSocket对象将在应用程序主线程中运行,而不是连接所需的处理线程中运行。如果开发者需要确保收到数据的顺序以及避免线程竞争,可以......
  • Android开发多线程断点续传下载器
    111Android开发多线程断点续传下载器分类: Android 2011-10-0123:14931人阅读 评论(8) 收藏 举报使用多线程断点续传下载器在下载的时候多个线程并发可以占用服务器端更多资源,从而加快下载速度,在下载过程中记录每个线程已拷贝数据的数......
  • 苏州服务器托管方案|关于天安保险数据中心解决方案
    前言   网上金融贸易越来越发达,导致越来越多的银行、证券、保险等金融行业不断进行线上业务拓展。这也使得海量的敏感数据存储和维护成为亘需攻克的难题。面对如此庞大的需求,金融企业一般面临两个选择:自建数据中心,或者将服务器托管。在金融领域,瞄准保险行业的IDC需求,苏州胜网......