首页 > 数据库 >用Redisson的延迟队列RDelayedQueue处理延迟任务或者定时任务

用Redisson的延迟队列RDelayedQueue处理延迟任务或者定时任务

时间:2024-06-05 13:58:47浏览次数:42  
标签:Redisson String 队列 Redis RDelayedQueue public 延迟

什么是Redisson

Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。

什么是RDelayedQueue

获取RDelayedQueue:

    public <V> RDelayedQueue<V> getDelayedQueue(RQueue<V> destinationQueue) {
        if (destinationQueue == null) {
            throw new NullPointerException();
        }
        return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), commandExecutor, destinationQueue.getName());
    }

RDelayedQueue是一种延迟队列接口,Redisson自带一个叫‘RedissonDelayedQueue’的实现类是他的子类,用于实现延迟任务。注意根据上面源码可以看到用Redisson获取RDelayedQueue时需要传一个实现RQueue队列的参数,而Redisson又可以创建另一个队列叫‘RedissonBlockingDeque’,它就是实现RBlockingDeque接口的阻塞队列,而RBlockingDeque又是RQueue的子类。

获取:RBlockingDeque:

    public <V> RBlockingDeque<V> getBlockingDeque(String name) {
        return new RedissonBlockingDeque<V>(commandExecutor, name, this);
    }

RDelayedQueue的使用方法

1、项目中要配置Redis,然后根据Redis配置Redisson;

  #Redis配置
  # Redis数据库索引(默认为0)
  # Redis服务器地址
  # Redis服务器连接端口
  # Redis服务器连接密码(默认为空)
  # 链接超时时间 单位 ms(毫秒)
  # 连接池最大连接数(使用负值表示没有限制) 默认 8
  # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
  # 连接池中的最大空闲连接 默认 8
  # 连接池中的最小空闲连接 默认 0
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    connect-timeout: 3000
    password:
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

RedissonConfig:

@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.database}")
    private int database;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        String address = "redis://" + host + ":" + port;
        config.useSingleServer().setAddress(address).setDatabase(database);
        return Redisson.create(config);
    }
}

 

2、通过Redisson获取阻塞队列(RBlockingDeque)的子类:RedissonBlockingDeque,并通过它获取延迟队列(RDelayedQueue)的子类:RedissonDelayedQueue;

RBlockingDeque<User> blockingDeque = redissonClient.getBlockingDeque("demoName");
        RDelayedQueue<User> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        delayedQueue.offer(user, 5, TimeUnit.SECONDS);
        //记录设置任务的时间
        System.out.println(user.getUserId() + "添加时间是:" + LocalDateTime.now());
        //开启新线程执行任务,不阻塞主线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    User user2 = blockingDeque.poll();
                    if (user2 != null) {
                        System.out.println(user2);
                        System.out.println(user2.getUserId() + "执行时间是:" + LocalDateTime.now());
                    }
                }
            }
        }).start();

 

3、让RedissonDelayedQueue的‘offer’方法添加延迟任务对象,然后通过RedissonBlockingDeque的‘poll’或者‘take’方法获取延迟任务对象然后进行后续操作。

队列take()和poll()的区别:
take():返回队列的头元素,并把它从队列中删除,如果队列为空时则阻塞线程直到有新的元素添加进来并返回;
poll():返回队列的头元素,并把它从队列中删除,如果队列头元素为空则返回null但是不阻塞线程;

使用项目Demo

 任务需求:新增一个对象User,并让他5s以后介绍自己(toString方法打印信息)。

项目结构:

 pom依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.code</groupId>
    <artifactId>MyRDelayedQueue</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.5.4</version>
        </dependency>
        <!-- Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.4</version>
        </dependency>
        <!-- Redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.20.0</version>
        </dependency>
    </dependencies>
</project>

application.yml文件:

#Redis配置
# Redis数据库索引(默认为0)
# Redis服务器地址
# Redis服务器连接端口
# Redis服务器连接密码(默认为空)
# 链接超时时间 单位 ms(毫秒)
# 连接池最大连接数(使用负值表示没有限制) 默认 8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
# 连接池中的最大空闲连接 默认 8
# 连接池中的最小空闲连接 默认 0
spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    connect-timeout: 3000
    password:
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

server:
  port: 8080

启动类:

@SpringBootApplication
public class MyRDQueueApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyRDQueueApplication.class, args);
    }
}

Redisson配置文件:

@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.database}")
    private int database;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        String address = "redis://" + host + ":" + port;
        config.useSingleServer().setAddress(address).setDatabase(database);
        return Redisson.create(config);
    }
}

User:

public class User {
    private String userId;
    private String userName;
    private String password;

