你知道的越多,你不知道的越多
点赞再看,养成习惯
文章目录
- 前言
- 代码实现
- 定义异步处理工具类
- 实现 java 线程池
- 新建 AppInit 实现 ApplicationRunner 接口完成启动项目时异步数据初始化
前言
前面的工作中,为了提高地区数据的响应时间,需要加载全国区划数据到 redis 中缓存起来,这个过程希望在项目时启动。
由于初始化全国区划到 redis 中这个过程是比较耗时的,所以我们可以考虑使用异步执行的方式去实现。
代码实现
定义异步处理工具类
import com.xymy.common.config.ThreadPoolExecutorUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
/**
* <h2>异步处理工具类<h2>
*
* @author xymy
* @create 2023-02-07 16:45
*/
@Slf4j
public enum RunMerger {
SM;
public void asyncHandle(String threadName, Runnable ...runs) {
this.asyncHandle(null, threadName,runs);
}
public void asyncHandle(ExecutorService seedPool, String threadName, Runnable ... runs) {
if (seedPool == null) {
seedPool = ThreadPoolExecutorUtil.getSeedPool(threadName, runs.length);
}
try {
for (Runnable run : runs) {
CompletableFuture.runAsync(() -> {
run.run();
},seedPool).exceptionally(ex -> {
log.error("处理异常", ex);
return null;
});
}
} finally {
ThreadPoolExecutorUtil.releaseSeedPool(seedPool);
}
}
public void syncHandle(Runnable ...runs) {
Arrays.stream(runs).forEach(Runnable::run);
}
public void asyncHandle(Runnable ...runs) {
this.asyncHandle(null, runs);
}
}
实现 java 线程池
- 新建 ThreadPoolExecutorUtil 类
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.xymy.common.utils.DistrKeyGenerator;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
/**
* <h2>线程池<h2>
*
* @author xymy
* @create 2021-11-22 14:24
*/
@Slf4j
public class ThreadPoolExecutorUtil {
private static Map<String, ExecutorService> executors = new ConcurrentHashMap<>();
public static ExecutorService getSeedPool(int poolSize) {
DistrKeyGenerator keyGenerator = new DistrKeyGenerator();
return getSeedPool(keyGenerator.generateKey() +"-", poolSize);
}
public static ExecutorService getSeedPool(String poolName, int poolSize) { // 需手动释放
if (StringUtils.isBlank(poolName)) {
DistrKeyGenerator keyGenerator = new DistrKeyGenerator();
poolName = keyGenerator.generateKey() +"-";
}
ExecutorService executorService = executors.get(poolName);
if (null == executorService) {
synchronized (ThreadPoolExecutorUtil.class) {
executorService = executors.get(poolName);
if (null == executorService) {
executorService = init(poolName,poolSize);
executors.put(poolName, executorService);
}
}
}
return executorService;
}
public static void releaseSeedPool(ExecutorService executorService) {
Set<Map.Entry<String, ExecutorService>> entries = executors.entrySet();
for (Map.Entry<String, ExecutorService> entry : entries) {
ExecutorService value = entry.getValue();
if (value == executorService) {
executors.remove(entry.getKey());
}
}
if (executorService != null) {
executorService.shutdown();
}
}
public static void releaseSeedPool(String poolName) { // 释放
ExecutorService executorService = executors.remove(poolName);
if (executorService != null) {
executorService.shutdown();
}
}
private ThreadPoolExecutorUtil(){}
private static ExecutorService init(String poolName, int poolSize) {
ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
threadFactoryBuilder.setUncaughtExceptionHandler((t, e) -> log.error( "线程[{}]异常:", t.getName(), e));
threadFactoryBuilder.setNamePrefix("apply-pool-" + poolName);
threadFactoryBuilder.setDaemon(false);
return new ThreadPoolExecutor(poolSize,
poolSize,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(poolSize*10),
threadFactoryBuilder.build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
- 新建 DistrKeyGenerator 类,用来生成主键
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.1-jre</version>
</dependency>
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
* 主键生成
* sharding-jdbc核心源码
* @author xymy
* @create 2019-07-24 09:37
*/
@Slf4j
@Component
public class DistrKeyGenerator {
public static final long EPOCH;
// 自增序列长度
private static final long SEQUENCE_BITS = 12L;
// workId的长度(单位是位时长度)
private static final long WORKER_ID_BITS = 10L;
private static final long SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;
private static final long WORKER_ID_LEFT_SHIFT_BITS = SEQUENCE_BITS;
private static final long TIMESTAMP_LEFT_SHIFT_BITS = WORKER_ID_LEFT_SHIFT_BITS + WORKER_ID_BITS;
// 位运算计算workerId的最大值(workerId占10位,那么1向左移10位就是workerId的最大值)
private static final long WORKER_ID_MAX_VALUE = 1L << WORKER_ID_BITS;
private long workerId = 0;
// EPOCH就是起始时间,从2016-11-01 00:00:00开始的毫秒数
static {
Calendar calendar = Calendar.getInstance();
calendar.set(2016, Calendar.NOVEMBER, 1);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
EPOCH = calendar.getTimeInMillis();
}
private long sequence;
private long lastTime;
public DistrKeyGenerator() {
}
public DistrKeyGenerator(long workerId) {
Preconditions.checkArgument(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE);
this.workerId = workerId;
}
/**
* Generate key.
* 生成id
* @return key type is @{@link Long}.
*/
public synchronized long generateKey() {
long currentMillis = getCurrentMillis();
// 每次取分布式唯一ID的时间不能少于上一次取时的时间
Preconditions.checkState(lastTime <= currentMillis, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastTime, currentMillis);
// 如果同一毫秒范围内,那么自增,否则从0开始
if (lastTime == currentMillis) {
// 如果自增后的sequence值超过4096,那么等待直到下一个毫秒
if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {
currentMillis = waitUntilNextTime(currentMillis);
}
} else {
sequence = 0;
}
// 更新lastTime的值,即最后一次获取分布式唯一ID的时间
lastTime = currentMillis;
if (log.isDebugEnabled()) {
log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime)), workerId, sequence);
}
// 从这里可知分布式唯一ID的组成部分;
return ((currentMillis - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
}
// 获取下一毫秒的方法:死循环获取当前毫秒与lastTime比较,直到大于lastTime的值;
private long waitUntilNextTime(final long lastTime) {
long time = getCurrentMillis();
while (time <= lastTime) {
time = getCurrentMillis();
}
return time;
}
/**
* Get current millis.
*
* @return current millis
*/
private long getCurrentMillis() {
return System.currentTimeMillis();
}
}
新建 AppInit 实现 ApplicationRunner 接口完成启动项目时异步数据初始化
@Component
@Slf4j
public class AppInit implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) {
RunMerger.SM.asyncHandle(() -> {
log.info("项目启动完成,开始初始化全国区划数据...");
//省略数据初始化的代码,此处为耗时的操作
String countryCode = "xxxxx";
xxService.initAreaInfoList(countryCode);
log.info("项目启动完成,完成初始化全国区划数据...");
});
}
}
至此,就可以实现 springboot 项目启动时异步执行初始化逻辑。