首页 > 其他分享 >SpringCloud系列之(十一)SpringCloud Stream消息驱动

SpringCloud系列之(十一)SpringCloud Stream消息驱动

时间:2024-02-28 10:46:21浏览次数:30  
标签:十一 Stream stream SpringCloud rabbitmq springframework spring org cloud

SpringCloud Stream消息驱动

企业中常用的消息中间件

ActiveMQ

RabbitMQ

RocketMQ

Kafka

一、消息驱动概述

1. 为什么引入SpringCloud Stream

​ 消息中间件的产品众多(ActiveMQ、RabbitMQ、RocketMQ、Kafka...),学习成本高

​ 一个系统中可能使用了多种消息中间件,切换/维护/开发成本高

​ SpringCloud Stream的出现,实现了屏蔽底层的细节差异,使得操作SpringCloud Stream,就可以操作底层不同的消息中间件

2. 是什么

​ 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
​ 应用程序通过inputs【消费者】或者outputs【生产者】来与Spring Cloud Stream中binder对象【靠Binder实现的屏蔽底层消息中间件的差异】交互。
​ 通过我们配置来binding(绑定),而Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
​ 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
​ Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka。

​ 一句话:屏蔽底层消息中间件的差异,降低切换/维护/开发成本,统一消息的编程模型

类比Hibernate/Mybatis【ORM框架】和Mysql、Oracle、SqlServer

类比JDBC和Mysql、Oracle、SqlServer

官网

https://spring.io/projects/spring-cloud-stream#overview

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/

Spring Cloud Stream中文指导手册

https://m.wang1314.com/doc/webapp/topic/20971999.html

3. 设计思想

Stream中的消息通信方式遵循了发布-订阅模式,Topic主题(对应RabbitMQ中的Exchange,kafka中的Topic)进行广播

4. Spring Cloud Stream标准流程套路

![](https://gitee.com/honourer/picturebed/raw/master/SpringCloud/图像 (24).png)

Binder:连接中间件,屏蔽差异

Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置

Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入

5. 编码API和常用注解

二、案例说明

1. RabbitMQ环境配置

参考Windows 工具使用指南.md 二十八、RabbitMQ

2. 环境说明

cloud-eureka-server7001,作为服务注册中心

cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块

cloud-stream-rabbitmq-consumer8802,作为消息接收模块

cloud-stream-rabbitmq-consumer8803,作为消息接收模块

RabbitMQ

三、消息驱动之生产者

  • 建Module【cloud-stream-rabbitmq-provider8801】

  • 改POM

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>cloud2020</artifactId><groupId>com.atguigu.springcloud</groupId><version>1.0-SNAPSHOT</version></parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
    
        <dependencies>
    
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    
        <dependency>
            <groupId>com.atguigu.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>${project.version}</version>
        </dependency>
    
    
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    
    
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
  • 写YML

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: 192.168.59.128
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            output: # 这个名字是一个通道的名称
              destination: studyExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
              binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
    
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
        lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
        instance-id: send-8801.com  # 在信息列表时显示主机名称
        prefer-ip-address: true     # 访问的路径变为IP地址
    
  • 主启动

    package com.atguigu.springcloud;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class StreamMQMain8801 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8801.class, args);
        }
    }
    
  • 业务类

    Service接口

    package com.atguigu.springcloud.service;
    
    public interface IMessageProvider {
        public String send();
    }
    

    Service实现类

    package com.atguigu.springcloud.service.impl;
    
    import com.atguigu.springcloud.service.IMessageProvider;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.MessageChannel;
    
    import javax.annotation.Resource;
    import java.util.UUID;
    
    @EnableBinding(Source.class)//定义消息的推送管道
    public class MessageProviderImpl implements IMessageProvider {
        /**
         * 消息发送管道
         */
        @Resource
        private MessageChannel output;
    
        @Override
        public String send() {
            String serial = UUID.randomUUID().toString();
            output.send(MessageBuilder.withPayload(serial).build());
            System.out.println("*****serial:" + serial);
            return null;
        }
    }
    

    Controller

    package com.atguigu.springcloud.controller;
    
    import com.atguigu.springcloud.service.IMessageProvider;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    @RestController
    public class SendMessageController {
        @Resource
        private IMessageProvider messageProvider;
    
        @GetMapping("/sendMessage")
        public String sendMessage(){
            return messageProvider.send();
        }
    }
    
  • 测试

    启动cloud-eureka-server7001

    启动rabbitmq

    启动cloud-stream-rabbitmq-provider8801

    修改yml配置文件,加入如下内容

    spring:
      rabbitmq:
        host: 192.168.59.128
        port: 5672
        username: guest
        password: guest
    

    完整的配置文件如下

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: 192.168.59.128
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            output: # 这个名字是一个通道的名称
              destination: studyExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
              binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
      # 如果不配置该内容,即使上面的cloud stream配置中配置了rabbitmq的内容也会默认连接本地的RabbitMQ
      rabbitmq:
        host: 192.168.59.128
        port: 5672
        username: guest
        password: guest
    
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
        lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
        instance-id: send-8801.com  # 在信息列表时显示主机名称
        prefer-ip-address: true     # 访问的路径变为IP地址
    

    参考网站:https://blog.csdn.net/chuanchengdabing/article/details/118109912

    启动8801后在RabbitMQ中能看到Exchanges中增加了一个,就是我们配置的

    多次调用http://localhost:8801/sendMessage接口

    RabbitMQ中能够看到流量的变化

