一、批量发送消息
发送限制
生产者进行消息发送时可以一次发送多条消息,这可以大大提升 Producer 的发送效率。不过需要注意以下几点:
- 批量发送的消息必须具有相同的 Topic
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息与事务消息
批量发送大小
默认情况下,一批发送的消息总大小不能超过 4MB 字节。如果想超出该值,有两种解决方案:
- 方案一:将批量消息进行拆分,拆分为若干不大于 4M 的消息集合分多次批量发送
- 方案二:在 Producer 端与 Broker 端修改属性
- Producer 端需要在发送之前设置 Producer 的 maxMessageSize 属性
- Broker 端需要修改其加载的配置文件中的 maxMessageSize 属性
生产者发送的消息大小
生产者通过 send()方法发送的 Message,并不是直接将 Message 序列化后发送到网络上的,而是通过这个 Message 生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息 Body、消息日志(占 20 字节),及用于描述消息的一堆属性 key-value。这些属性中包含例如生产者地址、生产时间、要发送的 QueueId 等。最终写入到 Broker 中消息单元中的数据都是来自于这些属性。
二、 批量消费消息
修改批量属性
Consumer 的 MessageListenerConcurrently 监听接口的 consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改 Consumer 的 consumeMessageBatchMaxSize 属性来指定。不过,该值不能超过 32。因为默认情况下消费者每次可以拉取的消息最多是 32 条。若要修改一次拉取的最大值,则可通过修改 Consumer 的 pullBatchSize 属性来指定。
存在的问题
Consumer 的 pullBatchSize 属性与 consumeMessageBatchMaxSize 属性是否设置的越大越好?当然不 是。
- pullBatchSize 值设置的越大,Consumer 每拉取一次需要的时间就会越长,且在网络上传输出现问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉取。
- consumeMessageBatchMaxSize 值设置的越大,Consumer 的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果。因为 consumeMessageBatchMaxSize 指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。
三、代码举例
该批量发送的需求是,不修改最大发送 4M 的默认值,但要防止发送的批量消息超出 4M 的限制。
定义消息列表分割器
// 消息列表分割器:其只会处理每条消息的大小不超4M的情况。
// 若存在某条消息,其本身大小大于4M,这个分割器无法处理,
// 其直接将这条消息构成一个子列表返回。并没有再进行分割
public class MessageListSplitter implements Iterator<List<Message>> {
// 指定极限值为4M
private final int SIZE_LIMIT = 4 * 1024 * 1024;
// 存放所有要发送的消息
private final List<Message> messages;
// 要进行批量发送消息的小集合起始索引
private int currIndex;
public MessageListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
// 判断当前开始遍历的消息索引要小于消息总数
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
// 记录当前要发送的这一小批次消息列表的大小
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
// 获取当前遍历的消息
Message message = messages.get(nextIndex);
// 统计当前遍历的message的大小
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20;
// 判断当前消息本身是否大于4M
if (tmpSize > SIZE_LIMIT) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if(tmpSize + totalSize > SIZE_LIMIT) {
break;
} else{
totalSize += tmpSize;
}
} // end-for
// 获取当前messages列表的子集合[currIndex, nextIndex)
List<Message> subList = messages.subList(currIndex, nextIndex);
// 下次遍历的开始索引
currIndex = nextIndex;
return subList;
}
}
定义批量消息生产者
public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定要发送的消息的最大大小,默认是4M
// 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的
// maxMessageSize属性
// producer.setMaxMessageSize(8 * 1024 * 1024);
producer.start();
// 定义要发送的消息集合
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("someTopic", "someTag", body);
messages.add(msg);
}
//定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
MessageListSplitter splitter = new
MessageListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
定义批量消息消费者
public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
consumer.setNamesrvAddr("rocketmqOS:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("someTopicA", "*");
// 指定每次可以消费10条消息,默认为1
consumer.setConsumeMessageBatchMaxSize(10);
// 指定每次可以从Broker拉取40条消息,默认为32
consumer.setPullBatchSize(40);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 消费成功的返回结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 消费异常时的返回结果
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
标签:20,批量,05,messages,发送,4M,消息,public,第四章
From: https://www.cnblogs.com/niujifei/p/16586110.html