首页 > 其他分享 >RocketMQ 基础教程-RocketMQ整合SpringBoot

RocketMQ 基础教程-RocketMQ整合SpringBoot

时间:2024-07-02 09:29:55浏览次数:28  
标签:String void 发送 rocketmq 消息 基础教程 public RocketMQ SpringBoot

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

学习必须往深处挖,挖的越深,基础越扎实!

阶段1、深入多线程

阶段2、深入多线程设计模式

阶段3、深入juc源码解析


阶段4、深入jdk其余源码解析


阶段5、深入jvm源码解析

码哥源码部分

码哥讲源码-原理源码篇【2024年最新大厂关于线程池使用的场景题】

码哥讲源码【炸雷啦!炸雷啦!黄光头他终于跑路啦!】

码哥讲源码-【jvm课程前置知识及c/c++调试环境搭建】

​​​​​​码哥讲源码-原理源码篇【揭秘join方法的唤醒本质上决定于jvm的底层析构函数】

码哥源码-原理源码篇【Doug Lea为什么要将成员变量赋值给局部变量后再操作?】

码哥讲源码【你水不是你的错,但是你胡说八道就是你不对了!】

码哥讲源码【谁再说Spring不支持多线程事务,你给我抽他!】

终结B站没人能讲清楚红黑树的历史,不服等你来踢馆!

打脸系列【020-3小时讲解MESI协议和volatile之间的关系,那些将x86下的验证结果当作最终结果的水货们请闭嘴】

1、官网

RocketMQ为 SpringBoot 提供了整合方案,官网地址如下,上面提供了详细的整合步骤及案例。

    https://github.com/apache/rocketmq-spring

官方详细文档(可以切换不同的版本)

    https://github.com/apache/rocketmq-spring/blob/release-2.0.1/README.md

2、消息生产者

1)添加依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

2)配置文件

    rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
    rocketmq.producer.group=my-group

3)启动类

    @SpringBootApplication
    public class MQProducerApplication {
        public static void main(String[] args) {
            SpringApplication.run(MQSpringBootApplication.class);
        }
    }

4)测试类

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = {MQSpringBootApplication.class})
    public class ProducerTest {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @Test
        public void test1(){
            rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");
        }
    }

3、消息消费者

1)添加依赖

同消息生产者

2)配置文件

同消息生产者

3)启动类

    @SpringBootApplication
    public class MQConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(MQSpringBootApplication.class);
        }
    }

4)消息监听器

    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
    public class Consumer implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String message) {
            log.info("Receive message:"+message);
        }
    }

5)RocketMQMessageListener参数

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface RocketMQMessageListener {
    
        String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
        String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
        String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
        String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
        String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
    
        /**
         * 消费者分组
         *
         * @return
         */
        String consumerGroup();
    
        /**
         * 主题
         */
        String topic();
    
        /**
         * selectorType:消息选择器类型
         * - SelectorType.TAG:默认值,根据TAG选择,仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息
         * - SelectorType.SQL92:根据SQL92表达式选择
         */
        SelectorType selectorType() default SelectorType.TAG;
    
        /**
         * selectorType 对应的表达式
         */
        String selectorExpression() default "*";
    
    
        /**
         * consumeMode:消费模式
         * - ConsumeMode.CONCURRENTLY:默认值,并行处理
         * - ConsumeMode.ORDERLY:按顺序处理
         */
        ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
    
        /**
         * messageMode:消息模型
         * - MessageModel.CLUSTERING:默认值,集群
         * - MessageModel.BROADCASTING:广播
         */
        MessageModel messageModel() default MessageModel.CLUSTERING;
    
        /**
         * 最大线程数,默认值 64
         */
        int consumeThreadMax() default 64;
    
        /**
         * 消费失败,最大重试次数
         * <p>
         * - 在并发模式中,-1表示16
         * - 在有序模式中,-1表示整数最大值
         */
        int maxReconsumeTimes() default -1;
    
        /**
         * 消息可能阻止使用线程的最长时间(分钟)
         */
        long consumeTimeout() default 15L;
    
        /**
         * 发送回复消息超时
         */
        int replyTimeout() default 3000;
    
        /**
         * 默认值 ${rocketmq.consumer.access-key:}
         */
        String accessKey() default ACCESS_KEY_PLACEHOLDER;
    
        /**
         * 默认值 ${rocketmq.consumer.secret-key:}
         */
        String secretKey() default SECRET_KEY_PLACEHOLDER;
    
        /**
         * 启用消息轨迹,默认值 false
         */
        boolean enableMsgTrace() default false;
    
        /**
         * 自定义的消息轨迹主题,默认值${rocketmq.consumer.customized-trace-topic:}
         * 没有配置此配置项则使用默认的主题
         */
        String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
    
        /**
         * 命名服务器地址,默认值${rocketmq.name-server:}
         */
        String nameServer() default NAME_SERVER_PLACEHOLDER;
    
        /**
         * 默认值${rocketmq.access-channel:}
         */
        String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
    }

