首页 > 其他分享 >SpringBoot集成RocketMQ

SpringBoot集成RocketMQ

时间:2024-07-25 20:56:32浏览次数:12  
标签:集成 SpringBoot 队列 重试 死信 消息 msg public RocketMQ

RocketMQ的延迟消息

概述

举例说明延迟:PDD的拼单,大家在pdd拼单购买商品时,下单后,会有一个拼单倒计时,如果在倒计时结束的时候,还未拼单成功,那么系统就会取消到这个订单。

技术实现:只需在消息生产者代码中添加一句: message.setDelayTimeLevel(3);//设置延迟的等级,即消费者接收到消息不会马上消费,而是要等待一段时间

使用场景

需要定时任务的业务场景

延迟时间

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h(设置为4,就代表延迟30s)

延时消息流程

1.首先都会将其写入到CommitLog中

2.根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中

3.分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发

4.修改消息Topic的名字为SCHEDULE_TOPIC_XXXX

5.根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件

6.修改消息索引单元,计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,投递时间 = 消息存储时间 + 延时等级时间 

7.将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中

8.Broker内部有⼀个延迟消息服务类ScheuleMessageService,根据延迟级别数,创建对应数量的定时器Timer,定时消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中

9.在将消息到期后,队列的Level等级改为0,作为一条普通消息,投递到目标Topic

RocketMQ的事务消息

概述

如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。

事务消息流程

1.MQ的发送方,首先向MQ Server发送一个半消息,也叫做half消息,half消息时MQServer拿到了,但却不能够被马上消费的

2.如果half消息发送成功,则执行本地事务,将这个消息写入到本地数据库;否则则回滚

3.本地事务执行成功之后,则给MQserver发送一个commit,表示此前发送的half消息可以被消费了

批量消息

概述

批量发送消息能显著提高传递消息的性能。限制是这些批量消息应该有相同的topic,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB

超出处理方法

将消息进行切割成多个小于4M的内容进行发送

修改4M的限制改成更大

消息过滤

消息过滤包括 tags过滤法sql过滤,消费者在消费消息的时候可以通过:Consumer.subscribe(topic,tags) 来指定要消费的消息,如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。或者使用“*”来消费某Topic主题下的所有tags消息

消息重试

消息重试特点

1.对于同步和异步消息支持消息重试,对于oneway单向消息不支持重试 

2.普通消息具有消息重试,顺序消息不支持消息重试

3.消息重试可能会造成消息重复,所以消费者一定要做好幂等处理

4.消息发送失败有三种情况:同步发送失败、异步发送失败、消息刷盘失败 

发送策略失败--生产者

同步发送失败策略

对于普通消息,消息发送默认采用round-robin策略来选择所发送到的队列。如果发送失败,默认重试2 次。但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker。当然,若只有一个Broker其 也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue

异步发送失败策略

异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保 证消息不丢

消息刷盘失败策略

消息刷盘超时(Master或Slave)或slave不可用(slave在做数据同步时向master返回状态不是 SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。不过,对于重要消息可以通过在Broker 的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启。

发送策略失败--消费者

顺序消息重试

对于顺序消息消费失败默认会进行每隔1000毫秒进行重试,由于要保证消息是顺序消费,所以重试会导致后面的消息阻塞。可以通过下面的设置来修改重试间隔时间:consumer.setSuspendCurrentQueueTimeMillis(100); 

无顺消息重试

对于无序消息(普通消息、延时消息、事务消息),当Consumer消费消息失败时,可以通过设置返回 状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息

重试时间间隔

对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的间隔时间是不同的,会逐渐变长。每次重试的间隔时间如: 1s 5s 10s ...2h ,如果16次都重试失败,消息进入死信队列:consumer.setMaxReconsumeTimes(10); 

重试队列

对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而 是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的。这个特殊的队列就是重试队列

死信队列

概述

消息多次消费失败,达到最大重试次数,消息不会被丢弃而是进入死信队列(Dead-Letter Queue,DLQ),死信队列中的消息被称为死信消息(Dead-Letter Message,DLM)。

特点

1.死信队列中的消息无法再消费,死信队列对应Topic的权限为2,只有写权限,所以死信队列没有办法读取

2.3天之后死信队列分钟的消息被删除,和普通消息一样

3.死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup,其中每个队列都是死信队列

4.如果⼀个消费者组未产生死信消息,则不会为其创建相应的死信队列

SpringBoot集成RocketMQ

Pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>RocketMQ-SpringBoot</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>RocketMQ-SpringBoot</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
  </parent>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.0.4</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
  </dependencies>
</project>

Controller

@Controller
public class RocketMQController {
    @Autowired
    private ServiceProducer serviceProducer;

    @RequestMapping("/send/{msg}")
    @ResponseBody
    public String send(@PathVariable("msg") String msg)  {
        serviceProducer.SendSync(msg);
        //formats: `topicName:tags`
        return "SUCCESS";
    }
    @RequestMapping("/sendLater/{msg}")
    @ResponseBody
    public String send2(@PathVariable("msg") String msg)  {
        serviceProducer.SendMsgLater(msg);
        //formats: `topicName:tags`
        return "SUCCESS";
    }
}

Listener

@Component
@RocketMQTransactionListener(
        txProducerGroup = "tx_order"
)
public class MyMQlistener implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println(arg);
        System.out.println(msg);
        return RocketMQLocalTransactionState.COMMIT;
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        return RocketMQLocalTransactionState.COMMIT;
    }
}

