一个Topic可以被多个Group订阅的,每个group下只会被一个consumer消费
consumer+GroupId消费到那个位点,是记录在客户端,还是记录在Rocket服务器
一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。
否则您的消息可能会丢失。点击这里了解更多内容。
消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:
什么是集群?使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。更多信息,请参见订阅关系一致。
集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。
集群消费模式
适用场景
适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。
注意事项
集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。
广播消费模式
适用场景
适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。
注意事项
广播消费模式下不支持顺序消息。
广播消费模式下不支持重置消费位点。
每条消息都需要被相同订阅逻辑的多台机器处理。
消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
广播模式下,消息队列RocketMQ版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
广播模式下服务端不维护消费进度,所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
https://help.aliyun.com/document_detail/43163.html?spm=5176.rocketmq.0.0.1492176fAeCrhI
不同的消费模式适用于不同的场景。当使用集群消费模式时,消息队列 RocketMQ 版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
当使用广播消费模式时,消息队列 RocketMQ 版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。
consumer消费到那个位点,是记录在客户端,还是记录在Rocket服务器?记录在服务器端。
一个GroupId的消费位点会在服务端保存多少时间呢?
往Topic上发三个消息:
2021-9-20 12:00:00 第一条
2021-9-20 13:00:00 第二条
2021-9-20 14:00:00 第三条
有一个consumer是 2021-9-20 13:10:00注册上来的,是否能收到 第一条和第二条
有一个consumer是 2021-9-20 13:10:00注册上来的,是否能收到 第一条和第二条
如果这个consumer是第一次注册上来,收不到第一条和第二条。
如果这个consumer是在2021-9-20 10:00:00 注册上来,但在2021-9-20 11:00:00点下线。
在2021-9-20 13:10:00换了一台机器运行,并注册上来,能不能收到 第一条和第二条呢?
重置消费位点
以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅的Topic的消费进度,设置完成后Consumer将接收设定时间点之后由Producer发送到消息队列RocketMQ版服务端的消息。
更多信息,请参见重置消费位点。
消息过滤
Consumer可以根据消息标签(Tag)对消息进行过滤,确保Consumer最终只接收被过滤后的消息类型。消息过滤在消息队列RocketMQ版的服务端完成。更多信息,请参见消息过滤。
Tag过滤
Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以以下消息为例:
订单消息
支付消息
物流消息
这些消息会发送到Trade_TopicTopic中,被各个不同的系统所订阅,以以下系统为例:
支付系统:只需订阅支付消息。
物流系统:只需订阅物流消息。
交易成功率分析系统:需订阅订单和支付消息。
实时计算系统:需要订阅所有和交易相关的消息。
过滤示意图如下所示。
配置方式
消息队列RocketMQ版支持通过SDK配置Tag过滤功能,分别在消息发送和订阅代码中设置消息Tag和订阅消息Tag。SDK详细信息,请参见SDK参考概述。消息发送端和消费端的代码配置方法如下:
发送消息
发送消息时,每条消息必须指明Tag。
Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
订阅所有Tag
消费者如需订阅某Topic下所有类型的消息,Tag用星号(*)表示。
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
订阅单个Tag
消费者如需订阅某Topic下某一种类型的消息,请明确标明Tag。
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
订阅多个Tag
消费者如需订阅某Topic下多种类型的消息,请在多个Tag之间用两个竖线(||)分隔。
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
错误示例
同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅的Tag为准。
//如下错误代码中,Consumer只能订阅到MQ_TOPIC下TagB的消息,而不能订阅TagA的消息。
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
SQL属性过滤
SQL属性过滤是在消息发送时设置消息的自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性过滤消息,消息队列RocketMQ版根据表法式的逻辑进行计算,将符合条件的消息投递到消费端。
说明 Tag属于一种特殊的消息属性,所以SQL过滤方式也兼容Tag过滤方法,支持通过Tag属性过滤消息。在SQL语法中,Tag的属性值为TAGS。
使用限制
使用SQL属性过滤消息时,有以下限制:
只有企业铂金版实例支持SQL属性过滤,标准版实例不支持该功能。
只有TCP协议的客户端支持SQL属性过滤,HTTP协议的客户端不支持该功能。
若服务端不支持SQL过滤时,客户端使用SQL过滤消息,则客户端启动会报错或收不到消息。
配置方式
消息队列RocketMQ版支持通过SDK配置SQL属性过滤。发送端需要在发送消息的代码中设置消息的自定义属性;消费端需要在订阅消息代码中设置SQL语法的过滤表达式。SDK详细信息,请参见SDK参考概述。消息发送端和消费端的代码配置方法如下:
消息发送端:
设置消息的自定义属性。
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// 设置自定义属性A,属性值为1。
msg.putUserProperties("A", "1");
消息消费端
使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。
注意 使用属性时,需要先判断属性是否存在。若属性不存在则过滤表达式的计算结果为NULL,消息不会被投递到消费端。
// 订阅自定义属性A存在且属性值为1的消息。
consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.printf("Receive New Messages: %s %n", message);
return Action.CommitMessage;
}
});
示例代码
发送消息
同时设置消息Tag和自定义属性。
Producer producer = ONSFactory.createProducer(properties);
// 设置Tag的值为tagA。
Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes());
// 设置自定义属性region为hangzhou。
msg.putUserProperties("region", "hangzhou");
// 设置自定义属性price为50。
msg.putUserProperties("price", "50");
SendResult sendResult = producer.send(msg);
根据单个自定义属性订阅消息。
Consumer consumer = ONSFactory.createConsumer(properties);
// 只订阅属性region为hangzhou的消息,若消息中未定义属性region或属性值不是hangzhou,则消息不会被投递到消费端。
consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.printf("Receive New Messages: %s %n", message);
return Action.CommitMessage;
}
});
预期结果:示例中发送的消息属性符合订阅的过滤条件,消息被投递到消费端。
同时根据Tag和自定义属性订阅消息。
Consumer consumer = ONSFactory.createConsumer(properties);
// 只订阅Tag的值为tagA且属性price大于30的消息。
consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.printf("Receive New Messages: %s %n", message);
return Action.CommitMessage;
}
});
- 同一个Group ID下的消费者实例与Topic的订阅关系需保持一致,更多信息,请参见订阅关系一致。
- 合理使用Topic和Tag来过滤消息可以让业务更清晰,更多信息,请参见Topic与Tag最佳实践。
同一个Group ID下的消费者实例与Topic的订阅关系要一致。
每个消费者都需要一个groupId,那么groupId相同的消费者,订阅的 Topic和Tag都要相同。
GroupId相同的consumer有以下几种场景:
1、
正确订阅关系一:订阅一个Topic且订阅一个Tag
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA,且订阅TopicA的Tag也都是Tag1,符合订阅关系一致原则。
代码必须完全一致【应用水平扩展场景】,代码示例如下:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "Tag1", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
2、
正确订阅关系二:订阅一个Topic且订阅多个Tag
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicB,订阅TopicB的Tag也都是Tag1和Tag2,表示订阅TopicB中所有Tag为Tag1或Tag2的消息,且顺序一致都是Tag1||Tag2,符合订阅关系一致性原则。
代码必须完全一致【应用水平扩展场景】,代码示例如下:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
3、
正确订阅关系三:订阅多个Topic且订阅多个Tag
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA和TopicB,且订阅的TopicA都未指定Tag,即订阅TopicA中的所有消息,订阅的TopicB的Tag都是Tag1和Tag2,表示订阅TopicB中所有Tag为Tag1或Tag2的消息,且顺序一致都是Tag1||Tag2,符合订阅关系一致原则。
代码必须完全一致【应用水平扩展场景】,代码示例如下:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
针对耗时较长的消费端,可以按如下方式配置一个合适的时间时间
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, "ak");
properties.setProperty(PropertyKeyConst.SecretKey, "sk");
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, "TcpSrvAddr");
properties.setProperty(PropertyKeyConst.GROUP_ID, "test.consumeTimeout");
/**
* 将消费者线程数固定为1个 20为默认值
*/
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "100");
/**
* 设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败,等下次重新投递再次消费。每个业务需要设置一个合理的值,默认值:15,单位:分钟。
*/
properties.setProperty(PropertyKeyConst.ConsumeTimeout, "120");//超时时间设置为2小时
ConsumerBean consumerBean = new ConsumerBean();
consumerBean.setProperties(properties);
当然也可以设置消息实例的消费模式
最早发送时间是不是三天的,超过三天的轨迹在Rocket控制台就查不到了
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean rocketProducer() {
ProducerBean producer = new ProducerBean();
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
/**
* 发送方不需要写指定groupId。如果指定,这个topic消费的情况也会异常在发送方的控制台上,这样不能一目了然了解本groupId的消费情况
*/
// properties.setProperty(PropertyKeyConst.GROUP_ID, this.groupId);
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
producer.setProperties(properties);
return producer;
}