首页 > 数据库 >Redis Stream实现全部节点机器推送消息

Redis Stream实现全部节点机器推送消息

时间:2022-09-29 14:56:31浏览次数:53  
标签:Stream stream keys Redis StreamMessageListenerContainer 推送 redisTemplate subscri

背景

有时候,在微服务时代,我们需要对全部的机器节点进行通知。在常规情况下,一个请求经过负载均衡只有一个机器可以收到。那么,如何能让全部的机器都收到同样的请求呢?需要借助消息队列的监听机制,让每个节点都监听一个队列,让消息发送到所有的队列中。

rabbit MQ的fanout交换机可以实现这种功能。

那么,如果想用redis去实现这个功能,有没有什么好的选择呢?毕竟仅仅为了一个全部节点的推送,就引入另外一个中间件,不是一个很经济选择。

那么Redis的Stream结构就是一个可以选择的了。

实现

对于Redis的Stream结构,诞生之初就是为了用作消息队列的。具体用法如下:

  • 发送消息
public void sendConfigMessage() {
        MapRecord<String, String, String> entries = StreamRecords.mapBacked(Collections.singletonMap("msg", "plsGet")).withStreamKey(RedisConfig.stream);
        // 将消息添加至消息队列中
        redisTemplate.opsForStream().add(entries);
}
  • 建立监听
    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        Set<String> keys = redisTemplate.keys(streamPattern);
        if (keys != null && keys.size() != 0) {
            keys = keys.stream().filter(key -> !key.equals(stream)).collect(Collectors.toSet());
            if (keys.size() != 0) {
                redisTemplate.delete(keys);
            }
        }

        group = UUID.randomUUID().toString();
        if (Boolean.FALSE.equals(redisTemplate.hasKey(stream))) {
            StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("msg", "init")).withStreamKey(RedisConfig.stream);
            redisTemplate.opsForStream().add(stringRecord);
        }

        redisTemplate.opsForStream().createGroup(stream, group);
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);
        Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(group, "consumer1"), StreamOffset.create(configStream, ReadOffset.lastConsumed()), configStreamListener);
        listenerContainer.start();
        return subscription;
    }

对于每一台机器,都让它关联唯一的消费组,而这个功能关联唯一的stream key。Redis的stream机制在于,每一条给stream key发送的消息都会推送给所有的消费组,这样所有的机器都会收到这条消息。

一些问题

Redis的连接和MQ还是有一些区别。当Redis连接超时之后,之前建立的监听就不能用了,因为之前的长连接断开了。

一个解决档案就是手动维持一个Netty的心跳机制,不停轮训判断当前的订阅是否还处于活跃状态。一旦不处于活跃状态就要重新建立长连接:

    @Autowired
    Subscription subscription;

    @Autowired
    RedisConnectionFactory factory;


    @Autowired
    StreamListener streamListener;

    @Autowired
    RedisTemplate<String, Object> redisTemplate;

    //当Redis连接超时,自动重置stream队列。否则监听失效
    @Bean
    public ClientResources clientResources(){

        NettyCustomizer nettyCustomizer = new NettyCustomizer() {

            @Override
            public void afterChannelInitialized(Channel channel) {
                channel.pipeline().addLast(
                        new IdleStateHandler(0, 0, 10));

                channel.pipeline().addLast(new ChannelDuplexHandler() {
                    @Override
                    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                        if (subscription != null && !subscription.isActive()) {
                            synchronized ("resetStreamLock") {
                                if (!subscription.isActive()) {
                                    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
                                            .StreamMessageListenerContainerOptions
                                            .builder()
                                            .pollTimeout(Duration.ofSeconds(1))
                                            .build();
                                    StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer0 = StreamMessageListenerContainer.create(factory, options);
                                    Subscription subscription0 = listenerContainer0.receiveAutoAck(Consumer.from(RedisConfig.group, "consumer1"), StreamOffset.create(RedisConfig.stream, ReadOffset.lastConsumed()), streamListener);
                                    listenerContainer0.start();
                                    subscription = subscription0;
                                    log.info("reset getStream!");
                                }
                            }
                        }
                        if (evt instanceof IdleStateEvent) {
                            ctx.disconnect();
                        }
                    }
                });
            }

            @Override
            public void afterBootstrapInitialized(Bootstrap bootstrap) {

            }

        };

        return ClientResources.builder().nettyCustomizer(nettyCustomizer ).build();
    }

标签:Stream,stream,keys,Redis,StreamMessageListenerContainer,推送,redisTemplate,subscri
From: https://www.cnblogs.com/LostSecretGarden/p/16741552.html

相关文章

  • linux 安装Redis以及使用
    安装redisyuminstallredis 安装完毕后,使用下面的命令启动redis服务设置redis为开机自动启动chkconfigredison 进入redis服务#进入本机redisredis-cli......
  • 【Redis】缓存穿透、缓存击穿、缓存雪崩
     缓存雪崩缓存雪崩是指缓存同一时间大面积失效,所以后面的请求都会落在数据库上,造成数据库短时间内承受大量请求而崩掉。目前电商首页以及热点数据都会缓存,一般缓存都......
  • Golang Redis有序集合(sorted set)
    Redis有序集合(sortedset)和集合一样也是string类型元素的集合,且不允许重复的成员,不同的是每个元素都会关联一个double类型的分数,这个分数主要用于集合元素排序。引用git......
  • request_time和upstream_response_time详解
    下图是request_time。  下图是upstream_response_time.   精准的描述就是:request_time是从接收到客户端的第一个字节开始,到把所有的响应数据都发送完为止。up......
  • docker 简介和使用总结:image,container,dockfile,nginx,redis,compose
    硬件发展速度:asp.net+sql+redis+nginx:资源利用率太低,多软件工具不能隔离,相互影响,不能快速部署 0.docker简介和使用总结:image,container,dockfile,nginx,redis,compose......
  • 虚拟机的Linux系统安装redis
    在虚拟机的Linux系统安装redis时出现make:***[install]错误2百度了一下发现是因为没有安装gcc因为Redis是C实现的,需要gcc来进行编译执行安装gcc依赖和gcc:yum-yins......
  • redis有序集合中是否存在某个成员
    redis命令使用参考网页:​​http://redis.cn/commands.html​​有序集合中,redis没有命令直接判断有序集合中是否存在某个成员,自行通过代码实现,示例代码如下:#include<stdio.h......
  • 利用redis实现倒计时任务
    示例如下:importredisimporttimedefevent_handler(msg):'''监听到任何key过期后,得到的消息如下msg={'type':'pmessage',#返回值类型'pattern':'__k......
  • Memcached vs Redis, 挑选哪一个?
    MemcachedvsRedis,挑选哪一个?标签:MencachedRedisMemchached还是Redis?该用哪一个?当我们讨论改进性能的时候,这是每次技术讨论中最常见的一个问题。每当性能需要改善时......
  • 使用Spring boot基于Redis快速搭建分布式Session缓存方案
    使用Springboot基于Redis快速搭建分布式Session缓存方案大型web应用中,session的管理非常重要,这是单机存储满足不了的.通常有以下几种方法:将session持久化到数据库中.但......