首页 > 数据库 >基于Redis的队列

基于Redis的队列

时间:2023-08-29 10:37:29浏览次数:48  
标签:queue 基于 Object 队列 object Redis redissonClient respDTO

1. 队列

//发布
@ApiOperation(value = "put 普通队列")
  @PostMapping("/queuePut")
  public Object put(@RequestBody CommonMapRespDTO respDTO)  {
    for (int i = 0; i < 20; i++) {
      //队列
      RQueue<Object> queue = redissonClient.getQueue(TopicConstant.normanQueueName);
      //队列满 返回异常
      queue.add(respDTO.getValue());
    }
    return "OK";
  }

//消费
 private void queueTest() {
    ExecutorServiceCasche.EXECUTOR.execute(() -> {
      //阻塞队列
      RQueue<Object> queue = redissonClient.getQueue(TopicConstant.normanQueueName);
      while (true) {
        //如果为空则等待
        Object object = queue.poll();
        if (object == null) {
          //50微妙
          ThreadUtil.sleep(50, TimeUnit.MILLISECONDS);
          continue;
        }
        System.out.println(Thread.currentThread().getName() + "读取到的值:" + object);
      }
    });
  }

2. 阻塞队列

//发布数据
@ApiOperation(value = "put 阻塞队列")
  @PostMapping("/blockingQueuePut")
  public Object blockingQueue(@RequestBody CommonMapRespDTO respDTO) throws InterruptedException {
    for (int i = 0; i < 20; i++) {
      //阻塞队列
      RBlockingQueue<Object> rBlockingQueue = redissonClient.getBlockingQueue(queueName);
      rBlockingQueue.put(respDTO.getValue());
    }
    return "OK";
  }
//消费
 private void blockQueueTest() {
    ExecutorServiceCasche.EXECUTOR.execute(() -> {
      //阻塞队列
      RBlockingQueue<Object> rBlockingQueue = redissonClient.getBlockingQueue(
          RedissionController.queueName);
      while (true) {
        Object object = null;
        try {
          object = rBlockingQueue.take();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "读取到的值:" + object);
      }
    });
  }

3. 延迟队列

//发布
@ApiOperation(value = "delayedQueue 延时队列")
  @PostMapping("/delayedQueue")
  public Object delayedQueue(@RequestBody CommonMapRespDTO respDTO) {
    Random random = new Random();
    for (int i = 0; i < 20; i++) {
      //基于redis发送变更通知
      RBlockingQueue<Object> rBlockingQueue = redissonClient.getBlockingQueue(TopicConstant.delayedQueueName);
      RDelayedQueue<Object> queue = redissonClient.getDelayedQueue(rBlockingQueue);
      Integer temp = random.nextInt(20);
      queue.offer(temp, temp, TimeUnit.SECONDS);
    }
    return "OK";
  }
//消费
private void delayQueue() {
    ExecutorServiceCasche.EXECUTOR.execute(() -> {
      //阻塞队列
      RBlockingDeque<Object> blockingDeque = redissonClient
          .getBlockingDeque(TopicConstant.delayedQueueName);
      RDelayedQueue<Object> queue = redissonClient.getDelayedQueue(blockingDeque);
      while (true) {
        Object object = null;
        try {
          object = blockingDeque.take();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "读取到的值:" + object);
      }
    });
  }

4. 发布订阅

//发送
@ApiOperation(value = "topic 发布订阅模式")
  @PostMapping("/topic")
  public Object topic(@RequestBody CommonMapRespDTO respDTO) {
    for (int i = 0; i < 20; i++) {
      //基于redis发送变更通知
      RTopic topic = redissonClient.getTopic(TopicConstant.DICT_CHANGE_TOPIC);
      topic.publish("change");
    }
    return "OK";
  }
//消费
 private void dictDataChangelistener() {
    //小代码变更 topic  发布订阅模式
    RTopic rTopic = redissonClient.getTopic(TopicConstant.DICT_CHANGE_TOPIC);
    rTopic.addListener(String.class, new MessageListener<String>() {
      @Override
      public void onMessage(CharSequence charSequence, String s) {
        try {
          String key = charSequence.toString();
          //单线程更新
          System.out.println(Thread.currentThread().getName() + "==>收到通知:" + key);
        } catch (Exception e) {
          log.error("==>同步小代码异常", e);
        }
      }
    });
  }