Producer

public interface ServiceProducer {
    //消息发送者
    //发送同步请求
    void SendSync(String msg);
    //发送异步请求
    void SendAsync(String msg);
    //发送延迟消息
    void SendMsg();
    //发送延迟消息
    void SendMsgLater(String msg);
}

 

@Service
public class ServiceProducerImpl implements ServiceProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Override
    public void SendSync(String msg) {
        Message<String> message= MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.syncSend("order_topic:add",message);
    }
    @Override
    public void SendAsync(String msg) {
        Message<String> message=MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.asyncSend("order_topic:update", message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
    }

    @Override
    public void SendMsg() {

    }

    @Override
    public void SendMsgLater(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.sendMessageInTransaction(
                "tx_order",
                "order_topic:delete",
                message,
                "9527"
        );
    }
}

Consumer

@Component
@RocketMQMessageListener(
        consumerGroup = "order_consumer",
        topic = "order_topic",
        selectorExpression="*",
        messageModel= MessageModel.BROADCASTING
)
public class consumer implements RocketMQListener<Message> {
    @Override
    public void onMessage(Message message) {
        String str=new String(message.getBody());
        System.out.println("当前接收到的消息是"+str);
    }
}

启动类

@SpringBootApplication
public class Start {
    public static void main(String[] args) {
        SpringApplication.run(Start.class);
    }
}

写在最后:RocketMQ是一款支持高并发大规模的消息中间件,在实际系统开发过程中运用的非常多。本篇文章介绍了SpringBoot如何集成RocketMQ,希望能够给大家带来帮助。笔者小,中,大厂均有面试经验,目前正在从事全栈开发工作,坚持每日分享java全栈开发知识与相关的面试真题,希望能够给大家带来帮助,同大家共同进步。

标签:集成,SpringBoot,队列,重试,死信,消息,msg,public,RocketMQ
From: https://blog.csdn.net/qq_56438516/article/details/140698288

相关文章

  • 3、集成Mybatis框架、JDBC、mybatis-plus
    简化代码加入lombok依赖<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency> 1、pom.xml添加jdbc、mysql驱动、mybatis框架、mybatis-plus<dependency><groupId>org.sprin......
  • SpringBoot 日志
    目录一、日志概述二、日志使用2.1打印日志2.2日志格式2.3日志级别2.4日志配置2.4.1日志级别2.4.2日志持久化2.4.3日志文件分割2.4.4配置日志格式三、更简单的日志输出3.1添加lombok依赖3.2输出日志一、日志概述在SpringBoot项目启动后,项目本身就......
  • SpringBoot 依赖之Java Mail Sender邮件发送
    JavaMailSender依赖名称:JavaMailSender功能描述:SendemailusingJavaMailandSpringFramework’sJavaMailSender.使用JavaMail和SpringFramework的JavaMailSender发送电子邮件。<dependency> <groupId>org.springframework.boot</groupId> <artifa......
  • SpringBoot 结合官网对MQTT消息队列整合记录
    SpringBoot结合官网对MQTT消息队列整合首先是mavenPom的引入MqttClient<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.......
  • SpringBootApplication入口调用service类方法
    要在publicstaticvoidmain(String[]args)中调用Service的方法,需要在Application类中手动获取Spring容器,并从中获取Service的实例。示例如下:启动入口程序@SpringBootApplicationpublicclassRouteApplication{publicstaticvoidmain(String[]args){......
  • SpringBoot3.x整合Druid数据库连接池
    引入依赖<!--Druid数据库连接池--><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-3-starter</artifactId><version>1.2.21</version></dependency......
  • springboot+vue前后端分离项目:导出功能报错Request processing failed: cn.hutool.cor
    1.报错截图: 2.hutool官网,推荐引入poi-ooxml依赖 3.mvn仓库找到依赖 4.用最新版依赖 5.复制到本项目pom.xml,刷新maven 解决......
  • springboot自学(2)Bean的加载控制
    Bean的加载控制Bean的加载控制指根据特定情况对bean进行选择性加载以达到适用于项目的目标。那么哪些加载方式适用于编程的形式加载控制呢    控制的演示  注解形式控制bean加载使用@conditional注解的派生注解设置各种组合条件控制bean的加载 有诸多......
  • 【YashanDB数据库】yasdb jdbc驱动集成druid连接池,业务(java)日志中有token IDENTIFIE
    问题现象客户的java日志中有如下异常信息:问题的风险及影响对正常的业务流程无影响,但是影响druid的mergesql功能(此功能会将sql语句中的字面量替换为绑定变量,然后将替换以后的sql视为同一个,然后用做执行性能统计)问题影响的版本与yashandb版本无关问题发生原因druid源码中在......
  • UTS插件集成步骤
    如需定制或咨询插件,请联系作者QQ252797991插件网页“试用”按钮点击插件网页右上角“试用”按钮,接着选择要试用插件的项目AppId,接着点击“导入到HBuilderX”,自动打开HBuilderX,选择项目后会自动下载插件到modules文件夹下代码里先引入变量,避免自定义基座时没有把插件打......