首页 > 数据库 >Redis实现发布/订阅功能(实战篇)

Redis实现发布/订阅功能(实战篇)

时间:2024-09-14 19:26:53浏览次数:17  
标签:订阅 实战篇 channel Redis 消息 message redisTemplate

前言

博主在学习 Redis 实现发布订阅功能的时候,踩了太多的坑。

不是讲解不详细,看的一知半解;就是代码有问题,实际压根跑不起来!

于是博主萌生了自己写一个最新版且全程无错的博客供各位参考。希望各位不要把我才过的坑再踩一遍。(实战篇的所有代码均由本人测试,全程无Bug。)

废话不多说,让我们进入实战篇的学习!

在开始实战篇的之前,我们先一起回顾下原理篇的内容。

Redis 发布/订阅的优点

  • 高性能:Redis 作为内存存储,具备极高的读写性能,能够快速处理发布和订阅消息。
  • 简单易用:Redis 的发布/订阅接口简单,易于集成和使用。
  • 实时性强:发布的消息会立即传递给所有订阅者,具备高实时性。

Redis 发布/订阅的缺点

  • 消息丢失:由于 Redis 是内存存储,如果 Redis 实例宕机,未处理的消息可能会丢失。
  • 无法持久化:Redis 的发布/订阅模式不支持消息持久化,无法存储和检索历史消息。
  • 订阅者不可控:发布者无法控制订阅者的数量和状态,无法保证所有订阅者都能接收到消息。
  • 无确认机制:发布者无法确认消息是否被订阅者接收和处理。

正如上述中Redis的缺点,Redis的发布订阅功能并不可靠,如果我们需要保证消息的可靠性、包括确认、重试等要求,我们还是要选择使用MQ实现发布订阅。

Redis的发布/订阅应用场景:

  • 对于消息处理可靠性要求不强
  • 消息无需持久化
  • 消费能力无需通过增加消费方进行增强
  • 架构简单 中小型系统不希望应用过多中间件

Redis发布订阅命令

命令

描述

Redis Unsubscribe 命令

指退订给定的频道

Redis Subscribe 命令

订阅给定的一个或多个频道的信息

Redis Pubsub 命令

查看订阅与发布系统状态

Redis Punsubscribe 命令

退订所有给定模式的频道

Redis Publish 命令

将信息发送到指定的频道

Redis Psubscribe 命令

订阅一个或多个符合给定模式的频道


正式开始

一、添加依赖

首先,确保你已经安装并配置好 Redis 服务器,并创建了 Spring Boot 项目,在pom.xml中引入依赖。

<!-- 所需依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

二、配置文件中配置Redis

spring:
  # 项目名称
  application:
    name: test-redis-boot

  # Redis 配置
  data:
    redis:
      host: 填写自己的主机IP
      port: 8000
      password: 有则填,没有去掉这个属性
      database: 1
      # 连接超时时间
      timeout: 10s
      lettuce:
        pool:
          # 连接池中的最小空闲连接
          min-idle: 5
          # 连接池中的最大空闲连接
          max-idle: 8
          # 连接池的最大数据库连接数
          max-active: 20
          # #连接池最大阻塞等待时间(使用负值表示没有限制)
          max-wait: -1ms

三、创建Redis配置类

创建一个配置类,用于配置 Redis连接工厂和消息监听器监听通道信息

注:此配置类无需死记硬背。只需大致了解每个方法的作用即可。

/**
 * @Description Redis 配置类,用于配置 Redis 连接工厂和消息监听器监听通道信息
 * @Author gongming.Zhang
 * @Date 2024/9/11 18:27
 * @Version 1.0
 */
@Configuration
@Slf4j
public class RedisConfig {

