首页 > 其他分享 >Springboot RocketMQ整合—官方原版

Springboot RocketMQ整合—官方原版

时间:2023-09-13 16:01:21浏览次数:37  
标签:Springboot class topic rocketmq 原版 consumer public RocketMQ String

Doker 技术人自己的数码品牌

Doker官网:Doker 多克

一、添加maven依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>

Springboot RocketMQ整合—官方原版_User

二、发送消息

1、修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

Springboot RocketMQ整合—官方原版_ide_02

注意:
请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

2、编写代码

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    public void run(String... args) throws Exception {
          //send message synchronously
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
          //send spring message
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        //send messgae asynchronously
          rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSucess SendResult=%s %n", var1);
            }

            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }

        });
          //Send messages orderly
          rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")
        
        //rocketMQTemplate.destroy(); // notes:  once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
    }
    
    @Data
    @AllArgsConstructor
    public class OrderPaidEvent implements Serializable{
        private String orderId;
        
        private BigDecimal paidMoney;
    }
}

Springboot RocketMQ整合—官方原版_User_03

三、接收消息

1、Push模式

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876

Springboot RocketMQ整合—官方原版_System_04

注意:
请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

@SpringBootApplication
public class ConsumerApplication{
    
    public static void main(String[] args){
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer1 implements RocketMQListener<String>{
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
}

Springboot RocketMQ整合—官方原版_ide_05

2、Pull模式

RocketMQ Spring 2.2.0开始,RocketMQ Srping支持Pull模式消费

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
# When set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic, rocketmqTemplate will start lite pull consumer
# If you do not want to use lite pull consumer, please do not set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic
rocketmq.pull-consumer.group=my-group1
rocketmq.pull-consumer.topic=test

Springboot RocketMQ整合—官方原版_User_06

注意之前lite pull consumer的生效配置为rocketmq.consumer.group和rocketmq.consumer.topic,但由于非常容易与push-consumer混淆,因此在2.2.3版本之后修改为rocketmq.pull-consumer.group和rocketmq.pull-consumer.topic.

编写代码

@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Resource(name = "extRocketMQTemplate")
    private RocketMQTemplate extRocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        //This is an example of pull consumer using rocketMQTemplate.
        List<String> messages = rocketMQTemplate.receive(String.class);
        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);

        //This is an example of pull consumer using extRocketMQTemplate.
        messages = extRocketMQTemplate.receive(String.class);
        System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
    }
}

Springboot RocketMQ整合—官方原版_System_07

四、事务消息

修改application.properties

## application.propertiesrocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

Springboot RocketMQ整合—官方原版_User_08

注意:
请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }

    public void run(String... args) throws Exception {
        try {
            // Build a SpringMessage for sending in transaction
            Message msg = MessageBuilder.withPayload(..)...;
            // In sendMessageInTransaction(), the first parameter transaction name ("test")
            // must be same with the @RocketMQTransactionListener's member field 'transName'
            rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
        } catch (MQClientException e) {
            e.printStackTrace(System.out);
        }
    }

    // Define transaction listener with the annotation @RocketMQTransactionListener
    @RocketMQTransactionListener
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... local transaction process, return bollback, commit or unknown
            return RocketMQLocalTransactionState.UNKNOWN;
          }

          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check transaction status and return bollback, commit or unknown
            return RocketMQLocalTransactionState.COMMIT;
          }
    }
}

Springboot RocketMQ整合—官方原版_ide_09

五、消息轨迹

Producer 端要想使用消息轨迹,需要多配置两个配置项:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic

Springboot RocketMQ整合—官方原版_ide_10

Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener 中进行配置对应的属性:

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    enableMsgTrace = true,
    customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

Springboot RocketMQ整合—官方原版_System_11

注意:
默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 rocketmq.consumer.customized-trace-topic 配置项,不需要为在每个 @RocketMQMessageListener 配置。
若需使用阿里云消息轨迹,则需要在@RocketMQMessageListener中将accessChannel配置为CLOUD。

六、ACL功能

Producer 端要想使用 ACL 功能,需要多配置两个配置项:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK

Springboot RocketMQ整合—官方原版_ide_12

Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    accessKey = "AK",
    secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

Springboot RocketMQ整合—官方原版_ide_13

注意:
可以不用为每个 @RocketMQMessageListener 注解配置 AK/SK,在配置文件中配置 rocketmq.consumer.access-key 和 rocketmq.consumer.secret-key 配置项,这两个配置项的值就是默认值

七、请求 应答语义支持

RocketMQ-Spring 提供 请求/应答 语义支持。

  • Producer端

发送Request消息使用SendAndReceive方法

