首页 > 其他分享 >RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及事务消息收发、最大重试消费实战

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及事务消息收发、最大重试消费实战

时间:2024-01-05 23:02:14浏览次数:28  
标签:producer SpringCloud public 重试 tag msg new order RocketMQ


欢迎关注公众号:【11来了】 发送 “资料” 可以下载Redis、JVM系列文章PDF版本!

作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!

事务消息收发

流程如下:

  1. 发送给 MQ 一条任务操作
  2. MQ 的 Broker 成功收到后,那么发送方就开始执行原子 db 业务
  3. 如果执行原子 db 业务失败,并没有将执行成功状态同步给 Broker
  4. 那么 Broker 会去检查 db 事务是否成功,最后要么事务提交,可以被生产者消费,要么事务回滚,生产者无法消费

事务消息收发流程图如下:

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及事务消息收发、最大重试消费实战_System


事务消息收发消费者如下:

public class TransactionConsumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionConsumer");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("Transaction-Test-Topic", "*");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
        System.out.println("Consumer Started Finished.");
    }
}


这里模拟事务成功执行的生产者,执行该生产者之后,消费者可以收到消息并消费:

public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer(
                "transaction_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                /**
                 * 这里执行本地事务,如果本地事务执行成功,就返回成功
                 * 如果本地事务失败,就返回失败
                 */
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 触发事务的检查,提供给到生产者一个检查事务是否成功提交的机会
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        List<Order> list = new ArrayList<>();
        for (int i = 0; i < 12; i ++) {
            Order order = new Order();
            order.orderId = i;
            order.desc = "desc:" + i;
            order.tag = "tag" + i % 3;
            list.add(order);
        }
        for (Order order : list) {
            Message msg = new Message(
                    "Transaction-Test-Topic",
                    order.tag,
                    (order.toString()).getBytes());
            msg.setKeys("Transaction_Tag");
            msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));

            // 直接将 msg 发送出去
            producer.sendMessageInTransaction(msg, null);
        }
        System.out.println("Send Finished.");
    }

    public static class Order {
        int orderId;
        String desc;
        String tag;

        @Override
        public String toString() {
            return "orderId="+orderId+", desc="+desc+", tag="+tag;
        }
    }
}


这里模拟事务执行失败的生产者,执行该生产者之后,消费者不会收到消息:

public class TransactionProducerFail {

    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer(
                "transaction_producer_group_fail");
        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                /**
                 * 这里执行本地事务,如果本地事务执行成功,就返回成功
                 * 如果本地事务失败,就返回失败
                 */
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 触发事务的检查,提供给到生产者一个检查事务是否成功提交的机会
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

        producer.start();

        List<TransactionProducer.Order> list = new ArrayList<>();
        for (int i = 0; i < 12; i ++) {
            TransactionProducer.Order order = new TransactionProducer.Order();
            order.orderId = i;
            order.desc = "desc:" + i;
            order.tag = "tag" + i % 3;
            list.add(order);
        }
        for (TransactionProducer.Order order : list) {
            Message msg = new Message(
                    "Transaction-Test-Topic",
                    order.tag,
                    (order.toString()).getBytes());
            msg.setKeys("Transaction_Tag");
            msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));

            // 直接将 msg 发送出去
            producer.sendMessageInTransaction(msg, null);
        }
        System.out.println("Send Finished.");
    }

    public static class Order {
        int orderId;
        String desc;
        String tag;

        @Override
        public String toString() {
            return "orderId="+orderId+", desc="+desc+", tag="+tag;
        }
    }
}



最大重试消费

重试分为两种:生产者重试、消费者重试

生产者重试设置:

  • 生产者配置重试次数
// 同步
producer.setRetryTimesWhenSendFailed(3)
// 异步
producer.setRetryTimesWhenSendAsyncFailed(3);
// 如果发送失败,是否尝试发送到其他 Broker 节点
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
  • 生产者设置重试的策略
producer.addRetryResponseCode(ResponseCode.FLUSH_DISK_TIMEOUT);

消费者重试设置:

  • 消费者有序消费时,如果消费失败,返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 即可
  • 消费者并发消费时,如果消费失败,返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可

生产者代码如下(消费者代码就不贴了,只需要消费时返回需要重试的状态码即可):

