首页 > 其他分享 >RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消费收发实战

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消费收发实战

时间:2023-12-28 20:35:00浏览次数:42  
标签:stream spring Alibaba RocketMQ SpringCloud test input bindings cloud

欢迎关注公众号:【11来了】 发送 “资料” 可以下载Redis、JVM系列文章PDF版本!

作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!

Spring Cloud Alibaba 集成 RocketMQ 最佳实践

SpringBoot 相对于 SSM 来说已经很大程度上简化了开发,但是使用 SpringBoot 集成一些第三方的框架,还是需要花费一些力气

因此,SpringCloud 出现的宗旨就是简化 SpringBoot 集成第三方框架的过程,SpringCloud 内置集成了很多第三方插件,但是 SpringCloud 前期很重的依赖了 Netflix 组件, 但是 Netflix 组件不再维护了

因此,基于 SpringCloud 又出现了 SpringCloudAlibaba,可以灵活的进行扩展、替换插件,那么通过 SpringCloudAlibaba 集成 RocketMQ 之后,关系图如下:

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消费收发实战_ci


SpringCloudAlibaba 集成 RocketMQ 官方文档



集成依赖

首先,项目引入 SpringCloud、SpringCloudAlibaba 依赖和 RocketMQ 依赖,之后项目都引入该依赖即可使用 RocketMQ

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>spring-boot-starter-parent</artifactId>
    <groupId>org.springframework.boot</groupId>
    <version>2.3.12.RELEASE</version>
  </parent>

  <groupId>com.mq.cloud</groupId>
  <artifactId>parent</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>pom</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <com.cloud.version>Hoxton.SR12</com.cloud.version>
    <com.alibaba.cloud.version>2.2.8.RELEASE</com.alibaba.cloud.version>
  </properties>

  <dependencyManagement>
    <dependencies>
      <!--集成 SpringCloud-->
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${com.cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
      <!--集成 SpringCloudAlibaba-->
      <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-dependencies</artifactId>
        <version>${com.alibaba.cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
      <!--引入 RocketMQ 依赖-->
      <dependency>
         <groupId>com.alibaba.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>


DashBoard

可以通过 dashboard 项目来观测 topic 消费情况,下载源码,在 application.yml 中配置 nameserver 地址启动即可

https://github.com/apache/rocketmq-dashboard

在 localhost:8080 即可访问 Dashboard

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消费收发实战_ci_02


消息收发实战

项目结构如下:

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消费收发实战_spring_03

首先新建一个项目,引入上边依赖

主启动类如下:

@SpringBootApplication
@EnableBinding({ CustomSource.class, CustomSink.class })
public class RocketMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketMQApplication.class, args);
        System.out.println("【【【【【  RocketMQApplication 启动成功!!!   】】】】】");
    }
  
    // @StreamListener 声明对应的 Input Binding,这里设置两个通道来接收 topic 信息
    @StreamListener("input")
    public void receiveInput(String receiveMsg) {
        System.out.println("input receive: " + receiveMsg);
    }

    @StreamListener("input2")
    public void receiveInput2(String receiveMsg) {
        System.out.println("input2 receive: " + receiveMsg);
    }
}


消费者的 stream 默认为 input,生产者默认为 output,我们可以使用自定义的 Source 和 Sink 来扩展 stream 里的消费者配置,自定义 Source 和 Sink 如下(在主启动类通过 @EnableBinding 来绑定):

我们通过自定义 Source 和 Sink 添加了一个通道 input2、output2,那么生产者和消费者就可以收发多个 topic 了

public interface CustomSink extends Sink {

    /**
     * Input channel name.
     */
    String INPUT2 = "input2";

    /**
     * @return input channel.
     */
    @Input(CustomSink.INPUT2)
    SubscribableChannel input2();
}

public interface CustomSource extends Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT2 = "output2";

    /**
     * @return output channel
     */
    @Output(CustomSource.OUTPUT2)
    MessageChannel output2();
}


application.properties 如下:

spring.application.name=mq_rmqdemo
server.port=9500

# configure the nameserver of rocketmq
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.rocketmq.binder.group=mq_rmqdemo

# configure the output binding named output
# 第一个通道的 topic
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json

# configure the input binding named input
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=test-group

# configure the output2 binding named output
# 第二个通道的 topic
spring.cloud.stream.bindings.output2.destination=test-topic2
spring.cloud.stream.bindings.output2.content-type=application/json

# configure the input binding named input
spring.cloud.stream.bindings.input2.destination=test-topic2
spring.cloud.stream.bindings.input2.content-type=application/json
spring.cloud.stream.bindings.input2.group=test-group2


