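The code in this post improves on the approach from the earlier post "Java基于阻塞的定时消费内存队列" (a blocking, timed-consumption in-memory queue in Java) by Jackie_JK on 博客园 (cnblogs.com).
It tidies up the encapsulation and some details. On JDK versions other than 21 you may need to switch to a different thread pool type.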
Consumer type:
import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum PushType {
    ELASTIC,
    SQL,
    ;
}
Queue parameter enum:
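import java.time.Duration;
import java.time.temporal.ChronoUnit;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum QueueEnum {
    A(PushType.SQL, 3000, 2, 200, Duration.of(20, ChronoUnit.SECONDS)),
    B(PushType.SQL, 3000, 2, 200, Duration.of(20, ChronoUnit.SECONDS)),
    C(PushType.SQL, 3000, 2, 200, Duration.of(20, ChronoUnit.SECONDS)),
    D(PushType.SQL, 3000, 2, 200, Duration.of(20, ChronoUnit.SECONDS)),
    ;
    private final PushType pushType;
    // Queue capacity (maximum number of queued elements)
    private final int capacity;
    // Number of consumer threads
    private final int threads;
    // Maximum number of elements consumed per batch
    private final int batchNum;
    // Maximum time to block while waiting for a full batch
    private final Duration timeout;
}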
Initialization and consumption methods:
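import com.google.common.collect.Queues;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class InsertQueue {
    // ConcurrentHashMap so that init() and push() can be called from different threads
    private static final Map<QueueEnum, LinkedBlockingQueue<Object>> TYPE_QUEUE_MAP = new ConcurrentHashMap<>();
    // Virtual threads need JDK 21; elsewhere swap in another pool, e.g. Executors.newCachedThreadPool()
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newVirtualThreadPerTaskExecutor();

    public static <T> void init(QueueEnum type, Consumer<List<T>> task) {
        // putIfAbsent makes the duplicate check and the registration one atomic step
        if (TYPE_QUEUE_MAP.putIfAbsent(type, new LinkedBlockingQueue<>(type.getCapacity())) != null) {
            throw new IllegalStateException("already initialized: " + type);
        }
        log.info("[{}] {} queue capacity {}", InsertQueue.class.getSimpleName(), type, type.getCapacity());
        for (int i = 0; i < type.getThreads(); i++) {
            EXECUTOR_SERVICE.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        List<T> batch = pop(type);
                        // The drain can time out with nothing collected; skip empty batches
                        if (!batch.isEmpty()) {
                            task.accept(batch);
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag and stop this consumer instead of looping forever
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Exception e) {
                        log.error("[{}] {} consumer task failed", InsertQueue.class.getSimpleName(), type, e);
                    }
                }
            });
        }
    }

    public static boolean push(QueueEnum type, Object obj) {
        LinkedBlockingQueue<Object> blockingQueue = TYPE_QUEUE_MAP.get(type);
        if (blockingQueue == null) {
            throw new IllegalStateException("not initialized: " + type);
        }
        // offer() never blocks: it returns false when the queue is full
        return blockingQueue.offer(obj);
    }

    @SuppressWarnings("unchecked")
    private static <T> List<T> pop(QueueEnum type) throws InterruptedException {
        // Returns once batchNum elements are collected or the timeout elapses;
        // the returned list can be empty if nothing arrived within the timeout
        LinkedBlockingQueue<Object> blockingQueue = TYPE_QUEUE_MAP.get(type);
        BlockingQueue<T> queue = (BlockingQueue<T>) blockingQueue;
        List<T> buffer = new ArrayList<>(type.getBatchNum());
        try {
            Queues.drain(queue, buffer, type.getBatchNum(), type.getTimeout());
            return buffer;
        } catch (InterruptedException ex) {
            log.error("[{}] {} interrupted while draining the queue", InsertQueue.class.getSimpleName(), type);
            throw ex;
        }
    }

    public static int getQueueSize(QueueEnum type) {
        LinkedBlockingQueue<Object> blockingQueue = TYPE_QUEUE_MAP.get(type);
        if (blockingQueue == null) {
            throw new IllegalStateException("not initialized: " + type);
        }
        return blockingQueue.size();
    }
}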
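The batch-or-timeout behaviour that pop relies on can be illustrated without Guava. The following is only a rough JDK-only sketch of the idea, not Guava's implementation and not the author's code; the class name BatchDrainSketch and its drain helper are hypothetical names introduced for illustration. It shows that a batch comes back full when enough elements are queued, partial when the timeout elapses first, and empty when nothing arrives at all.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only sketch of "collect a batch or give up after a timeout".
final class BatchDrainSketch {

    // Collects up to batchNum elements, waiting at most `timeout` in total.
    static <T> List<T> drain(BlockingQueue<T> queue, int batchNum, Duration timeout)
            throws InterruptedException {
        List<T> buffer = new ArrayList<>(batchNum);
        long deadline = System.nanoTime() + timeout.toNanos();
        while (buffer.size() < batchNum) {
            // Take whatever is already available without blocking.
            queue.drainTo(buffer, batchNum - buffer.size());
            if (buffer.size() >= batchNum) {
                break;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // timed out: return the partial (possibly empty) batch
            }
            // Block for one more element, but never past the deadline.
            T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (next == null) {
                break;
            }
            buffer.add(next);
        }
        return buffer;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        for (int i = 0; i < 5; i++) {
            queue.offer(i);
        }
        // A full batch is available, so this returns immediately.
        System.out.println(drain(queue, 3, Duration.ofMillis(200))); // [0, 1, 2]
        // Only two elements remain, so this waits out the timeout and returns them.
        System.out.println(drain(queue, 3, Duration.ofMillis(200))); // [3, 4]
        // Nothing is left: the result is an empty list after the timeout.
        System.out.println(drain(queue, 3, Duration.ofMillis(200))); // []
    }
}
```

The last call is the case worth noticing: a consumer that receives the drained list directly should be prepared for an empty batch.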
Before first use, call #init(QueueEnum type, Consumer<List<T>> task) once for each queue.
After that, only #push(QueueEnum type, Object obj) needs to be called.
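Since push delegates to offer on a bounded LinkedBlockingQueue, it returns false instead of blocking when the queue is full, so callers may want to check the result. The sketch below uses a plain JDK queue to show one possible reaction; PushBackpressureSketch and pushWithRetry are hypothetical names, and waiting briefly for free space is an assumption about what a caller might want, not something the original code does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: reacting to a full bounded queue on the producer side.
final class PushBackpressureSketch {

    // Try a non-blocking offer first, then wait up to waitMillis for free space.
    static <T> boolean pushWithRetry(BlockingQueue<T> queue, T item, long waitMillis)
            throws InterruptedException {
        if (queue.offer(item)) {
            return true;
        }
        return queue.offer(item, waitMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        System.out.println(pushWithRetry(queue, "a", 50)); // true
        System.out.println(pushWithRetry(queue, "b", 50)); // true
        System.out.println(pushWithRetry(queue, "c", 50)); // false: queue is full and nothing consumes
        queue.poll();                                      // free one slot
        System.out.println(pushWithRetry(queue, "c", 50)); // true
    }
}
```

Whether to retry, drop, or log a rejected element depends on how much data loss the consumer side can tolerate.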
From: https://www.cnblogs.com/JackieJK/p/18238770