首页 > 数据库 >使用spring-plugin和redisson实现延迟队列

使用spring-plugin和redisson实现延迟队列

时间:2023-05-09 23:12:27浏览次数:57  
标签:redisson return plugin 队列 spring delayQueueType DelayQueueTypes public

目录

一、介绍

本文主要介绍如何使用spring plugin和redisson去实现延迟队列

二、步骤

  • pom.xml引入依赖包

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.plugin</groupId>
                <artifactId>spring-plugin-core</artifactId>
                <version>2.0.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.16.3</version>
            </dependency>
        </dependencies>
    
  • 配置文件application.properties

    server.port=18084
    
    spring.redis.host=192.168.48.48
    spring.redis.port=6379
    spring.redis.database=1
    spring.redis.password=
    
  • 定义延迟队列任务的枚举类DelayQueueTypes.java

    public enum DelayQueueTypes {
        TASK_1("task-1"),
        TASK_2("task-2");
    
        public String name;
    
        DelayQueueTypes(String name) {
            this.name = name;
        }
    }
    
  • 定义延迟队列的插件DelayQueueHandler.java

    public interface DelayQueueHandler extends Plugin<DelayQueueTypes> {
        void execute(Object data);
    }
    
  • 实现不同任务的插件

    @Component
    @Order(value = 1)
    public class Task1DelayQueueTypesHandler implements DelayQueueHandler {
        private static final Logger log = LoggerFactory.getLogger(Task1DelayQueueTypesHandler.class);
    
        @Override
        public void execute(Object data) {
            log.info("任务一接收到数据 = {}", data);
        }
    
        @Override
        public boolean supports(DelayQueueTypes delayQueueType) {
            return DelayQueueTypes.TASK_1 == delayQueueType;
        }
    }
    
    @Component
    @Order(value = -2)
    public class Task2DelayQueueTypesHandler implements DelayQueueHandler {
        private static final Logger log = LoggerFactory.getLogger(Task1DelayQueueTypesHandler.class);
    
        @Override
        public void execute(Object data) {
            log.info("任务二接收到数据 = {}", data);
        }
    
        @Override
        public boolean supports(DelayQueueTypes delayQueueType) {
            return DelayQueueTypes.TASK_2 == delayQueueType;
        }
    }
    
  • 注册插件

    @Configuration
    @EnablePluginRegistries(value = {DelayQueueHandler.class})
    public class PluginConfiguration {
    }
    
  • 定义Redis配置类RedisConfig.java

    @Configuration
    public class RedisConfig {
    
        @Bean
        public RedissonClient redissonClient(RedisProperties redisProperties) {
            Config config = new Config();
            SingleServerConfig singleServerConfig = config.useSingleServer().setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());
            if (!StringUtils.isEmpty(redisProperties.getPassword())) {
                singleServerConfig.setPassword(redisProperties.getPassword());
            }
            return Redisson.create(config);
        }
    }
    
  • 定义延迟队列的工具类RedisDelayQueueUtils.java

    
    @Component
    public class RedisDelayQueueUtils {
        private static final Logger logger = LoggerFactory.getLogger(RedisDelayQueueUtils.class);
    
        @Autowired
        private RedissonClient redissonClient;
    
        /**
         * 加入队列
         *
         * @param t
         * @param l
         * @param unit
         * @param delayQueueType
         * @param <T>
         * @return
         */
        public <T> int addDelayQueue(T t, long l, TimeUnit unit, DelayQueueTypes delayQueueType) {
            try {
                RBlockingQueue<T> rBlockingQueue = redissonClient.getBlockingQueue(delayQueueType.name);
                RDelayedQueue<T> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
                rDelayedQueue.offer(t, l, unit);
                logger.info("[{}队列]增加元素[{}], 有效期:{} {}", delayQueueType.name, t.toString(), l, unit.toString());
                return 1;
            } catch (Exception e) {
                logger.error("[{}队列]增加元素失败", delayQueueType.name);
                return -1;
            }
        }
    
        /**
         * 加入/替换队列(保证唯一)
         *
         * @param t
         * @param l
         * @param unit
         * @param delayQueueType
         * @param <T>
         * @return
         */
        public <T> int addOrUpdateDelayQueue(T t, long l, TimeUnit unit, DelayQueueTypes delayQueueType) {
            try {
                RBlockingQueue<T> rBlockingQueue = redissonClient.getBlockingQueue(delayQueueType.name);
                RDelayedQueue<T> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
                // 先清空再加入
                rDelayedQueue.removeAll(Arrays.asList(t));
                rDelayedQueue.offer(t, l, unit);
                logger.info("[{}队列]增加元素[{}], 有效期:{}", delayQueueType.name, t.toString(), l);
                return 1;
            } catch (Exception e) {
                logger.error("[{}队列]增加元素失败", delayQueueType.name);
                return -1;
            }
        }
    
        /**
         * 获取队列
         *
         * @param delayQueueType
         * @return
         * @throws InterruptedException
         */
        public <T> T getDelayQueue(DelayQueueTypes delayQueueType) throws InterruptedException {
            if (redissonClient != null) {
                RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(delayQueueType.name);
                redissonClient.getDelayedQueue(blockingDeque);
                return blockingDeque.take();
            }
            return null;
        }
    }
    
  • 定义延迟队列的启动运行器DelayQueueHandlerRunner.java

    @Component
    public class DelayQueueHandlerRunner implements CommandLineRunner {
        private ExecutorService delayQueueHandleThreadPool = new ThreadPoolExecutor(5, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    
        @Autowired
        private PluginRegistry<DelayQueueHandler, DelayQueueTypes> delayQueueTypesPluginRegistry;
    
        @Autowired
        private RedisDelayQueueUtils redisDelayQueueUtils;
    
        @Override
        public void run(String... args) throws Exception {
            // 这边添加数据到延迟队列主要是为了测试
            redisDelayQueueUtils.addDelayQueue("这是任务一的数据", 5, TimeUnit.SECONDS, DelayQueueTypes.TASK_1);
            redisDelayQueueUtils.addDelayQueue("这是任务一的数据", 10, TimeUnit.SECONDS, DelayQueueTypes.TASK_1);
            redisDelayQueueUtils.addDelayQueue("这是任务二的数据", 15, TimeUnit.SECONDS, DelayQueueTypes.TASK_2);
    
            if (delayQueueTypesPluginRegistry == null) return;
    
            DelayQueueTypes[] delayQueueTypes = DelayQueueTypes.values();
            for (DelayQueueTypes delayQueueType : delayQueueTypes) {
                delayQueueHandleThreadPool.submit(() -> {
                    Object data = null;
                    try {
                        data = redisDelayQueueUtils.getDelayQueue(delayQueueType);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    List<DelayQueueHandler> delayQueueHandlerList = delayQueueTypesPluginRegistry.getPluginsFor(delayQueueType);
                    for (DelayQueueHandler delayQueueHandler : delayQueueHandlerList) {
                        delayQueueHandler.execute(data);
                    }
                });
            }
        }
    }
    

三、运行效果

四、源码

https://github.com/1277463718lmt/spring-plugin-demo.git

标签:redisson,return,plugin,队列,spring,delayQueueType,DelayQueueTypes,public
From: https://www.cnblogs.com/linmt/p/17386627.html

相关文章

  • Spring-MVC-随笔
    Spring-MVC一、SpringMVC简介1、什么是MVCMVC是一种软件架构的思想,将软件按照模型、视图、控制器来划分M:Model,模型层,指工程中的JavaBean,作用是处理数据JavaBean分为两类:一类称为实体类Bean:专门存储业务数据的,如Student、User等一类称为业务处理Bean:指Service或Dao......
  • Springboot创建多module项目--转载
    1)createnewproject(或File-->new-->project)2)选中SpringInitializr,点击Nextimage.png3)填写必要信息,点击Nextimage.png4)依赖页不用勾选,点击Nextimage.png5)选择项目存储目录,点击Finishimage.png6)在pom.xml中加入packaging......
  • springboot alibaba druid数据库连接池配置,输出可执行sql
    #数据源配置spring:datasource:type:com.alibaba.druid.pool.DruidDataSourcedruid:#初始连接数initialSize:5#最小连接池数量minIdle:2#最大连接池数量maxActive:50#配置获取连接等待超时的时间......
  • Springboot-hbase增删改20230509
    1、启动 2、ZK客户端    3、springboot+hbase实例1)、pom<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId&......
  • Spring AOP官方文档学习笔记(四)之Spring AOP的其他知识点
    1.选择哪种AOP(1)使用SpringAOP比使用完整版的AspectJ更方便简单,因为不需要在开发和构建过程中引入AspectJ编译器以及织入器,如果我们只希望通知能够在SpringBean上执行,那么选用SpringAOP就可以了,如果我们希望通知能够在不由Spring所管理的对象上执行,那么就需要使用Aspect......
  • spring框架_Applicationcontext功能
    Applicationcontext的功能拓展主要来自于不属于beanfactory的接口,主要包括四个接口Messagesource:国际化ResourcePatternResolver:获取资源ApplicationEventPublisher:发布事件EnvironmentCapable:获取环境变量感觉发布事件这个功能有点像消息队列,发布订阅,在compent......
  • SpringMVC常用注解整理
    一、组件型注解:@Component在类定义之前添加@Component注解,他会被spring容器识别,并转为bean。@Repository对Dao实现类进行注解(特殊的@Component)@Service用于对业务逻辑层进行注解,(特殊的@Component)@Controller用于控制层注解,(特殊的@Component)以上四种注解都是......
  • SpringBoot - 参数接收方式
    SpringBoot-参数接收方式·前言·使用@PathVariable接收路径中的参数·使用@RequestParam获取路径中?后的参数·使用@RequestBody获取Map对象·使用@RequestBody获取实体对象前言使用@PathVariable接收路径中的参数@GetMapping(value="/param/{id}")publicStri......
  • springBootMVC搭建
    springBootMVC搭建 分类专栏:spring环境配置spring环境配置专栏收录该内容17篇文章0订阅订阅专栏今天给大家介绍一下springBootMVC,让我们学习一下如何利用SpringBoot快速的搭建一个简单的web应用。环境准备一个称手的文本编辑器(例如Vim、Emacs、SublimeText)或者I......
  • 注解驱动的spring mvc(二)
    四:视图名的确定。springmvc可以通过可以通过多种方式确定视图名,在前面的例子中,方法无返回值,视图名更具请求参数确定。Controller方法还是返回一个String类型的值作为视图名。Java代码@RequestMapping("/user/list.htm")publicStringlistAllUser(){ return"user......