    /**
     * 自定义 RedisTemplate 序列化方式
     *
     * @param redisConnectionFactory Redis 连接的线程安全工厂
     * @return 模板类
     */
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        // 绑定 RedisConnectionFactory
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        // 创建 Jackson2JsonRedisSerializer 序列方式,对象类型使用 Object 类型。
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(new LaissezFaireSubTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(objectMapper, Object.class);

        // 设置 RedisTemplate 序列化规则,因为 key 通常是普通的字符串,所以使用StringRedisSerializer即可,而 value 是对象时,才需要使用序列化与反序列化。
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        // hash key 序列化规则
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        // 属性设置后操作
        redisTemplate.afterPropertiesSet();
        log.info("RedisTemplate 自定义序列化配置完毕...");
        return redisTemplate;
    }

    /**
     * 配置主题订阅
     * 
     * 可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。
     * addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定
     * addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定
     *
     * @param connectionFactory Redis 连接的线程安全工厂
     * @return 容器对象
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取
        container.setConnectionFactory(connectionFactory);
        // 订阅名称叫 cache 的通道, 类似 Redis 中的 subscribe 命令
        container.addMessageListener(listenerAdapter, new ChannelTopic("cache"));
        // 订阅名称以 'test-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令
        container.addMessageListener(listenerAdapter, new PatternTopic("test-*"));
        log.info("消息监听器和通道绑定完毕...");
        return container;
    }

    /**
     * 配置消息监听适配器
     *
     * @param redisReceiveListener
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisReceiveListener redisReceiveListener) {
        return new MessageListenerAdapter(redisReceiveListener);
    }
}

四、创建消息发送服务端

创建一个消息发布类,用于向客户端发布消息。

/**
 * @Description 消息发布服务端
 * @Author gongming.Zhang
 * @Date 2024/9/12 9:42
 * @Version 1.0
 */
@Component
@Slf4j
public class MessagePublisher {
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;

    /**
     * 服务端发布消息
     *
     * @param channel 通道名
     * @param message 待发送的消息
     */
    public void sendMessage(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
        log.info("消息发送成功...  channel={}, message={}", channel, message);
    }

}

五、创建监听器(客户端)

用于监听服务端发送的消息,每次服务端发送新消息时,都会自动调用onMessage()方法。

/**
 * @Description Redis 消息监听器
 * @Author gongming.Zhang
 * @Date 2024/9/11 18:53
 * @Version 1.0
 */
/*
    当收到订阅的消息时,会将消息交给这个类处理。
     * 可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.
     * 自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。
 */
@Component
@Slf4j
public class RedisReceiveListener implements MessageListener {
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;

    /**
     * 处理回调逻辑。每次新消息到达时,都会调用此方法。通过 onMessage 方法执行业务代码
     * <p>
     * 该接口不仅可以访问实际消息,还可以访问接收消息的频道(Channel),以及订阅时用于匹配频道(Channel)的模式。
     * 此信息使被调用者不仅可以通过内容区分各种消息,还可以检查其他详细信息。
     *
     * @param message 消息对象,不能为 null
     * @param pattern 与通道匹配的模式(如果指定),可以为 null
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 1.获取消息所属的通道  -->  首先获取 字符串序列化器,再从给定的二进制数据中反序列化对象。
        String channel = redisTemplate.getStringSerializer().deserialize(message.getChannel());
        // 2.获取客户端发送的消息内容  -->  后期可以根据自己项目中 消息 的类型,来确定用什么序列化器
        Object msg = redisTemplate.getValueSerializer().deserialize(message.getBody());
        log.info("收到Redis订阅消息: channel={} msg={}", channel, msg);
    }
}

六、编写Controller测试

/**
 * @Description 测试订阅发布功能
 * @Author gongming.Zhang
 * @Date 2024/9/12 10:13
 * @Version 1.0
 */
@RestController
@RequestMapping(value = "/api/v1/publish")
public class PublisherController {