4、发送同步消息

同步消息也就这些API,简单讲解一下!

    //发送普通同步消息-Object
    syncSend(String destination, Object payload)
    //发送普通同步消息-Message
    syncSend(String destination, Message<?> message)
    //发送批量普通同步消息
    syncSend(String destination, Collection<T> messages)
    //发送普通同步消息-Object,并设置发送超时时间
    syncSend(String destination, Object payload, long timeout)
    //发送普通同步消息-Message,并设置发送超时时间
    syncSend(String destination, Message<?> message, long timeout)
    //发送批量普通同步消息,并设置发送超时时间
    syncSend(String destination, Collection<T> messages, long timeout)
    //发送普通同步延迟消息,并设置超时,这个下文会演示
    syncSend(String destination, Message<?> message, long timeout, int delayLevel)
    @Setter
    @Getter
    @Accessors(chain = true)
    @AllArgsConstructor
    @NoArgsConstructor
    public class MsgTest {
        private int id;
        private String context;
        private Date date;
    
    }
    
    /**
     * 同步消息-
     */
    @Test
    void syncSendStr() {
        //syncSend和send是等价的
        rocketMQTemplate.syncSend("first-topic-str", "hello world test1");
        //send底层还是会调用syncSend的代码
        rocketMQTemplate.send("first-topic-str", MessageBuilder.withPayload("hello world test1").build());
        SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", "hello world test2");
        log.info("syncSend===>{}",res);
    }
    
    /**
     * 同步消息-
     */
    @Test
    void syncSendPojo() {
        MsgTest msg = new MsgTest(1,"hello world test3",new Date());
        SendResult res = rocketMQTemplate.syncSend("first-topic-pojo", MessageBuilder.withPayload(msg).build());
        log.info("syncSend===>{}",res);
    }

这里存在两种消息体,一种是Object的,另一种是Message<?>的形式的,其实我们发送Object的时候,底层是有帮我们做转换的,其实和我们在上层调用

    MessageBuilder.withPayload("hello world test1").build()

是一样的!源码如下

5、异步消息

    //发送普通异步消息-Object
    asyncSend(String destination, Object payload, SendCallback sendCallback)
    //发送普通异步消息-Message
    asyncSend(String destination, Message<?> message, SendCallback sendCallback)
    //发送普通异步消息-Object,并设置发送超时时间
    asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
    //发送普通异步消息-Message,并设置发送超时时间
    asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout)
    //发送普通异步延迟消息,并设置超时,这个下文会演示
    asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel)
    /**
     * 异步消息-String
     * 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包
     * 关键实现异步发送回调接口(SendCallback)
     * 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
     * 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞
     */
    @Test
    void asyncSendStr() {
        rocketMQTemplate.asyncSend("first-topic-str:tag1", "hello world test2 asyncSendStr", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步消息发送成功:{}", sendResult);
            }
    
            @Override
            public void onException(Throwable throwable) {
                log.info("异步消息发送失败:{}", throwable.getMessage());
            }
        });
    }

6、单向消息

