首页 > 其他分享 >SpringCloud Stream消息驱动

SpringCloud Stream消息驱动

时间:2023-01-19 21:46:07浏览次数:59  
标签:Stream spring org boot springframework SpringCloud 驱动 public cloud

简单搭建,没有技术含量,Demo可用

1、介绍

①产生原因

RabbitMQ、RocketMQ、Kafka、ActiveMQ

在一个项目中,可能存在多种不同的MQ,在不同的MQ中,切换维护开发都很麻烦。

如果你会RabbitMQ,不会Kafka,要换MQ,还要重头学??

有没有技术,能够不再关注MQ细节,让我们使用一种适配绑定的版本,自动在MQ内切换:

有,SpringCloud Stream

②简介

SpringCloud Stream是一个构建消息驱动微服务的框架

官方文档:Spring Cloud Stream

应用程序通过inputs和outputs来和SpringCloud Stream的binder对象交互。

我们通过配置来binding(绑定),binder对象负责和消息中间件交互。

我们只需要和Spring Cloud Stream交互就能使用MQ了

就类似于JDBC能够操作Mysql,Orcal等不同数据库

通过Spring Integration来连接消息代理中间件,以实现消息事件驱动

SpringCloud Stream为一些中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区三个核心概念。

③支持MQ

image-20230119142918615

④设计思路

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接和MQ进行交互

通过定义绑定器作为中间层,我们能够完美实现应用程序和消息中间件细节之间的隔离。

通过向应用程序暴露统一的Channel通道,使应用程序不需要考虑各种不同的消息中间件实现。

通过定义绑定器Binder作为中间层,实现应用程序和消息中间件细节间的隔离

image-20230119145043428

通过定义绑定器Binder作为中间层,实现应用程序和消息中间件之间的隔离

⑤注解简介

image-20230119152603686

image-20230119145110519

组成 说明
Middleware 中间件,只支持RabbitMQ和Kafka
Binder Binder是应用和消息中间件之间的封装,通过Binder可以方便的连接中间件,可以动态改变消息类型(Kafka的topic,RabbitMq的exchange),可以通过配置文件实现
@Input 注解表示输入通道,通过输入通道收到的消息进入ApplicationCore
@Output 注解标识输出通道,发布的消息通过该通道离开ApplicationCore
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

2、QuickStart

①Provider搭建

新建cloud-stream-rabbitmq-provider8801,生产者

pom.xml:

<dependencies>
    <!--stream rabbit -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--eureka client-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--监控-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--热部署-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

yml:

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: 101.43.244.40
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
      bindings: #服务的整合处理
        output: #这个名字是一个通道的名称
          destination: studyExchange #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,本文要设置为“text/plain”
          binder: defaultRabbit #设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30S)
    lease-expiration-duration-in-seconds: 5 #如果超过5S间隔就注销节点 默认是90s
    instance-id: send-8801.com #在信息列表时显示主机名称
    prefer-ip-address: true #访问的路径变为IP地址

启动类:

@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

业务类:

新建service.IMessageProvider接口

public interface IMessageProvider {
    public String send();
}
@EnableBinding(Source.class)    //定义消息的推送管道(Source是spring的)
public class IMessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output;  //消息发送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());     //MessageBuilder是spring的integration.support.MessageBuilder
        System.out.println("*******serial: " + serial);
        return null;
    }
}

新建controller.SendMessageController

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider iMessageProvider;
    @GetMapping("/sendMessage")
    public String sendMessage(){
        return iMessageProvider.send();
    }
}

启动Eureka注册中心,Provider8801,查看MQ控制台面板,Exchange成功创建:

image-20230119161407311

向Provider接口发送请求:

http://localhost:8801/sendMessage

控制台成功打印

image-20230119161512709

查看RabbitMQ控制台,能够看到发送的消息:

image-20230119161706954

②Consumer搭建

新建模块cloud-stream-rabbitmq-consumer8802

pom.xml:

<dependencies>
    <!--stream rabbit -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--eureka client-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--监控-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--热部署-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

yml.xml:

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: 101.43.244.40
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
      bindings: #服务的整合处理
        input: #这个名字是一个通道的名称
          destination: studyExchange #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,本文要设置为“text/plain”
          binder: defaultRabbit #设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30S)
    lease-expiration-duration-in-seconds: 5 #如果超过5S间隔就注销节点 默认是90s
    instance-id: receive-8802.com #在信息列表时显示主机名称
    prefer-ip-address: true #访问的路径变为IP地址

启动类:

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }
}

Controller:

@EnableBinding(Sink.class)
@Controller
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT) //监听
    public void input(Message<String> message){
        System.out.println("消费者1号------>收到的消息:" + message.getPayload() + "\t port:" + serverPort);
    }
}

