首页 > 其他分享 >使用Spring Cloud Stream 驱动 RabbitMQ 代码示例

使用Spring Cloud Stream 驱动 RabbitMQ 代码示例

时间:2022-12-07 09:55:51浏览次数:76  
标签:Stream 示例 flowEnd Spring queue 文档 dap public out

1、Spring Cloud Stream 官方文档

官方配置文档参考:

Spring Cloud Stream Reference Documentation

Spring Cloud Stream RabbitMQ Binder Reference Guide

说明:

在网上查找了许多关于 SpringCloudStream 的中文配置文档,但多数文档更新不及时。而且 SpringCLoudStream 最近的版本在使用上变化较大,老版本的资料已经失去了参考价值。

官方文档虽然是英文的,对多数国内的开发人员阅读会有些吃力,但好在资料都是最新的,而且现在的翻译软件这么方便,国内的多数开发人员也是有一些英文的阅读基础的,建议还是参考官方文档来测试。

特别说明:

文档中的配置说明中,基本上都是 驼峰命名,但我们在实际的使用过程中,在 yaml 配置文件中,都是使用 - 来分割使用。

比如:文档中的 配置名为 acknowledgeMode

但在 yaml 中 acknowledge-mode

2、示例代码

2.1 application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: /
  cloud:
    stream:
      binders:
        # 配置 Binder
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
      default:
        binder: defaultRabbit
        consumer:
          max-attempts: 1  # 重试次数, 设置为 1 即不重试
      bindings:
        flowEndConsumer-in-0:
          group: queue  # queue
          destination: dap-oa.flowEnd  # exchange
        flowEndProducer-out-0:
          destination: dap-oa.flowEnd
      rabbit:
        #        default:
        #          acknowledge-mode: manual  # manual 手动确认
        #          dlq-ttl: 5000
        bindings:
          flowEndConsumer-in-0:
            consumer:
              binding-routing-key: flowEnd # 路由键
              exchange-type: topic  # 交换机类型
              # dlq-ttl: 5000 # 消息过期时间
              # acknowledge-mode: manual  # manual 手动确认  默认为 auto 自动确认
          flowEndProducer-out-0:
            producer:
              exchange-type: topic
              routing-key-expression: headers.routingKey # 根据 headers 中的 routingKey 参数,来确认路由到哪个 queue , "routingKey" 可自己命名
    function:
      definition: flowEndProducer;flowEndConsumer # 生产者和消费者必须在此配置,是能被自动识别

此配置内容,会在 rabbitMQ 中生成:

  • exchange : dap-oa.flowEnd
  • queue : dap-oa.flowEnd.queue
  • dap-oa.flowEnd.queue 与 dap-oa.flowEnd 绑定的路由键:flowEnd

交换机 exchange:

队列 queue:

2.2 生产者

方式一:

@Slf4j
@Configuration
public class ProducerDemo {

    @Bean
    public Supplier<Message<String>> flowEndProducer() {
        return new Supplier<Message<String>>() {
            @Override
            public Message<MsgData> get() {
                return MessageBuilder.withPayload("测试内容")
                .setHeader("routingKey", "flowEnd")     // 路由
                .build();
            }
        };
    }
}

方式二:

// 直接通过 StreamBridge 调用

private final StreamBridge streamBridge;

public void send() {
    streamBridge.send(RabbitConstant.FLOW_END_PRODUCER,
    MessageBuilder.withPayload("测试内容")
        .setHeader("routingKey", "flowEnd")
        .build());
}

2.2 消费者

@Slf4j
@Configuration
public class ConsumerDemo {

    @Bean
    public Consumer<Message<String>> flowEndConsumer() {
        return message -> {
            System.out.println("******************");
            System.out.println("At flowEndConsumer");
            System.out.println("******************");
            System.out.println("Received message " + message.getPayload());

            Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
            Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);

            log.info("******************");
            log.info("channel:{}", channel);
            log.info("deliveryTag:{}", deliveryTag);
        };
    }
}

标签:Stream,示例,flowEnd,Spring,queue,文档,dap,public,out
From: https://www.cnblogs.com/xiangningdeguang/p/16962210.html

相关文章

  • Spring Cloud - Feign
    JAVA项目中如何实现接口调用?1)HttpclientHttpClient是ApacheJakartaCommon下的子项目,用来提供高效的、最新的、功能丰富的支持Http协议的客户端编程工具包,并且它......
  • IC64525: SQLCODE -30020 OR DRDA DATA STREAM SYNTAX ERROR REPORTED BY DB2 CONNECT
      Skiptomaincontent    Openacase    KnownIssues IC64525:SQLCODE-30020ORDRDADATASTREAMSYNT......
  • SpringCloud-负载均衡和通信(Ribbon、Feign)
    1.Ribbon:负载均衡(基于客户端)1.1负载均衡以及RibbonRibbon是什么?SpringCloudRibbon是基于NetflixRibbon实现的一套客户端负载均衡的工具。简单的说,Ribbon是......
  • SpringCloud-Eureka服务注册中心
    1什么是EurekaNetflix在涉及Eureka时,遵循的就是API原则.Eureka是Netflix的有个子模块,也是核心模块之一。Eureka是基于REST的服务,用于定位服务,以实现云端中间件层服务发......
  • SpringCloud-入门
    1、学习前言1.2文章大纲SpringCloud五大组件服务注册与发现——NetflixEureka负载均衡:客户端负载均衡——NetflixRibbon服务端负载均衡:——Feign(其也是依......
  • SpringBoot构建RESTful风格应用
    SpringBoot构建RESTful风格应用1.Web开发的两种模式:前后端不分离:以前没有移动互联网时,我们做的大部分应用都是前后端不分的,比如jsp,或者thymeleaf等后端分离模板,在这种架......
  • SpringMVC —— 响应
    响应页面  响应文本数据  响应json数据  响应json集合数据  注解    转换json时使用了类型转换器     ......
  • 解决SpringBoot框架因post数据量过大没反应问题(踩坑)
    最后在尝试下,springboot的application中加入如下两句话:OK~~~~spring.http.multipart.max-file-size=1000Mbspring.http.multipart.max-request-size=1000Mb补充知识:解......
  • SpringMVC —— 日期类型参数传递
    日期类型参数传递  相关注解  类型转换器   ......
  • Spring AOP
    SpringAOP介绍(1)面向切面编程(方面),利用AOP可以对业务逻辑的各个部分进行隔离,从而使得业务逻辑各部分之间的耦合度降低,提高程序的可重用性,同时提高了开发的效率。(2)通俗......