RocketMQ Consumption Modes
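Apache RocketMQ has two consumption modes:
- Clustering consumption mode (the default): RocketMQ treats each message as needing to be processed by only one consumer in the consumer group, whichever one that is.
- Broadcasting consumption mode: RocketMQ delivers each message to every consumer in the consumer group, so that each consumer consumes the message at least once.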
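Clustering consumption suits scenarios where each message needs to be processed only once. The consumer group as a whole receives every message of the Topic, and the consumers in the group share the work of consuming them, so consumption capacity can be raised or lowered by adding or removing consumers. This is the most common consumption mode.
Broadcasting consumption suits scenarios where each message must be processed by every consumer in the group. Each consumer receives all messages of the subscribed Topic, so adding or removing consumers does not raise or lower consumption capacity.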
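To make the difference concrete, here is a minimal toy model of the two modes in plain Java. It does not use the RocketMQ client, and the class and method names (ConsumptionModeSketch, clustering, broadcasting) are made up for illustration. The round-robin assignment is a simplifying assumption: a real broker distributes queues among consumers rather than individual messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two consumption modes; it does not talk to RocketMQ. */
class ConsumptionModeSketch {

    /** Clustering: each message goes to exactly one consumer (round-robin is an assumption). */
    static Map<String, List<String>> clustering(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = emptyInboxes(consumers);
        if (consumers.isEmpty()) {
            return received;
        }
        for (int i = 0; i < messages.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            received.get(owner).add(messages.get(i));
        }
        return received;
    }

    /** Broadcasting: every consumer receives every message. */
    static Map<String, List<String>> broadcasting(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = emptyInboxes(consumers);
        for (String consumer : consumers) {
            received.get(consumer).addAll(messages);
        }
        return received;
    }

    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        List<String> group = Arrays.asList("Consumer1", "Consumer2");
        System.out.println("clustering:   " + clustering(messages, group));
        System.out.println("broadcasting: " + broadcasting(messages, group));
    }
}
```

In the clustering case, adding a third consumer shrinks each consumer's share of the messages. In the broadcasting case every inbox always holds the full list, which is why scaling the group does not change throughput.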
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>Rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Rocketmq</name>
    <description>Rocketmq</description>
    <properties>
        <java.version>8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Clustering mode Java demo
Producer:
package com.example.rocketmq;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Service
public class Producer implements ApplicationRunner {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Value("${rocketmq.producer.topic}")
    private String smsTopic;
    @Autowired
    private ScheduledExecutorService scheduledExecutorService;

    public SendResult send(String messageContent, String tags) {
        // The destination is "topic" or "topic:tags".
        String destination = StringUtils.isBlank(tags) ? smsTopic : smsTopic + ":" + tags;
        SendResult sendResult =
                rocketMQTemplate.syncSend(
                        destination,
                        MessageBuilder.withPayload(messageContent)
                                .setHeader(MessageConst.PROPERTY_KEYS, "your_unique_key")
                                .build()
                );
        if (sendResult != null) {
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                // Message sent successfully; do any follow-up work here.
                System.out.println("send successfully!");
            }
        }
        return sendResult;
    }

    private class ProductorTask implements Runnable {
        private String name;
        private Integer idx;

        public ProductorTask(String name, Integer idx) {
            this.name = name;
            this.idx = idx;
        }

        @Override
        public void run() {
            send(name + ":" + String.valueOf(idx++), "v1");
        }
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Send one message per second, starting immediately.
        scheduledExecutorService.scheduleAtFixedRate(new ProductorTask("productor1", 1), 0, 1, TimeUnit.SECONDS);
    }
}
Consumer 1:
package com.example.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer1.group}",
        topic = "${rocketmq.consumer1.topic}"
)
public class Consumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String s) {
        System.out.println("Consumer1消费普通短信:" + s);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // A distinct instance name lets two consumers of the same group coexist in one JVM.
        consumer.setInstanceName("Consumer1");
    }
}
Consumer 2:
package com.example.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

/**
 * @projectName: yunfei
 * @package: com.example.rocketmq
 * @className: Consumer2
 * @author: Yunfei
 * @description: TODO
 * @date: 2024/3/26 10:14
 * @version: 1.0
 */
@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer1.group}",
        topic = "${rocketmq.consumer1.topic}"
)
public class Consumer2 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String s) {
        System.out.println("Consumer2消费普通短信:" + s);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // Same consumer group as Consumer1, but a different instance name.
        consumer.setInstanceName("Consumer2");
    }
}
Scheduled thread pool used by the producer:
package com.example.rocketmq;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @projectName: yunfei
 * @package: com.example.rocketmq
 * @className: ThreadConfig
 * @author: Yunfei
 * @description: TODO
 * @date: 2024/3/26 9:53
 * @version: 1.0
 */
@Configuration
public class ThreadPoolConfig {
    // Core pool size
    private int corePoolSize = 50;
    // Maximum number of threads that may be created (not used by the scheduled pool below)
    private int maxPoolSize = 200;
    // Maximum queue length (not used by the scheduled pool below)
    private int queueCapacity = 1000;
    // How long idle threads are kept alive, in seconds (not used by the scheduled pool below)
    private int keepAliveSeconds = 300;

    /**
     * Executor for periodic or scheduled tasks.
     */
    @Bean(name = "scheduledExecutorService")
    protected ScheduledExecutorService scheduledExecutorService() {
        return new ScheduledThreadPoolExecutor(corePoolSize,
                new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(),
                new ThreadPoolExecutor.CallerRunsPolicy()) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                if (t != null) {
                    System.out.println(t.getMessage());
                }
            }
        };
    }
}
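One caveat about the afterExecute hook above: a ScheduledThreadPoolExecutor wraps each submitted task in a Future, so an exception thrown by the task is stored in that Future and the Throwable argument is normally null. In addition, a task submitted with scheduleAtFixedRate stops being rescheduled after it throws. The following is a minimal sketch, not the original author's code, of recovering the failure from the Future. The class name ScheduledFailureSketch and the "send failed" message are made up for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ScheduledFailureSketch {

    /** Schedules one failing task and returns the failure message seen in afterExecute. */
    static String runFailingTask() {
        AtomicReference<String> recovered = new AtomicReference<>();
        CountDownLatch finished = new CountDownLatch(1);

        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                // Scheduled tasks are wrapped in a Future, so t is null even on failure.
                // isDone() avoids blocking on a periodic task that will run again.
                if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
                    try {
                        ((Future<?>) r).get();
                    } catch (ExecutionException e) {
                        t = e.getCause();
                    } catch (CancellationException e) {
                        t = e;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (t != null) {
                    recovered.set(t.getMessage());
                }
                finished.countDown();
            }
        };

        Runnable failing = () -> { throw new IllegalStateException("send failed"); };
        pool.schedule(failing, 0, TimeUnit.MILLISECONDS);
        try {
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return recovered.get();
    }

    public static void main(String[] args) {
        System.out.println("recovered: " + runFailingTask());
    }
}
```

For the producer in this demo, a simpler option may be to catch exceptions inside the task's run() method, so that one failed send does not silently stop the periodic schedule.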
Configuration file:
rocketmq:
  name-server: xxxx:9876
  producer:
    group: platform-sms-server-group
    sendMessageTimeout: 10000
    topic: sms-common-topic
  consumer1:
    group: platform-sms-worker-common-group
    topic: "${rocketmq.producer.topic}"
Output (the log prefix "消费普通短信" means "consumed a normal SMS"):
Consumer2消费普通短信:productor1:2
send successfully!
Consumer2消费普通短信:productor1:3
send successfully!
Consumer2消费普通短信:productor1:4
send successfully!
Consumer1消费普通短信:productor1:5
send successfully!
Consumer1消费普通短信:productor1:6
As the output shows, each message is consumed by only one consumer.
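Rather than reading the console output by eye, the same check can be automated. The sketch below is a hypothetical helper (ClusteringLogCheck is not part of the demo project) that reads the console output from standard input and counts how often each payload appears in consumer lines. It assumes the log format shown above: a line that starts with "Consumer<N>" and ends with "productor<N>:<index>".

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class ClusteringLogCheck {
    // A consumer line starts with "Consumer<N>" and ends with the payload "productor<N>:<index>".
    private static final Pattern CONSUMER_LINE =
            Pattern.compile("^(Consumer\\d+).*?(productor\\d+:\\d+)$");

    /** Counts how many consumer lines mention each payload; other lines are ignored. */
    static Map<String, Integer> deliveriesPerMessage(List<String> logLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = CONSUMER_LINE.matcher(line);
            if (m.matches()) {
                counts.merge(m.group(2), 1, Integer::sum);
            }
        }
        return counts;
    }

    /** True when no payload was delivered more than once across the whole group. */
    static boolean eachMessageConsumedOnce(List<String> logLines) {
        return deliveriesPerMessage(logLines).values().stream().allMatch(n -> n == 1);
    }

    public static void main(String[] args) throws IOException {
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            List<String> lines = in.lines().collect(Collectors.toList());
            System.out.println(deliveriesPerMessage(lines));
            System.out.println("each message consumed once: " + eachMessageConsumedOnce(lines));
        }
    }
}
```

Fed the clustering output above, every payload should have a count of 1. Fed broadcasting output, the counts rise to the number of consumers that received the message.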
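Broadcasting mode demo
The code stays largely the same as in the clustering demo. First, add messageModel = MessageModel.BROADCASTING to the annotation on Consumer1 so that it consumes in broadcasting mode.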
// Additional import needed: org.apache.rocketmq.spring.annotation.MessageModel
@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer1.group}",
        topic = "${rocketmq.consumer1.topic}",
        messageModel = MessageModel.BROADCASTING
)
public class Consumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String s) {
        System.out.println("Consumer1消费普通短信:" + s);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setInstanceName("Consumer1");
    }
}
Output:
Consumer1消费普通短信:productor1:2
Consumer2消费普通短信:productor1:2
send successfully!
Consumer1消费普通短信:productor1:3
send successfully!
Consumer2消费普通短信:productor1:4
Consumer1消费普通短信:productor1:4
send successfully!
Consumer2消费普通短信:productor1:5
Consumer1消费普通短信:productor1:5
send successfully!
Consumer1消费普通短信:productor1:6
send successfully!
Consumer2消费普通短信:productor1:7
Consumer1消费普通短信:productor1:7
send successfully!
Consumer1消费普通短信:productor1:8
send successfully!
Consumer2消费普通短信:productor1:9
Consumer1消费普通短信:productor1:9
send successfully!
Consumer1消费普通短信:productor1:10
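As the output shows, Consumer2 did not receive every message, while Consumer1 did. At this point only Consumer1 has been switched to broadcasting; Consumer2 still runs in the default clustering mode, which is presumably why it sees only part of the messages.
Now add the same attribute to Consumer2: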
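To list exactly which messages a consumer missed, the console output can be compared per consumer. The helper below is a hypothetical sketch (BroadcastGapCheck is not part of the demo project) and assumes the same log format as the output above, with payloads that end in a numeric index.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class BroadcastGapCheck {
    // Captures the consumer name and the numeric index at the end of the payload.
    private static final Pattern CONSUMER_LINE =
            Pattern.compile("^(Consumer\\d+).*?productor\\d+:(\\d+)$");

    /** Message indices that appear in the log for the given consumer. */
    static Set<Integer> seenBy(String consumer, List<String> logLines) {
        Set<Integer> seen = new TreeSet<>();
        for (String line : logLines) {
            Matcher m = CONSUMER_LINE.matcher(line);
            if (m.matches() && m.group(1).equals(consumer)) {
                seen.add(Integer.parseInt(m.group(2)));
            }
        }
        return seen;
    }

    /** Indices seen by the reference consumer but not by the other consumer. */
    static Set<Integer> missing(String consumer, String reference, List<String> logLines) {
        Set<Integer> gap = new TreeSet<>(seenBy(reference, logLines));
        gap.removeAll(seenBy(consumer, logLines));
        return gap;
    }

    public static void main(String[] args) throws IOException {
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            List<String> lines = in.lines().collect(Collectors.toList());
            System.out.println("Consumer2 missed: " + missing("Consumer2", "Consumer1", lines));
        }
    }
}
```

Run against the output above, this reports the indices that only Consumer1 logged.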
// Additional import needed: org.apache.rocketmq.spring.annotation.MessageModel
@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer1.group}",
        topic = "${rocketmq.consumer1.topic}",
        messageModel = MessageModel.BROADCASTING
)
public class Consumer2 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String s) {
        System.out.println("Consumer2消费普通短信:" + s);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setInstanceName("Consumer2");
    }
}
Output:
Consumer2消费普通短信:productor1:3
Consumer1消费普通短信:productor1:3
send successfully!
Consumer2消费普通短信:productor1:4
Consumer1消费普通短信:productor1:4
send successfully!
Consumer2消费普通短信:productor1:5
Consumer1消费普通短信:productor1:5
send successfully!
Consumer2消费普通短信:productor1:6
Consumer1消费普通短信:productor1:6
send successfully!
Consumer2消费普通短信:productor1:7
Consumer1消费普通短信:productor1:7
send successfully!
Consumer2消费普通短信:productor1:8
Consumer1消费普通短信:productor1:8
Now both consumers receive every message, as expected.
Source: https://blog.csdn.net/weixin_43145941/article/details/137039567