RocketMQ 主要支持两种消息模型:集群消费(Clustering)和广播消费(Broadcasting)。
-
集群消费(Clustering):
- 在集群消费模式下,同一个消费者组(Consumer Group)中的消费者实例平均分摊消费消息,即一个消息只会被消费者组中的一个消费者消费一次。这种模式适用于负载均衡场景,可以提高消费的并行度。
-
广播消费(Broadcasting):
- 在广播消费模式下,同一个消费者组中的每个消费者实例都会收到发送到主题的所有消息的副本,即一个消息会被消费者组中的每个消费者都消费一次。这种模式适用于需要所有消费者都处理每条消息的场景。
下面分别给出基于Spring Boot的集群消费和广播消费的简单示例。
集群消费示例:
首先,在pom.xml
中添加RocketMQ的依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
配置application.properties
或application.yml
文件:
rocketmq:
name-server: 127.0.0.1:9876 # 修改为实际的RocketMQ NameServer地址
producer:
group: my-producer-group # 生产者组名
创建生产者:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void send(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
}
创建集群消费者:
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-cluster-consumer-group", consumeMode = ConsumeMode.CONCURRENTLY)
public class ClusterConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Cluster Consumer received: " + message);
}
}
广播消费示例:
创建广播消费者:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-broadcast-consumer-group", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.BROADCASTING)
public class BroadcastConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Broadcast Consumer received: " + message);
}
}
在以上代码中,我们定义了一个生产者服务ProducerService
和两个消费者服务ClusterConsumerService
和BroadcastConsumerService
。生产者服务负责发送消息到test-topic
。集群消费者服务ClusterConsumerService
使用了@RocketMQMessageListener
注解,并设置了consumerGroup
和consumeMode
属性来配置集群消费。广播消费者服务BroadcastConsumerService
也使用了@RocketMQMessageListener
注解,但是设置了messageModel
属性为MessageModel.BROADCASTING
来配置广播消费。
请注意,上述代码仅作为示例,实际开发时需要根据你的具体需求和RocketMQ的配置进行调整。此外,确保RocketMQ服务器正在运行且可以连接。在使用Spring Boot集成RocketMQ时,你还可以利用RocketMQTemplate
提供的其他高级功能来简化消息的发送和接收。