接下来写生产者发送两个 topic,在消费者即可看到消息被成功接收:

// 生产者
public class Producer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 3; i++) {
            Message msg = new Message(
                    "test-topic",
                    "tagStr",
                    ("( " + i + " )message from mq_rmqdemo producer:【test-topic1】").getBytes());
            producer.send(msg);

            Message msg2 = new Message(
                    "test-topic2",
                    "tagStr",
                    ("( " + i + " )message from mq_rmqdemo producer:【test-topic2】").getBytes());
            producer.send(msg);
            producer.send(msg2);
        }
        System.out.println("Send Finished.");
    }
}


标签:stream,spring,Alibaba,RocketMQ,SpringCloud,test,input,bindings,cloud
From: https://blog.51cto.com/u_16186397/9018056

相关文章

  • springcloud动力节点-01Eureka
    SpringCloudEureka1.SpringCloudEureka简介注册发现中心Eureka来源于古希腊词汇,意为“发现了”。在软件领域,Eureka是Netflix在线影片公司开源的一个服务注册与发现的组件,和其他Netflix公司的服务组件(例如负载均衡、熔断器、网关等)一起,被SpringCloud社区整合......
  • springcloud动力节点-05Sleuth
    SpringCloudSleuth1.什么是链路追踪官网:https://spring.io/projects/spring-cloud-sleuth链路追踪就是:追踪微服务的调用路径2.链路追踪的由来在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调用来协同产生最后的请求结果,每一个请求都会开成一......
  • springcloud动力节点-04Hystrix
    SpringCloudHystrix1.前言1.1什么是服务雪崩   服务雪崩的本质:线程没有及时回收。不管是调用成功还是失败,只要线程可以及时回收,就可以解决服务雪崩1.2服务雪崩怎么解决1.2.1修改调用的超时时长(不推荐)将服务间的调用超时时长改小,这样就可以让线程及时回收,保证服......
  • springcloud动力节点-03OpenFeign
    SpringCloudOpenFeign 1.说在前面上一节我们讲到Ribbon做了负载均衡,用Eureka-Client来做服务发现,通过RestTemplate来完成服务调用,但是这都不是我们的终极方案,终极方案是使用OpenFeign2.OpenFeign简介https://docs.spring.io/spring-cloud-open......
  • Spring Cloud动力节点-07Alibaba简介、注册、配置中心
    1.项目简介SpringCloudAlibaba致力于提供微服务开发的一站式解决方案。此项目包含开发分布式应用微服务的必需组件,方便开发者通过SpringCloud编程模型轻松使用这些组件来开发分布式应用服务。依托SpringCloudAlibaba,您只需要添加一些注解和少量配置,就可以将SpringClo......
  • springcloud动力节点-06Admin监控 Or Gateway网关
    SpringCloudAdmin 监控端点新建工程:admin-serverpom中springcloud版本号和版本控制要添加<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instan......
  • 微服务框架 SpringCloud微服务架构3
    微服务框架SpringCloud微服务架构3Eureka3.1提供者与消费者3.1.1一些概念服务提供者:一次业务中,被其它微服务调用的服务。(提供接口给其它微服务)服务消费者:一次业务中,调用其它微服务的服务。(调用其它微服务提供的接口)在我们的Demo案例中  很明显两者是如下的关系:3......
  • day23 SpringCloud应用改造实践 (8.3.1-8.4.1)
    8.3-1-SpringCloud应用改造实践上一、使用SkyWalkingAgentJava中使用agent,提供以下三种方式实现使用官方提供的基础镜像skywalking-base将agent包构建到已经存在的基础镜像中sidecar模式挂载agent(推荐)1.1使用官方提供的基础镜像https://skywalking.apache.org/downlo......
  • MAC 下载安装、启动和关闭 RocketMQ实例
    文章目录1.下载并解压2.实例2.1使用Maven打包、构建2.2修改Nameserver和Broker启动的配置2.2启动nameserver和broker3.关闭rocketmq的命令参考文献1.下载并解压首先,你需要安装好Java环境、Maven。网址:http://rocketmq.apache.org/dowloading/releases/,下载ro......
  • RocketMQ系统性学习-RocketMQ领域模型及Linux下单机安装
    欢迎关注公众号【11来了】,发送笔记可领取Redis、JVM等系列完整pdf!MQ之间的对比三种常用的MQ对比,ActiveMQ、Kafka、RocketMQ性能方面:三种MQ吞吐量级别为:万,百万,十万消息发送时延:毫秒,毫秒,微秒可用性:主从,分布式,分布式扩展性方面:水平伸缩能力:均支持技术栈:Java,Java/Scala,Java功能......