这里普通单向消息就只有两个操作空间,这个不用多说了,一个是Object,另一个是Message

    /**
     * 单向消息
     * 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
     * 此方式发送消息的过程耗时非常短,一般在微秒级别
     * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
     */
    @Test
    void sendOneWayStr() {
        rocketMQTemplate.sendOneWay("first-topic-str:tag1", "hello world test2 sendOneWayStr");
        log.info("单向消息已发送");
    }

6、批量消息

    /**
     * 批量消息
     */
    @Test
    void asyncSendBatch() {
        Message<String> msg = MessageBuilder.withPayload("hello world test1").build();
        List<Message> msgList = Arrays.asList(msg,msg,msg,msg,msg);
        SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);
        log.info("批量消息");
    }

7、延迟消息

1)同步延迟消息

    /**
     * 同步延迟消息
     * rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
     * RocketMQ 目前只支持固定精度的定时消息。
     * 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 延迟的底层方法是用定时任务实现的。
     */
    @Test
    void syncSendDelayedStr() {
        Message<String> message = MessageBuilder.withPayload("syncSendDelayedStr" + new Date()).build();
        /**
         * @param destination formats: `topicName:tags`
         * @param message 消息体
         * @param timeout 发送超时时间
         * @param delayLevel 延迟级别  1到18
         * @return {@link SendResult}
         */
        SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", message, 3000, 3);
        log.info("res==>{}", res);
    }

2)异步延迟消息

    /**
     * 异步延迟消息
     */
    @Test
    void asyncSendDelayedStr() {
        //Callback
        SendCallback sc=new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送异步延时消息成功");
            }
            @Override
            public void onException(Throwable throwable) {
                log.info("发送异步延时消息失败:{}",throwable.getMessage());
            }
        };
    
        Message<String> message= MessageBuilder.withPayload("asyncSendDelayedStr").build();
        rocketMQTemplate.asyncSend("first-topic-str:tag1", message, sc, 3000, 3);
    }

8、顺序消息

使用rocketmq-spring-boot-starter发送顺序消息就比较方便了,不像使用rocket-client那样,需要手动获取RocketMQ中当前topic的队列个数然后再通过hashKey值,mqs.size()取模,得到一个索引值,这里底层都帮我们做好了处理!

    /**
     * 顺序消息
     */
    @Test
    void SendOrderStr() {
        List<MsgTest> msgList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            msgList.add(new MsgTest(100, "我是id为100的第" + (i + 1) + "条消息", new Date()));
        }
        msgList.forEach(t -> {
            rocketMQTemplate.asyncSendOrderly("first-topic-str:tag1", t, String.valueOf(t.getId()), new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("异步消息发送成功:{}", sendResult);
                }
    
                @Override
                public void onException(Throwable throwable) {
                    log.info("异步消息发送失败:{}", throwable.getMessage());
                }
            });
        });
    }

9、事务消息

1)发送者

    /**
     * 事务消息  注意这里还有一个监听器 TransactionListenerImpl
     */
    @Test
    void sendTransactionStr() {
    
        String[] tags = {"TAGA", "TAGB", "TAGC"};
        for (int i = 0; i < 3; i++) {
            Message<String> message = MessageBuilder.withPayload("事务消息===>" + i).build();
            TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction("transaction-str:" + tags[i], message, i + 1);
            if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) {
                log.info("事物消息发送成功");
            }
    
            log.info("事物消息发送结果:{}", res);
        }
    }

2)事务消息生产者端的消息监听器

    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    @RocketMQTransactionListener
    public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            log.info("executeLocalTransaction");
            // 执行本地事务
            String tag = String.valueOf(msg.getHeaders().get("rocketmq_TAGS"));
            if ("TAGA".equals(tag)) {
                //这里只讲TAGA消息提交,状态为可执行
                return RocketMQLocalTransactionState.COMMIT;
            } else if ("TAGB".equals(tag)) {
                return RocketMQLocalTransactionState.ROLLBACK;
            } else if ("TAGC".equals(tag)) {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    
        //mq回调检查本地事务执行情况
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            log.info("checkLocalTransaction===>{}", msg);
            return RocketMQLocalTransactionState.COMMIT;
        }
    }