    public User() {
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    //介绍自己
    public String Speak() {
        return "我的信息是:{" +
                "userId='" + userId + '\'' +
                ", userName='" + userName + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

 

controller:

@RestController
public class IndexController {

    @Autowired
    public QueueService queueService;

    @RequestMapping("/queue")
    public String addUser(){
        return queueService.addUser();
    }
}

 

service:

@Service
public class QueueService {

    @Autowired
    public RedissonClient redissonClient;

    public String addUser() {
        //新增User
        User user = new User();
        user.setUserId("123456");
        user.setUserName("queueTask");
        user.setPassword("666666");
        //5秒后让新增的User讲话
        RBlockingDeque<User> blockingDeque = redissonClient.getBlockingDeque("speak");
        RDelayedQueue<User> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        delayedQueue.offer(user, 5, TimeUnit.SECONDS);
        //指定日期格式
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        //打印添加任务时间
        System.out.println("添加时间:" + formatter.format(LocalDateTime.now()));
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    User task = null;
                    try {
                        task = blockingDeque.take();
                        System.out.println(task.Speak());
                        //打印执行任务时间
                        System.out.println("执行时间:" + formatter.format(LocalDateTime.now()));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        return user.getUserId();
    }
}

结果测试:

启动项目后请求路径:http://localhost:8080/queue

后台打印:

添加时间:2024-06-05 13:42:07
我的信息是:{userId='123456', userName='queueTask', password='666666'}
执行时间:2024-06-05 13:42:12

结果:执行任务时间比添加时间推迟5秒。

 

标签:Redisson,String,队列,Redis,RDelayedQueue,public,延迟
From: https://www.cnblogs.com/Bernard94/p/18230679

相关文章

  • 人魅延迟高/无法联机的解决方法 人魅联机失败、联机有延迟怎么办
    人魅作为一款多人联机的恐怖解谜游戏,不仅具有极高的游戏性和趣味性,还能够锻炼玩家的观察力和团队合作能力。四个玩家需要扮演自己的角色,在小镇里探索解密,只有齐心协力,才能成功破解谜题,揭开真相。这一点来说,或许有点像剧本杀。不过,目前人魅在steam上面的评价并不是很好,大多数玩......
  • Redisson 限流器源码分析
    Redisson限流器源码分析对上篇文章网友评论给出问题进行解答:redis的key是否会过期可以先阅读上篇文章:redis+AOP+自定义注解实现接口限流-古渡蓝按-博客园(cnblogs.com)注解AOP代码部分提取//调用Reids工具类的rateLimiter方法longnumber=RedisUtils.rat......
  • osg使用整理(11):延迟渲染
    osg使用整理(11):延迟渲染一、基础概念前向渲染流程:顶点着色器->图元装配成点线三角形->几何着色器->裁剪剔除->光栅化(片元着色器)->透明度测试、深度测试。延迟渲染流程:顶点着色器->图元装配成点线三角形->几何着色器->裁剪剔除->光栅化输出G-Buffer,存储每个像素的属性信息(位......
  • synchronized、Lock本地锁和Redisson分布式锁的简单使用
    文章目录概念准备工作synchronized本地锁演示JUC包的Lock本地锁演示Redisson的RLock分布式锁演示源码地址参考来源概念redisson是一个简单易用的Redis客户端工具。不仅如此,它还具备分布式锁的功能准备工作快速整合SSMP请参考我这篇文章SpringBoot快速整合Spring......
  • RabbitMQ 进阶使用之延迟队列 → 订单在30分钟之内未支付则自动取消
    开心一刻晚上,媳妇和儿子躺在沙发上儿子疑惑的问道:妈妈,你为什么不去上班媳妇:妈妈的人生目标是前20年靠父母养,后40年靠你爸爸养,再往后20年就靠你和妹妹养儿子:我可养不起媳妇:为什么儿子:因为,呃...,我和你的想法一样讲在前面如果你们对RabbitMQ感到陌生,那可以停止往下阅读了......
  • flutter 空安全、late延迟及required关键词
    空安全Dart和Kotlin一样都是支持空安全,空安全操作符主要有两个:?可空类型!类型断言可空类型在之前我们的介绍中,声明一个变量,如:Stringstr="A";str=null;这个时候str=null代表会报错,提示Avalueoftype'Null'can'tbeassignedtoavariableoftype'String......
  • window对象的常见属性、延迟函数、时间循环eventloop
    一、window对象JavaScript中的全局对象,代表浏览器窗口或者浏览器标签页。它具有许多属性和方法,以下是其中一些常见的属性:window.document:表示当前窗口或标签页的文档对象,可以用来操作和访问文档的内容。window.navigator:包含有关浏览器的信息,如浏览器的名称、版本、......
  • 第七节:RabbitMq延迟队列实操(死信交换机+TTL)和死信插件的使用
    一.        二.        三.         !作       者:Yaopengfei(姚鹏飞)博客地址:http://www.cnblogs.com/yaopengfei/声     明1:如有错误,欢迎讨论,请勿谩骂^_^。声     明2:原创博客请在转载......
  • ios系统上h5页面播放audio标签声音有延迟问题处理
    原文链接https://www.cnblogs.com/yalong/p/18214816背景app内嵌了一个H5页面,页面有个需求是点击某些按钮就触发声音,于是就使用了audio标签,但是有个问题就是在ios上,点击声音会有短时间的延迟,然后才播放声音找了好几种方案总算解决了方案一click事件改为mouseup事件因为移动......
  • Redis教程(十七):Redis的Redisson分布式锁
    Redis分布式锁 Redis分布式锁的主要作用是在分布式系统环境下提供一种机制,用于确保在同一时间只有一个进程(或线程)能够执行某个关键代码段或访问特定的资源。这主要用于控制对共享资源的并发访问,以避免因多个进程同时修改同一数据而导致的数据不一致或其他竞争条件问题。 ......