Redis的Streams是一个日志类型的数据结构,它在Redis 5.0中被引入,用于存储并取回消息。这种数据结构让Redis可以成为消息队列系统和流处理系统。它是一个持久化、可复制、基于磁盘的、具有时间序列信息的日志系统。
Streams主要的命令包括:
-
**XADD key ID field value [field value ...]**:向指定的stream中添加新的元素。
-
**XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]**:从给定的stream中读取数据。
-
XDEL key ID [ID ...]: 删除stream中的一个或多个消息。
-
**XRANGE key start end [COUNT count]**:返回stream中在给定范围内的元素。
-
**XGROUP CREATE key groupname lastDeliveredID [MKSTREAM]**:为一个stream创建一个新的消费者组。
例如:
# 添加元素
redis> XADD mystream * sensor-id 1234 temperature 19.8
"1526569495631-0"
# 读取数据
redis> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1526569495631-0"
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
# 创建消费者组
redis> XGROUP CREATE mystream mygroup 0
OK
这就是Redis Streams的一些基本介绍,它是一个非常强大的功能,可以应用于各种场景,例如日志收集,数据流处理等。
Java示例:
以下是一个如何使用 Java 通过 Jedis 连接 Redis 并进行 Streams 操作的示例:
import redis.clients.jedis.*;
public class Test {
public static void main(String[] args) {
// 连接到 Redis 服务
Jedis jedis = new Jedis("localhost");
System.out.println("连接成功");
// 使用 XADD 命令添加元素
StreamEntryID id = jedis.xadd("mystream", null, new HashMap<String, String>(){{put("sensor-id", "1234");put("temperature", "19.8");}}, 1000, false);
System.out.println("添加元素成功,ID 为: " + id);
// 使用 XREAD 命令读取数据
List<Entry<String, StreamEntry>> list = jedis.xread(1, 1, new Entry<>("mystream", StreamEntryID.UNRECEIVED_ENTRY));
for (Entry<String, StreamEntry> entry : list) {
System.out.println("读取到数据 stream: " + entry.getKey() + ", ID: " + entry.getValue().getID() + ", fields: " + entry.getValue().getFields());
}
// 创建消费者组
String result = jedis.xgroupCreate("mystream", "mygroup", null, false);
System.out.println("消费者组创建结果: " + result);
jedis.close();
}
}
在这个例子中,我们首先创建了一个流 mystream
并添加了一个元素。然后我们读取 mystream
中的数据并打印出来。然后我们创建了一个消费者组 mygroup
。
请注意,需要引入 jedis 库并连接到一个运行的 Redis 服务以运行此代码。
标签:key,stream,Redis,redis,Streams,详细,jedis,ID,mystream From: https://blog.csdn.net/weixin_37954941/article/details/137340139