视频通话源码,使用线程池的两大要点分析:
1、实现动态调整线程池参数
2、对线程池运行情况进行监控
一,线程池可调整的参数
1、核心线程数
2、超时时间
3、最大线程数
4、拒绝策略
而队列BlockingQueue因为是final类型,所以没有对外修改入口。但可以通过重写LinkedBlockingQueue并把capacity设置为非final。
二,结合配置中心实现动态调整
这里的配置中心使用Apollo, 通过监听配置中心变化,然后更新视频通话源码的线程池配置。示例代码如下:
@Slf4j @Component public class DynamicThreadPoolConfig { /** 线程执行器 **/ private volatile ThreadPoolExecutor executor; /** 核心线程数 **/ private Integer corePoolSize = 10; /** 最大值线程数 **/ private Integer maximumPoolSize = 20; /** 待执行任务的队列的长度 **/ private Integer workQueueSize = 1000; /** 线程空闲时间 **/ private Long keepAliveTime = 1000L; /** 线程名 **/ private String threadName; private Config config = ConfigService.getConfig("项目配置中心namespace"); public DynamicThreadPoolConfig() { init(config); } /** * 初始化 */ private void init(Config config) { log.info("线程池初始化中.........."); if (executor == null) { synchronized (DynamicThreadPoolConfig.class) { if (executor == null) { String corePoolSizeProperty = config.getProperty("corePoolSize", corePoolSize.toString()); log.info("修改前的核心线程池:{}",corePoolSizeProperty); String maximumPoolSizeProperty = config.getProperty("maximumPoolSize", maximumPoolSize.toString()); String keepAliveTImeProperty = config.getProperty("keepAliveTime", keepAliveTime.toString()); BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize); executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty), Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty); } } } } /** * 监听到配置中心发生变化后,更新线程池配置 * @param changeEvent */ @ApolloConfigChangeListener public void onChange(ConfigChangeEvent changeEvent){ log.info("线程池参数配置发生变化,namespace:{}",changeEvent.getNamespace()); for(String key : changeEvent.changedKeys()){ ConfigChange change = changeEvent.getChange(key); String newValue = change.getNewValue(); refreshThreadPool(key,newValue); } } /** * 更新线程池配置 * @param key * @param newValue */ private void refreshThreadPool(String key, String newValue) { if (executor == null) { return; } if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) { executor.setCorePoolSize(Integer.valueOf(newValue)); log.info("修改核心线程数key={},value={}",key,newValue); } if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) { executor.setMaximumPoolSize(Integer.valueOf(newValue)); log.info("修改最大线程数key={},value={}", key, newValue); } if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) { executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS); log.info("修改线程空闲时间key={},value={}", key, newValue); } } public ThreadPoolExecutor getExecutor() { return executor; } } @AllArgsConstructor public enum ParamsEnum { CORE_POOL_SIZE("apollo.async.executor.thread.core_pool_size", "核心线程数"), MAXIMUM_POOL_SIZE("dynamic.maximumPoolSize", "最大线程数"), KEEP_ALIVE_TIME("dynamic.keepAliveTime", "线程空闲时间"), ; @Getter private String param; @Getter private String desc; }
三,监控方式
修改视频通话源码的线程池有关参数重要,但知道何时修改同样重要,可以考虑间隔一段时间进行采集,通过日志输出,达到临界点后告警。
同样,ThreadPoolExecutor也提供获取线程池相关信息的API:
这里通过一个定时任务进行统计,需要注意的是启动类上需要加上EnableScheduling注解
@Slf4j @Component @Async @ConditionalOnBean(DynamicThreadExecutor.class) public class ThreadPoolMonitorSchedule { @Autowired private DynamicThreadExecutor dynamicThreadExecutor; @Scheduled(fixedDelay = 2000) public void watchThreadPoolInfo(){ log.info("开始统计线程池相关数据"); ThreadPoolExecutor threadPoolExecutor = dynamicThreadExecutor.getExecutor(); BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue(); //线程活跃度:活跃线程数趋向于maximumPoolSize的时候,代表线程负载趋高。 log.info("核心线程数:{},活动线程数:{},最大线程数:{},线程池活跃度:{},任务完成数:{}," + "队列大小:{},当前排队线程数:{},队列剩余大小:{},队列使用度:{}", threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize(), divide(threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize()), threadPoolExecutor.getCompletedTaskCount(), (queue.size() + queue.remainingCapacity()), queue.size(), queue.remainingCapacity(), divide(queue.size(), queue.size() + queue.remainingCapacity())); } private String divide(int num1,int num2){ return String.format("%1.2f%%",Double.parseDouble(num1+"") / Double.parseDouble(num2+"")); } } /** *启动类 */ @MapperScan({"com.demo.dao"}) @SpringBootApplication @EnableScheduling public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
以上就是视频通话源码,使用线程池的两大要点分析, 更多内容欢迎关注之后的文章
标签:String,private,通话,源码,线程,key,executor,newValue From: https://www.cnblogs.com/yunbaomengnan/p/18261848