    @Autowired
    private MessagePublisher publisher;

    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam(value = "message") String message, @RequestParam(value = "channel") String channel) {
        publisher.sendMessage(channel, message);
        return "Message published: " + message;
    }
}

七、测试响应数据

至此,我们 SpringBoot 整合 Redis 实现发布订阅功能已经完成!


总结

通过本文,我们详细介绍了如何在 SpringBoot 中整合 Redis 实现发布/订阅功能,并提供了详细的代码示例。Redis 发布/订阅模式以其高性能和简单易用的特点,在实时消息传递场景中有着广泛的应用,但同时也需要注意其消息丢失和无法持久化等缺点,需要根据实际业务需求选择。

标签:订阅,实战篇,channel,Redis,消息,message,redisTemplate
From: https://blog.csdn.net/weixin_63944437/article/details/142265100

相关文章

  • Redis运维之监控指标,性能监控,监控方式,响应慢分析
    目录1Redis监控1.1Redis监控指标1.1.1性能指标:Performance1.1.2内存指标:Memory1.1.3基本活动指标:Basicactivity1.1.4持久性指标:Persistence1.1.5错误指标:Error1.2监控方式1.2.1info1.2.2性能监控1.2.3内存监控1.2.4基本活动指标1.2.5持久性指标1.2.6错误指标......
  • SpringBoot集成Redis以及StringRedisTemplate的使用
    引言该篇文章介绍了在SpringBoot项目中,集成了Redis,Redis提供了RedisTemplate的扩展StringRedisTemplate,用于对字符串的操作,使用StringRedisTemplate,不需要(写RedisConfig)额外设置序列化。不过存取对象需要自己序列化、反序列化。本文会介绍配置,以及连接Redis,还有使用的部分代......
  • springboot保存redis键值对出现乱码\xac\xed\x00\x05t\x00
    当使用RedisTemplate进行操作时,发现保存的key带有特殊字符。原因是RedisTemplate默认处理key为对象,改为StringRedisTemplate后,能正确保存字符串key,避免了编码问题。@SpringBootTestclassRedisApplicationTests{@AutowiredprivateRedisTemplateredisTemplate;......
  • 蓝易云服务器 - ubuntu安装rediscli教程
    在Ubuntu上安装RedisCLI的教程如下:打开终端:按下 Ctrl+Alt+T组合键可打开终端。更新软件包列表:运行以下命令更新软件包列表。sudoaptupdate安装Redis:使用以下命令安装Redis服务器。sudoaptinstallredis-server验证Redis安装:安装完成后,Redis将自动作为系统服务运行。可以......
  • redis的使用
    一、数据类型1、字符串类型String1)赋值:SETkeyvalue。如sethelloworld2)取值:GETkey。如gethello。返回是world3)自增:INCRkey。就是Mysql的AUTO_INCREMENT。每次执行INCRkey时,该key的值都会+1.若key不存在,则先建立一个0,然后+1,返回1。如果值不是整数则报错。该操......
  • Redis之pipeline与事务
    前言    Redis使用的是单reactor网络模型,也就是io多路复用+非阻塞io的异步处理流程(注册事件,在事件循环callback处理事件)。我们可以将每个连接抽象看成一个pipe,哪个pipe中的数据先满就先处理。注意,单reactor指的是acceptor只有一个,而工作线程在6.0版本之前只有一个,也就......
  • Redis解决"Warning: Using a password with .....may not be safe"
    Redis的命令行工具redis-cli可以连接、访问Redis数据库,在没有设置密码验证的情况下,我们可以无需密码直接访问Redis数据库,如下所示./redis-cli127.0.0.1:6379>127.0.0.1:6379>pingPONG127.0.0.1:6379>我们连接到本地的Redis服务并执行ping命令,该命令用于检测Redis服务是否启动,......
  • redis集群的搭建
    一、创建节点文件夹因为集群至少要6个节点,所有创建6个文件夹mkdir700{0,1,2,3,4,5}二、复制配置文件到每个创建的文件夹cpredis.conf700{0,1,2,3,4,5}三、修改每个配置文件的端口等信息绑定服务器的IP:bind0.0.0.0或者注释掉#bind0.0.0.0关闭保护模式用于公网访......
  • Redis 分布式锁的正确实现原理演化历程与 Redission的源码
    ......