利用Redis实现队列
先进先出队列: https://www.cnblogs.com/LiuFqiang/p/16366813.html
延迟队列: https://www.cnblogs.com/LiuFqiang/p/16592522.html
定长队列: https://www.cnblogs.com/LiuFqian/p/17372463.html
在使用Redis做消息队列的时候,需要配置队列属性的bean,如果自己项目生产自己项目消费,还需要配置消费者的bean,如此一来,项目免不了需要大量重复且不可缺少的配置bean的代码
所以这里借助spring自动装配的原理,自动注入这些bean,减去重复的配置文件
首先申明注解
/**
* 标记是否开启redis队列
*
* @author: liufuqiang
* @date: 2023-10-30 17:00
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(value = {ImportRedisAutoConfigure.class, RedisConfig.class})
public @interface EnableRedisQueue {
String[] basePackages() default {""};
}
如果项目需要开启redis队列的生产或者消费,可以在配置文件加入这个注解
配置文件中新加redis的一些连接与队列信息
/**
* reds队列配置信息
*
* @author: liufuqiang
* @date: 2023-10-30 17:10
*/
@Data
@ConfigurationProperties(prefix = RedisConfig.REDIS_QUEUE)
public class RedisConfig implements Serializable {
public RedisConfig() {
System.out.println(1);
}
public static final String REDIS_QUEUE = "redis1";
private List<QueueInfo> infos;
@Data
public static class QueueInfo implements Serializable{
private String queueName;
private Integer threadNum = 1;
private String threadName;
private String monitorClass;
}
// @Bean
// public RedisTemplate redisTemplate() {
// RedisTemplate redisTemplate = new RedisTemplate();
// Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
// redisTemplate.setKeySerializer(new StringRedisSerializer());
// redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
// redisTemplate.afterPropertiesSet();
// return redisTemplate;
// }
}
/**
* redis队列自动注册
*
* @author: liufuqiang
* @date: 2023-10-30 17:02
*/
@EnableConfigurationProperties(RedisConfig.class)
public class ImportRedisAutoConfigure implements ImportBeanDefinitionRegistrar, EnvironmentAware {
private RedisConfig redisConfig;
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {
List<RedisConfig.QueueInfo> infos = redisConfig.getInfos();
if (CollUtil.isEmpty(infos)) {
return;
}
for (RedisConfig.QueueInfo queue : infos) {
if (registry.containsBeanDefinition(queue.getQueueName())) {
continue;
}
// 注入队列基本信息bean
AbstractBeanDefinition rootBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(RedisQueue.class).getBeanDefinition();
ConstructorArgumentValues argumentValues = new ConstructorArgumentValues();
argumentValues.addIndexedArgumentValue(0, queue.getQueueName());
rootBeanDefinition.setConstructorArgumentValues(argumentValues);
registry.registerBeanDefinition(queue.getQueueName(), rootBeanDefinition);
// 注入队列消费service
final String consumerQueue = queue.getMonitorClass();
if (StrUtil.isBlank(consumerQueue)) {
continue;
}
BeanDefinitionBuilder definitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(queue.getMonitorClass());
definitionBuilder.setDestroyMethodName("destroy");
AbstractBeanDefinition beanDefinition = definitionBuilder.getBeanDefinition();
ConstructorArgumentValues values = new ConstructorArgumentValues();
values.addIndexedArgumentValue(0, queue);
values.addIndexedArgumentValue(1, rootBeanDefinition);
beanDefinition.setConstructorArgumentValues(values);
registry.registerBeanDefinition(queue.getMonitorClass(), beanDefinition);
}
}
@Override
public void setEnvironment(Environment environment) {
ConfigurationProperties annotation = RedisConfig.class.getAnnotation(ConfigurationProperties.class);
Assert.notNull(annotation, "can not find annotation ConfigurationProperties");
redisConfig = Binder.get(environment).bind(annotation.prefix(), RedisConfig.class).get();
}
}
队列接口类
/**
* 队列接口类
*
* @author: liufuqiang
* @date: 2023-10-31 14:50
*/
public interface IQueue<E> extends Queue<E>{
@Override
default boolean add(E e) {
return false;
}
@Override
default boolean offer(E e) {
return false;
}
@Override
default E remove() {
return null;
}
@Override
default E poll() {
return null;
}
@Override
default E element() {
return null;
}
@Override
default E peek() {
return null;
}
@Override
default int size() {
return 0;
}
@Override
default boolean isEmpty() {
return false;
}
@Override
default boolean contains(Object o) {
return false;
}
@Override
default Iterator<E> iterator() {
return null;
}
@Override
default Object[] toArray() {
return new Object[0];
}
@Override
default <T> T[] toArray(T[] a) {
return null;
}
@Override
default boolean remove(Object o) {
return false;
}
@Override
default boolean containsAll(Collection<?> c) {
return false;
}
@Override
default boolean addAll(Collection<? extends E> c) {
return false;
}
@Override
default boolean removeAll(Collection<?> c) {
return false;
}
@Override
default boolean retainAll(Collection<?> c) {
return false;
}
@Override
default void clear() {
}
@Override
default boolean removeIf(Predicate<? super E> filter) {
return false;
}
@Override
default Spliterator<E> spliterator() {
return null;
}
@Override
default Stream<E> stream() {
return null;
}
@Override
default Stream<E> parallelStream() {
return null;
}
@Override
default void forEach(Consumer<? super E> consumer) {
}
}
先入先出的队列实现
/**
* @author: liufuqiang
* @date: 2023-10-31 15:44
*/
public class RedisQueue implements IQueue<String>{
private String queueName;
private StringRedisTemplate stringRedisTemplate ;
public RedisQueue(String queueName, StringRedisTemplate redisTemplate) {
this.queueName = queueName;
this.stringRedisTemplate = redisTemplate;
}
@Override
public boolean offer(String body) {
return stringRedisTemplate.opsForList().rightPush(queueName, body) > 0;
}
@Override
public String poll() {
return "new mshsssss";
}
}
队列消费抽象类
/**
* redis队列抽象处理
*
* @author: liufuqiang
* @date: 2023-10-30 18:31
*/
@Slf4j
public abstract class AbstractMonitorQueue implements MonitorRedisQueue, InitializingBean {
private RedisConfig.QueueInfo queueInfo;
private RedisQueue redisQueue;
private ExecutorService executorService;
private AtomicBoolean isRunning = new AtomicBoolean(false);
public AbstractMonitorQueue(RedisConfig.QueueInfo queueInfo, RedisQueue redisQueue) {
this.queueInfo = queueInfo;
this.redisQueue = redisQueue;
}
@SneakyThrows
@Override
public boolean process() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNamePrefix(queueInfo.getThreadName()).build();
executorService = Executors.newFixedThreadPool(queueInfo.getThreadNum(), threadFactory);
for (int i = 0; i < queueInfo.getThreadNum(); i++) {
executorService.execute(() -> {
while (isRunning.get()) {
String queueMsg = getQueueMsg();
if (StrUtil.isBlank(queueMsg)) {
continue;
}
this.execute(queueMsg);
log.info("queueMsg;{}", queueMsg);
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
return true;
}
public String getQueueMsg() {
return redisQueue.poll();
}
public void beforeProcess(String msg) {
}
public void afterProcess(String msg) {
}
public abstract void execute(String msg);
@Override
public void afterPropertiesSet() throws Exception {
log.info("开始执行任务");
isRunning.set(true);
this.process();
}
public void destroy() {
System.out.println("开始销毁队列");
isRunning.set(false);
executorService.shutdown();
}
}
队列消费接口
/**
* redis队列监控
*
* @author: liufuqiang
* @date: 2023-10-30 18:17
*/
public interface MonitorRedisQueue {
/**
* 执行队列逻辑
* @param msg
* @return
*/
boolean process();
}
具体队列的消费实现类如下
/**
*
*
* @author: liufuqiang
* @date: 2023-10-30 18:37
*/
@Slf4j
public class MonitorRedisQueueServiceImpl extends AbstractMonitorQueue {
public MonitorRedisQueueServiceImpl(RedisConfig.QueueInfo queueInfo, RedisQueue redisQueue) {
super(queueInfo, redisQueue);
}
@Override
public void execute(String msg) {
log.info(this.getClass().getName() + "执行具体逻辑");
}
}
以上文件可以集成到项目公用jar包,也可以放在公共模块,项目需要使用时引入模块的jar包
在使用的使用只需要开启注解@EnableRedisQueue
并且在配置文件填写需要队列的配置信息,如redis实际消费的key,消费线程的名字,消费线程的数量,消费者的className,如果不需要消费者,只生产消息,则只需要配置queue-name
redis1:
infos:
- queue-name: order111
thread-name: thread111
thread-num: 10
monitor-class: org.dromara.sms4j.example.service.queue.MonitorRedisQueueServiceImpl
- queue-name: order222
消费者只需要新建消费类并且继承AbstractMonitorQueue即可
如果需要进行生产消息,注入的时候 @Qualifier(value = "order111") value选择为配置文件中的queue-name