首页 > 数据库 >Redis队列升级版利用Spring项目BeanDefinition自动注入

Redis队列升级版利用Spring项目BeanDefinition自动注入

时间:2023-11-01 18:33:57浏览次数:44  
标签:return String default Spring Redis 队列 Override public BeanDefinition

利用Redis实现队列

先进先出队列: https://www.cnblogs.com/LiuFqiang/p/16366813.html
延迟队列: https://www.cnblogs.com/LiuFqiang/p/16592522.html
定长队列: https://www.cnblogs.com/LiuFqian/p/17372463.html
在使用Redis做消息队列的时候,需要配置队列属性的bean,如果自己项目生产自己项目消费,还需要配置消费者的bean,如此一来,项目免不了需要大量重复且不可缺少的配置bean的代码
所以这里借助spring自动装配的原理,自动注入这些bean,减去重复的配置文件

首先申明注解

/**
 * 标记是否开启redis队列
 *
 * @author: liufuqiang
 * @date: 2023-10-30 17:00
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(value = {ImportRedisAutoConfigure.class, RedisConfig.class})
public @interface EnableRedisQueue {

    String[] basePackages() default {""};

}

如果项目需要开启redis队列的生产或者消费,可以在配置文件加入这个注解
配置文件中新加redis的一些连接与队列信息

/**
 * reds队列配置信息
 *
 * @author: liufuqiang
 * @date: 2023-10-30 17:10
 */
@Data
@ConfigurationProperties(prefix = RedisConfig.REDIS_QUEUE)
public class RedisConfig implements Serializable {

    public RedisConfig() {
        System.out.println(1);
    }

    public static final String REDIS_QUEUE = "redis1";

    private List<QueueInfo> infos;

    @Data
    public static class QueueInfo implements Serializable{

        private String queueName;

        private Integer threadNum = 1;

        private String threadName;

        private String monitorClass;

    }

//    @Bean
//    public RedisTemplate redisTemplate() {
//        RedisTemplate redisTemplate = new RedisTemplate();
//        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
//        redisTemplate.setKeySerializer(new StringRedisSerializer());
//        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
//        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
//        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
//        redisTemplate.afterPropertiesSet();
//        return redisTemplate;
//    }
}

/**
 * redis队列自动注册
 *
 * @author: liufuqiang
 * @date: 2023-10-30 17:02
 */
@EnableConfigurationProperties(RedisConfig.class)
public class ImportRedisAutoConfigure implements ImportBeanDefinitionRegistrar, EnvironmentAware {

    private RedisConfig redisConfig;

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {
        List<RedisConfig.QueueInfo> infos = redisConfig.getInfos();
        if (CollUtil.isEmpty(infos)) {
            return;
        }

        for (RedisConfig.QueueInfo queue : infos) {

            if (registry.containsBeanDefinition(queue.getQueueName())) {
                continue;
            }

            // 注入队列基本信息bean
            AbstractBeanDefinition rootBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(RedisQueue.class).getBeanDefinition();
            ConstructorArgumentValues argumentValues = new ConstructorArgumentValues();
            argumentValues.addIndexedArgumentValue(0, queue.getQueueName());
            rootBeanDefinition.setConstructorArgumentValues(argumentValues);
            registry.registerBeanDefinition(queue.getQueueName(), rootBeanDefinition);

            // 注入队列消费service

            final String consumerQueue = queue.getMonitorClass();
            if (StrUtil.isBlank(consumerQueue)) {
                continue;    
            }
            
            BeanDefinitionBuilder definitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(queue.getMonitorClass());
            definitionBuilder.setDestroyMethodName("destroy");
            AbstractBeanDefinition beanDefinition = definitionBuilder.getBeanDefinition();
            ConstructorArgumentValues values = new ConstructorArgumentValues();
            values.addIndexedArgumentValue(0, queue);
            values.addIndexedArgumentValue(1, rootBeanDefinition);
            beanDefinition.setConstructorArgumentValues(values);
            registry.registerBeanDefinition(queue.getMonitorClass(), beanDefinition);
        }
    }

    @Override
    public void setEnvironment(Environment environment) {
        ConfigurationProperties annotation = RedisConfig.class.getAnnotation(ConfigurationProperties.class);
        Assert.notNull(annotation, "can not find annotation ConfigurationProperties");
        redisConfig = Binder.get(environment).bind(annotation.prefix(), RedisConfig.class).get();
    }
}

队列接口类

/**
 * 队列接口类
 *
 * @author: liufuqiang
 * @date: 2023-10-31 14:50
 */
public interface IQueue<E> extends Queue<E>{

    @Override
    default boolean add(E e) {
        return false;
    }

    @Override
    default boolean offer(E e) {
        return false;
    }

