首页 > 其他分享 >spring-cloud-stream-rabbitmq 3.1.1使用

spring-cloud-stream-rabbitmq 3.1.1使用

时间:2024-02-08 14:22:11浏览次数:24  
标签:stream spring springframework binder 3.1 org cloud

 

 

 

1.引入spring cloud, spring cloud alibaba, spring boot依赖

            <!-- SpringCloud 微服务 -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2020.0.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- SpringCloud Alibaba 微服务 -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2021.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- SpringBoot 依赖配置 -->
            <!--<dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.4.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>-->

2.引入spring cloud stream rabbit依赖

        <!-- spring cloud stream RabbitMQ -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

3. 配置

spring:
      cloud:
        stream:
          binders:
            default-rabbit:
              type: rabbit
              environment:
                spring:
                    rabbitmq:
                      host: ${RABBITMQ_HOST:xx}
                      port: ${RABBITMQ_PORT:5672}
                      username: xx
                      password: xx
          bindings:
            #MQ生产者
            test-out-0:
              destination: test-topic
              content-type: application/json
              binder: default-rabbit
            #MQ消费者
            test1-in-0:
              destination: test-topic
              group: test1-group
              content-type: application/json
              binder: default-rabbit
            #MQ消费者
            test2-in-0:
              destination: test-topic
              group: test2-group
              content-type: application/json
              binder: default-rabbit
          rabbit:
            bindings:
              test-out-0:
                producer:
                  routingKeyExpression: headers.type
                  exchange-type: direct
              test1-in-0:
                consumer:
                  bindingRoutingKey: k1
                  exchange-type: direct
                  acknowledge-mode: AUTO
              test2-in-0:
                consumer:
                  bindingRoutingKey: k2
                  exchange-type: direct
                  acknowledge-mode: AUTO
          #消费者方法名
          function:
            definition: test1;test2;

 

 4 生产者代码

 @Autowired
 private StreamBridge streamBridge;

    @ApiOperation("动态解析并发送消息到指定名称的通道")
    @PostMapping("test/send/{destination}/{routingKey}")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void send(@PathVariable String destination, @PathVariable String routingKey){
        Map<String,String> payload = new HashMap<>();
        payload.put("key1", "value1");
        payload.put("key2", "value2");
        Message<Map<String, String>> message = MessageBuilder
                .withPayload(payload)
                .setHeader("type", routingKey)
                .build();

        streamBridge.send(destination, message);
    }

// 请求 : http://localhost/test/send/test-out-0/k2

5 消费者

 @Bean
    public Consumer test1() {
        return request -> {
            log.info("test1 MQ消费  req:{}", request);
        };
    }

    @Bean
    public Consumer test2() {
        return request -> {
            log.info("test2 MQ消费  req:{}", request);
        };
    }

6 源码分析

生产者发送源码分析:

    org.springframework.cloud.stream.function.StreamBridge#send
        SubscribableChannel messageChannel = org.springframework.cloud.stream.function.StreamBridge#resolveDestination  封装channel
            SubscribableChannel messageChannel = new DirectWithAttributesChannel();
            Binding<T> binding = org.springframework.cloud.stream.binding.BindingService#bindProducer
                org.springframework.cloud.stream.binding.BindingService#getBinder
                    org.springframework.cloud.stream.binder.DefaultBinderFactory#getBinder
                    org.springframework.cloud.stream.binder.DefaultBinderFactory#doGetBinder
                    org.springframework.cloud.stream.binder.DefaultBinderFactory#getBinderInstance 
                    
                    1.得到RabbitMessageChannelBinder,启动时注入的org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration
                    2.得到DefaultBinderFactory, 启动时注入的org.springframework.cloud.stream.config.BindingServiceConfiguration
            org.springframework.cloud.stream.binding.BindingService#doBindProducer
                org.springframework.cloud.stream.binder.AbstractMessageChannelBinder#doBindProducer
                org.springframework.cloud.stream.binder.AbstractMessageChannelBinder#createProducerMessageHandler
                    org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder#createProducerMessageHandler
                        MessageHandler endpoint = new AmqpOutboundEndpoint()

 

