首页 > 数据库 >基于Redis的消息发布和订阅(广播模式)

基于Redis的消息发布和订阅(广播模式)

时间:2023-06-27 23:45:24浏览次数:41  
标签:订阅 UserDTO MessageListenerAdapter Redis class 广播 listenerAdapter 序列化 public

最近在优化一个redis的大key问题, 原先的同事写法是将所有的配置都放在一个key里面, value是一个map. 但是随着配置越来越多, 这个map也变得很大, 已经影响redis的性能.遂决定将map里面的每个配置单独出来一个key, 当有配置变动的时候, 通过redis的消息队列通知到其他程序. 下面是在springboot中具体实现方式:

1.引入依赖

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.配置监听

如下如所示, 定义了2个Linstener, 模拟2个应用监听同一个通道. 根据发送和接收的数据类型我们可以选择合适的数据序列化和反序列化方式, 默认序列化方式为RedisSerializer.java(). 对于普通的Bean来说使用json()java()的序列方式都可以.不同点在于:

  • RedisSerializer.java()使用JdkSerializationRedisSerializer,JDK提供的序列化功能。优点是反序列化时不需要提供类型信息(class),但缺点是需要实现Serializable接口,还有序列化后的结果非常庞大,是JSON格式的5倍左右,这样就会消耗redis服务器的大量内存
  • RedisSerializer.json()使用Jackson2JsonRedisSerializer, 借助Jackson库将对象序列化为JSON字符串。优点是速度快,序列化后的字符串短小精悍,不需要实现Serializable接口。但缺点也非常致命,那就是此类的构造函数中有一个类型参数,必须提供要序列化对象的类型信息(.class对象)。 通过查看源代码,发现其只在反序列化过程中用到了类型信息
@Configuration
public class RedisConfig {

    public static final String TOPIC = "topic_1";

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter,
                                            MessageListenerAdapter listenerAdapter2) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(TOPIC));
        container.addMessageListener(listenerAdapter2, new PatternTopic(TOPIC));
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Listener1 listener1) {
        System.out.println("listener1 register");
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(listener1, "onMessage");
        listenerAdapter.setSerializer(RedisSerializer.json());
        return listenerAdapter;
    }

    @Bean
    MessageListenerAdapter listenerAdapter2(Listener2 listener2) {
        System.out.println("listener2 register");
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(listener2, "onMessage");
        listenerAdapter.setSerializer(RedisSerializer.java());
        return listenerAdapter;
    }
}

3. 创建监听类

  • 监听类需实现MessageListener 接口,并实现OnMessage方法
@Component
public class Listener1 implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        UserDTO userDTO = JSONObject.parseObject(message.toString(), UserDTO.class);
        System.err.println("listener1--->" + userDTO);
    }
}
  • 创建Listener2类, 模拟另外一个程序
@Component
public class Listener2 implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        UserDTO userDTO = JSONObject.parseObject(message.toString(), UserDTO.class);
        System.err.println("listener2--->" + userDTO);
    }
}

4.发送消息(广播模式)

  • 调用redisTemplate.convertAndSend()方法, 发送消息到topic_1. 消息会通过redis广播至所有的消费者.
@RestController
public class TestController {

    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping("/test")
    public String test() {
        redisTemplate.setValueSerializer(RedisSerializer.string());
        UserDTO userDTO = new UserDTO("sharloon", 18, 1);
        redisTemplate.convertAndSend("topic_1", JSON.toJSONString(userDTO));

        return "test ok";
    }
}

5. 效果展示

listener2--->UserDTO(name=sharloon, age=18, sex=1)
listener1--->UserDTO(name=sharloon, age=18, sex=1)

6. demo地址

https://gitee.com/sharloon/redis-message

标签:订阅,UserDTO,MessageListenerAdapter,Redis,class,广播,listenerAdapter,序列化,public
From: https://www.cnblogs.com/sharloon/p/17510205.html

相关文章

  • redis压缩列表ziplist
     压缩列表(ziplist)是Redis的一种内存紧凑型的数据结构,它是一个字节数组,可以包含任意多个元素,每个元素可以是一个字节数组或一个整数。压缩列表的结构由多个字段组成,包括zlbytes(压缩列表的字节长度),zltail(压缩列表尾元素的偏移量),zllen(压缩列表的元素数目),entryX(压缩列表存储的......
  • redis介绍和安装,redis普通链接和连接池,redis字符串类型,redis hash类型
    1Redis介绍和安装#Redis:软件,存储数据的,速度非常快,redis是一个key-value存储系统(没有表的概念),cs架构的软件 -服务端客户端(python作为客户端,java,go,图形化界面,命令窗口的命令)#es:存数据的地方#关系型数据库和非关系型数据库 -关系型:mysql,PostgreSQL,oracle,sqlserver,db2......
  • Redis
    Redis介绍:redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sortedset--有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原......
  • 「路飞项目09」redis
    1Redis介绍和安装#Redis:软件,存储数据的,速度非常快,redis是一个key-value存储系统(没有表的概念),cs架构的软件-服务端客户端(python作为客户端,java,go,图形化界面,命令窗口的命令)#es:存数据的地方#关系型数据库和非关系型数据库-关系型:mysql,PostgreSQL【PG】,oracle,sqlserver,db......
  • redis-分布式锁优化思路
    1.加锁的代码块,粒度越小越好,语句越少越好,更不容易出现"插队"的现象2.分段锁思路:以"超卖"为例,原本是待促销200件商品,一把锁,现改成10款20件商品的促销,改成10把锁,当并发小于10的时候,甚至都不需要锁,就可以并发安全......
  • redis-分布式锁样例
    redisTemplate:stringlockKey="product_101"stringclientID=UUID.randomUUID().toString()boolresult=redisTemplate.opsForValue().setIfAbsent(lockKey,clientID,10,TimeUnit.SECONDS)if(!result){return"error_code"}try{//业务逻辑......
  • python 批量删除 redis 大量数据
    #!/usr/bin/envpython#ScananddeletekeysinRedis.#Author:cdfivefromredisimportRedisimporttimedefRedisScanAndDelete(host,port,password,db,cursor,pattern,count,batch_delete_size):start_time=time.time()client=Redis(host......
  • Java使用redis-Redis是并发安全的吗?
    大家都清楚,Redis 是一个开源的高性能键值对存储系统,被开发者广泛应用于缓存、消息队列、排行榜、计数器等场景。由于其高效的读写性能和丰富的数据类型,Redis 受到了越来越多开发者的青睐。然而,在并发操作下,Redis 是否能够保证数据的一致性和安全性呢?接下来小岳将跟大家一起来探......
  • redis-分布式锁注意事项
    lockKey:商品标识value:当前线程标识1.确保每把锁同一时间能且仅能上一次setnx(setifnotexists):当锁不存在时才上锁redisTemplate.opsForValue().setIfAbsent(lockKey,value)没加上锁需要返回错误码直接让其重试2.上锁成功后,需要在之后将锁释放redisTemplate.delete(l......
  • Rabbitmq:消息队列介绍、Rabbitmq安装、 基于Queue实现生产者消费者模型、基本使用(生产
    目录一、消息队列介绍1.1介绍1.2MQ解决什么问题1.3常见消息队列及比较二Rabbitmq安装2.1服务端原生安装2.2服务端Docker安装2.3客户端安装2.4设置用户和密码三基于Queue实现生产者消费者模型四基本使用(生产者消费者模型)五消息安全之ack六消息安全之durable持久化七闲置消......