首页 > 数据库 >Redis实现延迟队列

Redis实现延迟队列

时间:2022-08-16 18:36:12浏览次数:58  
标签:return String Thread 队列 LOG Redis value public 延迟

 

一、延迟队列

进入该队列的消息会被延迟消费的队列,一般的队列,进入队列后会进行排队依次消费掉

二、使用场景

需要进行延迟消费的场景,本文举例为某本书籍更新了章节,待内容上传完成及各种逻辑处理完成之后延迟五分钟给用户推送消息通知

三、使用介绍

实现方式还有很多,可以使用java延迟队列RelayQueue或者RabbitMQ来实现,这里记录redis实现

主要思路为Redis的ZSET集合来实现,score值为过期时间的时间戳,再创建线程池通过rangeByScore获取需要处理的数据

public interface IDelayQueue<E> {
    /**
     * 向延时队列中添加数据
     *
     * @param score 分数
     * @param data  数据
     * @return true 成功 false 失败
     */
    boolean add(long score, E data);

    /**
     * 从延时队列中获取数据
     *
     * @return
     */
    String get();

    /**
     * 删除数据
     *
     * @param data 数据
     * @return
     */
    boolean rem(E data);
}

  

public class RedisDelayQueue implements IDelayQueue<String> {

    private RedisTemplate<String, String> redisQueueTemplate;

    private String key;

    public RedisDelayQueue(String key, RedisTemplate<String, String> redisQueueTemplate) {
        this.key = key;
        this.redisQueueTemplate = redisQueueTemplate;
    }

    @Override
    public boolean add(long score, String data) {
        return redisQueueTemplate.opsForZSet().add(key, data, score);
    }

    @Override
    public String get() {
        double now = System.currentTimeMillis();
        double min = Double.MIN_VALUE;
        Set<String> res = redisQueueTemplate.opsForZSet().rangeByScore(key, min, now, 0, 10);
        if (!CollectionUtils.isEmpty(res)) {
            for (String data : res){
                // 删除成功,则进行处理,防止并发获取重复数据
                if (rem(data)){
                    return data;
                }
            }
        }
        return null;
    }


    @Override
    public boolean rem(String data) {
        return redisQueueTemplate.opsForZSet().remove(key, data) > 0;
    }

}

实例化RedisDealyQueu并且注入,  

private final IDelayQueue<String> queue;

 在需要往队列插入数据的地方:

 

       int delayMinutes = 5;
        LocalDateTime localDateTime = LocalDateTime.now().plusMinutes(delayMinutes);
        long score = localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        return queue.add(score, JSONUtil.toJsonStr(bookUpdateNotify)); 

 

 这里score表示延迟五分钟

public abstract class AbstractDelayQueueWorkerService implements InitializingBean {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDelayQueueWorkerService.class);

    protected volatile boolean monitorStarted = false;

    protected volatile boolean monitorShutDowned = false;

    //默认休眠时间 500毫秒
    private static final int DEFAULT_SLEEP_TIME = 500;

    private ExecutorService executorService;

    private static final int DEFAULT_THREAD_NUM = 1;

    // 线程数量
    private int threadNum = DEFAULT_THREAD_NUM;

    private int threadSheepTime = DEFAULT_SLEEP_TIME;

    // 线程名称
    protected String threadName;

    // 需要监控的延时队列
    protected IDelayQueue<String> monitorQueue;

    public void setQueueTaskConf(String threadName, int threadNum, IDelayQueue<String> monitorQueue, int threadSheepTime) {
        this.threadName = threadName;
        this.threadNum = threadNum;
        this.monitorQueue = monitorQueue;
        this.threadSheepTime = threadSheepTime;
    }

    public void setThreadNum(int threadNum) {
        this.threadNum = threadNum;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setThreadSheepTime(int threadSheepTime) {
        this.threadSheepTime = threadSheepTime;
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        executorService = Executors.newFixedThreadPool(threadNum);

        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                Thread.currentThread().setName(threadName + "[" + num + "]");
                while (!monitorShutDowned) {
                    String value = null;
                    try {
                        value = beforeExecute();
                        // 获取数据时进行删除操作,删除成功,则进行处理,防止并发获取重复数据
                        if (StringUtils.isNotEmpty(value)) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Monitor Thread[" + Thread.currentThread().getName() + "], get from queue,value = {}", value);
                            }
                            boolean success = execute(value);
                            // 失败重试
                            if (!success) {
                                success = retry(value);
                                if (!success) {
                                    LOG.warn("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = {}", value);
                                }
                            } else {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:execute successfully!values = {}", value);
                                }
                            }
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:monitorThreadRunning = {}", monitorStarted);
                            }
                            Thread.sleep(threadSheepTime);
                        }
                    } catch (Exception e) {
                        LOG.error("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = " + value, e);
                    }
                }
                LOG.info("Monitor Thread[" + Thread.currentThread().getName() + "] Completed...");
            });
        }
        LOG.info("thread pool is started...");
    }

    /**
     * 操作队列取数据
     *
     * @return 队列数据
     */
    public String beforeExecute() {
        return monitorQueue.get();
    }

    /**
     * 执行业务逻辑
     */
    public abstract boolean execute(String value);

    /**
     * 重试
     *
     * @param value 队列内容
     * @return true:成功,false:失败
     */
    protected boolean retry(String value) {
        LOG.info("job retry, value: {}", value);
        return execute(value);
    }

    protected void shutdown() {
        executorService.shutdown();
        LOG.info("thread pool is shutdown...");
    }
}

 

  

  

 

