首页 > 其他分享 >MQ系列15:MQ实现批量消息处理

MQ系列15:MQ实现批量消息处理

时间:2023-10-17 12:56:12浏览次数:36  
标签:15 批量 messages topic MQ 消息 系列

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的发送模式
MQ系列6:消息的消费
MQ系列7:消息通信,追求极致性能
MQ系列8:数据存储,消息队列的高可用保障
MQ系列9:高可用架构分析
MQ系列10:如何保证消息幂等性消费
MQ系列11:如何保证消息可靠性传输
MQ系列12:如何保证消息顺序性
MQ系列13:消息大量堆积如何为解决
MQ系列14:MQ如何做到消息延时处理

1 背景

在互联网业务的实际应用场景中,消息的批量处理是非常必要的,因为我们时刻面临着大量数据的并发执行。
例如,我们在一个业务交互的时候会有大量的分支行为需要异步去处理,但是这些动作又是在不同的业务粒度上的,所以我们需要多次调用MQ写入消息,可能有多次的连接和消息发送。
这个写MySQL数据库是一样的,多次建连和写入,跟一次建连和批量数据库,性能是完全不能比的。
所以我们需要有MQ有批量消息的能力来对我们的业务数据进行快速处理。

2 批量消息实现过程

Rocket MQ的批量消息,可以提高消息的吞吐能力和处理效率,降低下游系统的API调用频率,同时对消息服务的稳定性也有帮助。
image

2.1 批量消息的特点

  • 批量消息具有相同的topic。
  • 批量消息具有相同的waitStoreMsgOK属性。
  • 批量消息不支持延迟消息。
  • 批量消息的大小不超过4M(4.4版本之后要求不超过1M)。

2.2 批量消息的使用场景

  • 消息的吞吐能力和处理效率:通过将多条消息打包成一批进行发送,可以减少网络传输开销和消息处理的时间,从而提高整体的消息处理效率。
  • 下游系统的API调用频率:通过将多条消息合并成一条批量消息进行发送,可以减少下游系统接收和处理消息的次数,从而降低API调用频率,减轻下游系统的负载压力。

2.3 批量消息的发送示例

Rocket MQ提供了批量发送消息的功能,可以通过调用DefaultMQProducer的send()方法,将多条消息以列表的形式发送给指定的topic。
以下是一个简单的示例代码:

DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName_1");
String topic = "BatchSendTest_1";
producer.start();  
List<Message> msgs = new ArrayList<>();  
msgs.add(new Message(topic, "Tag1", "OrderID-063105013", "Hello world".getBytes()));  
msgs.add(new Message(topic, "Tag1", "OrderID-063105014", "Brand".getBytes()));  
msgs.add(new Message(topic, "Tag1", "OrderID-063105015", "handsome boy ".getBytes()));  
try {
   producer.send(msgs);
} catch (Exception e) {
   e.printStackTrace();
   // 处理异常
}
finally { 
  // 如果不再发送消息,关闭生产者Producer
  producer.shutdown();
}

在以上示例代码中,创建了一个DefaultMQProducer实例,并调用其start()方法启动生产者。
然后构造了一个包含三条消息的列表,通过调用producer的send()方法将列表中的消息发送给指定的topic。
如果消息的总长度可能大于1MB时,这时候最好把消息进行分割,参考下面的代码:

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                   //if the next sublist has no element, add this one and then break, otherwise just break
                   nextIndex++;  
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
    
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       // handle the error
   }
}

可以看出来,Rocket MQ的批量消息可以提高消息的吞吐能力和处理效率,降低下游系统的API调用频率,是一种优化消息传输和处理的有效手段。

3 总结

  • 对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。
  • 批量发送消息须有相同的topic,相同的waitStoreMsgOK,且不能是延时消息。

标签:15,批量,messages,topic,MQ,消息,系列
From: https://www.cnblogs.com/wzh2010/p/17763797.html

