概述
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB
发送批量消息
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
消息列表分割
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
/**
* 消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb
*/
public class ListSplitter implements Iterator<List<Message>> {
// 定义消息的大小
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
// 表示索引
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
// 开始的索引
int startIndex = getStartIndex();
int nextIndex = startIndex;
// 大小
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
// 计算消息大小
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
/**
* 消息大小的计算
* @param message 消息
* @return 长度
*/
private int calcMessageSize(Message message) {
// 消息大小 = 主题的长度 + 消息体长度 + 日志开销
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
return tmpSize;
}
}
然后在发送的时候通过迭代器发送:
// 启动Producer实例
producer.start();
// 构建一个消息列表
List<Message> messageList = new ArrayList<>();
messageList.add(new Message("batch","tag1",("发送批量消息【"+"1】").getBytes()));
messageList.add(new Message("batch","tag1",("发送批量消息【"+"2】").getBytes()));
messageList.add(new Message("batch","tag1",("发送批量消息【"+"3】").getBytes()));
// 发送消息到一个Broker
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messageList);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
SendResult sendResult = producer.send(listItem);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
标签:批量,int,messages,tmpSize,消息,new,Message,RocketMq
From: https://www.cnblogs.com/zgf123/p/17725889.html