注意
同步发送需要在方法的参数中指明返回值类型
异步发送需要在回调的接口中指明返回值类型

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    public void run(String... args) throws Exception {
        // 同步发送request并且等待String类型的返回值
        String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
        System.out.printf("send %s and receive %s %n", "request string", replyString);

        // 异步发送request并且等待User类型的返回值
        rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
            @Override public void onSuccess(User message) {
                System.out.printf("send user object and receive %s %n", message.toString());
            }

            @Override public void onException(Throwable e) {
                e.printStackTrace();
            }
        }, 5000);
    }
    
    @Data
    @AllArgsConstructor
    public class User implements Serializable{
        private String userName;
            private Byte userAge;
    }
}

Springboot RocketMQ整合—官方原版_User_14

  • Consumer端

需要实现RocketMQReplyListener<T, R> 接口,其中T表示接收值的类型,R表示返回值的类型。

@SpringBootApplication
public class ConsumerApplication{
    
    public static void main(String[] args){
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @Service
    @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
    public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
        @Override
        public String onMessage(String message) {
          System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
          return "reply string";
        }
      }
   
    @Service
    @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
    public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User>{
        public void onMessage(User user) {
              System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
              User replyUser = new User("replyUserName",(byte) 10);    
              return replyUser;
        }
    }

    @Data
    @AllArgsConstructor
    public class User implements Serializable{
        private String userName;
            private Byte userAge;
    }
}

Springboot RocketMQ整合—官方原版_User_15

八、常见问题

  1. 生产环境有多个nameserver该如何连接?

rocketmq.name-server支持配置多个nameserver地址,采用;分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876

  1. rocketMQTemplate在什么时候被销毁?

开发者在项目中使用rocketMQTemplate发送消息时,不需要手动执行rocketMQTemplate.destroy()方法, rocketMQTemplate会在spring容器销毁时自动销毁。

  1. 启动报错:Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please

RocketMQ在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个consumerGroup下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。建议consumerGroup与topic一一对应。

  1. 发送的消息内容体是如何被序列化与反序列化的?

RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]。

  1. 如何指定topic的tags?

RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称。

注意:
tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

  1. 发送消息时如何设置消息的key?

可以通过重载的xxxSend(String destination, Message<?> msg, ...)方法来发送消息,指定msg的headers来完成。示例:

Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
rocketMQTemplate.send("topic-test", message);

Springboot RocketMQ整合—官方原版_User_16

同理还可以根据上面的方式来设置消息的FLAG、WAIT_STORE_MSG_OK以及一些用户自定义的其它头信息。

注意:
在将Spring的Message转化为RocketMQ的Message时,为防止header信息与RocketMQ的系统属性冲突,在所有header的名称前面都统一添加了前缀USERS_。因此在消费时如果想获取自定义的消息头信息,请遍历头信息中以USERS_开头的key即可。

  1. 消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?

消费者在实现RocketMQListener接口时,只需要起泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。

@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class MyConsumer2 implements RocketMQListener<MessageExt>{
    public void onMessage(MessageExt messageExt) {
        log.info("received messageExt: {}", messageExt);
    }
}

Springboot RocketMQ整合—官方原版_ide_17

  1. 如何指定消费者从哪开始消费消息,或开始消费的位置?

消费者默认开始消费的位置请参考:RocketMQ FAQ。 若想自定义消费者开始的消费位置,只需在消费者类添加一个RocketMQPushConsumerLifecycleListener接口的实现即可。 示例如下:

@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String message) {
        log.info("received message: {}", message);
    }

    @Override
    public void prepareStart(final DefaultMQPushConsumer consumer) {
        // set consumer consume message from now
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
              consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    }
}

Springboot RocketMQ整合—官方原版_ide_18

同理,任何关于DefaultMQPushConsumer的更多其它其它配置,都可以采用上述方式来完成。

  1. 如何发送事务消息?

在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板RocketMQTemplate, 调用方法sendMessageInTransaction()来进行消息的发布。 注意:从RocketMQ-Spring 2.1.0版本之后,注解@RocketMQTransactionListener不能设置txProducerGroup、ak、sk,这些值均与对应的RocketMQTemplate保持一致

  1. 如何声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate?

第一步: 定义非标的RocketMQTemplate使用你需要的属性,可以定义与标准的RocketMQTemplate不同的nameserver、groupname等。如果不定义,它们取全局的配置属性值或默认值。

// 这个RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 与所定义的类名相同(但首字母小写)
@ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
   , ... // 定义其他属性,如果有必要。
)
public class ExtRocketMQTemplate extends RocketMQTemplate {
  //类里面不需要做任何修改
}

Springboot RocketMQ整合—官方原版_User_19

第二步: 使用这个非标RocketMQTemplate

@Resource(name = "extRocketMQTemplate") // 这里必须定义name属性来指向上述具体的Spring Bean.
private RocketMQTemplate extRocketMQTemplate;

Springboot RocketMQ整合—官方原版_System_20

接下来就可以正常使用这个extRocketMQTemplate了。

  1. 如何使用非标的RocketMQTemplate发送事务消息?

首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,注解字段的rocketMQTemplateBeanName指明为非标的RocketMQTemplate的Bean name(若不设置则默认为标准的RocketMQTemplate),比如非标的RocketMQTemplate Bean name为“extRocketMQTemplate",则代码如下:

