在RocketMQ中,要实现消息的顺序消费,你需要确保以下几点:
- 发送消息时,相同业务顺序的消息应该发送到同一个队列(MessageQueue)。
- 消费者在消费时,应该使用顺序消费的方式。
下面是一个使用Spring Boot和RocketMQ实现消息顺序消费的例子。
- 添加依赖 (
pom.xml
):
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
- 配置RocketMQ (
application.yml
):
rocketmq:
name-server: 127.0.0.1:9876 # 替换为你的RocketMQ NameServer地址
producer:
group: ordered-producer-group
consumer:
group: ordered-consumer-group
subscribe:
- topic: ordered-topic
selectorExpression: "*"
- 创建消息生产者 (
OrderedProducerService.java
):
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class OrderedProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult send(String topic, String message, int orderId) {
// 使用orderId作为队列选择器的key
return rocketMQTemplate.syncSendOrderly(topic, MessageBuilder.withPayload(message).build(), String.valueOf(orderId));
}
}
- 创建消息消费者 (
OrderedConsumerService.java
):
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
topic = "ordered-topic",
consumerGroup = "ordered-consumer-group",
selectorType = RocketMQMessageListener.SelectorType.TAG,
selectorExpression = "*",
consumeMode = ConsumeMode.ORDERLY // 设置为顺序消费
)
public class OrderedConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("Received ordered message: %s%n", message);
}
}
- 创建Spring Boot应用程序 (
OrderedDemoApplication.java
):
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class OrderedDemoApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(OrderedDemoApplication.class, args);
OrderedProducerService producerService = context.getBean(OrderedProducerService.class);
// 发送几条顺序消息,orderId相同的消息会被顺序消费
for (int i = 0; i < 5; i++) {
SendResult sendResult = producerService.send("ordered-topic", "Hello, RocketMQ! " + i, 100); // orderId为100
System.out.printf("Send ordered message: %s%n", sendResult);
}
}
}
在这个例子中,我们使用了syncSendOrderly
方法来发送顺序消息,并且为了保证消息顺序,我们传入了一个orderId
作为队列选择器的key。在消费端,我们通过@RocketMQMessageListener
注解设置consumeMode
为ORDERLY
来实现顺序消费。
运行上述Spring Boot应用程序,它将发送几条消息到ordered-topic
,并由OrderedConsumerService
以顺序方式接收并打印出来。由于所有消息都使用相同的orderId
,所以它们会被发送到同一个MessageQueue,从而保证了消费的顺序性。
请注意,为了确保消息的顺序性,同一业务的消息应该
标签:顺序,ordered,--,demo,springframework,org,import,rocketmq From: https://www.cnblogs.com/xylfjk/p/17983267