至此,生产者构建成功。

四、消息驱动之消费者

  • 建Module【cloud-stream-rabbitmq-consumer8802】

  • 改POM

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>cloud2020</artifactId><groupId>com.atguigu.springcloud</groupId><version>1.0-SNAPSHOT</version></parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
    
        <dependencies>
    
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    
        <dependency>
            <groupId>com.atguigu.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>${project.version}</version>
        </dependency>
    
    
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    
    
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    
    </dependencies>
    
  • 写YML

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: 192.168.59.128
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            input: # 这个名字是一个通道的名称
              destination: studyExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
              binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
      rabbitmq:
        host: 192.168.59.128
        port: 5672
        username: guest
        password: guest
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
        lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
        instance-id: receive-8802.com  # 在信息列表时显示主机名称
        prefer-ip-address: true     # 访问的路径变为IP地址
    
  • 主启动

    package com.atguigu.springcloud;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class StreamMQMain8802 {
    
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8802.class, args);
        }
    
    }
    
  • 业务类

    package com.atguigu.springcloud.controller;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    @Component
    @EnableBinding(Sink.class)
    public class ReceiveMessageListenerController {
        @Value("${server.port}")
        private String serverPort;
    
        /**
         * 监听队列
         * @param message
         */
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message){
            System.out.println("消费者1号,接受:" + message.getPayload() + "\t port:"+serverPort);
        }
    }
    
  • 测试

    启动cloud-stream-rabbitmq-consumer8802

    调用http://localhost:8801/sendMessage

    8801能够看到消息发送到队列

    8802能够接收到队列中的消息

    RabbitMQ中能够看到流量的变化

    指定的Exchange中也能看到流量的变化

至此,消费者构建完成。

五、分组消费与持久化

  • 依照cloud-stream-rabbitmq-consumer8802,拷贝出一份cloud-stream-rabbitmq-consumer8803

  • 启动

    启动rabbitmq

    启动cloud-eureka-server7001

    启动cloud-stream-rabbitmq-provider8801

    启动cloud-stream-rabbitmq-consumer8802

    启动cloud-stream-rabbitmq-consumer8803

    访问http://localhost:7001/,能够看到现在有两个消费者,一个生产者,环境准备完成

  • 运行后有两个问题

    • 重复消费
    • 消息持久化

    注:这两个问题都要通过Stream的消费组特性来解决。

1. 重复消费

访问http://localhost:8801/sendMessage,两个消费者都能收到消息,说明消息被重复消费了

1.1 产生原因

默认情况下每个微服务的分组是不同的, 不同组是可以全面消费(重复消费)的,同一组内多个消费者会发生竞争关系,只有其中一个消费者可以消费。

1.2 如何判定当前的消费者在不同的分组中

1.3 自定义配置分组

​ 目前的分组我们用的是默认的,可以通过修改yml实现自定义配置分组,以cloud-stream-rabbitmq-consumer8802为例,yml中加入如下配置

spring:
  cloud:
    stream:
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          group: atguiguA

完整的yml配置如下

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.59.128
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
          group: atguiguA
  rabbitmq:
    host: 192.168.59.128
    port: 5672
    username: guest
    password: guest
eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

可以看到配置已经生效了

1.4 通过自定义配置为同一分组解决重复消费的问题

参照1.3 自定义配置分组,将两个消费者配置为同一个组atguiguA

重启后查看分组信息,还是会显示两个分组

可以看到,分组A有两个实例

分组B是没有实例的

访问http://localhost:8801/sendMessage两次,不再出现重复消费

至此,重复消费的问题解决。

2. 持久化

继续使用上一步的环境,做如下操作

cloud-stream-rabbitmq-consumer8802停止

cloud-stream-rabbitmq-consumer8803停止

