RocketMQ的延迟消息
概述
举例说明延迟:PDD的拼单,大家在pdd拼单购买商品时,下单后,会有一个拼单倒计时,如果在倒计时结束的时候,还未拼单成功,那么系统就会取消到这个订单。
技术实现:只需在消息生产者代码中添加一句: message.setDelayTimeLevel(3);//设置延迟的等级,即消费者接收到消息不会马上消费,而是要等待一段时间
使用场景
需要定时任务的业务场景
延迟时间
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h(设置为4,就代表延迟30s)
延时消息流程
1.首先都会将其写入到CommitLog中
2.根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中
3.分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发
4.修改消息Topic的名字为SCHEDULE_TOPIC_XXXX
5.根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件
6.修改消息索引单元,计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,投递时间 = 消息存储时间 + 延时等级时间
7.将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
8.Broker内部有⼀个延迟消息服务类ScheuleMessageService,根据延迟级别数,创建对应数量的定时器Timer,定时消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中
9.在将消息到期后,队列的Level等级改为0,作为一条普通消息,投递到目标Topic
RocketMQ的事务消息
概述
如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。
事务消息流程
1.MQ的发送方,首先向MQ Server发送一个半消息,也叫做half消息,half消息时MQServer拿到了,但却不能够被马上消费的
2.如果half消息发送成功,则执行本地事务,将这个消息写入到本地数据库;否则则回滚
3.本地事务执行成功之后,则给MQserver发送一个commit,表示此前发送的half消息可以被消费了
批量消息
概述
批量发送消息能显著提高传递消息的性能。限制是这些批量消息应该有相同的topic,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB
超出处理方法
将消息进行切割成多个小于4M的内容进行发送
修改4M的限制改成更大
消息过滤
消息过滤包括 tags过滤法sql过滤,消费者在消费消息的时候可以通过:Consumer.subscribe(topic,tags) 来指定要消费的消息,如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。或者使用“*”来消费某Topic主题下的所有tags消息
消息重试
消息重试特点
1.对于同步和异步消息支持消息重试,对于oneway单向消息不支持重试
2.普通消息具有消息重试,顺序消息不支持消息重试
3.消息重试可能会造成消息重复,所以消费者一定要做好幂等处理
4.消息发送失败有三种情况:同步发送失败、异步发送失败、消息刷盘失败
发送策略失败--生产者
同步发送失败策略
对于普通消息,消息发送默认采用round-robin策略来选择所发送到的队列。如果发送失败,默认重试2 次。但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker。当然,若只有一个Broker其 也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue
异步发送失败策略
异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保 证消息不丢
消息刷盘失败策略
消息刷盘超时(Master或Slave)或slave不可用(slave在做数据同步时向master返回状态不是 SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。不过,对于重要消息可以通过在Broker 的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启。
发送策略失败--消费者
顺序消息重试
对于顺序消息消费失败默认会进行每隔1000毫秒进行重试,由于要保证消息是顺序消费,所以重试会导致后面的消息阻塞。可以通过下面的设置来修改重试间隔时间:consumer.setSuspendCurrentQueueTimeMillis(100);
无顺消息重试
对于无序消息(普通消息、延时消息、事务消息),当Consumer消费消息失败时,可以通过设置返回 状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息
重试时间间隔
对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的间隔时间是不同的,会逐渐变长。每次重试的间隔时间如: 1s 5s 10s ...2h ,如果16次都重试失败,消息进入死信队列:consumer.setMaxReconsumeTimes(10);
重试队列
对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而 是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的。这个特殊的队列就是重试队列
死信队列
概述
消息多次消费失败,达到最大重试次数,消息不会被丢弃而是进入死信队列(Dead-Letter Queue,DLQ),死信队列中的消息被称为死信消息(Dead-Letter Message,DLM)。
特点
1.死信队列中的消息无法再消费,死信队列对应Topic的权限为2,只有写权限,所以死信队列没有办法读取
2.3天之后死信队列分钟的消息被删除,和普通消息一样
3.死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup,其中每个队列都是死信队列
4.如果⼀个消费者组未产生死信消息,则不会为其创建相应的死信队列
SpringBoot集成RocketMQ
Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>RocketMQ-SpringBoot</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>RocketMQ-SpringBoot</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
Controller
@Controller
public class RocketMQController {
@Autowired
private ServiceProducer serviceProducer;
@RequestMapping("/send/{msg}")
@ResponseBody
public String send(@PathVariable("msg") String msg) {
serviceProducer.SendSync(msg);
//formats: `topicName:tags`
return "SUCCESS";
}
@RequestMapping("/sendLater/{msg}")
@ResponseBody
public String send2(@PathVariable("msg") String msg) {
serviceProducer.SendMsgLater(msg);
//formats: `topicName:tags`
return "SUCCESS";
}
}
Listener
@Component
@RocketMQTransactionListener(
txProducerGroup = "tx_order"
)
public class MyMQlistener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println(arg);
System.out.println(msg);
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
return RocketMQLocalTransactionState.COMMIT;
}
}
Producer
public interface ServiceProducer {
//消息发送者
//发送同步请求
void SendSync(String msg);
//发送异步请求
void SendAsync(String msg);
//发送延迟消息
void SendMsg();
//发送延迟消息
void SendMsgLater(String msg);
}
@Service
public class ServiceProducerImpl implements ServiceProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void SendSync(String msg) {
Message<String> message= MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend("order_topic:add",message);
}
@Override
public void SendAsync(String msg) {
Message<String> message=MessageBuilder.withPayload(msg).build();
rocketMQTemplate.asyncSend("order_topic:update", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
@Override
public void SendMsg() {
}
@Override
public void SendMsgLater(String msg) {
Message<String> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.sendMessageInTransaction(
"tx_order",
"order_topic:delete",
message,
"9527"
);
}
}
Consumer
@Component
@RocketMQMessageListener(
consumerGroup = "order_consumer",
topic = "order_topic",
selectorExpression="*",
messageModel= MessageModel.BROADCASTING
)
public class consumer implements RocketMQListener<Message> {
@Override
public void onMessage(Message message) {
String str=new String(message.getBody());
System.out.println("当前接收到的消息是"+str);
}
}
启动类
@SpringBootApplication
public class Start {
public static void main(String[] args) {
SpringApplication.run(Start.class);
}
}
写在最后:RocketMQ是一款支持高并发大规模的消息中间件,在实际系统开发过程中运用的非常多。本篇文章介绍了SpringBoot如何集成RocketMQ,希望能够给大家带来帮助。笔者小,中,大厂均有面试经验,目前正在从事全栈开发工作,坚持每日分享java全栈开发知识与相关的面试真题,希望能够给大家带来帮助,同大家共同进步。
标签:集成,SpringBoot,队列,重试,死信,消息,msg,public,RocketMQ From: https://blog.csdn.net/qq_56438516/article/details/140698288