    @Override
    default E remove() {
        return null;
    }

    @Override
    default E poll() {
        return null;
    }

    @Override
    default E element() {
        return null;
    }

    @Override
    default E peek() {
        return null;
    }

    @Override
    default int size() {
        return 0;
    }

    @Override
    default boolean isEmpty() {
        return false;
    }

    @Override
    default boolean contains(Object o) {
        return false;
    }

    @Override
    default Iterator<E> iterator() {
        return null;
    }

    @Override
    default Object[] toArray() {
        return new Object[0];
    }

    @Override
    default <T> T[] toArray(T[] a) {
        return null;
    }

    @Override
    default boolean remove(Object o) {
        return false;
    }

    @Override
    default boolean containsAll(Collection<?> c) {
        return false;
    }

    @Override
    default boolean addAll(Collection<? extends E> c) {
        return false;
    }

    @Override
    default boolean removeAll(Collection<?> c) {
        return false;
    }

    @Override
    default boolean retainAll(Collection<?> c) {
        return false;
    }

    @Override
    default void clear() {

    }


    @Override
    default boolean removeIf(Predicate<? super E> filter) {
        return false;
    }

    @Override
    default Spliterator<E> spliterator() {
        return null;
    }

    @Override
    default Stream<E> stream() {
        return null;
    }

    @Override
    default Stream<E> parallelStream() {
        return null;
    }

    @Override
    default void forEach(Consumer<? super E> consumer) {

    }
}

先入先出的队列实现

/**
 * @author: liufuqiang
 * @date: 2023-10-31 15:44
 */
public class RedisQueue implements IQueue<String>{

    private String queueName;

    private StringRedisTemplate stringRedisTemplate ;

    public RedisQueue(String queueName, StringRedisTemplate redisTemplate) {
        this.queueName = queueName;
        this.stringRedisTemplate = redisTemplate;
    }

    @Override
    public boolean offer(String body) {
        return stringRedisTemplate.opsForList().rightPush(queueName, body) > 0;
    }

    @Override
    public String poll() {
        return "new mshsssss";
    }
}

队列消费抽象类

/**
 * redis队列抽象处理
 *
 * @author: liufuqiang
 * @date: 2023-10-30 18:31
 */
@Slf4j
public abstract class AbstractMonitorQueue implements MonitorRedisQueue, InitializingBean {

    private RedisConfig.QueueInfo queueInfo;

    private RedisQueue redisQueue;

    private ExecutorService executorService;

    private AtomicBoolean isRunning = new AtomicBoolean(false);

    public AbstractMonitorQueue(RedisConfig.QueueInfo queueInfo, RedisQueue redisQueue) {
        this.queueInfo = queueInfo;
        this.redisQueue = redisQueue;
    }