访问http://localhost:8801/sendMessage 4次

修改cloud-stream-rabbitmq-consumer8802的yml配置,去掉group属性的配置

cloud-stream-rabbitmq-consumer8802启动

没有消费过的消息丢失了

cloud-stream-rabbitmq-consumer8803启动

仍然能够成功消费没有消费过的消息

结论:配置group属性可以防止消息丢失。

标签:十一,Stream,stream,SpringCloud,rabbitmq,springframework,spring,org,cloud
From: https://www.cnblogs.com/wzzzj/p/18039258

相关文章

  • SpringCloud系列之(九)服务配置
    服务配置目前在用的服务配置+服务总线的三套方案Config+BusNaccos(Alibaba)Apollo(携程)上海地区一、SpringCloudConfig分布式配置中心1.概述1.1分布式系统面临的配置问题​ 微服务意味着要将单体应用中的业务拆分成一个个子服务,每个服务的粒度相对较小,因此系统中会出现......
  • SpringCloud系列之(八)服务网关
    服务网关类比医院的分诊台一、Zuul由Netflix团队研发,不再使用官网:https://github.com/Netflix/zuul/wiki1.Zuul1.x模型​ Springcloud中所集成的Zuul版本,采用的是Tomcat容器,使用的是传统的ServletI0处理模型。​ 学过尚硅谷web中期课程都知道一个题目,Servlet的生命周期......
  • SpringCloud系列之(七)服务降级
    服务降级一、Hystrix断路器1.概述1.1分布式系统面临的问题复杂分布式体系结构中的应用程序有数十个依赖关系,每个依赖关系在某些时候将不可避免地失败。![](https://gitee.com/honourer/picturebed/raw/master/SpringCloud/图像(16).png)服务雪崩​ 多个微服务之间调用......
  • SpringCloud系列之(六)服务调用
    服务调用完成微服务之间的分布式调用一、Ribbon1.概述1.1是什么​ SpringCloudRibbon是基于NetflixRibbon实现的一套客户端负载均衡【消费者侧80】的工具。​ 简单的说,Ribbon是Netflix发布的开源项目,主要功能是提供客户端的软件负载均衡算法和服务调用。Ribbon客户端组......
  • SpringCloud系列之(五)服务注册中心
    服务注册中心【服务注册与发现】一、EurekaSpring社区出的,Nacos是阿里出的1.Eureka基础知识1.1什么是服务治理?​ SpringCloud封装了Netflix公司开发的Eureka模块来实现服务治理。​ 在传统的rpc远程调用框架中,服务与服务之间依赖关系比较复杂,管理比较复杂,所以需要......
  • SpringCloud系列之(四)微服务架构编码构建
    微服务架构编码构建一、IDEA新建project工作空间1.微服务cloud整体聚合父工程Project【父工程Project空间新建】NewProject聚合总工程名字Maven选版本字符编码注解生效激活java编译版本选8FileType过滤【选做】删除src目录2.父工程POM文......
  • SpringCloud系列之(二)从2 2 x和H版开始说起
    从2.2.x和H版开始说起SpringCloudDalston.SR1+SpringBoot1.5.9版本的搭配已过时一、SpringBoot版本选择SpringBoot是以数字作为版本的,如:SpringBoot1.5.9二、SpringCloud版本选择SpringCloud的版本命名规则及版本关系​ SpringCloud采用了英国伦敦地铁站的名称来命名,......
  • SpringCloud系列之(一)微服务架构零基础理论入门
    微服务架构零基础理论入门一、微服务架构概述1.什么是微服务由MartinFlower提出微小的、独立的进程、轻量级、可独立部署​ 微服务架构是种架构模式,它提倡将单一应用程序划分成一组小的服务,服务之间互相协调、互相配合,为用户提供最终价值。每个服务运行在其独立的进程中,......
  • SpringBoot 1x 系列之(十一)Spring Boot与任务
    SpringBoot与任务异步任务、定时任务、邮件任务1.异步任务1.1应用场景执行一些操作(如:邮件任务等)不想阻塞当前线程的情况下,可以通过多线程的方式进行异步处理。1.2快速使用主配置类//开启@Async异步注解功能@EnableAsync@EnableRabbit@EnableCaching@MapperScan("co......
  • SpringMVC系列之(十一)异常处理
    异常处理1.未进行异常处理的异常传递流程2.SpringMVC异常处理流程3.SpringMVC异常处理开发步骤编写自定义异常类(做提示信息的)编写异常处理器配置异常处理器(跳转到错误提示页面)4.实现Controller中的方法packagecn.itcast.controller;importcn.itcast.domain.U......