最近在复习所学过的队列的知识,像什么LinkedBlockingDeque。ArrayBlockingQueue,还有ribbitmq里的乱七八糟的,其本质我感觉啊这些技术就是一些队列,只不过大体上分为单机队列和分布式队列而已,当然本文的重点在于redis实现延时队列啊,可能有人会说,用ribbitmq这个专门的消息中间件实现延时队列不香么,给消息设置个ttl,失效了放入死信队列进行监听,不就行了吗。可是面试的时候或者有些公司就是不想用消息中间件,你难不成还去劝说老总吗?显然这不现实,我们要做的就是尽量能多学点就多学点吧,保不齐以后就有这么刻薄的需求要我们来实现呢。
思路
先介绍下redis的5大数据类型吧,string hash set sortset list,其中与本文有关的就是sortset,大白话来说就是sortset可以进行排序,可以进行排序就意味这是不是与队列搭边了。其实大体上的思路就是存数据时以时间戳为排序规则存入,取数据时:当前时间戳>存入数据的时间戳则消费。
生产者实现
实际开发中可以根据需求封装一个vo类作为消息存入redis,我这里直接简单写了把i作为value存入QUEUENAME集合中,设置scroe为(当前时间+i分钟)的时间戳
@Autowired
RedisTemplate redisTemplate;
String QUEUENAME = "zzh:queue";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Test
public void pr() {
for (int i = 1; i <= 1; i++) {
Calendar now = Calendar.getInstance();
now.setTime(new Date());
now.set(Calendar.MINUTE, now.get(Calendar.MINUTE) + i);
System.out.println("生产了:" + i + "当前时间为:" + simpleDateFormat.format(System.currentTimeMillis()) + "消费时间为:" + simpleDateFormat.format(now.getTime()));
//往QUEUENAME这个集合中放入i,设置scorce为排序规则
redisTemplate.opsForZSet().add(QUEUENAME, i, now.getTime().getTime());
}
}
消费者实现
拿出score范围在0-当前时间戳的每条数据,遍历比较是否时间戳<当前时间戳,如果小于表示没有到达可以消费的时间,直至>了才可以消费
/**
* @param
* @method 每间隔1秒执行一次
*/
@Scheduled(cron = "*/1 * * * * * ")
public void cs() {
System.out.println("------------等待消费--------------");
//取出QUEUENAME集合中的score在0-当前时间戳这个范围的所有值
Set<Integer> set = redisTemplate.opsForZSet().rangeByScore(QUEUENAME, 0, System.currentTimeMillis());
Iterator<Integer> iterator = set.iterator();
while (iterator.hasNext()) {
Integer value = iterator.next();
//遍历取出每一个score
Double score = redisTemplate.opsForZSet().score(QUEUENAME, value);
//达到了时间就进行消费
if (System.currentTimeMillis() > score) {
System.out.println("消费了:" + value + "消费时间为:" + simpleDateFormat.format(System.currentTimeMillis()));
redisTemplate.opsForZSet().remove(QUEUENAME, value);
}
}
}
测试
可以看到只有达到时间才进行消费
完整代码链接
觉得不错的点个小星星吧
https://github.com/zhangzihang3/DelayQueue.git