5. 发布订阅不会进行数据的持久化、队列都会进行数据的持久化

标签:queue,基于,Object,队列,object,Redis,redissonClient,respDTO
From: https://www.cnblogs.com/hopeway-shaon/p/17664062.html

相关文章

  • Redis 入门篇
    1.初始Redis1.1认识NoSQL‍NoSql可以翻译做NotOnlySql(不仅仅是SQL),或者是NoSql(非Sql的)数据库。是相对于传统关系型数据库而言,有很大差异的一种特殊的数据库​,因此也称之为非关系型数据库。‍1.1.1结构化与非结构化‍​​1.1.2关系型与非关系型的差异存储方式......
  • redis 消息队列方案
    List实现消息队列使用LPUSH、RPOP左进右出或RPUSH、LPOP右进左出,实现消息顺序消费使用BLPOP、BRPOP这种阻塞式读取的命令,实现消息及时消费ack机制使用,使用index读取list的消息,正常消费完成后再使用POP删除//使用redission实现@Slf4j@ServicepublicclassQue......
  • 基于SpringBoot的装饰工程管理系统
    如今社会上各行各业,都喜欢用自己行业的专属软件工作,互联网发展到这个时候,人们已经发现离不开了互联网。新技术的产生,往往能解决一些老技术的弊端问题。因为传统装饰工程项目信息管理难度大,容错率低,管理人员处理数据费工费时,所以专门为解决这个难题开发了一个装饰工程管理系统项目立......
  • 栈和队列在数据结构中的应用
    文章目录理解栈和队列的概念及其特点栈的应用和操作队列的应用和操作结论......
  • 抖音订单推送设计思路-基于支付ID而不是订单号来推送
    抖音订单推送设计思路-基于支付ID而不是订单号来推送方案:不是基于订单号来核销和分账的,抖音是基于每一笔抖音支付来做订单推送,核销和分账现象:这样会在抖音的订单中心中会出现很多条订单推送的记录。抖音App订单列表状态如下:待支付,已取消,已支付,已核销抖音文档地址订单同步htt......
  • Redis 缓存满了怎么办?
    Redis缓存使用内存来保存数据,随着需要缓存的数据量越来越大,有限的缓存空间不可避免地会被写满。此时,应该怎么办?本篇文章接下来就来聊聊缓存满了之后的数据淘汰机制。值得注意的是,在Redis中 过期策略和内存淘汰策略是两个完全不同的概念。Redis过期策略指的是Redis使用......
  • redis的2种持久化方式:aof和rdb
    redis的rdb和aof持久化方式的区别及使用场景原创 陈烨123 java知路 2023-07-0422:25 发表于湖北Redis提供了两种持久化方式:RDB(RedisDatabase)和AOF(AppendOnlyFile)。RDB持久化是通过周期性地将Redis数据集快照写入磁盘上的二进制文件实现的。在触发持久化时,Redis会......
  • 队列和栈
    队列和栈是两种常见的数据结构,常用于存储和操作数据的方式。它们有不同的特点和用途。队列(Queue)是一种先进先出(First-In-First-Out,FIFO)的数据结构。可以将其想象成排队的人群,仅允许在队尾插入元素,从队头移除元素。新元素总是加入到队列的末尾,而最先加入的元素会最先被移除。队列......
  • 基于QT实现的商品销售管理系统[2023-08-28]
    基于QT实现的商品销售管理系统[2023-08-28]商品销售管理系统该系统需创建和管理以下信息:1、商品信息:标号、品名、产地、进价、数量、到期日期、进货日期、销售数据集;2、销售信息:销售日期、数量、价格、营业员工号、业务类型(售出、退货);系统功能要求如下:1.创建和管理描述......
  • Redis 为什么这么快?
    前言  作为一名后端软件工程师,工作中你肯定和Redis打过交道。但是Redis为什么快呢?很多人只能答出Redis因为它是基于内存实现的,但是对于其它原因都是模棱两可。那么今天就一起来看看是Redis为什么快吧:         Redis为什么这么快? 一、基于内......