首页 > 数据库 >redis stream的所有方法以及用处和使用场景

redis stream的所有方法以及用处和使用场景

时间:2023-12-14 09:56:30浏览次数:33  
标签:count name stream redis 消息 key 用处

目录
Redis Stream 是 Redis 提供的一种数据结构,用于支持实时消息流。它是一个有序、可持久化的消息日志,支持多个消费者组消费消息,同时保留消息的发布顺序。以下是 Redis Stream 的一些主要方法以及它们的用途和使用场景:

一、用途: 将消息添加到 Stream 中。

使用场景: 用于发布消息,创建新的消息流。

XADD key ID field1 value1 [field2 value2 ...]
# 添加消息到 Stream
def add_message(stream_name, message_id, message_data):
    redis_client.xadd(stream_name, {message_id: message_data})

二、用途: 按范围获取消息。

使用场景: 用于检索指定范围内的消息,支持 COUNT 选项以限制返回的消息数量。

XRANGE key start end [COUNT count]
# 获取指定范围内的消息
def get_messages_in_range(stream_name, start, end):
    messages = redis_client.xrange(stream_name, start, end)
    return messages

三、用途: 阻塞读取消息,支持多个 Stream。

使用场景: 用于消费消息,支持阻塞等待新消息的到来。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 获取当前最新消息的 ID
latest_ids = r.xinfo_stream(stream_name)
latest_id = '0' if not latest_ids else latest_ids['last-generated-id']

# 使用 XREAD 命令读取消息
# BLOCK 0 表示非阻塞,COUNT 表示返回的消息数量
response = r.xread(streams={stream_name: latest_id}, count=10, block=0)

四、用途: 创建消费者组。

使用场景: 在消费消息之前,需要创建消费者组。

XGROUP CREATE key groupname id-or-$ [MKSTREAM]
# 创建消费者组
def create_consumer_group(stream_name, group_name, start_id):
    redis_client.xgroup_create(stream_name, group_name, id=start_id, mkstream=True)

五、用途: 阻塞读取消息并将其分配给消费者组中的消费者。

使用场景: 用于多个消费者协同消费消息,支持阻塞等待新消息的到来。

XREADGROUP GROUP groupname consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 阻塞读取消息
def read_messages_blocking(stream_name, consumer_name, count=1, block=0):
    messages = redis_client.xreadgroup(groupname=consumer_name, consumername='my_consumer', streams={stream_name: '>'}, count=count, block=block)
    return messages

六、用途: 确认消息已被消费。

使用场景: 消费者在成功处理消息后,通过此命令通知 Stream 已经处理完消息。

XACK key group ID [ID ...]
# 确认消息已被消费
def ack_message(stream_name, group_name, message_id):
    redis_client.xack(stream_name, group_name, message_id)

七、用途: 获取待处理的消息列表。

使用场景: 用于获取尚未被确认的消息列表,可以用于监控消费者的处理情况。

XPENDING key group [start end COUNT count] [consumer]
# 获取待处理的消息列表
def get_pending_messages(stream_name, group_name, start='-inf', end='+', count=10):
    pending_messages = redis_client.xpending_range(stream_name, group_name, start, end, count)
    return pending_messages

八、用途: 删除消息。

使用场景: 用于清理 Stream 中的消息,保持消息数量在一个可控制的范围内。

XTRIM key MINID ~
# 删除消息
def delete_messages(stream_name, minid):
    redis_client.xtrim(stream_name, minid=minid)

上述是Redis Stream 中的一些基本方法,可以用于发布、订阅和处理实时消息。这些方法使得 Redis 可以用作可靠的消息队列和实时事件处理系统。

除了上述提到的主要方法,以下是 Redis Stream 支持的一些其他方法:

九、用途: 删除一个或多个消息。

使用场景: 用于删除不再需要的消息。

XDEL key ID [ID ...]
def xdel(redis, key, *message_ids):
    return redis.xtrim(key, *message_ids)

十、用途: 获取指定 Stream 的消息数量。

使用场景: 用于获取消息流的当前长度。

XLEN key
def xlen(redis, key):
    return redis.xlen(key)

十一、用途: 反向按范围获取消息。

使用场景: 与 XRANGE 类似,但按照相反的顺序返回消息。

XREVRANGE key end start [COUNT count]
def xrevrange(redis, key, end, start, count=None):
    return redis.xrevrange(key, end, start, count=count)

十二、用途: 非阻塞读取并分配消息给消费者组中的消费者。

使用场景: 与 XREADGROUP 类似,但是是非阻塞的。

XREADGROUP GROUP groupname consumer STREAMS key [key ...] COUNT count BLOCK milliseconds
def xreadgroup(redis, groupname, consumer, key, count=None, block=None):
    return redis.xreadgroup(groupname, consumer, {key: '>'}, count=count, block=block)

十三、用途: 将消费者组的 last delivered ID 设置为指定值。

使用场景: 用于重置消费者组的状态。

XGROUP SETID key groupname id-or-$
def xgroup_setid(redis, key, groupname, id_or_$):
    return redis.xgroup_create_mkstream(key, groupname, id_or_$)