相关文章

  • PHP轮子批量替换数据库前缀
    <?phpinclude_once('fix_mysql.inc.php');//设置好相关信息echo'<metacharset="utf-8">';$dbserver='localhost';$dbname='test';//替换成你的数据库名$dbuser='root';//替换成你的数据库用户名$dbpassword='123......
  • Prometheus监控RocketMQ
    本文基于官方提供的RocketMQExporter来监控RocketMQ集群1.BrokerTPS/QPS的监控2.消息积压监控3.消费组消费演示监控最终的Grafana面板效果图如下:楼主RocketMQ环境是三主三从集群(只要在其中一台部署监控即可)配置步骤1.安装RocketMQExporterRocketMQ官方已经提供了expor......
  • 理解MQTT协议(v3.1.1)
    1.概述MQTT协议概述2.数据包详解ControlPackets即14种MQTT类型的消息的二进制定义,本人按对协议的个人理解分别详述如下:CONNECT连接请求CONNACK连接请求应答PUBLISH发布消息PUBACK发布应答PUBREC发布收到QoS2消息收到PUBREL发布释放QoS2消息收到PUBCOMP......
  • 15.1 套接字通过域名取IP地址
    首先我们来实现一个DNS查询功能,该功能的目的是传入一个网站域名自动将该域名解析为对应的IP地址,该功能的实现依赖于gethostbyname函数,该函数将主机名作为参数,并返回一个指向hostent类型结构的指针,结构包含有关主机的信息。结构包含许多字段,其中最重要的是h_name和h_addr_list。h_n......
  • MQTT控制报文格式 -- UNSUBACK – 取消主题订阅应答
    UNSUBACK数据包由服务器发送到客户端以确认收到UNSUBSCRIBE数据包。该数据包不包含Payload,所以剩余长度为2,即2字节的可变包头长度。1.固定包头FixedheaderBit76543210byte1MQTTControlPackettype(11)Reserved 1......
  • MQTT控制报文格式 -- PINGREQ – ping请求
    PINGREQ数据包从客户端发送到服务器。它可用于:在没有任何其他控制数据包从客户端发送到服务器的情况下,向服务器指示客户端处于活动状态。请求服务器响应以确认其处于活动状态。测试网络以指示网络连接处于活动状态。该数据包不需要可变包头,没有Payload部分其固定包头格式......
  • MQTT控制报文格式 -- UNSUBSCRIBE– 客户端取消订阅请求
    客户端向服务器发送取消订阅数据包,以取消订阅主题。1.固定包头FixedheaderBit76543210byte1MQTTControlPackettype(10)Reserved 10100010byte2RemainingLength2.可变包头Variab......
  • MQTT控制报文格式 -- SUBACK – 订阅应答
    SUBACK数据包由服务器发送到客户端以确认SUBSCRIBE数据包的接收和处理。SUBACK数据包包含返回代码列表,指定在SUBSCRIBE请求的每个订阅中授予的最大QoS级别1.固定包头Fixedheader固定包头的剩余长度=可变包头(2字节)+Payload长度Bit765432......
  • Educational Codeforces Round 154 (Rated for Div. 2) B. Two Binary Strings
    给定两个长度相等的\(01\)字符串\(a\)和\(b\)。每个字符串都是以\(0\)开始以\(1\)结束。在一步操作中,你可以选择任意一个字符串:选择任意两个位置\(l,r\)满足\(s_l=s_r\),然后让\(\foralli\in[l,r],s_i=s_l\)。询问经过若干次操作后能否使得\(a=b......
  • CF1548E Gregor and the Two Painters
    Day\(\text{叁拾肆}\)。DS写不动了,标题也取不动了www。类似Day1CF1270HNumberofComponents,每个连通块中选出一个代表的点。令一个连通块内所有点按照\(v_{i,j}=\{a_i+b_j,i,j\}\)排序,对最小的\(v_{i,j}\)计数。于是相当于求多少个格子不能走到另一个(非严格)三位偏序......