首页 > 数据库 >Redis流详解及Java实践

Redis流详解及Java实践

时间:2024-08-13 09:25:11浏览次数:15  
标签:Map Java String Redis 详解 jedis key Jedis id

Redis流详解及Java实践


Redis 5.0版本引入了一个新的数据类型 - 流(Stream)。Stream主要用于消息队列场景,它弥补了Redis发布订阅(pub/sub)不能持久化消息的缺陷。旨在提供一种高效、可扩展的方式来进行消息的生产和消费。它可以用来构建消息队列、事件日志等系统,非常适合用于实时数据分析、微服务间的异步通信等场景。

Redis Stream 概述

基本概念

  • Stream:Redis Stream 是一个包含零个或任意多个流元素的有序队列,每个元素都有一个唯一标识符(ID)和一组键值对。
  • 消息:流中的每一个元素都是一个消息,包含了多个键值对。
  • ID:每条消息都有一个由 Redis 自动生成的 ID,用于排序和识别消息。
  • 消费者:消费消息的实体,可以是程序或服务。
  • 消费者组:一组消费者,可以更好地管理消息的消费和重试。

使用场景

  • 消息队列:用于异步消息传递。
  • 事件日志:记录事件发生的时间顺序。
  • 实时数据处理:如实时监控、日志分析等。
  • 分布式任务队列:协调多个服务之间的任务分配。

Redis Stream 命令

XADD - 添加消息

用于向 Stream 中添加一条或多条消息。语法如下:

XADD key [MAXLEN] [~|>] count field value [field value ...]
  • key:Stream 的键。
  • MAXLEN:可选参数,用于限制 Stream 的长度。
  • count:当 Stream 达到最大长度时,移除的元素数量。
  • field value:消息中的键值对。

XRANGE - 获取消息

用于获取 Stream 中的一系列消息。语法如下:

XRANGE key start stop [COUNT count]
  • key:Stream 的键。
  • startstop:消息 ID 的范围。
  • COUNT:可选参数,限制返回的消息数量。

XREAD - 读取消息

用于从一个或多个 Stream 中读取消息。语法如下:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [id] [...]
  • COUNT:可选参数,限制返回的消息数量。
  • BLOCK:阻塞时间,如果没有新消息,则等待指定毫秒数。
  • STREAMS:要读取的 Stream 列表。
  • id:上次读取的 ID,用于获取新消息。

XGROUP - 操作消费者组

用于创建、删除消费者组或消费者,以及管理消费者组的状态。例如:

XGROUP CREATE key groupname consumername mkstream
  • CREATE:创建一个新的消费者组。
  • groupname:消费者组的名字。
  • consumername:消费者的名称。
  • mkstream:如果 Stream 不存在,则创建它。

XACK - 确认消息

用于确认消息已经被成功处理。语法如下:

XACK key groupname id [id ...]
  • groupname:消费者组的名字。
  • id:要确认的消息 ID。

代码示例

下面通过Java代码来演示Redis流的基本操作:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;

import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RedisStreamExample {

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);

        // 添加消息
        //命令 XADD mystream * sensor-id 1234 temperature 19.8
        //"*"表示让Redis自动生成消息ID。
        
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put("sensor-id", "1234");
        messageMap.put("temperature", "19.8");
        StreamEntryID messageId = jedis.xadd("mystream", null, messageMap);
        System.out.println("Added message with ID: " + messageId);

        // 读取消息
        //命令 XREAD COUNT 2 STREAMS mystream 0-0
        //这会读取mystream流中最多2条消息,从最早的消息开始。
        
        List<StreamEntry> messages = jedis.xread(1,
                Long.MAX_VALUE,
                new AbstractMap.SimpleEntry<>("mystream", StreamEntryID.UNRECEIVED_ENTRY));
        
        for (StreamEntry entry : messages) {
            System.out.println("Read message: " + entry);
        }

        // 创建消费者组
        //命令 XGROUP CREATE mystream mygroup $
        //"$"表示从最新的消息开始消费
        
        String result = jedis.xgroupCreate("mystream", "mygroup", null, false);
        System.out.println("Consumer group created: " + result);

        // 消费者组读取消息
        //命令 XREADGROUP GROUP mygroup alice COUNT 1 STREAMS mystream >  
        //">"表示读取从未被发送给其他消费者的消息
        
        List<StreamEntry> groupMessages = jedis.xreadGroup("mygroup",
                "consumer1",
                1,
                Long.MAX_VALUE,
                false,
                new AbstractMap.SimpleEntry<>("mystream", StreamEntryID.UNRECEIVED_ENTRY));

        for (StreamEntry entry : groupMessages) {
            System.out.println("Group read message: " + entry);
            // 确认消息
            jedis.xack("mystream", "mygroup", entry.getID());
        }

        jedis.close();
    }
}

