首页 > 数据库 >Spring中基于redis stream 的消息队列实现方法

Spring中基于redis stream 的消息队列实现方法

时间:2024-09-02 23:52:49浏览次数:8  
标签:stream 队列 Spring redis Kafka 消息 redisProperties 连接池

     本文主要介绍了消息队列的概念性质和应用场景,介绍了kafka、rabbitMq常用消息队列中间件的应用模型及消息队列的实现方式,并实战了在Spring中基于redis stream 的消息队列实现方法。

一、消息队列

      消息队列是一种进程间通信或者同一个进程中不同线程间的通信方式,主要解决异步处理、应用耦合、流量消峰、负载均衡等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。

1、异步处理

收到订单消息后,各子系统(库存、支付、消息)可以同步进行。

2、应用解耦

收到订单消息后,各子系统(库存、支付、消息)可以不用被调用或按顺序进行,解决调用失败造成的数据错误

3、流量削峰

在应用和数据库操作之间设置消息队列,消息队列配置请求最大数(低于数据库最大并发数),避免数据库超负荷运行。

4、负载均衡

Kafka、rabbitMq等支持主从架构,在多台服务器进行同步和自动选主。

二、消息队列实现方法

1、四大类方法

内存队列:消息队列通常在内存中实现

文件系统队列:消息可以被写入到文件系统中,持久化存储消息,但需要额外的磁盘空间和I/O操作。

数据库队列:消息可以被添加到数据库的特定表中,然后由另一个进程或线程从表中读取并处理。例如redis、tdengine都可以实现

消息队列中间件:如RabbitMQ、 Kafka等

2、kafka概念

Kafka 的核心架构由以下几个主要组件组成:

  1. Producer(生产者):发送消息的一方,负责发布消息到 Kafka 主题(Topic)。
  2. Consumer(消费者):接受消息的一方,订阅主题并处理消息。
  3. Broker(代理):服务代理节点,Kafka 集群中的一台服务器就是一个 broker,可以水平无限扩展,同一个 Topic 的消息可以分布在多个 broker 中。
  4. Topic(主题):Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
  5. Partition(分区):主题的物理分片,提高了并行处理能力。
  6. Replica(副本):副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络一场,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。
  7. ZooKeeper:管理 Kafka 集群的元数据和分布式协调。

3、rabbitMq概念

1.Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2.Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3.Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

4.Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 5.Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

6.Connection 网络连接,比如一个TCP连接。

7.Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

8.Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

4、Redis

发布订阅、list 队列、zset 队列、Stream 队列

三、基于redis Stream的实现案例

如业务需要发送邮件和短信时,可引入消息队列,不影响业务进行。

1、依赖

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

2、redisConfig.java连接配置

主要配置连接地址数据库

sync-stream-redis:
  host: 10.110.1.1
  password: aaaa
  database: 0
  port: 6379
  timeout: 10s
  # 连接超时时间
  lettuce:
    shutdown-timeout: 60s
    pool:
      # 连接池中的最小空闲连接
      min-idle: 0
      # 连接池中的最大空闲连接
      max-idle: 8
      # 连接池的最大数据库连接数
      max-active: 8
      # #连接池最大阻塞等待时间(使用负值表示没有限制)
      max-wait: -1ms