3)消费者

    @Slf4j
    @Component
    @RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction-str")
    public class TransactionConsumer implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String str) {
            log.info("===>"+str);
        }
    
    }

标签:String,void,发送,rocketmq,消息,基础教程,public,RocketMQ,SpringBoot
From: https://blog.csdn.net/smart_an/article/details/140116104

相关文章

  • 基于web的酒店客房管理系统 毕业设计 springboot+Vue+mysql
    介绍我开发了一个基于Web的酒店客房管理系统,旨在通过现代化的管理工具提升酒店客房管理的效率和用户体验。该系统分为管理员角色和用户角色,管理员负责管理用户、客房信息、预约、入住、退房等内容,用户则可以查询客房信息、进行预约、管理入住和退房等操作。系统通过丰富的功能......
  • ADS基础教程23 - 有限元电磁仿真(FEM)可视化操作
    EM介绍一、引言二、FEM可视化操作流程1.打开可视化界面2.查看介质的网格3.设置网格颜色4.选择网格5.传感器选择6.编辑传感器7.选择频率8.动画三、总结一、引言在ADS基础教程22中介绍了如何在ADS进行有限元电磁仿真(FEM),本文将继续介绍FEM的可视化操作。二、FEM可视......
  • 【毕设源码】基于Springboot的加油站管理小程序
    本项目分为两个角色,分别是管理员和用户,其中管理员使用的是web管理后台,用户为微信小程序源码获取请私信技术部分:前端:1.管理员为html+css+js2.用户是微信小程序原生开发,使用微信开发者工具,wxml+wxss+js3.后端:Springboot(如需PHP版本请私信)4.数据库:MySql管理员:1.登......
  • 基于springboot的环保网站管理系统,环保管理系统,附源码+数据库+论文,包远程安装调试
    1、项目介绍现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本环保网站就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息,使用这种软件工具可以帮助管理人员提高事务处理效率,达......
  • 基于springboot的古树名木保护管理系统
    博主介绍:java高级开发,从事互联网行业六年,熟悉各种主流语言,精通java、python、php、爬虫、web开发,已经做了多年的设计程序开发,开发过上千套设计程序,没有什么华丽的语言,只有实实在在的写点程序。......
  • springboot+vue项目如何集成企业微信
    本文以springboot+vue技术开发的低代码平台为案例,介绍应用系统如何集成企业微信,包括同步企业微信组织用户、单点登录、消息发送等。在线体验:http://www.yunchengxc.com1、准备应用1.1、注册企业微信账号作为企业微信的企业管理员,首先登录企业微信官网,注册一个企业微信账号。......
  • MQ测试方法(RocketMQ 4.X)
    官网了解rocketmq背景我们知道一般消息中间件的基础消费模型如下,生产者产生一类主题消息,而消费者就消费一类主题消息。 Rocket也是采用该模型,并进行了扩展,实现了多人发不同的topic且多人消费的场景。 上面还能看出,一个Topic下有多个队列,可以在不同Broker上。再结合一下部......
  • SpringBoot项目配置文件加密
    前言防止配置文件敏感信息泄露,去年公司出现过类似事件,也防止源码泄露,对项目中的配置文件进行加密引入方式pom文件引入以下依赖<dependency><groupId>com.github.ulisesbocchio</groupId><artifactId>jasypt-spring-boot-starter</artifactId><version>3.0.3</v......
  • springboot校企对接实习管理系统 毕业设计-附源码11959
    摘 要校企合作实习是一种重要的实践教学模式,但是在实际的推行过程中,存在许多管理问题。其中包括远程指导困难、学生管理困难、校企信息沟通不畅等问题一直困扰着校方负责管理实习的教师们。随着互联网系统开发技术的发展,应用web技术开发B/s模式的实习管理系统,根据用户需......
  • springboot使用itextpdf+jfreechart制作PDF文档
    1.springboot引入的依赖组件项目中需要引入itextpdf和jfreechart两个组件,版本根据项目所需进行引入,maven组件版本查询可根据如下地址进行查询:maven组件查询<dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><vers......