实战案例

示例 1: 日志收集系统

有一个简单的日志收集系统,其中包含一个生产者服务用于发送日志到 Redis Stream,一个消费者服务用于读取并处理这些日志。

生产者服务
import redis.clients.jedis.Jedis;

public class LogProducer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        String key = "log-stream";
        String id = jedis.xadd(key, Map.of("log-level", "INFO", "message", "Application started"));
        System.out.println("Log message added with ID: " + id);
    }
}
消费者服务
import redis.clients.jedis.Jedis;
import java.util.Map;

public class LogConsumer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        String key = "log-stream";
        String group = "log-consumer-group";
        String consumer = "log-consumer-1";

        // 创建消费者组
        jedis.xgroupCreate(key, group, "$", true);

        while (true) {
            Map<String, Map<String, Map<String, String>>> messages = jedis.xread(
                    Map.of(group, ">"), 1, 1000
            );
            if (!messages.isEmpty()) {
                for (Map.Entry<String, Map<String, Map<String, String>>> entry : messages.get(key).entrySet()) {
                    String id = entry.getKey();
                    Map<String, String> fields = entry.getValue();
                    System.out.println("Processing log message: " + fields.get("message"));
                    // 处理日志...
                    jedis.xack(key, group, id);
                }
            } else {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

示例 2:消息队列

假设我们有一个简单的消息队列,其中包含一个生产者服务用于发送消息到 Redis Stream,一个消费者服务用于读取并处理这些消息。

生产者服务
import redis.clients.jedis.Jedis;

public class MessageProducer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        String key = "message-queue";
        String id = jedis.xadd(key, Map.of("message", "Hello from producer!"));
        System.out.println("Message added with ID: " + id);
    }
}
消费者服务
import redis.clients.jedis.Jedis;
import java.util.Map;

