首页 > 其他分享 >十三、批量消息

十三、批量消息

时间:2023-07-23 17:55:05浏览次数:33  
标签:return String 批量 十三 builder toString 消息 new public

如果要发送很多消息,可以使用批量消息,一次发送,避免多次调用网络,同时提供吞吐量。

代码如下:

@Component
public class MessageDataUtils {

    private String data;

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }
}

MessageDataUtils 用于保存消息。

@Component
@Slf4j
public class GenerateData implements CommandLineRunner {
    @Autowired
    private MessageDataUtils dataUtils;

    String  a = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    int size =  4 * 1024 * 1024 - 1024 ;


    @Override
    public void run(String... args) throws Exception {
        String s = buildOneMBData();
        log.info("生成数据:{}", s.length());
        dataUtils.setData(s);
    }



    private String buildOneMBData() {
        StringBuilder builder = new StringBuilder();

        while (builder.toString().getBytes(StandardCharsets.UTF_8).length < size) {
            builder.append(a);
        }
        log.info("长度:{}", builder.toString().length());
        return builder.toString();
    }
}

GenerateData 用于生成数据。字符串a的长度是1024,表示1KB数据。

@RestController
@Slf4j
public class OldVersionController {

    @Autowired
    private OldVersionProducer producer;

    String topic = "MyTopic";

    @Autowired
    private MessageDataUtils dataUtils;


    @RequestMapping("/sendBatchMsg")
    public SendResult sendBatchMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        List<Message> list =new ArrayList<>();
        //        for (int i = 0; i < 10; i++) {
        //            Message message = new Message(topic, ("My Batch Message " + i).getBytes(StandardCharsets.UTF_8));
        //           // message.setDelayTimeLevel(3);
        //            list.add(message);
        //        }
        String data = dataUtils.getData();
        Message message = new Message(topic, data.getBytes(StandardCharsets.UTF_8));
        // message.setDelayTimeLevel(3);
        list.add(message);
        return producer.sendMsg(list);
    }


}

要使用批量消息,只需将消息封装成 Collection msgs 传入方法中即可。在RocketMQ5.x新版本的api中没有找到批量消息的接口。所以这里使用的是RocketMQ4.x版本的api。同时这里演示了批量消息支持的发送消息的大小。分别发送了单个消息大小为1M,4M的数据。

 

批量消息使用限制:

1、不支持延时消息。上面的例子设置了延时等级,发送直接报错。

2、同一批 batch 中 topic 必须相同

3、消息总大小不超过4M,上面的例子中GenerateData的size要减去1024,不然大小会接近4M会报错。

 

解决方法如下:

1、发送时压缩数据

@Component
@Slf4j
public class GenerateData implements CommandLineRunner {
    @Autowired
    private MessageDataUtils dataUtils;

    String  a = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    int size =  5 * 1024 * 1024 ;


    @Override
    public void run(String... args) throws Exception {
        String s = zipData(buildOneMBData());
        log.info("生成数据:{}", s.length());
        dataUtils.setData(s);
    }



    private String buildOneMBData() {
        StringBuilder builder = new StringBuilder();

        while (builder.toString().getBytes(StandardCharsets.UTF_8).length < size) {
            builder.append(a);
        }
        log.info("长度:{}", builder.toString().length());
        return builder.toString();
    }


     private String zipData(String s) {
        ByteArrayOutputStream byteArrayOutputStream =new ByteArrayOutputStream();

        try(GZIPOutputStream gzipOutputStream= new GZIPOutputStream(byteArrayOutputStream)){
            gzipOutputStream.write(s.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            log.error("压缩数据出错", e);
        }


        return new sun.misc.BASE64Encoder().encode(byteArrayOutputStream.toByteArray());
    }
}

用GZIPOutputStream压缩5M数据并发送。

消费时解压数据:

/**
 * 使用gzip解压缩
 *
 * @param compressedStr 压缩字符串
 * @return 解压缩字符串
 */
public static String uncompress(String compressedStr) {
    if (compressedStr == null) {
        return null;
    }

    byte[] compressed = null;
    String decompressed = null;
    GZIPInputStream ginzip = null;
    ByteArrayInputStream in = null;
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    try {
        // 先解码
        compressed = new sun.misc.BASE64Decoder().decodeBuffer(compressedStr);
        in = new ByteArrayInputStream(compressed);
        ginzip = new GZIPInputStream(in);
        byte[] buffer = new byte[1024];
        int offset = -1;

        while ((offset = ginzip.read(buffer)) != -1) {
            out.write(buffer, 0, offset);
        }

        decompressed = out.toString();
    } catch (IOException e) {
        log.error("字符串解压缩异常!", e);
        e.printStackTrace();
    } finally {
        // 关流

    }

    return decompressed;
}