标签:return,String,Thread,队列,LOG,Redis,value,public,延迟
From: https://www.cnblogs.com/LiuFqiang/p/16592522.html

相关文章

  • Redis---hash哈希散列
    1.前言Redishash(哈希散列)是由字符类型的field(字段)和value组成的哈希映射表结构(也称散列表),它非常类似于表格结构。在hash类型中,field与value一一对应,且不允许重......
  • Redis---set集合
    1.前言Redisset(集合)遵循无序排列的规则,集合中的每一个成员(也就是元素,叫法不同而已)都是字符串类型,并且不可重复。Redisset是通过哈希映射表实现的,所以它的添加、删除......
  • Redis---列表
    1.前言Redislist(列表)相当于Java语言中的LinkedList结构,是一个链表而非数组,其插入、删除元素的时间复杂度为O(1),但是查询速度欠佳,时间复杂度为O(n)。当向列表中添......
  • 1020 胖胖的牛牛 优先队列 bfs 转向时上上次xy与当前xy都不同
     链接:https://ac.nowcoder.com/acm/problem/208246来源:牛客网题目描述每逢佳节胖三斤,牛牛在过去的节日里长胖了,连拐弯都困难,甚至会卡在门上......
  • 项目中,需要对设备上线离线的动作做日志打印. (利用redis发布订阅监听key解决. 观
    https://blog.csdn.net/weixin_44642403/article/details/116278262?ops_request_misc=&request_id=&biz_id=102&utm_term=redis%E7%9B%91%E5%90%ACkey%E5%9C%A8%E4%BB%80%......
  • 解决Redis、MySQL缓存双写不一致问题
     redis、mysql双写缓存不一致: 但是在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存。又或者是先删除缓存,再更新数据库,其实大家存在很大的争议。目前没有一......
  • 记Windows服务器Redis 6379被攻击 被设置主从模式同步项目数据
    在工作中第一次经历被攻击,我是一个前端,同时复负责维护一个已上线的项目,在最近一段时间小程序与后台经常出现这个报错,搜了下说我的从机是只读模式,不能写入,问了同事得知这......
  • Redis-5.0141 解决应用问题-17
    1.缓存穿透1.1什么是缓存穿透?   缓存穿透是指查询一个一定不存在的数据,由于缓存在没有命中时需要从数据库查询,查不到数据则不写入缓存,这将导致这个不存在的数据每......
  • Redis常用指令之string、list、set、zset、hash
    Redis之五大类型常用指令redis的一些小知识redis服务器端口默认是6379在编译完成后的bin目录下启动服务端:redis-server客户端连接操作:redis-cli-hlocalhost-p......
  • redis的常见面试题
    为什么要用redis减少了mysql数据库的压力,在这之前mysql一个人承受,然后要承受大量的数据请求,大部分都是读操作。而且经常都是重复查一个东西,浪费了很多时间进行磁盘iore......