    @SneakyThrows
    @Override
    public boolean process() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNamePrefix(queueInfo.getThreadName()).build();
        executorService = Executors.newFixedThreadPool(queueInfo.getThreadNum(), threadFactory);
        for (int i = 0; i < queueInfo.getThreadNum(); i++) {
            executorService.execute(() -> {
                while (isRunning.get()) {
                    String queueMsg = getQueueMsg();
                    if (StrUtil.isBlank(queueMsg)) {
                        continue;
                    }

                    this.execute(queueMsg);
                    log.info("queueMsg;{}", queueMsg);

                    try {
                        Thread.sleep(10_000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        return true;
    }

    public String getQueueMsg() {
        return redisQueue.poll();
    }

    public void beforeProcess(String msg) {

    }


    public void afterProcess(String msg) {

    }

    public abstract void execute(String msg);

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("开始执行任务");
        isRunning.set(true);
        this.process();
    }

    public void destroy() {
        System.out.println("开始销毁队列");
        isRunning.set(false);
        executorService.shutdown();
    }
}

队列消费接口

/**
 * redis队列监控
 *
 * @author: liufuqiang
 * @date: 2023-10-30 18:17
 */
public interface MonitorRedisQueue {

    /**
     * 执行队列逻辑
     * @param msg
     * @return
     */
    boolean process();

}

具体队列的消费实现类如下

/**
*
*
* @author: liufuqiang
* @date: 2023-10-30 18:37
*/
@Slf4j
public class MonitorRedisQueueServiceImpl extends AbstractMonitorQueue {


    public MonitorRedisQueueServiceImpl(RedisConfig.QueueInfo queueInfo, RedisQueue redisQueue) {
        super(queueInfo, redisQueue);
    }

    @Override
    public void execute(String msg) {
        log.info(this.getClass().getName() + "执行具体逻辑");
    }


}

以上文件可以集成到项目公用jar包,也可以放在公共模块,项目需要使用时引入模块的jar包
在使用的使用只需要开启注解@EnableRedisQueue
并且在配置文件填写需要队列的配置信息,如redis实际消费的key,消费线程的名字,消费线程的数量,消费者的className,如果不需要消费者,只生产消息,则只需要配置queue-name

redis1:
  infos:
    - queue-name: order111
      thread-name: thread111
      thread-num: 10
      monitor-class: org.dromara.sms4j.example.service.queue.MonitorRedisQueueServiceImpl
    - queue-name: order222

消费者只需要新建消费类并且继承AbstractMonitorQueue即可

如果需要进行生产消息,注入的时候 @Qualifier(value = "order111") value选择为配置文件中的queue-name

标签:return,String,default,Spring,Redis,队列,Override,public,BeanDefinition
From: https://www.cnblogs.com/LiuFqiang/p/17803807.html

相关文章

  • redis 学习 一
    1.redis基本命令//启动客户端redis-cli//密码认证authpassword//远程服务redis-cli-hhost-pport-apasswordredis-cli-h127.0.0.1-p6379-apassword查看当前数据库中key的数量dbsize切换库命令:selectindex[0-15]退出客户端连接:exit2.redis操作......
  • Redis通过复制rdb文件方式同步线上数据到本地以及提示:Can't handle RDB format versi
    场景Redis的持久化机制-RDB方式和AOF方式:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/105052841Redis持久化机制导致服务自启动后恢复数据过长无法使用以及如何关闭:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/130237326以上对于redis持久化......
  • Spring随笔
    Springboot关于Starter的命名官方的Starter都遵循一个类似的命名模式;spring-boot-starter-*,其中*是一个特定类型的应用程序。第三方启动器通常以项目的名称开始。例如,一个名为thirdpartyproject的第三方启动器项目通常被命名为thirdpartyproject-spring-boot-star......
  • Java后台微信点餐小程序2023年最新版笔记Springboot+Mysql+Freemarker+Bootstrap
    由于之前的Java后台微信点餐小程序有些知识点过时了,所以今天重新出一版,把里面过时的知识点更新下第一章,技术选型(重要)在开始学习之前,要记得安装jdk8和mysql8,后面的笔记里也会具体讲解怎么安装,但是jdk8和mysql8必须和石头哥保持一致。1,后台技术选型:JDK8(必须保持一致)Mysql8(必......
  • SpringBoot数据响应、分层解耦、三层架构
    响应数据@ResponseBody类型:方法注解、类注解位置:Controller方法、类上作用:将方法返回值直接响应,如果返回值类型是实体对象/集合,将会转换为json格式响应说明:@RestController=@Controller+@ResponseBody统一响应结果步骤:获取员工数据,返回统一响应结果,在页面渲染......
  • 【Redis】Ubuntu22.04安装Redis
    Redis数据库安装前言:最近想要学习用Python控制Redis的方法,但是Redis官网是不支持Windows直接安装的,各种大佬的Windows移植版本也比较老,虽然够用,但是也希望使用官网版本。网上的各种安装教程或多或少都存在一点问题,这里我针对我所使用的服务器版本安装Redis服务进行整理,若与我采......
  • Redis Bigkey排查
    在处理bigkey问题可以先从一下几点入手什么是bigkey?bigkey危害?bigkey是如何产生的?如何发现bigkey?如何处理bigkey?什么是BigkeyRedisbigkey是指在Redis数据库中占用空间较大的键值对。这些键通常包含了大量的数据,可能会影响Redis的性能和内存使用。例如,在一个集合......
  • spring注入bean错误-Bean named 'abc' is expected to be of type 'AAA' but was actu
    先看如下两个注入到spring容器中的bean,一个是UserNewManager,一个是UserManager。@ServicepublicclassUserNewManager{publicvoiddoSomething(){}}@ServicepublicclassUserManager{...}再看下面的testcase,利用@Resource注解来注入bean。@SpringB......
  • Jenkins+Docker 一键自动化部署 SpringBoot 项目
    Jenkins和Docker是现代软件开发中非常流行的工具,可以帮助我们自动化构建、测试和部署应用程序。SpringBoot是一种流行的Java框架,可以帮助开发人员快速开发Web应用程序。在本文中,我们将介绍如何使用Jenkins和Docker一键自动化部署SpringBoot应用程序。准备工作首先,你需要安装并配......
  • Spring Boot自动化部署
    SpringBoot是一款非常流行的Java开发框架,它基于SpringFramework,提供了快速构建应用程序的能力。本文将介绍如何使用SpringBoot自动化部署,并详细讲解代码实现细节。自动化部署流程:一般的JavaWeb应用程序的部署流程如下:1)编写代码2)将代码打包成war文件3)将war文件上传到服务器4)在服......