标签:stream,spring,springframework,binder,3.1,org,cloud
From: https://www.cnblogs.com/smileblogs/p/18011781

相关文章

  • 在spring场景下同时使用Mockito进行集成测试
    转自:GPT回答https://chat.openai.com/share/07721c36-f18e-429f-b570-d8e14b4d8b37Mockito@InjectMocks测试February8,2024 Anonymous如何使用@InjectMocks测试被代理对象ChatGPTChatGPT使用@InjectMocks来测试被代理对象是一种......
  • SpringBoot脚手架使用
    介绍脚手架可以帮助我们快速创建SpringBoot项目。Spring提供的脚手架页面地址,核心为https://github.com/spring-io/initializr这个项目,https://github.com/spring-io/start.spring.io这个项目在此基础上提供了一些额外配置,并提供了前端页面。内部是通过https://start.spri......
  • AXI-STREAM简介
    AXI-STREAM简介概念AXI-Stream总线是一种高效、简单的数据传输协议,主要用于高吞吐量的数据流传输场景。相比于传统的AXI总线,AXI-Stream总线更加简单和轻量级,它通过无需地址的方式,将数据从一个模块传输到另一个模块,适用于需要高速数据传输的应用场景。部分术语Transfer:基于TVA......
  • Spring6-IoC(learning)
    Spring-IoCA.控制反转IoC(InversionofControl):控制反转(设计思想)Spring通过IoC容器管理所有Java对象的实例化与初始化,控制对象间的依赖关系。将由IoC容器管理的Java对象称为SpringBean,它与使用new关键字创建的Java对象没有任何区别。IoC容器是Spring框架最重要的核心组件之一......
  • javascript 下载 application/octet-stream 文件
    functiondownloadFile(id){varxhr=newXMLHttpRequest();xhr.open('POST','https://localhost/api/app/isp-detection/'+id+'/download');xhr.responseType='blob';xhr.setRequestHeader('Cont......
  • Spring Boot 集成 Redisson分布式锁(注解版)
    转载自:https://blog.csdn.net/Ascend1977/article/details/131126047        Redisson是一种基于Redis的Java驻留集群的分布式对象和服务库,可以为我们提供丰富的分布式锁和线程安全集合的实现。在SpringBoot应用程序中使用Redisson可以方便地实现分布式应用......
  • Spring Boot项目常用配置整理
    〇、常用地址一、配置文件1.1bootatrap.xml#Spring配置spring:#应用名application:name:data-xx-platform#启动环境profiles:active:@spring.profiles.active@cloud:nacos:#注册中心discovery:server-addr:http://nacos......
  • SpringBoot简介
    1、为什么有SpringBoot?J2EE笨重的开发、繁多的配置、低下的开发效率、复杂的部署流程、第三方技术集成难度大。2、SpringBoot是什么?是一个一站式整合所有应用框架的框架;并且完美整合Spring技术栈。SpringBoot来简化开发,约定大于配置,去繁从简,justrun就能创建一个......
  • Spring 接点、切点、切面、引入、织入、通知 概念
    importcom.github.pagehelper.PageHelper;importorg.aspectj.lang.ProceedingJoinPoint;importorg.aspectj.lang.annotation.Around;importorg.aspectj.lang.annotation.Aspect;importorg.aspectj.lang.annotation.Pointcut;importorg.springframework.context.ann......
  • unidac在lazarus 3.0/fpc3.3.1遇到的问题
    近日和樵夫交流时发现unidac在aarch64linux交叉编译UniProvider.pas出错:UniProvider.pas(1040,1)Error:Compilationraisedexceptioninternally奇怪的是其他CPU类型是正常的。樵夫的解决办法:1、修改UniProvider.pas,添加{$ifFPC_FULLVERSION<30301}TEnumerator......