  uncompress(StandardCharsets.UTF_8.decode(messageView.getBody()).toString())

messageView是MessageView。

2、按大小切割数据在发送

3、将数据保存到文件,将文件上传到文件存储服务器,发送时发送存储文件的url。

标签:return,String,批量,十三,builder,toString,消息,new,public
From: https://www.cnblogs.com/shigongp/p/17574757.html

相关文章

  • 老杜 JavaWeb 讲解(十三) ——JSP简单了解
    (十四)JSP相关视频:35-JSP原理深度解析36-JSP的各种基础语法37-JSP的输出语法第一个JSP程序在WEB-INF目录之外创建一个index.jsp文件,然后这个文件中没有任何内容。将上面的项目部署之后,启动服务器,打开浏览器,访问以下地址:http://localhost:8080/jsp/index.jsp展现......
  • 一个故事告诉你什么是消息队列
    有一天,产品跑来说:“我们要做一个用户注册功能,需要在用户注册成功后给用户发一封成功邮件。”小明(攻城狮):“好,需求很明确了。”不就提供一个注册接口,保存用户信息,同时发起邮件调用,待邮件发送成功后,返回用户操作成功。没一会功夫,代码就写完了。验证功能没问题后,就发布上线了。线上......
  • Java文件批量上传
    Java文件批量上传在日常的开发过程中,我们常常需要将一批文件批量上传到服务器或其他存储设备上。使用Java语言可以轻松地实现这一功能。本文将介绍如何使用Java实现文件批量上传,并提供相应的代码示例。准备工作在开始编写上传文件的Java代码之前,我们需要进行一些准备工作。首先,......
  • react批量生成pdf
    使用htmlpdf.js批量将html页面转为pdf,打包成zip下载。htmlpdf.js是结合html2canvas和jsPDF实现的。首先先安装包npminstall--savehtml2pdf.js基本页面importhtml2pdffrom"html2pdf.js";functionApp(){consthandleExportPdf=()=>{//导出pdf......
  • (四) MdbCluster分布式内存数据库——业务消息处理
    (四)MdbCluster分布式内存数据库——业务消息处理 上篇:(三)MdbCluster分布式内存数据库——节点状态变化及分片调整 离上次更新文章已有快5个月,我还是有点懒。但我们系统的研发并没有因此停下来。下面先简单介绍下MdbCluster最近的一些进展。1.提供了java语......
  • 测试发送消息到Microsoft Teams
    创建测试频道:点击团队右侧“···”,点击添加频道: 然后完善频道信息: 创建好频道之后,点击频道右侧“···”管理频道: 可以进行频道人员的添加与删除管理: 点击频道右侧“···”连接器: 在搜索框中,搜索webhook进行搜索: 将IncomingWebhook 添加到频道中: 可......
  • MQTT 与 Kafka|物联网消息与流数据集成实践
    MQTT如何与Kafka一起使用?MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,专为受限网络环境下的设备通信而设计。ApacheKafka是一个分布式流处理平台,旨在处理大规模的实时数据流。Kafka和MQTT是实现物联网数据端到端集成的互补技术。通过结合使用......
  • springboot学习之十三(druid+mybaits plus)
    Druid介绍Druid是阿里巴巴的一个开源项目,号称为监控而生的数据库连接池,在功能、性能、扩展性方面都超过其他例如DBCP、C3P0、BoneCP、Proxool、JBossDataSource等连接池,而且Druid已经在阿里巴巴部署了超过600个应用,通过了极为严格的考验,这才收获了大家的青睐! Springboot集成......
  • mysql 存储过程大批量插入速度慢
    MySQL存储过程大批量插入速度慢的原因及解决方法在使用MySQL数据库进行大批量数据插入时,可能会遇到插入速度慢的问题。这个问题很常见,通常是由于存储过程执行效率低下导致的。本文将介绍这个问题的原因,并提供一些优化的解决方法。原因分析在MySQL数据库中,存储过程是一组预定义的......
  • 批量新增接口正确传参以及优化(含数组传参)
    需求页面: 接口文档说明: 单个新增计划传参: 批量新增计划传参:批量新增7月24日和8月1日的复制计划 接口优化:由于当前医生给指定的患者新增计划时,当前医生和患者都不会发生变化,批量新增时仅是新增多个计划时间,所以计划时间可以调整为数组进行传参接口文档优化: 单个计......