首页 > 数据库 >Redis实现消息队列的几种方式及基于Stream的消息队列演示

Redis实现消息队列的几种方式及基于Stream的消息队列演示

时间:2023-10-28 23:32:11浏览次数:35  
标签:消费者 Stream 队列 Redis list 消息 key ID

消息队列(Message Queue) ,字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

消息队列:存储和管理消息,也被称为消息代理(Message Broker )

生产者:发送消息到消息队列

消费者:从消息队列获取消息并处理消息

Redis实现消息队列的几种方式及基于Stream的消息队列演示_发布订阅

Redis提供了三种不同的方式来实现消息队列:

◆list结构:基于List结构模拟消息队列

◆PubSub:基本的点对点消息模型

◆Stream: 比较完善的消息队列模型

基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表, 很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用: LPUSH结合RPOP、或者RPUSH结合LPOP来实现。

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。

因此这里应该使用BLPOP或者BLPOP来实现阻塞效果。

RPOP或LPOP:非阻塞

BLPOP或者BLPOP:阻塞

Redis实现消息队列的几种方式及基于Stream的消息队列演示_消息队列_02

基于List的消息队列有哪些优缺点?

优点:

●利用Redis存储,不受限于JVM内存上限

●基于Redis的持久化机制,数据安全性有保证

●可以满足消息有序性

缺点:

●无法避免消息丢失

●只支持单消费者


基于PubSub的消息队列