public class RetryProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 设置一些重试的策略
        producer.addRetryResponseCode(ResponseCode.FLUSH_DISK_TIMEOUT);
        // 设置发送失败最大重试次数
        producer.setRetryTimesWhenSendFailed(3);
        producer.setRetryTimesWhenSendAsyncFailed(3);
        producer.start();

        List<Order> list = new ArrayList<>();
        for (int i = 0; i < 12; i ++) {
            Order order = new Order();
            order.orderId = i;
            order.desc = "desc:" + i;
            order.tag = "tag" + i % 3;
            list.add(order);
        }
        for (Order order : list) {
            Message msg = new Message(
                    "Filter-Test-Topic",
                    order.tag,
                    (order.toString()).getBytes());
            msg.setKeys("Filter_Tag");
            msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));

            // 直接将 msg 发送出去
            producer.send(msg);
        }
        System.out.println("Send Finished.");
    }

    public static class Order {
        int orderId;
        String desc;
        String tag;

        @Override
        public String toString() {
            return "orderId="+orderId+", desc="+desc+", tag="+tag;
        }
    }
}


标签:producer,SpringCloud,public,重试,tag,msg,new,order,RocketMQ
From: https://blog.51cto.com/u_16186397/9120080

相关文章

  • SpringCloud微服务实战——搭建企业级开发框架(三十二):代码生成器使用配置说明
    一、新建数据源配置  因考虑到多数据源问题,代码生成器作为一个通用的模块,后续可能会为其他工程生成代码,所以,这里不直接读取系统工程配置的数据源,而是让用户自己维护。参数说明数据源名称:用于查找区分数据源的名称连接地址:连接方式:数据库类型:数据库地址等参数,例:jdbc:m......
  • 软件测试/测试开发全日制培训|Pytest跳过用例和失败重试
    前言在我们日常进行自动化测试时,经常会遇到功能阻塞,未实现或者环境有问题等等原因,一些用例执行不了,如果我们注释掉或删除掉这些测试用例,后面可能还要进行恢复操作,这个时候pytest的跳过测试功能就能帮助我们,先跳过,等到问题解决时,恢复执行即可;同时我们还有可能会遇到需要对失败用例重......
  • SpringCloud微服务实战——搭建企业级开发框架(三十一):自定义MybatisPlus代码生成器实现
      理想的情况下,代码生成可以节省很多重复且没有技术含量的工作量,并且代码生成可以按照统一的代码规范和格式来生成代码,给日常的代码开发提供很大的帮助。但是,代码生成也有其局限性,当牵涉到复杂的业务逻辑时,简单的代码生成功能无法解决。  目前市面上的代码生成器层出不穷,大多......
  • 一款神仙级SpringCloud微服务开源项目,接私活吊到不行!(附源码)
    今天给大家推荐一个牛逼的接私活项目,SpringCloud微服务架构项目!一个由商业级项目升级优化而来的微服务架构,采用SpringBoot2.7、SpringCloud等核心技术构建,提供基于React和Vue的两个前端框架用于快速搭建企业级的SaaS多租户微服务平台。架构图项目介绍采用前后端分离的模式......
  • RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战
    欢迎关注公众号:【11来了】发送“资料”可领取深入理解Redis系列文章结合电商场景讲解Redis使用场景、中间件系列笔记和编程高频电子书!作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!消息追踪设置消息追踪需要修改broker启动的配置文件......
  • 使用springcloud 实现 蓝绿发布、灰度发布(金丝雀发布)
    介绍工作中经常要涉及到功能发布,这个时候也经常是业务系统最有可能遇到问题的时候,需要要尽量减少发布引起的风险。比如在系统负载比较小的时候使用。还有蓝绿发布、灰度发布等等,今天介绍一下这几种常见的发布,并使用springcloud实现。1.传统发布方式一个系统最初的时候,使用量小,用户......
  • SpringBoot2 整合 SpringCloud Feign 实例
    文章目录1.简介2.工程实例2.1注册中心springcloud-study-eureka-server2.1.1依赖pom.xml2.1.2配置文件application.properties2.1.3启动类EurekaServerApplication.java2.2服务提供者springcloud-study-hello-service2.2.1依赖pom.xml2.2.2配置文件application.ym......
  • SpringBoot2 整合 SpringCloud 的 Hystrix断路器 实例
    文章目录1.概述2.短路器3.SpringFeign使用Hystrix断路器3.1工程实例3.2修改Fegin模块3.3测试运行参考文献1.概述微服务架构中服务之间互相调用,单个服务通常会集群部署,由于网络等原因,服务不能保证100%可用,如果单个服务出现问题会出现请求的堆积,Servlet容器的线程资源......
  • spring-retry 重试机制
    引用pom.xml<dependency><groupId>org.springframework.retry</groupId><artifactId>spring-retry</artifactId><version>1.3.4</version></dependency>注:本人测试使用jdk8例子:publicvoiddemo(Stringstr)......
  • RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消费收发实战
    欢迎关注公众号:【11来了】发送“资料”可以下载Redis、JVM系列文章PDF版本!作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!SpringCloudAlibaba集成RocketMQ最佳实践SpringBoot相对于SSM来说已经很大程度上简化了开发,但是使用SpringBo......