首页 > 数据库 >redis zset 延迟合并任务处理

redis zset 延迟合并任务处理

时间:2022-08-30 15:12:08浏览次数:64  
标签:taskId zset redis 任务 key public redisTemplate 延迟

redis zset 延迟合并任务处理

@Autowired
    public RedisTemplate redisTemplate;
    
##1.发送端:在接口中收集任务ID,累计时间段之后,合并处理。
##redis zset 主键,任务ID(不重复),时间戳    
String key = "任务分组名称";
//延时
redisTemplate.opsForZSet().add(key,taskId,System.currentTimeMillis());
System.out.println("加入任务, taskId: " + taskId + ", exeTime: " + System.currentTimeMillis() + ", 当前时间:" + LocalDateTime.now() +",key = " + key);


##2.接收端
@Configuration
public class ExecutorServiceConf {
    @Bean
    public ExecutorService executorService() {
        return new ThreadPoolExecutor(
                20,
                40,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("myMergeTask-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
    }
}


@Service
public class ConsumerTask {
    @Autowired
    public RedisTemplate redisTemplate;
    @Autowired
    ExecutorService executorService;
    @Value("${delayTime:2}")
    private Integer delayTime;
    
    //服务启动的时候加载
    @PostConstruct
    public void consumer() {
        //启动线程池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    String key = "任务分组名称";
                    //rangeByScore 主键,0,当前时间,0,偏移量1000
                    Set<Integer> taskIdSet = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 1000);
                    if (taskIdSet == null || taskIdSet.isEmpty()) {
                        logger.info("没有任务");
                    } else {
                        //1.查询任务ID关联的记录

                        //2.任务合并处理
                        
                        //3.处理成功后,清理redis
                        taskIdSet.forEach(id -> {
                            //4.根据taskId更新任务状态
                            
                            //5.根据taskId删除redis
                            long result = redisTemplate.opsForZSet().remove(key, id);
                            if (result == 1L) {
                                logger.info("从延时队列中获取到任务,taskId:" + id + ",当前时间:" + LocalDateTime.now());
                            }
                        });
                    }

                    try {
                        //间隔2分钟时间
                        TimeUnit.MINUTES.sleep(delayTime);
                    } catch (InterruptedException e) {
                        logger.error("定时扫描exception:",e);
                    }
                }
            }
        });
    }
    
}

 

标签:taskId,zset,redis,任务,key,public,redisTemplate,延迟
From: https://www.cnblogs.com/oktokeep/p/16639376.html

相关文章

  • redission同时加多个锁
    业务场景:比如:给某条记录点赞时,有两个条件:(1)本条记录有点赞限制  (2)点赞人有点赞限制。问题:并发时,需要加锁,而且需要同时加两把锁。工具类:@Servicepublicclass......
  • redis(linux)
    官网下载安装包https://redis.io/download/安装gcc,命令yuminstallgcc进入安装目录make命令进行编译如果报错-jemalloc/jemalloc.h:没有哪个文件,首先看有没有gcc,然后......
  • redis分布式锁
    什么是分布式锁?Redis因为单进程、性能高常被用于分布式锁;锁在程序中作用是同步工具,保证共享资源在同一时刻只能被一个线程访问。Java中经常用的锁synchronized、Lock,但是......
  • 50道Redis高频面试题
    一、Redis到底是单线程还是多线程Redis6.0版本之前的单线程指的是其网络I/O和键值对读写是由一个线程完成。也就是只有网络请求模块和数据操作模块是单线程的,而其他的持......
  • Django使用Redis进行缓存详细流程
    1.背景和意义服务器数据非经常更新。若每次都从硬盘读取一次,浪费服务器资源、拖慢响应速度。而且数据更新频率较高,服务器负担比较大。若保存到数据库,还需要额外建立一张对......
  • 10 分钟彻底理解 Redis 的持久化和主从复制
    在这篇文章,一起了解一下其中一个非常重要的内容:Redis的持久化机制。什么是Redis持久化?Redis作为一个键值对内存数据库(NoSQL),数据都存储在内存当中,在处理客户端请求时,所......
  • RedisInsight :Redis 官方可视化工具
    RedisInsight是Redis官方出品的可视化管理工具,可用于设计、开发、优化你的Redis应用。支持深色和浅色两种主题,界面非常炫酷。可支持String、Hash、Set、List、JSON等多种......
  • Redis主要数据结构以及应用场景
    String最常用的各式,以kv格式进行存储常用的场景在于对象json存储,以及对象缓存、分布式锁、计数器等。SETKEYVALUE存入字符串的键值对MSETkeyvalue[keyvalue......
  • Redis数据类型(一)------------------String类型
    Redis数据类型之String类型String类型,也就是字符串类型,是Redis中最简单的存储类型。其value是字符串,不过根据字符串的格式不同,又可以分为3类:String:普通字符串int:整数......
  • redis 0: "AUTH <password> called without any password configured for the def
    运行项目的时候,报redis0:"AUTH<password>calledwithoutanypasswordconfiguredforthedef原因:主要是redis没有设置密码解决步骤:1.先进入到redis容器中......