PubSub (发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息

■SUBSCRIBE channel [channel]:订阅一个或多个频道

■PUBLISH channel msg :向一个频道发送消息

■PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

Redis实现消息队列的几种方式及基于Stream的消息队列演示_消息队列_03

基于PubSub的消息队列有哪些优缺点?

优点:

采用发布订阅模型,支持多生产、多消费

缺点:

●不支持数据持久化

●无法避免消息丢失

●消息堆积有上限,超出时数据丢失


基于Stream的消息队列

Stream是Redis 5.0引入的一种新数据类型,可以实现-个功能非常完善的消息队列。

发送消息的命令:

Redis实现消息队列的几种方式及基于Stream的消息队列演示_stream_04

读取消息的方式之一: XREAD

Redis实现消息队列的几种方式及基于Stream的消息队列演示_消息队列_05

Redis实现消息队列的几种方式及基于Stream的消息队列演示_发布订阅_06

STREAM类型消息队列的XREAD命令特点:

●消息可回溯

●一个消息可以被多个消费者读取

●可以阻塞读取

●有消息漏读的风险

基于Stream的消息队列-消费者组

消费者组(Consumer Group) :将多个消费者划分到一-个组中,监听同-一个队列。具备下列特点:

消息分流

队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度

消息标示

消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。

消息确认(防止消息丢失)

消费者获取消息后,消息处于pending状态,并存入一个pending-list.当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

创建消费者组:

XGROUP CREATE key groupName ID [MKSTREAM]

key:队列名称

groupName:消费者组名称

ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息

MKSTREAM:队列不存在时自动创建队列

其它常见命令:

#删除指定的消费者组
XGROUP DESTORY key groupName
#给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
#删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

●group: 消费组名称

●consumer: 消费者名称,如果消费者不存在,会自动创建- -个消费者

●count:本次查询的最大数量

●BLOCK milliseconds: 当没有消息时最长等待时间

●NOACK:无需手动ACK,获取到消息后自动确认

●STREAMS key:指定队列名称

●ID:获取消息的起始ID:

●">":从下一个未消费的消息开始

●其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始


STREAM类型消息队列的XREADGROUP命令特点:

●消息可回溯

●可以多消费者争抢消息,加快消费速度

●可以阻塞读取

●没有消息漏读的风险

●有消息确认机制,保证消息至少被消费一次

Redis实现消息队列方法对比

Redis实现消息队列的几种方式及基于Stream的消息队列演示_Redis_07

Redis实现消息队列的几种方式及基于Stream的消息队列演示_Redis_08

while (true) {
                try {
                    // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 解析数据
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3.创建订单
                    createVoucherOrder(voucherOrder);
                    // 4.确认消息 XACK
                    stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    handlePendingList();
                }
            }


标签:消费者,Stream,队列,Redis,list,消息,key,ID
From: https://blog.51cto.com/AmbitionGarden/8073198

相关文章

  • 数据结构之栈和队列
    一:物理结构和逻辑结构除了数组和链表之外,常用过的数据结构还有很多,但大对数* 都以数组或链表作为存储方式。数组和链表可以被看作数据存储* 地‘物理结构“**什么是数据存储的物理结构呢?*如果把数据结构比作活生生的人,那么物理结构就是人的血肉*和骨骼,看得见,摸得着,实......
  • java——redis随笔——实战——优惠券秒杀——分布式锁
    注意:synchronized用户单机(jvm)上面的锁,对于分布式应用则无能为力。所以对于分布式系统,则需要分布式锁。 分布式锁:满足分布式系统或集群模式下多线程课件并且可以互斥的锁分布式锁的核心思想就是让大家共用同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分......
  • Redis 数据持久化方案
    今日目标掌握Redis数据持久化原理在分布式系统中,存储非结构化数据的中间件Redis是必不可少的,Redis在小、中、大甚至高并发系统中都有发挥起作用的场合,在之前我已经给大家介绍过Redis的基础数据结构(String/Hash/Set/ZSet/List)的增删改查操作。现在思考一个问题,Redis如果仅仅只是将......
  • [ Redis 2 ] 持久化
    Redis_2持久化1.Redis.conf详解redis.conf从上向下详解1.1单位配置1redis对单位的大小写是不敏感的,单位可以是gb,GB,Gb等等。2可以包含其他配置文件的配置1.2网络配置绑定的ip和端口号bind127.0.0.1protected-modeyes#保护模式port63791.3通用配置Gener......
  • 一文掌握Java Stream API
    引言JavaStreamAPI自Java8引入以来,已成为处理集合数据的强大工具。它不仅提高了代码的可读性,还优化了性能,使得集合操作变得更加简洁和高效。本文将深入探讨如何利用StreamAPI的常用操作,帮助你更好地掌握这一强大的功能。JavaStreamAPI简介JavaStream是Java8引入的......
  • redis缓存更新策略,缓存穿透,缓存雪崩,缓存击穿。封装redis工具类
    (redis缓存)缓存是存储数据的临时地方,一般读写性能高1.给商铺添加缓存思路:在对应的serviceImpl里写逻辑@OverridepublicResultqueryById(Longid){Stringkey=CACHE_SHOP_KEY+id;//1.从redis查询商铺缓存StringshopJSON=stringRedisTemplate.opsF......
  • Linux 下使用 Docker 安装 Redis
    1、下载redisdockerpullredis:6.2.62、提前创建挂载目录mkdir-p/mydata/redis/confmkdir-p/mydata/redis/datamkdir-p/mydata/redis/logtouch/mydata/redis/conf/redis.conftouch/mydata/redis/log/redis.logchmod777/mydata/redis/log/redis.log3、启......
  • Java基础 阻塞队列的方式实现等待唤醒机制,哪里体现了等待?哪里又体现了唤醒?
    Java的阻塞队列(BlockingQueue)可以用来实现等待唤醒机制,其中等待和唤醒的操作在队列的不同方法中体现:1.等待:在阻塞队列中,等待通常发生在以下情况:2.当队列为空时,消费者线程试图从队列中取出元素时,它会被阻塞,直到队列中有元素可供消费。这种等待是通过阻塞队列的take()方法来实现......
  • Java基础 等待唤醒机制(阻塞队列方式实现)
    等待唤醒机制还可以用阻塞队列的方式进行实现    练习:利用阻塞队列完成生产者和消费者(等待唤醒机制)的代码细节:生产者和消费者必须使用同一个阻塞队列阻塞队列的创建方式(泛型:队列里面数据的类型):ArrayBlockingQueue<String> queue = new  ArrayBlockingQueue<......
  • 数据结构与算法(LeetCode) 第二节 链表结构、栈、队列、递归行为、哈希表和有序表
    一、链表结构1.单向链表节点结构publicclassNode{ publicintvalue;publicNodenext;publicNode(intdata){value=data;}}2.双向链表节点结构publicclassDoubleNode{publicintvalue;publicDoubleNodelast;publicDouble......