③测试

启动Eureka7001,Provider8801,Consumer8802

向8801发送请求:

http://localhost:8801/sendMessage

8802接收请求:

image-20230119163547326

关于之前8801发送的消息,为什么8802没有接收到呢?

因为在8802启动之前,StudyExchange被创建出,但是没有binding队列,所以消息发送的Exchange就丢失了。

这些丢失的消息,在Consuemr启动,binding队列到Exchange后,不会被接收到

3、分组消费

①项目搭建

拷贝Consumer8802,新建Consumer8803子模块

启动:

image-20230119170033361

②重复消费问题

如图,8801发送消息,8802和8803都可以对该消息进行消费

image-20230119170932834

为什么?

因为8802和8803各自对Exchange创建了队列,并且绑定到了8801创建的Exchange上,所以Exchange会分发两条相同的消息到他们各自的队列上,他们也会通过各自的队列消费:

image-20230119211448780

③解决:分组

同一个组的多个微服务实例,每次只会有一个拿到Provider产生的消息。而不同组的多个微服务,每个组都可以消费一次:

组?:

如下图所示,队列的Queue是由 Exchange的名词+.+组名 组成的

image-20230119212310019

将Consumer8802和Consumer8803设置为同一个组:

在它们的配置中添加group都设置为GroupA

image-20230119212029409

测试:

8802为8803都为GroupA,此时消息只会被消费一次

image-20230119212457728

image-20230119212619243

image-20230119212605452

标签:Stream,spring,org,boot,springframework,SpringCloud,驱动,public,cloud
From: https://www.cnblogs.com/zko0/p/17062192.html

相关文章

  • 基于STC8H单片机的双通道直流有刷电机驱动控制器
    基于STC8H单片机的双通道直流有刷电机驱动控制器简介:本设计以STC8H1K08单片机为控制核心,使用PWMA模块生成2路同频PWM波,内置AD转换电位器的实时电压,计算生成PWM波占空比值......
  • 关于#springCloud集成swagger#的问题
    提问:关于#springCloud集成swagger#的问题,如何解决?springCloud整合swagger时Postman请求接口没问题,但用swagger访问时需要认证客户端但后端日志显示已经请求成功。解答:在Sp......
  • 《DFZU2EG_4EV MPSoC之嵌入式Linux开发指南》 第十九章 新字符设备驱动实验​
    新字符设备驱动实验​经过前两章实验的实战操作,我们已经掌握了Linux字符设备驱动开发的基本步骤,字符设备驱动开发重点是使用register_chrdev函数注册字符设备,当不再使用设备......
  • SpringCloud Alibaba之Sentinelt组件
    文章目录​​一、Sentinel熔断与限流​​​​二、控制台安装​​​​1、Sentinel控制台安装​​​​三、规则讲解​​​​1、实时监控​​​​2、流控规则​​​​2.1流控......
  • Istio与SpringCloud对比
    Istio数据平面的高性能智能网络代理,它是基于Envoy改进的Istio-Proxy,控制和协调了被代理服务的所有网络通信,同时也负责收集和上报相关的监控数据。也就是说,代理服务跟外......
  • SpringCloud(二)
    文章目录​​6、Ribbon​​​​6.1、Ribbon是什么​​​​6.2、Ribbon能干什么​​​​6.3、Ribbon实现负载均衡环境搭建​​​​6.4、Ribbon实现负载均衡​​​​6.5、自定......
  • 如何解决安装Windows 11/10时找不到磁盘驱动器
     001、利用u盘对华硕笔记本安装系统时出现如下问题  002下载IntelRapidStorageTechnology(IRST),下载链接对其进行解压:  003、将其拷贝至U盘介质中 ......
  • SpringCloud Tencent Polaris
    北极星是腾讯开源的服务发现和治理中心,致力于解决分布式或者微服务架构中的服务可见、故障容错、流量控制和安全问题。虽然,业界已经有些组件可以解决其中一部分问题,但是缺少......
  • 万字长文助你上手软件领域驱动设计 DDD
    最近看了一本书《解构-领域驱动设计》,书中提出了领域驱动设计统一过程(DDDRUP),它指明了实践DDD的具体步骤,并很好地串联了各种概念、模式和思想。因此,我对书本内容做了梳......
  • 万字长文助你上手软件领域驱动设计 DDD
    最近看了一本书《解构-领域驱动设计》,书中提出了领域驱动设计统一过程(DDDRUP),它指明了实践DDD的具体步骤,并很好地串联了各种概念、模式和思想。因此,我对书本内容做了梳......