十四、用途: 销毁消费者组。

使用场景: 在不再需要消费者组时使用。

XGROUP DESTROY key groupname
def xgroup_destroy(redis, key, groupname):
    return redis.xgroup_destroy(key, groupname)

这些方法提供了对 Redis Stream 的更多控制和操作能力,您可以根据具体的需求选择适当的方法。它们共同构成了一个强大的消息处理系统,适用于实时数据流和事件处理场景。

十五、分别XREAD和XREADGROUP

XREAD 是用于直接读取消息的命令,不涉及消费者组。你可以通过指定多个 STREAM 名称来一次性读取多个流中的消息。

XREADGROUP 则是用于消费者组的命令。它允许多个消费者组并发地读取消息,并且支持消息的消费者组内分发。

标签:count,name,stream,redis,消息,key,用处
From: https://www.cnblogs.com/yuezongke/p/17900455.html

相关文章

  • 【JavaSE】Stream流
    Stream作用:简化数组或集合的操作获取Stream流对象Stream流中间操作方法返回值都是Stream流对象,可以链式编程Stream流终结操作方法返回值不再是Stream流对象Stream收集操作因为Stream流操作不会修改数据源,因此需要收集操作:将Stream流操作后到结果数据转回到集合收......
  • Redis数据结构4:REDIS_ZIPLIST
    REDIS_ZIPLISTzipList(压缩列表)是一种紧凑型的数据结构,占用一片连续的内存,本质上是一个字节数组。能提高CPU缓存的利用效率,并且针对不同数据结构进行不同编码,节省内存开销。编码结构zipList的字节数组主要由5个部分组成:zlbytes、zltail、zllen、zltail和entry。zlbytes记录......
  • Redis实战篇
    Redis实战篇开篇导读短信登录这一块我们会使用redis共享session来实现商户查询缓存通过本章节,我们会理解缓存击穿,缓存穿透,缓存雪崩等问题,让小伙伴的对于这些概念的理解不仅仅是停留在概念上,更是能在代码中看到对应的内容优惠卷秒杀通过本章节,我们可以学会Redis的计数......
  • 2023最新中级难度Redis面试题,包含答案。刷题必备!记录一下。
    好记性不如烂笔头内容来自面试宝典-中级难度Redis面试题合集问:请解释Redis中的持久化机制RDB和AOF的区别,并谈谈你在实际应用中的选择。Redis的两种持久化机制分别为RDB和AOF:RDB(RedisDatabase)是Redis默认的持久化方式,会在指定的时间间隔内将内存中的数据集快照写入磁盘......
  • 2023最新初级难度Redis面试题,包含答案。刷题必备!记录一下。
    好记性不如烂笔头内容来自面试宝典-初级难度Redis面试题合集问:请简单介绍一下Redis,以及它主要用于解决什么问题?Redis是一款键值存储系统,也被称为“内存数据库”,其主要特点是在内存中高速存储数据。它的优点在于其极高的读写速度和较低的延迟,因此常被用来作为缓存、队列......
  • Redis
    入门Redis是一种基于Key-Value键值对的在内存数据库。版本号第二位是奇数则是非稳定版本,偶数则为稳定版本。常用命令命令作用redis-server/myredis/redis7.conf启动Redisredis-cli-a159123zxc-p6379连接Redisquit退出Redis界面,此时Redis仍在运行shu......
  • Redis内存分析工具-RDBtools安装&使用
    目录是什么安装安装Python(已安装忽略,低版本需要卸载重安)安装GCC(已安装忽略)安装rdbtools和python-lzf安装成功页面基础命令常用示例查找大key与处理导出CVS文件直连Redis服务查询单个key详情生成HTML图表更多用法见Help是什么Rdbtools提供了一组工具,可以帮助用户分析、导入和转换......
  • Springboot项目通过redis实现接口的幂等性
    在SpringBoot项目中,通过Redis实现接口的幂等性通常是通过在Redis中存储唯一标识符(token、UUID等)的方式来实现。当接口第一次被调用时,生成并存储一个唯一标识符到Redis,然后将该标识符返回给客户端。客户端在后续的请求中携带该标识符,服务端在处理请求之前检查Redis中是否存在该标识......
  • 【Centos】Centos 7.6 安装 Redis 7.2.3
    1  前言我们继续安装Redis。2 安装步骤2.1 下载压缩包https://redis.io/download/2.2 解压tar-xvfredis-7.2.3.tar.gz2.3 安装make2.4 启动./src/redis-server./redis.conf2.5 修改配置修改配置文件:redis.conf#绑定开放bind127.0.0.......
  • Java 8 Stream 流的常用方法总结
    Java8Stream流的常用方法总结Java8引入了一个新的API:StreamAPI,它允许我们以声明式的方式处理数据集合。StreamAPI提供了一系列强大的方法,可以帮助我们更简洁、高效地处理数据。本文将总结Java8Stream流的常用方法,并提供相应的代码示例。1.创建Stream首先,我们需要了......