syncKey: stream_vir_name

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport
{


/**
 * 数据库连接配置
 * @return
 */
@Bean(name = "syncDataRedisProperties")
@ConfigurationProperties(prefix = "spring.sync-stream-redis")
public RedisProperties syncDataRedisProperties() {
    return new RedisProperties();
}





@Bean(name = "syncDataRedisConnectionFactory")
public RedisConnectionFactory syncDataRedisConnectionFactory(@Qualifier("syncDataRedisProperties") RedisProperties redisProperties) {
    RedisProperties.Sentinel sentinel = redisProperties.getSentinel();
    RedisConfiguration redisConfig = null;
    if (sentinel == null) {
        // redis单体模式连接配置
        RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();
        standaloneConfig.setHostName(redisProperties.getHost());
        standaloneConfig.setPort(redisProperties.getPort());
        standaloneConfig.setDatabase(redisProperties.getDatabase());
        standaloneConfig.setPassword(RedisPassword.of(redisProperties.getPassword()));
        standaloneConfig.setDatabase(redisProperties.getDatabase());
        redisConfig = standaloneConfig;
    }


    // lettuce连接池配置
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    RedisProperties.Lettuce lettuce = redisProperties.getLettuce();
    if(lettuce.getPool() != null) {
        RedisProperties.Pool pool = redisProperties.getLettuce().getPool();
        // 连接池最大连接数
        poolConfig.setMaxTotal(pool.getMaxActive());
        // 连接池中的最大空闲连接
        poolConfig.setMaxIdle(pool.getMaxIdle());
        // 连接池中的最小空闲连接
        poolConfig.setMinIdle(pool.getMinIdle());
        // 连接池最大阻塞等待时间(使用负值表示没有限制)
        poolConfig.setMaxWaitMillis(pool.getMaxWait().toMillis());
    }
    LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder builder = LettucePoolingClientConfiguration.builder();
    // timeout
    if(redisProperties.getTimeout() != null) {
        builder.commandTimeout(redisProperties.getTimeout());
    }
    // shutdownTimeout
    if(lettuce.getShutdownTimeout() != null) {
        builder.shutdownTimeout(lettuce.getShutdownTimeout());
    }
    // 创建Factory对象
    LettuceClientConfiguration clientConfig = builder.poolConfig(poolConfig).build();
    return new LettuceConnectionFactory(redisConfig, clientConfig);
}

}

3、RedisStreamConfig.java 监听配置及消费者注册 启动监听

主要配置监听容器设置(最大消息数-流量削峰可重点关注)和消费者组的注册,程序运行时启动监听,配置监听的topic(streamName示例中为syncKey)

主要用到以下类和方法

StreamMessageListenerContainer 、createGroup、register、createautoAcknowledge

@SuppressWarnings({"rawtypes", "unchecked"})
public class RedisStreamConfig {

//  监听性质配置

@Bean( name = "syncListenerContainer", initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer syncListenerContainer(@Qualifier("syncDataRedisConnectionFactory") RedisConnectionFactory factory) {
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小, 设置为0会导致CPU飙升
                    .pollTimeout(Duration.ofSeconds(2))
                    // 一次最多获取多少条消息
                    .batchSize(10)
                    // 运行 Stream 的 poll task
                    .executor(emsThreadPoolTaskExecutor)
                    // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
                    .errorHandler(e -> {
                        logger.error("streamMessageListenerContainer异常", e);
                    })
                    .build();
    return StreamMessageListenerContainer.create(factory, options);
}

@Bean
public Subscription syncDeviceDataSubscription(@Qualifier("syncListenerContainer") StreamMessageListenerContainer listenerContainer) {
    String groupName = syncKey + "ems";
    StreamOperations streamOperations = syncDataRedisTemplate.opsForStream();
    RecordId recordId = null;
    // 如果队列不存在,则创建队列
    if (Boolean.FALSE.equals(syncDataRedisTemplate.hasKey(syncKey))) {
        recordId = streamOperations.add(syncKey, Collections.singletonMap("_up", "up"));
        // 删除创建队列时的测试消息
        streamOperations.delete(syncKey, recordId);
    }
    // 如果分组不存在,则创建分组
    StreamInfo.XInfoGroups groups = streamOperations.groups(syncKey);
    long groupCount = groups.stream().filter(xInfoGroup -> xInfoGroup.groupName().equals(groupName)).count();
    if (groupCount <= 0) {
        streamOperations.createGroup(syncKey, groupName);
    }

    StreamMessageListenerContainer.StreamReadRequest<String> readRequest =
            StreamMessageListenerContainer.StreamReadRequest
                    .builder(StreamOffset.create(syncKey, ReadOffset.lastConsumed()))
                    .consumer(Consumer.from(groupName, "consumer_" + System.currentTimeMillis()))
                    .cancelOnError(t -> false)
                    // 自动确认消息
                    .autoAcknowledge(true)
                    .build();
    return  listenerContainer.register(readRequest, syncDataStreamListener);

}


}

4、消费者接受消息

主要实现接口StreamListener,并重写onMessage, 在onMessage可调用其他业务方法进行处理(如短信邮箱发送等),接受到的消息格式MapRecord,<id, map<string,data>>.

@Component
public class SyncDataStreamListener implements StreamListener<String, MapRecord<String,String,String>> {
    private final Logger logger = LoggerFactory.getLogger(SyncDataStreamListener.class);


    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            String stream = message.getStream();
            RecordId messageId = message.getId();
            Map<String, String> value = message.getValue();
            //   业务处理(如短信、邮箱发送)
        } catch (Exception e) {
            logger.error("处理异常", e);
        }

    }
}

