首页 > 其他分享 >RocketMq发送消息之批量消息

RocketMq发送消息之批量消息

时间:2023-09-24 13:22:10浏览次数:41  
标签:批量 int messages tmpSize 消息 new Message RocketMq

概述

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB

发送批量消息

如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //处理error
}

消息列表分割

复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:

/**
 * 消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb
 */
public class ListSplitter implements Iterator<List<Message>> { 
    // 定义消息的大小
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    // 表示索引
    private int currIndex;
    public ListSplitter(List<Message> messages) { 
        this.messages = messages;
    }
    
    @Override 
    public boolean hasNext() {
        return currIndex < messages.size(); 
    }
    
    @Override 
    public List<Message> next() { 
        // 开始的索引
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        // 大小
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex); 
            // 计算消息大小
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break; 
            } else {
                totalSize += tmpSize; 
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex); 
        currIndex = nextIndex;
        return subList;
    }
    private int getStartIndex() {
        Message currMessage = messages.get(currIndex); 
        int tmpSize = calcMessageSize(currMessage); 
        while(tmpSize > SIZE_LIMIT) {
            currIndex += 1;
            Message message = messages.get(currIndex); 
            tmpSize = calcMessageSize(message);
        }
        return currIndex; 
    }

    /**
     * 消息大小的计算
     * @param message 消息
     * @return 长度
     */
    private int calcMessageSize(Message message) {
        // 消息大小 = 主题的长度 + 消息体长度 + 日志开销
        int tmpSize = message.getTopic().length() + message.getBody().length; 
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length(); 
        }
        tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
        return tmpSize;
    }
}

然后在发送的时候通过迭代器发送:

 // 启动Producer实例
        producer.start();
        // 构建一个消息列表
        List<Message> messageList = new ArrayList<>();
        messageList.add(new Message("batch","tag1",("发送批量消息【"+"1】").getBytes()));
        messageList.add(new Message("batch","tag1",("发送批量消息【"+"2】").getBytes()));
        messageList.add(new Message("batch","tag1",("发送批量消息【"+"3】").getBytes()));

        // 发送消息到一个Broker
        //把大的消息分裂成若干个小的消息
        ListSplitter splitter = new ListSplitter(messageList);
        while (splitter.hasNext()) {
            try {
                List<Message>  listItem = splitter.next();
                SendResult sendResult = producer.send(listItem);
                // 通过sendResult返回消息是否成功送达
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                //处理error
            }
        }

标签:批量,int,messages,tmpSize,消息,new,Message,RocketMq
From: https://www.cnblogs.com/zgf123/p/17725889.html

相关文章

  • 解除锁定:一键批量完成 【解除锁定】所有指定文件
    ......
  • RocketMq发送消息之延迟消息
    延迟消息比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。使用限制对比于rabbitmq中的延迟消息来说,rockermq并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18级//org/apache/rocket......
  • MQ - 04 基础篇_存储_消息数据和元数据的存储设计
    @[toc]![在这里插入图片描述](https://img-blog.csdnimg.cn/855d3c6d2ef74a1893f352a4545b479c.png)---------------------#导图![在这里插入图片描述](https://img-blog.csdnimg.cn/d0569e3871dd433784f74d76ebee9a9d.png)----------#概述消息数据和元数据的存......
  • RockerMq发送消息之顺序消息
    顺序消息        消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。        顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(分区队列);而消费消......
  • RocketMQ发送消息之同步异步单向
    官网教程:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart基于双主双从异步方式开启的前提下,在maven项目中引入下列依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1&l......
  • Kafka的消息传递保证和一致性
    前言通过前面的文章,相信大家对Kafka有了一定的了解了,那接下来问题就来了,Kafka既然作为一个分布式的消息队列系统,那它会不会出现消息丢失或者重复消费的情况呢?今天咱们就来一探。实现机制Kafka采用了一系列机制来实现消息传递的保证和一致性,关键点:至少一次的消息传递(AtLeastOnceD......
  • 消息队列中,如何保证消息的顺序性?
    本文选自:advanced-java作者:yanglbme问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以前做过一个mysqlbinlog同步的系统,压......
  • 消息队列中,如何保证消息的顺序性?
    本文选自:advanced-java作者:yanglbme问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以前做过一个mysqlbinlog同步......
  • Kafka的消息传递保证和一致性
    前言通过前面的文章,相信大家对Kafka有了一定的了解了,那接下来问题就来了,Kafka既然作为一个分布式的消息队列系统,那它会不会出现消息丢失或者重复消费的情况呢?今天咱们就来一探。实现机制Kafka采用了一系列机制来实现消息传递的保证和一致性,关键点:至少一次的消息传递(AtLeas......
  • redis消息队列——发布订阅
    一、相关依赖<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><grou......