@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... local transaction process, return bollback, commit or unknown
            return RocketMQLocalTransactionState.UNKNOWN;
          }

          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check transaction status and return bollback, commit or unknown
            return RocketMQLocalTransactionState.COMMIT;
          }
    }

Springboot RocketMQ整合—官方原版_ide_21

然后使用extRocketMQTemplate调用sendMessageInTransaction()来发送事务消息。

  1. MessageListener消费端,是否可以指定不同的name-server而不是使用全局定义的'rocketmq.name-server'属性值 ?
@Service
@RocketMQMessageListener(
   nameServer = "NEW-NAMESERVER-LIST", // 可以使用这个optional属性来指定不同的name-server
   topic = "test-topic-1", 
   consumerGroup = "my-consumer_test-topic-1",
   enableMsgTrace = true,
   customizedTraceTopic = "my-trace-topic"
)
public class MyNameServerConsumer implements RocketMQListener<String> {
   ...
}

Springboot RocketMQ整合—官方原版_ide_22

大家好,我是Doker品牌的Sinbad,欢迎点赞和评论,您的鼓励是我们持续更新的动力!欢迎加微信进入技术群聊!


标签:Springboot,class,topic,rocketmq,原版,consumer,public,RocketMQ,String
From: https://blog.51cto.com/Doker/7455947

相关文章

  • 对SpringBoot接口进行操作日志记录
    最近业务有需求要对所有的用户操作进行日志记录,方便管理员查询不同权限级别的用户对系统的操作记录,现有的日志只是记录了异常信息、业务出错、重要功能的执行进行了记录,并不能满足需求要求,最直接的解决方法是在每个接口上去添加log.info之类的代码,但是这种方式对业务代码的切入性......
  • SpringBoot入门(一) springBoot框架搭建和启动
    1.创建maven工程MavenProject      //CODE    <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xs......
  • SpringBoot教程(二)springboot的配置文件
    一.springboot配置文件的类型application.propertiesapplication.yml项目结构,因为不可以同时使用这两种文件启动时任选一个放到resources下即可 二.properties配置文件的使用packagecom.lpinfo.shop.lpinfoshop;importorg.springframework.beans.factory.annotation.Autowi......
  • springboot发布部署web jar包
    1.在idea中生成jar包文件 2.我这个项目使用的是JavaJDK20,所以要在官网下载这个版本在服务器上安装。https://www.oracle.com/java/technologies/downloads/   有些系统需要重启下服务器才会生效。 3.把第一步生成的 demo-0.0.1-SNAPSHOT.jar文件复制到服务器......
  • Springboot @ConfigurationProperties对象 静态方法调用无效
    一.问题1.springboot使用 @ConfigurationProperties注入对象普通方法调用可以 在静态方法中调用的时候读取不到参数二.文件packagecom.lpinfo.framework.config;@Data@Component@PropertySource("classpath:/oss.properties")@ConfigurationProperties(prefix="oss")......
  • RocketMQ-(7-1)-可观测-Metrics
    RocketMQ以Prometheus格式公开以下指标。您可以使用这些指标监视您的集群。服务端Metrics指标生产者Metrics指标消费者Metrics指标版本支持:以下指标Metrics是从5.1.0版本开始支持。Metrics指标详情Metrictypes消息队列RocketMQ版定义的Metrics完全兼容开源Prom......
  • RocketMQ教程-(6-2)-运维部署-Admin Tool
    执⾏命令⽅法:./mqadmin{command}{args}⼏乎所有命令都需要配置-n表⽰NameServer地址,格式为ip:port⼏乎所有命令都可以通过-h获取帮助如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执⾏命令,如果不配置Broker地址,则对集群中所有主机执⾏命令......
  • RocketMQ教程-(5)-功能特性-消费者分类
    ApacheRocketMQ支持PushConsumer、SimpleConsumer以及PullConsumer这三种类型的消费者,本文分别从使用方式、实现原理、可靠性重试和适用场景等方面为您介绍这三种类型的消费者。背景信息ApacheRocketMQ面向不同的业务场景提供了不同消费者类型,每种消费者类型的集成方式和......
  • RocketMQ-(8-1)-EventBridge-EventBridge 核心概念
    RocketMQEventBridge核心概念理解EventBridge中的核心概念,能帮助我们更好的分析和使用EventBridge。本文重点介绍下EventBridge中包含的术语:EventSource:事件源。用于管理发送到EventBridge的事件,所有发送到EventBridge中的事件都必须标注事件源名称信息,对应CloudEvent事件体中的s......
  • RocketMQ-(9-1)-MQTT-EventBridge概述
    RocketMQMQTT概览传统的消息队列MQ主要应用于服务(端)之间的消息通信,比如电商领域的交易消息、支付消息、物流消息等等。然而在消息这个大类下,还有一个非常重要且常见的消息领域,即IoT类终端设备消息。近些年,我们看到随着智能家居、工业互联而兴起的面向IoT设备类的消息正在呈爆炸式......