public class MessageConsumer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        String key = "message-queue";
        String group = "message-consumer-group";
        String consumer = "consumer-1";

        // 创建消费者组
        jedis.xgroupCreate(key, group, "$", true);

        while (true) {
            Map<String, Map<String, Map<String, String>>> messages = jedis.xread(
                    Map.of(group, ">"), 1, 1000
            );
            if (!messages.isEmpty()) {
                for (Map.Entry<String, Map<String, Map<String, String>>> entry : messages.get(key).entrySet()) {
                    String id = entry.getKey();
                    Map<String, String> fields = entry.getValue();
                    System.out.println("Received message: " + fields.get("message"));
                    // 处理消息...
                    jedis.xack(key, group, id);
                }
            } else {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

示例 3: 事件日志

有一个简单的事件日志系统,用于记录用户的登录活动。

记录登录事件
import redis.clients.jedis.Jedis;

public class LoginEventLogger {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        String key = "login-events";
        String id = jedis.xadd(key, Map.of("username", "user1", "status", "success"));
        System.out.println("Login event added with ID: " + id);
    }
}
监控登录事件
import redis.clients.jedis.Jedis;
import java.util.Map;

public class LoginEventMonitor {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        String key = "login-events";
        String group = "login-event-monitor-group";
        String consumer = "monitor-1";

        // 创建消费者组
        jedis.xgroupCreate(key, group, "$", true);

        while (true) {
            Map<String, Map<String, Map<String, String>>> messages = jedis.xread(
                    Map.of(group, ">"), 1, 1000
            );
            if (!messages.isEmpty()) {
                for (Map.Entry<String, Map<String, Map<String, String>>> entry : messages.get(key).entrySet()) {
                    String id = entry.getKey();
                    Map<String, String> fields = entry.getValue();
                    String username = fields.get("username");
                    String status = fields.get("status");
                    System.out.println("Monitoring login event: User " + username + " logged in with status " + status);
                    // 监控登录状态...
                    jedis.xack(key, group, id);
                }
            } else {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

示例 4: 实时数据分析

:有一个系统,需要实时分析用户的行为数据。

发送用户行为数据
import redis.clients.jedis.Jedis;

public class UserBehaviorSender {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        String key = "user-behavior";
        String id = jedis.xadd(key, Map.of("action", "view_product", "product_id", "12345", "timestamp", "1691918825"));
        System.out.println("User behavior added with ID: " + id);
    }
}
分析用户行为数据
import redis.clients.jedis.Jedis;
import java.util.Map;

public class UserBehaviorAnalyzer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        String key = "user-behavior";
        String group = "behavior-analyzer-group";
        String consumer = "analyzer-1";

        // 创建消费者组
        jedis.xgroupCreate(key, group, "$", true);

        while (true) {
            Map<String, Map<String, Map<String, String>>> messages = jedis.xread(
                    Map.of(group, ">"), 1, 1000
            );
            if (!messages.isEmpty()) {
                for (Map.Entry<String, Map<String, Map<String, String>>> entry : messages.get(key).entrySet()) {
                    String id = entry.getKey();
                    Map<String, String> fields = entry.getValue();
                    String action = fields.get("action");
                    String productId = fields.get("product_id");
                    String timestamp = fields.get("timestamp");
                    System.out.println("Analyzing user behavior: Action " + action + ", Product ID " + productId + ", Timestamp " + timestamp);
                    // 分析行为数据...
                    jedis.xack(key, group, id);
                }
            } else {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

最佳实践

  • 数据持久化:合理设置 Stream 的过期时间,防止数据无限增长。
  • 错误处理:使用消费者组和 XACK 来处理失败的消息。
  • 性能优化:利用批量操作和管道(pipelining)减少网络往返时间。
  • 监控与报警:定期检查 Stream 的长度和消费者组的状态。

标签:Map,Java,String,Redis,详解,jedis,key,Jedis,id
From: https://blog.csdn.net/weixin_42564451/article/details/141154660

相关文章

  • 11、java程序流程控制之三:循环结构(while循环)、循环结构(do-while循环)、break 与 co
    java程序流程控制之三:Ⅰ、循环结构:while循环1、while循环结构:其一、描述:其二、代码为:其三、截图为:Ⅱ、循环结构:do-while循环1、do-while循环结构:其一、描述:其二、代码为:其三、截图为:2、do-while循环结构的案例1:输出正数或负数的个数其一、描述:其二、代码为:其三、......
  • Java封装 小白版
    封装使用对象的方法将对象的变量和方法保护起来,就称为封装。外界只能通过对象的接口(方法)访问对象的服务。封装就是隐藏对象的属性和实现细节,仅提供公共访问方式来让外界访问快捷键:Alt+insert——>GetterandSetter封装的好处隐藏类的实现细节只能通过规定方法访问数据方......
  • Java 生产者和消费者模式练习 (2024.8.12)
        ProducerAndConsumerExercise1packageProducerAndConsumer20240812;publicclassProducerAndConsumerExercise1{publicstaticvoidmain(String[]args){//生产者(Producer)和消费者(Consumer)模式//这是一个十分经典的多线程协作模......
  • 基于java实现MYDB数据库
    整体结构--MYDB分为后端和前端,前后端通过socket进行交互前端(客户端)用于读取用户输入,并发送到后端执行,输出返回结果MYDB后端需要解析SQL--MYDB的后端分为5个模块分别是:1、TransactionManager事务管理器2、DataManager数据管理器3、VersionManager版本管理器4、Index......
  • 深入解析Node.js中的fs.watch:options与listener详解
    在Node.js中,fs.watch方法是一个功能强大的文件系统监控工具,它允许我们对文件或目录进行实时监控,并在文件或目录发生变化时触发相应的操作。在使用fs.watch时,两个关键的部分是options对象和listener回调函数。本文将详细讲解这两个部分,帮助读者更好地理解和使用fs.watch。一......
  • 提升前端性能的JavaScript技巧
    1.前端JavaScript性能问题前端JavaScript的性能问题可以显著影响Web应用的用户体验和整体性能。以下是一些常见的前端JavaScript性能问题:1.1.频繁的DOM操作问题描述:JavaScript经常需要与DOM(文档对象模型)交互来更新页面内容。然而,每次DOM操作都可能触发浏览器的重绘(rep......
  • Redis命令之scan的用法和注意细节
    背景Redis提供了scan命令,用于增量迭代获取db里的key。命令格式:SCANcursor[MATCHpattern][COUNTcount]其中SCAN、MATCH、COUNT为命令关键字;cursor为游标,如果为0表示起始,每次执行命令会返回新的cursor,可用于下次命令的增量迭代;pattern为模式,即匹配规则,如Match*表示匹配所......
  • JavaSE基础知识分享(五)
    写在前面前面讲的是面向对象中的继承思想,下面让我们来看看多态这部分的内容!Java面向对象概念概述多态概述:某一个事物在不同状态下的多种状态。实现多态的三大前提:要有继承关系。要有方法的重写。要有父类的引用指向子类对象。访问成员的特点:成员变量:编译时看左,运行......
  • Java毕业设计 基于SSM vue在线教学质量评价系统
    Java毕业设计基于SSMvue在线教学质量评价系统SSMvue在线教学质量评价系统功能介绍前端学生首页图片轮播展示登录学生注册教师展示教师详情学生评价课程信息课程详情提交选修该课学生选课学生留言个人中心后台管理员管理员登录个人中心学生管理教......
  • 四数相加2 | LeetCode-454 | 哈希集合 | Java详细注释
    ......