5、生产者

@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){
    StreamOperations streamOperations = redisTemplate.opsForStream();
    for (int i = 0; i < count; i++) {
        AtomicInteger num = new AtomicInteger(i);
        Map msgMap = new HashMap();
        msgMap.put("count", i);
        msgMap.put("sID", num);
        //新增消息
        streamOperations.add(syncKey,msgMap);
    
    return "success";
}

标签:stream,队列,Spring,redis,Kafka,消息,redisProperties,连接池
From: https://blog.csdn.net/xingyuemengjin/article/details/141833131

相关文章

  • 【万字文档+PPT+源码】基于springboot+vue的研究生双选信息发布系统-可用于毕设-课程
    博主简介:......
  • Spring源码(9)--FactoryBean简介
    FactoryBean简介FactoryBean是一个工厂对象,用于创建和管理其他Bean的实例。FactoryBean接口定义了一种创建Bean的方式,它允许开发人员在Bean的创建过程中进行更多的自定义操作。通过实现FactoryBean接口,开发人员可以创建复杂的Bean实例,或者在Bean实例化之前进行一......
  • 前后端分离项目的创建方法(注:前端使用了vue技术,后端使用了Spring常用框架)
    下面我将创建一个实现系统登录的前后端分离项目:第一步:使用idea新建一个空项目,命名为Login,然后点击Create。第二步:找到setting选项,点击Build,Exception,Deployment,找到BuildTools,然后选择Maven,重写里面的settings.xml和repository,然后点击OK。(注:setting.xml一般在conf目录......
  • redis命令:set
    saddkeymember:向set中添加一个或多个元素127.0.0.1:6379>saddusers2u11u22u33(integer)6sremkeymember:移除set中的指定元素 127.0.0.1:6379>sremusers2u3(integer)1127.0.0.1:6379>sremusers23(integer)1scardkey:返回set中元素个数127.0.......
  • 【Java】Spring-AOP与拦截器实战 (上手图解)
    Java系列文章目录补充内容Windows通过SSH连接Linux第一章Linux基本命令的学习与Linux历史文章目录Java系列文章目录一、前言二、学习内容:三、问题描述四、解决方案:4.1认识依赖4.2使用AOP与拦截器4.2.1使用AOP4.2.1.1设置DemoAop类4.2.2.2设置切面4.2.2.3设......
  • java-基于spring boot在线装修管理系统
    摘要互联网发展至今,无论是其理论还是技术都已经成熟,而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播,搭配信息管理工具可以很好地为人们提供服务。针对信息管理混乱,出错率高,信息安全性差,劳动强度大,费时费力等问题,采用在线装修管理系统可以有效管理,使信息管理能......
  • 《JavaEE进阶》----9.<SpringMVC实践项目:【简易对话留言板(数据存在数据库中)】>
    本篇博客讲解设计的一个网页版简易对话留言板。这个是将数据存在数据库中。我们通过链接本地数据库。在这里面存入的数据。此时数据存在在硬盘中,只要数据不被删除,硬盘不损坏。那么这些数据就会被永久保存引入的依赖:配置数据库:spring:datasource:#数据库连接配置......
  • SpringBoot实现前后端传输加密设计
    在Web应用中,确保前后端之间的数据传输安全是非常重要的。这通常涉及到使用HTTPS协议、数据加密、令牌验证等安全措施。本文通过将前后端之间的传输数据进行加密,用于在SpringBoot应用中实现前后端传输加密设计。一、数据加密方案即使使用了HTTPS,也可能需要在应用层对数据......
  • windows 即停即用redis(解压即可用)
    之前做过一篇windows安装redis的教程https://blog.csdn.net/qq_27233977/article/details/141789517,这次作为补充,主要针对研发人员,很多时候,研发人员需要装一大堆中间件,但是不是每次都用得上,这个时候即停即用就显得非常有必要。下载github地址(上传者是redis作者):https://git......
  • Redis基础知识学习笔记(三)
    文章目录Redis命令一.Redis命令1.启动redis服务器2.本地启动redis客户端3.远程启动redis客户端4.测试是否建立连接二.Redis键(key)RedisKey相关命令三.Redis字符串(String)Redis字符串命令四.Redis哈希(Hash)Redis字符串命令五.Redis列表(List)Redis列表命......